-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
89 lines (72 loc) · 2.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package filters
import (
"context"
"sync"
"time"
"github.com/lantah/go/services/orbitr/internal/db2/history"
"github.com/lantah/go/services/orbitr/internal/ingest/processors"
"github.com/lantah/go/support/log"
)
var (
// the filter config cache will be checked against latest from db at most once per each of this interval.
//lint:ignore ST1011, don't need the linter warn on literal assignment
filterConfigCheckIntervalSeconds time.Duration = 100
filterConfigCheckIntervalSecondsLock sync.RWMutex
)
func GetFilterConfigCheckIntervalSeconds() time.Duration {
filterConfigCheckIntervalSecondsLock.RLock()
defer filterConfigCheckIntervalSecondsLock.RUnlock()
return filterConfigCheckIntervalSeconds
}
func SetFilterConfigCheckIntervalSeconds(t time.Duration) {
filterConfigCheckIntervalSecondsLock.Lock()
defer filterConfigCheckIntervalSecondsLock.Unlock()
filterConfigCheckIntervalSeconds = t
}
var (
LOG = log.WithFields(log.F{
"filters": "load",
})
)
type filtersCache struct {
assetFilter AssetFilter
accountFilter AccountFilter
lastFilterConfigCheckUnixEpoch int64
}
type Filters interface {
GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer
}
func NewFilters() Filters {
return &filtersCache{
assetFilter: NewAssetFilter(),
accountFilter: NewAccountFilter(),
}
}
// Provide list of the active filters. Optimize performance by caching the list, only
// rebuild the list on expiration time interval. Method is NOT thread-safe.
func (f *filtersCache) GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer {
// only attempt to refresh filter config cache state at configured interval limit
if time.Now().Unix() < (f.lastFilterConfigCheckUnixEpoch + int64(GetFilterConfigCheckIntervalSeconds().Seconds())) {
return f.convertCacheToList()
}
f.lastFilterConfigCheckUnixEpoch = time.Now().Unix()
LOG.Info("expired filter config cache, refresh from db")
if filterConfig, err := filterQ.GetAssetFilterConfig(ctx); err != nil {
LOG.Errorf("unable to refresh asset filter config %v", err)
} else {
if err := f.assetFilter.RefreshAssetFilter(&filterConfig); err != nil {
LOG.Errorf("unable to refresh asset filter config %v", err)
}
}
if filterConfig, err := filterQ.GetAccountFilterConfig(ctx); err != nil {
LOG.Errorf("unable to refresh account filter config %v", err)
} else {
if err := f.accountFilter.RefreshAccountFilter(&filterConfig); err != nil {
LOG.Errorf("unable to refresh account filter config %v", err)
}
}
return f.convertCacheToList()
}
func (f *filtersCache) convertCacheToList() []processors.LedgerTransactionFilterer {
return []processors.LedgerTransactionFilterer{f.assetFilter, f.accountFilter}
}