-
Notifications
You must be signed in to change notification settings - Fork 173
/
watcher.go
116 lines (105 loc) · 2.9 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package chanwatcher
import (
"context"
"path"
"strings"
"time"
servicescommon "github.com/pydio/cells/v4/common"
"github.com/pydio/cells/v4/common/log"
"github.com/pydio/cells/v4/common/proto/tree"
"github.com/pydio/cells/v4/common/sync/model"
)
// Watcher wraps a model.PathSyncSource and converts the tree.NodeChangeEvent
// values received on NodeChanges into sync watch events (see Watch).
type Watcher struct {
	// PathSyncSource is the wrapped source; it is also set as the Source of
	// every event emitted by Watch.
	model.PathSyncSource

	// Context is used by Watch for logging, and Watch selects on its Done
	// channel.
	Context context.Context
	// NodeChanges is the input channel read by the goroutine started in Watch.
	NodeChanges chan *tree.NodeChangeEvent
	// Prefix is trimmed from the beginning of each node path before the
	// event is emitted.
	Prefix string
}
// NewWatcher builds a Watcher around src. The returned Watcher stores ctx and
// prefix as-is and owns a NodeChanges channel buffered to 1000 events.
func NewWatcher(ctx context.Context, src model.PathSyncSource, prefix string) *Watcher {
	watcher := new(Watcher)
	watcher.PathSyncSource = src
	watcher.Context = ctx
	watcher.Prefix = prefix
	watcher.NodeChanges = make(chan *tree.NodeChangeEvent, 1000)
	return watcher
}
// Watch starts a goroutine that converts the events received on w.NodeChanges
// into model.EventInfo values, and returns the model.WatchObject exposing the
// channel they are published on.
//
// Only CREATE events (using the event target node) and DELETE events (using
// the event source node) are forwarded; any other event type, or an event
// without the relevant node, is ignored. Paths are reported with w.Prefix
// trimmed. When the path ends with the hidden sync file name, an additional
// folder event is emitted first for the parent directory.
//
// The recursivePath argument is not used by this implementation, and the
// returned error is always nil. The goroutine exits when w.Context is done or
// when w.NodeChanges is closed.
func (w *Watcher) Watch(recursivePath string) (*model.WatchObject, error) {
	eventChan := make(chan model.EventInfo)
	errorChan := make(chan error)
	doneChan := make(chan bool)
	wConn := make(chan model.WatchConnectionInfo)

	wo := &model.WatchObject{
		EventInfoChan:  eventChan,
		ErrorChan:      errorChan,
		DoneChan:       doneChan,
		ConnectionInfo: wConn,
	}

	// send publishes one event, giving up if the context is cancelled while
	// no consumer is reading. It reports whether the event was delivered.
	send := func(info model.EventInfo) bool {
		select {
		case eventChan <- info:
			return true
		case <-w.Context.Done():
			return false
		}
	}

	go func() {
		for {
			select {
			case <-w.Context.Done():
				// A bare break here would only leave the select and make the
				// loop spin forever on the closed Done channel.
				return

			case event, ok := <-w.NodeChanges:
				if !ok {
					// Input channel closed: nothing more will ever arrive.
					return
				}
				if event == nil {
					continue
				}

				var node *tree.Node
				var eType model.EventType
				switch event.Type {
				case tree.NodeChangeEvent_CREATE:
					eType = model.EventCreate
					node = event.GetTarget()
				case tree.NodeChangeEvent_DELETE:
					eType = model.EventRemove
					node = event.GetSource()
				}
				if node == nil {
					// Unsupported event type, or event without a usable node.
					continue
				}

				meta := event.Metadata
				if meta == nil {
					meta = make(map[string]string)
				}
				// Copy of the metadata where the session-close prefix is
				// stripped from the session uuid value.
				cleanMeta := make(map[string]string, len(meta))
				for k, v := range meta {
					if k == servicescommon.XPydioSessionUuid && strings.HasPrefix(v, servicescommon.SyncSessionClose_) {
						cleanMeta[k] = strings.TrimPrefix(v, servicescommon.SyncSessionClose_)
					} else {
						cleanMeta[k] = v
					}
				}

				objectPath := strings.TrimPrefix(node.GetPath(), w.Prefix)
				log.Logger(w.Context).Debug("Got Event", event.Zap())

				// If file is .pydio, also send an event on corresponding folder
				if strings.HasSuffix(objectPath, servicescommon.PydioSyncHiddenFile) {
					delivered := send(model.EventInfo{
						Type:     eType,
						Time:     time.Now().Format(time.RFC822),
						Path:     path.Dir(objectPath),
						Etag:     node.GetEtag(),
						Folder:   true,
						Size:     node.GetSize(),
						Source:   w.PathSyncSource,
						Metadata: cleanMeta,
					})
					if !delivered {
						return
					}
				}

				// The event for the node itself carries the unmodified
				// metadata, while the folder event above uses cleanMeta.
				// NOTE(review): presumably intentional so that the session
				// close marker is only attached to this event - confirm.
				delivered := send(model.EventInfo{
					Type:     eType,
					Time:     time.Now().Format(time.RFC822),
					Path:     objectPath,
					Etag:     node.GetEtag(),
					Folder:   !node.IsLeaf(),
					Size:     node.GetSize(),
					Source:   w.PathSyncSource,
					Metadata: meta,
				})
				if !delivered {
					return
				}
			}
		}
	}()

	return wo, nil
}
// ComputeChecksum delegates to the wrapped PathSyncSource when it implements
// model.ChecksumProvider and returns that call's error. When the source does
// not implement it, nothing is done and nil is returned.
func (w *Watcher) ComputeChecksum(ctx context.Context, node *tree.Node) error {
	provider, supported := w.PathSyncSource.(model.ChecksumProvider)
	if !supported {
		return nil
	}
	return provider.ComputeChecksum(ctx, node)
}