/
onmemory_storage.go
102 lines (79 loc) · 2.4 KB
/
onmemory_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package onmemory
import (
"context"
"fmt"
"github.com/saiya/dsps/server/config"
"github.com/saiya/dsps/server/domain"
"github.com/saiya/dsps/server/logger"
"github.com/saiya/dsps/server/storage/deps"
"github.com/saiya/dsps/server/sync"
)
// NewOnmemoryStorage builds an on-memory domain.Storage.
//
// PubSub and JWT support are each enabled unless the corresponding Disable*
// flag is set in config. The storage starts with empty channel and revoked-JWT
// maps, and startGC is called before the instance is returned. The returned
// error is always nil.
func NewOnmemoryStorage(ctx context.Context, config *config.OnmemoryStorageConfig, systemClock domain.SystemClock, channelProvider domain.ChannelProvider, deps deps.StorageDeps) (domain.Storage, error) {
	// Callback given to the daemon system: it logs the error together with the
	// name of the background routine.
	onDaemonError := func(ctx context.Context, name string, err error) {
		logger.Of(ctx).Error(fmt.Sprintf(`error in background routine "%s"`, name), err)
	}
	daemonDeps := sync.DaemonSystemDeps{
		Telemetry: deps.Telemetry,
		Sentry:    deps.Sentry,
	}

	storage := &onmemoryStorage{
		lock: sync.NewLock(),

		systemClock:     systemClock,
		channelProvider: channelProvider,

		pubsubEnabled:   !config.DisablePubSub,
		jwtEnabled:      !config.DisableJwt,
		runGcOnShutdown: config.RunGCOnShutdown,

		daemonSystem: sync.NewDaemonSystem("dsps.storage.onmemory", daemonDeps, onDaemonError),

		channels:    make(map[domain.ChannelID]*onmemoryChannel),
		revokedJwts: make(map[domain.JwtJti]domain.JwtExp),
	}
	storage.startGC()
	return storage, nil
}
// onmemoryStorage is the storage implementation returned by NewOnmemoryStorage.
type onmemoryStorage struct {
	// lock is acquired by Shutdown before the channels map is replaced.
	lock sync.Lock

	// pubsubEnabled and jwtEnabled decide whether AsPubSubStorage and
	// AsJwtStorage return this storage (true) or nil (false).
	pubsubEnabled bool
	jwtEnabled    bool

	// Collaborators injected through NewOnmemoryStorage.
	systemClock     domain.SystemClock
	channelProvider domain.ChannelProvider

	// daemonSystem is shut down at the beginning of Shutdown.
	daemonSystem *sync.DaemonSystem
	// runGcOnShutdown makes Shutdown call GC before dropping the channels.
	runGcOnShutdown bool

	// channels is replaced by an empty map on Shutdown.
	channels map[domain.ChannelID]*onmemoryChannel
	// revokedJwts maps a JWT ID to a JwtExp value
	// (presumably the expiry of the revoked token; confirm against the JWT code).
	revokedJwts map[domain.JwtJti]domain.JwtExp
}
// String returns the fixed name of this storage implementation.
func (s *onmemoryStorage) String() string {
	const name = "onmemory"
	return name
}
// Shutdown stops the background routines, optionally runs a final GC, and then
// replaces the channels map with an empty one.
//
// A failure to stop the background routines is only logged as a warning; the
// shutdown continues. An error from the final GC or from acquiring the storage
// lock is returned to the caller unchanged.
func (s *onmemoryStorage) Shutdown(ctx context.Context) error {
	logger.Of(ctx).Debugf(logger.CatStorage, "Closing on-memory storage...")

	stopErr := s.daemonSystem.Shutdown(ctx)
	if stopErr != nil {
		logger.Of(ctx).WarnError(logger.CatStorage, "Failed to stop background routines", stopErr)
	}

	if s.runGcOnShutdown {
		// GC acquires s.lock itself, so it has to run before s.lock is taken below.
		gcErr := s.GC(ctx)
		if gcErr != nil {
			return gcErr
		}
	}

	release, lockErr := s.lock.Lock(ctx)
	if lockErr != nil {
		return lockErr
	}
	defer release()

	// Drop all channel data.
	s.channels = make(map[domain.ChannelID]*onmemoryChannel)
	return nil
}
// AsPubSubStorage returns this storage as a domain.PubSubStorage when PubSub
// support is enabled, and nil otherwise.
func (s *onmemoryStorage) AsPubSubStorage() domain.PubSubStorage {
	if s.pubsubEnabled {
		return s
	}
	return nil
}
// AsJwtStorage returns this storage as a domain.JwtStorage when JWT support is
// enabled, and nil otherwise.
func (s *onmemoryStorage) AsJwtStorage() domain.JwtStorage {
	// Start from a nil interface value so that the disabled case yields a true nil.
	var jwtStorage domain.JwtStorage
	if s.jwtEnabled {
		jwtStorage = s
	}
	return jwtStorage
}
// GetFileDescriptorPressure always reports zero for the on-memory storage.
func (s *onmemoryStorage) GetFileDescriptorPressure() int {
	const pressure = 0
	return pressure
}