-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
projection.go
232 lines (218 loc) · 7.42 KB
/
projection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package service
import (
"context"
"fmt"
"time"
cache "github.com/plgd-dev/go-coap/v3/pkg/cache"
"github.com/plgd-dev/go-coap/v3/pkg/runner/periodic"
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/plgd-dev/hub/v2/resource-aggregate/commands"
"github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus"
"github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventstore"
projectionRA "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/projection"
"github.com/plgd-dev/hub/v2/resource-aggregate/events"
"github.com/plgd-dev/kit/v2/strings"
)
// hasMatchingType reports whether at least one of resourceTypes is accepted by
// typeFilter. An empty typeFilter accepts every resource; with a non-empty
// filter, a resource that declares no types never matches.
func hasMatchingType(resourceTypes []string, typeFilter strings.Set) bool {
	switch {
	case len(typeFilter) == 0:
		return true
	case len(resourceTypes) == 0:
		return false
	default:
		return typeFilter.HasOneOf(resourceTypes...)
	}
}
// Projection embeds the resource-aggregate projection and pairs it with a
// cache whose entries are keyed by device ID.
type Projection struct {
	*projectionRA.Projection
	// expiration is added to the current time to compute the expiry of each
	// cache entry stored by ReloadDevices.
	expiration time.Duration
	// cache holds one element per reloaded device ID; the element's eviction
	// callback unregisters that device from the projection.
	cache *cache.Cache[string, string]
}
// NewProjection creates the underlying resource-aggregate projection together
// with a device cache whose expired entries are checked periodically until ctx
// is done.
//
// The cleanup interval is half of expiration; it falls back to expiration
// itself when that half is shorter than a second, is capped at one minute and
// is never allowed to be non-positive.
//
// It returns an error when the underlying projection cannot be created.
func NewProjection(ctx context.Context, name string, store eventstore.EventStore, subscriber eventbus.Subscriber, newModelFunc eventstore.FactoryModelFunc, expiration time.Duration) (*Projection, error) {
	projection, err := projectionRA.NewProjection(ctx, name, store, subscriber, newModelFunc)
	if err != nil {
		return nil, fmt.Errorf("cannot create projection: %w", err)
	}

	cleanupInterval := expiration / 2
	if cleanupInterval < time.Second {
		cleanupInterval = expiration
	}
	if cleanupInterval <= 0 {
		// NOTE(review): periodic.New presumably drives a ticker, and tickers
		// reject non-positive intervals - confirm. A zero or negative
		// expiration therefore falls back to a one second cleanup cadence.
		cleanupInterval = time.Second
	}
	if cleanupInterval > time.Minute {
		cleanupInterval = time.Minute
	}

	// Named deviceCache so that the imported cache package is not shadowed.
	deviceCache := cache.NewCache[string, string]()
	add := periodic.New(ctx.Done(), cleanupInterval)
	add(func(now time.Time) bool {
		deviceCache.CheckExpirations(now)
		// NOTE(review): returning true presumably keeps the task scheduled - confirm.
		return true
	})

	return &Projection{Projection: projection, cache: deviceCache, expiration: expiration}, nil
}
// LoadResourceLinks passes the resource-links projection of every device in
// deviceIDFilter to onResourceLinkProjection.
//
// A device for which the callback was never invoked (no model was visited, or
// the visited model holds no resources) is added to toReloadDevices when that
// set is non-nil. The first error returned by the callback stops the
// processing and is returned.
func (p *Projection) LoadResourceLinks(deviceIDFilter, toReloadDevices strings.Set, onResourceLinkProjection func(m *resourceLinksProjection) error) error {
	for deviceID := range deviceIDFilter {
		var (
			visited bool
			callErr error
		)
		visit := func(m eventstore.Model) (wantNext bool) {
			links := m.(*resourceLinksProjection)
			if links.LenResources() == 0 {
				// Nothing published yet: stop and leave the device marked for reload.
				return false
			}
			visited = true
			callErr = onResourceLinkProjection(links)
			return callErr == nil
		}
		p.Models(visit, commands.NewResourceID(deviceID, commands.ResourceLinksHref))

		if !visited && toReloadDevices != nil {
			toReloadDevices.Add(deviceID)
		}
		if callErr != nil {
			return callErr
		}
	}
	return nil
}
// ReloadDevices registers every device of deviceIDFilter in the projection and
// replaces its cache entry with one that expires p.expiration from now and
// unregisters the device when evicted.
//
// Failures are only logged; they never stop the processing of the remaining
// devices. For a device that Register reports as not newly created, the
// projection is force-updated and one Unregister call is deferred until
// ReloadDevices returns.
func (p *Projection) ReloadDevices(ctx context.Context, deviceIDFilter strings.Set) {
	for deviceID := range deviceIDFilter {
		created, err := p.Register(ctx, deviceID)
		if err != nil {
			// %v rather than %w: the wrapping verb is only defined for fmt.Errorf.
			log.Errorf("cannot register to projection for %v: %v", deviceID, err)
			continue
		}

		// Drop any previous entry first so LoadOrStore always stores the fresh one.
		p.cache.Delete(deviceID)
		p.cache.LoadOrStore(deviceID, cache.NewElement(deviceID, time.Now().Add(p.expiration), func(deviceID string) {
			if err := p.Unregister(deviceID); err != nil {
				log.Errorf("failed to unregister device %v in projection cache during eviction: %v", deviceID, err)
			}
		}))

		if created {
			continue
		}

		if err := p.ForceUpdate(ctx, commands.NewResourceID(deviceID, "")); err != nil {
			log.Errorf("cannot update projection for device %v: %v", deviceID, err)
		}
		// Runs when ReloadDevices returns, not at the end of this iteration.
		// NOTE(review): presumably this releases the additional registration
		// taken by Register above for an already registered device - confirm.
		defer func(ID string) {
			if err := p.Unregister(ID); err != nil {
				log.Errorf("failed to unregister device %v in projection cache after replacement: %v", ID, err)
			}
		}(deviceID)
	}
}
// LoadDevicesMetadata passes the initialized device-metadata projection of
// every device in deviceIDFilter to onDeviceMetadataProjection.
//
// A device without any initialized metadata model is added to toReloadDevices
// when that set is non-nil. The first error returned by the callback stops the
// processing and is returned, so a failure can no longer be overwritten by the
// result of a device visited later.
func (p *Projection) LoadDevicesMetadata(deviceIDFilter, toReloadDevices strings.Set, onDeviceMetadataProjection func(m *deviceMetadataProjection) error) error {
	for deviceID := range deviceIDFilter {
		reload := true
		// Scoped per device so that one device's outcome cannot mask another's.
		var err error
		p.Models(func(m eventstore.Model) (wantNext bool) {
			dm := m.(*deviceMetadataProjection)
			if !dm.IsInitialized() {
				// Skip models that are not initialized yet and keep looking.
				return true
			}
			reload = false
			err = onDeviceMetadataProjection(dm)
			return err == nil
		}, commands.NewResourceID(deviceID, commands.StatusHref))

		if reload && toReloadDevices != nil {
			toReloadDevices.Add(deviceID)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// getResourceIDMapFilter groups the filter first by device ID and then by href.
//
// A nil href set stands for the whole device: it is stored as soon as an entry
// with an empty href is seen for that device, and hrefs listed for the same
// device afterwards are ignored.
func getResourceIDMapFilter(resourceIDFilter []*commands.ResourceId) map[string]map[string]bool {
	byDevice := make(map[string]map[string]bool)
	for _, id := range resourceIDFilter {
		deviceID, href := id.GetDeviceId(), id.GetHref()
		if href == "" {
			byDevice[deviceID] = nil
			continue
		}
		hrefs, known := byDevice[deviceID]
		switch {
		case !known:
			byDevice[deviceID] = map[string]bool{href: true}
		case hrefs != nil:
			hrefs[href] = true
		}
		// known with a nil set: the whole device is already selected.
	}
	return byDevice
}
// wantToReloadDevice reports whether at least one resource of rl that passes
// hrefFilter and typeFilter has no model in the projection. An empty
// hrefFilter lets every href through. The search stops at the first such
// resource.
func (p *Projection) wantToReloadDevice(rl *resourceLinksProjection, hrefFilter map[string]bool, typeFilter strings.Set) bool {
	missing := false
	rl.IterateOverResources(func(res *commands.Resource) (wantNext bool) {
		filteredOut := len(hrefFilter) > 0 && !hrefFilter[res.GetHref()]
		if filteredOut || !hasMatchingType(res.GetResourceTypes(), typeFilter) {
			return true
		}

		hasModel := false
		p.Models(func(eventstore.Model) (wantNext bool) {
			hasModel = true
			return true
		}, commands.NewResourceID(rl.GetDeviceID(), res.GetHref()))

		if !hasModel {
			missing = true
		}
		return hasModel
	})
	return missing
}
// loadResourceWithLinks passes every resource of deviceID that matches
// hrefFilter and typeFilter to onResource, together with its published link.
//
// When a matching resource has no model in the projection and toReloadDevices
// is non-nil, the device is added to toReloadDevices and no resource is
// reported. The first error returned by onResource stops the iteration and is
// returned.
//
// It panics when a visited model does not expose an EventType method.
func (p *Projection) loadResourceWithLinks(deviceID string, hrefFilter map[string]bool, typeFilter strings.Set, toReloadDevices strings.Set, onResource func(*Resource) error) error {
	// An empty hrefFilter lets every href through.
	isMatchingResource := func(res *commands.Resource) bool {
		if len(hrefFilter) > 0 && !hrefFilter[res.GetHref()] {
			return false
		}
		return hasMatchingType(res.GetResourceTypes(), typeFilter)
	}

	// Resolved once instead of building two snapshot values for every model.
	linksSnapshotType := events.NewResourceLinksSnapshotTaken().EventType()
	metadataSnapshotType := events.NewDeviceMetadataSnapshotTaken().EventType()
	isSnapShotEvent := func(model eventstore.Model) bool {
		e, ok := model.(interface{ EventType() string })
		if !ok {
			panic(fmt.Errorf("invalid event type(%T)", model))
		}
		t := e.EventType()
		return t == linksSnapshotType || t == metadataSnapshotType
	}

	iterateResources := func(rl *resourceLinksProjection) error {
		var err error
		rl.IterateOverResources(func(res *commands.Resource) (wantNext bool) {
			if !isMatchingResource(res) {
				return true
			}
			p.Models(func(model eventstore.Model) (wantNext bool) {
				if isSnapShotEvent(model) {
					// Links and metadata snapshots are not resource projections.
					return true
				}
				rp := model.(*resourceProjection)
				err = onResource(&Resource{
					projection: rp,
					Resource:   res,
				})
				return err == nil
			}, commands.NewResourceID(rl.GetDeviceID(), res.GetHref()))
			// Stop at the first failure; otherwise a later resource would
			// overwrite err and the failure would be lost.
			return err == nil
		})
		return err
	}

	return p.LoadResourceLinks(strings.Set{deviceID: struct{}{}}, toReloadDevices, func(rl *resourceLinksProjection) error {
		if p.wantToReloadDevice(rl, hrefFilter, typeFilter) && toReloadDevices != nil {
			// If toReloadDevices == nil, the reload was already executed but not
			// all resources are available yet; partial resources are provided then.
			toReloadDevices.Add(rl.GetDeviceID())
			return nil
		}
		return iterateResources(rl)
	})
}
// LoadResourcesWithLinks reports every resource selected by resourceIDFilter
// and typeFilter to onResource. Devices that have to be reloaded first are
// collected in toReloadDevices. The first error aborts the loading and is
// returned.
func (p *Projection) LoadResourcesWithLinks(resourceIDFilter []*commands.ResourceId, typeFilter strings.Set, toReloadDevices strings.Set, onResource func(*Resource) error) error {
	// Grouping the filter per device collapses duplicate entries, so every
	// device is loaded only once.
	for deviceID, hrefs := range getResourceIDMapFilter(resourceIDFilter) {
		if err := p.loadResourceWithLinks(deviceID, hrefs, typeFilter, toReloadDevices, onResource); err != nil {
			return err
		}
	}
	return nil
}