-
Notifications
You must be signed in to change notification settings - Fork 142
/
typed_pool.go
112 lines (97 loc) · 3 KB
/
typed_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright 2020 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
package mempool
import (
"fmt"
"io"
"time"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/kv/codec"
)
// typedPool is a pool of requests of a single type V, indexed by the key
// derived from each request's reference (isc.RequestRefKey).
type typedPool[V isc.Request] struct {
// waitReq receives a MarkAvailable call for every request passed to Add.
waitReq WaitReq
// requests holds the pooled entries, keyed by request reference key.
requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]]
// sizeMetric is called with the current number of pooled requests.
sizeMetric func(int)
// timeMetric is called with the time an entry spent in the pool when it is removed.
timeMetric func(time.Duration)
// log is used for debug messages about additions and deletions.
log *logger.Logger
}
// typedPoolEntry is a single pooled request together with its insertion time.
type typedPoolEntry[V isc.Request] struct {
// req is the pooled request itself.
req V
// ts is the time recorded (time.Now) when the request was passed to Add.
ts time.Time
}
// Compile-time check that *typedPool satisfies the RequestPool interface.
var _ RequestPool[isc.OffLedgerRequest] = (*typedPool[isc.OffLedgerRequest])(nil)
// NewTypedPool builds an empty request pool for requests of type V.
//
// The arguments are stored as-is: waitReq is the notifier used when requests
// are added, sizeMetric and timeMetric are the metric callbacks, and log is
// the logger used by the pool.
func NewTypedPool[V isc.Request](waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] {
	pool := new(typedPool[V])
	pool.waitReq = waitReq
	pool.requests = shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]]()
	pool.sizeMetric = sizeMetric
	pool.timeMetric = timeMetric
	pool.log = log
	return pool
}
// Has reports whether the pool contains an entry under the key of reqRef.
func (olp *typedPool[V]) Has(reqRef *isc.RequestRef) bool {
	key := reqRef.AsKey()
	return olp.requests.Has(key)
}
// Get returns the pooled request stored under the key of reqRef, or the zero
// value of V when the pool has no entry for it.
func (olp *typedPool[V]) Get(reqRef *isc.RequestRef) V {
	if entry, found := olp.requests.Get(reqRef.AsKey()); found {
		return entry.req
	}
	var zero V
	return zero
}
// Add stores request in the pool, stamped with the current time, under the
// key derived from its request reference.
//
// The debug log line and the size metric are emitted only when the underlying
// map's Set call reports true; waitReq.MarkAvailable is called in every case.
func (olp *typedPool[V]) Add(request V) {
	key := isc.RequestRefFromRequest(request).AsKey()
	entry := &typedPoolEntry[V]{req: request, ts: time.Now()}
	stored := olp.requests.Set(key, entry)
	if stored {
		olp.log.Debugf("ADD %v as key=%v", request.ID(), key)
		olp.sizeMetric(olp.requests.Size())
	}
	olp.waitReq.MarkAvailable(request)
}
// Remove deletes request from the pool if an entry exists under its key.
//
// When an entry was found, the size metric is updated and the time metric is
// fed with the duration since the entry's timestamp. Nothing happens when the
// request is not in the pool.
func (olp *typedPool[V]) Remove(request V) {
	key := isc.RequestRefFromRequest(request).AsKey()
	entry, found := olp.requests.Get(key)
	if !found {
		return
	}
	if olp.requests.Delete(key) {
		olp.log.Debugf("DEL %v as key=%v", request.ID(), key)
	}
	olp.sizeMetric(olp.requests.Size())
	olp.timeMetric(time.Since(entry.ts))
}
// Filter walks all pooled entries and deletes those for which predicate
// returns false. Each successful deletion is logged and reported to the time
// metric; the size metric is updated once after the walk.
func (olp *typedPool[V]) Filter(predicate func(request V, ts time.Time) bool) {
	dropRejected := func(key isc.RequestRefKey, e *typedPoolEntry[V]) bool {
		if predicate(e.req, e.ts) {
			return true // entry is kept; continue with the next one
		}
		if olp.requests.Delete(key) {
			olp.log.Debugf("DEL %v as key=%v", e.req.ID(), key)
			olp.timeMetric(time.Since(e.ts))
		}
		return true // always continue the iteration
	}
	olp.requests.ForEach(dropRejected)
	olp.sizeMetric(olp.requests.Size())
}
// Iterate calls f for every entry in the pool and then reports the pool size
// to the size metric.
func (olp *typedPool[V]) Iterate(f func(e *typedPoolEntry[V])) {
	visit := func(_ isc.RequestRefKey, e *typedPoolEntry[V]) bool {
		f(e)
		return true // never stop early
	}
	olp.requests.ForEach(visit)
	olp.sizeMetric(olp.requests.Size())
}
// StatusString returns a short summary containing the number of pooled requests.
func (olp *typedPool[V]) StatusString() string {
	size := olp.requests.Size()
	return fmt.Sprintf("{|req|=%d}", size)
}
// WriteContent dumps every pooled request to w as a sequence of records. Each
// record is the length of the request's JSON encoding (encoded with
// codec.EncodeUint32) followed by the JSON bytes themselves.
//
// The method has no error return, so the first encoding or write failure
// stops the dump and is reported through the pool's logger rather than being
// dropped silently. Records written before the failure remain in w.
func (olp *typedPool[V]) WriteContent(w io.Writer) {
	olp.requests.ForEach(func(_ isc.RequestRefKey, entry *typedPoolEntry[V]) bool {
		jsonData, err := isc.RequestToJSON(entry.req)
		if err != nil {
			olp.log.Warnf("WriteContent: cannot encode request %v as JSON: %v", entry.req.ID(), err)
			return false // stop iteration
		}
		if _, err = w.Write(codec.EncodeUint32(uint32(len(jsonData)))); err != nil {
			olp.log.Warnf("WriteContent: cannot write length prefix for request %v: %v", entry.req.ID(), err)
			return false // stop iteration
		}
		if _, err = w.Write(jsonData); err != nil {
			olp.log.Warnf("WriteContent: cannot write request %v: %v", entry.req.ID(), err)
			return false // stop iteration
		}
		return true
	})
}