-
Notifications
You must be signed in to change notification settings - Fork 153
/
result.go
116 lines (100 loc) · 1.99 KB
/
result.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package execute
import (
"sync"
"github.com/influxdata/flux"
)
// result implements both the Transformation and Result interfaces,
// mapping the pushed based Transformation API to the pull based Result interface.
type result struct {
name string
mu sync.Mutex
tables chan resultMessage
abortErr chan error
aborted chan struct{}
}
type resultMessage struct {
table flux.Table
err error
}
func newResult(name string) *result {
return &result{
name: name,
// TODO(nathanielc): Currently this buffer needs to be big enough hold all result tables :(
tables: make(chan resultMessage, 1000),
abortErr: make(chan error, 1),
aborted: make(chan struct{}),
}
}
func (s *result) Name() string {
return s.name
}
func (s *result) RetractTable(DatasetID, flux.GroupKey) error {
//TODO implement
return nil
}
func (s *result) Process(id DatasetID, tbl flux.Table) error {
select {
case s.tables <- resultMessage{
table: tbl,
}:
case <-s.aborted:
}
return nil
}
func (s *result) Tables() flux.TableIterator {
return s
}
func (s *result) Do(f func(flux.Table) error) error {
for {
select {
case err := <-s.abortErr:
return err
case msg, more := <-s.tables:
if !more {
return nil
}
if msg.err != nil {
return msg.err
}
if err := f(msg.table); err != nil {
return err
}
}
}
}
func (s *result) UpdateWatermark(id DatasetID, mark Time) error {
//Nothing to do
return nil
}
func (s *result) UpdateProcessingTime(id DatasetID, t Time) error {
//Nothing to do
return nil
}
func (s *result) Finish(id DatasetID, err error) {
if err != nil {
select {
case s.tables <- resultMessage{
err: err,
}:
case <-s.aborted:
}
}
close(s.tables)
}
// Abort the result with the given error
func (s *result) abort(err error) {
s.mu.Lock()
defer s.mu.Unlock()
// Check if we have already aborted
aborted := false
select {
case <-s.aborted:
aborted = true
default:
}
if aborted {
return // already aborted
}
s.abortErr <- err
close(s.aborted)
}