forked from hyperledger/fabric-sdk-go
/
dispatcher.go
executable file
·122 lines (101 loc) · 4.15 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package dispatcher
import (
ab "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api"
clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/connection"
esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
)
var logger = logging.NewLogger("fabsdk/fab")
type dsConnection interface {
api.Connection
Send(seekInfo *ab.SeekInfo) error
}
// Dispatcher is responsible for handling all events, including connection and registration events originating from the client,
// and events originating from the channel event service. All events are processed in a single Go routine
// in order to avoid any race conditions and to ensure that events are processed in the order that they are received.
// This also avoids the need for synchronization.
type Dispatcher struct {
*clientdisp.Dispatcher
}
// New returns a new deliver dispatcher
func New(context fabcontext.Client, chConfig fab.ChannelCfg, discoveryService fab.DiscoveryService, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher {
return &Dispatcher{
Dispatcher: clientdisp.New(context, chConfig, discoveryService, connectionProvider, opts...),
}
}
// Start starts the dispatcher
func (ed *Dispatcher) Start() error {
ed.registerHandlers()
if err := ed.Dispatcher.Start(); err != nil {
return errors.WithMessage(err, "error starting deliver event dispatcher")
}
return nil
}
func (ed *Dispatcher) connection() dsConnection {
return ed.Dispatcher.Connection().(dsConnection)
}
func (ed *Dispatcher) handleSeekEvent(e esdispatcher.Event) {
evt := e.(*SeekEvent)
if ed.Connection() == nil {
logger.Warn("Unable to register channel since no connection was established.")
return
}
if err := ed.connection().Send(evt.SeekInfo); err != nil {
evt.ErrCh <- errors.Wrapf(err, "error sending seek info for channel [%s]", ed.ChannelConfig().ID())
} else {
evt.ErrCh <- nil
}
}
func (ed *Dispatcher) handleEvent(e esdispatcher.Event) {
delevent := e.(*connection.Event)
evt := delevent.Event.(*pb.DeliverResponse)
switch response := evt.Type.(type) {
case *pb.DeliverResponse_Status:
ed.handleDeliverResponseStatus(response)
case *pb.DeliverResponse_Block:
ed.HandleBlock(response.Block, delevent.SourceURL)
case *pb.DeliverResponse_FilteredBlock:
ed.HandleFilteredBlock(response.FilteredBlock, delevent.SourceURL)
default:
logger.Errorf("handler not found for deliver response type %T", response)
}
}
func (ed *Dispatcher) handleDeliverResponseStatus(evt *pb.DeliverResponse_Status) {
logger.Debugf("Got deliver response status event: %#v", evt)
if evt.Status == cb.Status_SUCCESS {
return
}
logger.Warnf("Got deliver response status event: %#v. Disconnecting...", evt)
errch := make(chan error, 1)
ed.Dispatcher.HandleDisconnectEvent(&clientdisp.DisconnectEvent{
Errch: errch,
})
err := <-errch
if err != nil {
logger.Warnf("Error disconnecting: %s", err)
}
ed.Dispatcher.HandleDisconnectedEvent(disconnectedEventFromStatus(evt.Status))
}
func (ed *Dispatcher) registerHandlers() {
ed.RegisterHandler(&SeekEvent{}, ed.handleSeekEvent)
ed.RegisterHandler(&connection.Event{}, ed.handleEvent)
}
func disconnectedEventFromStatus(status cb.Status) *clientdisp.DisconnectedEvent {
err := errors.Errorf("got error status from deliver server: %s", status)
if status == cb.Status_FORBIDDEN {
return clientdisp.NewFatalDisconnectedEvent(err)
}
return clientdisp.NewDisconnectedEvent(err)
}