This repository has been archived by the owner on Oct 14, 2020. It is now read-only.
forked from cockroachdb/cockroach
/
base.go
285 lines (244 loc) · 7.9 KB
/
base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Radu Berinde (radu@cockroachlabs.com)
package distsql
import (
"sync"
"sync/atomic"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
basictracer "github.com/opentracing/basictracer-go"
opentracing "github.com/opentracing/opentracing-go"
)
// joinType enumerates the supported kinds of join. The values are assigned
// sequentially with iota, so innerJoin is the zero value.
type joinType int
const (
innerJoin joinType = iota
leftOuter
rightOuter
fullOuter
)
// rowChannelBufSize is the channel buffer size that RowChannel.Init uses by
// default.
const rowChannelBufSize = 16
// columns is a list of uint32 values.
// NOTE(review): presumably these are column indices; the type is not used in
// this part of the file, so confirm against its callers.
type columns []uint32
// RowReceiver is any component of a flow that receives rows from another
// component. It can be an input synchronizer, a router, or a mailbox.
type RowReceiver interface {
// PushRow sends a row to this receiver. May block.
// Returns true if the row was sent, or false if the receiver does not need
// any more rows. In all cases, Close() still needs to be called.
// The sender must not modify the row after calling this function.
PushRow(row sqlbase.EncDatumRow) bool
// Close is called when we have no more rows; it causes the RowReceiver to
// process all rows and clean up. If err is not nil, the error is sent to
// the receiver (and the function may block).
Close(err error)
}
// RowSource is any component of a flow that produces rows that can be consumed
// by another component.
type RowSource interface {
// NextRow retrieves the next row. Returns a nil row if there are no more
// rows; a nil row together with a nil error marks the normal end of the
// stream. Depending on the implementation, it may block.
// The caller must not modify the received row.
NextRow() (sqlbase.EncDatumRow, error)
}
// processor is a common interface implemented by all processors, used by the
// higher-level flow orchestration code.
type processor interface {
// Run is the main loop of the processor.
// If wg is non-nil, wg.Done is called before exiting, which lets the caller
// wait for the processor to finish.
Run(wg *sync.WaitGroup)
}
// noopProcessor is a processor that simply passes rows through from the
// synchronizer to the router. It can be useful in the last stage of a
// computation, where we may only need the synchronizer to join streams.
type noopProcessor struct {
// flowCtx supplies the context used when logging pushed rows.
flowCtx *FlowCtx
// input is the source the rows are read from.
input RowSource
// output is the receiver the rows are forwarded to.
output RowReceiver
}
// Compile-time check that *noopProcessor implements processor.
var _ processor = &noopProcessor{}
// newNoopProcessor builds a noopProcessor that forwards rows from input to
// output; flowCtx is stored for use when logging.
func newNoopProcessor(flowCtx *FlowCtx, input RowSource, output RowReceiver) *noopProcessor {
	n := &noopProcessor{}
	n.flowCtx = flowCtx
	n.input = input
	n.output = output
	return n
}
// Run is part of the processor interface.
//
// It forwards every row from the input to the output until the input is
// exhausted, the input reports an error, or the output declines further rows.
// The output is closed on every exit path, as the RowReceiver contract
// requires. If wg is non-nil, wg.Done is called before returning (after the
// output has been closed).
func (n *noopProcessor) Run(wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	for {
		row, err := n.input.NextRow()
		if err != nil || row == nil {
			// Either the input failed or it is exhausted (nil row, nil error);
			// in both cases hand the (possibly nil) error to the output.
			n.output.Close(err)
			return
		}
		if log.V(3) {
			log.Infof(n.flowCtx.Context, "noop: pushing row %s\n", row)
		}
		if !n.output.PushRow(row) {
			// The receiver does not need any more rows. Close must still be
			// called: without it a consumer draining the output (e.g. a
			// RowChannel reader) would wait for the end of the stream forever.
			n.output.Close(nil)
			return
		}
	}
}
// StreamMsg is the message used in the channels that implement
// local physical streams.
type StreamMsg struct {
// Only one of these fields will be set.
// Row carries a data row; Err carries an error reported by the sender.
Row sqlbase.EncDatumRow
Err error
}
// RowChannel is a thin layer over a StreamMsg channel, which can be used to
// transfer rows between goroutines. It must be initialized with Init or
// InitWithBufSize before use, since those create the underlying channel.
type RowChannel struct {
// The channel on which rows are delivered. It is the receive-only view of
// dataChan.
C <-chan StreamMsg
// dataChan is the same channel as C.
dataChan chan StreamMsg
// noMoreRows is an atomic that signals we no longer accept rows via
// PushRow. It is 0 after initialization and set to 1 by NoMoreRows.
noMoreRows uint32
}
// Compile-time checks that *RowChannel implements both interfaces.
var _ RowReceiver = &RowChannel{}
var _ RowSource = &RowChannel{}
// InitWithBufSize prepares the RowChannel for use: it allocates the underlying
// channel with room for chanBufSize buffered messages, exposes it through C,
// and resets the flag that makes PushRow reject rows.
func (rc *RowChannel) InitWithBufSize(chanBufSize int) {
	ch := make(chan StreamMsg, chanBufSize)
	rc.dataChan, rc.C = ch, ch
	atomic.StoreUint32(&rc.noMoreRows, 0)
}
// Init initializes the RowChannel with the default buffer size
// (rowChannelBufSize). It is shorthand for InitWithBufSize(rowChannelBufSize).
func (rc *RowChannel) Init() {
rc.InitWithBufSize(rowChannelBufSize)
}
// PushRow is part of the RowReceiver interface.
//
// The row is sent on the underlying channel (blocking while the buffer is
// full) and true is returned, unless NoMoreRows has been called, in which case
// nothing is sent and false is returned.
func (rc *RowChannel) PushRow(row sqlbase.EncDatumRow) bool {
	accepting := atomic.LoadUint32(&rc.noMoreRows) != 1
	if accepting {
		rc.dataChan <- StreamMsg{Row: row}
	}
	return accepting
}
// Close is part of the RowReceiver interface.
//
// A non-nil err is delivered to the reader as a final message (this send can
// block while the buffer is full); afterwards the underlying channel is
// closed to mark the end of the stream.
func (rc *RowChannel) Close(err error) {
	out := rc.dataChan
	if err != nil {
		out <- StreamMsg{Err: err}
	}
	close(out)
}
// NextRow is part of the RowSource interface.
//
// It blocks until a message arrives or the channel is closed. A closed channel
// yields (nil, nil); a message carrying an error yields (nil, err); any other
// message yields its row.
func (rc *RowChannel) NextRow() (sqlbase.EncDatumRow, error) {
	msg, open := <-rc.C
	switch {
	case !open:
		// No more rows.
		return nil, nil
	case msg.Err != nil:
		return nil, msg.Err
	default:
		return msg.Row, nil
	}
}
// NoMoreRows causes future PushRow calls to return false. The caller should
// still drain the channel to make sure the sender is not blocked.
// The flag is set atomically and is cleared again only by InitWithBufSize.
func (rc *RowChannel) NoMoreRows() {
atomic.StoreUint32(&rc.noMoreRows, 1)
}
// MultiplexedRowChannel is a RowChannel wrapper which allows multiple row
// producers to push rows on the same channel. The underlying channel is closed
// only once every producer has called Close.
type MultiplexedRowChannel struct {
	rowChan RowChannel
	// numSenders is an atomic counter that keeps track of how many senders have
	// yet to call Close().
	numSenders int32
	// errMu guards firstErr, which may be set by several senders concurrently.
	errMu sync.Mutex
	// firstErr is the first non-nil error passed to Close by any sender; it is
	// forwarded to the underlying channel when the last sender closes.
	firstErr error
}

// Compile-time checks that *MultiplexedRowChannel implements both interfaces.
var _ RowReceiver = &MultiplexedRowChannel{}
var _ RowSource = &MultiplexedRowChannel{}

// Init initializes the MultiplexedRowChannel with the default buffer size.
// numSenders is the number of producers; each of them must call Close exactly
// once.
func (mrc *MultiplexedRowChannel) Init(numSenders int) {
	mrc.rowChan.Init()
	atomic.StoreInt32(&mrc.numSenders, int32(numSenders))
	mrc.errMu.Lock()
	mrc.firstErr = nil
	mrc.errMu.Unlock()
}

// PushRow is part of the RowReceiver interface. It forwards the row to the
// underlying RowChannel and returns its result.
func (mrc *MultiplexedRowChannel) PushRow(row sqlbase.EncDatumRow) bool {
	return mrc.rowChan.PushRow(row)
}

// Close is part of the RowReceiver interface.
//
// Each sender calls Close once. A non-nil err is remembered only if no earlier
// sender reported one, so the reader receives the first error. When the last
// sender closes, the remembered error (if any) is passed on and the underlying
// channel is closed. Close panics if it is called more times than the number
// of senders given to Init.
func (mrc *MultiplexedRowChannel) Close(err error) {
	if err != nil {
		// Several senders may fail at the same time; serialize the update and
		// keep only the first error.
		mrc.errMu.Lock()
		if mrc.firstErr == nil {
			mrc.firstErr = err
		}
		mrc.errMu.Unlock()
	}
	newVal := atomic.AddInt32(&mrc.numSenders, -1)
	if newVal < 0 {
		panic("too many Close() calls")
	}
	if newVal == 0 {
		// This is the last sender: every other sender has already recorded its
		// error, so the value read here is final.
		mrc.errMu.Lock()
		firstErr := mrc.firstErr
		mrc.errMu.Unlock()
		mrc.rowChan.Close(firstErr)
	}
}

// NextRow is part of the RowSource interface. It returns the next row from the
// underlying RowChannel.
func (mrc *MultiplexedRowChannel) NextRow() (sqlbase.EncDatumRow, error) {
	return mrc.rowChan.NextRow()
}
// RowBuffer is an implementation of RowReceiver that buffers (accumulates)
// results in memory, as well as an implementation of RowSource that returns
// rows from a row buffer.
type RowBuffer struct {
// rows holds the buffered rows: PushRow appends copies at the end and
// NextRow removes rows from the front.
rows sqlbase.EncDatumRows
// err is the error passed to Close; once it is non-nil, NextRow returns it.
err error
// closed is used when the RowBuffer is used as a RowReceiver; it is set to
// true when the sender calls Close.
closed bool
// done is used when the RowBuffer is used as a RowSource; it is set to true
// when the receiver read all the rows.
done bool
}
// Compile-time checks that *RowBuffer implements both interfaces.
var _ RowReceiver = &RowBuffer{}
var _ RowSource = &RowBuffer{}
// PushRow is part of the RowReceiver interface.
//
// The buffer stores its own copy of the row slice, so the stored slice does
// not share a backing array with the caller's. It always returns true.
func (rb *RowBuffer) PushRow(row sqlbase.EncDatumRow) bool {
	var rowCopy sqlbase.EncDatumRow
	rowCopy = append(rowCopy, row...)
	rb.rows = append(rb.rows, rowCopy)
	return true
}
// Close is part of the RowReceiver interface. It records the sender's error
// (which may be nil) and marks the buffer as closed.
func (rb *RowBuffer) Close(err error) {
	rb.err, rb.closed = err, true
}
// NextRow is part of the RowSource interface.
//
// A recorded error takes precedence and is returned even if rows are still
// buffered. Otherwise the oldest buffered row is removed and returned; when
// the buffer is empty, done is set and (nil, nil) is returned.
func (rb *RowBuffer) NextRow() (sqlbase.EncDatumRow, error) {
	switch {
	case rb.err != nil:
		return nil, rb.err
	case len(rb.rows) == 0:
		rb.done = true
		return nil, nil
	}
	next := rb.rows[0]
	rb.rows = rb.rows[1:]
	return next, nil
}
// SetFlowRequestTrace attaches the tracing context of the Span found in ctx to
// req.TraceContext. If ctx carries no Span, req is left untouched and nil is
// returned; otherwise the result of the tracer's Inject call is returned.
func SetFlowRequestTrace(ctx context.Context, req *SetupFlowRequest) error {
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		req.TraceContext = &tracing.SpanContextCarrier{}
		return sp.Tracer().Inject(sp.Context(), basictracer.Delegator, req.TraceContext)
	}
	return nil
}