-
Notifications
You must be signed in to change notification settings - Fork 0
/
plotgroup.go
161 lines (133 loc) · 3.93 KB
/
plotgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright © 2024 Ken Robertson <ken@invalidlogic.com>
package main
import (
"cmp"
"log"
"os"
"path/filepath"
"slices"
"sync"
"sync/atomic"
"github.com/dustin/go-humanize"
)
// plotGroup is a named collection of plot paths that share a single
// concurrency limit.
type plotGroup struct {
// name identifies the group; newPlotGroup copies it from the config and it
// is used in log output.
name string
// concurrency is the limit compared against transfers in pickPlot: once
// transfers reaches this value no path is handed out. newPlotGroup may lower
// it to the number of registered paths.
concurrency int64
// transfers is the atomic counter checked against concurrency and used by
// the sink to order groups. It is only read in this file.
// NOTE(review): presumably incremented/decremented by the transfer code —
// verify against callers.
transfers atomic.Int64
// sortedPlots holds the group's plot paths in preference order, as last
// arranged by sortPaths (most free space first) or sortCachePaths (fewest
// transfers first).
sortedPlots []*plotPath
// sortMutex guards sortedPlots: the sort methods take the write lock and
// pickPlot takes the read lock.
sortMutex sync.RWMutex
}
// newPlotGroup builds a plotGroup from its configuration entry.
//
// Each entry in cfg.Paths is made absolute and then expanded as a glob
// pattern. Every match that can be stat'ed and is a directory is registered as
// a plot path; entries or matches that fail a step are logged and skipped, so
// a single bad path does not abort the whole group.
//
// Unless allowExcessConcurrency is true, the group's concurrency is lowered to
// the number of registered paths when the configured value exceeds it. The
// returned error is always nil in the current implementation.
func newPlotGroup(cfg *configGroup, allowExcessConcurrency bool) (*plotGroup, error) {
	pg := &plotGroup{
		name:        cfg.name,
		concurrency: cfg.Concurrency,
		sortedPlots: make([]*plotPath, 0),
	}

	// validate the plots exist and add them in
	for _, p := range cfg.Paths {
		// Do not shadow p here: filepath.Abs returns "" on failure, and the
		// log line has to name the configured entry that was rejected.
		pattern, err := filepath.Abs(p)
		if err != nil {
			log.Printf("Path %s failed expansion, skipping: %v", p, err)
			continue
		}

		matches, err := filepath.Glob(pattern)
		if err != nil {
			log.Printf("Path %s failed globbing, skipping: %v", pattern, err)
			continue
		}
		// Glob reports a missing path as "no matches" rather than as an
		// error, so say so explicitly instead of dropping the entry silently.
		if len(matches) == 0 {
			log.Printf("Path %s did not match anything, skipping", pattern)
			continue
		}

		for _, m := range matches {
			fi, err := os.Stat(m)
			if err != nil {
				log.Printf("Path %s failed validation, skipping: %v", m, err)
				continue
			}
			if !fi.IsDir() {
				log.Printf("Path %s is not a directory, skipping", m)
				continue
			}

			// FIXME: add checking skip file

			// updateFreeSpace runs before the sizes are logged so the line
			// below reports the values it stored on the path.
			pp := &plotPath{path: m}
			pp.updateFreeSpace()
			pg.sortedPlots = append(pg.sortedPlots, pp)
			log.Printf("Registred plot path: %s [%s free / %s total]",
				m, humanize.IBytes(pp.freeSpace), humanize.IBytes(pp.totalSpace))
		}
	}

	// ensure concurrency doesn't exceed paths
	if !allowExcessConcurrency && pg.concurrency > int64(len(pg.sortedPlots)) {
		pg.concurrency = int64(len(pg.sortedPlots))
	}

	// sort the paths
	pg.sortPaths()

	log.Printf("Plot Group %q ready with concurrency %d.", pg.name, pg.concurrency)
	return pg, nil
}
// sortPaths reorders the group's sortedPlots slice so that the path with the
// most free space comes first. The sort is stable, so paths with equal free
// space keep their relative order. It holds the write lock for the duration
// and is meant to be called after every file transfer, once free space has
// been refreshed.
func (pg *plotGroup) sortPaths() {
	// The operands are swapped relative to the argument order to obtain a
	// descending sort.
	mostFreeFirst := func(x, y *plotPath) int {
		return cmp.Compare(y.freeSpace, x.freeSpace)
	}

	pg.sortMutex.Lock()
	defer pg.sortMutex.Unlock()

	slices.SortStableFunc(pg.sortedPlots, mostFreeFirst)
}
// sortCachePaths reorders the group's sortedPlots slice by each path's
// transfer count, fewest first. The sort is stable, so paths with the same
// count keep their relative order. It is the ordering used for cache plot
// paths, as opposed to final destinations which are ordered by sortPaths.
func (pg *plotGroup) sortCachePaths() {
	pg.sortMutex.Lock()
	defer pg.sortMutex.Unlock()

	fewestTransfersFirst := func(x, y *plotPath) int {
		left := x.transfers.Load()
		right := y.transfers.Load()
		return cmp.Compare(left, right)
	}
	slices.SortStableFunc(pg.sortedPlots, fewestTransfersFirst)
}
// pickPlot returns the plot path that should receive a transfer of the given
// size, or nil when the group cannot take one right now.
//
// nil is returned immediately when the group's transfer count has reached its
// concurrency limit. Otherwise sortedPlots is scanned in its current order and
// the first path that is neither busy nor paused and has at least size bytes
// free is returned. With the sortPaths ordering that is the idle path with the
// most free space; with the sortCachePaths ordering it is the idle path with
// the fewest transfers that still has room.
func (pg *plotGroup) pickPlot(size uint64) *plotPath {
	pg.sortMutex.RLock()
	defer pg.sortMutex.RUnlock()

	if pg.transfers.Load() >= pg.concurrency {
		return nil
	}

	for _, pp := range pg.sortedPlots {
		if pp.busy.Load() || pp.paused.Load() {
			continue
		}

		// Keep scanning instead of giving up: the slice is only ordered by
		// free space after sortPaths (sortCachePaths orders it by transfer
		// count), and free space may have changed since the last sort, so a
		// later path can still be large enough.
		if size > pp.freeSpace {
			continue
		}

		return pp
	}

	return nil
}
// sortGroups reorders the sink's sortedGroups slice by each group's transfer
// count, fewest first. The sort is stable, so groups with the same count keep
// their relative order. It holds the sink's write lock for the duration and is
// meant to be called after every file transfer, once the counts have changed.
func (s *sink) sortGroups() {
	leastLoadedFirst := func(x, y *plotGroup) int {
		return cmp.Compare(x.transfers.Load(), y.transfers.Load())
	}

	s.sortMutex.Lock()
	defer s.sortMutex.Unlock()

	slices.SortStableFunc(s.sortedGroups, leastLoadedFirst)
}
// pickPlot chooses a destination for a transfer of the given size. Groups are
// consulted in sortedGroups order (fewest transfers first after sortGroups),
// and the first group whose own pickPlot yields a path wins. It returns that
// group together with the path, or (nil, nil) when no group can take the
// transfer. The sink's read lock is held while scanning.
func (s *sink) pickPlot(size uint64) (*plotGroup, *plotPath) {
	s.sortMutex.RLock()
	defer s.sortMutex.RUnlock()

	for i := range s.sortedGroups {
		group := s.sortedGroups[i]
		if path := group.pickPlot(size); path != nil {
			return group, path
		}
	}

	return nil, nil
}