-
Notifications
You must be signed in to change notification settings - Fork 65
/
synchronize.go
76 lines (65 loc) 路 1.63 KB
/
synchronize.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package physicalplan
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/apache/arrow/go/v16/arrow"
)
// Synchronizer is used to combine the results of multiple parallel streams
// into a single stream concurrent stream. It also forms a barrier on the
// finishers, by waiting to call next plan's finish until all previous parallel
// stages have finished.
type Synchronizer struct {
next PhysicalPlan
nextMtx sync.Mutex
running *atomic.Int64
open *atomic.Int64
}
func Synchronize(concurrency int) *Synchronizer {
running := &atomic.Int64{}
running.Add(int64(concurrency))
open := &atomic.Int64{}
open.Add(int64(concurrency))
return &Synchronizer{running: running, open: open}
}
func (m *Synchronizer) Callback(ctx context.Context, r arrow.Record) error {
// multiple threads can emit the results to the next step, but they will do
// it synchronously
m.nextMtx.Lock()
defer m.nextMtx.Unlock()
err := m.next.Callback(ctx, r)
if err != nil {
return err
}
return nil
}
func (m *Synchronizer) Finish(ctx context.Context) error {
running := m.running.Add(-1)
if running < 0 {
return errors.New("too many Synchronizer Finish calls")
}
if running > 0 {
return nil
}
return m.next.Finish(ctx)
}
func (m *Synchronizer) SetNext(next PhysicalPlan) {
m.next = next
}
func (m *Synchronizer) SetNextPlan(nextPlan PhysicalPlan) {
m.next = nextPlan
}
func (m *Synchronizer) Draw() *Diagram {
return &Diagram{Details: "Synchronizer", Child: m.next.Draw()}
}
func (m *Synchronizer) Close() {
open := m.open.Add(-1)
if open < 0 {
panic("too many Synchronizer Close calls")
}
if open > 0 {
return
}
m.next.Close()
}