forked from AliyunContainerService/pouch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
request_cache.go
125 lines (110 loc) · 3.23 KB
/
request_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package stream
import (
"container/list"
"crypto/rand"
"encoding/base64"
"fmt"
"math"
"sync"
"time"
)
// Package-level tunables for the request cache. They are variables rather than
// constants, so other code in the program is able to reassign them.
var (
// CacheTTL is the timeout after which tokens become invalid. Insert stamps
// each new entry with an expiry of time.Now().Add(CacheTTL).
CacheTTL = 1 * time.Minute
// MaxInFlight is the maximum number of in-flight requests to allow. Insert
// compares the number of cached entries against this value before adding one.
MaxInFlight = 1000
// TokenLen is the length, in characters, of the random base64 encoded token
// identifying the request.
TokenLen = 8
)
// RequestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// random token for their retrieval. The RequestCache is used for building streaming URLs without
// the need to encode every request parameter in the URL.
type RequestCache struct {
// tokens maps the generated token to the list element holding the request, for fast retrieval.
tokens map[string]*list.Element
// ll maintains an age-ordered request list for faster garbage collection of expired requests.
// New entries are pushed to the front, so the oldest entry is always at the back.
ll *list.List
// lock is held by Insert and Consume for their whole duration while they touch tokens and ll.
lock sync.Mutex
}
// Request represents an *ExecRequest, *AttachRequest, or *PortForwardRequest type.
// It is declared as the empty interface, so the compiler does not restrict
// which values may be stored in the cache.
type Request interface{}
// cacheEntry is the value stored in each element of the RequestCache list.
type cacheEntry struct {
// token is the key under which this entry is registered in the tokens map.
token string
// req is the cached request handed back by Consume.
req Request
// expireTime is the instant after which the entry is treated as expired.
expireTime time.Time
}
// NewRequestCache returns an empty RequestCache whose token index and
// age-ordered list are initialized and ready for use.
func NewRequestCache() *RequestCache {
	cache := new(RequestCache)
	cache.tokens = map[string]*list.Element{}
	cache.ll = list.New()
	return cache
}
// Insert stores the given request in the cache and returns the token used for
// fetching it out again with Consume.
//
// Expired entries are purged first. If the cache still holds MaxInFlight (or
// more) entries afterwards, the request is rejected with the error returned by
// ErrorTooManyInFlight. An error is also returned when no unique token could
// be generated. On any error the returned token is empty and the cache is left
// unchanged apart from the purge.
func (c *RequestCache) Insert(req Request) (token string, err error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Remove expired entries.
	c.gc()

	// If the cache is full, reject the request. The comparison is >= rather
	// than == because MaxInFlight is a mutable package variable: if it is
	// lowered while the cache already holds more entries, an equality check
	// would never trigger and the cache could grow without bound.
	if c.ll.Len() >= MaxInFlight {
		return "", ErrorTooManyInFlight()
	}

	token, err = c.generateUniqueToken()
	if err != nil {
		return "", err
	}

	// Newest entries go to the front so gc can expire from the back.
	ele := c.ll.PushFront(&cacheEntry{
		token:      token,
		req:        req,
		expireTime: time.Now().Add(CacheTTL),
	})
	c.tokens[token] = ele
	return token, nil
}
// Consume looks up token, removes it from the cache, and returns the request
// stored under it. The token is single-use: it is dropped from the cache even
// when the entry turns out to have expired, in which case (nil, false) is
// returned just as for an unknown token.
func (c *RequestCache) Consume(token string) (req Request, found bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	node, present := c.tokens[token]
	if !present {
		return nil, false
	}

	// Drop the entry from both the index and the age-ordered list before
	// looking at its expiry, so the token can never be used twice.
	delete(c.tokens, token)
	cached := c.ll.Remove(node).(*cacheEntry)

	if stillValid := !time.Now().After(cached.expireTime); stillValid {
		return cached.req, true
	}
	// Entry already expired.
	return nil, false
}
// generateUniqueToken generates a random URL-safe token of TokenLen characters
// that is not currently present in c.tokens.
//
// It makes up to maxTries attempts, returning an error if reading random bytes
// fails or if every attempt collides with an existing token. The caller must
// hold c.lock, since c.tokens is read here.
func (c *RequestCache) generateUniqueToken() (string, error) {
	const maxTries = 10
	// Each base64 character carries 6 bits, so ceil(TokenLen*6/8) raw bytes
	// encode to at least TokenLen characters.
	tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
	rawToken := make([]byte, int(tokenSize))
	for i := 0; i < maxTries; i++ {
		if _, err := rand.Read(rawToken); err != nil {
			return "", err
		}
		encoded := base64.RawURLEncoding.EncodeToString(rawToken)
		// The encoding may be longer than TokenLen; only the prefix is used.
		token := encoded[:TokenLen]
		// Check uniqueness of the truncated token, because that is the key
		// actually stored in c.tokens. Looking up the untruncated encoding
		// would miss collisions whenever it is longer than TokenLen.
		if _, exists := c.tokens[token]; !exists {
			return token, nil
		}
	}
	return "", fmt.Errorf("failed to generate unique token")
}
// gc evicts expired entries, walking from the back (oldest) of the list and
// stopping at the first entry that has not yet expired.
// The cache must be write-locked prior to calling.
func (c *RequestCache) gc() {
	cutoff := time.Now()
	for tail := c.ll.Back(); tail != nil; tail = c.ll.Back() {
		stale := tail.Value.(*cacheEntry)
		if !cutoff.After(stale.expireTime) {
			// Everything in front of this entry was inserted later; stop here.
			break
		}
		// Oldest value is expired; drop it from the index and the list.
		delete(c.tokens, stale.token)
		c.ll.Remove(tail)
	}
}