/
server.go
124 lines (103 loc) · 3.28 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright 2016 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package storecache is a caching proxy between a client and all stores.
// References are stored as files in the local file system.
package storecache
import (
"fmt"
"path"
"upspin.io/errors"
"upspin.io/log"
"upspin.io/upspin"
)
// server implements upspin.Storeserver.
type server struct {
cfg upspin.Config
// The on disk cache.
cache *storeCache
// The store server this dialed server should talk to.
authority upspin.Endpoint
}
// New creates a new store cache that implements upspin.StoreServer.
// For writeback caches, it also returns a function to flush Blocks
// that are waiting to be written back. This is important to allow
// the client to flush out Access file blocks before writing the
// DirEntry.
func New(cfg upspin.Config, cacheDir string, maxBytes int64, writethrough bool) (upspin.StoreServer, func(upspin.Location), error) {
c, blockFlusher, err := newCache(cfg, path.Join(cacheDir, "storecache"), path.Join(cacheDir, "storewritebackqueue"), maxBytes, writethrough)
if err != nil {
return nil, nil, err
}
return &server{
cfg: cfg,
cache: c,
}, blockFlusher, nil
}
func (s *server) Dial(config upspin.Config, e upspin.Endpoint) (upspin.Service, error) {
s2 := *s
s2.authority = e
return &s2, nil
}
var errNotDialed = errors.Str("store/cache: can't handle request to unassigned authority (must dial first)")
func (s *server) Get(ref upspin.Reference) ([]byte, *upspin.Refdata, []upspin.Location, error) {
if s.authority.Transport == upspin.Unassigned {
return nil, nil, nil, errNotDialed
}
op := logf("Get %q", ref)
// Do not pass on the HTTP base from the underlying storage
// or subsequent Gets will bypass the cache.
if ref == upspin.HTTPBaseMetadata {
return nil, nil, nil, op.error(errors.E(errors.NotExist))
}
data, locs, err := s.cache.get(s.cfg, ref, s.authority)
if err != nil {
return nil, nil, nil, op.error(err)
}
refdata := &upspin.Refdata{
Reference: ref,
Volatile: false, // TODO
Duration: 0, // TODO
}
return data, refdata, locs, nil
}
func (s *server) Put(data []byte) (*upspin.Refdata, error) {
if s.authority.Transport == upspin.Unassigned {
return nil, errNotDialed
}
op := logf("Put %.30x...", data)
ref, err := s.cache.put(s.cfg, data, s.authority)
if err != nil {
return nil, op.error(err)
}
refdata := &upspin.Refdata{
Reference: ref,
Volatile: false, // TODO
Duration: 0, // TODO
}
return refdata, nil
}
// Delete implements proto.StoreServer.
func (s *server) Delete(ref upspin.Reference) error {
if s.authority.Transport == upspin.Unassigned {
return errNotDialed
}
op := logf("Delete %q", ref)
err := s.cache.delete(s.cfg, ref, s.authority)
if err != nil {
return op.error(err)
}
return nil
}
func (s *server) Endpoint() upspin.Endpoint { return s.authority }
func (s *server) Close() {}
func logf(format string, args ...interface{}) operation {
s := fmt.Sprintf(format, args...)
log.Debug.Print("store/storecache: " + s)
return operation(s)
}
type operation string
func (op operation) error(err error) error {
logf("%v failed: %v", op, err)
return errors.E(errors.Op("store/storecache."+string(op)), err)
}