Skip to content

Commit

Permalink
pkg/server,private/testplanet: start to listen on quic
Browse files Browse the repository at this point in the history
This PR introduces a new listener that can listen for quic traffic on
both storagenodes and satellites.

Change-Id: I5eb5bc82c37dde20d3be2ec8fa5f69c18fae0af0
  • Loading branch information
VinozzZ committed Jan 27, 2021
1 parent f18cb24 commit 02845e7
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 98 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down
4 changes: 2 additions & 2 deletions pkg/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ type closeTrackingConn struct {
rpc.ConnectorConn
}

// trackClose wraps the conn and sets a finalizer on the returned value to
// TrackClose wraps the conn and sets a finalizer on the returned value to
// close the conn and monitor that it was leaked.
func trackClose(conn rpc.ConnectorConn) rpc.ConnectorConn {
func TrackClose(conn rpc.ConnectorConn) rpc.ConnectorConn {
tracked := &closeTrackingConn{ConnectorConn: conn}
runtime.SetFinalizer(tracked, (*closeTrackingConn).finalize)
return tracked
Expand Down
2 changes: 1 addition & 1 deletion pkg/quic/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c Connector) DialContext(ctx context.Context, tlsConfig *tls.Config, addre
}

return &timedConn{
ConnectorConn: trackClose(conn),
ConnectorConn: TrackClose(conn),
rate: c.transferRate,
}, nil
}
Expand Down
56 changes: 47 additions & 9 deletions pkg/server/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/zeebo/errs"

"storj.io/common/netutil"
"storj.io/common/rpc"
"storj.io/storj/pkg/quic"
)

// defaultUserTimeout is the value we use for the TCP_USER_TIMEOUT setting.
Expand All @@ -19,24 +21,27 @@ const defaultUserTimeout = 60 * time.Second
// and monitors if the returned connections are closed or leaked.
func wrapListener(lis net.Listener) net.Listener {
if lis, ok := lis.(*net.TCPListener); ok {
return newUserTimeoutListener(lis)
return newTCPUserTimeoutListener(lis)
}
if lis, ok := lis.(*quic.Listener); ok {
return newQUICTrackedListener(lis)
}
return lis
}

// userTimeoutListener wraps a tcp listener so that it sets the TCP_USER_TIMEOUT
// tcpUserTimeoutListener wraps a tcp listener so that it sets the TCP_USER_TIMEOUT
// value for each socket it returns.
type userTimeoutListener struct {
type tcpUserTimeoutListener struct {
lis *net.TCPListener
}

// newUserTimeoutListener wraps the tcp listener in a userTimeoutListener.
func newUserTimeoutListener(lis *net.TCPListener) *userTimeoutListener {
return &userTimeoutListener{lis: lis}
// newTCPUserTimeoutListener wraps the tcp listener in a userTimeoutListener.
func newTCPUserTimeoutListener(lis *net.TCPListener) *tcpUserTimeoutListener {
return &tcpUserTimeoutListener{lis: lis}
}

// Accept waits for and returns the next connection to the listener.
func (lis *userTimeoutListener) Accept() (net.Conn, error) {
func (lis *tcpUserTimeoutListener) Accept() (net.Conn, error) {
conn, err := lis.lis.AcceptTCP()
if err != nil {
return nil, err
Expand All @@ -50,11 +55,44 @@ func (lis *userTimeoutListener) Accept() (net.Conn, error) {

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (lis *userTimeoutListener) Close() error {
func (lis *tcpUserTimeoutListener) Close() error {
return lis.lis.Close()
}

// Addr returns the listener's network address.
func (lis *tcpUserTimeoutListener) Addr() net.Addr {
return lis.lis.Addr()
}

type quicTrackedListener struct {
lis *quic.Listener
}

func newQUICTrackedListener(lis *quic.Listener) *quicTrackedListener {
return &quicTrackedListener{lis: lis}
}

func (lis *quicTrackedListener) Accept() (net.Conn, error) {
conn, err := lis.lis.Accept()
if err != nil {
return nil, err
}

connectorConn, ok := conn.(rpc.ConnectorConn)
if !ok {
return nil, Error.New("quic connection doesn't implement required methods")
}

return quic.TrackClose(connectorConn), nil
}

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (lis *quicTrackedListener) Close() error {
return lis.lis.Close()
}

// Addr returns the listener's network address.
func (lis *userTimeoutListener) Addr() net.Addr {
func (lis *quicTrackedListener) Addr() net.Addr {
return lis.lis.Addr()
}
36 changes: 25 additions & 11 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"sync"

quicgo "github.com/lucas-clemente/quic-go"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -21,6 +22,7 @@ import (
"storj.io/drpc/drpcserver"
jaeger "storj.io/monkit-jaeger"
"storj.io/storj/pkg/listenmux"
"storj.io/storj/pkg/quic"
)

// Config holds server specific configuration parameters.
Expand All @@ -33,9 +35,10 @@ type Config struct {
}

type public struct {
listener net.Listener
drpc *drpcserver.Server
mux *drpcmux.Mux
tcpListener net.Listener
quicListener net.Listener
drpc *drpcserver.Server
mux *drpcmux.Mux
}

type private struct {
Expand Down Expand Up @@ -71,22 +74,28 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s
Manager: rpc.NewDefaultManagerOptions(),
}

publicListener, err := net.Listen("tcp", publicAddr)
publicTCPListener, err := net.Listen("tcp", publicAddr)
if err != nil {
return nil, err
}

publicQUICListener, err := quic.NewListener(tlsOptions.ServerTLSConfig(), publicTCPListener.Addr().String(), &quicgo.Config{MaxIdleTimeout: defaultUserTimeout})
if err != nil {
return nil, errs.Combine(err, publicTCPListener.Close())
}

publicMux := drpcmux.New()
publicTracingHandler := rpctracing.NewHandler(publicMux, jaeger.RemoteTraceHandler)
server.public = public{
listener: wrapListener(publicListener),
drpc: drpcserver.NewWithOptions(publicTracingHandler, serverOptions),
mux: publicMux,
tcpListener: wrapListener(publicTCPListener),
quicListener: wrapListener(publicQUICListener),
drpc: drpcserver.NewWithOptions(publicTracingHandler, serverOptions),
mux: publicMux,
}

privateListener, err := net.Listen("tcp", privateAddr)
if err != nil {
return nil, errs.Combine(err, publicListener.Close())
return nil, errs.Combine(err, publicTCPListener.Close(), publicQUICListener.Close())
}
privateMux := drpcmux.New()
privateTracingHandler := rpctracing.NewHandler(privateMux, jaeger.RemoteTraceHandler)
Expand All @@ -103,7 +112,7 @@ func New(log *zap.Logger, tlsOptions *tlsopts.Options, publicAddr, privateAddr s
func (p *Server) Identity() *identity.FullIdentity { return p.tlsOptions.Ident }

// Addr returns the server's public listener address.
func (p *Server) Addr() net.Addr { return p.public.listener.Addr() }
func (p *Server) Addr() net.Addr { return p.public.tcpListener.Addr() }

// PrivateAddr returns the server's private listener address.
func (p *Server) PrivateAddr() net.Addr { return p.private.listener.Addr() }
Expand All @@ -127,7 +136,8 @@ func (p *Server) Close() error {
// We ignore these errors because there's not really anything to do
// even if they happen, and they'll just be errors due to duplicate
// closes anyway.
_ = p.public.listener.Close()
_ = p.public.quicListener.Close()
_ = p.public.tcpListener.Close()
_ = p.private.listener.Close()
return nil
}
Expand Down Expand Up @@ -156,7 +166,7 @@ func (p *Server) Run(ctx context.Context) (err error) {
// a chance to be notified that they're done running.
const drpcHeader = "DRPC!!!1"

publicMux := listenmux.New(p.public.listener, len(drpcHeader))
publicMux := listenmux.New(p.public.tcpListener, len(drpcHeader))
publicDRPCListener := tls.NewListener(publicMux.Route(drpcHeader), p.tlsOptions.ServerTLSConfig())

privateMux := listenmux.New(p.private.listener, len(drpcHeader))
Expand Down Expand Up @@ -197,6 +207,10 @@ func (p *Server) Run(ctx context.Context) (err error) {
defer cancel()
return p.public.drpc.Serve(ctx, publicDRPCListener)
})
group.Go(func() error {
defer cancel()
return p.public.drpc.Serve(ctx, p.public.quicListener)
})
group.Go(func() error {
defer cancel()
return p.private.drpc.Serve(ctx, privateDRPCListener)
Expand Down

0 comments on commit 02845e7

Please sign in to comment.