-
Notifications
You must be signed in to change notification settings - Fork 402
/
store.go
160 lines (126 loc) · 3.91 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package objects
import (
"context"
"io"
"time"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
"storj.io/storj/uplink/storage/streams"
)
// mon is the monkit handle for this package; every objStore method
// instruments its call through mon.Task().
var mon = monkit.Package()
// Meta is the full object metadata.
//
// Values returned by this package are built by convertMeta from a
// streams.Meta.
type Meta struct {
// Embedded serializable metadata. convertMeta decodes it from the stream
// metadata's Data bytes; Put marshals the caller-supplied value the other way.
pb.SerializableMeta
// Modified is copied from the stream metadata's Modified field.
Modified time.Time
// Expiration is copied from the stream metadata's Expiration field.
Expiration time.Time
// Size is copied from the stream metadata's Size field
// (presumably the object size in bytes — TODO confirm).
Size int64
// Checksum is not populated by convertMeta in this file, so it is left at
// its zero value on metadata returned by the Store methods here.
Checksum string
}
// ListItem is a single item in a listing.
//
// objStore.List builds one ListItem per item returned by the underlying
// stream store.
type ListItem struct {
// Path is copied from the stream list item's Path.
Path storj.Path
// Meta is the stream list item's Meta passed through convertMeta.
Meta Meta
// IsPrefix is copied from the stream list item's IsPrefix
// (presumably true when the entry is a path prefix rather than an object — TODO confirm).
IsPrefix bool
}
// Store is the interface for reading, writing, deleting and listing objects
// addressed by a storj.Path.
//
// The implementation returned by NewStore rejects an empty path in Meta, Get,
// Put and Delete with storj.ErrNoPath, and reports a key-not-found error from
// the underlying stream store in Meta, Get and Delete as
// storj.ErrObjectNotFound.
type Store interface {
// Meta returns the metadata of the object at path.
Meta(ctx context.Context, path storj.Path) (meta Meta, err error)
// Get returns a ranger for the content of the object at path together with
// the object's metadata.
Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error)
// Put stores data at path with the given serializable metadata and
// expiration, and returns the metadata of the stored object.
Put(ctx context.Context, path storj.Path, data io.Reader, metadata pb.SerializableMeta, expiration time.Time) (meta Meta, err error)
// Delete removes the object at path.
Delete(ctx context.Context, path storj.Path) (err error)
// List returns the items under prefix and whether more items are available.
// NOTE(review): the exact semantics of startAfter, endBefore, recursive,
// limit and metaFlags are defined by the underlying stream store, which the
// NewStore implementation forwards them to unchanged — confirm there.
List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}
// objStore implements Store on top of a streams.Store.
type objStore struct {
// store is the underlying stream store every operation is delegated to.
store streams.Store
// pathCipher is passed to every call made on the underlying stream store.
pathCipher storj.CipherSuite
}
// NewStore returns a Store that delegates every operation to the given
// stream store, passing pathCipher along with each call.
func NewStore(store streams.Store, pathCipher storj.CipherSuite) Store {
	objects := new(objStore)
	objects.store = store
	objects.pathCipher = pathCipher
	return objects
}
// Meta fetches the metadata of the object stored at path.
//
// An empty path is rejected with storj.ErrNoPath before the underlying stream
// store is consulted. A key-not-found error from the stream store is wrapped
// as storj.ErrObjectNotFound; any other error is returned unchanged.
func (o *objStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(path) == 0 {
		return Meta{}, storj.ErrNoPath.New("")
	}

	streamMeta, err := o.store.Meta(ctx, path, o.pathCipher)
	if storage.ErrKeyNotFound.Has(err) {
		// Report a missing key in object-level terms.
		err = storj.ErrObjectNotFound.Wrap(err)
	}

	// The stream metadata is converted even when err is set, so the caller
	// receives whatever the stream store returned alongside the error.
	meta = convertMeta(streamMeta)
	return meta, err
}
// Get returns a ranger over the content of the object stored at path,
// together with the object's metadata.
//
// An empty path is rejected with storj.ErrNoPath before the underlying stream
// store is consulted. A key-not-found error from the stream store is wrapped
// as storj.ErrObjectNotFound; any other error is returned unchanged.
func (o *objStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(path) == 0 {
		return nil, Meta{}, storj.ErrNoPath.New("")
	}

	rr, streamMeta, err := o.store.Get(ctx, path, o.pathCipher)
	if storage.ErrKeyNotFound.Has(err) {
		// Report a missing key in object-level terms.
		err = storj.ErrObjectNotFound.Wrap(err)
	}

	// Ranger and converted metadata are handed back as received from the
	// stream store, whether or not err is set.
	meta = convertMeta(streamMeta)
	return rr, meta, err
}
// Put stores data at path and returns the metadata of the stored object.
//
// An empty path is rejected with storj.ErrNoPath. The supplied metadata is
// protobuf-marshaled and handed to the underlying stream store together with
// data and expiration; a marshaling failure is returned as-is with an empty
// Meta.
func (o *objStore) Put(ctx context.Context, path storj.Path, data io.Reader, metadata pb.SerializableMeta, expiration time.Time) (meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(path) == 0 {
		return Meta{}, storj.ErrNoPath.New("")
	}

	// TODO(kaloyan): autodetect content type
	// if metadata.GetContentType() == "" {}

	encoded, err := proto.Marshal(&metadata)
	if err != nil {
		return Meta{}, err
	}

	streamMeta, err := o.store.Put(ctx, path, o.pathCipher, data, encoded, expiration)

	// The stream metadata is converted even when err is set, so the caller
	// receives whatever the stream store returned alongside the error.
	meta = convertMeta(streamMeta)
	return meta, err
}
// Delete removes the object stored at path.
//
// An empty path is rejected with storj.ErrNoPath before the underlying stream
// store is consulted. A key-not-found error from the stream store is wrapped
// as storj.ErrObjectNotFound; any other result, including nil, is returned
// unchanged.
func (o *objStore) Delete(ctx context.Context, path storj.Path) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(path) == 0 {
		return storj.ErrNoPath.New("")
	}

	err = o.store.Delete(ctx, path, o.pathCipher)
	if !storage.ErrKeyNotFound.Has(err) {
		// Success or an unrelated failure: pass it through untouched.
		return err
	}
	return storj.ErrObjectNotFound.Wrap(err)
}
// List returns the items found under prefix and whether more are available.
//
// All arguments are forwarded unchanged to the underlying stream store along
// with the configured path cipher. On error the stream store's error is
// returned with a nil slice and more set to false. Otherwise each stream item
// is turned into a ListItem with its metadata run through convertMeta.
func (o *objStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
	defer mon.Task()(&ctx)(&err)

	streamItems, more, err := o.store.List(ctx, prefix, startAfter, endBefore, o.pathCipher, recursive, limit, metaFlags)
	if err != nil {
		return nil, false, err
	}

	// Allocate the full capacity up front; the result is non-nil even when
	// the listing is empty.
	items = make([]ListItem, 0, len(streamItems))
	for i := range streamItems {
		var entry ListItem
		entry.Path = streamItems[i].Path
		entry.Meta = convertMeta(streamItems[i].Meta)
		entry.IsPrefix = streamItems[i].IsPrefix
		items = append(items, entry)
	}
	return items, more, nil
}
// convertMeta builds an object Meta from stream metadata.
//
// Modified, Expiration and Size are copied over directly, and the embedded
// SerializableMeta is decoded from m.Data. A decoding failure is only logged
// as a warning through the global zap logger; the function still returns the
// Meta with whatever the decoder left in the embedded message. Checksum is
// not set here.
func convertMeta(m streams.Meta) Meta {
	result := Meta{
		Modified:   m.Modified,
		Expiration: m.Expiration,
		Size:       m.Size,
	}

	// Decode straight into the embedded message instead of a temporary copy.
	if err := proto.Unmarshal(m.Data, &result.SerializableMeta); err != nil {
		zap.S().Warnf("Failed deserializing metadata: %v", err)
	}

	return result
}