This repository has been archived by the owner on Mar 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
file.go
162 lines (140 loc) · 3.83 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package main
import (
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"github.com/korylprince/jettison/lib/cache"
"github.com/korylprince/jettison/lib/file"
"github.com/korylprince/jettison/lib/rpc"
)
//FileService manages the local files for the jettison client
type FileService struct {
config *Config
cache cache.Cache
client rpc.FileSetClient
sets map[string]*file.VersionedSet //group:VersionedSet
mu *sync.RWMutex
scan chan []string //chan groups
}
//NewFileService returns a new FileService
func NewFileService(config *Config, c cache.Cache, client rpc.FileSetClient) *FileService {
f := &FileService{
config: config,
cache: c,
client: client,
sets: make(map[string]*file.VersionedSet),
mu: new(sync.RWMutex),
scan: make(chan []string, len(config.Groups)),
}
go f.timer()
return f
}
//Scan causes the FileService to rescan the groups
func (s *FileService) Scan(groups ...string) {
s.scan <- groups
}
//Versions returns the current FileSet versions
func (s *FileService) Versions() map[string]uint64 {
//map[group]version
v := make(map[string]uint64)
s.mu.RLock()
defer s.mu.RUnlock()
for group, vs := range s.sets {
v[group] = vs.Version
}
return v
}
func (s *FileService) timer() {
groups := s.config.Groups
for {
err := s.check(groups...)
if err != nil {
log.Println("FileService: Error downloading files:", err)
}
select {
case <-time.After(s.config.CheckInterval * time.Second):
groups = s.config.Groups
case groups = <-s.scan:
}
}
}
func (s *FileService) check(groups ...string) error {
resp, err := s.client.Get(context.Background(), &rpc.FileSetRequest{Groups: groups})
if err != nil {
return fmt.Errorf("FileSetRequest error: %v", err)
}
//convert fileset
var grps sort.StringSlice
sets := make(map[string]*file.VersionedSet)
for group, set := range resp.Sets {
sets[group] = &file.VersionedSet{Set: set.Set, Version: set.Version}
grps = append(grps, fmt.Sprintf("{Group: %s, Len: %d, Version: %d}", group, len(set.Set), set.Version))
}
grps.Sort()
log.Printf("FileSetResponse: %s\n", strings.Join(grps, ", "))
//walk and download
return s.walk(sets)
}
func (s *FileService) walk(sets map[string]*file.VersionedSet) error {
for group, vs := range sets {
for hash, path := range vs.Set {
_, _, err := s.cache.Get(path)
if err == cache.ErrorInvalidCacheEntry {
err = Download(fmt.Sprintf("http://%s/file/%d", s.config.HTTPServerAddr, hash), path, hash)
if err != nil {
return fmt.Errorf("Download: Error: %v", err)
}
log.Printf("Download: Path: %s, Hash: %d\n", path, hash)
err = s.cache.Put(path, hash, time.Now())
if err != nil {
return fmt.Errorf("Cache.Put error: %v", err)
}
} else if err != nil {
return fmt.Errorf("Cache.Get error: %v", err)
}
}
//everything has been downloaded and cached so update version
s.mu.Lock()
s.sets[group] = vs
s.mu.Unlock()
}
return nil
}
//Download downloads url to path, verifing that the file's hash matches hash
func Download(url, path string, hash uint64) error {
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("Error getting %s: %v", url, err)
}
defer resp.Body.Close()
err = os.MkdirAll(filepath.Dir(path), 0777)
if err != nil {
return fmt.Errorf("Error creating directory %s: %v", filepath.Dir(path), err)
}
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("Error creating file %s: %v", path, err)
}
_, err = io.Copy(f, resp.Body)
if err != nil {
f.Close()
return fmt.Errorf("Error writing to file %s: %v", path, err)
}
f.Close()
h, err := file.Hash(path)
if err != nil {
return fmt.Errorf("Error hashing file %s: %v", path, err)
}
if hash != h {
return fmt.Errorf("Hash mismatch on file %s: Expected %d, Result: %d", path, hash, h)
}
return nil
}