-
Notifications
You must be signed in to change notification settings - Fork 0
/
teracache.go
109 lines (84 loc) · 2.15 KB
/
teracache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package teracache
import (
"context"
"github.com/tmrts/teracache/payload"
"github.com/tmrts/teracache/router"
"github.com/tmrts/teracache/service"
lru "github.com/tmrts/teracache/cache"
)
// Provider fetches the payload for key from its source. Get calls it when the
// key is not in the cache and the router assigns the key to this node.
type Provider func(ctx context.Context, key string) (payload.Payload, error)
// Interface is the cache API handed back by New.
type Interface interface {
// Get returns the payload associated with the given key, or an error if the
// payload could not be obtained.
Get(context.Context, string) (payload.Payload, error)
}
// topic is the Interface implementation constructed by New.
type topic struct {
// lru is the cache Get checks before consulting the router.
lru lru.Interface
// provider is called by Get for keys the router assigns to this node.
provider Provider
// router reports, per key, the owning node and whether that owner is this
// node.
router router.Interface
// svc is the server returned by service.NewServer for this topic.
// NOTE(review): presumably it answers Get requests from peers — confirm in
// the service package.
svc service.CacheServer
}
// Fixed ports used by New.
const (
// RouterPort is the port New passes to router.New.
RouterPort = 20274
// ServicePort is the port New passes to service.NewServer.
ServicePort = 20275
)
// Topic is the configuration accepted by New.
type Topic struct {
// ID names the topic.
// NOTE(review): New does not read this field in the code visible here —
// confirm where it is consumed.
ID string
// Capacity is handed to lru.NewLRU as the cache capacity.
// NOTE(review): presumably a number of entries — confirm in the cache package.
Capacity int
// Peers are the peer addresses the router joins to bootstrap the cache.
Peers []string
// Provider retrieves elements that are missing from the cache. New stores
// it without checking for nil.
Provider Provider
}
// New builds a cache instance that takes part in the topic described by t.
//
// It creates an LRU cache with t.Capacity, starts a router on RouterPort and
// joins it to t.Peers, then starts a service server on ServicePort that is
// handed the new instance. When a Get misses the cache, t.Provider is used to
// retrieve the missing element.
//
// The first error from router creation, joining the peers, or server creation
// is returned unchanged together with a nil Interface.
func New(t Topic) (Interface, error) {
	// TODO(tmrts): use a LFU and if cache-key space consumption is not a
	// problem, migrate to optimized cache replacement algorithms that use
	// extra queues
	// TODO(tmrts): utilize the eviction callback in LRU
	store := lru.NewLRU(t.Capacity, nil)

	rt, err := router.New(RouterPort)
	if err != nil {
		return nil, err
	}
	if joinErr := rt.Join(t.Peers); joinErr != nil {
		return nil, joinErr
	}

	inst := &topic{lru: store, provider: t.Provider, router: rt}

	// FIXME(tmrts): needs restructuring
	// The server needs the instance and the instance keeps the server, so the
	// svc field is filled in only after the server exists.
	server, err := service.NewServer(ServicePort, inst)
	if err != nil {
		return nil, err
	}
	inst.svc = server

	return inst, nil
}
// Get returns the payload stored under key.
//
// Lookup order:
//  1. the LRU cache held by this instance;
//  2. if the router assigns the key to another node, that node is queried
//     through a service client, and the result is added to the LRU cache only
//     when the client reports that it should be cached;
//  3. otherwise the topic's Provider is invoked and its result is added to
//     the LRU cache.
//
// ctx is forwarded to the remote client and to the Provider. A routing,
// remote-fetch or provider error is returned unchanged with a nil payload,
// and nothing is cached in that case.
func (c *topic) Get(ctx context.Context, key string) (payload.Payload, error) {
	if obj, ok := c.lru.Get(key); ok {
		return obj, nil
	}

	// FIXME(tmrts): leaky abstraction, refactor at once!
	owner, ownedByMe, err := c.router.Route(key)
	if err != nil {
		// owner and ownedByMe are meaningless when routing failed; bail out
		// before owner is dereferenced below.
		return nil, err
	}

	if !ownedByMe {
		clnt := service.NewClient(owner.Addr.String(), key)

		p, shouldCache, err := clnt(ctx)
		if err != nil {
			return nil, err
		}

		if shouldCache {
			c.lru.Add(key, p)
		}

		return p, nil
	}

	// This node owns the key: load it from the provider and keep it.
	p, err := c.provider(ctx, key)
	if err != nil {
		return nil, err
	}

	c.lru.Add(key, p)

	return p, nil
}