-
Notifications
You must be signed in to change notification settings - Fork 111
/
decoder.go
77 lines (72 loc) · 1.62 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package rs
import (
"github.com/klauspost/reedsolomon"
"io"
)
type decoder struct {
readers []io.Reader
writers []io.Writer
enc reedsolomon.Encoder
size int64
cache []byte
cacheSize int
total int64
}
func NewDecoder(readers []io.Reader, writers []io.Writer, size int64) *decoder {
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
return &decoder{readers, writers, enc, size, nil, 0, 0}
}
func (d *decoder) Read(p []byte) (n int, err error) {
if d.cacheSize == 0 {
e := d.getData()
if e != nil {
return 0, e
}
}
length := len(p)
if d.cacheSize < length {
length = d.cacheSize
}
d.cacheSize -= length
copy(p, d.cache[:length])
d.cache = d.cache[length:]
return length, nil
}
func (d *decoder) getData() error {
if d.total == d.size {
return io.EOF
}
shards := make([][]byte, ALL_SHARDS)
repairIds := make([]int, 0)
for i := range shards {
if d.readers[i] == nil {
repairIds = append(repairIds, i)
} else {
shards[i] = make([]byte, BLOCK_PER_SHARD)
n, e := io.ReadFull(d.readers[i], shards[i])
if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
shards[i] = nil
} else if n != BLOCK_PER_SHARD {
shards[i] = shards[i][:n]
}
}
}
e := d.enc.Reconstruct(shards)
if e != nil {
return e
}
for i := range repairIds {
id := repairIds[i]
d.writers[id].Write(shards[id])
}
for i := 0; i < DATA_SHARDS; i++ {
shardSize := int64(len(shards[i]))
if d.total+shardSize > d.size {
shardSize -= d.total + shardSize - d.size
}
d.cache = append(d.cache, shards[i][:shardSize]...)
d.cacheSize += int(shardSize)
d.total += shardSize
}
return nil
}