package renter

import (
	"sync"
	"time"

	"github.com/NebulousLabs/Sia/modules"
	"github.com/NebulousLabs/Sia/types"
)

// A worker listens for work on a certain host.
//
// The mutex of the worker only protects the 'unprocessedChunks' and the
// 'downloadChunks' fields of the worker. The rest of the fields are
// interacted with exclusively by the primary worker thread, and only one of
// those ever exists at a time.
//
// The workers have a concept of 'cooldown' for uploads and downloads. If a
// download or upload operation fails, the assumption is that future attempts
// are also likely to fail, because whatever condition resulted in the failure
// will still be present until some time has passed. Without any cooldowns,
// uploading and downloading with flaky hosts in the worker set have
// substantially reduced overall performance and throughput.
type worker struct {
	// The contract and host used by this worker.
	contract   modules.RenterContract
	hostPubKey types.SiaPublicKey
	renter     *Renter

	// Download variables that are not protected by a mutex, but also do not
	// need to be protected by a mutex, as they are only accessed by the master
	// thread for the worker.
	ownedDownloadConsecutiveFailures int       // How many failures in a row?
	ownedDownloadRecentFailure       time.Time // How recent was the last failure?

	// Download variables related to queuing work. They have a separate mutex to
	// minimize lock contention.
	downloadChan       chan struct{}              // Notifications of new work. Takes priority over uploads.
	downloadChunks     []*unfinishedDownloadChunk // Yet unprocessed work items.
	downloadMu         sync.Mutex
	downloadTerminated bool // Has downloading been terminated for this worker?

	// Upload variables.
	unprocessedChunks         []*unfinishedUploadChunk // Yet unprocessed work items.
	uploadChan                chan struct{}            // Notifications of new work.
	uploadConsecutiveFailures int                      // How many times in a row uploading has failed.
	uploadRecentFailure       time.Time                // How recent was the last failure?
	uploadTerminated          bool                     // Have we stopped uploading?

	// Utilities.
	//
	// The mutex is only needed when interacting with 'downloadChunks' and
	// 'unprocessedChunks', as everything else is only accessed from the single
	// master thread.
	killChan chan struct{} // Worker will shut down if a signal is sent down this channel.
	mu       sync.Mutex
}
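
// exampleOnDownloadCooldown is an illustrative sketch, not part of the
// original file: it shows one way the download cooldown described above
// could be evaluated, with the window growing as consecutive failures
// accumulate. The base duration used here is an assumption; the real
// constant lives elsewhere in the package.
func (w *worker) exampleOnDownloadCooldown() bool {
	const baseCooldown = 30 * time.Second // assumed base cooldown duration
	cooldown := time.Duration(w.ownedDownloadConsecutiveFailures+1) * baseCooldown
	return time.Now().Before(w.ownedDownloadRecentFailure.Add(cooldown))
}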

// managedUpdateWorkerPool will grab the set of contracts from the contractor
// and update the worker pool to match.
func (r *Renter) managedUpdateWorkerPool() {
	contractSlice := r.hostContractor.Contracts()
	contractMap := make(map[types.FileContractID]modules.RenterContract)
	for i := 0; i < len(contractSlice); i++ {
		contractMap[contractSlice[i].ID] = contractSlice[i]
	}

	// Add a worker for any contract that does not already have a worker.
	for id, contract := range contractMap {
		lockID := r.mu.Lock()
		_, exists := r.workerPool[id]
		if !exists {
			worker := &worker{
				contract:   contract,
				hostPubKey: contract.HostPublicKey,

				downloadChan: make(chan struct{}, 1),
				killChan:     make(chan struct{}),
				uploadChan:   make(chan struct{}, 1),

				renter: r,
			}
			r.workerPool[id] = worker
			go worker.threadedWorkLoop()
		}
		r.mu.Unlock(lockID)
	}

	// Remove any worker whose contract is no longer in the set of contracts.
	lockID := r.mu.Lock()
	for id, worker := range r.workerPool {
		_, exists := contractMap[id]
		if !exists {
			delete(r.workerPool, id)
			close(worker.killChan)
		}
	}
	r.mu.Unlock(lockID)
}
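
// exampleWorkerPoolRefresh is an illustrative sketch, not part of the
// original file: it shows how managedUpdateWorkerPool might be driven
// periodically so that the worker pool keeps tracking the contractor's
// current contract set, shutting down with the renter's thread group. The
// refresh interval is an assumption.
func (r *Renter) exampleWorkerPoolRefresh() {
	err := r.tg.Add()
	if err != nil {
		return
	}
	defer r.tg.Done()

	ticker := time.NewTicker(5 * time.Minute) // assumed refresh interval
	defer ticker.Stop()
	for {
		r.managedUpdateWorkerPool()
		select {
		case <-ticker.C:
		case <-r.tg.StopChan():
			return
		}
	}
}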

// threadedWorkLoop repeatedly issues work to a worker, stopping when the
// worker is killed or when the thread group is closed.
func (w *worker) threadedWorkLoop() {
	err := w.renter.tg.Add()
	if err != nil {
		return
	}
	defer w.renter.tg.Done()
	defer w.managedKillUploading()
	defer w.managedKillDownloading()

	for {
		// Perform one step of processing download work.
		downloadChunk := w.managedNextDownloadChunk()
		if downloadChunk != nil {
			// managedDownload will handle removing the worker internally. If
			// the chunk is dropped from the worker, the worker will be removed
			// from the chunk. If the worker executes a download (success or
			// failure), the worker will be removed from the chunk. If the
			// worker is put on standby, it will not be removed from the chunk.
			w.managedDownload(downloadChunk)
			continue
		}

		// Perform one step of processing upload work.
		chunk, pieceIndex := w.managedNextUploadChunk()
		if chunk != nil {
			w.managedUpload(chunk, pieceIndex)
			continue
		}

		// Block until new work is received via the upload or download channels,
		// or until a kill or stop signal is received.
		select {
		case <-w.downloadChan:
			continue
		case <-w.uploadChan:
			continue
		case <-w.killChan:
			return
		case <-w.renter.tg.StopChan():
			return
		}
	}
}
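
// exampleQueueUploadChunk is an illustrative sketch, not part of the original
// file: it shows how new upload work could be handed to a worker. The chunk
// is appended under the worker's mutex, and the work loop is woken with a
// non-blocking send; because uploadChan is buffered with capacity one,
// repeated notifications coalesce into a single pending signal.
func (w *worker) exampleQueueUploadChunk(uc *unfinishedUploadChunk) {
	w.mu.Lock()
	w.unprocessedChunks = append(w.unprocessedChunks, uc)
	w.mu.Unlock()

	select {
	case w.uploadChan <- struct{}{}:
	default:
		// A notification is already pending; the work loop will pick up the chunk.
	}
}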