-
Notifications
You must be signed in to change notification settings - Fork 0
/
system.go
304 lines (253 loc) · 6.83 KB
/
system.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
// package ipnsfs implements an in memory model of a mutable ipns filesystem,
// to be used by the fuse filesystem.
//
// It consists of four main structs:
// 1) The Filesystem
// The filesystem serves as a container and entry point for the ipns filesystem
// 2) KeyRoots
// KeyRoots represent the root of the keyspace controlled by a given keypair
// 3) Directories
// 4) Files
package ipnsfs
import (
"errors"
"os"
"sync"
"time"
key "github.com/ipfs/go-ipfs/blocks/key"
dag "github.com/ipfs/go-ipfs/merkledag"
namesys "github.com/ipfs/go-ipfs/namesys"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
)
var log = eventlog.Logger("ipnsfs")
var ErrIsDirectory = errors.New("error: is a directory")
// Filesystem is the writeable fuse filesystem structure
type Filesystem struct {
ctx context.Context
dserv dag.DAGService
nsys namesys.NameSystem
resolver *path.Resolver
pins pin.Pinner
roots map[string]*KeyRoot
}
// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys
func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) {
roots := make(map[string]*KeyRoot)
fs := &Filesystem{
ctx: ctx,
roots: roots,
nsys: nsys,
dserv: ds,
pins: pins,
resolver: &path.Resolver{DAG: ds},
}
for _, k := range keys {
pkh, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
root, err := fs.newKeyRoot(ctx, k)
if err != nil {
return nil, err
}
roots[key.Key(pkh).Pretty()] = root
}
return fs, nil
}
func (fs *Filesystem) Close() error {
wg := sync.WaitGroup{}
for _, r := range fs.roots {
wg.Add(1)
go func(r *KeyRoot) {
defer wg.Done()
err := r.Publish(fs.ctx)
if err != nil {
log.Info(err)
return
}
}(r)
}
wg.Wait()
return nil
}
// GetRoot returns the KeyRoot of the given name
func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) {
r, ok := fs.roots[name]
if ok {
return r, nil
}
return nil, os.ErrNotExist
}
type childCloser interface {
closeChild(string, *dag.Node) error
}
type NodeType int
const (
TFile NodeType = iota
TDir
)
// FSNode represents any node (directory, root, or file) in the ipns filesystem
type FSNode interface {
GetNode() (*dag.Node, error)
Type() NodeType
Lock()
Unlock()
}
// KeyRoot represents the root of a filesystem tree pointed to by a given keypair
type KeyRoot struct {
key ci.PrivKey
name string
// node is the merkledag node pointed to by this keypair
node *dag.Node
// A pointer to the filesystem to access components
fs *Filesystem
// val represents the node pointed to by this key. It can either be a File or a Directory
val FSNode
repub *Republisher
}
// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine
// for it
func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) {
hash, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
name := "/ipns/" + key.Key(hash).String()
root := new(KeyRoot)
root.key = k
root.fs = fs
root.name = name
ctx, cancel := context.WithCancel(parent)
defer cancel()
pointsTo, err := fs.nsys.Resolve(ctx, name)
if err != nil {
err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k)
if err != nil {
return nil, err
}
pointsTo, err = fs.nsys.Resolve(ctx, name)
if err != nil {
return nil, err
}
}
mnode, err := fs.resolver.ResolvePath(ctx, pointsTo)
if err != nil {
log.Errorf("Failed to retrieve value '%s' for ipns entry: %s\n", pointsTo, err)
return nil, err
}
root.node = mnode
root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3)
go root.repub.Run(parent)
pbn, err := ft.FromBytes(mnode.Data)
if err != nil {
log.Error("IPNS pointer was not unixfs node")
return nil, err
}
switch pbn.GetType() {
case ft.TDirectory:
root.val = NewDirectory(pointsTo.String(), mnode, root, fs)
case ft.TFile, ft.TMetadata, ft.TRaw:
fi, err := NewFile(pointsTo.String(), mnode, root, fs)
if err != nil {
return nil, err
}
root.val = fi
default:
panic("unrecognized! (NYI)")
}
return root, nil
}
func (kr *KeyRoot) GetValue() FSNode {
return kr.val
}
// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error {
kr.repub.Touch()
return nil
}
// Publish publishes the ipns entry associated with this key
func (kr *KeyRoot) Publish(ctx context.Context) error {
child, ok := kr.val.(FSNode)
if !ok {
return errors.New("child of key root not valid type")
}
nd, err := child.GetNode()
if err != nil {
return err
}
// Holding this lock so our child doesnt change out from under us
child.Lock()
k, err := kr.fs.dserv.Add(nd)
if err != nil {
child.Unlock()
return err
}
child.Unlock()
// Dont want to hold the lock while we publish
// otherwise we are holding the lock through a costly
// network operation
kp := path.FromKey(k)
ev := &eventlog.Metadata{"name": kr.name, "key": kp}
defer log.EventBegin(ctx, "ipnsfsPublishing", ev).Done()
log.Info("ipnsfs publishing %s -> %s", kr.name, kp)
return kr.fs.nsys.Publish(ctx, kr.key, kp)
}
// Republisher manages when to publish the ipns entry associated with a given key
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
Publish chan struct{}
root *KeyRoot
}
// NewRepublisher creates a new Republisher object to republish the given keyroot
// using the given short and long time intervals
func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher {
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
Publish: make(chan struct{}, 1),
root: root,
}
}
// Touch signals that an update has occurred since the last publish.
// Multiple consecutive touches may extend the time period before
// the next Publish occurs in order to more efficiently batch updates
func (np *Republisher) Touch() {
select {
case np.Publish <- struct{}{}:
default:
}
}
// Run is the main republisher loop
func (np *Republisher) Run(ctx context.Context) {
for {
select {
case <-np.Publish:
quick := time.After(np.TimeoutShort)
longer := time.After(np.TimeoutLong)
wait:
select {
case <-ctx.Done():
return
case <-np.Publish:
quick = time.After(np.TimeoutShort)
goto wait
case <-quick:
case <-longer:
}
log.Info("Publishing Changes!")
err := np.root.Publish(ctx)
if err != nil {
log.Error("republishRoot error: %s", err)
}
case <-ctx.Done():
return
}
}
}