satellite/satellitedb/attribution: update value attribution query to return byte-hours

Before, the VA query summed the total bytes and divided by the number of
rows. That gives the average bytes stored per hour, but we charge for
usage in byte-hours. Why not do value attribution the same way? To do
that, we simply stop dividing by the number of rows. We also charge object
and segment fees, so return segment-hours and object-hours too.

Change-Id: I1f18b7e1b2bae1d3fae1ca3b93bfc24db5b9b0e6
cam-a committed Apr 8, 2022
1 parent c105562 commit d4ad3a3
Showing 5 changed files with 309 additions and 63 deletions.
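For context, a minimal sketch of the change in semantics described in the commit message, using made-up hourly tally values (the numbers and names are illustrative only, not from this commit):

package main

import "fmt"

func main() {
	// Hypothetical hourly storage tallies (bytes observed at each hour).
	hourlyBytes := []int64{1e9, 2e9, 3e9}

	var sum int64
	for _, b := range hourlyBytes {
		sum += b
	}

	// Old behavior: average bytes stored per hour.
	avgBytesPerHour := float64(sum) / float64(len(hourlyBytes))

	// New behavior: byte-hours. Each hourly tally contributes
	// bytes * 1 hour, so the plain sum is already what usage is billed on.
	byteHours := float64(sum)

	fmt.Println(avgBytesPerHour, byteHours) // 2e+09 6e+09
}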
14 changes: 10 additions & 4 deletions cmd/satellite/reports/attribution.go
@@ -25,8 +25,11 @@ var headers = []string{
"userAgent",
"projectID",
"bucketName",
"byte-hours:Total",
"bytes:BWEgress",
"gbHours",
"segmentHours",
"objectHours",
"hours",
"gbEgress",
}

// GenerateAttributionCSV creates a report with attribution data.
@@ -81,13 +84,16 @@ func csvRowToStringSlice(p *attribution.CSVRow) ([]string, error) {
if err != nil {
return nil, errs.New("Invalid Project ID")
}
totalGBPerHour := memory.Size(p.TotalBytesPerHour).GB()
gbHours := memory.Size(p.ByteHours).GB()
egressGBData := memory.Size(p.EgressData).GB()
record := []string{
string(p.UserAgent),
projectID.String(),
string(p.BucketName),
strconv.FormatFloat(totalGBPerHour, 'f', 4, 64),
strconv.FormatFloat(gbHours, 'f', 4, 64),
strconv.FormatFloat(p.SegmentHours, 'f', 4, 64),
strconv.FormatFloat(p.ObjectHours, 'f', 4, 64),
strconv.FormatFloat(float64(p.Hours), 'f', 4, 64),
strconv.FormatFloat(egressGBData, 'f', 4, 64),
}
return record, nil
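As a sanity check on the header change above, the report now emits GB-hours derived from byte-hours. A small hedged sketch (the values are illustrative; the memory package is the one the file already imports, assumed to be storj.io/common/memory):

package main

import (
	"fmt"
	"strconv"

	"storj.io/common/memory"
)

func main() {
	// 3 TB stored for 1000 hours gives 3e15 byte-hours, i.e. 3,000,000
	// GB-hours, since memory.Size(...).GB() divides by the decimal GB (1e9).
	byteHours := 3e15
	gbHours := memory.Size(byteHours).GB()
	fmt.Println(strconv.FormatFloat(gbHours, 'f', 4, 64)) // "3000000.0000"
}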
15 changes: 9 additions & 6 deletions satellite/attribution/db.go
@@ -27,12 +27,15 @@ type Info struct {

// CSVRow represents data from QueryAttribution without exposing dbx.
type CSVRow struct {
PartnerID []byte
UserAgent []byte
ProjectID []byte
BucketName []byte
TotalBytesPerHour float64
EgressData int64
PartnerID []byte
UserAgent []byte
ProjectID []byte
BucketName []byte
ByteHours float64
SegmentHours float64
ObjectHours float64
EgressData int64
Hours int
}

// DB implements the database for value attribution table.
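To make the units of the new CSVRow fields concrete, here is a hedged sketch of how a row could feed a charge calculation. The rates and the estimateCharge helper are hypothetical, not part of this commit; only the field names come from the struct above:

package reports

import "storj.io/storj/satellite/attribution"

// estimateCharge is a hypothetical helper illustrating the units:
// ByteHours, SegmentHours, and ObjectHours each accumulate one sample per
// hour of tally data; EgressData is a plain byte count; Hours counts the
// hours that had any usage.
func estimateCharge(r *attribution.CSVRow) float64 {
	const (
		perGBHour      = 0.0000055 // hypothetical storage rate
		perSegmentHour = 0.0000001 // hypothetical segment rate
		perObjectHour  = 0.0000002 // hypothetical object rate
		perGBEgress    = 0.007     // hypothetical egress rate
	)
	gbHours := r.ByteHours / 1e9 // decimal GB, matching memory.Size().GB()
	gbEgress := float64(r.EgressData) / 1e9
	return gbHours*perGBHour +
		r.SegmentHours*perSegmentHour +
		r.ObjectHours*perObjectHour +
		gbEgress*perGBEgress
}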
239 changes: 217 additions & 22 deletions satellite/attribution/db_test.go
@@ -18,6 +18,7 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

@@ -48,9 +49,13 @@ type AttributionTestData struct {
inlineSize int64
egressSize int64

dataCounter int
expectedTotalBytes int64
expectedEgress int64
dataCounter int
expectedByteHours float64
expectedSegmentHours float64
expectedObjectHours float64
expectedHours float64

expectedEgress int64
}

func (testData *AttributionTestData) init() {
@@ -250,6 +255,97 @@ func TestQueryAllAttribution(t *testing.T) {
padding: 2,
}}

for i, td := range testData {
td := td
td.init()

info := attribution.Info{td.projectID, td.bucketName, td.partnerID, td.userAgent, time.Time{}}
_, err := db.Attribution().Insert(ctx, &info)
require.NoError(t, err)
for i := 0; i < td.hoursOfData; i++ {
if i < td.hoursOfData/2 {
createTallyData(ctx, t, db.ProjectAccounting(), &td)
} else {
createBWData(ctx, t, db.Orders(), &td)
}
}
testData[i] = td
}
verifyAllData(ctx, t, db.Attribution(), testData, false)
})
}

func TestQueryAllAttributionNoStorage(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
now := time.Now()

projectID := testrand.UUID()
partnerID := testrand.UUID()
userAgent := []byte("agent1")
alphaBucket := []byte("alpha")
betaBucket := []byte("beta")
testData := []AttributionTestData{
{
name: "new partnerID, userAgent, projectID, alpha",
partnerID: testrand.UUID(),
userAgent: []byte("agent2"),
projectID: projectID,
bucketName: alphaBucket,

remoteSize: 0,
inlineSize: 0,
egressSize: egressSize,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
},
{
name: "partnerID, userAgent, new projectID, alpha",
partnerID: partnerID,
userAgent: userAgent,
projectID: testrand.UUID(),
bucketName: alphaBucket,

remoteSize: 0,
inlineSize: 0,
egressSize: egressSize / 2,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
},
{
name: "new partnerID, userAgent, projectID, beta",
partnerID: testrand.UUID(),
userAgent: []byte("agent3"),
projectID: projectID,
bucketName: betaBucket,

remoteSize: 0,
inlineSize: 0,
egressSize: egressSize / 3,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
},
{
name: "partnerID, userAgent new projectID, beta",
partnerID: partnerID,
userAgent: userAgent,
projectID: testrand.UUID(),
bucketName: betaBucket,

remoteSize: 0,
inlineSize: 0,
egressSize: egressSize / 4,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
}}

for i, td := range testData {
td := td
td.init()
@@ -259,11 +355,99 @@ func TestQueryAllAttribution(t *testing.T) {
require.NoError(t, err)

for i := 0; i < td.hoursOfData; i++ {
createData(ctx, t, db, &td)
createBWData(ctx, t, db.Orders(), &td)
}
testData[i] = td
}
verifyAllData(ctx, t, db.Attribution(), testData)
verifyAllData(ctx, t, db.Attribution(), testData, false)
})
}

func TestQueryAllAttributionNoBW(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
now := time.Now()

projectID := testrand.UUID()
partnerID := testrand.UUID()
userAgent := []byte("agent1")
alphaBucket := []byte("alpha")
betaBucket := []byte("beta")
testData := []AttributionTestData{
{
name: "new partnerID, userAgent, projectID, alpha",
partnerID: testrand.UUID(),
userAgent: []byte("agent2"),
projectID: projectID,
bucketName: alphaBucket,

remoteSize: remoteSize,
inlineSize: inlineSize,
egressSize: 0,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
},
{
name: "partnerID, userAgent, new projectID, alpha",
partnerID: partnerID,
userAgent: userAgent,
projectID: testrand.UUID(),
bucketName: alphaBucket,

remoteSize: remoteSize / 2,
inlineSize: inlineSize / 2,
egressSize: 0,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
},
{
name: "new partnerID, userAgent, projectID, beta",
partnerID: testrand.UUID(),
userAgent: []byte("agent3"),
projectID: projectID,
bucketName: betaBucket,

remoteSize: remoteSize / 3,
inlineSize: inlineSize / 3,
egressSize: 0,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
},
{
name: "partnerID, userAgent new projectID, beta",
partnerID: partnerID,
userAgent: userAgent,
projectID: testrand.UUID(),
bucketName: betaBucket,

remoteSize: remoteSize / 4,
inlineSize: inlineSize / 4,
egressSize: 0,

start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
end: time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()),
padding: 2,
}}

for i, td := range testData {
td := td
td.init()

info := attribution.Info{td.projectID, td.bucketName, td.partnerID, td.userAgent, time.Time{}}
_, err := db.Attribution().Insert(ctx, &info)
require.NoError(t, err)

for i := 0; i < td.hoursOfData; i++ {
createTallyData(ctx, t, db.ProjectAccounting(), &td)
}
testData[i] = td
}
verifyAllData(ctx, t, db.Attribution(), testData, true)
})
}

@@ -286,13 +470,13 @@ func verifyData(ctx *testcontext.Context, t *testing.T, attributionDB attributio
assert.Equal(t, testData.userAgent, r.UserAgent, testData.name)
assert.Equal(t, testData.projectID[:], r.ProjectID, testData.name)
assert.Equal(t, testData.bucketName, r.BucketName, testData.name)
assert.Equal(t, float64(testData.expectedTotalBytes/testData.hours), r.TotalBytesPerHour, testData.name)
assert.Equal(t, testData.expectedByteHours, r.ByteHours, testData.name)
assert.Equal(t, testData.expectedEgress, r.EgressData, testData.name)
}
require.NotEqual(t, 0, count, "Results were returned, but did not match all of the projectIDs.")
}

func verifyAllData(ctx *testcontext.Context, t *testing.T, attributionDB attribution.DB, testData []AttributionTestData) {
func verifyAllData(ctx *testcontext.Context, t *testing.T, attributionDB attribution.DB, testData []AttributionTestData, storageInEveryHour bool) {
results, err := attributionDB.QueryAllAttribution(ctx, testData[0].start, testData[0].end)
require.NoError(t, err)
require.NotEqual(t, 0, len(results), "Results must not be empty.")
@@ -312,8 +496,13 @@ func verifyAllData(ctx *testcontext.Context, t *testing.T, attributionDB attribu
assert.Equal(t, tt.userAgent, r.UserAgent, tt.name)
assert.Equal(t, tt.projectID[:], r.ProjectID, tt.name)
assert.Equal(t, tt.bucketName, r.BucketName, tt.name)
assert.Equal(t, float64(tt.expectedTotalBytes/tt.hours), r.TotalBytesPerHour, tt.name)
assert.Equal(t, tt.expectedByteHours, r.ByteHours, tt.name)
assert.Equal(t, tt.expectedEgress, r.EgressData, tt.name)
if storageInEveryHour {
assert.Equal(t, tt.expectedHours, float64(tt.hours))
} else {
assert.Less(t, tt.expectedHours, float64(tt.hours))
}
}
}

@@ -322,47 +511,53 @@ func verifyAllData(ctx *testcontext.Context, t *testing.T, attributionDB attribu
}

func createData(ctx *testcontext.Context, t *testing.T, db satellite.DB, testData *AttributionTestData) {
projectAccoutingDB := db.ProjectAccounting()
orderDB := db.Orders()

// split the expected egress size into two separate bucket_bandwidth_rollup rows to test attribution query summation
err := orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET, testData.egressSize/2, 0, testData.bwStart)
require.NoError(t, err)
createBWData(ctx, t, db.Orders(), testData)
createTallyData(ctx, t, db.ProjectAccounting(), testData)
}

err = orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET, testData.egressSize/2, 0, testData.bwStart.Add(2*time.Hour))
func createBWData(ctx *testcontext.Context, t *testing.T, orderDB orders.DB, testData *AttributionTestData) {
err := orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET, testData.egressSize, 0, testData.bwStart)
require.NoError(t, err)

// Only GET should be counted. So this should not affect results
err = orderDB.UpdateBucketBandwidthSettle(ctx, testData.projectID, testData.bucketName, pb.PieceAction_GET_AUDIT, testData.egressSize, 0, testData.bwStart)
require.NoError(t, err)

if (testData.bwStart.After(testData.start) || testData.bwStart.Equal(testData.start)) && testData.bwStart.Before(testData.end) {
testData.expectedEgress += testData.egressSize
}
testData.bwStart = testData.bwStart.Add(time.Hour)
}

func createTallyData(ctx *testcontext.Context, t *testing.T, projectAccoutingDB accounting.ProjectAccounting, testData *AttributionTestData) {
testData.dataCounter++
testData.dataInterval = testData.dataInterval.Add(time.Minute * 30)
_, err = createTallyData(ctx, projectAccoutingDB, testData.projectID, testData.bucketName, testData.remoteSize*int64(testData.dataCounter), testData.inlineSize*int64(testData.dataCounter), testData.dataInterval)
_, err := insertTallyData(ctx, projectAccoutingDB, testData.projectID, testData.bucketName, testData.remoteSize*int64(testData.dataCounter), testData.inlineSize*int64(testData.dataCounter), testData.dataInterval)
require.NoError(t, err)

// I think inserting into tallies twice is testing that only the latest row in the hour is used in the VA calculation.
testData.dataCounter++
testData.dataInterval = testData.dataInterval.Add(time.Minute * 30)
tally, err := createTallyData(ctx, projectAccoutingDB, testData.projectID, testData.bucketName, testData.remoteSize*int64(testData.dataCounter), testData.inlineSize*int64(testData.dataCounter), testData.dataInterval)
tally, err := insertTallyData(ctx, projectAccoutingDB, testData.projectID, testData.bucketName, testData.remoteSize*int64(testData.dataCounter), testData.inlineSize*int64(testData.dataCounter), testData.dataInterval)
require.NoError(t, err)

if (testData.dataInterval.After(testData.start) || testData.dataInterval.Equal(testData.start)) && testData.dataInterval.Before(testData.end) {
testData.expectedTotalBytes += tally.TotalBytes
testData.expectedEgress += testData.egressSize
testData.expectedByteHours += float64(tally.TotalBytes)
testData.expectedHours++
testData.expectedSegmentHours += float64(tally.TotalSegmentCount)
testData.expectedObjectHours += float64(tally.ObjectCount)
}
}

func createTallyData(ctx *testcontext.Context, projectAccoutingDB accounting.ProjectAccounting, projectID uuid.UUID, bucket []byte, remote int64, inline int64, dataIntervalStart time.Time) (_ accounting.BucketStorageTally, err error) {
func insertTallyData(ctx *testcontext.Context, projectAccoutingDB accounting.ProjectAccounting, projectID uuid.UUID, bucket []byte, remote int64, inline int64, dataIntervalStart time.Time) (_ accounting.BucketStorageTally, err error) {
tally := accounting.BucketStorageTally{
BucketName: string(bucket),
ProjectID: projectID,
IntervalStart: dataIntervalStart,

ObjectCount: 0,
ObjectCount: 1,

TotalSegmentCount: 0,
TotalSegmentCount: 1,

TotalBytes: inline + remote,
MetadataSize: 0,
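The comment in createTallyData above suggests the query keeps only the latest tally row within each hour. A minimal sketch of that aggregation (the tally shape and the byteHours helper are assumptions for illustration; the real work happens in SQL in satellitedb):

package main

import (
	"fmt"
	"time"
)

// tally is an illustrative stand-in for a bucket storage tally row.
type tally struct {
	intervalStart time.Time
	totalBytes    int64
}

// byteHours keeps only the latest tally in each hour, then sums the
// surviving byte counts (one sample per hour) to get byte-hours.
func byteHours(tallies []tally) float64 {
	latest := map[time.Time]tally{}
	for _, t := range tallies {
		hour := t.intervalStart.Truncate(time.Hour)
		if prev, ok := latest[hour]; !ok || t.intervalStart.After(prev.intervalStart) {
			latest[hour] = t
		}
	}
	var sum int64
	for _, t := range latest {
		sum += t.totalBytes
	}
	return float64(sum)
}

func main() {
	base := time.Date(2022, 4, 8, 0, 0, 0, 0, time.UTC)
	fmt.Println(byteHours([]tally{
		{base, 100},                       // superseded within hour 0
		{base.Add(30 * time.Minute), 200}, // latest tally of hour 0
		{base.Add(90 * time.Minute), 300}, // latest tally of hour 1
	})) // 500
}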
