-
Notifications
You must be signed in to change notification settings - Fork 351
/
tracker.go
107 lines (92 loc) · 2.76 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package multipart
import (
"context"
"errors"
"fmt"
"time"
"github.com/treeverse/lakefs/pkg/kv"
"google.golang.org/protobuf/types/known/timestamppb"
)
const storePartitionKey = "multiparts"
type Metadata map[string]string
type Upload struct {
// UploadID A unique identifier for the uploaded part
UploadID string `db:"upload_id"`
// Path Multipart path in repository
Path string `db:"path"`
// CreationDate Creation date of the part
CreationDate time.Time `db:"creation_date"`
// PhysicalAddress Physical address of the part in the storage
PhysicalAddress string `db:"physical_address"`
// Metadata Additional metadata as required (by storage vendor etc.)
Metadata Metadata `db:"metadata"`
// ContentType Original file's content-type
ContentType string `db:"content_type"`
}
type Tracker interface {
Create(ctx context.Context, multipart Upload) error
Get(ctx context.Context, uploadID string) (*Upload, error)
Delete(ctx context.Context, uploadID string) error
}
type tracker struct {
store kv.Store
}
var (
ErrMultipartUploadNotFound = errors.New("multipart upload not found")
ErrInvalidUploadID = errors.New("invalid upload id")
)
func NewTracker(store kv.Store) Tracker {
return &tracker{
store: store,
}
}
func multipartFromProto(pb *UploadData) *Upload {
return &Upload{
UploadID: pb.UploadId,
Path: pb.Path,
CreationDate: pb.CreationDate.AsTime(),
PhysicalAddress: pb.PhysicalAddress,
Metadata: pb.Metadata,
ContentType: pb.ContentType,
}
}
func protoFromMultipart(m *Upload) *UploadData {
return &UploadData{
UploadId: m.UploadID,
Path: m.Path,
CreationDate: timestamppb.New(m.CreationDate),
PhysicalAddress: m.PhysicalAddress,
Metadata: m.Metadata,
ContentType: m.ContentType,
}
}
func (m *tracker) Create(ctx context.Context, multipart Upload) error {
if multipart.UploadID == "" {
return ErrInvalidUploadID
}
return kv.SetMsgIf(ctx, m.store, storePartitionKey, []byte(multipart.UploadID), protoFromMultipart(&multipart), nil)
}
func (m *tracker) Get(ctx context.Context, uploadID string) (*Upload, error) {
if uploadID == "" {
return nil, ErrInvalidUploadID
}
data := &UploadData{}
_, err := kv.GetMsg(ctx, m.store, storePartitionKey, []byte(uploadID), data)
if err != nil {
return nil, err
}
return multipartFromProto(data), nil
}
func (m *tracker) Delete(ctx context.Context, uploadID string) error {
if uploadID == "" {
return ErrInvalidUploadID
}
key := []byte(uploadID)
if _, err := m.store.Get(ctx, []byte(storePartitionKey), key); err != nil {
if errors.Is(err, kv.ErrNotFound) {
return fmt.Errorf("%w uploadID=%s", ErrMultipartUploadNotFound, uploadID)
}
return err
}
return m.store.Delete(ctx, []byte(storePartitionKey), key)
}