/
swarm.go
160 lines (137 loc) · 4.43 KB
/
swarm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package core
import (
"container/list"
"fmt"
"github.com/golang/glog"
)
// SwarmDataHandler is the callback signature used to deliver data chunks
// that have just been received. The chunks are passed to it as a DataMsg.
type SwarmDataHandler func(DataMsg)
// SwarmConfig carries the settings used to create a Swarm with NewSwarm.
type SwarmConfig struct {
// Metadata stores the metadata of a Swarm. See the docs for
// SwarmMetadata for details.
Metadata SwarmMetadata
// DataHandler is the callback for newly received data chunks. It is
// meant to be invoked in a new goroutine each time chunks arrive.
DataHandler SwarmDataHandler
}
// SwarmMetadata stores the metadata of a Swarm.
// See: https://tools.ietf.org/html/rfc7574#section-3.1
type SwarmMetadata struct {
// ID is the identifier of the swarm.
ID SwarmID
// ChunkSize is the payload size, in bytes, that every chunk of this swarm
// must have.
ChunkSize int
// TODO: chunk addressing method
// TODO: content integrity protection method
// TODO: Merkle hash tree function (if applicable)
}
// Swarm tracks the state kept for a single swarm: the channels to remote
// peers, the chunks stored locally, and which remote peers have which chunks.
type Swarm struct {
// chans is a peer ID -> channel ID map for this swarm
// it does not include this peer, because this peer does not have a local channel ID
chans map[PeerID]ChanID
// TODO: other swarm metadata stored here
// localChunks is the chunk store: it tracks the chunks that are locally
// stored for this swarm, keyed by chunk ID
localChunks map[ChunkID]*Chunk
// remoteHaves maps a ChunkID to the list of peers that have that chunk
// (list elements are PeerID values)
remoteHaves map[ChunkID]*list.List
// metadata is the swarm metadata, copied from SwarmConfig.Metadata
metadata SwarmMetadata
// dataHandler is the data callback, copied from SwarmConfig.DataHandler
dataHandler SwarmDataHandler
}
// NewSwarm builds a Swarm from config. The channel map, local chunk store and
// remote-have map all start out empty (but non-nil); the metadata and the
// data handler are copied from config.
func NewSwarm(config SwarmConfig) *Swarm {
	s := new(Swarm)
	s.chans = map[PeerID]ChanID{}
	s.localChunks = map[ChunkID]*Chunk{}
	s.remoteHaves = map[ChunkID]*list.List{}
	s.metadata = config.Metadata
	s.dataHandler = config.DataHandler
	return s
}
// AddRemoteHave records that the remote peer p has chunk c. The peer list for
// c is created the first time the chunk is seen, and p is inserted at the
// front of that list.
func (s *Swarm) AddRemoteHave(c ChunkID, p PeerID) {
	peers, known := s.remoteHaves[c]
	if !known {
		peers = list.New()
		s.remoteHaves[c] = peers
	}
	peers.PushFront(p)
}
// checkChunk validates c against this Swarm's settings. It returns an error
// if c is nil or if the length of its payload differs from the swarm's chunk
// size, and nil otherwise.
func (s *Swarm) checkChunk(c *Chunk) error {
	if c == nil {
		// Guard the dereference below so a bad caller gets an error, not a panic.
		return fmt.Errorf("checkChunk got nil chunk")
	}
	ref := s.ChunkSize()
	if size := len(c.B); size != ref {
		return fmt.Errorf("checkChunk got size=%d, should be %d", size, ref)
	}
	return nil
}
// AddLocalChunk validates c with checkChunk and, if it passes, stores it in
// the local chunk store under cid. An existing entry for cid is replaced,
// with a warning logged. A validation error is returned as-is and leaves the
// store unchanged.
func (s *Swarm) AddLocalChunk(cid ChunkID, c *Chunk) error {
	err := s.checkChunk(c)
	if err != nil {
		return err
	}
	if _, exists := s.localChunks[cid]; exists {
		glog.Warningf("addChunk overwriting existing chunk at id=%v", cid)
	}
	s.localChunks[cid] = c
	return nil
}
// AddLocalChunks stores the contiguous chunk range [start, end] (inclusive).
// The payloads are taken from data, which holds the chunks back to back:
// chunk start+k occupies data[k*ChunkSize : (k+1)*ChunkSize]. Bytes in data
// beyond the last chunk are ignored, and existing entries for the same chunk
// IDs are overwritten.
//
// If data is too short to hold every chunk of the range, an error is
// returned and nothing is stored. An empty range (end < start) stores
// nothing and returns nil.
func (s *Swarm) AddLocalChunks(start ChunkID, end ChunkID, data []byte) error {
	chunkSize := s.metadata.ChunkSize
	n := int(end) - int(start) + 1
	if n <= 0 {
		return nil
	}
	// Validate the length before touching the store: slicing past the end of
	// data would panic, and a failure midway would leave a partial range.
	if need := n * chunkSize; len(data) < need {
		return fmt.Errorf("AddLocalChunks got %d bytes of data, need %d for chunks (%d, %d)",
			len(data), need, start, end)
	}
	// Iterate by count rather than "id <= end": that condition can never
	// become false when end is the largest ChunkID value.
	for k := 0; k < n; k++ {
		id := start + ChunkID(k)
		c := newChunk(id, chunkSize)
		off := k * chunkSize
		if copied := copy(c.B, data[off:off+chunkSize]); copied != chunkSize {
			return fmt.Errorf("AddLocalChunks bad copy")
		}
		s.localChunks[id] = c
	}
	return nil
}
// LocalChunks returns the local chunk store for this Swarm, keyed by chunk
// ID. The map returned is the Swarm's internal map itself, not a copy.
func (s *Swarm) LocalChunks() map[ChunkID]*Chunk {
return s.localChunks
}
// WantChunk reports whether this Swarm still wants chunk id, which for now
// simply means there is no entry for it in the local chunk store.
func (s *Swarm) WantChunk(id ChunkID) bool {
	// Simple policy for now: anything missing from localChunks is wanted.
	_, have := s.localChunks[id]
	return !have
}
// DataFromLocalChunks packs the payloads of the locally stored chunks in the
// range [start, end] (inclusive) into a single newly allocated byte slice,
// in ascending chunk order. Chunk start+k occupies
// b[k*ChunkSize : (k+1)*ChunkSize] of the result.
//
// It returns an error if the range is empty (end < start), if any chunk in
// the range has no usable local entry, or if a stored chunk holds fewer than
// ChunkSize bytes.
func (s *Swarm) DataFromLocalChunks(start ChunkID, end ChunkID) ([]byte, error) {
	chunkSize := s.metadata.ChunkSize
	n := int(end) - int(start) + 1
	if n <= 0 {
		return nil, fmt.Errorf("DataFromLocalChunks bad range (%d, %d)", start, end)
	}
	b := make([]byte, n*chunkSize)
	// Iterate by count rather than "id <= end": that condition can never
	// become false when end is the largest ChunkID value.
	for k := 0; k < n; k++ {
		id := start + ChunkID(k)
		c, ok := s.localChunks[id]
		if !ok || c == nil {
			// A nil entry carries no data; treat it like a missing chunk
			// instead of dereferencing it.
			return nil, fmt.Errorf("DataFromLocalChunks could not find local chunk %d", id)
		}
		off := k * chunkSize
		if bn := copy(b[off:off+chunkSize], c.B); bn != chunkSize {
			return nil, fmt.Errorf("DataFromLocalChunks bad read from local chunk %d (read %d bytes, chunksize=%d)", id, bn, chunkSize)
		}
	}
	return b, nil
}
// ChunkSize returns the chunk size for this Swarm, as recorded in its
// metadata (SwarmMetadata.ChunkSize).
func (s *Swarm) ChunkSize() int {
return s.metadata.ChunkSize
}
// ChanID looks up the channel ID recorded for peer id in this Swarm. The
// boolean result reports whether an entry exists; when it is false the
// returned ChanID is the zero value.
func (s *Swarm) ChanID(id PeerID) (ChanID, bool) {
	ch, found := s.chans[id]
	return ch, found
}