This repository has been archived by the owner on Apr 15, 2020. It is now read-only.
/
pubsubsimulator.go
154 lines (130 loc) · 4.44 KB
/
pubsubsimulator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package remote
import (
"context"
"sync"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/eventstream"
"github.com/AsynkronIT/protoactor-go/plugin"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/quorumcontrol/tupelo-go-sdk/gossip3/middleware"
"github.com/quorumcontrol/tupelo-go-sdk/tracing"
)
// NewSimulatedPubSub returns a SimulatedPubSub with a fresh event stream, an
// empty topic-to-validator map, and a new lock guarding that map.
func NewSimulatedPubSub() *SimulatedPubSub {
	sim := new(SimulatedPubSub)
	sim.eventStream = &eventstream.EventStream{}
	sim.validators = map[string]PubSubValidator{}
	sim.validatorLock = &sync.RWMutex{}
	return sim
}
// SimulatedPubSub is a simulated in-memory pubsub that doesn't need a network connection.
// Broadcast messages are published on an in-process event stream, and per-topic
// validators are kept in a map guarded by validatorLock.
type SimulatedPubSub struct {
// eventStream carries every published message to the subscriber actors.
eventStream *eventstream.EventStream
// validators maps a topic name to the validator registered for that topic.
validators map[string]PubSubValidator
// validatorLock guards reads and writes of validators.
validatorLock *sync.RWMutex
}
// simulatorMessage is the envelope published on the simulator's event stream:
// the broadcast payload together with the topic it was sent on.
type simulatorMessage struct {
// topic is the topic the message was broadcast on; subscribers filter on it.
topic string
// msg is the payload that was handed to Broadcast.
msg proto.Message
}
// Broadcast publishes message on topic to the subscribers of this simulator.
// It implements the broadcast necessary for the client side to send to the
// network.
//
// If a validator is registered for topic it is invoked with a background
// context and an empty peer ID; a message it rejects is dropped silently.
// Broadcast always returns nil.
//
// The validator lock is held only for the map lookup. The validator and
// eventStream.Publish run after it has been released: subscriber callbacks
// take the same read lock, and sync.RWMutex prohibits recursive read locking
// (a writer queued between the two RLock calls would block both sides).
func (sb *SimulatedPubSub) Broadcast(topic string, message proto.Message) error {
	middleware.Log.Debugw("publishing")

	sb.validatorLock.RLock()
	validator, ok := sb.validators[topic]
	sb.validatorLock.RUnlock()

	if ok && !validator(context.Background(), *new(peer.ID), message) {
		// Rejected by the topic validator: drop without reporting an error.
		return nil
	}

	sb.eventStream.Publish(&simulatorMessage{
		topic: topic,
		msg:   message,
	})
	return nil
}
// NewSubscriberProps returns subscriber props that can be used to listen to
// broadcast events on topic. The resulting actor has no explicit subscriber
// PIDs and is created with notifyParent set, so it forwards valid messages to
// its parent actor.
func (sb *SimulatedPubSub) NewSubscriberProps(topic string) *actor.Props {
	const notifyParent = true
	props := newSimulatedSubscriberProps(topic, sb, notifyParent)
	return props
}
// Subscribe spawns a subscriber actor for topic through ctx and returns its
// PID. Valid messages on the topic are sent to each of the given subscribers;
// the spawned actor's parent is not notified.
func (sb *SimulatedPubSub) Subscribe(ctx spawner, topic string, subscribers ...*actor.PID) *actor.PID {
	const notifyParent = false
	props := newSimulatedSubscriberProps(topic, sb, notifyParent, subscribers...)
	return ctx.Spawn(props)
}
// RegisterTopicValidator installs validatorFunc as the validator for topic.
// When the topic already has a validator the call is a no-op and the existing
// one is kept: the simulator is used across tupelos, so repeated registration
// is tolerated rather than treated as an error. opts is accepted but not
// used, and the returned error is always nil.
func (sb *SimulatedPubSub) RegisterTopicValidator(topic string, validatorFunc PubSubValidator, opts ...pubsub.ValidatorOpt) error {
	sb.validatorLock.Lock()
	defer sb.validatorLock.Unlock()

	if _, registered := sb.validators[topic]; !registered {
		sb.validators[topic] = validatorFunc
	}
	return nil
}
// UnregisterTopicValidator removes the validator registered for topic, if
// any. The removal happens under the write lock on the validator map.
func (sb *SimulatedPubSub) UnregisterTopicValidator(topic string) {
	lock := sb.validatorLock
	lock.Lock()
	delete(sb.validators, topic)
	lock.Unlock()
}
// simulatedSubscriber is the actor that forwards messages for a single topic
// from the simulator's event stream to other actors.
type simulatedSubscriber struct {
// NOTE(review): presumably supplies the Log field used by Receive — confirm
// against the middleware package.
middleware.LogAwareHolder
tracing.ContextHolder
// subscription is the event stream subscription created on Started and
// removed on Stopping.
subscription *eventstream.Subscription
// pubsubSystem is the simulator whose event stream and validators are used.
pubsubSystem *SimulatedPubSub
// topic is the only topic whose messages this subscriber forwards.
topic string
// notifyParent, when true, forwards valid messages to the actor's parent.
notifyParent bool
// subscribers are the PIDs that receive every valid message on the topic.
subscribers []*actor.PID
}
// newSimulatedSubscriberProps returns props that produce a simulatedSubscriber
// for topic, backed by simulatedPubSub. The subscriber is designed to be
// spawned inside another context: when notifyParent is true it uses the
// spawning actor's Parent to deliver messages, and it always delivers to the
// given subscribers. The props are wrapped with the logging receiver
// middleware and the log plugin.
func newSimulatedSubscriberProps(topic string, simulatedPubSub *SimulatedPubSub, notifyParent bool, subscribers ...*actor.PID) *actor.Props {
	producer := func() actor.Actor {
		sub := new(simulatedSubscriber)
		sub.topic = topic
		sub.pubsubSystem = simulatedPubSub
		sub.notifyParent = notifyParent
		sub.subscribers = subscribers
		return sub
	}

	props := actor.PropsFromProducer(producer)
	return props.WithReceiverMiddleware(
		middleware.LoggingMiddleware,
		plugin.Use(&middleware.LogPlugin{}),
	)
}
// Receive handles the lifecycle messages of a simulated subscriber.
//
// On *actor.Started it subscribes to the simulator's event stream. Each event
// for this subscriber's topic is run through the topic's validator (if one is
// registered) and, when valid, sent to the parent (if notifyParent is set)
// and to every configured subscriber PID. On *actor.Stopping the event stream
// subscription is removed. All other messages are ignored.
func (bs *simulatedSubscriber) Receive(actorContext actor.Context) {
	switch actorContext.Message().(type) {
	case *actor.Started:
		bs.Log.Debugw("subscribed", "topic", bs.topic, "subscribers", bs.subscribers)
		parent := actorContext.Parent()
		bs.subscription = bs.pubsubSystem.eventStream.Subscribe(func(evt interface{}) {
			// The topic is checked inside the callback so that nothing for
			// another topic slips through. The checked assertion also skips
			// any event that is not a simulatorMessage instead of panicking.
			simMsg, ok := evt.(*simulatorMessage)
			if !ok || simMsg.topic != bs.topic {
				return
			}
			bs.Log.Debugw("received", "topic", bs.topic, "subscribers", bs.subscribers, "msg", simMsg.msg)

			// Hold the validator lock only for the map lookup; the validator
			// and the sends below run without it so that user code never
			// executes while the lock is held.
			system := bs.pubsubSystem
			system.validatorLock.RLock()
			validator, hasValidator := system.validators[simMsg.topic]
			system.validatorLock.RUnlock()

			if hasValidator && !validator(context.Background(), *new(peer.ID), simMsg.msg) {
				return
			}

			if bs.notifyParent {
				actor.EmptyRootContext.Send(parent, simMsg.msg)
			}
			for _, subscriber := range bs.subscribers {
				actor.EmptyRootContext.Send(subscriber, simMsg.msg)
			}
		})
	case *actor.Stopping:
		bs.pubsubSystem.eventStream.Unsubscribe(bs.subscription)
	}
}