forked from NebulousLabs/Sia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
renter.go
469 lines (392 loc) · 16 KB
/
renter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
// Package renter is responsible for uploading and downloading files on the sia
// network.
package renter
// NOTE: Some of the concurrency patterns in the renter are fairly complex. A
// lot of this has been cleaned up, though some shadows of previous demons still
// remain. Be careful when working with anything that involves concurrency.
// TODO: Allow the 'baseMemory' to be set by the user.
// TODO: The repair loop currently receives new upload jobs through a channel.
// The download loop has a better model, a heap that can be pushed to and popped
// from concurrently without needing complex channel communication. Migrating
// the renter to this model should clean up some of the places where uploading
// bottlenecks, and reduce the amount of channel-ninjitsu required to make the
// uploading function.
import (
"errors"
"reflect"
"strings"
"sync"
"github.com/NebulousLabs/Sia/build"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/modules/renter/contractor"
"github.com/NebulousLabs/Sia/modules/renter/hostdb"
"github.com/NebulousLabs/Sia/persist"
siasync "github.com/NebulousLabs/Sia/sync"
"github.com/NebulousLabs/Sia/types"
"github.com/NebulousLabs/ratelimit"
"github.com/NebulousLabs/threadgroup"
)
var (
errNilContractor = errors.New("cannot create renter with nil contractor")
errNilCS = errors.New("cannot create renter with nil consensus set")
errNilGateway = errors.New("cannot create hostdb with nil gateway")
errNilHdb = errors.New("cannot create renter with nil hostdb")
errNilTpool = errors.New("cannot create renter with nil transaction pool")
)
var (
// priceEstimationScope is the number of hosts that get queried by the
// renter when providing price estimates. Especially for the 'Standard'
// variable, there should be congruence with the number of contracts being
// used in the renter allowance.
priceEstimationScope = build.Select(build.Var{
Standard: int(50),
Dev: int(12),
Testing: int(4),
}).(int)
)
// A hostDB is a database of hosts that the renter can use for figuring out who
// to upload to, and download from.
type hostDB interface {
// ActiveHosts returns the list of hosts that are actively being selected
// from.
ActiveHosts() []modules.HostDBEntry
// AllHosts returns the full list of hosts known to the hostdb, sorted in
// order of preference.
AllHosts() []modules.HostDBEntry
// AverageContractPrice returns the average contract price of a host.
AverageContractPrice() types.Currency
// Close closes the hostdb.
Close() error
// Host returns the HostDBEntry for a given host.
Host(types.SiaPublicKey) (modules.HostDBEntry, bool)
// RandomHosts returns a set of random hosts, weighted by their estimated
// usefulness / attractiveness to the renter. RandomHosts will not return
// any offline or inactive hosts.
RandomHosts(int, []types.SiaPublicKey) []modules.HostDBEntry
// ScoreBreakdown returns a detailed explanation of the various properties
// of the host.
ScoreBreakdown(modules.HostDBEntry) modules.HostScoreBreakdown
// EstimateHostScore returns the estimated score breakdown of a host with the
// provided settings.
EstimateHostScore(modules.HostDBEntry) modules.HostScoreBreakdown
}
// A hostContractor negotiates, revises, renews, and provides access to file
// contracts.
type hostContractor interface {
// SetAllowance sets the amount of money the contractor is allowed to
// spend on contracts over a given time period, divided among the number
// of hosts specified. Note that contractor can start forming contracts as
// soon as SetAllowance is called; that is, it may block.
SetAllowance(modules.Allowance) error
// Allowance returns the current allowance
Allowance() modules.Allowance
// Close closes the hostContractor.
Close() error
// Contracts returns the contracts formed by the contractor.
Contracts() []modules.RenterContract
// ContractByID returns the contract associated with the file contract id.
ContractByID(types.FileContractID) (modules.RenterContract, bool)
// ContractUtility returns the utility field for a given contract, along
// with a bool indicating if it exists.
ContractUtility(types.FileContractID) (modules.ContractUtility, bool)
// CurrentPeriod returns the height at which the current allowance period
// began.
CurrentPeriod() types.BlockHeight
// PeriodSpending returns the amount spent on contracts during the current
// billing period.
PeriodSpending() modules.ContractorSpending
// Editor creates an Editor from the specified contract ID, allowing the
// insertion, deletion, and modification of sectors.
Editor(types.FileContractID, <-chan struct{}) (contractor.Editor, error)
// IsOffline reports whether the specified host is considered offline.
IsOffline(types.FileContractID) bool
// Downloader creates a Downloader from the specified contract ID,
// allowing the retrieval of sectors.
Downloader(types.FileContractID, <-chan struct{}) (contractor.Downloader, error)
// ResolveID returns the most recent renewal of the specified ID.
ResolveID(types.FileContractID) types.FileContractID
}
// A trackedFile contains metadata about files being tracked by the Renter.
// Tracked files are actively repaired by the Renter. By default, files
// uploaded by the user are tracked, and files that are added (via loading a
// .sia file) are not.
type trackedFile struct {
// location of original file on disk
RepairPath string
}
// A Renter is responsible for tracking all of the files that a user has
// uploaded to Sia, as well as the locations and health of these files.
//
// TODO: Separate the workerPool to have its own mutex. The workerPool doesn't
// interfere with any of the other fields in the renter, should be fine for it
// to have a separate mutex, that way operations on the worker pool don't block
// operations on other parts of the struct. If we're going to do it that way,
// might make sense to split the worker pool off into it's own struct entirely
// the same way that we split of the memoryManager entirely.
type Renter struct {
// File management.
//
// tracking contains a list of files that the user intends to maintain. By
// default, files loaded through sharing are not maintained by the user.
files map[string]*file
tracking map[string]trackedFile // Map from nickname to metadata.
// Download management. The heap has a separate mutex because it is always
// accessed in isolation.
downloadHeapMu sync.Mutex // Used to protect the downloadHeap.
downloadHeap *downloadChunkHeap // A heap of priority-sorted chunks to download.
newDownloads chan struct{} // Used to notify download loop that new downloads are available.
// Download history. The history list has its own mutex because it is always
// accessed in isolation.
//
// TODO: Currently the download history doesn't include repair-initiated
// downloads, and instead only contains user-initiated downlods.
downloadHistory []*download
downloadHistoryMu sync.Mutex
// Upload management.
uploadHeap uploadHeap
// List of workers that can be used for uploading and/or downloading.
memoryManager *memoryManager
workerPool map[types.FileContractID]*worker
// Cache the last price estimation result.
lastEstimation modules.RenterPriceEstimation
// Utilities.
cs modules.ConsensusSet
g modules.Gateway
hostContractor hostContractor
hostDB hostDB
log *persist.Logger
persistDir string
mu *siasync.RWMutex
tg threadgroup.ThreadGroup
tpool modules.TransactionPool
}
// Close closes the Renter and its dependencies
func (r *Renter) Close() error {
r.tg.Stop()
r.hostDB.Close()
return r.hostContractor.Close()
}
// PriceEstimation estimates the cost in siacoins of performing various storage
// and data operations.
//
// TODO: Make this function line up with the actual settings in the renter.
// Perhaps even make it so it uses the renter's actual contracts if it has any.
func (r *Renter) PriceEstimation() modules.RenterPriceEstimation {
id := r.mu.RLock()
lastEstimation := r.lastEstimation
r.mu.RUnlock(id)
if !reflect.DeepEqual(lastEstimation, modules.RenterPriceEstimation{}) {
return lastEstimation
}
// Grab hosts to perform the estimation.
hosts := r.hostDB.RandomHosts(priceEstimationScope, nil)
// Check if there are zero hosts, which means no estimation can be made.
if len(hosts) == 0 {
return modules.RenterPriceEstimation{}
}
// Add up the costs for each host.
var totalContractCost types.Currency
var totalDownloadCost types.Currency
var totalStorageCost types.Currency
var totalUploadCost types.Currency
for _, host := range hosts {
totalContractCost = totalContractCost.Add(host.ContractPrice)
totalDownloadCost = totalDownloadCost.Add(host.DownloadBandwidthPrice)
totalStorageCost = totalStorageCost.Add(host.StoragePrice)
totalUploadCost = totalUploadCost.Add(host.UploadBandwidthPrice)
}
// Convert values to being human-scale.
totalDownloadCost = totalDownloadCost.Mul(modules.BytesPerTerabyte)
totalStorageCost = totalStorageCost.Mul(modules.BlockBytesPerMonthTerabyte)
totalUploadCost = totalUploadCost.Mul(modules.BytesPerTerabyte)
// Factor in redundancy.
totalStorageCost = totalStorageCost.Mul64(3) // TODO: follow file settings?
totalUploadCost = totalUploadCost.Mul64(3) // TODO: follow file settings?
// Perform averages.
totalContractCost = totalContractCost.Div64(uint64(len(hosts)))
totalDownloadCost = totalDownloadCost.Div64(uint64(len(hosts)))
totalStorageCost = totalStorageCost.Div64(uint64(len(hosts)))
totalUploadCost = totalUploadCost.Div64(uint64(len(hosts)))
// Take the average of the host set to estimate the overall cost of the
// contract forming.
totalContractCost = totalContractCost.Mul64(uint64(priceEstimationScope))
// Add the cost of paying the transaction fees for the first contract.
_, feePerByte := r.tpool.FeeEstimation()
totalContractCost = totalContractCost.Add(feePerByte.Mul64(1000).Mul64(uint64(priceEstimationScope)))
est := modules.RenterPriceEstimation{
FormContracts: totalContractCost,
DownloadTerabyte: totalDownloadCost,
StorageTerabyteMonth: totalStorageCost,
UploadTerabyte: totalUploadCost,
}
id = r.mu.Lock()
r.lastEstimation = est
r.mu.Unlock(id)
return est
}
// SetSettings will update the settings for the renter.
func (r *Renter) SetSettings(s modules.RenterSettings) error {
// Set allowance.
err := r.hostContractor.SetAllowance(s.Allowance)
if err != nil {
return err
}
// Set bandwidth limits.
if s.MaxDownloadSpeed < 0 || s.MaxUploadSpeed < 0 {
return errors.New("download/upload rate limit can't be below 0")
}
// Set ratelimit
if s.MaxDownloadSpeed == 0 && s.MaxUploadSpeed == 0 {
ratelimit.SetLimits(0, 0, 0)
} else {
ratelimit.SetLimits(s.MaxDownloadSpeed, s.MaxUploadSpeed, 4*4096)
}
r.managedUpdateWorkerPool()
return nil
}
// ActiveHosts returns an array of hostDB's active hosts
func (r *Renter) ActiveHosts() []modules.HostDBEntry { return r.hostDB.ActiveHosts() }
// AllHosts returns an array of all hosts
func (r *Renter) AllHosts() []modules.HostDBEntry { return r.hostDB.AllHosts() }
// Host returns the host associated with the given public key
func (r *Renter) Host(spk types.SiaPublicKey) (modules.HostDBEntry, bool) { return r.hostDB.Host(spk) }
// ScoreBreakdown returns the score breakdown
func (r *Renter) ScoreBreakdown(e modules.HostDBEntry) modules.HostScoreBreakdown {
return r.hostDB.ScoreBreakdown(e)
}
// EstimateHostScore returns the estimated host score
func (r *Renter) EstimateHostScore(e modules.HostDBEntry) modules.HostScoreBreakdown {
return r.hostDB.EstimateHostScore(e)
}
// Contracts returns an array of host contractor's contracts
func (r *Renter) Contracts() []modules.RenterContract { return r.hostContractor.Contracts() }
// CurrentPeriod returns the host contractor's current period
func (r *Renter) CurrentPeriod() types.BlockHeight { return r.hostContractor.CurrentPeriod() }
// ContractUtility returns the utility field for a given contract, along
// with a bool indicating if it exists.
func (r *Renter) ContractUtility(id types.FileContractID) (modules.ContractUtility, bool) {
return r.hostContractor.ContractUtility(id)
}
// PeriodSpending returns the host contractor's period spending
func (r *Renter) PeriodSpending() modules.ContractorSpending { return r.hostContractor.PeriodSpending() }
// Settings returns the host contractor's allowance
func (r *Renter) Settings() modules.RenterSettings {
return modules.RenterSettings{
Allowance: r.hostContractor.Allowance(),
}
}
// ProcessConsensusChange returns the process consensus change
func (r *Renter) ProcessConsensusChange(cc modules.ConsensusChange) {
id := r.mu.Lock()
r.lastEstimation = modules.RenterPriceEstimation{}
r.mu.Unlock(id)
}
// validateSiapath checks that a Siapath is a legal filename.
// ../ is disallowed to prevent directory traversal, and paths must not begin
// with / or be empty.
func validateSiapath(siapath string) error {
if siapath == "" {
return ErrEmptyFilename
}
if siapath == ".." {
return errors.New("siapath cannot be ..")
}
if siapath == "." {
return errors.New("siapath cannot be .")
}
// check prefix
if strings.HasPrefix(siapath, "/") {
return errors.New("siapath cannot begin with /")
}
if strings.HasPrefix(siapath, "../") {
return errors.New("siapath cannot begin with ../")
}
if strings.HasPrefix(siapath, "./") {
return errors.New("siapath connot begin with ./")
}
for _, pathElem := range strings.Split(siapath, "/") {
if pathElem == "." || pathElem == ".." {
return errors.New("siapath cannot contain . or .. elements")
}
}
return nil
}
// Enforce that Renter satisfies the modules.Renter interface.
var _ modules.Renter = (*Renter)(nil)
// newRenter initializes a renter and returns it.
func newRenter(g modules.Gateway, cs modules.ConsensusSet, tpool modules.TransactionPool, hdb hostDB, hc hostContractor, persistDir string) (*Renter, error) {
if g == nil {
return nil, errNilGateway
}
if cs == nil {
return nil, errNilCS
}
if tpool == nil {
return nil, errNilTpool
}
if hc == nil {
return nil, errNilContractor
}
if hdb == nil && build.Release != "testing" {
return nil, errNilHdb
}
r := &Renter{
files: make(map[string]*file),
tracking: make(map[string]trackedFile),
// Making newDownloads a buffered channel means that most of the time, a
// new download will trigger an unnecessary extra iteration of the
// download heap loop, searching for a chunk that's not there. This is
// preferable to the alternative, where in rare cases the download heap
// will miss work altogether.
newDownloads: make(chan struct{}, 1),
downloadHeap: new(downloadChunkHeap),
uploadHeap: uploadHeap{
activeChunks: make(map[uploadChunkID]struct{}),
newUploads: make(chan struct{}, 1),
},
workerPool: make(map[types.FileContractID]*worker),
cs: cs,
g: g,
hostDB: hdb,
hostContractor: hc,
persistDir: persistDir,
mu: siasync.New(modules.SafeMutexDelay, 1),
tpool: tpool,
}
r.memoryManager = newMemoryManager(defaultMemory, r.tg.StopChan())
// Load all saved data.
if err := r.initPersist(); err != nil {
return nil, err
}
// Subscribe to the consensus set.
err := cs.ConsensusSetSubscribe(r, modules.ConsensusChangeRecent, r.tg.StopChan())
if err != nil {
return nil, err
}
// Spin up the workers for the work pool.
r.managedUpdateWorkerPool()
go r.threadedDownloadLoop()
go r.threadedUploadLoop()
// Kill workers on shutdown.
r.tg.OnStop(func() error {
id := r.mu.RLock()
for _, worker := range r.workerPool {
close(worker.killChan)
}
r.mu.RUnlock(id)
return nil
})
return r, nil
}
// New returns an initialized renter.
func New(g modules.Gateway, cs modules.ConsensusSet, wallet modules.Wallet, tpool modules.TransactionPool, persistDir string) (*Renter, error) {
hdb, err := hostdb.New(g, cs, persistDir)
if err != nil {
return nil, err
}
hc, err := contractor.New(cs, wallet, tpool, hdb, persistDir)
if err != nil {
return nil, err
}
return newRenter(g, cs, tpool, hdb, hc, persistDir)
}