forked from INFURA/go-ethlibs
/
subscription.go
84 lines (68 loc) · 1.53 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package websocket
import (
"context"
"encoding/json"
"sync"
"github.com/pkg/errors"
"github.com/INFURA/go-ethlibs/jsonrpc"
)
type Subscription interface {
Response() *jsonrpc.RawResponse
ID() string
Ch() chan *jsonrpc.Notification
Unsubscribe(ctx context.Context) error
Done() <-chan struct{}
Err() error
}
type subscription struct {
response *jsonrpc.RawResponse
subscriptionID string
ch chan *jsonrpc.Notification
conn *connection
ctx context.Context
cancel context.CancelFunc
err error
mu sync.RWMutex
}
func (s *subscription) Response() *jsonrpc.RawResponse {
return s.response
}
func (s *subscription) ID() string {
return s.subscriptionID
}
func (s *subscription) Ch() chan *jsonrpc.Notification {
return s.ch
}
type SubscriptionParams struct {
Subscription string `json:"subscription"`
Result json.RawMessage `json:"result"`
}
func (s *subscription) Unsubscribe(ctx context.Context) error {
request := jsonrpc.Request{
ID: jsonrpc.ID{
Str: s.subscriptionID,
},
Method: "eth_unsubscribe",
Params: jsonrpc.MustParams(s.subscriptionID),
}
response, err := s.conn.Request(ctx, &request)
if err != nil {
return errors.Wrap(err, "unsubscribe failed")
}
if response.Error != nil {
return errors.Errorf("%v", response.Error)
}
s.cancel()
return nil
}
func (s *subscription) Done() <-chan struct{} {
return s.ctx.Done()
}
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
if s.err != nil {
return s.err
}
return s.ctx.Err()
}