forked from veggiedefender/torrent-client
/
dht_node.go
156 lines (141 loc) · 4.11 KB
/
dht_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package dht_node
import (
dhtlog "github.com/anacrolix/log" // logger for dht Node
"github.com/anacrolix/torrent/metainfo"
"github.com/martenwallewein/torrent-client/peers"
"github.com/netsec-ethz/scion-apps/pkg/appnet"
"github.com/netsys-lab/dht"
"github.com/netsys-lab/dht/krpc"
peer_store "github.com/netsys-lab/dht/peer-store"
"github.com/scionproto/scion/go/lib/snet"
log "github.com/sirupsen/logrus"
"net"
"sync/atomic"
)
type DhtNode struct {
Node *dht.Server
peersStream *dht.Announce
stats *dhtStats
infoHash [20]byte
nodeAddr *net.UDPAddr
peerPort uint16
onNewPeerReceived func(peer peers.Peer)
}
type dhtStats struct {
receivedPeers uint32
blockedPeers uint32
numberOfAnnounces uint32
zeroPortsReceived uint32
}
// New creates a new DHT Node.
// peerPort, the port the controlling peer is listening to
// onNewPeerReceived, a function to be executed when a new Peer was found, used for adding the new peer to the
// controlling peers storage
func New(
nodeAddr *net.UDPAddr,
torrentInfoHash [20]byte,
startingNodes []dht.Addr,
peerPort uint16,
onNewPeerReceived func(peer peers.Peer)) (*DhtNode, error) {
log.Infof("creating new dht node, initial nodes: %+v, listening on: %+v, peer port: %d", startingNodes, nodeAddr, peerPort)
stats := &dhtStats{}
con, err := appnet.Listen(nodeAddr)
if err != nil {
log.Error("error creating connection for dht Node")
return nil, err
}
dhtConf := dht.NewDefaultServerConfig()
dhtConf.Conn = con
dhtConf.PeerStore = &peer_store.InMemory{}
dhtConf.Logger = dhtlog.Default.FilterLevel(dhtlog.Debug)
dhtConf.OnAnnouncePeer = func(infoHash metainfo.Hash, scionAddr snet.UDPAddr, port int, portOk bool) {
log.Debugf("handling announce for %s - %s - %d - %t", infoHash, scionAddr.String(), port, portOk)
var infoH [20]byte
copy(infoH[:], infoHash.Bytes())
if torrentInfoHash != infoH || !portOk || port == 0 {
atomic.AddUint32(&stats.blockedPeers, 1)
if port == 0 {
atomic.AddUint32(&stats.zeroPortsReceived, 1)
}
log.Infof("rejected peer %s - %s - %d - %t", infoHash, scionAddr, port, portOk)
return
}
dhtConf.PeerStore.AddPeer(infoHash, krpc.NodeAddr{
IP: scionAddr.Host.IP,
Port: port,
IA: scionAddr.IA,
})
atomic.AddUint32(&stats.receivedPeers, 1)
}
dhtConf.StartingNodes = func() ([]dht.Addr, error) {
return startingNodes, nil
}
node, err := dht.NewServer(dhtConf)
if err != nil {
log.Errorf("error creating dht Node: %v", err)
return nil, err
}
log.Infof("created dht server with id %+v", node.ID())
dhtNode := DhtNode{
Node: node,
infoHash: torrentInfoHash,
onNewPeerReceived: onNewPeerReceived,
stats: stats,
peerPort: peerPort,
nodeAddr: nodeAddr,
}
dhtNode.announceAndGetPeers()
return &dhtNode, nil
}
func (d *DhtNode) Port() *uint16 {
if d != nil {
return nil
}
port := uint16(d.nodeAddr.Port)
return &port
}
// announceAndGetPeers get peers via DHT and announce presence
func (d *DhtNode) announceAndGetPeers() {
log.Info("announcing via dht")
atomic.AddUint32(&d.stats.numberOfAnnounces, 1)
if d.peersStream != nil {
d.peersStream.Close()
}
ps, err := d.Node.Announce(d.infoHash, int(d.peerPort), false)
if err != nil {
log.Error(err)
return
}
d.peersStream = ps
go d.consumePeers()
}
func convertPeer(peer dht.Peer) peers.Peer {
return peers.Peer{
Addr: peer.String(),
Index: 0,
}
}
func (d *DhtNode) consumePeers() {
for v := range d.peersStream.Peers {
for _, cp := range v.Peers {
atomic.AddUint32(&d.stats.receivedPeers, 1)
if cp.Port == 0 {
atomic.AddUint32(&d.stats.blockedPeers, 1)
atomic.AddUint32(&d.stats.zeroPortsReceived, 1)
continue
}
d.onNewPeerReceived(convertPeer(cp))
}
}
}
func (d *DhtNode) Close() {
if d.peersStream != nil {
d.peersStream.Close()
}
d.PrintStats()
d.Node.Close()
}
func (d *DhtNode) PrintStats() {
log.Printf("Announced %d times, recieved %d peers, blocked %d peers, blocked 0-port %d peers",
d.stats.numberOfAnnounces, d.stats.receivedPeers, d.stats.blockedPeers, d.stats.zeroPortsReceived)
}