Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
satellite/{reputation,satellitedb}: move addAudit
This functionality will be needed in both packages, so here we move it
into the more general reputation-code package and export it for use in
satellitedb.

This also removes the related UpdateAuditHistory() signature from the
reputation DB interface, since it doesn't have anything to do with the
db. It doesn't need to be a method, either.

Finally, this changes the test for addAudit to be a plain test function
instead of using testplanet.Run(). It didn't need a whole testplanet
setup or any databases.

Refs: #4601

Change-Id: I90f6a909e5404f03ad776b95cfa2f248308c57c1
  • Loading branch information
thepaul authored and zeebo committed May 20, 2022
1 parent 0bf1252 commit 951a842
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 166 deletions.
67 changes: 65 additions & 2 deletions satellite/reputation/audithistory.go
Expand Up @@ -46,8 +46,8 @@ func AuditHistoryToPB(auditHistory AuditHistory) (historyPB *pb.AuditHistory) {
return historyPB
}

// AuditHistoryFromBytes decodes an AuditHistory from the givenprotobuf-encoded
// internalpb.AuditHistory object.
// AuditHistoryFromBytes returns the corresponding AuditHistory for the given
// encoded internalpb.AuditHistory.
func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err error) {
var pbHistory internalpb.AuditHistory
if err := pb.Unmarshal(encodedHistory, &pbHistory); err != nil {
Expand All @@ -64,3 +64,66 @@ func AuditHistoryFromBytes(encodedHistory []byte) (history AuditHistory, err err
}
return history, nil
}

// AddAuditToHistory adds a single online/not-online event to an AuditHistory.
// If the AuditHistory contains windows that are now outside the tracking
// period, those windows will be trimmed.
func AddAuditToHistory(a *internalpb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error {
newAuditWindowStartTime := auditTime.Truncate(config.WindowSize)
earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod)
// windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed.
windowsModified := false

// delete windows outside of tracking period scope
updatedWindows := a.Windows
for i, window := range a.Windows {
if window.WindowStart.Before(earliestWindow) {
updatedWindows = a.Windows[i+1:]
windowsModified = true
} else {
// windows are in order, so if this window is in the tracking period, we are done deleting windows
break
}
}
a.Windows = updatedWindows

// if there are no windows or the latest window has passed, add another window
if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) {
windowsModified = true
a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime})
}

latestIndex := len(a.Windows) - 1
if a.Windows[latestIndex].WindowStart.After(newAuditWindowStartTime) {
return Error.New("cannot add audit to audit history; window already passed")
}

// add new audit to latest window
if online {
a.Windows[latestIndex].OnlineCount++
}
a.Windows[latestIndex].TotalCount++

// if no windows were added or removed, score does not change
if !windowsModified {
return nil
}

if len(a.Windows) <= 1 {
a.Score = 1
return nil
}

totalWindowScores := 0.0
for i, window := range a.Windows {
// do not include last window in score
if i+1 == len(a.Windows) {
break
}
totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount)
}

// divide by number of windows-1 because last window is not included
a.Score = totalWindowScores / float64(len(a.Windows)-1)
return nil
}
156 changes: 62 additions & 94 deletions satellite/reputation/audithistory_test.go
Expand Up @@ -8,103 +8,71 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/reputation"
)

func TestAuditHistoryBasic(t *testing.T) {
const windowSize = time.Hour
const trackingPeriod = 2 * time.Hour

testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Reputation.AuditHistory.WindowSize = windowSize
config.Reputation.AuditHistory.TrackingPeriod = trackingPeriod
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
db := planet.Satellites[0].DB.Reputation()

startingWindow := time.Now().Truncate(time.Hour)
windowsInTrackingPeriod := int(trackingPeriod.Seconds() / windowSize.Seconds())
currentWindow := startingWindow

config := planet.Satellites[0].Config.Reputation.AuditHistory
newHistory := &internalpb.AuditHistory{}
historyBytes, err := pb.Marshal(newHistory)
require.NoError(t, err)
updateReq := reputation.UpdateRequest{
AuditOutcome: reputation.AuditOffline,
AuditHistory: config,
}
// online score should be 1 until the first window is finished
res, err := db.UpdateAuditHistory(ctx, historyBytes, updateReq, currentWindow.Add(2*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 1, res.NewScore)
require.False(t, res.TrackingPeriodFull)

updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 1, res.NewScore)
require.False(t, res.TrackingPeriodFull)

// move to next window
currentWindow = currentWindow.Add(time.Hour)

// online score should be now be 0.5 since the first window is complete with one online audit and one offline audit
updateReq.AuditOutcome = reputation.AuditOffline
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(2*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.False(t, res.TrackingPeriodFull)

updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(20*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.False(t, res.TrackingPeriodFull)

// move to next window
currentWindow = currentWindow.Add(time.Hour)

// try to add an audit for an old window, expect error
updateReq.AuditOutcome = reputation.AuditSuccess
_, err = db.UpdateAuditHistory(ctx, res.History, updateReq, startingWindow)
require.Error(t, err)

// add another online audit for the latest window; score should still be 0.5
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow)
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
// now that we have two full windows other than the current one, tracking period should be considered full.
require.True(t, res.TrackingPeriodFull)
// add another online audit for the latest window; score should still be 0.5
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(45*time.Minute))
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.True(t, res.TrackingPeriodFull)

currentWindow = currentWindow.Add(time.Hour)
// in the current state, there are windowsInTrackingPeriod windows with a score of 0.5
// and one window with a score of 1.0. The Math below calculates the new score when the latest
// window gets included in the tracking period, and the earliest 0.5 window gets dropped.
expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod)
// add online audit for next window; score should now be expectedScore
updateReq.AuditOutcome = reputation.AuditSuccess
res, err = db.UpdateAuditHistory(ctx, res.History, updateReq, currentWindow.Add(time.Minute))
require.NoError(t, err)
require.EqualValues(t, expectedScore, res.NewScore)
require.True(t, res.TrackingPeriodFull)
})
func TestAddAuditToHistory(t *testing.T) {
config := reputation.AuditHistoryConfig{
WindowSize: time.Hour,
TrackingPeriod: 2 * time.Hour,
GracePeriod: time.Hour,
OfflineThreshold: 0.6,
OfflineDQEnabled: true,
OfflineSuspensionEnabled: true,
}

startingWindow := time.Now().Truncate(time.Hour)
windowsInTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
currentWindow := startingWindow

history := &internalpb.AuditHistory{}

// online score should be 1 until the first window is finished
err := reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config)
require.NoError(t, err)
require.EqualValues(t, 1, history.Score)

err = reputation.AddAuditToHistory(history, true, currentWindow.Add(20*time.Minute), config)
require.NoError(t, err)
require.EqualValues(t, 1, history.Score)

// move to next window
currentWindow = currentWindow.Add(time.Hour)

// online score should be now be 0.5 since the first window is complete with one online audit and one offline audit
err = reputation.AddAuditToHistory(history, false, currentWindow.Add(2*time.Minute), config)
require.NoError(t, err)
require.EqualValues(t, 0.5, history.Score)

err = reputation.AddAuditToHistory(history, true, currentWindow.Add(20*time.Minute), config)
require.NoError(t, err)
require.EqualValues(t, 0.5, history.Score)

// move to next window
currentWindow = currentWindow.Add(time.Hour)

// try to add an audit for an old window, expect error
err = reputation.AddAuditToHistory(history, true, startingWindow, config)
require.Error(t, err)

// add another online audit for the latest window; score should still be 0.5
err = reputation.AddAuditToHistory(history, true, currentWindow, config)
require.NoError(t, err)
require.EqualValues(t, 0.5, history.Score)
// add another online audit for the latest window; score should still be 0.5
err = reputation.AddAuditToHistory(history, true, currentWindow.Add(45*time.Minute), config)
require.NoError(t, err)
require.EqualValues(t, 0.5, history.Score)

currentWindow = currentWindow.Add(time.Hour)
// in the current state, there are windowsInTrackingPeriod windows with a score of 0.5
// and one window with a score of 1.0. The Math below calculates the new score when the latest
// window gets included in the tracking period, and the earliest 0.5 window gets dropped.
expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod)
// add online audit for next window; score should now be expectedScore
err = reputation.AddAuditToHistory(history, true, currentWindow.Add(time.Minute), config)
require.NoError(t, err)
require.EqualValues(t, expectedScore, history.Score)
}
2 changes: 0 additions & 2 deletions satellite/reputation/service.go
Expand Up @@ -24,8 +24,6 @@ type DB interface {
DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error)
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
// UpdateAuditHistory updates a node's audit history
UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq UpdateRequest, auditTime time.Time) (res *UpdateAuditHistoryResponse, err error)
}

// Info contains all reputation data to be stored in DB.
Expand Down
68 changes: 2 additions & 66 deletions satellite/satellitedb/audithistory.go
Expand Up @@ -12,73 +12,9 @@ import (
"storj.io/storj/satellite/reputation"
)

func addAudit(a *internalpb.AuditHistory, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) error {
newAuditWindowStartTime := auditTime.Truncate(config.WindowSize)
earliestWindow := newAuditWindowStartTime.Add(-config.TrackingPeriod)
// windowsModified is used to determine whether we will need to recalculate the score because windows have been added or removed.
windowsModified := false

// delete windows outside of tracking period scope
updatedWindows := a.Windows
for i, window := range a.Windows {
if window.WindowStart.Before(earliestWindow) {
updatedWindows = a.Windows[i+1:]
windowsModified = true
} else {
// windows are in order, so if this window is in the tracking period, we are done deleting windows
break
}
}
a.Windows = updatedWindows

// if there are no windows or the latest window has passed, add another window
if len(a.Windows) == 0 || a.Windows[len(a.Windows)-1].WindowStart.Before(newAuditWindowStartTime) {
windowsModified = true
a.Windows = append(a.Windows, &internalpb.AuditWindow{WindowStart: newAuditWindowStartTime})
}

latestIndex := len(a.Windows) - 1
if a.Windows[latestIndex].WindowStart.After(newAuditWindowStartTime) {
return Error.New("cannot add audit to audit history; window already passed")
}

// add new audit to latest window
if online {
a.Windows[latestIndex].OnlineCount++
}
a.Windows[latestIndex].TotalCount++

// if no windows were added or removed, score does not change
if !windowsModified {
return nil
}

if len(a.Windows) <= 1 {
a.Score = 1
return nil
}

totalWindowScores := 0.0
for i, window := range a.Windows {
// do not include last window in score
if i+1 == len(a.Windows) {
break
}
totalWindowScores += float64(window.OnlineCount) / float64(window.TotalCount)
}

// divide by number of windows-1 because last window is not included
a.Score = totalWindowScores / float64(len(a.Windows)-1)
return nil
}

func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHistory []byte, updateReq reputation.UpdateRequest, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) {
func updateAuditHistory(ctx context.Context, oldHistory []byte, config reputation.AuditHistoryConfig, online bool, auditTime time.Time) (res *reputation.UpdateAuditHistoryResponse, err error) {
defer mon.Task()(&ctx)(&err)

config := updateReq.AuditHistory

online := updateReq.AuditOutcome != reputation.AuditOffline

res = &reputation.UpdateAuditHistoryResponse{
NewScore: 1,
TrackingPeriodFull: false,
Expand All @@ -91,7 +27,7 @@ func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHisto
return res, err
}

err = addAudit(history, auditTime, online, config)
err = reputation.AddAuditToHistory(history, online, auditTime, config)
if err != nil {
return res, err
}
Expand Down
4 changes: 2 additions & 2 deletions satellite/satellitedb/reputations.go
Expand Up @@ -60,7 +60,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
AuditHistory: historyBytes,
}

auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, historyBytes, updateReq, now)
auditHistoryResponse, err := updateAuditHistory(ctx, historyBytes, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now)
if err != nil {
return nil, Error.Wrap(err)
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
return &status, nil
}

auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, updateReq, now)
auditHistoryResponse, err := updateAuditHistory(ctx, dbNode.AuditHistory, updateReq.AuditHistory, updateReq.AuditOutcome != reputation.AuditOffline, now)
if err != nil {
return nil, Error.Wrap(err)
}
Expand Down

1 comment on commit 951a842

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/release-preparation-v1-56/18583/1

Please sign in to comment.