/
process_out.go
195 lines (172 loc) · 5.48 KB
/
process_out.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package tss
import (
"encoding/json"
"fmt"
"strconv"
"strings"
btss "github.com/binance-chain/tss-lib/tss"
"github.com/meshplus/bitxhub-core/tss/conversion"
"github.com/meshplus/bitxhub-core/tss/message"
"github.com/sirupsen/logrus"
)
// BulkWireMsg is one cached tss-lib message. A list of BulkWireMsg values is
// JSON-encoded by sendBulkMsg and becomes the Message body of a TaskMessage.
type BulkWireMsg struct {
// WiredBulkMsgs holds the wire bytes of the tss-lib message.
WiredBulkMsgs []byte
// MsgIdentifier identifies the sender; ProcessOutCh fills it with the
// Moniker of the message's From party.
MsgIdentifier string
// Routing is the routing info returned together with the wire bytes.
Routing *btss.MessageRouting
}
// NewBulkWireMsg builds a BulkWireMsg from the wire bytes of a message, the
// identifier of its sender and the routing info that accompanies it.
func NewBulkWireMsg(msg []byte, id string, r *btss.MessageRouting) BulkWireMsg {
	var bulk BulkWireMsg
	bulk.WiredBulkMsgs = msg
	bulk.MsgIdentifier = id
	bulk.Routing = r
	return bulk
}
// ProcessOutCh sends msg out ==========================================================================================
// It adds msgType to the library message and sends it as requested (unicast)
// or as a broadcast.
//
// The message is first appended to a per-round cache: broadcast messages are
// keyed by the message type, unicast messages by "<type>:<first destination>".
// Every cached list whose length equals t.msgNum is then handed to sendBulkMsg
// and removed from the cache once the send succeeds.
//
// An error is returned when the wire bytes cannot be obtained or when a
// unicast message carries no destination. A failing sendBulkMsg is only
// logged; the list stays in the cache and nil is still returned.
func (t *TssInstance) ProcessOutCh(tssMsg btss.Message, msgType message.TssMsgType) error {
	// 1. get wire bytes, including msg content and routing info
	msgData, r, err := tssMsg.WireBytes()
	// if we cannot get the wire share, the tss will fail, we just quit.
	if err != nil {
		return fmt.Errorf("fail to get wire bytes: %w", err)
	}

	// 2. store this message in cache
	cachedWiredMsg := NewBulkWireMsg(msgData, tssMsg.GetFrom().Moniker, r)
	if r.IsBroadcast {
		key := tssMsg.Type()
		// appending to a nil slice covers the "no list cached yet" case
		var cachedList []BulkWireMsg
		if dat, ok := t.cachedWireBroadcastMsgLists.Load(key); ok {
			cachedList = dat.([]BulkWireMsg)
		}
		t.cachedWireBroadcastMsgLists.Store(key, append(cachedList, cachedWiredMsg))
	} else {
		// a unicast message without destination cannot be keyed or routed;
		// report it instead of panicking on r.To[0]
		if len(r.To) == 0 {
			return fmt.Errorf("unicast message %s has no destination", tssMsg.Type())
		}
		key := tssMsg.Type() + ":" + r.To[0].String()
		var cachedList []BulkWireMsg
		if dat, ok := t.cachedWireUnicastMsgLists.Load(key); ok {
			cachedList = dat.([]BulkWireMsg)
		}
		t.cachedWireUnicastMsgLists.Store(key, append(cachedList, cachedWiredMsg))
	}

	// 3. send broadcast msg
	t.cachedWireBroadcastMsgLists.Range(func(key, value interface{}) bool {
		wiredMsgList := value.([]BulkWireMsg)
		wiredMsgType := key.(string)
		if len(wiredMsgList) != t.msgNum {
			return true
		}
		if err := t.sendBulkMsg(wiredMsgType, msgType, wiredMsgList); err != nil {
			// best effort: keep the list cached and go on with the other rounds
			t.logger.Errorf("error in send bulk message: %v", err)
			return true
		}
		t.cachedWireBroadcastMsgLists.Delete(key)
		return true
	})

	// 4. send unicast msg
	t.cachedWireUnicastMsgLists.Range(func(key, value interface{}) bool {
		wiredMsgList := value.([]BulkWireMsg)
		// the key is "<type>:<destination>"; only the type part is sent on
		wiredMsgType := strings.Split(key.(string), ":")[0]
		if len(wiredMsgList) != t.msgNum {
			return true
		}
		if err := t.sendBulkMsg(wiredMsgType, msgType, wiredMsgList); err != nil {
			// best effort: keep the list cached and go on with the other rounds
			t.logger.Errorf("error in send bulk message: %v", err)
			return true
		}
		t.cachedWireUnicastMsgLists.Delete(key)
		return true
	})

	return nil
}
// sendBulkMsg packages the message with type and signature, put it into the network module to send it out
//
// wiredMsgType is stored as the RoundInfo of the TaskMessage, tssMsgType as the
// MsgType of the enclosing WireMessage, and wiredMsgList is JSON-encoded into
// the TaskMessage body. The routing info of the first list entry decides the
// destination: when its To list is non-empty, the Id of the first entry is
// parsed as the single receiver; otherwise the receiver list is left empty.
//
// It returns an error when the list is empty, when marshalling or signing
// fails, or when the destination id is not a base-10 unsigned integer.
func (t *TssInstance) sendBulkMsg(wiredMsgType string, tssMsgType message.TssMsgType, wiredMsgList []BulkWireMsg) error {
	// without at least one message there is no routing info to read
	if len(wiredMsgList) == 0 {
		return fmt.Errorf("no wire messages to send for %s", wiredMsgType)
	}

	// 1. get msg routing info
	// since all the messages in the list are of the same round, they must have the same dest,
	// so we just need the routing info of the first message
	r := wiredMsgList[0].Routing

	// 2. msg marshal
	buf, err := json.Marshal(wiredMsgList)
	if err != nil {
		return fmt.Errorf("error in marshal the cachedWireMsg: %w", err)
	}

	// 3. sign with p2p privkey (the receiver requires the signature to certify that the message came from the source)
	sig, err := conversion.GenerateSignature(buf, t.msgID, t.localPrivK)
	if err != nil {
		t.logger.Errorf("fail to generate the share's signature")
		return fmt.Errorf("fail to generate the share's signature: %w", err)
	}

	// 4. package msg with routing and signature
	taskMsg := &message.TaskMessage{
		Routing:   r,
		RoundInfo: wiredMsgType,
		Message:   buf,
		Sig:       sig,
	}
	taskMsgBytes, err := json.Marshal(taskMsg)
	if err != nil {
		return fmt.Errorf("fail to convert tss msg to wire bytes: %w", err)
	}

	// 5. construct a p2p msg with type
	wireMsg := &message.WireMessage{
		MsgID:   t.msgID,
		MsgType: tssMsgType,
		MsgData: taskMsgBytes,
	}

	// 6. get msg "to" info
	partiesID := []uint64{}
	if len(r.To) != 0 {
		idUint, err := strconv.ParseUint(r.To[0].Id, 10, 32)
		if err != nil {
			return fmt.Errorf("parse uint error: %w", err)
		}
		partiesID = append(partiesID, idUint)
	}

	// 7. hand over to the network module
	t.renderToP2P(&message.SendMsgChan{
		WireMsg:   wireMsg,
		PartiesID: partiesID,
	})
	return nil
}
// NotifyTaskDone broadcasts a message, the current task is over ======================================================
// It marshals a TssTaskNotifier carrying the local party id and TaskDone set
// to true, wraps it in a WireMessage of type TSSTaskDone and passes it to
// renderToP2P addressed to every id in the party map except the local one.
//
// It returns an error when the notifier cannot be marshalled or when a party
// id is not a base-10 unsigned integer; in both cases nothing is sent.
func (t *TssInstance) NotifyTaskDone() error {
	taskDoneMsg := &message.TssTaskNotifier{
		FromID:   t.localPartyID,
		TaskDone: true,
	}
	msgData, err := json.Marshal(taskDoneMsg)
	if err != nil {
		return fmt.Errorf("marshal tss task notifier error: %w", err)
	}

	wireMsg := &message.WireMessage{
		MsgID:   t.msgID,
		MsgType: message.TSSTaskDone,
		MsgData: msgData,
	}

	// read the party map once so the receivers and the log line agree
	partyIDMap := t.getPartyInfo().PartyIDMap

	var parties []uint64
	for id := range partyIDMap {
		// the local party does not notify itself
		if id == t.localPartyID {
			continue
		}
		tmpID, err := strconv.ParseUint(id, 10, 32)
		if err != nil {
			t.logger.Errorf("get parties:%s", id)
			return fmt.Errorf("notify task done: parse party id %q: %w", id, err)
		}
		parties = append(parties, tmpID)
	}

	t.logger.WithFields(logrus.Fields{"parties": partyIDMap, "msgType": string(wireMsg.MsgType)}).
		Debug("send task Done")

	t.renderToP2P(&message.SendMsgChan{
		WireMsg:   wireMsg,
		PartiesID: parties,
	})
	return nil
}