-
Notifications
You must be signed in to change notification settings - Fork 402
/
transform.go
179 lines (160 loc) · 5.41 KB
/
transform.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package encryption
import (
"bytes"
"context"
"io"
"io/ioutil"
"storj.io/storj/internal/readcloser"
"storj.io/storj/pkg/ranger"
)
// A Transformer is a data transformation that may change the size of the blocks
// of data it operates on in a deterministic fashion.
type Transformer interface {
InBlockSize() int // The block size prior to transformation
OutBlockSize() int // The block size after transformation
Transform(out, in []byte, blockNum int64) ([]byte, error)
}
type transformedReader struct {
r io.ReadCloser
t Transformer
blockNum int64
inbuf []byte
outbuf []byte
expectedSize int64
bytesRead int
}
// NoopTransformer is a dummy Transformer that passes data through without modifying it
type NoopTransformer struct{}
// InBlockSize is 1
func (t *NoopTransformer) InBlockSize() int {
return 1
}
// OutBlockSize is 1
func (t *NoopTransformer) OutBlockSize() int {
return 1
}
// Transform returns the input without modification
func (t *NoopTransformer) Transform(out, in []byte, blockNum int64) ([]byte, error) {
return append(out, in...), nil
}
// TransformReader applies a Transformer to a Reader. startingBlockNum should
// probably be 0 unless you know you're already starting at a block offset.
func TransformReader(r io.ReadCloser, t Transformer,
startingBlockNum int64) io.ReadCloser {
return &transformedReader{
r: r,
t: t,
blockNum: startingBlockNum,
inbuf: make([]byte, t.InBlockSize()),
outbuf: make([]byte, 0, t.OutBlockSize()),
}
}
// TransformReaderSize creates a TransformReader with expected size,
// i.e. the number of bytes that is expected to be read from this reader.
// If less than the expected bytes are read, the reader will return
// io.ErrUnexpectedEOF instead of io.EOF.
func TransformReaderSize(r io.ReadCloser, t Transformer,
startingBlockNum int64, expectedSize int64) io.ReadCloser {
return &transformedReader{
r: r,
t: t,
blockNum: startingBlockNum,
inbuf: make([]byte, t.InBlockSize()),
outbuf: make([]byte, 0, t.OutBlockSize()),
expectedSize: expectedSize,
}
}
func (t *transformedReader) Read(p []byte) (n int, err error) {
if len(t.outbuf) <= 0 {
// If there's no more buffered data left, let's fill the buffer with
// the next block
b, err := io.ReadFull(t.r, t.inbuf)
t.bytesRead += b
if err == io.EOF && int64(t.bytesRead) < t.expectedSize {
return 0, io.ErrUnexpectedEOF
} else if err != nil {
return 0, err
}
t.outbuf, err = t.t.Transform(t.outbuf, t.inbuf, t.blockNum)
if err != nil {
return 0, Error.Wrap(err)
}
t.blockNum++
}
// return as much as we can from the current buffered block
n = copy(p, t.outbuf)
// slide the uncopied data to the beginning of the buffer
copy(t.outbuf, t.outbuf[n:])
// resize the buffer
t.outbuf = t.outbuf[:len(t.outbuf)-n]
return n, nil
}
func (t *transformedReader) Close() error {
return t.r.Close()
}
type transformedRanger struct {
rr ranger.Ranger
t Transformer
}
// Transform will apply a Transformer to a Ranger.
func Transform(rr ranger.Ranger, t Transformer) (ranger.Ranger, error) {
if rr.Size()%int64(t.InBlockSize()) != 0 {
return nil, Error.New("invalid transformer and range reader combination." +
"the range reader size is not a multiple of the block size")
}
return &transformedRanger{rr: rr, t: t}, nil
}
func (t *transformedRanger) Size() int64 {
blocks := t.rr.Size() / int64(t.t.InBlockSize())
return blocks * int64(t.t.OutBlockSize())
}
// CalcEncompassingBlocks is a useful helper function that, given an offset,
// length, and blockSize, will tell you which blocks contain the requested
// offset and length
func CalcEncompassingBlocks(offset, length int64, blockSize int) (
firstBlock, blockCount int64) {
firstBlock = offset / int64(blockSize)
if length <= 0 {
return firstBlock, 0
}
lastBlock := (offset + length) / int64(blockSize)
if (offset+length)%int64(blockSize) == 0 {
return firstBlock, lastBlock - firstBlock
}
return firstBlock, 1 + lastBlock - firstBlock
}
func (t *transformedRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
// Range may not have been called for block-aligned offsets and lengths, so
// let's figure out which blocks encompass the request
firstBlock, blockCount := CalcEncompassingBlocks(
offset, length, t.t.OutBlockSize())
// If block count is 0, there is nothing to transform, so return a dumb
// reader that will just return io.EOF on read
if blockCount == 0 {
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}
// okay, now let's get the range on the underlying ranger for those blocks
// and then Transform it.
r, err := t.rr.Range(ctx,
firstBlock*int64(t.t.InBlockSize()),
blockCount*int64(t.t.InBlockSize()))
if err != nil {
return nil, err
}
tr := TransformReaderSize(r, t.t, firstBlock, blockCount*int64(t.t.InBlockSize()))
// the range we got potentially includes more than we wanted. if the
// offset started past the beginning of the first block, we need to
// swallow the first few bytes
_, err = io.CopyN(ioutil.Discard, tr,
offset-firstBlock*int64(t.t.OutBlockSize()))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
}
return nil, Error.Wrap(err)
}
// the range might have been too long. only return what was requested
return readcloser.LimitReadCloser(tr, length), nil
}