/
subscriptions.go
202 lines (166 loc) · 6.88 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package subscriptions
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/go-kit/kit/transport/http/jsonrpc"
gethlog "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/rpc"
wecommon "github.com/ten-protocol/go-ten/tools/walletextension/common"
"github.com/ten-protocol/go-ten/tools/walletextension/userconn"
)
type SubscriptionManager struct {
subscriptionMappings map[string][]*gethrpc.ClientSubscription
logger gethlog.Logger
mu sync.Mutex
}
func New(logger gethlog.Logger) *SubscriptionManager {
return &SubscriptionManager{
subscriptionMappings: make(map[string][]*gethrpc.ClientSubscription),
logger: logger,
}
}
// HandleNewSubscriptions subscribes to an event with all the clients provided.
// Doing this is necessary because we have relevancy rule, and we want to subscribe sometimes with all clients to get all the events
func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error {
if len(req.Params) == 0 {
return fmt.Errorf("could not subscribe as no subscription namespace was provided")
}
sm.logger.Info(fmt.Sprintf("Subscribing to event %s with %d clients", req.Params, len(clients)))
// create subscriptionID which will enable user to unsubscribe from all subscriptions
userSubscriptionID := gethrpc.NewID()
// create a common channel for subscriptions from all accounts
funnelMultipleAccountsChan := make(chan common.IDAndLog)
// read from a multiple accounts channel and write results to userConn
go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, userSubscriptionID, sm.logger)
// iterate over all clients and subscribe for each of them
for _, client := range clients {
subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...)
if err != nil {
return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err)
}
sm.UpdateSubscriptionMapping(string(userSubscriptionID), subscription)
// We periodically check if the websocket is closed, and terminate the subscription.
// TODO: Check if it will be much more efficient to create just one go routine for all clients together
go sm.checkIfUserConnIsClosedAndUnsubscribe(userConn, subscription, string(userSubscriptionID))
}
// We return subscriptionID with resp interface. We want to use userSubscriptionID to allow unsubscribing
*resp = userSubscriptionID
return nil
}
func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, userSubscriptionID gethrpc.ID, logger gethlog.Logger) {
buffer := NewCircularBuffer(wecommon.DeduplicationBufferSize)
for data := range channel {
// create unique identifier for current log
uniqueLogKey := LogKey{
BlockHash: data.Log.BlockHash,
TxHash: data.Log.TxHash,
Index: data.Log.Index,
}
// check if the current event is a duplicate (and skip it if it is)
if buffer.Contains(uniqueLogKey) {
continue
}
jsonResponse, err := prepareLogResponse(data, userSubscriptionID)
if err != nil {
logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err)
continue
}
// the current log is unique, and we want to add it to our buffer and proceed with forwarding to the user
buffer.Push(uniqueLogKey)
logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID)
err = userConn.WriteResponse(jsonResponse)
if err != nil {
logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, data.SubID, log.ErrKey, err)
continue
}
}
}
func (sm *SubscriptionManager) unsubscribeAndRemove(userSubscriptionID string, subscription *gethrpc.ClientSubscription) {
sm.mu.Lock()
defer sm.mu.Unlock()
subscription.Unsubscribe()
subscriptions, exists := sm.subscriptionMappings[userSubscriptionID]
if !exists {
sm.logger.Error("subscription that needs to be removed is not present in subscriptionMappings for userSubscriptionID: %s", userSubscriptionID)
return
}
for i, s := range subscriptions {
if s != subscription {
continue
}
// Remove the subscription from the slice
lastIndex := len(subscriptions) - 1
subscriptions[i] = subscriptions[lastIndex]
subscriptions = subscriptions[:lastIndex]
// If the slice is empty, delete the key from the map
if len(subscriptions) == 0 {
delete(sm.subscriptionMappings, userSubscriptionID)
} else {
sm.subscriptionMappings[userSubscriptionID] = subscriptions
}
break
}
}
func (sm *SubscriptionManager) checkIfUserConnIsClosedAndUnsubscribe(userConn userconn.UserConn, subscription *gethrpc.ClientSubscription, userSubscriptionID string) {
for !userConn.IsClosed() {
time.Sleep(100 * time.Millisecond)
}
sm.unsubscribeAndRemove(userSubscriptionID, subscription)
}
func (sm *SubscriptionManager) UpdateSubscriptionMapping(userSubscriptionID string, subscription *gethrpc.ClientSubscription) {
// Ensure there is no concurrent map writes
sm.mu.Lock()
defer sm.mu.Unlock()
// Check if the userSubscriptionID already exists in the map
subscriptions, exists := sm.subscriptionMappings[userSubscriptionID]
// If it doesn't exist, create a new slice for it
if !exists {
subscriptions = []*gethrpc.ClientSubscription{}
}
// Check if the subscription is already in the slice, if not, add it
subscriptionExists := false
for _, sub := range subscriptions {
if sub == subscription {
subscriptionExists = true
break
}
}
if !subscriptionExists {
sm.subscriptionMappings[userSubscriptionID] = append(subscriptions, subscription)
}
}
// Formats the log to be sent as an Eth JSON-RPC response.
func prepareLogResponse(idAndLog common.IDAndLog, userSubscriptionID gethrpc.ID) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[wecommon.JSONKeySubscription] = userSubscriptionID
paramsMap[wecommon.JSONKeyResult] = idAndLog.Log
respMap := make(map[string]interface{})
respMap[wecommon.JSONKeyRPCVersion] = jsonrpc.Version
respMap[wecommon.JSONKeyMethod] = wecommon.MethodEthSubscription
respMap[wecommon.JSONKeyParams] = paramsMap
jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}
func (sm *SubscriptionManager) HandleUnsubscribe(userSubscriptionID string, rpcResp *interface{}) {
subscriptions, exists := sm.subscriptionMappings[userSubscriptionID]
if !exists {
*rpcResp = false
return
}
sm.mu.Lock()
defer sm.mu.Unlock()
for _, sub := range subscriptions {
sub.Unsubscribe()
}
delete(sm.subscriptionMappings, userSubscriptionID)
*rpcResp = true
}