forked from hyperledger/fabric-sdk-go
/
deliverclient.go
executable file
·140 lines (118 loc) · 4.2 KB
/
deliverclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package deliverclient
import (
"math"
"time"
ab "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
deliverconn "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/connection"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/dispatcher"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint"
"github.com/pkg/errors"
)
// logger is the package-level logger used by the functions in this file.
var logger = logging.NewLogger("fabsdk/fab")
// deliverProvider is the connection provider used for connecting to the Deliver service.
//
// The peer must also implement api.EventEndpoint, since the endpoint's options
// are forwarded to the connection. If it does not, an error is returned rather
// than panicking, so that the caller can handle a misconfigured peer.
var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
	eventEndpoint, ok := peer.(api.EventEndpoint)
	if !ok {
		return nil, errors.New("peer is not an EventEndpoint")
	}
	return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...)
}
// deliverFilteredProvider is the connection provider used for connecting to the DeliverFiltered service.
//
// The peer must also implement api.EventEndpoint, since the endpoint's options
// are forwarded to the connection. If it does not, an error is returned rather
// than panicking, so that the caller can handle a misconfigured peer.
var deliverFilteredProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
	eventEndpoint, ok := peer.(api.EventEndpoint)
	if !ok {
		return nil, errors.New("peer is not an EventEndpoint")
	}
	return deliverconn.New(context, chConfig, deliverconn.DeliverFiltered, peer.URL(), eventEndpoint.Opts()...)
}
// Client connects to a peer and receives channel events, such as block, filtered block, chaincode, and transaction status events.
type Client struct {
// Embedded base event client; New configures and starts it.
client.Client
// Embedded client parameters; this file reads connProvider, respTimeout,
// seekType and fromBlock from it.
params
}
// New creates a deliver event client for the given channel, registers the
// connect/reconnect handlers and starts it. The started client is returned,
// or an error if the discovery wrapper cannot be created or the client fails
// to start.
func New(context fabcontext.Client, chConfig fab.ChannelCfg, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
	clientParams := defaultParams()
	options.Apply(clientParams, opts)

	// The given discovery service is wrapped by a custom one that
	// produces event endpoints containing additional GRPC options.
	wrappedDiscovery, err := endpoint.NewEndpointDiscoveryWrapper(context, chConfig.ID(), discoveryService)
	if err != nil {
		return nil, err
	}

	eventDispatcher := dispatcher.New(context, chConfig, wrappedDiscovery, clientParams.connProvider, opts...)
	deliverClient := &Client{
		Client: *client.New(eventDispatcher, opts...),
		params: *clientParams,
	}

	// Send a seek request after each connect, and before reconnecting
	// move the seek position according to the last block received.
	deliverClient.SetAfterConnectHandler(deliverClient.seek)
	deliverClient.SetBeforeReconnectHandler(deliverClient.setSeekFromLastBlockReceived)

	if err = deliverClient.Start(); err != nil {
		return nil, err
	}
	return deliverClient, nil
}
// seek submits a seek request built from the client's current seek settings
// and waits for the outcome.
//
// It returns an error if the seek info cannot be built, if the seek event
// cannot be submitted, if an error is received on the response channel, or if
// no response arrives within c.respTimeout.
func (c *Client) seek() error {
	logger.Debug("Sending seek request....")

	seekInfo, err := c.seekInfo()
	if err != nil {
		return err
	}

	// The channel is buffered so that a response sent after the timeout below
	// has fired does not block the sender: nobody is receiving at that point.
	errch := make(chan error, 1)
	if submitErr := c.Submit(dispatcher.NewSeekEvent(seekInfo, errch)); submitErr != nil {
		return submitErr
	}

	// Use an explicit timer so that it is released as soon as we return,
	// instead of lingering until respTimeout expires.
	timer := time.NewTimer(c.respTimeout)
	defer timer.Stop()

	select {
	case err = <-errch:
	case <-timer.C:
		err = errors.New("timeout waiting for deliver status response")
	}

	if err != nil {
		logger.Errorf("Unable to send seek request: %s", err)
		return err
	}

	logger.Debug("Successfully sent seek")
	return nil
}
// setSeekFromLastBlockReceived updates the seek settings, under the client's
// write lock, so that the next seek resumes right after the last block
// reported by the dispatcher. It always returns nil.
func (c *Client) setSeekFromLastBlockReceived() error {
	c.Lock()
	defer c.Unlock()

	// Read the last block number once so that the value checked against the
	// sentinel is the same value that is incremented below.
	lastBlockNum := c.Dispatcher().LastBlockNum()
	if lastBlockNum == math.MaxUint64 {
		// We haven't received any blocks yet. Just ask for the newest
		c.seekType = seek.Newest
		return nil
	}

	// Make sure that, when we reconnect, we receive all of the events that we've missed.
	// lastBlockNum is strictly below MaxUint64 here, so the increment cannot wrap.
	c.seekType = seek.FromBlock
	c.fromBlock = lastBlockNum + 1
	return nil
}
// seekInfo builds the seek request matching the client's current seek type,
// reading the seek settings under the client's read lock. An error is
// returned for any seek type other than Newest, Oldest or FromBlock.
func (c *Client) seekInfo() (*ab.SeekInfo, error) {
	c.RLock()
	defer c.RUnlock()

	var info *ab.SeekInfo
	switch seekType := c.seekType; seekType {
	case seek.Newest:
		info = seek.InfoNewest()
	case seek.Oldest:
		info = seek.InfoOldest()
	case seek.FromBlock:
		// Start from the explicitly configured block number.
		info = seek.InfoFrom(c.fromBlock)
	default:
		return nil, errors.Errorf("unsupported seek type:[%s]", seekType)
	}
	return info, nil
}