forked from chihaya/chihaya
/
directory.go
132 lines (123 loc) · 3.75 KB
/
directory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Package directory implements a container which
// checks whether a hash is present in any of the torrent files
// placed in some directory.
// Note: unlike List, this container also stores the torrent name as the value.
package directory
import (
"context"
"fmt"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/util/dirwatch"
"github.com/minio/sha256-simd"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
// logger is the package-level logger, created with the name
// "middleware/torrent approval/directory".
var logger = log.NewLogger("middleware/torrent approval/directory")
// init registers the build function in the container registry
// under the name "directory".
func init() {
	const containerName = "directory"
	container.Register(containerName, build)
}
// Config is the configuration of the directory container.
// It embeds list.Config, so the options declared there are accepted here
// as well; build reads Invert and StorageCtx from it.
type Config struct {
list.Config
// Path is the filesystem directory which contains the torrent files
// and is watched for changes.
Path string
}
// build creates a directory container from the given configuration.
//
// It decodes conf into a Config, substitutes container.DefaultStorageCtxName
// when no storage context is configured, opens a watcher on Config.Path and
// starts a goroutine (see watch) that mirrors watcher events into st.
//
// It returns an error if the configuration cannot be decoded or the
// directory watcher cannot be created.
func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) {
	c := new(Config)
	if err := conf.Unmarshal(c); err != nil {
		return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
	}

	d := &directory{
		List: list.List{
			Invert:     c.Invert,
			Storage:    st,
			StorageCtx: c.StorageCtx,
		},
	}
	if len(d.StorageCtx) == 0 {
		logger.Warn().
			Str("name", "StorageCtx").
			Str("provided", d.StorageCtx).
			Str("default", container.DefaultStorageCtxName).
			Msg("falling back to default configuration")
		d.StorageCtx = container.DefaultStorageCtxName
	}

	w, err := dirwatch.New(c.Path)
	if err != nil {
		return nil, fmt.Errorf("unable to initialize directory watch: %w", err)
	}
	d.watcher = w

	go d.watch()

	// Return a literal nil: the watcher goroutine uses its own error
	// variables, so it can neither race with nor leak into this result.
	return d, nil
}

// watch consumes events from d.watcher until its Events channel is closed
// and mirrors them into d.Storage under d.StorageCtx.
//
// For an added torrent file three entries are stored, all with the torrent
// name as value (list.DUMMY if the name is empty or cannot be read): the
// hash reported by the watcher, the SHA-256 of the info section and its
// truncated form. For a removed torrent the same keys are deleted.
func (d *directory) watch() {
	// SHA-256 based hashes of the torrents added so far, keyed by the raw
	// hash reported by the watcher. A removed file can no longer be read,
	// so the hash computed on addition has to be remembered to be able to
	// delete every key that was stored for it.
	v2ByV1 := make(map[string]bittorrent.InfoHash)

	for event := range d.watcher.Events {
		v1Key := event.InfoHash.AsString()

		switch event.Change {
		case dirwatch.Added:
			mi, err := metainfo.LoadFromFile(event.TorrentFilePath)
			if err != nil {
				logger.Error().Err(err).
					Str("file", event.TorrentFilePath).
					Msg("unable to load torrent file")
				continue
			}

			s256 := sha256.New()
			s256.Write(mi.InfoBytes)
			// NOTE(review): the error is ignored as before; a SHA-256 digest
			// is always 32 bytes, presumably an accepted hash length - confirm.
			v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil))
			v2ByV1[v1Key] = v2hash

			var name string
			if info, err := mi.UnmarshalInfo(); err == nil {
				name = info.Name
			} else {
				logger.Error().
					Err(err).
					Str("file", event.TorrentFilePath).
					Stringer("infoHash", event.InfoHash).
					Stringer("infoHashV2", v2hash).
					Msg("unable to unmarshal torrent info")
			}
			if len(name) == 0 {
				name = list.DUMMY
			}
			bName := []byte(name)

			logger.Err(d.Storage.Put(context.Background(), d.StorageCtx, storage.Entry{
				Key:   v1Key,
				Value: bName,
			}, storage.Entry{
				Key:   v2hash.RawString(),
				Value: bName,
			}, storage.Entry{
				Key:   v2hash.TruncateV1().RawString(),
				Value: bName,
			})).
				Str("action", "add").
				Str("file", event.TorrentFilePath).
				Stringer("infoHash", event.InfoHash).
				Stringer("infoHashV2", v2hash).
				Msg("approval torrent watcher event")

		case dirwatch.Removed:
			// The torrent file is gone, so it must not be loaded here;
			// delete by the reported hash plus whatever was remembered.
			keys := []string{v1Key}
			v2hash, known := v2ByV1[v1Key]
			if known {
				keys = append(keys, v2hash.RawString(), v2hash.TruncateV1().RawString())
				delete(v2ByV1, v1Key)
			}

			// Use d.StorageCtx (not the raw configuration value) so that
			// deletion targets the same context the entries were put into,
			// including the default-context fallback.
			ev := logger.Err(d.Storage.Delete(context.Background(), d.StorageCtx, keys...)).
				Str("action", "delete").
				Str("file", event.TorrentFilePath).
				Stringer("infoHash", event.InfoHash)
			if known {
				ev = ev.Stringer("infoHashV2", v2hash)
			}
			ev.Msg("approval torrent watcher event")
		}
	}
}
// directory is the container built by this package.
// It embeds list.List and additionally holds the watcher of the
// torrent directory.
type directory struct {
list.List
// watcher is the directory watcher created in build; Close closes it.
watcher *dirwatch.Instance
}
// Close stops watching the torrent directory.
// It does nothing when no watcher is set and always returns nil.
func (d *directory) Close() error {
	if d.watcher == nil {
		return nil
	}
	d.watcher.Close()
	return nil
}