forked from NebulousLabs/Sia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scan.go
383 lines (343 loc) · 12.6 KB
/
scan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package hostdb
// scan.go contains the functions which periodically scan the list of all hosts
// to see which hosts are online or offline, and to get any updates to the
// settings of the hosts.
import (
"net"
"time"
"github.com/NebulousLabs/Sia/build"
"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/fastrand"
)
// queueScan will add a host to the queue to be scanned.
func (hdb *HostDB) queueScan(entry modules.HostDBEntry) {
// If this entry is already in the scan pool, can return immediately.
_, exists := hdb.scanMap[entry.PublicKey.String()]
if exists {
return
}
// Add the entry to a waitlist, then check if any thread is currently
// emptying the waitlist. If not, spawn a thread to empty the waitlist.
hdb.scanMap[entry.PublicKey.String()] = struct{}{}
hdb.scanList = append(hdb.scanList, entry)
if hdb.scanWait {
// Another thread is emptying the scan list, nothing to worry about.
return
}
// Sanity check - the scan map and the scan list should have the same
// length.
if build.DEBUG && len(hdb.scanMap) > len(hdb.scanList)+maxScanningThreads {
hdb.log.Critical("The hostdb scan map has seemingly grown too large:", len(hdb.scanMap), len(hdb.scanList), maxScanningThreads)
}
hdb.scanWait = true
go func() {
scanPool := make(chan modules.HostDBEntry)
defer close(scanPool)
// Nobody is emptying the scan list, volunteer.
if hdb.tg.Add() != nil {
// Hostdb is shutting down, don't spin up another thread. It is
// okay to leave scanWait set to true as that will not affect
// shutdown.
return
}
defer hdb.tg.Done()
// Due to the patterns used to spin up scanning threads, it's possible
// that we get to this point while all scanning threads are currently
// used up, completing jobs that were sent out by the previous pool
// managing thread. This thread is at risk of deadlocking if there's
// not at least one scanning thread accepting work that it created
// itself, so we use a starterThread exception and spin up
// one-thread-too-many on the first iteration to ensure that we do not
// deadlock.
starterThread := false
for {
// If the scanList is empty, this thread can spin down.
hdb.mu.Lock()
if len(hdb.scanList) == 0 {
// Scan list is empty, can exit. Let the world know that nobody
// is emptying the scan list anymore.
hdb.scanWait = false
hdb.mu.Unlock()
return
}
// Get the next host, shrink the scan list.
entry := hdb.scanList[0]
hdb.scanList = hdb.scanList[1:]
delete(hdb.scanMap, entry.PublicKey.String())
scansRemaining := len(hdb.scanList)
// Grab the most recent entry for this host.
recentEntry, exists := hdb.hostTree.Select(entry.PublicKey)
if exists {
entry = recentEntry
}
// Try to send this entry to an existing idle worker (non-blocking).
select {
case scanPool <- entry:
hdb.log.Debugf("Sending host %v for scan, %v hosts remain", entry.PublicKey.String(), scansRemaining)
hdb.mu.Unlock()
continue
default:
}
// Create new worker thread.
if hdb.scanningThreads < maxScanningThreads || !starterThread {
starterThread = true
hdb.scanningThreads++
go func() {
hdb.threadedProbeHosts(scanPool)
hdb.mu.Lock()
hdb.scanningThreads--
hdb.mu.Unlock()
}()
}
hdb.mu.Unlock()
// Block while waiting for an opening in the scan pool.
hdb.log.Debugf("Sending host %v for scan, %v hosts remain", entry.PublicKey.String(), scansRemaining)
select {
case scanPool <- entry:
// iterate again
case <-hdb.tg.StopChan():
// quit
return
}
}
}()
}
// updateEntry updates an entry in the hostdb after a scan has taken place.
//
// CAUTION: This function will automatically add multiple entries to a new host
// to give that host some base uptime. This makes this function co-dependent
// with the host weight functions. Adjustment of the host weight functions need
// to keep this function in mind, and vice-versa.
func (hdb *HostDB) updateEntry(entry modules.HostDBEntry, netErr error) {
// If the scan failed because we don't have Internet access, toss out this update.
if netErr != nil && !hdb.gateway.Online() {
return
}
// Grab the host from the host tree, and update it with the neew settings.
newEntry, exists := hdb.hostTree.Select(entry.PublicKey)
if exists {
newEntry.HostExternalSettings = entry.HostExternalSettings
} else {
newEntry = entry
}
// Update the recent interactions with this host.
if netErr == nil {
newEntry.RecentSuccessfulInteractions++
} else {
newEntry.RecentFailedInteractions++
}
// Add the datapoints for the scan.
if len(newEntry.ScanHistory) < 2 {
// Add two scans to the scan history. Two are needed because the scans
// are forward looking, but we want this first scan to represent as
// much as one week of uptime or downtime.
earliestStartTime := time.Now().Add(time.Hour * 7 * 24 * -1) // Permit up to a week of starting uptime or downtime.
suggestedStartTime := time.Now().Add(time.Minute * 10 * time.Duration(hdb.blockHeight-entry.FirstSeen+1) * -1) // Add one to the FirstSeen in case FirstSeen is this block, guarantees incrementing order.
if suggestedStartTime.Before(earliestStartTime) {
suggestedStartTime = earliestStartTime
}
newEntry.ScanHistory = modules.HostDBScans{
{Timestamp: suggestedStartTime, Success: netErr == nil},
{Timestamp: time.Now(), Success: netErr == nil},
}
} else {
if newEntry.ScanHistory[len(newEntry.ScanHistory)-1].Success && netErr != nil {
hdb.log.Debugf("Host %v is being downgraded from an online host to an offline host: %v\n", newEntry.PublicKey.String(), netErr)
}
// Make sure that the current time is after the timestamp of the
// previous scan. It may not be if the system clock has changed. This
// will prevent the sort-check sanity checks from triggering.
newTimestamp := time.Now()
prevTimestamp := newEntry.ScanHistory[len(newEntry.ScanHistory)-1].Timestamp
if !newTimestamp.After(prevTimestamp) {
newTimestamp = prevTimestamp.Add(time.Second)
}
// Before appending, make sure that the scan we just performed is
// timestamped after the previous scan performed. It may not be if the
// system clock has changed.
newEntry.ScanHistory = append(newEntry.ScanHistory, modules.HostDBScan{Timestamp: newTimestamp, Success: netErr == nil})
}
// Check whether any of the recent scans demonstrate uptime. The pruning and
// compression of the history ensure that there are only relatively recent
// scans represented.
var recentUptime bool
for _, scan := range newEntry.ScanHistory {
if scan.Success {
recentUptime = true
}
}
// If the host has been offline for too long, delete the host from the
// hostdb. Only delete if there have been enough scans over a long enough
// period to be confident that the host really is offline for good.
if time.Now().Sub(newEntry.ScanHistory[0].Timestamp) > maxHostDowntime && !recentUptime && len(newEntry.ScanHistory) >= minScans {
err := hdb.hostTree.Remove(newEntry.PublicKey)
if err != nil {
hdb.log.Println("ERROR: unable to remove host newEntry which has had a ton of downtime:", err)
}
// The function should terminate here as no more interaction is needed
// with this host.
return
}
// Compress any old scans into the historic values.
for len(newEntry.ScanHistory) > minScans && time.Now().Sub(newEntry.ScanHistory[0].Timestamp) > maxHostDowntime {
timePassed := newEntry.ScanHistory[1].Timestamp.Sub(newEntry.ScanHistory[0].Timestamp)
if newEntry.ScanHistory[0].Success {
newEntry.HistoricUptime += timePassed
} else {
newEntry.HistoricDowntime += timePassed
}
newEntry.ScanHistory = newEntry.ScanHistory[1:]
}
// Add the updated entry
if !exists {
err := hdb.hostTree.Insert(newEntry)
if err != nil {
hdb.log.Println("ERROR: unable to insert entry which is was thought to be new:", err)
} else {
hdb.log.Debugf("Adding host %v to the hostdb. Net error: %v\n", newEntry.PublicKey.String(), netErr)
}
} else {
err := hdb.hostTree.Modify(newEntry)
if err != nil {
hdb.log.Println("ERROR: unable to modify entry which is thought to exist:", err)
} else {
hdb.log.Debugf("Adding host %v to the hostdb. Net error: %v\n", newEntry.PublicKey.String(), netErr)
}
}
}
// managedScanHost will connect to a host and grab the settings, verifying
// uptime and updating to the host's preferences.
func (hdb *HostDB) managedScanHost(entry modules.HostDBEntry) {
// Request settings from the queued host entry.
netAddr := entry.NetAddress
pubKey := entry.PublicKey
hdb.log.Debugf("Scanning host %v at %v", pubKey, netAddr)
// Update historic interactions of entry if necessary
hdb.mu.RLock()
updateHostHistoricInteractions(&entry, hdb.blockHeight)
hdb.mu.RUnlock()
var settings modules.HostExternalSettings
err := func() error {
dialer := &net.Dialer{
Cancel: hdb.tg.StopChan(),
Timeout: hostRequestTimeout,
}
conn, err := dialer.Dial("tcp", string(netAddr))
if err != nil {
return err
}
connCloseChan := make(chan struct{})
go func() {
select {
case <-hdb.tg.StopChan():
case <-connCloseChan:
}
conn.Close()
}()
defer close(connCloseChan)
conn.SetDeadline(time.Now().Add(hostScanDeadline))
err = encoding.WriteObject(conn, modules.RPCSettings)
if err != nil {
return err
}
var pubkey crypto.PublicKey
copy(pubkey[:], pubKey.Key)
return crypto.ReadSignedObject(conn, &settings, maxSettingsLen, pubkey)
}()
if err != nil {
hdb.log.Debugf("Scan of host at %v failed: %v", netAddr, err)
} else {
hdb.log.Debugf("Scan of host at %v succeeded.", netAddr)
entry.HostExternalSettings = settings
}
// Update the host tree to have a new entry, including the new error. Then
// delete the entry from the scan map as the scan has been successful.
hdb.mu.Lock()
hdb.updateEntry(entry, err)
hdb.mu.Unlock()
}
// threadedProbeHosts pulls hosts from the thread pool and runs a scan on them.
func (hdb *HostDB) threadedProbeHosts(scanPool <-chan modules.HostDBEntry) {
err := hdb.tg.Add()
if err != nil {
return
}
defer hdb.tg.Done()
for hostEntry := range scanPool {
// Block until hostdb has internet connectivity.
for {
hdb.mu.RLock()
online := hdb.gateway.Online()
hdb.mu.RUnlock()
if online {
break
}
select {
case <-time.After(time.Second * 30):
continue
case <-hdb.tg.StopChan():
return
}
}
// There appears to be internet connectivity, continue with the
// scan.
hdb.managedScanHost(hostEntry)
}
}
// threadedScan is an ongoing function which will query the full set of hosts
// every few hours to see who is online and available for uploading.
func (hdb *HostDB) threadedScan() {
err := hdb.tg.Add()
if err != nil {
return
}
defer hdb.tg.Done()
for {
// Set up a scan for the hostCheckupQuanity most valuable hosts in the
// hostdb. Hosts that fail their scans will be docked significantly,
// pushing them further back in the hierarchy, ensuring that for the
// most part only online hosts are getting scanned unless there are
// fewer than hostCheckupQuantity of them.
// Grab a set of hosts to scan, grab hosts that are active, inactive,
// and offline to get high diversity.
var onlineHosts, offlineHosts []modules.HostDBEntry
allHosts := hdb.hostTree.All()
for i := len(allHosts) - 1; i >= 0; i-- {
if len(onlineHosts) >= hostCheckupQuantity && len(offlineHosts) >= hostCheckupQuantity {
break
}
// Figure out if the host is online or offline.
host := allHosts[i]
online := len(host.ScanHistory) > 0 && host.ScanHistory[len(host.ScanHistory)-1].Success
if online && len(onlineHosts) < hostCheckupQuantity {
onlineHosts = append(onlineHosts, host)
} else if !online && len(offlineHosts) < hostCheckupQuantity {
offlineHosts = append(offlineHosts, host)
}
}
// Queue the scans for each host.
hdb.log.Println("Performing scan on", len(onlineHosts), "online hosts and", len(offlineHosts), "offline hosts.")
hdb.mu.Lock()
for _, host := range onlineHosts {
hdb.queueScan(host)
}
for _, host := range offlineHosts {
hdb.queueScan(host)
}
hdb.mu.Unlock()
// Sleep for a random amount of time before doing another round of
// scanning. The minimums and maximums keep the scan time reasonable,
// while the randomness prevents the scanning from always happening at
// the same time of day or week.
sleepRange := uint64(maxScanSleep - minScanSleep)
sleepTime := minScanSleep + time.Duration(fastrand.Uint64n(sleepRange))
// Sleep until it's time for the next scan cycle.
select {
case <-hdb.tg.StopChan():
return
case <-time.After(sleepTime):
}
}
}