From 951a842fd4040327e8197d3c381cf3033aef3260 Mon Sep 17 00:00:00 2001 From: paul cannon Date: Wed, 4 May 2022 16:15:29 -0500 Subject: [PATCH] satellite/{reputation,satellitedb}: move addAudit This functionality will be needed in both packages, so here we move it into the more general reputation-code package and export it for use in satellitedb. This also removes the related UpdateAuditHistory() signature from the reputation DB interface, since it doesn't have anything to do with the db. It doesn't need to be a method, either. Finally, this changes the test for addAudit to be a plain test function instead of using testplanet.Run(). It didn't need a whole testplanet setup or any databases. Refs: https://github.com/storj/storj/issues/4601 Change-Id: I90f6a909e5404f03ad776b95cfa2f248308c57c1 --- satellite/reputation/audithistory.go | 67 +++++++++- satellite/reputation/audithistory_test.go | 156 +++++++++------------- satellite/reputation/service.go | 2 - satellite/satellitedb/audithistory.go | 68 +--------- satellite/satellitedb/reputations.go | 4 +- 5 files changed, 131 insertions(+), 166 deletions(-) diff --git a/satellite/reputation/audithistory.go b/satellite/reputation/audithistory.go index 28e75e63da0d..51096247cbc5 100644 --- a/satellite/reputation/audithistory.go +++ b/satellite/reputation/audithistory.go @@ -46,8 +46,8 @@ func AuditHistoryToPB(auditHistory AuditHistory) (historyPB *pb.AuditHistory) { return historyPB } -// AuditHistoryFromBytes decodes an AuditHistory from the givenprotobuf-encoded -// internalpb.AuditHistory object. +// AuditHistoryFromBytes returns the corresponding AuditHistory for the given +// encoded internalpb.AuditHistory. func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err error) { var pbHistory internalpb.AuditHistory if err := pb.Unmarshal(encodedHistory, &pbHistory); err != nil { @@ -64,3 +64,66 @@ func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err err } return history, nil } + +// AddAuditToHistory adds a single online/not-online event to an AuditHistory. +// If the AuditHistory contains windows that are now outside the tracking +// period, those windows will be trimmed. +func AddAuditToHistory(a *internalpb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error { + newAuditWindowStartTime := auditTime.Truncate(config.WindowSize) + earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod) + // windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed. + windowsModified := false + + // delete windows outside of tracking period scope + updatedWindows := a.Windows + for i, window := range a.Windows { + if window.WindowStart.Before(earliestWindow) { + updatedWindows = a.Windows[i+1:] + windowsModified = true + } else { + // windows are in order, so if this window is in the tracking period, we are done deleting windows + break + } + } + a.Windows = updatedWindows + + // if there are no windows or the latest window has passed, add another window + if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) { + windowsModified = true + a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime}) + } + + latestIndex := len(a.Windows) - 1 + if a.Windows[latestIndex].WindowStart.After(newAuditWindowStartTime) { + return Error.New("cannot add audit to audit history; window already passed") + } + + // add new audit to latest window + if online { + a.Windows[latestIndex].OnlineCount++ + } + a.Windows[latestIndex].TotalCount++ + + // if no windows were added or removed, score does not change + if !windowsModified { + return nil + } + + if len(a.Windows) <= 1 { + a.Score = 1 + return nil + } + + totalWindowScores := 0.0 + for i, window := range a.Windows { + // do not include last window in score + if i+1 == len(a.Windows) { + break + } + totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount) + } + + // divide by number of windows-1 because last window is not included + a.Score = totalWindowScores / float64(len(a.Windows)-1) + return nil +} diff --git a/satellite/reputation/audithistory_test.go b/satellite/reputation/audithistory_test.go index 4d5c396b0f68..a2755f6ddb7c 100644 --- a/satellite/reputation/audithistory_test.go +++ b/satellite/reputation/audithistory_test.go @@ -8,103 +8,71 @@ import ( "time" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "storj.io/common/pb" - "storj.io/common/testcontext" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/reputation" ) -func TestAuditHistoryBasic(t *testing.T) { - const windowSize = time.Hour - const trackingPeriod = 2 * time.Hour - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Reputation.AuditHistory.WindowSize = windowSize - config.Reputation.AuditHistory.TrackingPeriod = trackingPeriod - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - db := planet.Satellites[0].DB.Reputation() - - startingWindow := time.Now().Truncate(time.Hour) - windowsInTrackingPeriod := int(trackingPeriod.Seconds() / windowSize.Seconds()) - currentWindow := startingWindow - - config := planet.Satellites[0].Config.Reputation.AuditHistory - newHistory := &internalpb.AuditHistory{} - historyBytes, err := pb.Marshal(newHistory) - require.NoError(t, err) - updateReq := reputation.UpdateRequest{ - AuditOutcome: reputation.AuditOffline, - AuditHistory: config, - } - // online score should be 1 until the first window is finished - res, err := db.UpdateAuditHistory(ctx, historyBytes, updateReq, currentWindow.Add(2*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 1, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 1, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - // move to next window - currentWindow = currentWindow.Add(time.Hour) - - // online score should be now be 0.5 since the first window is complete with one online audit and one offline audit - updateReq.AuditOutcome = reputation.AuditOffline - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(2*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - // move to next window - currentWindow = currentWindow.Add(time.Hour) - - // try to add an audit for an old window, expect error - updateReq.AuditOutcome = reputation.AuditSuccess - _, err = db.UpdateAuditHistory(ctx, res.History, updateReq, startingWindow) - require.Error(t, err) - - // add another online audit for the latest window; score should still be 0.5 - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - // now that we have two full windows other than the current one, tracking period should be considered full. - require.True(t, res.TrackingPeriodFull) - // add another online audit for the latest window; score should still be 0.5 - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(45*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - require.True(t, res.TrackingPeriodFull) - - currentWindow = currentWindow.Add(time.Hour) - // in the current state, there are windowsInTrackingPeriod windows with a score of 0.5 - // and one window with a score of 1.0. The Math below calculates the new score when the latest - // window gets included in the tracking period, and the earliest 0.5 window gets dropped. - expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod) - // add online audit for next window; score should now be expectedScore - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(time.Minute)) - require.NoError(t, err) - require.EqualValues(t, expectedScore, res.NewScore) - require.True(t, res.TrackingPeriodFull) - }) +func TestAddAuditToHistory(t *testing.T) { + config := reputation.AuditHistoryConfig{ + WindowSize: time.Hour, + TrackingPeriod: 2 * time.Hour, + GracePeriod: time.Hour, + OfflineThreshold: 0.6, + OfflineDQEnabled: true, + OfflineSuspensionEnabled: true, + } + + startingWindow := time.Now().Truncate(time.Hour) + windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds()) + currentWindow := startingWindow + + history := &internalpb.AuditHistory{} + + // online score should be 1 until the first window is finished + err := reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 1, history.Score) + + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(20*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 1, history.Score) + + // move to next window + currentWindow = currentWindow.Add(time.Hour) + + // online score should be now be 0.5 since the first window is complete with one online audit and one offline audit + err = reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(20*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + + // move to next window + currentWindow = currentWindow.Add(time.Hour) + + // try to add an audit for an old window, expect error + err = reputation.AddAuditToHistory(history, true, startingWindow, config) + require.Error(t, err) + + // add another online audit for the latest window; score should still be 0.5 + err = reputation.AddAuditToHistory(history, true, currentWindow, config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + // add another online audit for the latest window; score should still be 0.5 + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(45*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + + currentWindow = currentWindow.Add(time.Hour) + // in the current state, there are windowsInTrackingPeriod windows with a score of 0.5 + // and one window with a score of 1.0. The Math below calculates the new score when the latest + // window gets included in the tracking period, and the earliest 0.5 window gets dropped. + expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod) + // add online audit for next window; score should now be expectedScore + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, expectedScore, history.Score) } diff --git a/satellite/reputation/service.go b/satellite/reputation/service.go index 0186b81bb064..585f35c7a9c2 100644 --- a/satellite/reputation/service.go +++ b/satellite/reputation/service.go @@ -24,8 +24,6 @@ type DB interface { DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error) // SuspendNodeUnknownAudit suspends a storage node for unknown audits. SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) - // UpdateAuditHistory updates a node's audit history - UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq UpdateRequest, auditTime time.Time) (res *UpdateAuditHistoryResponse, err error) } // Info contains all reputation data to be stored in DB. diff --git a/satellite/satellitedb/audithistory.go b/satellite/satellitedb/audithistory.go index 2f177d72b5d2..e504ca6b7dae 100644 --- a/satellite/satellitedb/audithistory.go +++ b/satellite/satellitedb/audithistory.go @@ -12,73 +12,9 @@ import ( "storj.io/storj/satellite/reputation" ) -func addAudit(a *internalpb.AuditHistory, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) error { - newAuditWindowStartTime := auditTime.Truncate(config.WindowSize) - earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod) - // windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed. - windowsModified := false - - // delete windows outside of tracking period scope - updatedWindows := a.Windows - for i, window := range a.Windows { - if window.WindowStart.Before(earliestWindow) { - updatedWindows = a.Windows[i+1:] - windowsModified = true - } else { - // windows are in order, so if this window is in the tracking period, we are done deleting windows - break - } - } - a.Windows = updatedWindows - - // if there are no windows or the latest window has passed, add another window - if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) { - windowsModified = true - a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime}) - } - - latestIndex := len(a.Windows) - 1 - if a.Windows[latestIndex].WindowStart.After(newAuditWindowStartTime) { - return Error.New("cannot add audit to audit history; window already passed") - } - - // add new audit to latest window - if online { - a.Windows[latestIndex].OnlineCount++ - } - a.Windows[latestIndex].TotalCount++ - - // if no windows were added or removed, score does not change - if !windowsModified { - return nil - } - - if len(a.Windows) <= 1 { - a.Score = 1 - return nil - } - - totalWindowScores := 0.0 - for i, window := range a.Windows { - // do not include last window in score - if i+1 == len(a.Windows) { - break - } - totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount) - } - - // divide by number of windows-1 because last window is not included - a.Score = totalWindowScores / float64(len(a.Windows)-1) - return nil -} - -func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq reputation.UpdateRequest, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) { +func updateAuditHistory(ctx context.Context, oldHistory []byte, config reputation.AuditHistoryConfig, online bool, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) { defer mon.Task()(&ctx)(&err) - config := updateReq.AuditHistory - - online := updateReq.AuditOutcome != reputation.AuditOffline - res = &reputation.UpdateAuditHistoryResponse{ NewScore: 1, TrackingPeriodFull: false, @@ -91,7 +27,7 @@ func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHisto return res, err } - err = addAudit(history, auditTime, online, config) + err = reputation.AddAuditToHistory(history, online, auditTime, config) if err != nil { return res, err } diff --git a/satellite/satellitedb/reputations.go b/satellite/satellitedb/reputations.go index 9728a2d77b52..48a592c66175 100644 --- a/satellite/satellitedb/reputations.go +++ b/satellite/satellitedb/reputations.go @@ -60,7 +60,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation AuditHistory: historyBytes, } - auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, historyBytes, updateReq, now) + auditHistoryResponse, err := updateAuditHistory(ctx, historyBytes, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now) if err != nil { return nil, Error.Wrap(err) } @@ -88,7 +88,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation return &status, nil } - auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, updateReq, now) + auditHistoryResponse, err := updateAuditHistory(ctx, dbNode.AuditHistory, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now) if err != nil { return nil, Error.Wrap(err) }