-
Notifications
You must be signed in to change notification settings - Fork 390
/
consoledb.go
157 lines (124 loc) · 4 KB
/
consoledb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"database/sql"
"time"
"github.com/zeebo/errs"
"storj.io/storj/internal/date"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/console"
)
type consoledb struct {
*InfoDB
}
// Console returns console.DB
func (db *InfoDB) Console() console.DB { return &consoledb{db} }
// Console returns console.DB
func (db *DB) Console() console.DB { return db.info.Console() }
// GetSatelliteIDs returns list of satelliteIDs that storagenode has interacted with
// at least once
func (db *consoledb) GetSatelliteIDs(ctx context.Context, from, to time.Time) (_ storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
var satellites storj.NodeIDList
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT DISTINCT satellite_id
FROM bandwidth_usage
WHERE ? <= created_at AND created_at <= ?`), from.UTC(), to.UTC())
if err != nil {
if err == sql.ErrNoRows {
return satellites, nil
}
return nil, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
for rows.Next() {
var satelliteID storj.NodeID
if err = rows.Scan(&satelliteID); err != nil {
return nil, err
}
satellites = append(satellites, satelliteID)
}
return satellites, nil
}
// GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order
func (db *consoledb) GetDailyTotalBandwidthUsed(ctx context.Context, from, to time.Time) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
since, _ := date.DayBoundary(from.UTC())
_, before := date.DayBoundary(to.UTC())
return db.getDailyBandwidthUsed(ctx,
"WHERE ? <= created_at AND created_at <= ?",
since.UTC(), before.UTC())
}
// GetDailyBandwidthUsed returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order for particular satellite
func (db *consoledb) GetDailyBandwidthUsed(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
since, _ := date.DayBoundary(from.UTC())
_, before := date.DayBoundary(to.UTC())
return db.getDailyBandwidthUsed(ctx,
"WHERE satellite_id = ? AND ? <= created_at AND created_at <= ?",
satelliteID, since.UTC(), before.UTC())
}
// getDailyBandwidthUsed returns slice of grouped by date bandwidth usage
// sorted in ascending order and applied condition if any
func (db *consoledb) getDailyBandwidthUsed(ctx context.Context, cond string, args ...interface{}) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
query := db.Rebind(`
SELECT action, SUM(amount), created_at
FROM bandwidth_usage
` + cond + `
GROUP BY DATE(created_at), action
ORDER BY created_at ASC
`)
rows, err := db.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
var dates []time.Time
dailyBandwidth := make(map[time.Time]*console.BandwidthUsed, 0)
for rows.Next() {
var action int32
var amount int64
var createdAt time.Time
err = rows.Scan(&action, &amount, &createdAt)
if err != nil {
return nil, err
}
from, to := date.DayBoundary(createdAt)
bandwidthUsed, ok := dailyBandwidth[from]
if !ok {
bandwidthUsed = &console.BandwidthUsed{
From: from,
To: to,
}
dates = append(dates, from)
dailyBandwidth[from] = bandwidthUsed
}
switch pb.PieceAction(action) {
case pb.PieceAction_GET:
bandwidthUsed.Egress.Usage = amount
case pb.PieceAction_GET_AUDIT:
bandwidthUsed.Egress.Audit = amount
case pb.PieceAction_GET_REPAIR:
bandwidthUsed.Egress.Repair = amount
case pb.PieceAction_PUT:
bandwidthUsed.Ingress.Usage = amount
case pb.PieceAction_PUT_REPAIR:
bandwidthUsed.Ingress.Repair = amount
}
}
var bandwidthUsedList []console.BandwidthUsed
for _, date := range dates {
bandwidthUsedList = append(bandwidthUsedList, *dailyBandwidth[date])
}
return bandwidthUsedList, nil
}