forked from creiht/formic
/
tasks.go
120 lines (109 loc) · 2.75 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"log"
"github.com/creiht/formic"
"github.com/gholt/store"
"golang.org/x/net/context"
)
type UpdateItem struct {
id []byte
block uint64
blocksize uint64
size uint64
mtime int64
}
type Updatinator struct {
in chan *UpdateItem
fs FileService
}
func newUpdatinator(in chan *UpdateItem, fs FileService) *Updatinator {
return &Updatinator{
in: in,
fs: fs,
}
}
func (u *Updatinator) run() {
// TODO: Add fan-out based on the id of the update
for {
toupdate := <-u.in
log.Println("Updating: ", toupdate)
// TODO: Need better context
ctx := context.Background()
err := u.fs.Update(ctx, toupdate.id, toupdate.block, toupdate.blocksize, toupdate.size, toupdate.mtime)
if err != nil {
log.Println("Update failed, requeing: ", err)
u.in <- toupdate
}
}
}
type DeleteItem struct {
parent []byte
name string
}
type Deletinator struct {
in chan *DeleteItem
fs FileService
}
func newDeletinator(in chan *DeleteItem, fs FileService) *Deletinator {
return &Deletinator{
in: in,
fs: fs,
}
}
func (d *Deletinator) run() {
// TODO: Parallelize this thing?
for {
todelete := <-d.in
log.Println("Deleting: ", todelete)
// TODO: Need better context
ctx := context.Background()
// Get the dir entry info
dirent, err := d.fs.GetDirent(ctx, todelete.parent, todelete.name)
if store.IsNotFound(err) {
// NOTE: If it isn't found then it is likely deleted.
// Do we need to do more to ensure this?
// Skip for now
continue
}
if err != nil {
// TODO Better error handling?
// re-q the id, to try again later
log.Print("Delete error getting dirent: ", err)
d.in <- todelete
continue
}
ts := dirent.Tombstone
if ts == nil {
// TODO: probably an overwrite. just remove old file
continue
}
deleted := uint64(0)
for b := uint64(0); b < ts.Blocks; b++ {
// Delete each block
id := formic.GetID(ts.FsId, ts.Inode, b+1)
err := d.fs.DeleteChunk(ctx, id, ts.Dtime)
if err != nil && !store.IsNotFound(err) && err != ErrStoreHasNewerValue {
continue
}
deleted++
}
if deleted == ts.Blocks {
// Everything is deleted so delete the entry
err := d.fs.DeleteChunk(ctx, formic.GetID(ts.FsId, ts.Inode, 0), ts.Dtime)
if err != nil && !store.IsNotFound(err) && err != ErrStoreHasNewerValue {
// Couldn't delete the inode entry so try again later
d.in <- todelete
continue
}
err = d.fs.DeleteListing(ctx, todelete.parent, todelete.name, ts.Dtime)
if err != nil && !store.IsNotFound(err) && err != ErrStoreHasNewerValue {
log.Println(" Err: ", err)
// TODO: Better error handling
// Ignore for now to be picked up later?
}
} else {
// If all artifacts are not deleted requeue for later
d.in <- todelete
}
}
}