-
Notifications
You must be signed in to change notification settings - Fork 178
/
blob_io.go
202 lines (163 loc) · 5.11 KB
/
blob_io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package blobs
import (
"bytes"
"errors"
"io"
"github.com/ipfs/go-cid"
)
// ErrBlobChannelWriterClosed is returned by Write, WriteByte and Flush (and, through Flush, by Close)
// when they are called on a BlobChannelWriter that has already been closed.
var ErrBlobChannelWriterClosed = errors.New("blob channel writer is already closed")
// BlobChannelWriter is a writer which splits the data written to it into blobs and sends them to a blob channel.
// Data is accumulated in an internal buffer; a blob is sent whenever the buffer reaches maxBlobSize bytes,
// and any remainder is sent by Flush or Close.
type BlobChannelWriter struct {
// maxBlobSize is the buffered size, in bytes, at which the buffer is sent as a blob.
maxBlobSize int
// blobs is the send-only channel that completed blobs are sent on.
blobs chan<- Blob
// buf holds the bytes written so far that have not yet been sent as a blob.
buf *bytes.Buffer
// cids collects the CID of every blob sent, in send order.
cids []cid.Cid
// closed is set by Close; once set, Write, WriteByte and Flush return ErrBlobChannelWriterClosed.
closed bool
// bytesSent is the total size of all blobs sent; bytes still held in buf are not included.
bytesSent uint64
}
// Compile-time assertions that *BlobChannelWriter implements io.WriteCloser and io.ByteWriter.
var (
	_ io.WriteCloser = (*BlobChannelWriter)(nil)
	_ io.ByteWriter  = (*BlobChannelWriter)(nil)
)
// BytesSent returns the total number of bytes that have been sent to the blob channel as blobs so far.
// Bytes that are still buffered and have not yet been sent are not counted.
func (bw *BlobChannelWriter) BytesSent() uint64 {
return bw.bytesSent
}
// CidsSent returns the CIDs of all blobs sent to the blob channel so far, in send order.
//
// The returned slice shares its elements with the writer's internal slice, so callers must not
// modify them. Its capacity is clipped to its length, so an append by the caller allocates a new
// backing array instead of writing into the spare capacity the writer will use for its next CID.
func (bw *BlobChannelWriter) CidsSent() []cid.Cid {
	return bw.cids[:len(bw.cids):len(bw.cids)]
}
// Write buffers data and sends it to the underlying blob channel in blobs of maxBlobSize bytes.
// Bytes that do not complete a blob stay buffered until a later Write, WriteByte, Flush or Close.
// It returns the number of bytes consumed from data (0 <= n <= len(data)) and always returns a
// non-nil error if it returns n < len(data).
//
// Returns ErrBlobChannelWriterClosed if the Blob Channel Writer was already previously closed via
// a call to Close. Returns an error without consuming anything if data is non-empty and maxBlobSize
// is not positive: a zero limit would otherwise make the chunking loop send empty blobs forever,
// and a negative limit would slice data out of range.
func (bw *BlobChannelWriter) Write(data []byte) (int, error) {
	if bw.closed {
		return 0, ErrBlobChannelWriterClosed
	}
	if len(data) == 0 {
		return 0, nil
	}
	if bw.maxBlobSize <= 0 {
		return 0, errors.New("blob channel writer: max blob size must be positive")
	}

	var n int
	for n < len(data) {
		// take as much as still fits into the current blob, capped by what is left in data
		size := bw.maxBlobSize - bw.buf.Len()
		if remaining := len(data) - n; size > remaining {
			size = remaining
		}

		m, err := bw.buf.Write(data[n : n+size])
		n += m
		if err != nil {
			return n, err
		}

		// if we have a full blob, send it to the blob channel
		if bw.buf.Len() >= bw.maxBlobSize {
			bw.sendNewBlob()
		}
	}

	return n, nil
}
// sendNewBlob wraps the currently buffered data in a new blob, sends it to the blob channel, records
// its CID and size, and then starts over with an empty buffer.
func (bw *BlobChannelWriter) sendNewBlob() {
	payload := bw.buf.Bytes()
	blob := NewBlob(payload)

	bw.blobs <- blob

	bw.cids = append(bw.cids, blob.Cid())
	bw.bytesSent += uint64(len(payload))

	// swap in a brand-new buffer instead of resetting the old one
	// NOTE(review): presumably because the blob keeps referencing the old buffer's bytes — confirm against NewBlob
	bw.buf = new(bytes.Buffer)
}
// WriteByte appends a single byte to the buffered data and, once the buffer has reached maxBlobSize,
// sends it to the underlying blob channel as a new blob. It returns an error if the byte could not be
// buffered.
// Returns ErrBlobChannelWriterClosed if the Blob Channel Writer was already previously closed via a call to Close.
func (bw *BlobChannelWriter) WriteByte(c byte) error {
	if bw.closed {
		return ErrBlobChannelWriterClosed
	}

	err := bw.buf.WriteByte(c)
	if err != nil {
		return err
	}

	if full := bw.buf.Len() >= bw.maxBlobSize; full {
		bw.sendNewBlob()
	}
	return nil
}
// Flush sends whatever data is currently buffered to the underlying blob channel as a new blob.
// If nothing is buffered, no blob is sent.
// Returns ErrBlobChannelWriterClosed if the Blob Channel Writer was already previously closed via a call to Close.
func (bw *BlobChannelWriter) Flush() error {
	switch {
	case bw.closed:
		return ErrBlobChannelWriterClosed
	case bw.buf.Len() > 0:
		bw.sendNewBlob()
	}
	return nil
}
// Close flushes any buffered data to the underlying blob channel and marks the writer as closed.
// It does not close the blob channel itself.
// If the flush fails, the error is returned and the writer is left open. Because the flush is rejected
// on an already closed writer, calling Close a second time returns ErrBlobChannelWriterClosed.
func (bw *BlobChannelWriter) Close() error {
	err := bw.Flush()
	if err == nil {
		bw.closed = true
	}
	return err
}
// NewBlobChannelWriter creates a BlobChannelWriter that sends its blobs to blobChan and starts a new
// blob whenever maxBlobSize bytes have been buffered. The writer starts with an empty buffer.
// maxBlobSize is not validated here; it should be positive.
func NewBlobChannelWriter(blobChan chan<- Blob, maxBlobSize int) *BlobChannelWriter {
	bw := new(BlobChannelWriter)
	bw.blobs = blobChan
	bw.maxBlobSize = maxBlobSize
	bw.buf = new(bytes.Buffer)
	return bw
}
// BlobChannelReader is a reader which reads data from a blob channel.
// It receives blobs one at a time and serves their raw data through Read and ReadByte.
type BlobChannelReader struct {
// blobs is the receive-only channel that blobs are read from.
blobs <-chan Blob
// buf holds the not-yet-read remainder of the most recently received blob.
buf *bytes.Buffer
// cids collects the CID of every blob received, in receive order.
cids []cid.Cid
// finished is set once the blob channel has been found closed; Read and ReadByte then return io.EOF.
finished bool
// bytesReceived is the total raw data size of all blobs received, whether or not it has been read yet.
bytesReceived uint64
}
// Compile-time assertions that *BlobChannelReader implements io.Reader and io.ByteReader.
var (
	_ io.Reader     = (*BlobChannelReader)(nil)
	_ io.ByteReader = (*BlobChannelReader)(nil)
)
// BytesReceived returns the total number of bytes of blob data received from the blob channel so far.
// This counts whole blobs as they are received, including bytes that have not yet been read.
func (br *BlobChannelReader) BytesReceived() uint64 {
return br.bytesReceived
}
// CidsReceived returns the CIDs of all blobs received from the blob channel so far, in receive order.
//
// The returned slice shares its elements with the reader's internal slice, so callers must not
// modify them. Its capacity is clipped to its length, so an append by the caller allocates a new
// backing array instead of writing into the spare capacity the reader will use for its next CID.
func (br *BlobChannelReader) CidsReceived() []cid.Cid {
	return br.cids[:len(br.cids):len(br.cids)]
}
// Read reads up to len(data) bytes from the underlying blob channel into data. It returns the number of
// bytes read (0 <= n <= len(data)) and any error encountered. A single call reads from at most one blob,
// so it may return fewer than len(data) bytes even when more blobs are available.
// Returns io.EOF if the incoming blob channel was closed and all available data has been read.
func (br *BlobChannelReader) Read(data []byte) (int, error) {
	switch {
	case br.finished:
		return 0, io.EOF
	case len(data) == 0:
		return 0, nil
	}

	for {
		// serve from the current blob as long as it still has unread data
		if br.buf.Len() > 0 {
			return br.buf.Read(data)
		}

		// current blob is exhausted (or was empty): move on to the next one
		if !br.receiveNewBlob() {
			br.finished = true
			return 0, io.EOF
		}
	}
}
// receiveNewBlob waits for the next blob on the blob channel and makes its raw data the current read
// buffer, recording the blob's CID and the size of its raw data. It returns false, leaving the reader
// unchanged, if the channel is closed and has no more blobs to deliver.
func (br *BlobChannelReader) receiveNewBlob() bool {
	blob, open := <-br.blobs
	if open {
		br.buf = bytes.NewBuffer(blob.RawData())
		br.cids = append(br.cids, blob.Cid())
		br.bytesReceived += uint64(len(blob.RawData()))
	}
	return open
}
// ReadByte reads a single byte from the underlying blob channel, receiving further blobs as needed.
// It returns an error if the byte could not be read.
// Returns io.EOF if the incoming blob channel was closed and all available data has been read.
func (br *BlobChannelReader) ReadByte() (byte, error) {
	if br.finished {
		return 0, io.EOF
	}

	// keep receiving until a blob with data shows up, so that empty blobs are skipped
	for br.buf.Len() == 0 {
		if br.receiveNewBlob() {
			continue
		}
		br.finished = true
		return 0, io.EOF
	}

	return br.buf.ReadByte()
}
// NewBlobChannelReader creates a BlobChannelReader that reads its data from the blobs received on
// blobChan. The reader starts with an empty buffer, so the first read receives a blob from the channel.
func NewBlobChannelReader(blobChan <-chan Blob) *BlobChannelReader {
	return &BlobChannelReader{
		blobs: blobChan,
		buf:   &bytes.Buffer{},
	}
}