forked from joltify-finance/tss
/
signature_notifier.go
237 lines (216 loc) · 7.08 KB
/
signature_notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package keysign
import (
"context"
"fmt"
"sync"
"time"
"github.com/binance-chain/tss-lib/common"
tsslibcommon "github.com/binance-chain/tss-lib/common"
"github.com/golang/protobuf/proto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/oppyfinance/tss/messages"
"github.com/oppyfinance/tss/p2p"
)
// signatureNotifierProtocol is the libp2p protocol ID used both to register
// the incoming stream handler and to open outgoing notification streams.
var signatureNotifierProtocol = protocol.ID("/p2p/signatureNotifier")
// signatureItem bundles what is needed to deliver one notification to a
// single peer.
type signatureItem struct {
	// messageID is the ID placed into the outgoing KeysignSignature message.
	messageID string

	// peerID is the peer the notification stream is opened to.
	peerID peer.ID

	// signatureData holds the signatures to deliver. When it is nil the
	// outgoing message keeps the Failed keysign status.
	signatureData []*common.ECSignature
}
// SignatureNotifier exchanges keysign results between peers. It sends
// signatures (or a failure notice) to other nodes over the signature notifier
// protocol, and hands signatures received from remote peers to the Notifier
// registered for the matching message ID.
type SignatureNotifier struct {
	logger zerolog.Logger
	host   host.Host

	// notifierLock guards every access to notifiers.
	notifierLock *sync.Mutex

	// notifiers maps a message ID to the Notifier currently waiting on it.
	notifiers map[string]*Notifier

	// messages is allocated by NewSignatureNotifier; it is not otherwise used
	// in the code visible in this file.
	messages chan *signatureItem

	// streamMgr records the streams opened or accepted per message ID so that
	// ReleaseStream can hand them back to the manager.
	streamMgr *p2p.StreamMgr
}
// NewSignatureNotifier builds a SignatureNotifier bound to the given host and
// registers its stream handler for the signature notifier protocol before
// returning it.
func NewSignatureNotifier(host host.Host) *SignatureNotifier {
	notifier := new(SignatureNotifier)
	notifier.logger = log.With().Str("module", "signature_notifier").Logger()
	notifier.host = host
	notifier.notifierLock = new(sync.Mutex)
	notifier.notifiers = map[string]*Notifier{}
	notifier.messages = make(chan *signatureItem)
	notifier.streamMgr = p2p.NewStreamMgr()

	// From here on the host dispatches incoming notification streams to us.
	host.SetStreamHandler(signatureNotifierProtocol, notifier.handleStream)
	return notifier
}
// handleStream is the stream handler registered for signatureNotifierProtocol.
//
// It reads one KeysignSignature message from the remote peer, replies with
// "done" to acknowledge receipt, and passes the decoded signatures to the
// Notifier registered for the message ID. Signatures are only decoded when the
// message carries at least one signature and its status is Success; otherwise
// the notifier receives a nil slice. If no notifier is registered for the ID
// the message is dropped after a debug log.
//
// On every path the stream is handed to the stream manager (under "UNKNOWN"
// when the message ID could not be determined) so that it is tracked rather
// than left dangling.
func (s *SignatureNotifier) handleStream(stream network.Stream) {
	// reservedMemory is the amount of memory reserved on the stream scope for
	// the lifetime of this handler.
	const reservedMemory = 1024

	err := stream.Scope().ReserveMemory(reservedMemory, network.ReservationPriorityAlways)
	if err != nil {
		s.logger.Error().Err(err).Msgf("fail to reserve the memory in signature notifier")
		// Track the stream like the other failure paths do; returning without
		// this would leave the stream open and unknown to the stream manager.
		s.streamMgr.AddStream("UNKNOWN", stream)
		return
	}
	defer stream.Scope().ReleaseMemory(reservedMemory)

	remotePeer := stream.Conn().RemotePeer()
	logger := s.logger.With().Str("remote peer", remotePeer.String()).Logger()
	logger.Debug().Msg("reading signature notifier message")
	payload, err := p2p.ReadStreamWithBuffer(stream)
	if err != nil {
		logger.Err(err).Msgf("fail to read payload from stream")
		s.streamMgr.AddStream("UNKNOWN", stream)
		return
	}

	// we tell the sender we have received the message; a failed reply is only
	// logged because the payload itself has already arrived.
	err = p2p.WriteStreamWithBuffer([]byte("done"), stream)
	if err != nil {
		logger.Error().Err(err).Msgf("fail to write the reply to peer: %s", remotePeer)
	}

	var msg messages.KeysignSignature
	if err := proto.Unmarshal(payload, &msg); err != nil {
		logger.Err(err).Msg("fail to unmarshal keysign signature")
		s.streamMgr.AddStream("UNKNOWN", stream)
		return
	}
	s.streamMgr.AddStream(msg.ID, stream)

	var signatures []*common.ECSignature
	if len(msg.Signatures) > 0 && msg.KeysignStatus == messages.KeysignSignature_Success {
		for _, el := range msg.Signatures {
			var signature common.ECSignature
			if err := proto.Unmarshal(el, &signature); err != nil {
				logger.Error().Err(err).Msg("fail to unmarshal signature data")
				return
			}
			signatures = append(signatures, &signature)
		}
	}

	// The lock is held across ProcessSignature so that the lookup, the
	// processing and the removal of a finished notifier happen atomically
	// with respect to addToNotifiers/removeNotifier.
	s.notifierLock.Lock()
	defer s.notifierLock.Unlock()
	n, ok := s.notifiers[msg.ID]
	if !ok {
		logger.Debug().Msgf("notifier for message id(%s) not exist", msg.ID)
		return
	}
	finished, err := n.ProcessSignature(signatures)
	if err != nil {
		logger.Error().Err(err).Msg("fail to verify local signature data")
		return
	}
	if finished {
		delete(s.notifiers, msg.ID)
	}
}
// sendOneMsgToPeer opens a stream to m.peerID, sends a single KeysignSignature
// message built from m, and then waits for the peer's reply.
//
// The message status is Failed unless m.signatureData is non-nil, in which
// case the marshalled signatures are attached and the status is Success.
// Stream creation is bounded by a two second timeout. When p2p.ApplyDeadline
// is set, the wait for the reply is bounded by a one second read deadline.
//
// It returns an error if the stream cannot be opened, memory cannot be
// reserved, marshalling or writing fails, or reading the reply fails. Once
// the stream has been opened it is always recorded in the stream manager
// under m.messageID, whatever the outcome.
func (s *SignatureNotifier) sendOneMsgToPeer(m *signatureItem) error {
	// reservedMemory is the amount of memory reserved on the stream scope
	// while the message is being sent.
	const reservedMemory = 1024

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()
	ctx = network.WithUseTransient(ctx, "signature notifier")
	stream, err := s.host.NewStream(ctx, m.peerID, signatureNotifierProtocol)
	if err != nil {
		return fmt.Errorf("fail to create stream to peer(%s):%w", m.peerID, err)
	}
	// Register the stream before any later step can fail; registering it
	// only after the memory reservation would leak the stream whenever the
	// reservation is refused.
	defer s.streamMgr.AddStream(m.messageID, stream)

	// The error is returned rather than logged here: the caller already logs
	// every failure of this function.
	if err := stream.Scope().ReserveMemory(reservedMemory, network.ReservationPriorityAlways); err != nil {
		return fmt.Errorf("fail to reserve the memory in send to peer signature notifier:%w", err)
	}
	defer stream.Scope().ReleaseMemory(reservedMemory)
	s.logger.Debug().Msgf("open stream to (%s) successfully", m.peerID)

	ks := &messages.KeysignSignature{
		ID:            m.messageID,
		KeysignStatus: messages.KeysignSignature_Failed,
	}
	if m.signatureData != nil {
		signatures := make([][]byte, 0, len(m.signatureData))
		for _, el := range m.signatureData {
			buf, err := proto.Marshal(el)
			if err != nil {
				return fmt.Errorf("fail to marshal signature data to bytes:%w", err)
			}
			signatures = append(signatures, buf)
		}
		ks.Signatures = signatures
		ks.KeysignStatus = messages.KeysignSignature_Success
	}

	ksBuf, err := proto.Marshal(ks)
	if err != nil {
		return fmt.Errorf("fail to marshal Keysign Signature to bytes:%w", err)
	}
	if err := p2p.WriteStreamWithBuffer(ksBuf, stream); err != nil {
		return fmt.Errorf("fail to write message to stream:%w", err)
	}

	if p2p.ApplyDeadline {
		// we wait for 1 second to allow the receiver to notify us
		if err := stream.SetReadDeadline(time.Now().Add(time.Second * 1)); err != nil {
			return err
		}
	}
	// Block until the peer replies (or the read fails); the reply content
	// itself is not inspected.
	ret := make([]byte, 8)
	_, err = stream.Read(ret)
	return err
}
// BroadcastSignature sends the keysign signatures for messageID to every peer
// in peers other than the local host. Delivery failures are logged by the
// underlying broadcast and are not reflected in the returned error.
func (s *SignatureNotifier) BroadcastSignature(messageID string, sig []*tsslibcommon.ECSignature, peers []peer.ID) error {
	return s.broadcastCommon(messageID, sig, peers)
}
// broadcastCommon delivers one notification per peer concurrently and waits
// for all deliveries to finish. The local host is skipped. A failed delivery
// is logged and otherwise ignored, so the returned error is always nil.
func (s *SignatureNotifier) broadcastCommon(messageID string, sig []*tsslibcommon.ECSignature, peers []peer.ID) error {
	var pending sync.WaitGroup
	for _, target := range peers {
		// only remote peers receive the signature, never the local host
		if target != s.host.ID() {
			item := &signatureItem{
				messageID:     messageID,
				peerID:        target,
				signatureData: sig,
			}
			pending.Add(1)
			go func(it *signatureItem) {
				defer pending.Done()
				if err := s.sendOneMsgToPeer(it); err != nil {
					s.logger.Error().Err(err).Msgf("fail to send signature to peer %s", it.peerID.String())
				}
			}(item)
		}
	}
	pending.Wait()
	return nil
}
// BroadcastFailed notifies the given peers that the keysign for messageID
// failed. It broadcasts with nil signature data, which leaves the outgoing
// message in the Failed status.
func (s *SignatureNotifier) BroadcastFailed(messageID string, peers []peer.ID) error {
	var noSignatures []*tsslibcommon.ECSignature
	return s.broadcastCommon(messageID, noSignatures, peers)
}
// addToNotifiers registers n under its MessageID while holding the notifier
// lock, replacing any notifier already stored for that ID.
func (s *SignatureNotifier) addToNotifiers(n *Notifier) {
	mu := s.notifierLock
	mu.Lock()
	defer mu.Unlock()

	s.notifiers[n.MessageID] = n
}
// removeNotifier drops the entry stored under n's MessageID while holding the
// notifier lock. Removing an ID that is not present is a no-op.
func (s *SignatureNotifier) removeNotifier(n *Notifier) {
	mu := s.notifierLock
	mu.Lock()
	defer mu.Unlock()

	delete(s.notifiers, n.MessageID)
}
// WaitForSignature registers a notifier for messageID and blocks until one of
// the following happens:
//
//   - the notifier's response channel yields the signatures, which are returned;
//   - timeout elapses, in which case a timeout error is returned;
//   - a value arrives on sigChan, in which case p2p.ErrSigGenerated is returned.
//
// The notifier is removed again before the function returns. An error from
// NewNotifier is returned wrapped so that callers can inspect the cause.
func (s *SignatureNotifier) WaitForSignature(messageID string, message [][]byte, poolPubKey string, timeout time.Duration, sigChan chan string) ([]*common.ECSignature, error) {
	n, err := NewNotifier(messageID, message, poolPubKey)
	if err != nil {
		return nil, fmt.Errorf("fail to create notifier: %w", err)
	}
	s.addToNotifiers(n)
	defer s.removeNotifier(n)

	// An explicit timer is stopped on return, so its resources are released
	// as soon as the signature arrives instead of lingering for the whole
	// timeout as they would with time.After.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case d := <-n.GetResponseChannel():
		return d, nil
	case <-timer.C:
		return nil, fmt.Errorf("timeout: didn't receive signature after %s", timeout)
	case <-sigChan:
		return nil, p2p.ErrSigGenerated
	}
}
// ReleaseStream forwards msgID to the stream manager's ReleaseStream, which
// is expected to release the streams recorded under that message ID.
func (s *SignatureNotifier) ReleaseStream(msgID string) {
	mgr := s.streamMgr
	mgr.ReleaseStream(msgID)
}