Skip to content

Commit

Permalink
satellite/overlay: add IP port to node offline email
Browse files Browse the repository at this point in the history
Add config, NodeTagsIPPortEmails, to create a list of node tags for whom
to add IP and port to offline emails. Send IP and port in customerio
Notifier.

Change-Id: I18e7ff0de574b2b49d568578d37c6a925901569f
  • Loading branch information
cam-a authored and Storj Robot committed Jan 31, 2024
1 parent a209127 commit 957d37b
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 14 deletions.
8 changes: 7 additions & 1 deletion satellite/nodeevents/customerio.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type CustomerioBatch struct {
type CustomerioData struct {
Satellite string `json:"satellite"`
NodeIDs string `json:"nodeIDs"`
IPPorts string `json:"ipPorts"`
}

// NewCustomerioNotifier is a constructor for CustomerioNotifier.
Expand Down Expand Up @@ -70,21 +71,26 @@ func (c *CustomerioNotifier) Notify(ctx context.Context, satellite string, event
return err
}

var nodeIDs string
var nodeIDs, ipPorts string
seen := make(map[storj.NodeID]struct{})
for _, e := range events {
if _, ok := seen[e.NodeID]; !ok {
seen[e.NodeID] = struct{}{}
nodeIDs = nodeIDs + e.NodeID.String() + ","
if e.LastIPPort != nil {
ipPorts = ipPorts + *e.LastIPPort + ","
}
}
}
nodeIDs = strings.TrimSuffix(nodeIDs, ",")
ipPorts = strings.TrimSuffix(ipPorts, ",")

batch := CustomerioBatch{
Name: eventName,
Data: CustomerioData{
Satellite: satellite,
NodeIDs: nodeIDs,
IPPorts: ipPorts,
},
}
data, err := json.Marshal(batch)
Expand Down
1 change: 1 addition & 0 deletions satellite/overlay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
NodeSoftwareUpdateEmailCooldown time.Duration `help:"the amount of time to wait between sending Node Software Update emails" default:"168h"`
RepairExcludedCountryCodes []string `help:"list of country codes to exclude nodes from target repair selection" default:"" testDefault:"FR,BE"`
SendNodeEmails bool `help:"whether to send emails to nodes" default:"false"`
NodeTagsIPPortEmails []string `help:"comma separated list of node tags for whom to add last ip and port to emails. Currently only for offline emails." default:""`
MinimumNewNodeIDDifficulty int `help:"the minimum node id difficulty required for new nodes. existing nodes remain allowed" devDefault:"0" releaseDefault:"36"`
AsOfSystemTime time.Duration `help:"default AS OF SYSTEM TIME for service" default:"-10s" testDefault:"0"`
}
Expand Down
41 changes: 28 additions & 13 deletions satellite/overlay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ type NodeReputation struct {
//
// architecture: Service
type Service struct {
log *zap.Logger
db DB
nodeEvents nodeevents.DB
satelliteName string
satelliteAddress string
config Config
log *zap.Logger
db DB
nodeEvents nodeevents.DB
satelliteName string
satelliteAddress string
nodeTagsIPPortEmails []string
config Config

GeoIP geoip.IPToCountry
UploadSelectionCache *UploadSelectionCache
Expand Down Expand Up @@ -358,12 +359,13 @@ func NewService(log *zap.Logger, db DB, nodeEvents nodeevents.DB, placements nod
}

return &Service{
log: log,
db: db,
nodeEvents: nodeEvents,
satelliteAddress: satelliteAddr,
satelliteName: satelliteName,
config: config,
log: log,
db: db,
nodeEvents: nodeEvents,
satelliteAddress: satelliteAddr,
satelliteName: satelliteName,
nodeTagsIPPortEmails: config.NodeTagsIPPortEmails,
config: config,

GeoIP: geoIP,

Expand Down Expand Up @@ -466,9 +468,22 @@ func (service *Service) InsertOfflineNodeEvents(ctx context.Context, cooldown ti

count = len(nodes)

var lastIPPorts map[storj.NodeID]*string
if len(service.nodeTagsIPPortEmails) > 0 {
var nodeIDs storj.NodeIDList
for id := range nodes {
nodeIDs = append(nodeIDs, id)
}
lastIPPorts, err = service.db.GetLastIPPortByNodeTagNames(ctx, nodeIDs, service.nodeTagsIPPortEmails)
if err != nil {
service.log.Error("could not get last IP and ports for nodes", zap.Error(err))
}
}

var successful storj.NodeIDList
for id, email := range nodes {
_, err = service.nodeEvents.Insert(ctx, email, nil, id, nodeevents.Offline)
lastIPPort := lastIPPorts[id]
_, err = service.nodeEvents.Insert(ctx, email, lastIPPort, id, nodeevents.Offline)
if err != nil {
service.log.Error("could not insert node offline into node events", zap.Error(err))
} else {
Expand Down
75 changes: 75 additions & 0 deletions satellite/overlay/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,3 +1003,78 @@ func TestUpdateCheckInBelowMinVersionEvent(t *testing.T) {
require.True(t, ne2.CreatedAt.After(ne1.CreatedAt))
})
}

func TestInsertOfflineNodeEvents(t *testing.T) {
tagName := "test-tag-name"
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.SendNodeEmails = true
// testplanet storagenode default version is "v0.0.1".
// set this as minimum version so storagenode doesn't start below it.
config.Overlay.Node.MinimumVersion = "v0.0.1"
config.Overlay.NodeTagsIPPortEmails = []string{tagName}
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
service := planet.Satellites[0].Overlay.Service
node1 := planet.StorageNodes[0]
node2 := planet.StorageNodes[1]
node1.Contact.Chore.Pause(ctx)
node2.Contact.Chore.Pause(ctx)

require.NoError(t, service.UpdateNodeTags(ctx, []nodeselection.NodeTag{
{
NodeID: node1.ID(),
SignedAt: time.Now(),
Signer: node1.ID(),
Name: tagName,
Value: []byte{1, 2, 3},
},
}))

n1LastIPPort := "127.0.0.1:1234"

for i := 0; i < 2; i++ {
n := planet.StorageNodes[i]
lastIPPort := n1LastIPPort
if i == 1 {
lastIPPort = ""
}
require.NoError(t, planet.Satellites[0].DB.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
NodeID: n.ID(),
IsUp: true,
LastIPPort: lastIPPort,
Address: &pb.NodeAddress{Address: "127.0.0.1"},
Operator: &pb.NodeOperator{
Email: n.Config.Operator.Email,
},
Version: &pb.NodeVersion{Version: "v1.1.1"},
}, time.Now().Add(-48*time.Hour), overlay.NodeSelectionConfig{}))
}

n, err := service.Get(ctx, node1.ID())
require.NoError(t, err)
require.NotNil(t, n)

count, err := service.InsertOfflineNodeEvents(ctx, 0, 72*time.Hour, 3)
require.NoError(t, err)
require.Equal(t, 2, count)

ne, err := planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, node1.Config.Operator.Email, nodeevents.Offline)
require.NoError(t, err)
require.Equal(t, node1.ID(), ne.NodeID)
require.Equal(t, node1.Config.Operator.Email, ne.Email)
require.Equal(t, nodeevents.Offline, ne.Event)
require.NotNil(t, ne.LastIPPort)
require.Equal(t, n1LastIPPort, *ne.LastIPPort)

ne, err = planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, node2.Config.Operator.Email, nodeevents.Offline)
require.NoError(t, err)
require.Equal(t, node2.ID(), ne.NodeID)
require.Equal(t, node2.Config.Operator.Email, ne.Email)
require.Equal(t, nodeevents.Offline, ne.Event)
require.Nil(t, ne.LastIPPort)
})
}
3 changes: 3 additions & 0 deletions satellite/satellite-config.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# the amount of time to wait between sending Node Software Update emails
# overlay.node-software-update-email-cooldown: 168h0m0s

# comma separated list of node tags for whom to add last ip and port to emails. Currently only for offline emails.
# overlay.node-tags-ip-port-emails: []

# default duration for AS OF SYSTEM TIME
# overlay.node.as-of-system-time.default-interval: -10s

Expand Down

0 comments on commit 957d37b

Please sign in to comment.