-
Notifications
You must be signed in to change notification settings - Fork 623
/
sync.go
169 lines (149 loc) 路 3.68 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package server
import (
"context"
"fmt"
"io"
"sync"
"github.com/wandb/wandb/core/pkg/observability"
"github.com/wandb/wandb/core/pkg/service"
)
type SyncService struct {
ctx context.Context
wg sync.WaitGroup
logger *observability.CoreLogger
senderFunc func(*service.Record)
inChan chan *service.Record
// Result of offline sync to pass to the client when syncing is done
flushCallback func(error)
exitSeen bool
syncErr error
overwrite *service.SyncOverwrite
skip *service.SyncSkip
}
type SyncServiceOption func(*SyncService)
func NewSyncService(ctx context.Context, opts ...SyncServiceOption) *SyncService {
syncService := &SyncService{
ctx: ctx,
wg: sync.WaitGroup{},
inChan: make(chan *service.Record),
}
for _, opt := range opts {
opt(syncService)
}
return syncService
}
func WithSyncServiceOverwrite(overwrite *service.SyncOverwrite) SyncServiceOption {
return func(s *SyncService) {
s.overwrite = overwrite
}
}
func WithSyncServiceSkip(skip *service.SyncSkip) SyncServiceOption {
return func(s *SyncService) {
s.skip = skip
}
}
func WithSyncServiceLogger(logger *observability.CoreLogger) SyncServiceOption {
return func(s *SyncService) {
s.logger = logger
}
}
func WithSyncServiceSenderFunc(senderFunc func(*service.Record)) SyncServiceOption {
return func(s *SyncService) {
s.senderFunc = senderFunc
}
}
func WithSyncServiceFlushCallback(syncResultCallback func(error)) SyncServiceOption {
return func(s *SyncService) {
s.flushCallback = syncResultCallback
}
}
func (s *SyncService) SyncRecord(record *service.Record, err error) {
if err != nil && err != io.EOF {
s.syncErr = err
}
if err != nil && !s.exitSeen {
record = &service.Record{
RecordType: &service.Record_Exit{
Exit: &service.RunExitRecord{
ExitCode: 1,
},
},
}
s.inChan <- record
} else if record != nil {
s.inChan <- record
}
}
func (s *SyncService) Start() {
s.wg.Add(1)
go s.sync()
}
func (s *SyncService) Close() {
close(s.inChan)
s.wg.Wait()
}
func (s *SyncService) sync() {
for record := range s.inChan {
// TODO: we remove the control from the record because we don't want to try to
// respond to a non-existing connection when syncing an offline run. if this is
// is used for something else, we should re-evaluate this.
// remove the control from the record:
record.Control = nil
switch record.RecordType.(type) {
case *service.Record_Run:
s.syncRun(record)
case *service.Record_OutputRaw:
s.syncOutputRaw(record)
case *service.Record_Exit:
s.syncExit(record)
default:
s.senderFunc(record)
}
}
s.wg.Done()
}
func (s *SyncService) syncRun(record *service.Record) {
if s.overwrite != nil {
if s.overwrite.GetEntity() != "" {
record.GetRun().Entity = s.overwrite.GetEntity()
}
if s.overwrite.GetProject() != "" {
record.GetRun().Project = s.overwrite.GetProject()
}
if s.overwrite.GetRunId() != "" {
record.GetRun().RunId = s.overwrite.GetRunId()
}
}
s.senderFunc(record)
record = &service.Record{
RecordType: &service.Record_Request{
Request: &service.Request{
RequestType: &service.Request_RunStart{
RunStart: &service.RunStartRequest{},
},
},
},
}
s.senderFunc(record)
}
func (s *SyncService) syncExit(record *service.Record) {
s.exitSeen = true
s.senderFunc(record)
}
func (s *SyncService) syncOutputRaw(record *service.Record) {
if s.skip != nil && s.skip.GetOutputRaw() {
return
}
s.senderFunc(record)
}
func (s *SyncService) Flush() {
if s == nil {
return
}
s.Close()
if s.flushCallback == nil {
s.logger.CaptureError("Flush without callback", fmt.Errorf("flushing sync service"))
return
}
s.flushCallback(s.syncErr)
}