Skip to content

Commit

Permalink
satellite/contact,storagenode/contact: try ping back to nodes through
Browse files Browse the repository at this point in the history
QUIC

We want to encourage storagenodes to open their udp port. This PR
changes contact service in satellite to try to connect to nodes through
QUIC. If satellite can't reach nodes through quic, it will send an error
message back to nodes. On the nodes side, it will always log out error
message from check in if the error message is not empty.
Whether satellite can reach nodes through quic has no affect on nodes'
uptime check.

Change-Id: I5ebf80f921c4a6504997d83c8bd45226da9d3703
  • Loading branch information
VinozzZ authored and Yingrong Zhao committed Apr 20, 2021
1 parent f6ec7f9 commit a3c437a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 6 deletions.
80 changes: 80 additions & 0 deletions satellite/contact/contact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"storj.io/common/rpc/rpcpeer"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/storagenode"
)

func TestSatelliteContactEndpoint(t *testing.T) {
Expand Down Expand Up @@ -42,6 +43,85 @@ func TestSatelliteContactEndpoint(t *testing.T) {
})
require.NoError(t, err)
require.NotNil(t, resp)
require.True(t, resp.PingNodeSuccess)
require.True(t, resp.PingNodeSuccessQuic)

peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeInfo.ID)
require.NoError(t, err)
require.Equal(t, ident.PeerIdentity(), peerID)
})
}

func TestSatelliteContactEndpoint_Failure(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Server.DisableTCPTLS = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
ident := planet.StorageNodes[0].Identity

peer := rpcpeer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP(nodeInfo.Address),
Port: 5,
},
State: tls.ConnectionState{
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
},
}
peerCtx := rpcpeer.NewContext(ctx, &peer)
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
Address: nodeInfo.Address,
Version: &nodeInfo.Version,
Capacity: &nodeInfo.Capacity,
Operator: &nodeInfo.Operator,
})
require.NoError(t, err)
require.NotNil(t, resp)
require.False(t, resp.PingNodeSuccess)
require.False(t, resp.PingNodeSuccessQuic)

peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeInfo.ID)
require.NoError(t, err)
require.Equal(t, ident.PeerIdentity(), peerID)
})
}
func TestSatelliteContactEndpoint_QUIC_Unreachable(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Server.DisableQUIC = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
ident := planet.StorageNodes[0].Identity

peer := rpcpeer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP(nodeInfo.Address),
Port: 5,
},
State: tls.ConnectionState{
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
},
}
peerCtx := rpcpeer.NewContext(ctx, &peer)
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
Address: nodeInfo.Address,
Version: &nodeInfo.Version,
Capacity: &nodeInfo.Capacity,
Operator: &nodeInfo.Operator,
})
require.NoError(t, err)
require.NotNil(t, resp)
require.True(t, resp.PingNodeSuccess)
require.False(t, resp.PingNodeSuccessQuic)

peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeInfo.ID)
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions satellite/contact/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
ID: nodeID,
Address: req.Address,
}
pingNodeSuccess, pingErrorMessage, err := endpoint.service.PingBack(ctx, nodeurl)
pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, err := endpoint.service.PingBack(ctx, nodeurl)
if err != nil {
endpoint.log.Info("failed to ping back address", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
if errPingBackDial.Has(err) {
Expand Down Expand Up @@ -115,8 +115,9 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (

endpoint.log.Debug("checking in", zap.String("node addr", req.Address), zap.Bool("ping node success", pingNodeSuccess), zap.String("ping node err msg", pingErrorMessage))
return &pb.CheckInResponse{
PingNodeSuccess: pingNodeSuccess,
PingErrorMessage: pingErrorMessage,
PingNodeSuccess: pingNodeSuccess,
PingNodeSuccessQuic: pingNodeSuccessQUIC,
PingErrorMessage: pingErrorMessage,
}, nil
}

Expand Down
40 changes: 37 additions & 3 deletions satellite/contact/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/pkg/quic"
"storj.io/storj/satellite/overlay"
)

Expand Down Expand Up @@ -66,7 +67,7 @@ func (service *Service) Local() overlay.NodeDossier {
func (service *Service) Close() error { return nil }

// PingBack pings the node to test connectivity.
func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_ bool, _ string, err error) {
func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_ bool, _ bool, _ string, err error) {
defer mon.Task()(&ctx)(&err)

if service.timeout > 0 {
Expand All @@ -77,6 +78,7 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_

pingNodeSuccess := true
var pingErrorMessage string
var pingNodeSuccessQUIC bool

client, err := dialNodeURL(ctx, service.dialer, nodeurl)
if err != nil {
Expand All @@ -91,7 +93,7 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
service.log.Debug("pingBack failed to dial storage node",
zap.String("pingErrorMessage", pingErrorMessage),
)
return pingNodeSuccess, pingErrorMessage, nil
return pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, nil
}
defer func() { err = errs.Combine(err, client.Close()) }()

Expand All @@ -104,7 +106,39 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
zap.Stringer("Node ID", nodeurl.ID),
zap.String("pingErrorMessage", pingErrorMessage),
)

return pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, nil
}

pingNodeSuccessQUIC = true
err = service.pingNodeQUIC(ctx, nodeurl)
if err != nil {
// udp ping back is optional right now, it shouldn't affect contact service's
// control flow
pingNodeSuccessQUIC = false
pingErrorMessage = err.Error()
}

return pingNodeSuccess, pingNodeSuccessQUIC, pingErrorMessage, nil
}

func (service *Service) pingNodeQUIC(ctx context.Context, nodeurl storj.NodeURL) error {
udpDialer := service.dialer
udpDialer.Connector = quic.NewDefaultConnector(nil)
udpClient, err := dialNodeURL(ctx, udpDialer, nodeurl)
if err != nil {
mon.Event("failed_dial_quic")
return Error.New("failed to dial storage node (ID: %s) at address %s using QUIC: %q", nodeurl.ID.String(), nodeurl.Address, err)
}
defer func() {
_ = udpClient.Close()
}()

_, err = udpClient.pingNode(ctx, &pb.ContactPingRequest{})
if err != nil {
mon.Event("failed_ping_node_quic")
return Error.New("failed to ping storage node using QUIC, your node indicated error code: %d, %q", rpcstatus.Code(err), err)
}

return pingNodeSuccess, pingErrorMessage, nil
return nil
}
3 changes: 3 additions & 0 deletions storagenode/contact/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID)
if resp != nil && !resp.PingNodeSuccess {
return errPingSatellite.New("%s", resp.PingErrorMessage)
}
if resp.PingErrorMessage != "" {
service.log.Warn("Your node is still considered to be online but encountered an error.", zap.Stringer("Satellite ID", id), zap.String("Error", resp.GetPingErrorMessage()))
}
return nil
}

Expand Down

0 comments on commit a3c437a

Please sign in to comment.