diff --git a/satellite/reputation/audithistory.go b/satellite/reputation/audithistory.go index 28e75e63da0d..51096247cbc5 100644 --- a/satellite/reputation/audithistory.go +++ b/satellite/reputation/audithistory.go @@ -46,8 +46,8 @@ func AuditHistoryToPB(auditHistory AuditHistory) (historyPB *pb.AuditHistory) { return historyPB } -// AuditHistoryFromBytes decodes an AuditHistory from the givenprotobuf-encoded -// internalpb.AuditHistory object. +// AuditHistoryFromBytes returns the corresponding AuditHistory for the given +// encoded internalpb.AuditHistory. func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err error) { var pbHistory internalpb.AuditHistory if err := pb.Unmarshal(encodedHistory, &pbHistory); err != nil { @@ -64,3 +64,66 @@ func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err err } return history, nil } + +// AddAuditToHistory adds a single online/not-online event to an AuditHistory. +// If the AuditHistory contains windows that are now outside the tracking +// period, those windows will be trimmed. +func AddAuditToHistory(a *internalpb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error { + newAuditWindowStartTime := auditTime.Truncate(config.WindowSize) + earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod) + // windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed. + windowsModified := false + + // delete windows outside of tracking period scope + updatedWindows := a.Windows + for i, window := range a.Windows { + if window.WindowStart.Before(earliestWindow) { + updatedWindows = a.Windows[i+1:] + windowsModified = true + } else { + // windows are in order, so if this window is in the tracking period, we are done deleting windows + break + } + } + a.Windows = updatedWindows + + // if there are no windows or the latest window has passed, add another window + if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) { + windowsModified = true + a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime}) + } + + latestIndex := len(a.Windows) - 1 + if a.Windows[latestIndex].WindowStart.After(newAuditWindowStartTime) { + return Error.New("cannot add audit to audit history; window already passed") + } + + // add new audit to latest window + if online { + a.Windows[latestIndex].OnlineCount++ + } + a.Windows[latestIndex].TotalCount++ + + // if no windows were added or removed, score does not change + if !windowsModified { + return nil + } + + if len(a.Windows) <= 1 { + a.Score = 1 + return nil + } + + totalWindowScores := 0.0 + for i, window := range a.Windows { + // do not include last window in score + if i+1 == len(a.Windows) { + break + } + totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount) + } + + // divide by number of windows-1 because last window is not included + a.Score = totalWindowScores / float64(len(a.Windows)-1) + return nil +} diff --git a/satellite/reputation/audithistory_test.go b/satellite/reputation/audithistory_test.go index 4d5c396b0f68..a2755f6ddb7c 100644 --- a/satellite/reputation/audithistory_test.go +++ b/satellite/reputation/audithistory_test.go @@ -8,103 +8,71 @@ import ( "time" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "storj.io/common/pb" - "storj.io/common/testcontext" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/reputation" ) -func TestAuditHistoryBasic(t *testing.T) { - const windowSize = time.Hour - const trackingPeriod = 2 * time.Hour - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Reputation.AuditHistory.WindowSize = windowSize - config.Reputation.AuditHistory.TrackingPeriod = trackingPeriod - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - db := planet.Satellites[0].DB.Reputation() - - startingWindow := time.Now().Truncate(time.Hour) - windowsInTrackingPeriod := int(trackingPeriod.Seconds() / windowSize.Seconds()) - currentWindow := startingWindow - - config := planet.Satellites[0].Config.Reputation.AuditHistory - newHistory := &internalpb.AuditHistory{} - historyBytes, err := pb.Marshal(newHistory) - require.NoError(t, err) - updateReq := reputation.UpdateRequest{ - AuditOutcome: reputation.AuditOffline, - AuditHistory: config, - } - // online score should be 1 until the first window is finished - res, err := db.UpdateAuditHistory(ctx, historyBytes, updateReq, currentWindow.Add(2*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 1, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 1, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - // move to next window - currentWindow = currentWindow.Add(time.Hour) - - // online score should be now be 0.5 since the first window is complete with one online audit and one offline audit - updateReq.AuditOutcome = reputation.AuditOffline - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(2*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - require.False(t, res.TrackingPeriodFull) - - // move to next window - currentWindow = currentWindow.Add(time.Hour) - - // try to add an audit for an old window, expect error - updateReq.AuditOutcome = reputation.AuditSuccess - _, err = db.UpdateAuditHistory(ctx, res.History, updateReq, startingWindow) - require.Error(t, err) - - // add another online audit for the latest window; score should still be 0.5 - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - // now that we have two full windows other than the current one, tracking period should be considered full. - require.True(t, res.TrackingPeriodFull) - // add another online audit for the latest window; score should still be 0.5 - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(45*time.Minute)) - require.NoError(t, err) - require.EqualValues(t, 0.5, res.NewScore) - require.True(t, res.TrackingPeriodFull) - - currentWindow = currentWindow.Add(time.Hour) - // in the current state, there are windowsInTrackingPeriod windows with a score of 0.5 - // and one window with a score of 1.0. The Math below calculates the new score when the latest - // window gets included in the tracking period, and the earliest 0.5 window gets dropped. - expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod) - // add online audit for next window; score should now be expectedScore - updateReq.AuditOutcome = reputation.AuditSuccess - res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(time.Minute)) - require.NoError(t, err) - require.EqualValues(t, expectedScore, res.NewScore) - require.True(t, res.TrackingPeriodFull) - }) +func TestAddAuditToHistory(t *testing.T) { + config := reputation.AuditHistoryConfig{ + WindowSize: time.Hour, + TrackingPeriod: 2 * time.Hour, + GracePeriod: time.Hour, + OfflineThreshold: 0.6, + OfflineDQEnabled: true, + OfflineSuspensionEnabled: true, + } + + startingWindow := time.Now().Truncate(time.Hour) + windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds()) + currentWindow := startingWindow + + history := &internalpb.AuditHistory{} + + // online score should be 1 until the first window is finished + err := reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 1, history.Score) + + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(20*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 1, history.Score) + + // move to next window + currentWindow = currentWindow.Add(time.Hour) + + // online score should be now be 0.5 since the first window is complete with one online audit and one offline audit + err = reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(20*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + + // move to next window + currentWindow = currentWindow.Add(time.Hour) + + // try to add an audit for an old window, expect error + err = reputation.AddAuditToHistory(history, true, startingWindow, config) + require.Error(t, err) + + // add another online audit for the latest window; score should still be 0.5 + err = reputation.AddAuditToHistory(history, true, currentWindow, config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + // add another online audit for the latest window; score should still be 0.5 + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(45*time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, 0.5, history.Score) + + currentWindow = currentWindow.Add(time.Hour) + // in the current state, there are windowsInTrackingPeriod windows with a score of 0.5 + // and one window with a score of 1.0. The Math below calculates the new score when the latest + // window gets included in the tracking period, and the earliest 0.5 window gets dropped. + expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod) + // add online audit for next window; score should now be expectedScore + err = reputation.AddAuditToHistory(history, true, currentWindow.Add(time.Minute), config) + require.NoError(t, err) + require.EqualValues(t, expectedScore, history.Score) } diff --git a/satellite/reputation/service.go b/satellite/reputation/service.go index 0186b81bb064..585f35c7a9c2 100644 --- a/satellite/reputation/service.go +++ b/satellite/reputation/service.go @@ -24,8 +24,6 @@ type DB interface { DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error) // SuspendNodeUnknownAudit suspends a storage node for unknown audits. SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) - // UpdateAuditHistory updates a node's audit history - UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq UpdateRequest, auditTime time.Time) (res *UpdateAuditHistoryResponse, err error) } // Info contains all reputation data to be stored in DB. diff --git a/satellite/satellitedb/audithistory.go b/satellite/satellitedb/audithistory.go index 2f177d72b5d2..e504ca6b7dae 100644 --- a/satellite/satellitedb/audithistory.go +++ b/satellite/satellitedb/audithistory.go @@ -12,73 +12,9 @@ import ( "storj.io/storj/satellite/reputation" ) -func addAudit(a *internalpb.AuditHistory, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) error { - newAuditWindowStartTime := auditTime.Truncate(config.WindowSize) - earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod) - // windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed. - windowsModified := false - - // delete windows outside of tracking period scope - updatedWindows := a.Windows - for i, window := range a.Windows { - if window.WindowStart.Before(earliestWindow) { - updatedWindows = a.Windows[i+1:] - windowsModified = true - } else { - // windows are in order, so if this window is in the tracking period, we are done deleting windows - break - } - } - a.Windows = updatedWindows - - // if there are no windows or the latest window has passed, add another window - if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) { - windowsModified = true - a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime}) - } - - latestIndex := len(a.Windows) - 1 - if a.Windows[latestIndex].WindowStart.After(newAuditWindowStartTime) { - return Error.New("cannot add audit to audit history; window already passed") - } - - // add new audit to latest window - if online { - a.Windows[latestIndex].OnlineCount++ - } - a.Windows[latestIndex].TotalCount++ - - // if no windows were added or removed, score does not change - if !windowsModified { - return nil - } - - if len(a.Windows) <= 1 { - a.Score = 1 - return nil - } - - totalWindowScores := 0.0 - for i, window := range a.Windows { - // do not include last window in score - if i+1 == len(a.Windows) { - break - } - totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount) - } - - // divide by number of windows-1 because last window is not included - a.Score = totalWindowScores / float64(len(a.Windows)-1) - return nil -} - -func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq reputation.UpdateRequest, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) { +func updateAuditHistory(ctx context.Context, oldHistory []byte, config reputation.AuditHistoryConfig, online bool, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) { defer mon.Task()(&ctx)(&err) - config := updateReq.AuditHistory - - online := updateReq.AuditOutcome != reputation.AuditOffline - res = &reputation.UpdateAuditHistoryResponse{ NewScore: 1, TrackingPeriodFull: false, @@ -91,7 +27,7 @@ func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHisto return res, err } - err = addAudit(history, auditTime, online, config) + err = reputation.AddAuditToHistory(history, online, auditTime, config) if err != nil { return res, err } diff --git a/satellite/satellitedb/reputations.go b/satellite/satellitedb/reputations.go index 9728a2d77b52..48a592c66175 100644 --- a/satellite/satellitedb/reputations.go +++ b/satellite/satellitedb/reputations.go @@ -60,7 +60,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation AuditHistory: historyBytes, } - auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, historyBytes, updateReq, now) + auditHistoryResponse, err := updateAuditHistory(ctx, historyBytes, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now) if err != nil { return nil, Error.Wrap(err) } @@ -88,7 +88,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation return &status, nil } - auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, updateReq, now) + auditHistoryResponse, err := updateAuditHistory(ctx, dbNode.AuditHistory, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now) if err != nil { return nil, Error.Wrap(err) }