-
Notifications
You must be signed in to change notification settings - Fork 33
/
adapter.go
181 lines (152 loc) · 4.54 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package output
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/sqlz"
)
// RecordWriterAdapter implements libsq.RecordWriter and
// wraps an output.RecordWriter instance, providing a
// bridge between the asynchronous libsq.RecordWriter and
// synchronous output.RecordWriter interfaces.
//
// Note that a writer implementation such as the JSON or
// CSV writer could directly implement libsq.RecordWriter.
// But that interface is non-trivial to implement, hence
// this bridge type.
//
// The FlushAfterN and FlushAfterDuration fields control
// flushing of the writer.
type RecordWriterAdapter struct {
rw RecordWriter
wg *sync.WaitGroup
recCh chan sqlz.Record
errCh chan error
errs []error
written *atomic.Int64
cancelFn context.CancelFunc
// FlushAfterN indicates that the writer's Flush method
// should be invoked after N invocations of WriteRecords.
// A value of 0 will flush every time a record is written.
// Set to -1 to disable.
FlushAfterN int64
// FlushAfterDuration controls whether the writer's Flush method
// is invoked periodically. A duration <= 0 disables periodic flushing.
FlushAfterDuration time.Duration
}
// adapterRecChSize is the size of the record chan (effectively
// the buffer) used by RecordWriterAdapter.
// Possibly this value should be user-configurable.
const adapterRecChSize = 1000
// NewRecordWriterAdapter returns a new RecordWriterAdapter.
func NewRecordWriterAdapter(rw RecordWriter) *RecordWriterAdapter {
recCh := make(chan sqlz.Record, adapterRecChSize)
return &RecordWriterAdapter{rw: rw, recCh: recCh, wg: &sync.WaitGroup{}, written: atomic.NewInt64(0)}
}
// Open implements libsq.RecordWriter.
func (w *RecordWriterAdapter) Open(ctx context.Context, cancelFn context.CancelFunc,
recMeta sqlz.RecordMeta,
) (chan<- sqlz.Record, <-chan error, error) {
w.cancelFn = cancelFn
err := w.rw.Open(recMeta)
if err != nil {
return nil, nil, err
}
// errCh has size 2 because that's the maximum number of
// errs that could be sent. Typically only one err is sent,
// but in the case of ctx.Done, we send ctx.Err, followed
// by any error returned by r.rw.Close.
w.errCh = make(chan error, 2)
w.wg.Add(1)
go func() {
defer func() {
w.wg.Done()
close(w.errCh)
}()
var lastFlushN, recN int64
var flushTimer *time.Timer
var flushCh <-chan time.Time
if w.FlushAfterDuration > 0 {
flushTimer = time.NewTimer(w.FlushAfterDuration)
flushCh = flushTimer.C
defer flushTimer.Stop()
}
for {
select {
case <-ctx.Done():
w.addErrs(ctx.Err(), w.rw.Close())
return
case <-flushCh:
// The flushTimer has expired, time to flush.
err = w.rw.Flush()
if err != nil {
w.addErrs(err)
return
}
lastFlushN = recN
flushTimer.Reset(w.FlushAfterDuration)
continue
case rec := <-w.recCh:
if rec == nil { // no more results on recCh, it has been closed
err = w.rw.Close()
if err != nil {
w.addErrs()
}
return
}
// rec is not nil, therefore we write it out.
// We could accumulate a bunch of recs into a slice here,
// but we'll worry about that if benchmarking shows it'll matter.
writeErr := w.rw.WriteRecords([]sqlz.Record{rec})
if writeErr != nil {
w.addErrs(writeErr)
return
}
recN = w.written.Inc()
// Check if we should flush
if w.FlushAfterN >= 0 && (recN-lastFlushN >= w.FlushAfterN) {
err = w.rw.Flush()
if err != nil {
w.addErrs(err)
return
}
lastFlushN = recN
if flushTimer != nil {
// Reset the timer, but we need to stop and drain it first.
// See the timer.Reset docs.
if !flushTimer.Stop() {
<-flushTimer.C
}
flushTimer.Reset(w.FlushAfterDuration)
}
}
// If we got this far, we successfully wrote rec to rw.
// Therefore continue to wait/select for the next
// element on recCh (or for recCh to close)
// or for ctx.Done indicating timeout or cancel etc.
continue
}
}
}()
return w.recCh, w.errCh, nil
}
// Wait implements libsq.RecordWriter.
func (w *RecordWriterAdapter) Wait() (written int64, err error) {
w.wg.Wait()
if w.cancelFn != nil {
w.cancelFn()
}
return w.written.Load(), errz.Combine(w.errs...)
}
// addErrs handles any non-nil err in errs by appending it to w.errs
// and sending it on w.errCh.
func (w *RecordWriterAdapter) addErrs(errs ...error) {
for _, err := range errs {
if err != nil {
w.errs = append(w.errs, err)
w.errCh <- err
}
}
}