forked from cgorenflo/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
126 lines (108 loc) · 3.37 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"google.golang.org/grpc"
)
// logger is the package-level logger, obtained from flogging under the
// name "ConnProducer".
var logger = flogging.MustGetLogger("ConnProducer")
// EndpointDisableInterval is how long an endpoint stays disabled after
// DisableEndpoint records it. NewConnection removes entries from the
// disabled set once at least this much time has passed since they were
// recorded. It is a variable, so it can be reassigned at runtime.
var EndpointDisableInterval = time.Second * 10
// ConnectionFactory creates a connection to a certain endpoint.
// It receives the endpoint as a string and returns the gRPC client
// connection, or a non-nil error if the connection could not be created.
type ConnectionFactory func(endpoint string) (*grpc.ClientConn, error)
// ConnectionProducer produces connections out of a set of predefined
// endpoints.
type ConnectionProducer interface {
// NewConnection creates a new connection.
// On success it returns the connection, the endpoint that was selected,
// and a nil error.
// On failure it returns nil, "", and a non-nil error.
NewConnection() (*grpc.ClientConn, string, error)
// UpdateEndpoints replaces the endpoints of the ConnectionProducer
// with the given endpoints.
UpdateEndpoints(endpoints []string)
// DisableEndpoint temporarily removes the given endpoint from the set
// of endpoints that NewConnection may select.
DisableEndpoint(endpoint string)
}
// connProducer is the ConnectionProducer implementation returned by
// NewConnectionProducer. Its NewConnection, UpdateEndpoints and
// DisableEndpoint methods lock the embedded mutex before touching the
// fields below.
type connProducer struct {
// Mutex guards endpoints and disabledEndpoints.
sync.Mutex
// endpoints is the current set of candidate endpoints.
endpoints []string
// disabledEndpoints maps a disabled endpoint to the time at which it
// was disabled.
disabledEndpoints map[string]time.Time
// connect is the factory used to open a connection to an endpoint.
connect ConnectionFactory
}
// NewConnectionProducer builds a ConnectionProducer that hands out
// connections to the given endpoints, opening them with factory.
//
// The endpoints slice is stored as given (it is not copied), and the
// producer starts with no disabled endpoints.
//
// It returns nil if the given endpoints slice is empty.
func NewConnectionProducer(factory ConnectionFactory, endpoints []string) ConnectionProducer {
	if len(endpoints) == 0 {
		// Return a literal nil so callers comparing the interface against
		// nil see it as nil.
		return nil
	}
	producer := &connProducer{
		endpoints:         endpoints,
		disabledEndpoints: make(map[string]time.Time),
		connect:           factory,
	}
	return producer
}
// NewConnection tries the currently enabled endpoints in random order and
// returns the first connection that the factory manages to open.
//
// On success it returns the connection, the endpoint it belongs to, and a
// nil error. If every attempted endpoint fails (or none is enabled), it
// returns nil, "", and an error listing the endpoints that were tried.
//
// The producer's lock is held for the whole call, including while the
// connection factory runs.
func (cp *connProducer) NewConnection() (*grpc.ClientConn, string, error) {
	cp.Lock()
	defer cp.Unlock()

	// Re-enable every endpoint whose disable interval has elapsed.
	for ep, disabledAt := range cp.disabledEndpoints {
		if time.Since(disabledAt) < EndpointDisableInterval {
			continue
		}
		delete(cp.disabledEndpoints, ep)
	}

	candidates := shuffle(cp.endpoints)
	attempted := make([]string, 0, len(candidates))
	for _, candidate := range candidates {
		if _, disabled := cp.disabledEndpoints[candidate]; disabled {
			continue
		}
		attempted = append(attempted, candidate)

		conn, err := cp.connect(candidate)
		if err == nil {
			return conn, candidate, nil
		}
		// A failed endpoint is only logged; the next candidate is tried.
		logger.Error("Failed connecting to", candidate, ", error:", err)
	}
	return nil, "", fmt.Errorf("Could not connect to any of the endpoints: %v", attempted)
}
// UpdateEndpoints replaces the producer's endpoints with the given ones.
//
// An empty update is ignored and leaves the current endpoints in place.
// Endpoints that are currently disabled and also appear in the new set stay
// disabled with their original disable time; disabled entries for endpoints
// missing from the new set are dropped. The given slice is stored as is,
// without copying.
func (cp *connProducer) UpdateEndpoints(endpoints []string) {
	if len(endpoints) == 0 {
		// Ignore updates with empty endpoints
		return
	}

	cp.Lock()
	defer cp.Unlock()

	stillDisabled := make(map[string]time.Time)
	for _, ep := range endpoints {
		disabledAt, found := cp.disabledEndpoints[ep]
		if !found {
			continue
		}
		stillDisabled[ep] = disabledAt
	}
	cp.endpoints, cp.disabledEndpoints = endpoints, stillDisabled
}
// DisableEndpoint marks the given endpoint as disabled as of now, so that
// NewConnection skips it until EndpointDisableInterval has elapsed.
//
// Endpoints that are not part of the producer's current endpoint set are
// ignored. Disabling an already disabled endpoint restarts its interval.
// Nothing here prevents every endpoint from being disabled at the same
// time; in that case NewConnection has no endpoint to try until an entry
// expires.
func (cp *connProducer) DisableEndpoint(endpoint string) {
	cp.Lock()
	defer cp.Unlock()

	for _, known := range cp.endpoints {
		if known != endpoint {
			continue
		}
		cp.disabledEndpoints[endpoint] = time.Now()
		return
	}
}
// shuffleSeedOnce guards the one-time seeding of the global math/rand
// source used by shuffle.
var shuffleSeedOnce sync.Once

// shuffle returns a new slice that holds the elements of a in a random
// order. The input slice is left untouched; for an empty or nil input an
// empty, non-nil slice is returned.
func shuffle(a []string) []string {
	// Seed the global source once per process instead of on every call.
	// Reseeding each time repeatedly resets a process-wide generator that
	// other code may share, and with a coarse clock two calls in quick
	// succession could get the same seed and thus the same permutation.
	shuffleSeedOnce.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})

	shuffled := make([]string, len(a))
	for i, idx := range rand.Perm(len(a)) {
		shuffled[i] = a[idx]
	}
	return shuffled
}