-
Notifications
You must be signed in to change notification settings - Fork 1
/
handlers.go
191 lines (187 loc) · 5.56 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package offerproto
/*
* Dual-licensed under Apache-2.0 and MIT.
*
* You can get a copy of the Apache License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* You can also get a copy of the MIT License at
*
* http://opensource.org/licenses/MIT
*
* @wcgcyx - https://github.com/wcgcyx
*/
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/wcgcyx/fcr/comms"
"github.com/wcgcyx/fcr/fcroffer"
)
// handleQueryOffer handles the query offer request.
//
// @input - network stream.
func (proto *OfferProtocol) handleQueryOffer(conn network.Stream) {
defer conn.Close()
// Read request.
req, err := comms.NewRequestIn(proto.routineCtx, proto.opTimeout, proto.ioTimeout, conn, proto.signer)
if err != nil {
log.Debugf("Fail to establish request from %v: %v", conn.ID(), err.Error())
return
}
// Prepare response.
var respStatus bool
var respData []byte
var respErr string
defer func() {
var data []byte
if respStatus {
data = respData
} else {
data = []byte(respErr)
}
err = req.Respond(proto.routineCtx, proto.opTimeout, proto.ioTimeout, respStatus, data)
if err != nil {
log.Debugf("Error sending response: %v", err.Error())
}
}()
// Get request.
reqData, err := req.Receive(proto.routineCtx, proto.ioTimeout)
if err != nil {
log.Debugf("Fail to receive request from stream %v: %v", conn.ID(), err.Error())
return
}
// Start processing request.
subCtx, cancel := context.WithTimeout(proto.routineCtx, proto.opTimeout)
defer cancel()
exists, err := proto.peerMgr.HasPeer(subCtx, req.CurrencyID, req.FromAddr)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Fail to check if contains peer %v-%v: %v", req.CurrencyID, req.FromAddr, err.Error())
return
}
if exists {
// Check if peer is blocked.
blocked, err := proto.peerMgr.IsBlocked(subCtx, req.CurrencyID, req.FromAddr)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Fail to check if peer %v-%v is blocked: %v", req.CurrencyID, req.FromAddr, err.Error())
return
}
if blocked {
respErr = fmt.Sprintf("Peer %v-%v is blocked", req.CurrencyID, req.FromAddr)
log.Debugf("Peer %v-%v has been blocked, stop processing request", req.CurrencyID, req.FromAddr)
return
}
} else {
// Insert peer.
pi := peer.AddrInfo{
ID: conn.Conn().RemotePeer(),
Addrs: []multiaddr.Multiaddr{conn.Conn().RemoteMultiaddr()},
}
err = proto.peerMgr.AddPeer(subCtx, req.CurrencyID, req.FromAddr, pi)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Fail to add peer %v-%v with %v: %v", req.CurrencyID, req.FromAddr, pi, err.Error())
return
}
}
// Decode request
type reqJson struct {
Root cid.Cid `json:"root"`
}
decoded := reqJson{}
err = json.Unmarshal(reqData, &decoded)
if err != nil {
respErr = fmt.Sprintf("Fail to decode request")
log.Debugf("Fail to decode request: %v", err.Error())
return
}
root := decoded.Root
// Check if this root is served at requested currency.
exists, ppb, err := proto.cservMgr.Inspect(subCtx, root, req.CurrencyID)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Fail to inspect piece manager for %v-%v: %v", root.String(), req.CurrencyID, err.Error())
return
}
if !exists {
respErr = fmt.Sprintf("Not currently served")
return
}
// Check if this piece is not removed.
exists, _, _, size, _, err := proto.pieceMgr.Inspect(subCtx, root)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Fail to inspect piece manager for %v-%v: %v", root.String(), req.CurrencyID, err.Error())
return
}
if !exists {
respErr = fmt.Sprintf("Not currently served")
go func() {
err := proto.cservMgr.Stop(proto.routineCtx, root, req.CurrencyID)
if err != nil {
log.Warnf("Fail to stop serving piece for removed %v-%v: %v", root.String(), req.CurrencyID, err.Error())
}
}()
return
}
// Get offer nonce for potential offer creation.
nonce, err := proto.offerMgr.GetNonce(subCtx)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Error getting nonce from offer manager: %v", err.Error())
return
}
expiration := time.Now().Add(proto.offerExpiry)
offer := &fcroffer.PieceOffer{
ID: root,
Size: size,
CurrencyID: req.CurrencyID,
PPB: ppb,
RecipientAddr: req.ToAddr,
Expiration: expiration,
Inactivity: proto.offerInactivity,
Nonce: nonce,
}
// Load miner proof
exists, keyType, minerAddr, proof, err := proto.mps.GetMinerProof(subCtx, req.CurrencyID)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Fail to get miner proof for %v: %v", req.CurrencyID, err.Error())
return
}
if exists {
offer.LinkedMinerKeyType = keyType
offer.LinkedMinerAddr = minerAddr
offer.LinkedMinerProof = proof
}
// Sign offer
data, err := offer.GetToBeSigned()
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Errorf("Fail to get signing data for offer: %v", err.Error())
return
}
sigType, sig, err := proto.signer.Sign(subCtx, req.CurrencyID, data)
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Warnf("Error signing piece offer for %v: %v", req.CurrencyID, err.Error())
return
}
offer.AddSignature(sigType, sig)
// Create response.
respData, err = offer.Encode()
if err != nil {
respErr = fmt.Sprintf("Internal error")
log.Errorf("Error encoding piece offer for %v-%v: %v", root.String(), req.CurrencyID, err.Error())
return
}
respStatus = true
}