forked from hyperledger-archives/burrow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
162 lines (138 loc) · 4.4 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright 2017 Monax Industries Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"crypto/rand"
"encoding/hex"
"strings"
"fmt"
"github.com/hyperledger/burrow/logging"
"github.com/hyperledger/burrow/logging/loggers"
"github.com/hyperledger/burrow/txs"
go_events "github.com/tendermint/go-events"
tm_types "github.com/tendermint/tendermint/types"
)
// TODO: [Silas] this is a compatibility layer between our event types and
// TODO: go-events. Our ultimate plan is to replace go-events with our own pub-sub
// TODO: code that will better allow us to manage and multiplex events from different
// TODO: subsystems
// Oh for a sum type.
// anyEventData is an empty marker interface for the event data values passed
// to mapToOurEventData, which type-switches over our own txs.EventData and the
// Tendermint event data types it knows how to convert. Being an empty
// interface it constrains nothing at compile time; it only names the intent.
type anyEventData interface{}
// EventEmitter is the subscription interface satisfied in this file by both
// *events and *multiplexedEvents.
type EventEmitter interface {
// Subscribe asks for callback to be invoked with the txs.EventData of the
// named event, registered under the subscription id subId.
Subscribe(subId, event string, callback func(txs.EventData)) error
// Unsubscribe takes only the subscription id, not an event name.
// NOTE(review): presumably this drops everything registered under subId -
// confirm against the concrete emitter.
Unsubscribe(subId string) error
}
// NewEvents builds an events value around the given go-events EventSwitch.
// The stored logger is the result of logging.WithScope(logger, "Events").
func NewEvents(eventSwitch go_events.EventSwitch, logger loggers.InfoTraceLogger) *events {
	scopedLogger := logging.WithScope(logger, "Events")
	return &events{
		eventSwitch: eventSwitch,
		logger:      scopedLogger,
	}
}
// Multiplex returns an EventEmitter that wraps all of the given EventEmitters,
// as a convenience for subscribing and unsubscribing on several of them at
// once. The emitters are kept in the order they were passed.
func Multiplex(events ...EventEmitter) *multiplexedEvents {
	multiplexed := new(multiplexedEvents)
	multiplexed.eventEmitters = events
	return multiplexed
}
// events adapts a go-events EventSwitch to the EventEmitter interface: its
// Subscribe and Unsubscribe methods forward to the switch.
type events struct {
// eventSwitch is the go-events switch that listeners are added to and
// removed from.
eventSwitch go_events.EventSwitch
// logger is used by Subscribe to report event data that could not be mapped
// to txs.EventData.
logger loggers.InfoTraceLogger
}
// Subscribe registers callback for the named event on the underlying event
// switch, under the subscription id subId.
//
// Every go-events payload is first converted with mapToOurEventData. If the
// conversion fails the failure is logged and callback is not invoked, so
// subscribers are never handed a nil txs.EventData.
//
// Subscribe itself always returns nil; the error result exists to satisfy
// EventEmitter.
func (evts *events) Subscribe(subId, event string,
	callback func(txs.EventData)) error {
	listener := func(evt go_events.EventData) {
		eventData, err := mapToOurEventData(evt)
		if err != nil {
			logging.InfoMsg(evts.logger, "Failed to map go-events EventData to our EventData",
				"error", err,
				"event", event)
			// eventData is nil on this path, so there is nothing to deliver.
			return
		}
		callback(eventData)
	}
	evts.eventSwitch.AddListenerForEvent(subId, event, listener)
	return nil
}
// Unsubscribe calls RemoveListener(subId) on the underlying event switch.
// NOTE(review): presumably this drops every listener registered under subId,
// whatever event it was added for - confirm against go-events.
// It always returns nil; the error result exists to satisfy EventEmitter.
func (evts *events) Unsubscribe(subId string) error {
evts.eventSwitch.RemoveListener(subId)
return nil
}
// multiplexedEvents is an EventEmitter that fans Subscribe and Unsubscribe
// calls out to a list of other EventEmitters. Multiplex constructs it.
type multiplexedEvents struct {
// eventEmitters are the wrapped emitters, visited in slice order.
eventEmitters []EventEmitter
}
// Subscribe registers callback for event under subId on each wrapped
// EventEmitter in slice order. It stops at the first emitter that returns an
// error and returns that error; emitters subscribed before the failure are
// left subscribed. It returns nil when every emitter accepts the subscription.
func (multiEvents *multiplexedEvents) Subscribe(subId, event string,
	callback func(txs.EventData)) error {
	emitters := multiEvents.eventEmitters
	for i := range emitters {
		if err := emitters[i].Subscribe(subId, event, callback); err != nil {
			return err
		}
	}
	return nil
}
// Unsubscribe removes the subscription subId from every wrapped EventEmitter.
//
// All emitters are visited even if one of them fails, so a single failing
// emitter cannot leave the remaining ones subscribed. The first error
// encountered (in slice order) is returned; nil means every emitter
// unsubscribed successfully.
func (multiEvents *multiplexedEvents) Unsubscribe(subId string) error {
	var firstErr error
	for _, eventEmitter := range multiEvents.eventEmitters {
		// Keep going on failure: the later emitters still need to be released.
		if err := eventEmitter.Unsubscribe(subId); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// *********************************** Events ***********************************
// EventSub carries a subscription id, serialised to JSON under "sub_id".
// NOTE(review): presumably the result of an EventSubscribe call - confirm
// against the RPC layer that uses it.
type EventSub struct {
SubId string `json:"sub_id"`
}
// EventUnsub carries a boolean outcome, serialised to JSON under "result".
// NOTE(review): presumably the result of an EventUnsubscribe call - confirm
// against the RPC layer that uses it.
type EventUnsub struct {
Result bool `json:"result"`
}
// PollResponse carries a list of untyped events, serialised to JSON under
// "events".
// NOTE(review): presumably the result of an EventPoll call - confirm against
// the RPC layer that uses it.
type PollResponse struct {
Events []interface{} `json:"events"`
}
// **************************************************************************************
// GenerateSubId returns a fresh subscription id: 32 bytes read from
// crypto/rand, hex encoded and upper-cased, giving a 64 character string.
// If the random source cannot be read it returns an empty string and an error
// describing the failure.
func GenerateSubId() (string, error) {
	var raw [32]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return "", fmt.Errorf("Could not generate random bytes for a subscription id: %v", err)
	}
	return strings.ToUpper(hex.EncodeToString(raw[:])), nil
}
// mapToOurEventData converts an event payload into our txs.EventData.
//
// A value that already satisfies txs.EventData is returned unchanged.
// Tendermint's EventDataNewBlock and EventDataNewBlockHeader are copied into
// the corresponding txs types. Anything else (including nil) yields a nil
// txs.EventData and an error.
func mapToOurEventData(eventData anyEventData) (txs.EventData, error) {
	// TODO: [Silas] avoid this with a better event pub-sub system of our own
	// TODO: that maybe involves a registry of events
	switch typed := eventData.(type) {
	case txs.EventData:
		return typed, nil
	case tm_types.EventDataNewBlock:
		ours := txs.EventDataNewBlock{Block: typed.Block}
		return ours, nil
	case tm_types.EventDataNewBlockHeader:
		ours := txs.EventDataNewBlockHeader{Header: typed.Header}
		return ours, nil
	}
	// No known type matched.
	return nil, fmt.Errorf("EventData not recognised as known EventData: %v",
		eventData)
}