forked from apache/arrow
/
table.rb
597 lines (571 loc) · 19.8 KB
/
table.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require "arrow/raw-table-converter"
module Arrow
  class Table
    include ColumnContainable
    include GenericFilterable
    include GenericTakeable
    include RecordContainable

    class << self
      # Loads a table from `path`.
      #
      # This is a thin wrapper that forwards both arguments unchanged to
      # {TableLoader.load}.
      #
      # @param path The source to load from, as accepted by
      #   {TableLoader.load}.
      # @param options [Hash] Options forwarded to {TableLoader.load}.
      # @return Whatever {TableLoader.load} returns.
      def load(path, options={})
        TableLoader.load(path, options)
      end
    end

    # Keep the constructor that existed before this file reopened the class
    # reachable as `initialize_raw`: the Ruby-level {#initialize} below
    # normalizes its arguments and then delegates to it.
    alias_method :initialize_raw, :initialize
    private :initialize_raw

    # Creates a new {Arrow::Table}.
    #
    # With two arguments, the first element of the second argument decides
    # how the second argument is interpreted: a Ruby `::Array` means raw
    # records, an {Arrow::Column} means columns (their data is used), and
    # anything else (arrays, chunked arrays, record batches) is passed
    # through as-is.
    #
    # @overload initialize(columns)
    #
    #   @param columns [::Array<Arrow::Column>] The columns of the table.
    #
    #   @example Create a table from columns
    #     count_field = Arrow::Field.new("count", :uint32)
    #     count_array = Arrow::UInt32Array.new([0, 2, nil, 4])
    #     count_column = Arrow::Column.new(count_field, count_array)
    #     visible_field = Arrow::Field.new("visible", :boolean)
    #     visible_array = Arrow::BooleanArray.new([true, nil, nil, false])
    #     visible_column = Arrow::Column.new(visible_field, visible_array)
    #     Arrow::Table.new([count_column, visible_column])
    #
    # @overload initialize(raw_table)
    #
    #   @param raw_table [Hash<String, Arrow::Array>]
    #     The pairs of column name and values of the table. Column values are
    #     `Arrow::Array`.
    #
    #   @example Create a table from column name and values
    #     Arrow::Table.new("count" => Arrow::UInt32Array.new([0, 2, nil, 4]),
    #                      "visible" => Arrow::BooleanArray.new([true, nil, nil, false]))
    #
    # @overload initialize(raw_table)
    #
    #   @param raw_table [Hash<String, Arrow::ChunkedArray>]
    #     The pairs of column name and values of the table. Column values are
    #     `Arrow::ChunkedArray`.
    #
    #   @example Create a table from column name and values
    #     count_chunks = [
    #       Arrow::UInt32Array.new([0, 2]),
    #       Arrow::UInt32Array.new([nil, 4]),
    #     ]
    #     visible_chunks = [
    #       Arrow::BooleanArray.new([true]),
    #       Arrow::BooleanArray.new([nil, nil, false]),
    #     ]
    #     Arrow::Table.new("count" => Arrow::ChunkedArray.new(count_chunks),
    #                      "visible" => Arrow::ChunkedArray.new(visible_chunks))
    #
    # @overload initialize(raw_table)
    #
    #   @param raw_table [Hash<String, ::Array>]
    #     The pairs of column name and values of the table. Column values are
    #     Ruby `::Array`s.
    #
    #   @example Create a table from column name and values
    #     Arrow::Table.new("count" => [0, 2, nil, 4],
    #                      "visible" => [true, nil, nil, false])
    #
    # @overload initialize(schema, columns)
    #
    #   @param schema [Arrow::Schema] The schema of the table.
    #     You can also specify schema as primitive Ruby objects.
    #     See {Arrow::Schema#initialize} for details.
    #
    #   @param columns [::Array<Arrow::Column>] The data of the table.
    #
    #   @example Create a table from schema and columns
    #     count_field = Arrow::Field.new("count", :uint32)
    #     count_array = Arrow::UInt32Array.new([0, 2, nil, 4])
    #     count_column = Arrow::Column.new(count_field, count_array)
    #     visible_field = Arrow::Field.new("visible", :boolean)
    #     visible_array = Arrow::BooleanArray.new([true, nil, nil, false])
    #     visible_column = Arrow::Column.new(visible_field, visible_array)
    #     Arrow::Table.new(Arrow::Schema.new([count_field, visible_field]),
    #                      [count_column, visible_column])
    #
    # @overload initialize(schema, arrays)
    #
    #   @param schema [Arrow::Schema] The schema of the table.
    #     You can also specify schema as primitive Ruby objects.
    #     See {Arrow::Schema#initialize} for details.
    #
    #   @param arrays [::Array<Arrow::Array>] The data of the table.
    #
    #   @example Create a table from schema and arrays
    #     count_field = Arrow::Field.new("count", :uint32)
    #     count_array = Arrow::UInt32Array.new([0, 2, nil, 4])
    #     visible_field = Arrow::Field.new("visible", :boolean)
    #     visible_array = Arrow::BooleanArray.new([true, nil, nil, false])
    #     Arrow::Table.new(Arrow::Schema.new([count_field, visible_field]),
    #                      [count_array, visible_array])
    #
    # @overload initialize(schema, record_batches)
    #
    #   @param schema [Arrow::Schema] The schema of the table.
    #     You can also specify schema as primitive Ruby objects.
    #     See {Arrow::Schema#initialize} for details.
    #
    #   @param record_batches [::Array<Arrow::RecordBatch>] The data of the
    #     table.
    #
    #   @example Create a table from schema and record batches
    #     count_field = Arrow::Field.new("count", :uint32)
    #     visible_field = Arrow::Field.new("visible", :boolean)
    #     schema = Arrow::Schema.new([count_field, visible_field])
    #     record_batches = [
    #       Arrow::RecordBatch.new(schema, [[0, true], [2, nil], [nil, nil]]),
    #       Arrow::RecordBatch.new(schema, [[4, false]]),
    #     ]
    #     Arrow::Table.new(schema, record_batches)
    #
    # @overload initialize(schema, raw_records)
    #
    #   @param schema [Arrow::Schema] The schema of the table.
    #     You can also specify schema as primitive Ruby objects.
    #     See {Arrow::Schema#initialize} for details.
    #
    #   @param raw_records [::Array<::Array>] The data of the table as
    #     primitive Ruby objects, one `::Array` per record.
    #
    #   @example Create a table from schema and raw records
    #     schema = {
    #       count: :uint32,
    #       visible: :boolean,
    #     }
    #     raw_records = [
    #       [0, true],
    #       [2, nil],
    #       [nil, nil],
    #       [4, false],
    #     ]
    #     Arrow::Table.new(schema, raw_records)
    #
    # @raise [ArgumentError] If not given exactly 1 or 2 arguments.
    def initialize(*args)
      n_args = args.size
      case n_args
      when 1
        raw_table_converter = RawTableConverter.new(args[0])
        schema = raw_table_converter.schema
        values = raw_table_converter.values
      when 2
        schema = args[0]
        schema = Schema.new(schema) unless schema.is_a?(Schema)
        values = args[1]
        case values[0]
        when ::Array
          # Raw records: convert them into a single record batch.
          values = [RecordBatch.new(schema, values)]
        when Column
          values = values.collect(&:data)
        end
      else
        message = "wrong number of arguments (given #{n_args}, expected 1..2)"
        raise ArgumentError, message
      end
      initialize_raw(schema, values)
    end

    # Iterates over the record batches of the table.
    #
    # Batches are read with a {TableBatchReader} until its `read_next`
    # returns a falsy value.
    #
    # @yieldparam record_batch [Arrow::RecordBatch] Each record batch.
    # @return [Enumerator] When no block is given.
    def each_record_batch
      return to_enum(__method__) unless block_given?

      reader = TableBatchReader.new(self)
      while (record_batch = reader.read_next)
        yield(record_batch)
      end
    end

    alias_method :size, :n_rows
    alias_method :length, :n_rows

    alias_method :slice_raw, :slice

    # @overload slice(offset, length)
    #
    #   @param offset [Integer] The offset of sub Arrow::Table.
    #   @param length [Integer] The length of sub Arrow::Table.
    #   @return [Arrow::Table]
    #     The sub `Arrow::Table` that covers only from
    #     `offset` to `offset + length` range.
    #
    # @overload slice(index)
    #
    #   @param index [Integer] The index in this table. A negative index
    #     counts from the last row.
    #   @return [Arrow::Record, nil]
    #     The `Arrow::Record` corresponding to index of
    #     the table, or `nil` when `index` is out of range.
    #
    # @overload slice(booleans)
    #
    #   @param booleans [::Array<Boolean>]
    #     The values indicating the target rows.
    #   @return [Arrow::Table]
    #     The sub `Arrow::Table` that covers only rows of indices
    #     the values of `booleans` is true.
    #
    # @overload slice(boolean_array)
    #
    #   @param boolean_array [Arrow::BooleanArray, Arrow::ChunkedArray]
    #     The values indicating the target rows. A chunked array must hold
    #     `Arrow::BooleanArray` chunks.
    #   @return [Arrow::Table]
    #     The sub `Arrow::Table` that covers only rows of indices
    #     the values of `boolean_array` is true.
    #
    # @overload slice(range)
    #
    #   @param range [Range] The range indicating the target rows.
    #     Negative bounds count from the last row. A beginless range starts
    #     at the first row and an endless range ends at the last row.
    #   @return [Arrow::Table]
    #     The sub `Arrow::Table` that covers only rows of the range of indices.
    #
    # @overload slice(conditions)
    #
    #   @param conditions [Hash] The conditions to select records. Each
    #     value is either a `Range` (matched with `>=` / `<` / `<=` against
    #     its bounds) or a value matched with `==`; all conditions are
    #     combined with `&`.
    #   @return [Arrow::Table]
    #     The sub `Arrow::Table` that covers only rows matched by condition
    #
    # @overload slice
    #
    #   @yield [slicer] Gives slicer that constructs condition to select records.
    #   @yieldparam slicer [Arrow::Slicer] The slicer that helps us to
    #     build condition.
    #   @yieldreturn [Arrow::Slicer::Condition, ::Array<Arrow::Slicer::Condition>]
    #     The condition to select records.
    #   @return [Arrow::Table]
    #     The sub `Arrow::Table` that covers only rows matched by condition
    #     specified by slicer.
    #
    # When several slicers are produced, the sliced tables are concatenated
    # in order.
    #
    # @raise [ArgumentError] On a wrong number of arguments, when both
    #   arguments and a block are given, when a range's start is out of
    #   range, or when a slicer has an unsupported type.
    def slice(*args)
      slicers = []
      if block_given?
        unless args.empty?
          raise ArgumentError, "must not specify both arguments and block"
        end
        block_slicer = yield(Slicer.new(self))
        case block_slicer
        when ::Array
          slicers.concat(block_slicer)
        else
          slicers << block_slicer
        end
      else
        expected_n_args = nil
        case args.size
        when 1
          case args[0]
          when Integer
            index = args[0]
            index += n_rows if index < 0
            return nil if index < 0
            return nil if index >= n_rows
            return Record.new(self, index)
          when Hash
            condition_pairs = args[0]
            slicer = Slicer.new(self)
            conditions = []
            condition_pairs.each do |key, value|
              case value
              when Range
                # TODO: Optimize "begin <= key <= end" case by missing "between" kernel
                # https://issues.apache.org/jira/browse/ARROW-9843
                unless value.begin.nil?
                  conditions << (slicer[key] >= value.begin)
                end
                unless value.end.nil?
                  if value.exclude_end?
                    conditions << (slicer[key] < value.end)
                  else
                    conditions << (slicer[key] <= value.end)
                  end
                end
              else
                conditions << (slicer[key] == value)
              end
            end
            slicers << conditions.inject(:&)
          else
            slicers << args[0]
          end
        when 2
          offset, length = args
          slicers << (offset...(offset + length))
        else
          expected_n_args = "1..2"
        end
        if expected_n_args
          message = "wrong number of arguments " +
            "(given #{args.size}, expected #{expected_n_args})"
          raise ArgumentError, message
        end
      end

      # Null selection values produce null rows instead of dropping them.
      filter_options = Arrow::FilterOptions.new
      filter_options.null_selection_behavior = :emit_null
      sliced_tables = []
      slicers.each do |slicer|
        slicer = slicer.evaluate if slicer.respond_to?(:evaluate)
        case slicer
        when Integer
          # A bare Integer slicer selects from that row to the last row.
          slicer += n_rows if slicer < 0
          sliced_tables << slice_by_range(slicer, n_rows - 1)
        when Range
          # Range#begin / Range#end are nil for beginless (`..3`) and
          # endless (`2..`) ranges, where Range#first / Range#last would
          # raise RangeError. Treat a missing bound as the first / last row,
          # matching how the Hash-condition path treats nil bounds.
          from = slicer.begin
          from = 0 if from.nil?
          original_from = from
          if slicer.end.nil?
            to = n_rows - 1
          else
            to = slicer.end
            to -= 1 if slicer.exclude_end?
          end
          from += n_rows if from < 0
          if from < 0 || from >= n_rows
            message =
              "offset is out of range (-#{n_rows + 1},#{n_rows}): " +
              "#{original_from}"
            raise ArgumentError, message
          end
          to += n_rows if to < 0
          sliced_tables << slice_by_range(from, to)
        when ::Array, BooleanArray, ChunkedArray
          sliced_tables << filter(slicer, filter_options)
        else
          message = "slicer must be Integer, Range, (from, to), " +
            "Arrow::ChunkedArray of Arrow::BooleanArray, " +
            "Arrow::BooleanArray or Arrow::Slicer::Condition: #{slicer.inspect}"
          raise ArgumentError, message
        end
      end
      if sliced_tables.size > 1
        sliced_tables[0].concatenate(sliced_tables[1..-1])
      else
        sliced_tables[0]
      end
    end

    # Returns a new table with columns added, replaced or removed.
    #
    # Columns that already exist keep their position; a same-named new
    # column replaces them in place. Columns that did not exist yet are
    # appended in the order they were given. `self` is not modified.
    #
    # @overload merge(columns)
    #
    #   @param columns [Hash{String, Symbol => Arrow::Array, Arrow::ChunkedArray, Arrow::Column, nil}]
    #     Column name to column data. A truthy value adds or replaces the
    #     column with that name; a `nil` (or `false`) value removes it.
    #
    # @overload merge(table)
    #
    #   @param table [Arrow::Table] Every column of `table` is added to this
    #     table, or replaces this table's column with the same name.
    #
    # @return [Arrow::Table] The merged table.
    # @raise [ArgumentError] If the argument is neither a Hash nor an
    #   `Arrow::Table`, or if a column value has an unsupported type.
    def merge(other)
      added_columns = {}
      removed_columns = {}

      case other
      when Hash
        other.each do |name, value|
          name = name.to_s
          if value
            added_columns[name] = ensure_raw_column(name, value)
          else
            removed_columns[name] = true
          end
        end
      when Table
        other.columns.each do |column|
          name = column.name
          added_columns[name] = ensure_raw_column(name, column)
        end
      else
        message = "merge target must be Hash or Arrow::Table: " +
          "<#{other.inspect}>: #{inspect}"
        raise ArgumentError, message
      end

      new_columns = []
      columns.each do |column|
        column_name = column.name
        # Claim the replacement (if any) so it is not appended again below.
        new_column = added_columns.delete(column_name)
        if new_column
          new_columns << new_column
          next
        end
        next if removed_columns.key?(column_name)
        new_columns << ensure_raw_column(column_name, column)
      end
      # Remaining entries name columns that did not exist yet.
      new_columns.concat(added_columns.values)

      new_fields = new_columns.collect {|new_column| new_column[:field]}
      new_arrays = new_columns.collect {|new_column| new_column[:data]}
      self.class.new(new_fields, new_arrays)
    end

    alias_method :remove_column_raw, :remove_column

    # Removes a column by name or by index.
    #
    # @param name_or_index [String, Symbol, Integer] The column name, or the
    #   column index. A negative index counts from the last column.
    # @return The result of the original (raw) `remove_column` called with
    #   the resolved index.
    # @raise [KeyError] If no column has the given name.
    # @raise [IndexError] If the index is out of range.
    def remove_column(name_or_index)
      case name_or_index
      when String, Symbol
        name = name_or_index.to_s
        index = columns.index {|column| column.name == name}
        if index.nil?
          message = "unknown column: #{name_or_index.inspect}: #{inspect}"
          raise KeyError.new(message)
        end
      else
        index = name_or_index
        index += n_columns if index < 0
        if index < 0 or index >= n_columns
          message = "out of index (0..#{n_columns - 1}): " +
            "#{name_or_index.inspect}: #{inspect}"
          raise IndexError.new(message)
        end
      end
      remove_column_raw(index)
    end

    # Experimental
    #
    # Groups this table by `keys`.
    #
    # @param keys The grouping keys, passed to {Group.new} as an Array.
    # @return [Group]
    def group(*keys)
      Group.new(self, keys)
    end

    # Experimental
    #
    # Creates a rolling window over this table.
    #
    # @param size The window size, passed to {RollingWindow.new}.
    # @return [RollingWindow]
    def window(size: nil)
      RollingWindow.new(self, size)
    end

    # Saves this table to `output` with a {TableSaver}.
    #
    # @param output The destination, as accepted by {TableSaver}.
    # @param options [Hash] Options forwarded to {TableSaver}.
    # @return Whatever `TableSaver#save` returns.
    def save(output, options={})
      saver = TableSaver.new(self, output, options)
      saver.save
    end

    # Returns a new table with the same schema whose column data is the
    # result of calling `pack` on each column's data.
    #
    # @return [Arrow::Table]
    def pack
      packed_arrays = columns.collect do |column|
        column.data.pack
      end
      self.class.new(schema, packed_arrays)
    end

    # @overload join(right, key, type: :inner, left_outputs: nil, right_outputs: nil)
    #   @!macro join_common_before
    #     @param right [Arrow::Table] The right table.
    #
    #     Join columns with `right` on join key columns.
    #
    #   @!macro join_common_after
    #     @param type [Arrow::JoinType] How to join.
    #     @param left_outputs [::Array<String, Symbol>] Output columns in
    #       `self`.
    #
    #       If both of `left_outputs` and `right_outputs` aren't
    #       specified, all columns in `self` and `right` are
    #       outputted.
    #     @param right_outputs [::Array<String, Symbol>] Output columns in
    #       `right`.
    #
    #       If both of `left_outputs` and `right_outputs` aren't
    #       specified, all columns in `self` and `right` are
    #       outputted.
    #     @return [Arrow::Table]
    #       The joined `Arrow::Table`.
    #
    #   @macro join_common_before
    #   @param key [String, Symbol] A join key.
    #   @macro join_common_after
    #
    # @overload join(right, keys, type: :inner, left_outputs: nil, right_outputs: nil)
    #
    #   @macro join_common_before
    #   @param keys [::Array<String, Symbol>] Join keys.
    #   @macro join_common_after
    #
    # @overload join(right, keys, type: :inner, left_outputs: nil, right_outputs: nil)
    #
    #   @macro join_common_before
    #   @param keys [Hash] Specify join keys in `self` and `right` separately.
    #   @option keys [String, Symbol, ::Array<String, Symbol>] :left
    #     Join keys in `self`.
    #   @option keys [String, Symbol, ::Array<String, Symbol>] :right
    #     Join keys in `right`.
    #   @macro join_common_after
    #
    # @since 7.0.0
    def join(right, keys, type: :inner, left_outputs: nil, right_outputs: nil)
      plan = ExecutePlan.new
      left_node = plan.build_source_node(self)
      right_node = plan.build_source_node(right)
      if keys.is_a?(Hash)
        left_keys = keys[:left]
        right_keys = keys[:right]
      else
        left_keys = keys
        right_keys = keys
      end
      # Kernel#Array: accept a single key as well as a list of keys.
      left_keys = Array(left_keys)
      right_keys = Array(right_keys)
      hash_join_node_options = HashJoinNodeOptions.new(type,
                                                       left_keys,
                                                       right_keys)
      unless left_outputs.nil?
        hash_join_node_options.left_outputs = left_outputs
      end
      unless right_outputs.nil?
        hash_join_node_options.right_outputs = right_outputs
      end
      hash_join_node = plan.build_hash_join_node(left_node,
                                                 right_node,
                                                 hash_join_node_options)
      sink_node_options = SinkNodeOptions.new
      plan.build_sink_node(hash_join_node, sink_node_options)
      plan.validate
      plan.start
      plan.wait
      reader = sink_node_options.get_reader(hash_join_node.output_schema)
      reader.read_all
    end

    alias_method :to_s_raw, :to_s

    # Formats this table.
    #
    # @param options [Hash] Formatting options. Except for `format: :column`,
    #   the whole hash is also passed to the selected formatter.
    # @option options [Symbol, nil] :format (nil) `:column` uses the original
    #   (raw) `to_s`; `:list` uses {TableListFormatter}; `:table` or `nil`
    #   uses {TableTableFormatter}.
    # @return The formatted table.
    # @raise [ArgumentError] If `:format` is not one of the supported values.
    def to_s(options={})
      format = options[:format]
      case format
      when :column
        return to_s_raw
      when :list
        formatter_class = TableListFormatter
      when :table, nil
        formatter_class = TableTableFormatter
      else
        message = ":format must be :column, :list, :table or nil"
        raise ArgumentError, "#{message}: <#{format.inspect}>"
      end
      formatter = formatter_class.new(self, options)
      formatter.format
    end

    alias_method :inspect_raw, :inspect

    # @return [String] The inherited `inspect` output followed by a newline
    #   and the default {#to_s} rendering of the table.
    def inspect
      "#{super}\n#{to_s}"
    end

    # Reports that a column name can be called as a method (see
    # {#method_missing}).
    def respond_to_missing?(name, include_private)
      return true if find_column(name)
      super
    end

    # Allows `table.column_name` as a shortcut: when called without
    # arguments and `find_column(name)` finds a column, that column is
    # returned. Otherwise the call falls through to `super`.
    def method_missing(name, *args, &block)
      if args.empty?
        column = find_column(name)
        return column if column
      end
      super
    end

    private

    # Slices the rows from `from` to `to`, both inclusive.
    def slice_by_range(from, to)
      slice_raw(from, to - from + 1)
    end

    # Normalizes column data into `{field: Arrow::Field, data: Arrow::ChunkedArray}`.
    #
    # An `Arrow::Array` is wrapped into a one-chunk `Arrow::ChunkedArray`; an
    # `Arrow::Column` keeps its own field and has its data wrapped the same
    # way when it is not already chunked.
    #
    # @param name [String] The column name, used for the new field.
    # @param data [Arrow::Array, Arrow::ChunkedArray, Arrow::Column]
    # @return [Hash]
    # @raise [ArgumentError] If `data` has any other type.
    def ensure_raw_column(name, data)
      case data
      when Array
        {
          field: Field.new(name, data.value_data_type),
          data: ChunkedArray.new([data]),
        }
      when ChunkedArray
        {
          field: Field.new(name, data.value_data_type),
          data: data,
        }
      when Column
        column = data
        data = column.data
        data = ChunkedArray.new([data]) unless data.is_a?(ChunkedArray)
        {
          field: column.field,
          data: data,
        }
      else
        message = "column must be Arrow::Array or Arrow::Column: " +
          "<#{name}>: <#{data.inspect}>: #{inspect}"
        raise ArgumentError, message
      end
    end
  end
end