-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
subscription.go
118 lines (96 loc) · 2.86 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package core
import (
"errors"
"fmt"
"sync"
)
var (
transientSubscriptions subscriptionStore
ErrPublisherNotUniquelyIdentifiable = errors.New("publisher not uniquely identifiable")
ErrSubscriberNotUniquelyIdentifiable = errors.New("subscriber not uniquely identifiable")
)
// A Subscription holds metadata about the subscription of a Subscriber to a Publisher.
type Subscription struct {
lock sync.Mutex
publisher Value
subscriber Subscriber
creationDate DateTime
filter Pattern
}
type Subscriptions struct {
lock sync.Mutex
subscriptions []*Subscription
}
func (s *Subscriptions) ReceivePublications(ctx *Context, pub *Publication) {
s.lock.Lock()
defer s.lock.Unlock()
for _, sub := range s.subscriptions {
if sub.filter.Test(ctx, pub) {
sub.subscriber.ReceivePublication(ctx, pub)
}
}
}
type subscriptionStore struct {
lock sync.Mutex
publisherToSubscriptions map[TransientID]*Subscriptions
subscriberToSubscriptions map[TransientID]*Subscriptions
}
func (s *subscriptionStore) getSubscriberSubscriptions(ctx *Context, subscriber Subscriber) (*Subscriptions, bool, error) {
subscriberFastId, ok := TransientIdOf(subscriber)
if !ok {
return nil, false, fmt.Errorf("failed to get subscriptions: %w", ErrPublisherNotUniquelyIdentifiable)
}
s.lock.Lock()
defer s.lock.Unlock()
subs := s.subscriberToSubscriptions[subscriberFastId]
if subs == nil {
return nil, false, nil
}
return subs, true, nil
}
func (s *subscriptionStore) getPublisherSubscriptions(ctx *Context, publisher Value) (*Subscriptions, bool, error) {
publisherFastId, ok := TransientIdOf(publisher)
if !ok {
return nil, false, fmt.Errorf("failed to get subscriptions: %w", ErrSubscriberNotUniquelyIdentifiable)
}
s.lock.Lock()
defer s.lock.Unlock()
subs := s.publisherToSubscriptions[publisherFastId]
if subs == nil {
return nil, false, nil
}
return subs, true, nil
}
func (s *subscriptionStore) addSubscription(ctx *Context, sub *Subscription) error {
s.lock.Lock()
defer s.lock.Unlock()
publisherFastId, ok := TransientIdOf(sub.publisher)
if !ok {
return fmt.Errorf("failed to add subscription: %w", ErrPublisherNotUniquelyIdentifiable)
}
subscriberFastId, ok := TransientIdOf(sub.subscriber)
if !ok {
return fmt.Errorf("failed to add subscription: %w", ErrSubscriberNotUniquelyIdentifiable)
}
{
subs := s.publisherToSubscriptions[publisherFastId]
if subs == nil {
subs = &Subscriptions{}
s.publisherToSubscriptions[publisherFastId] = subs
}
subs.lock.Lock()
subs.subscriptions = append(subs.subscriptions, sub)
subs.lock.Unlock()
}
{
subs := s.subscriberToSubscriptions[subscriberFastId]
if subs == nil {
subs = &Subscriptions{}
s.subscriberToSubscriptions[subscriberFastId] = subs
}
subs.lock.Lock()
subs.subscriptions = append(subs.subscriptions, sub)
subs.lock.Unlock()
}
return nil
}