-
Notifications
You must be signed in to change notification settings - Fork 152
/
chunk.go
137 lines (117 loc) · 3.59 KB
/
chunk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package table
import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
)
// Chunk is a horizontal partition of a Table. It is a subset of rows
// and contains a set of columns known as the group key.
// It may not contain all columns that have been seen associated with
// that group key so transformations should verify the existence of columns
// for each chunk independently.
type Chunk struct {
buf arrow.TableBuffer
}
// ChunkFromBuffer will create a Chunk from the TableBuffer.
//
// This function takes ownership of the arrow.TableBuffer
// and the Chunk goes out of scope at the same time
// as the arrow.TableBuffer unless Retain is called.
func ChunkFromBuffer(buf arrow.TableBuffer) Chunk {
return Chunk{buf: buf}
}
// ChunkFromReader will create a Chunk from the ColReader.
//
// This function borrows a reference to the data in the ColReader
// and will go out of scope at the same time as the ColReader
// unless Retain is called.
func ChunkFromReader(cr flux.ColReader) Chunk {
buf := arrow.TableBuffer{
GroupKey: cr.Key(),
Columns: cr.Cols(),
Values: make([]array.Array, len(cr.Cols())),
}
for j := range buf.Values {
buf.Values[j] = Values(cr, j)
}
return ChunkFromBuffer(buf)
}
// Key returns the columns which are common for each row this view.
func (v Chunk) Key() flux.GroupKey {
return v.buf.Key()
}
// Buffer returns the underlying TableBuffer used for this Chunk.
// This is exposed for use by another package, but this method
// should never be invoked in normal code.
func (v Chunk) Buffer() arrow.TableBuffer {
return v.buf
}
// NCols returns the number of columns in this Chunk.
func (v Chunk) NCols() int {
return len(v.buf.Columns)
}
// Len returns the number of rows.
func (v Chunk) Len() int {
return v.buf.Len()
}
// Cols returns the columns as a slice.
func (v Chunk) Cols() []flux.ColMeta {
return v.buf.Columns
}
// Col returns the metadata associated with the column.
func (v Chunk) Col(j int) flux.ColMeta {
return v.buf.Columns[j]
}
// Index returns the index of the column with the given name.
func (v Chunk) Index(label string) int {
for j, c := range v.buf.Columns {
if c.Label == label {
return j
}
}
return -1
}
// HasCol returns whether a column with the given name exists.
func (v Chunk) HasCol(label string) bool {
return v.Index(label) >= 0
}
// Values returns a reference to the array of values in this Chunk.
// The returned array is a borrowed reference and the caller can
// call Retain on the returned array to retain its own reference
// to the array.
func (v Chunk) Values(j int) array.Array {
return v.buf.Values[j]
}
// Bools is a convenience function for retrieving an array
// as a boolean array.
func (v Chunk) Bools(j int) *array.Boolean {
return v.Values(j).(*array.Boolean)
}
// Ints is a convenience function for retrieving an array
// as an int array.
func (v Chunk) Ints(j int) *array.Int {
return v.Values(j).(*array.Int)
}
// Uints is a convenience function for retrieving an array
// as a uint array.
func (v Chunk) Uints(j int) *array.Uint {
return v.Values(j).(*array.Uint)
}
// Floats is a convenience function for retrieving an array
// as a float array.
func (v Chunk) Floats(j int) *array.Float {
return v.Values(j).(*array.Float)
}
// Strings is a convenience function for retrieving an array
// as a string array.
func (v Chunk) Strings(j int) *array.String {
return v.Values(j).(*array.String)
}
// Retain will retain a reference to this Chunk.
func (v Chunk) Retain() {
v.buf.Retain()
}
// Release will release a reference to this buffer.
func (v Chunk) Release() {
v.buf.Release()
}