-
Notifications
You must be signed in to change notification settings - Fork 348
/
tracker.go
90 lines (80 loc) · 2.21 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package multiparts
import (
"context"
"errors"
"fmt"
"time"
"github.com/treeverse/lakefs/pkg/db"
)
// MultipartUpload is one tracked multipart upload. Each field carries a `db`
// tag naming the gateway_multiparts column it is scanned from.
type MultipartUpload struct {
	// UploadID identifies the upload; it is the key used for lookup and deletion.
	UploadID string `db:"upload_id"`
	// Path is the path value stored alongside the upload.
	Path string `db:"path"`
	// CreationDate is the creation timestamp stored for the upload.
	CreationDate time.Time `db:"creation_date"`
	// PhysicalAddress is the physical address value stored for the upload.
	PhysicalAddress string `db:"physical_address"`
}
// Tracker stores, retrieves and removes multipart upload records keyed by
// their upload ID.
type Tracker interface {
	// Create records a new multipart upload with the given ID, path, physical
	// address and creation time.
	Create(ctx context.Context, uploadID, path, physicalAddress string, creationTime time.Time) error

	// Get returns the record stored for uploadID.
	Get(ctx context.Context, uploadID string) (*MultipartUpload, error)

	// Delete removes the record stored for uploadID.
	Delete(ctx context.Context, uploadID string) error
}
// tracker is the database-backed Tracker implementation; every operation is
// executed through the wrapped db.Database handle.
type tracker struct {
	// db is the database handle used to open transactions.
	db db.Database
}
// Sentinel errors returned by the tracker.
var (
	// ErrMultipartUploadNotFound is returned by Delete when the statement did
	// not affect exactly one row.
	ErrMultipartUploadNotFound = fmt.Errorf("multipart upload not found")

	// ErrInvalidUploadID is returned when an empty upload ID is supplied.
	ErrInvalidUploadID = errors.New("invalid upload id")
)
// NewTracker returns a Tracker that performs all of its operations through adb.
func NewTracker(adb db.Database) Tracker {
	t := new(tracker)
	t.db = adb
	return t
}
// Create inserts a row describing a new multipart upload into the
// gateway_multiparts table.
//
// It returns ErrInvalidUploadID when uploadID is empty, without touching the
// database. Otherwise the insert runs inside m.db.Transact and whatever error
// that call reports is returned as-is.
func (m *tracker) Create(ctx context.Context, uploadID, path, physicalAddress string, creationTime time.Time) error {
	if len(uploadID) == 0 {
		return ErrInvalidUploadID
	}

	insert := func(tx db.Tx) (interface{}, error) {
		_, execErr := tx.Exec(`INSERT INTO gateway_multiparts (upload_id,path,creation_date,physical_address)
VALUES ($1, $2, $3, $4)`,
			uploadID, path, creationTime, physicalAddress)
		return nil, execErr
	}

	_, err := m.db.Transact(ctx, insert)
	return err
}
// Get loads the gateway_multiparts row whose upload_id equals uploadID.
//
// It returns ErrInvalidUploadID when uploadID is empty, without touching the
// database. Errors from the lookup are passed through unchanged; this method
// does not translate them into ErrMultipartUploadNotFound.
func (m *tracker) Get(ctx context.Context, uploadID string) (*MultipartUpload, error) {
	if len(uploadID) == 0 {
		return nil, ErrInvalidUploadID
	}

	const query = `
SELECT upload_id, path, creation_date, physical_address
FROM gateway_multiparts
WHERE upload_id = $1`

	result, err := m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
		upload := new(MultipartUpload)
		if getErr := tx.Get(upload, query, uploadID); getErr != nil {
			return nil, getErr
		}
		return upload, nil
	})
	if err != nil {
		return nil, err
	}
	// The transaction body above only ever yields a *MultipartUpload on success.
	return result.(*MultipartUpload), nil
}
// Delete removes the gateway_multiparts row whose upload_id equals uploadID.
//
// It returns ErrInvalidUploadID when uploadID is empty, without touching the
// database, and ErrMultipartUploadNotFound when the DELETE statement did not
// affect exactly one row. Any other error from the transaction is returned
// as-is.
func (m *tracker) Delete(ctx context.Context, uploadID string) error {
	if len(uploadID) == 0 {
		return ErrInvalidUploadID
	}

	remove := func(tx db.Tx) (interface{}, error) {
		outcome, execErr := tx.Exec(`DELETE FROM gateway_multiparts WHERE upload_id = $1`, uploadID)
		switch {
		case execErr != nil:
			return nil, execErr
		case outcome.RowsAffected() != 1:
			// Anything other than a single deleted row is reported as "not found".
			return nil, ErrMultipartUploadNotFound
		default:
			return nil, nil
		}
	}

	_, err := m.db.Transact(ctx, remove)
	return err
}