forked from hyperledger/fabric-sdk-go
/
deliverclient.go
executable file
·161 lines (134 loc) · 4.91 KB
/
deliverclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package deliverclient
import (
"math"
"time"
ab "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
deliverconn "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/connection"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/dispatcher"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint"
"github.com/pkg/errors"
)
// logger is the package-level logger, created with the name "fabsdk/fab".
var logger = logging.NewLogger("fabsdk/fab")
// deliverProvider is the connection provider used for connecting to the Deliver service.
//
// It returns an error when peer is nil or when peer does not implement
// api.EventEndpoint. Otherwise it creates a Deliver connection to peer.URL(),
// passing along the options exposed by the event endpoint.
var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
	if peer == nil {
		return nil, errors.New("Peer is nil")
	}

	eventEndpoint, ok := peer.(api.EventEndpoint)
	if !ok {
		// Surface the type mismatch through the error return instead of
		// panicking: this is library code and the caller already handles errors.
		return nil, errors.New("peer is not an EventEndpoint")
	}

	return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...)
}
// deliverFilteredProvider is the connection provider used for connecting to the DeliverFiltered service.
//
// It returns an error when peer is nil or when peer does not implement
// api.EventEndpoint. Otherwise it creates a DeliverFiltered connection to
// peer.URL(), passing along the options exposed by the event endpoint.
var deliverFilteredProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
	if peer == nil {
		return nil, errors.New("Peer is nil")
	}

	eventEndpoint, ok := peer.(api.EventEndpoint)
	if !ok {
		// Surface the type mismatch through the error return instead of
		// panicking: this is library code and the caller already handles errors.
		return nil, errors.New("peer is not an EventEndpoint")
	}

	return deliverconn.New(context, chConfig, deliverconn.DeliverFiltered, peer.URL(), eventEndpoint.Opts()...)
}
// Client connects to a peer and receives channel events, such as block, filtered block, chaincode, and transaction status events.
type Client struct {
// Embedded generic event client; its methods are promoted onto Client.
*client.Client
// Embedded deliver client parameters (e.g. the seek type and connection provider set up in New).
params
}
// New returns a new deliver event client.
//
// The supplied options are applied on top of the default parameters, the
// discovery service is wrapped so that it yields event endpoints, and the
// resulting client is started before being returned. Any error from wrapping
// the discovery service or from starting the client is returned as-is.
func New(context fabcontext.Client, chConfig fab.ChannelCfg, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
	cfg := defaultParams()
	options.Apply(cfg, opts)

	// Wrap the given discovery service in a custom one that produces
	// event endpoints containing additional GRPC options.
	wrappedDiscovery, err := endpoint.NewEndpointDiscoveryWrapper(context, chConfig.ID(), discoveryService)
	if err != nil {
		return nil, err
	}

	eventDispatcher := dispatcher.New(context, chConfig, wrappedDiscovery, cfg.connProvider, opts...)

	// When no seek type was requested, fall back to `Newest`.
	if cfg.seekType == "" {
		cfg.seekType = seek.Newest
		// The default 'newest' seek is only needed for block height calculations,
		// so tell the dispatcher to discard (not publish) the next
		// BlockEvent/FilteredBlockEvent.
		eventDispatcher.UpdateLastBlockInfoOnly()
	}

	deliverClient := &Client{
		Client: client.New(eventDispatcher, opts...),
		params: *cfg,
	}
	deliverClient.SetAfterConnectHandler(deliverClient.seek)
	deliverClient.SetBeforeReconnectHandler(deliverClient.setSeekFromLastBlockReceived)

	if startErr := deliverClient.Start(); startErr != nil {
		return nil, startErr
	}
	return deliverClient, nil
}
// seek builds the seek info for the client's current seek type, submits it
// as a seek event, and waits for the outcome.
//
// It returns the error from building the seek info or from submitting the
// event, the error delivered on the response channel, or a timeout error if
// nothing arrives within c.respTimeout. A nil return means a nil result was
// received on the response channel.
func (c *Client) seek() error {
	logger.Debug("Sending seek request....")

	seekInfo, err := c.seekInfo()
	if err != nil {
		return err
	}

	// Buffer of one: if the timeout below fires first, a late reply can still
	// be written to the channel without blocking the sender forever on a
	// channel that nobody is reading any more.
	errch := make(chan error, 1)
	if submitErr := c.Submit(dispatcher.NewSeekEvent(seekInfo, errch)); submitErr != nil {
		return submitErr
	}

	select {
	case err = <-errch:
	case <-time.After(c.respTimeout):
		err = errors.New("timeout waiting for deliver status response")
	}

	if err != nil {
		logger.Errorf("Unable to send seek request: %s", err)
		return err
	}

	logger.Debug("Successfully sent seek")
	return nil
}
// setSeekFromLastBlockReceived updates the client's seek parameters, under the
// client's write lock, based on the dispatcher's last block number.
//
// If a block has been received, the seek type becomes FromBlock starting at
// the block after the last one received, so that events missed while
// disconnected are delivered. Otherwise the seek type is set to Newest.
// It always returns nil.
func (c *Client) setSeekFromLastBlockReceived() error {
	c.Lock()
	defer c.Unlock()

	// Make sure that, when we reconnect, we receive all of the events that we've missed.
	// The last block number is read exactly once so that the value checked against
	// the sentinel is the same value used to compute fromBlock (a second read could
	// differ, and adding 1 to math.MaxUint64 would wrap around to 0).
	lastBlockNum := c.Dispatcher().LastBlockNum()
	if lastBlockNum < math.MaxUint64 {
		c.seekType = seek.FromBlock
		c.fromBlock = lastBlockNum + 1
		logger.Debugf("Setting seek info from last block received + 1: %d", c.fromBlock)
	} else {
		// math.MaxUint64 means we haven't received any blocks yet. Just ask for the newest.
		logger.Debugf("Setting seek info from newest")
		c.seekType = seek.Newest
	}
	return nil
}
// seekInfo translates the client's current seek type into the matching
// orderer SeekInfo message while holding the client's read lock.
//
// Newest, Oldest and FromBlock (using c.fromBlock) are supported; any other
// seek type yields an error.
func (c *Client) seekInfo() (*ab.SeekInfo, error) {
	c.RLock()
	defer c.RUnlock()

	requested := c.seekType

	if requested == seek.Newest {
		logger.Debugf("Returning seek info: Newest")
		return seek.InfoNewest(), nil
	}

	if requested == seek.Oldest {
		logger.Debugf("Returning seek info: Oldest")
		return seek.InfoOldest(), nil
	}

	if requested == seek.FromBlock {
		start := c.fromBlock
		logger.Debugf("Returning seek info: FromBlock(%d)", start)
		return seek.InfoFrom(start), nil
	}

	return nil, errors.Errorf("unsupported seek type:[%s]", requested)
}