-
Notifications
You must be signed in to change notification settings - Fork 152
/
copy.go
101 lines (88 loc) · 2.25 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package table
import "github.com/influxdata/flux"
// Copy returns a buffered copy of the table and consumes the
// input table. If the input table is already buffered, it "consumes"
// the input and returns the same table.
//
// The buffered table can then be copied additional times using the
// BufferedTable.Copy method.
//
// This method should be used sparingly if at all. It will retain
// each of the buffers of data coming out of a table so the entire
// table is materialized in memory. For large datasets, this could
// potentially cause a problem. The allocator is meant to catch when
// this happens and prevent it.
func Copy(t flux.Table) (flux.BufferedTable, error) {
if tbl, ok := t.(flux.BufferedTable); ok {
return tbl, nil
}
tbl := tableBuffer{
key: t.Key(),
colMeta: t.Cols(),
}
if t.Empty() {
return &tbl, nil
}
if err := t.Do(func(cr flux.ColReader) error {
cr.Retain()
tbl.buffers = append(tbl.buffers, cr)
return nil
}); err != nil {
tbl.Done()
return nil, err
}
return &tbl, nil
}
// tableBuffer maintains a buffer of the data within a table.
// It is created by reading a table and using Retain to retain
// a reference to each ColReader that is returned.
//
// This implements the flux.BufferedTable interface.
type tableBuffer struct {
key flux.GroupKey
colMeta []flux.ColMeta
i int
buffers []flux.ColReader
}
func (tb *tableBuffer) Key() flux.GroupKey {
return tb.key
}
func (tb *tableBuffer) Cols() []flux.ColMeta {
return tb.colMeta
}
func (tb *tableBuffer) Do(f func(flux.ColReader) error) error {
defer tb.Done()
for ; tb.i < len(tb.buffers); tb.i++ {
b := tb.buffers[tb.i]
if err := f(b); err != nil {
return err
}
b.Release()
}
return nil
}
func (tb *tableBuffer) Done() {
for ; tb.i < len(tb.buffers); tb.i++ {
tb.buffers[tb.i].Release()
}
}
func (tb *tableBuffer) Empty() bool {
return len(tb.buffers) == 0
}
func (tb *tableBuffer) Buffer(i int) flux.ColReader {
return tb.buffers[i]
}
func (tb *tableBuffer) BufferN() int {
return len(tb.buffers)
}
func (tb *tableBuffer) Copy() flux.BufferedTable {
for i := tb.i; i < len(tb.buffers); i++ {
tb.buffers[i].Retain()
}
return &tableBuffer{
key: tb.key,
colMeta: tb.colMeta,
i: tb.i,
buffers: tb.buffers,
}
}