-
Notifications
You must be signed in to change notification settings - Fork 402
/
blob.go
145 lines (118 loc) · 3.87 KB
/
blob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package filestore
import (
"bufio"
"context"
"io"
"os"
"github.com/zeebo/errs"
"storj.io/storj/storage"
)
const (
// FormatV0 is the identifier for storage format v0, which also corresponds to an absence of
// format version information.
FormatV0 storage.FormatVersion = 0
// FormatV1 is the identifier for storage format v1.
FormatV1 storage.FormatVersion = 1
// Note: New FormatVersion values should be consecutive, as certain parts of this blob store
// iterate over them numerically and check for blobs stored with each version.
)
const (
// MaxFormatVersionSupported is the highest supported storage format version for reading, and
// the only supported storage format version for writing. If stored blobs claim a higher
// storage format version than this, or a caller requests _writing_ a storage format version
// which is not this, this software will not know how to perform the read or write and an error
// will be returned.
MaxFormatVersionSupported = FormatV1
// MinFormatVersionSupported is the lowest supported storage format version for reading. If
// stored blobs claim a lower storage format version than this, this software will not know how
// to perform the read and an error will be returned.
MinFormatVersionSupported = FormatV0
)
// blobReader implements reading blobs.
type blobReader struct {
*os.File
formatVersion storage.FormatVersion
}
func newBlobReader(file *os.File, formatVersion storage.FormatVersion) *blobReader {
return &blobReader{file, formatVersion}
}
// Size returns how large is the blob.
func (blob *blobReader) Size() (int64, error) {
stat, err := blob.Stat()
if err != nil {
return 0, err
}
return stat.Size(), err
}
// StorageFormatVersion gets the storage format version being used by the blob.
func (blob *blobReader) StorageFormatVersion() storage.FormatVersion {
return blob.formatVersion
}
// blobWriter implements writing blobs.
type blobWriter struct {
ref storage.BlobRef
store *blobStore
closed bool
formatVersion storage.FormatVersion
buffer *bufio.Writer
fh *os.File
}
func newBlobWriter(ref storage.BlobRef, store *blobStore, formatVersion storage.FormatVersion, file *os.File, bufferSize int) *blobWriter {
return &blobWriter{
ref: ref,
store: store,
closed: false,
formatVersion: formatVersion,
buffer: bufio.NewWriterSize(file, bufferSize),
fh: file,
}
}
// Write adds data to the blob.
func (blob *blobWriter) Write(p []byte) (int, error) {
return blob.buffer.Write(p)
}
// Cancel discards the blob.
func (blob *blobWriter) Cancel(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if blob.closed {
return nil
}
blob.closed = true
err = blob.fh.Close()
removeErr := os.Remove(blob.fh.Name())
return Error.Wrap(errs.Combine(err, removeErr))
}
// Commit moves the file to the target location.
func (blob *blobWriter) Commit(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if blob.closed {
return Error.New("already closed")
}
blob.closed = true
if err := blob.buffer.Flush(); err != nil {
return err
}
err = blob.store.dir.Commit(ctx, blob.fh, blob.ref, blob.formatVersion)
return Error.Wrap(err)
}
// Seek flushes any buffer and seeks the underlying file.
func (blob *blobWriter) Seek(offset int64, whence int) (int64, error) {
if err := blob.buffer.Flush(); err != nil {
return 0, err
}
return blob.fh.Seek(offset, whence)
}
// Size returns how much has been written so far.
func (blob *blobWriter) Size() (int64, error) {
pos, err := blob.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
return pos, err
}
// StorageFormatVersion indicates what storage format version the blob is using.
func (blob *blobWriter) StorageFormatVersion() storage.FormatVersion {
return blob.formatVersion
}