/
torrentLoop.go
197 lines (172 loc) · 4.6 KB
/
torrentLoop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package torrent
import (
"encoding/hex"
"log"
"os"
"os/signal"
"github.com/nictuku/dht"
"golang.org/x/net/proxy"
)
type TorrentFlags struct {
Port int
FileDir string
SeedRatio float64
UseDeadlockDetector bool
UseLPD bool
UseDHT bool
UseUPnP bool
UseNATPMP bool
TrackerlessMode bool
ExecOnSeeding string
// The dial function to use. Nil means use net.Dial
Dial proxy.Dialer
// IP address of gateway used for NAT-PMP
Gateway string
//Provides the filesystems added torrents are saved to
FileSystemProvider FsProvider
//Whether to check file hashes when adding torrents
InitialCheck bool
//Provides cache to each torrent
Cacher CacheProvider
//Whether to write and use *.haveBitset resume data
QuickResume bool
//How many torrents should be active at a time
MaxActive int
//Maximum amount of memory (in MiB) to use for each torrent's Active Pieces.
//0 means a single Active Piece. Negative means Unlimited Active Pieces.
MemoryPerTorrent int
}
func RunTorrents(flags *TorrentFlags, torrentFiles []string) (err error) {
conChan, listenPort, err := ListenForPeerConnections(flags)
if err != nil {
log.Println("Couldn't listen for peers connection: ", err)
return
}
quitChan := listenSigInt()
createChan := make(chan string, flags.MaxActive)
startChan := make(chan *TorrentSession, 1)
doneChan := make(chan *TorrentSession, 1)
var dhtNode dht.DHT
if flags.UseDHT {
dhtNode = *startDHT(flags.Port)
}
torrentSessions := make(map[string]*TorrentSession)
go func() {
for torrentFile := range createChan {
ts, err := NewTorrentSession(flags, torrentFile, uint16(listenPort))
if err != nil {
log.Println("Couldn't create torrent session for "+torrentFile+" .", err)
doneChan <- &TorrentSession{}
} else {
log.Printf("Created torrent session for %s", ts.M.Info.Name)
startChan <- ts
}
}
}()
torrentQueue := []string{}
if len(torrentFiles) > flags.MaxActive {
torrentQueue = torrentFiles[flags.MaxActive:]
}
for i, torrentFile := range torrentFiles {
if i < flags.MaxActive {
createChan <- torrentFile
} else {
break
}
}
lpd := &Announcer{}
if flags.UseLPD {
lpd, err = NewAnnouncer(uint16(listenPort))
if err != nil {
log.Println("Couldn't listen for Local Peer Discoveries: ", err)
flags.UseLPD = false
}
}
theWorldisEnding := false
mainLoop:
for {
select {
case ts := <-startChan:
if !theWorldisEnding {
ts.dht = &dhtNode
if flags.UseLPD {
lpd.Announce(ts.M.InfoHash)
}
torrentSessions[ts.M.InfoHash] = ts
log.Printf("Starting torrent session for %s", ts.M.Info.Name)
go func(t *TorrentSession) {
t.DoTorrent()
doneChan <- t
}(ts)
}
case ts := <-doneChan:
if ts.M != nil {
delete(torrentSessions, ts.M.InfoHash)
if flags.UseLPD {
lpd.StopAnnouncing(ts.M.InfoHash)
}
}
if !theWorldisEnding && len(torrentQueue) > 0 {
createChan <- torrentQueue[0]
torrentQueue = torrentQueue[1:]
continue mainLoop
}
if len(torrentSessions) == 0 {
break mainLoop
}
case <-quitChan:
theWorldisEnding = true
for _, ts := range torrentSessions {
go ts.Quit()
}
case c := <-conChan:
// log.Printf("New bt connection for ih %x", c.Infohash)
if ts, ok := torrentSessions[c.Infohash]; ok {
ts.AcceptNewPeer(c)
}
case dhtPeers := <-dhtNode.PeersRequestResults:
for key, peers := range dhtPeers {
if ts, ok := torrentSessions[string(key)]; ok {
// log.Printf("Received %d DHT peers for torrent session %x\n", len(peers), []byte(key))
for _, peer := range peers {
peer = dht.DecodePeerAddress(peer)
ts.HintNewPeer(peer)
}
} else {
log.Printf("Received DHT peer for an unknown torrent session %x\n", []byte(key))
}
}
case announce := <-lpd.Announces:
hexhash, err := hex.DecodeString(announce.Infohash)
if err != nil {
log.Println("Err with hex-decoding:", err)
}
if ts, ok := torrentSessions[string(hexhash)]; ok {
// log.Printf("Received LPD announce for ih %s", announce.Infohash)
ts.HintNewPeer(announce.Peer)
}
}
}
if flags.UseDHT {
dhtNode.Stop()
}
return
}
func listenSigInt() chan os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
return c
}
func startDHT(listenPort int) *dht.DHT {
// TODO: UPnP UDP port mapping.
cfg := dht.NewConfig()
cfg.Port = listenPort
cfg.NumTargetPeers = TARGET_NUM_PEERS
dhtnode, err := dht.New(cfg)
if err != nil {
log.Println("DHT node creation error:", err)
return nil
}
go dhtnode.Run()
return dhtnode
}