-
Notifications
You must be signed in to change notification settings - Fork 623
/
file_transfer_stats.go
130 lines (101 loc) 路 3.06 KB
/
file_transfer_stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package filetransfer
import (
"sync"
"sync/atomic"
"github.com/wandb/wandb/core/pkg/service"
)
// FileTransferStats reports file upload/download progress and totals.
type FileTransferStats interface {
// GetFilesStats returns byte counts for uploads.
GetFilesStats() *service.FilePusherStats
// GetFileCounts returns a breakdown of the kinds of files uploaded.
GetFileCounts() *service.FileCounts
// IsDone returns whether all uploads finished.
IsDone() bool
// SetDone marks all uploads as finished.
SetDone()
// UpdateUploadStats updates the upload stats for a file.
UpdateUploadStats(newInfo FileUploadInfo)
}
type fileTransferStats struct {
sync.Mutex
done *atomic.Bool
statsByPath map[string]FileUploadInfo
uploadedBytes *atomic.Int64
totalBytes *atomic.Int64
dedupedBytes *atomic.Int64
wandbCount *atomic.Int32
mediaCount *atomic.Int32
artifactCount *atomic.Int32
otherCount *atomic.Int32
}
func NewFileTransferStats() FileTransferStats {
return &fileTransferStats{
done: &atomic.Bool{},
statsByPath: make(map[string]FileUploadInfo),
uploadedBytes: &atomic.Int64{},
totalBytes: &atomic.Int64{},
dedupedBytes: &atomic.Int64{},
wandbCount: &atomic.Int32{},
mediaCount: &atomic.Int32{},
artifactCount: &atomic.Int32{},
otherCount: &atomic.Int32{},
}
}
func (fts *fileTransferStats) GetFilesStats() *service.FilePusherStats {
// NOTE: We don't lock, so these could be out of sync. For instance,
// TotalBytes could be less than UploadedBytes!
return &service.FilePusherStats{
UploadedBytes: fts.uploadedBytes.Load(),
TotalBytes: fts.totalBytes.Load(),
DedupedBytes: fts.dedupedBytes.Load(),
}
}
func (fts *fileTransferStats) GetFileCounts() *service.FileCounts {
return &service.FileCounts{
WandbCount: fts.wandbCount.Load(),
MediaCount: fts.mediaCount.Load(),
ArtifactCount: fts.artifactCount.Load(),
OtherCount: fts.otherCount.Load(),
}
}
func (fts *fileTransferStats) IsDone() bool {
return fts.done.Load()
}
func (fts *fileTransferStats) SetDone() {
fts.done.Store(true)
}
// FileUploadInfo is information about an in-progress file upload.
type FileUploadInfo struct {
// The local path to the file being uploaded.
Path string
// The kind of file this is.
FileKind RunFileKind
// The number of bytes uploaded so far.
UploadedBytes int64
// The total number of bytes being uploaded.
TotalBytes int64
}
func (fts *fileTransferStats) UpdateUploadStats(newInfo FileUploadInfo) {
fts.Lock()
defer fts.Unlock()
if oldInfo, ok := fts.statsByPath[newInfo.Path]; ok {
fts.addStats(oldInfo, -1)
}
fts.statsByPath[newInfo.Path] = newInfo
fts.addStats(newInfo, 1)
}
func (fts *fileTransferStats) addStats(info FileUploadInfo, mult int64) {
fts.uploadedBytes.Add(info.UploadedBytes * mult)
fts.totalBytes.Add(info.TotalBytes * mult)
switch info.FileKind {
default:
fts.otherCount.Add(int32(mult))
case RunFileKindWandb:
fts.wandbCount.Add(int32(mult))
case RunFileKindArtifact:
fts.artifactCount.Add(int32(mult))
case RunFileKindMedia:
fts.mediaCount.Add(int32(mult))
}
}