/
status.go
133 lines (115 loc) · 3.03 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package transform
import (
"context"
"sync"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/pps"
"github.com/pachyderm/pachyderm/v2/src/server/worker/common"
)
// Status holds the current state of the transform worker so it can be
// observed and controlled from outside the goroutine doing the work. Its
// exported methods are GetStatus (report the current job and datum), Cancel
// (cancel the datum currently being processed), and NextDatum, which is only
// usable while withDatumBatch has installed its batching channels.
type Status struct {
	// mutex guards jobID, datumStatus and cancel.
	mutex       sync.Mutex
	jobID       string           // current job; reset to "" when withJob returns
	datumStatus *pps.DatumStatus // current datum; nil outside withDatum
	cancel      func()           // cancels the current datum; nil outside withDatum

	// batchMutex guards nextChan and setupChan.
	batchMutex sync.Mutex
	nextChan   chan error    // NextDatum sends its err here; nil unless batching is active
	setupChan  chan []string // NextDatum receives its result here; nil unless batching is active
}
// convertInputs builds the []*pps.InputFile reported in a DatumStatus, taking
// the path and hash from each input's FileInfo. It returns nil when inputs is
// empty, matching the zero value of an unpopulated slice.
func convertInputs(inputs []*common.Input) []*pps.InputFile {
	if len(inputs) == 0 {
		return nil
	}
	// The output has exactly one entry per input, so allocate it once up front.
	result := make([]*pps.InputFile, 0, len(inputs))
	for _, input := range inputs {
		result = append(result, &pps.InputFile{
			Path: input.FileInfo.File.Path,
			Hash: input.FileInfo.Hash,
		})
	}
	return result
}
// withLock runs cb while holding s.mutex, the lock protecting the job and
// datum fields. The deferred unlock releases the mutex even if cb panics.
func (s *Status) withLock(cb func()) {
	mu := &s.mutex
	mu.Lock()
	defer mu.Unlock()
	cb()
}
// withJob publishes jobID as the current job while cb runs and resets it to
// the empty string afterwards (via defer, so also if cb panics). It returns
// whatever cb returns.
func (s *Status) withJob(jobID string, cb func() error) error {
	setJob := func(id string) func() {
		return func() { s.jobID = id }
	}
	s.withLock(setJob(jobID))
	defer s.withLock(setJob(""))
	return cb()
}
// withDatum publishes a DatumStatus for inputs (stamped with the current time)
// together with its cancel function while cb runs, then clears both once cb
// returns. It returns whatever cb returns.
func (s *Status) withDatum(inputs []*common.Input, cancel func(), cb func() error) error {
	s.withLock(func() {
		s.datumStatus, s.cancel = &pps.DatumStatus{
			Data:    convertInputs(inputs),
			Started: timestamppb.Now(),
		}, cancel
	})
	defer s.withLock(func() {
		s.datumStatus, s.cancel = nil, nil
	})
	return cb()
}
// GetStatus returns a snapshot of the transform worker's current job ID and
// datum status, taken under the status lock. The error is always nil.
func (s *Status) GetStatus() (*pps.WorkerStatus, error) {
	var snapshot *pps.WorkerStatus
	s.withLock(func() {
		snapshot = &pps.WorkerStatus{JobId: s.jobID, DatumStatus: s.datumStatus}
	})
	return snapshot, nil
}
// Cancel cancels the currently running datum if it belongs to jobID and its
// inputs match datumFilter (as decided by common.MatchDatum). It reports
// whether a cancellation was issued; it returns false when no datum is in
// flight, including while the worker is idle or between datums.
func (s *Status) Cancel(jobID string, datumFilter []string) bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	// withDatum clears datumStatus and cancel once a datum finishes, so both may
	// be nil here; dereferencing them would panic.
	if s.datumStatus == nil || s.cancel == nil {
		return false
	}
	if jobID != s.jobID || !common.MatchDatum(datumFilter, s.datumStatus.Data) {
		return false
	}
	// Fields will be cleared by withDatum's deferred cleanup as the worker
	// stack unwinds.
	s.cancel()
	return true
}
// withDatumBatch enables datum batching for the duration of cb. It installs a
// fresh pair of unbuffered channels, hands cb the receive side of the error
// channel and the send side of the []string channel, and uninstalls both once
// cb returns. Installing a second pair while one is active is a programming
// error and panics.
//
// NOTE(review): NextDatum holds batchMutex while blocked on these channels, so
// the deferred cleanup waits for any in-flight NextDatum call to finish or for
// its context to be done — confirm cb always services or unblocks callers.
func (s *Status) withDatumBatch(cb func(<-chan error, chan<- []string) error) error {
	var (
		next  chan error
		setup chan []string
	)
	s.withBatchLock(func() {
		if s.nextChan != nil {
			panic("multiple goroutines attempting to set up a datum batch")
		}
		next, setup = make(chan error), make(chan []string)
		s.nextChan, s.setupChan = next, setup
	})
	defer s.withBatchLock(func() {
		s.nextChan, s.setupChan = nil, nil
	})
	return cb(next, setup)
}
// withBatchLock runs cb while holding s.batchMutex, the lock protecting the
// datum-batching channels. The deferred unlock releases it even if cb panics.
func (s *Status) withBatchLock(cb func()) {
	mu := &s.batchMutex
	mu.Lock()
	defer mu.Unlock()
	cb()
}
// NextDatum hands err to the active datum batch and waits for the []string
// that comes back. It sends err on the batch's error channel, then blocks until
// a value arrives on the batch's setup channel and returns it. If batching is
// not enabled it returns an error immediately; if ctx is done before either
// step completes it returns the context's cause.
//
// batchMutex is held for the whole exchange, so concurrent callers are
// serialized and each send is paired with the following receive.
func (s *Status) NextDatum(ctx context.Context, err error) ([]string, error) {
	s.batchMutex.Lock()
	defer s.batchMutex.Unlock()

	next, setup := s.nextChan, s.setupChan
	if next == nil {
		return nil, errors.New("datum batching not enabled")
	}

	select {
	case <-ctx.Done():
		return nil, errors.EnsureStack(context.Cause(ctx))
	case next <- err:
	}

	select {
	case <-ctx.Done():
		return nil, errors.EnsureStack(context.Cause(ctx))
	case env := <-setup:
		return env, nil
	}
}