-
Notifications
You must be signed in to change notification settings - Fork 390
/
pieceinfo.go
160 lines (130 loc) · 4.37 KB
/
pieceinfo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"database/sql"
"time"
"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/pieces"
)
type pieceinfo struct{ *InfoDB }
// PieceInfo returns database for storing piece information
func (db *DB) PieceInfo() pieces.DB { return db.info.PieceInfo() }
// PieceInfo returns database for storing piece information
func (db *InfoDB) PieceInfo() pieces.DB { return &pieceinfo{db} }
// Add inserts piece information into the database.
func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) {
defer mon.Task()(&ctx)(&err)
certdb := db.CertDB()
certid, err := certdb.Include(ctx, info.Uplink)
if err != nil {
return ErrInfo.Wrap(err)
}
uplinkPieceHash, err := proto.Marshal(info.UplinkPieceHash)
if err != nil {
return ErrInfo.Wrap(err)
}
defer db.locked()()
_, err = db.db.ExecContext(ctx, db.Rebind(`
INSERT INTO
pieceinfo(satellite_id, piece_id, piece_size, piece_expiration, uplink_piece_hash, uplink_cert_id)
VALUES (?,?,?,?,?,?)
`), info.SatelliteID, info.PieceID, info.PieceSize, info.PieceExpiration, uplinkPieceHash, certid)
return ErrInfo.Wrap(err)
}
// Get gets piece information by satellite id and piece id.
func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (_ *pieces.Info, err error) {
defer mon.Task()(&ctx)(&err)
info := &pieces.Info{}
info.SatelliteID = satelliteID
info.PieceID = pieceID
var uplinkPieceHash []byte
var uplinkIdentity []byte
db.mu.Lock()
err = db.db.QueryRowContext(ctx, db.Rebind(`
SELECT piece_size, piece_expiration, uplink_piece_hash, certificate.peer_identity
FROM pieceinfo
INNER JOIN certificate ON pieceinfo.uplink_cert_id = certificate.cert_id
WHERE satellite_id = ? AND piece_id = ?
`), satelliteID, pieceID).Scan(&info.PieceSize, &info.PieceExpiration, &uplinkPieceHash, &uplinkIdentity)
db.mu.Unlock()
if err != nil {
return nil, ErrInfo.Wrap(err)
}
info.UplinkPieceHash = &pb.PieceHash{}
err = proto.Unmarshal(uplinkPieceHash, info.UplinkPieceHash)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
return info, nil
}
// Delete deletes piece information.
func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.ExecContext(ctx, db.Rebind(`
DELETE FROM pieceinfo
WHERE satellite_id = ?
AND piece_id = ?
`), satelliteID, pieceID)
return ErrInfo.Wrap(err)
}
// DeleteFailed marks piece as a failed deletion.
func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.ExecContext(ctx, db.Rebind(`
UPDATE pieceinfo
SET deletion_failed_at = ?
WHERE satellite_id = ?
AND piece_id = ?
`), now, satelliteID, pieceID)
return ErrInfo.Wrap(err)
}
// GetExpired gets pieceinformation identites that are expired.
func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT satellite_id, piece_id, piece_size
FROM pieceinfo
WHERE piece_expiration < ? AND ((deletion_failed_at IS NULL) OR deletion_failed_at <> ?)
ORDER BY satellite_id
LIMIT ?
`), expiredAt, expiredAt, limit)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
info := pieces.ExpiredInfo{}
err = rows.Scan(&info.SatelliteID, &info.PieceID, &info.PieceSize)
if err != nil {
return infos, ErrInfo.Wrap(err)
}
infos = append(infos, info)
}
return infos, nil
}
// SpaceUsed calculates disk space used by all pieces
func (db *pieceinfo) SpaceUsed(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
var sum sql.NullInt64
err = db.db.QueryRowContext(ctx, db.Rebind(`
SELECT SUM(piece_size)
FROM pieceinfo
`)).Scan(&sum)
if err == sql.ErrNoRows || !sum.Valid {
return 0, nil
}
return sum.Int64, err
}