diff --git a/README.md b/README.md index 2e44c2c..477ea38 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,43 @@ -# arrow 12.0.1-1 +# arrow 13.0.0-1 - - mingw-w64-i686-arrow-12.0.1-1-any.pkg.tar.xz + - mingw-w64-i686-arrow-13.0.0-1-any.pkg.tar.xz - mingw-w64-i686-aws-sdk-cpp-1.7.365-1-any.pkg.tar.xz - mingw-w64-i686-brotli-1.0.9-4-any.pkg.tar.xz - - mingw-w64-i686-openssl-1.1.1.m-9800-any.pkg.tar.xz + - mingw-w64-i686-openssl-3.1.1-9800-any.pkg.tar.xz - mingw-w64-i686-lz4-1.8.2-1-any.pkg.tar.xz - mingw-w64-i686-re2-20200801-1-any.pkg.tar.xz - mingw-w64-i686-snappy-1.1.7-2-any.pkg.tar.xz - mingw-w64-i686-bzip2-1.0.8-1-any.pkg.tar.xz - - mingw-w64-i686-curl-7.84.0-9000-any.pkg.tar.xz - - mingw-w64-i686-libssh2-1.11.0-9800-any.pkg.tar.xz + - mingw-w64-i686-curl-8.1.2-9000-any.pkg.tar.xz + - mingw-w64-i686-libssh2-1.11.0-9801-any.pkg.tar.xz - mingw-w64-i686-thrift-0.13.0-1-any.pkg.tar.xz - mingw-w64-i686-zstd-1.4.4-1-any.pkg.tar.xz - mingw-w64-i686-libutf8proc-2.4.0-2-any.pkg.tar.xz - mingw-w64-i686-nghttp2-1.51.0-1-any.pkg.tar.xz - - mingw-w64-x86_64-arrow-12.0.1-1-any.pkg.tar.xz + - mingw-w64-x86_64-arrow-13.0.0-1-any.pkg.tar.xz - mingw-w64-x86_64-aws-sdk-cpp-1.7.365-1-any.pkg.tar.xz - mingw-w64-x86_64-brotli-1.0.9-4-any.pkg.tar.xz - - mingw-w64-x86_64-openssl-1.1.1.m-9800-any.pkg.tar.xz + - mingw-w64-x86_64-openssl-3.1.1-9800-any.pkg.tar.xz - mingw-w64-x86_64-lz4-1.8.2-1-any.pkg.tar.xz - mingw-w64-x86_64-re2-20200801-1-any.pkg.tar.xz - mingw-w64-x86_64-snappy-1.1.7-2-any.pkg.tar.xz - mingw-w64-x86_64-bzip2-1.0.8-1-any.pkg.tar.xz - - mingw-w64-x86_64-curl-7.84.0-9000-any.pkg.tar.xz - - mingw-w64-x86_64-libssh2-1.11.0-9800-any.pkg.tar.xz + - mingw-w64-x86_64-curl-8.1.2-9000-any.pkg.tar.xz + - mingw-w64-x86_64-libssh2-1.11.0-9801-any.pkg.tar.xz - mingw-w64-x86_64-thrift-0.13.0-1-any.pkg.tar.xz - mingw-w64-x86_64-zstd-1.4.4-1-any.pkg.tar.xz - mingw-w64-x86_64-libutf8proc-2.4.0-2-any.pkg.tar.xz - mingw-w64-x86_64-nghttp2-1.51.0-1-any.pkg.tar.xz - - mingw-w64-ucrt-x86_64-arrow-12.0.1-1-any.pkg.tar.xz + - mingw-w64-ucrt-x86_64-arrow-13.0.0-1-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-aws-sdk-cpp-1.7.365-1-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-brotli-1.0.9-4-any.pkg.tar.xz - - mingw-w64-ucrt-x86_64-openssl-1.1.1.m-9800-any.pkg.tar.xz + - mingw-w64-ucrt-x86_64-openssl-3.1.1-9800-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-lz4-1.8.2-1-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-re2-20200801-1-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-snappy-1.1.7-2-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-bzip2-1.0.8-1-any.pkg.tar.xz - - mingw-w64-ucrt-x86_64-curl-7.84.0-9000-any.pkg.tar.xz - - mingw-w64-ucrt-x86_64-libssh2-1.11.0-9800-any.pkg.tar.xz + - mingw-w64-ucrt-x86_64-curl-8.1.2-9000-any.pkg.tar.xz + - mingw-w64-ucrt-x86_64-libssh2-1.11.0-9801-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-thrift-0.13.0-1-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-zstd-1.4.4-1-any.pkg.tar.xz - mingw-w64-ucrt-x86_64-libutf8proc-2.4.0-2-any.pkg.tar.xz diff --git a/include/arrow/acero/api.h b/include/arrow/acero/api.h new file mode 100644 index 0000000..c9724fd --- /dev/null +++ b/include/arrow/acero/api.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// NOTE: API is EXPERIMENTAL and will change without going through a +// deprecation cycle + +#pragma once + +/// \defgroup acero-api Utilities for creating and executing execution plans +/// @{ +/// @} + +/// \defgroup acero-nodes Options classes for the various exec nodes +/// @{ +/// @} + +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" diff --git a/include/arrow/acero/asof_join_node.h b/include/arrow/acero/asof_join_node.h index b2ad2ed..6a0ce8f 100644 --- a/include/arrow/acero/asof_join_node.h +++ b/include/arrow/acero/asof_join_node.h @@ -30,16 +30,11 @@ using AsofJoinKeys = AsofJoinNodeOptions::Keys; /// \brief Make the output schema of an as-of-join node /// -/// Optionally, also provides the field output indices for this node. -/// \see arrow::engine::RelationInfo -/// /// \param[in] input_schema the schema of each input to the node /// \param[in] input_keys the key of each input to the node -/// \param[out] field_output_indices the output index of each field ARROW_ACERO_EXPORT Result> MakeOutputSchema( const std::vector>& input_schema, - const std::vector& input_keys, - std::vector* field_output_indices = NULLPTR); + const std::vector& input_keys); } // namespace asofjoin } // namespace acero diff --git a/include/arrow/acero/exec_plan.h b/include/arrow/acero/exec_plan.h index cdaab96..04303aa 100644 --- a/include/arrow/acero/exec_plan.h +++ b/include/arrow/acero/exec_plan.h @@ -48,7 +48,7 @@ using compute::threaded_exec_context; namespace acero { -/// \addtogroup execnode-components +/// \addtogroup acero-internals /// @{ class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this { @@ -118,6 +118,10 @@ class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this; @@ -173,9 +177,9 @@ class ARROW_ACERO_EXPORT ExecNode { /// non-deterministic. For example, a hash-join has no predictable output order. /// /// If the ordering is Ordering::Implicit then there is a meaningful order but that - /// odering is not represented by any column in the data. The most common case for this - /// is when reading data from an in-memory table. The data has an implicit "row order" - /// which is not neccesarily represented in the data set. + /// ordering is not represented by any column in the data. The most common case for + /// this is when reading data from an in-memory table. The data has an implicit "row + /// order" which is not necessarily represented in the data set. /// /// A filter or project node will not modify the ordering. Nothing needs to be done /// other than ensure the index assigned to output batches is the same as the @@ -321,7 +325,7 @@ class ARROW_ACERO_EXPORT ExecNode { /// /// This is not a pause. There will be no way to start the source again after this has /// been called. 
- Status StopProducing(); + virtual Status StopProducing(); std::string ToString(int indent = 0) const; @@ -377,16 +381,36 @@ inline Result MakeExecNode( return factory(plan, std::move(inputs), options); } -/// \brief Helper class for declaring sets of ExecNodes efficiently +/// @} + +/// \addtogroup acero-api +/// @{ + +/// \brief Helper class for declaring execution nodes /// -/// A Declaration represents an unconstructed ExecNode (and potentially more since its -/// inputs may also be Declarations). The node can be constructed and added to a plan -/// with Declaration::AddToPlan, which will recursively construct any inputs as necessary. +/// A Declaration represents an unconstructed ExecNode (and potentially an entire graph +/// since its inputs may also be Declarations) +/// +/// A Declaration can be converted to a plan and executed using one of the +/// DeclarationToXyz methods. +/// +/// For more direct control, a Declaration can be added to an existing execution +/// plan with Declaration::AddToPlan, which will recursively construct any inputs as +/// necessary. struct ARROW_ACERO_EXPORT Declaration { using Input = std::variant; Declaration() {} + /// \brief construct a declaration + /// \param factory_name the name of the exec node to construct. The node must have + /// been added to the exec node registry with this name. + /// \param inputs the inputs to the node, these should be other declarations + /// \param options options that control the behavior of the node. You must use + /// the appropriate subclass. For example, if `factory_name` is + /// "project" then `options` should be ProjectNodeOptions. + /// \param label a label to give the node. Can be used to distinguish it from other + /// nodes of the same type in the plan. Declaration(std::string factory_name, std::vector inputs, std::shared_ptr options, std::string label) : factory_name{std::move(factory_name)}, @@ -447,15 +471,28 @@ struct ARROW_ACERO_EXPORT Declaration { /// }); static Declaration Sequence(std::vector decls); + /// \brief add the declaration to an already created execution plan + /// \param plan the plan to add the node to + /// \param registry the registry to use to lookup the node factory + /// + /// This method will recursively call AddToPlan on all of the declaration's inputs. + /// This method is only for advanced use when the DeclarationToXyz methods are not + /// sufficient. + /// + /// \return the instantiated execution node Result AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry = default_exec_factory_registry()) const; // Validate a declaration bool IsValid(ExecFactoryRegistry* registry = default_exec_factory_registry()) const; + /// \brief the name of the factory to use when creating a node std::string factory_name; + /// \brief the declarations's inputs std::vector inputs; + /// \brief options to control the behavior of the node std::shared_ptr options; + /// \brief a label to give the node in the plan std::string label; }; @@ -489,7 +526,7 @@ struct ARROW_ACERO_EXPORT QueryOptions { /// otherwise. /// /// If explicitly set to true then plan execution will fail if there is no - /// meaningful ordering. This can be useful to valdiate a query that should + /// meaningful ordering. This can be useful to validate a query that should /// be emitting ordered results. /// /// If explicitly set to false then batches will be emit immediately even if there @@ -513,6 +550,13 @@ struct ARROW_ACERO_EXPORT QueryOptions { /// the `use_threads` option. 
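  ///
  /// A minimal sketch of running a declaration single-threaded (the "table_source" and
  /// "filter" factory names are the ones Acero registers; `table` and the column "x"
  /// are assumed to be provided by the caller):
  ///
  ///     arrow::Result<std::shared_ptr<arrow::Table>> RunFiltered(
  ///         std::shared_ptr<arrow::Table> table) {
  ///       using namespace arrow::acero;
  ///       namespace cp = arrow::compute;
  ///       Declaration plan = Declaration::Sequence(
  ///           {{"table_source", TableSourceNodeOptions{table}},
  ///            {"filter", FilterNodeOptions{
  ///                 cp::greater(cp::field_ref("x"), cp::literal(3))}}});
  ///       // use_threads=false keeps execution on the calling thread.
  ///       return DeclarationToTable(std::move(plan), /*use_threads=*/false);
  ///     }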
::arrow::internal::Executor* custom_cpu_executor = NULLPTR; + /// \brief custom executor to use for IO work + /// + /// Must be null or remain valid for the duration of the plan. If this is null then + /// the global io thread pool will be chosen whose behavior will be controlled by + /// the "ARROW_IO_THREADS" environment. + ::arrow::internal::Executor* custom_io_executor = NULLPTR; + /// \brief a memory pool to use for allocations /// /// Must remain valid for the duration of the plan. @@ -707,6 +751,10 @@ DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context); /// fills up. /// /// If a custom exec context is provided then the value of `use_threads` will be ignored. +/// +/// The returned RecordBatchReader can be closed early to cancel the computation of record +/// batches. In this case, only errors encountered by the computation may be reported. In +/// particular, no cancellation error may be reported. ARROW_ACERO_EXPORT Result> DeclarationToReader( Declaration declaration, bool use_threads = true, MemoryPool* memory_pool = default_memory_pool(), @@ -746,6 +794,8 @@ ARROW_ACERO_EXPORT Future<> DeclarationToStatusAsync( ARROW_ACERO_EXPORT Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context); +/// @} + /// \brief Wrap an ExecBatch generator in a RecordBatchReader. /// /// The RecordBatchReader does not impose any ordering on emitted batches. @@ -765,7 +815,5 @@ Result>()>> MakeReaderGenerator( std::shared_ptr reader, arrow::internal::Executor* io_executor, int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart); -/// @} - } // namespace acero } // namespace arrow diff --git a/include/arrow/acero/groupby.h b/include/arrow/acero/groupby.h deleted file mode 100644 index c24990a..0000000 --- a/include/arrow/acero/groupby.h +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include "arrow/acero/exec_plan.h" -#include "arrow/acero/options.h" -#include "arrow/acero/visibility.h" -#include "arrow/compute/api_aggregate.h" -#include "arrow/compute/exec.h" -#include "arrow/compute/kernel.h" -#include "arrow/datum.h" -#include "arrow/result.h" - -namespace arrow { -namespace acero { - -/// Convenience function to perform a group-by on a table -/// -/// The result will be calculated using an exec plan with an aggregate node. -/// -/// If there are no arguments/aggregates then the returned table will have one row -/// for each unique combination of keys -/// -/// Note: If there are many groups the output table may have multiple chunks. -/// -/// If there are no keys then the aggregates will be applied to the full table. -/// The output table in this scenario is guaranteed to have exactly 1 row. 
-/// -/// \return a table that will have one column for each aggregate, named after they -/// aggregate function, and one column for each key -ARROW_ACERO_EXPORT -Result> TableGroupBy( - std::shared_ptr table, std::vector aggregates, - std::vector keys, bool use_threads = false, - MemoryPool* memory_pool = default_memory_pool()); - -/// Convenience function to perform a group-by on a record batch -/// -/// \see GroupByTable -ARROW_ACERO_EXPORT -Result> BatchGroupBy( - std::shared_ptr record_batch, std::vector aggregates, - std::vector keys, bool use_threads = false, - MemoryPool* memory_pool = default_memory_pool()); - -} // namespace acero -} // namespace arrow diff --git a/include/arrow/acero/options.h b/include/arrow/acero/options.h index 635f8a1..bb94bda 100644 --- a/include/arrow/acero/options.h +++ b/include/arrow/acero/options.h @@ -52,53 +52,80 @@ class Executor; namespace acero { +/// \brief This must not be used in release-mode +struct DebugOptions; + using AsyncExecBatchGenerator = AsyncGenerator>; -/// \addtogroup execnode-options +/// \addtogroup acero-nodes /// @{ + +/// \brief A base class for all options objects +/// +/// The only time this is used directly is when a node has no configuration class ARROW_ACERO_EXPORT ExecNodeOptions { public: virtual ~ExecNodeOptions() = default; + + /// \brief This must not be used in release-mode + std::shared_ptr debug_opts; }; -/// \brief Adapt an AsyncGenerator as a source node +/// \brief A node representing a generic source of data for Acero +/// +/// The source node will start calling `generator` during StartProducing. An initial +/// task will be created that will call `generator`. It will not call `generator` +/// reentrantly. If the source can be read in parallel then those details should be +/// encapsulated within `generator`. /// -/// plan->exec_context()->executor() will be used to parallelize pushing to -/// outputs, if provided. +/// For each batch received a new task will be created to push that batch downstream. +/// This task will slice smaller units of size `ExecPlan::kMaxBatchSize` from the +/// parent batch and call InputRecieved. Thus, if the `generator` yields a large +/// batch it may result in several calls to InputReceived. +/// +/// The SourceNode will, by default, assign an implicit ordering to outgoing batches. +/// This is valid as long as the generator generates batches in a deterministic fashion. +/// Currently, the only way to override this is to subclass the SourceNode. +/// +/// This node is not generally used directly but can serve as the basis for various +/// specialized nodes. class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions { public: + /// Create an instance from values SourceNodeOptions(std::shared_ptr output_schema, std::function>()> generator) : output_schema(std::move(output_schema)), generator(std::move(generator)) {} - static Result> FromTable(const Table& table, - arrow::internal::Executor*); - - static Result> FromRecordBatchReader( - std::shared_ptr reader, std::shared_ptr schema, - arrow::internal::Executor*); - + /// \brief the schema for batches that will be generated by this source std::shared_ptr output_schema; + /// \brief an asynchronous stream of batches ending with std::nullopt std::function>()> generator; }; -/// \brief An extended Source node which accepts a table +/// \brief a node that generates data from a table already loaded in memory +/// +/// The table source node will slice off chunks, defined by `max_batch_size` +/// for parallel processing. 
The source node extends source node and so these +/// chunks will be iteratively processed in small batches. \see SourceNode +/// for details. class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions { public: static constexpr int64_t kDefaultMaxBatchSize = 1 << 20; + + /// Create an instance from values TableSourceNodeOptions(std::shared_ptr
<Table> table, int64_t max_batch_size = kDefaultMaxBatchSize) : table(table), max_batch_size(max_batch_size) {} - // arrow table which acts as the data source + /// \brief a table which acts as the data source std::shared_ptr<Table>
table; - // Size of batches to emit from this node - // If the table is larger the node will emit multiple batches from the - // the table to be processed in parallel. + /// \brief size of batches to emit from this node + /// If the table is larger the node will emit multiple batches from the + /// the table to be processed in parallel. int64_t max_batch_size; }; -/// \brief Define a lazy resolved Arrow table. +/// \brief define a lazily resolved Arrow table. /// /// The table uniquely identified by the names can typically be resolved at the time when /// the plan is to be consumed. @@ -106,19 +133,27 @@ class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions { /// This node is for serialization purposes only and can never be executed. class ARROW_ACERO_EXPORT NamedTableNodeOptions : public ExecNodeOptions { public: + /// Create an instance from values NamedTableNodeOptions(std::vector names, std::shared_ptr schema) : names(std::move(names)), schema(schema) {} + /// \brief the names to put in the serialized plan std::vector names; + /// \brief the output schema of the table std::shared_ptr schema; }; -/// \brief An extended Source node which accepts a schema +/// \brief a source node which feeds data from a synchronous iterator of batches /// /// ItMaker is a maker of an iterator of tabular data. +/// +/// The node can be configured to use an I/O executor. If set then each time the +/// iterator is polled a new I/O thread task will be created to do the polling. This +/// allows a blocking iterator to stay off the CPU thread pool. template class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { public: + /// Create an instance that will create a new task on io_executor for each iteration SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, arrow::internal::Executor* io_executor) : schema(schema), @@ -126,6 +161,8 @@ class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { io_executor(io_executor), requires_io(true) {} + /// Create an instance that will either iterate synchronously or use the default I/O + /// executor SchemaSourceNodeOptions(std::shared_ptr schema, ItMaker it_maker, bool requires_io = false) : schema(schema), @@ -150,8 +187,13 @@ class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { bool requires_io; }; +/// a source node that reads from a RecordBatchReader +/// +/// Each iteration of the RecordBatchReader will be run on a new thread task created +/// on the I/O thread pool. 
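///
/// A minimal sketch (assuming the "record_batch_reader_source" factory name under
/// which Acero registers this node, and a caller-provided `reader`):
///
///     std::shared_ptr<arrow::RecordBatchReader> reader = GetReader();  // hypothetical helper
///     arrow::acero::Declaration source{
///         "record_batch_reader_source",
///         arrow::acero::RecordBatchReaderSourceNodeOptions{std::move(reader)}};
///     auto result_table = arrow::acero::DeclarationToTable(std::move(source));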
class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions { public: + /// Create an instance from values RecordBatchReaderSourceNodeOptions(std::shared_ptr reader, arrow::internal::Executor* io_executor = NULLPTR) : reader(std::move(reader)), io_executor(io_executor) {} @@ -165,6 +207,7 @@ class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOpt arrow::internal::Executor* io_executor; }; +/// a source node that reads from an iterator of array vectors using ArrayVectorIteratorMaker = std::function>()>; /// \brief An extended Source node which accepts a schema and array-vectors class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions @@ -172,6 +215,7 @@ class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions using SchemaSourceNodeOptions::SchemaSourceNodeOptions; }; +/// a source node that reads from an iterator of ExecBatch using ExecBatchIteratorMaker = std::function>()>; /// \brief An extended Source node which accepts a schema and exec-batches class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions @@ -186,34 +230,45 @@ class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions }; using RecordBatchIteratorMaker = std::function>()>; -/// \brief An extended Source node which accepts a schema and record-batches +/// a source node that reads from an iterator of RecordBatch class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions : public SchemaSourceNodeOptions { using SchemaSourceNodeOptions::SchemaSourceNodeOptions; }; -/// \brief Make a node which excludes some rows from batches passed through it +/// \brief a node which excludes some rows from batches passed through it /// /// filter_expression will be evaluated against each batch which is pushed to /// this node. Any rows for which filter_expression does not evaluate to `true` will be /// excluded in the batch emitted by this node. +/// +/// This node will emit empty batches if all rows are excluded. This is done +/// to avoid gaps in the ordering. class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions { public: + /// \brief create an instance from values explicit FilterNodeOptions(Expression filter_expression) : filter_expression(std::move(filter_expression)) {} + /// \brief the expression to filter batches + /// + /// The return type of this expression must be boolean Expression filter_expression; }; +/// \brief a node which selects a specified subset from the input class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions { public: static constexpr std::string_view kName = "fetch"; + /// \brief create an instance from values FetchNodeOptions(int64_t offset, int64_t count) : offset(offset), count(count) {} + /// \brief the number of rows to skip int64_t offset; + /// \brief the number of rows to keep (not counting skipped rows) int64_t count; }; -/// \brief Make a node which executes expressions on input batches, producing batches +/// \brief a node which executes expressions on input batches, producing batches /// of the same length with new columns. /// /// Each expression will be evaluated against each batch which is pushed to @@ -222,21 +277,44 @@ class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions { /// If names are not provided, the string representations of exprs will be used. 
class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: + /// \brief create an instance from values explicit ProjectNodeOptions(std::vector expressions, std::vector names = {}) : expressions(std::move(expressions)), names(std::move(names)) {} + /// \brief the expressions to run on the batches + /// + /// The output will have one column for each expression. If you wish to keep any of + /// the columns from the input then you should create a simple field_ref expression + /// for that column. std::vector expressions; + /// \brief the names of the output columns + /// + /// If this is not specified then the result of calling ToString on the expression will + /// be used instead + /// + /// This list should either be empty or have the same length as `expressions` std::vector names; }; -/// \brief Make a node which aggregates input batches, optionally grouped by keys and -/// optionally segmented by segment-keys. Both keys and segment-keys determine the group. -/// However segment-keys are also used for determining grouping segments, which should be -/// large, and allow streaming a partial aggregation result after processing each segment. -/// One common use-case for segment-keys is ordered aggregation, in which the segment-key -/// attribute specifies a column with non-decreasing values or a lexicographically-ordered -/// set of such columns. +/// \brief a node which aggregates input batches and calculates summary statistics +/// +/// The node can summarize the entire input or it can group the input with grouping keys +/// and segment keys. +/// +/// By default, the aggregate node is a pipeline breaker. It must accumulate all input +/// before any output is produced. Segment keys are a performance optimization. If +/// you know your input is already partitioned by one or more columns then you can +/// specify these as segment keys. At each change in the segment keys the node will +/// emit values for all data seen so far. +/// +/// Segment keys are currently limited to single-threaded mode. +/// +/// Both keys and segment-keys determine the group. However segment-keys are also used +/// for determining grouping segments, which should be large, and allow streaming a +/// partial aggregation result after processing each segment. One common use-case for +/// segment-keys is ordered aggregation, in which the segment-key attribute specifies a +/// column with non-decreasing values or a lexicographically-ordered set of such columns. /// /// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is /// expected to be a HashAggregate function. If the keys attribute is an empty vector, @@ -246,8 +324,14 @@ class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions { /// described above, applies. /// /// The keys and segment_keys vectors must be disjoint. +/// +/// If no measures are provided then you will simply get the list of unique keys. +/// +/// This node outputs segment keys first, followed by regular keys, followed by one +/// column for each aggregate. 
class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions { public: + /// \brief create an instance from values explicit AggregateNodeOptions(std::vector aggregates, std::vector keys = {}, std::vector segment_keys = {}) @@ -255,7 +339,7 @@ class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions { keys(std::move(keys)), segment_keys(std::move(segment_keys)) {} - // aggregations which will be applied to the targetted fields + // aggregations which will be applied to the targeted fields std::vector aggregates; // keys by which aggregations will be grouped (optional) std::vector keys; @@ -263,13 +347,18 @@ class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions { std::vector segment_keys; }; +/// \brief a default value at which backpressure will be applied constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30; // 1GiB -constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28; // 256MiB +/// \brief a default value at which backpressure will be removed +constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28; // 256MiB +/// \brief an interface that can be queried for backpressure statistics class ARROW_ACERO_EXPORT BackpressureMonitor { public: virtual ~BackpressureMonitor() = default; + /// \brief fetches the number of bytes currently queued up virtual uint64_t bytes_in_use() = 0; + /// \brief checks to see if backpressure is currently applied virtual bool is_paused() = 0; }; @@ -286,18 +375,25 @@ struct ARROW_ACERO_EXPORT BackpressureOptions { BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above) : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {} + /// \brief create an instance using default values for backpressure limits static BackpressureOptions DefaultBackpressure() { return BackpressureOptions(kDefaultBackpressureLowBytes, kDefaultBackpressureHighBytes); } + /// \brief helper method to determine if backpressure is disabled + /// \return true if pause_if_above is greater than zero, false otherwise bool should_apply_backpressure() const { return pause_if_above > 0; } + /// \brief the number of bytes at which the producer should resume producing uint64_t resume_if_below; + /// \brief the number of bytes at which the producer should pause producing + /// + /// If this is <= 0 then backpressure will be disabled uint64_t pause_if_above; }; -/// \brief Add a sink node which forwards to an AsyncGenerator +/// \brief a sink node which collects results in a queue /// /// Emitted batches will only be ordered if there is a meaningful ordering /// and sequence_output is not set to false. @@ -369,6 +465,7 @@ class ARROW_ACERO_EXPORT BackpressureControl { virtual void Resume() = 0; }; +/// \brief a sink node that consumes the data as part of the plan using callbacks class ARROW_ACERO_EXPORT SinkNodeConsumer { public: virtual ~SinkNodeConsumer() = default; @@ -420,11 +517,13 @@ class ARROW_ACERO_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions { /// fields. Then sorted batches will be forwarded to the generator in sorted order. 
class ARROW_ACERO_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions { public: + /// \brief create an instance from values explicit OrderBySinkNodeOptions( SortOptions sort_options, std::function>()>* generator) : SinkNodeOptions(generator), sort_options(std::move(sort_options)) {} + /// \brief options describing which columns and direction to sort SortOptions sort_options; }; @@ -443,8 +542,6 @@ class ARROW_ACERO_EXPORT OrderByNodeOptions : public ExecNodeOptions { Ordering ordering; }; -/// @} - enum class JoinType { LEFT_SEMI, RIGHT_SEMI, @@ -460,14 +557,12 @@ std::string ToString(JoinType t); enum class JoinKeyCmp { EQ, IS }; -/// \addtogroup execnode-options -/// @{ - -/// \brief Make a node which implements join operation using hash join strategy. +/// \brief a node which implements a join operation using a hash table class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions { public: static constexpr const char* default_output_suffix_for_left = ""; static constexpr const char* default_output_suffix_for_right = ""; + /// \brief create an instance from values that outputs all columns HashJoinNodeOptions( JoinType in_join_type, std::vector in_left_keys, std::vector in_right_keys, Expression filter = literal(true), @@ -487,6 +582,13 @@ class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions { this->key_cmp[i] = JoinKeyCmp::EQ; } } + /// \brief create an instance from keys + /// + /// This will create an inner join that outputs all columns and has no post join filter + /// + /// `in_left_keys` should have the same length and types as `in_right_keys` + /// @param in_left_keys the keys in the left input + /// @param in_right_keys the keys in the right input HashJoinNodeOptions(std::vector in_left_keys, std::vector in_right_keys) : left_keys(std::move(in_left_keys)), right_keys(std::move(in_right_keys)) { @@ -500,6 +602,7 @@ class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions { } this->filter = literal(true); } + /// \brief create an instance from values using JoinKeyCmp::EQ for all comparisons HashJoinNodeOptions( JoinType join_type, std::vector left_keys, std::vector right_keys, std::vector left_output, @@ -522,6 +625,7 @@ class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions { this->key_cmp[i] = JoinKeyCmp::EQ; } } + /// \brief create an instance from values HashJoinNodeOptions( JoinType join_type, std::vector left_keys, std::vector right_keys, std::vector left_output, @@ -575,7 +679,7 @@ class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions { bool disable_bloom_filter = false; }; -/// \brief Make a node which implements asof join operation +/// \brief a node which implements the asof join operation /// /// Note, this API is experimental and will change in the future /// @@ -625,7 +729,7 @@ class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions { int64_t tolerance; }; -/// \brief Make a node which select top_k/bottom_k rows passed through it +/// \brief a node which select top_k/bottom_k rows passed through it /// /// All batches pushed to this node will be accumulated, then selected, by the given /// fields. Then sorted batches will be forwarded to the generator in sorted order. @@ -640,16 +744,18 @@ class ARROW_ACERO_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { SelectKOptions select_k_options; }; -/// \brief Adapt a Table as a sink node -/// -/// obtains the output of an execution plan to -/// a table pointer. 
+/// \brief a sink node which accumulates all output into a table class ARROW_ACERO_EXPORT TableSinkNodeOptions : public ExecNodeOptions { public: + /// \brief create an instance from values explicit TableSinkNodeOptions(std::shared_ptr
<Table>* output_table, std::optional<bool> sequence_output = std::nullopt) : output_table(output_table), sequence_output(sequence_output) {} + /// \brief an "out parameter" specifying the table that will be created + /// + /// Must not be null and remain valid for the entirety of the plan execution. After the + /// plan has completed this will be set to point to the result table std::shared_ptr<Table>
* output_table; /// \brief Controls whether batches should be emitted immediately or sequenced in order /// @@ -664,6 +770,7 @@ class ARROW_ACERO_EXPORT TableSinkNodeOptions : public ExecNodeOptions { std::vector names; }; +/// \brief a row template that describes one row that will be generated for each input row struct ARROW_ACERO_EXPORT PivotLongerRowTemplate { PivotLongerRowTemplate(std::vector feature_values, std::vector> measurement_values) @@ -699,6 +806,7 @@ struct ARROW_ACERO_EXPORT PivotLongerRowTemplate { /// "location" (left vs right) and a measurement "temp". What we really want is: /// /// | time | location | temp | +/// | --- | --- | --- | /// | 1 | left | 10 | /// | 1 | right | 20 | /// | 2 | left | 15 | diff --git a/include/arrow/acero/test_nodes.h b/include/arrow/acero/test_nodes.h index 2d1d630..7e31aa3 100644 --- a/include/arrow/acero/test_nodes.h +++ b/include/arrow/acero/test_nodes.h @@ -53,6 +53,33 @@ struct JitterNodeOptions : public ExecNodeOptions { static constexpr std::string_view kName = "jitter"; }; +class GateImpl; + +class Gate { + public: + static std::shared_ptr Make(); + + Gate(); + virtual ~Gate(); + + void ReleaseAllBatches(); + void ReleaseOneBatch(); + Future<> WaitForNextReleasedBatch(); + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(Gate); + + GateImpl* impl_; +}; + +// A node that holds all input batches until a given gate is released +struct GatedNodeOptions : public ExecNodeOptions { + explicit GatedNodeOptions(Gate* gate) : gate(gate) {} + Gate* gate; + + static constexpr std::string_view kName = "gated"; +}; + void RegisterTestNodes(); } // namespace acero diff --git a/include/arrow/api.h b/include/arrow/api.h index 5466af8..ac568a0 100644 --- a/include/arrow/api.h +++ b/include/arrow/api.h @@ -19,29 +19,29 @@ #pragma once -#include "arrow/array.h" // IYWU pragma: export -#include "arrow/array/array_run_end.h" // IYWU pragma: export -#include "arrow/array/concatenate.h" // IYWU pragma: export -#include "arrow/buffer.h" // IYWU pragma: export -#include "arrow/builder.h" // IYWU pragma: export -#include "arrow/chunked_array.h" // IYWU pragma: export -#include "arrow/compare.h" // IYWU pragma: export -#include "arrow/config.h" // IYWU pragma: export -#include "arrow/datum.h" // IYWU pragma: export -#include "arrow/extension_type.h" // IYWU pragma: export -#include "arrow/memory_pool.h" // IYWU pragma: export -#include "arrow/pretty_print.h" // IYWU pragma: export -#include "arrow/record_batch.h" // IYWU pragma: export -#include "arrow/result.h" // IYWU pragma: export -#include "arrow/status.h" // IYWU pragma: export -#include "arrow/table.h" // IYWU pragma: export -#include "arrow/table_builder.h" // IYWU pragma: export -#include "arrow/tensor.h" // IYWU pragma: export -#include "arrow/type.h" // IYWU pragma: export +#include "arrow/array.h" // IWYU pragma: export +#include "arrow/array/array_run_end.h" // IWYU pragma: export +#include "arrow/array/concatenate.h" // IWYU pragma: export +#include "arrow/buffer.h" // IWYU pragma: export +#include "arrow/builder.h" // IWYU pragma: export +#include "arrow/chunked_array.h" // IWYU pragma: export +#include "arrow/compare.h" // IWYU pragma: export +#include "arrow/config.h" // IWYU pragma: export +#include "arrow/datum.h" // IWYU pragma: export +#include "arrow/extension_type.h" // IWYU pragma: export +#include "arrow/memory_pool.h" // IWYU pragma: export +#include "arrow/pretty_print.h" // IWYU pragma: export +#include "arrow/record_batch.h" // IWYU pragma: export +#include "arrow/result.h" // IWYU 
pragma: export +#include "arrow/status.h" // IWYU pragma: export +#include "arrow/table.h" // IWYU pragma: export +#include "arrow/table_builder.h" // IWYU pragma: export +#include "arrow/tensor.h" // IWYU pragma: export +#include "arrow/type.h" // IWYU pragma: export #include "arrow/util/key_value_metadata.h" // IWYU pragma: export -#include "arrow/visit_array_inline.h" // IYWU pragma: export -#include "arrow/visit_scalar_inline.h" // IYWU pragma: export -#include "arrow/visitor.h" // IYWU pragma: export +#include "arrow/visit_array_inline.h" // IWYU pragma: export +#include "arrow/visit_scalar_inline.h" // IWYU pragma: export +#include "arrow/visitor.h" // IWYU pragma: export /// \brief Top-level namespace for Apache Arrow C++ API namespace arrow {} diff --git a/include/arrow/array/array_nested.h b/include/arrow/array/array_nested.h index 6fb3fd3..4f5f3f6 100644 --- a/include/arrow/array/array_nested.h +++ b/include/arrow/array/array_nested.h @@ -234,6 +234,10 @@ class ARROW_EXPORT MapArray : public ListArray { const std::shared_ptr& null_bitmap = NULLPTR, int64_t null_count = kUnknownNullCount, int64_t offset = 0); + MapArray(const std::shared_ptr& type, int64_t length, BufferVector buffers, + const std::shared_ptr& keys, const std::shared_ptr& items, + int64_t null_count = kUnknownNullCount, int64_t offset = 0); + MapArray(const std::shared_ptr& type, int64_t length, const std::shared_ptr& value_offsets, const std::shared_ptr& values, diff --git a/include/arrow/array/builder_binary.h b/include/arrow/array/builder_binary.h index c9020f3..b0c4fe2 100644 --- a/include/arrow/array/builder_binary.h +++ b/include/arrow/array/builder_binary.h @@ -67,7 +67,7 @@ class BaseBinaryBuilder Status Append(const uint8_t* value, offset_type length) { ARROW_RETURN_NOT_OK(Reserve(1)); - ARROW_RETURN_NOT_OK(AppendNextOffset()); + UnsafeAppendNextOffset(); // Safety check for UBSAN. 
if (ARROW_PREDICT_TRUE(length > 0)) { ARROW_RETURN_NOT_OK(ValidateOverflow(length)); @@ -114,15 +114,15 @@ class BaseBinaryBuilder } Status AppendNull() final { - ARROW_RETURN_NOT_OK(AppendNextOffset()); ARROW_RETURN_NOT_OK(Reserve(1)); + UnsafeAppendNextOffset(); UnsafeAppendToBitmap(false); return Status::OK(); } Status AppendEmptyValue() final { - ARROW_RETURN_NOT_OK(AppendNextOffset()); ARROW_RETURN_NOT_OK(Reserve(1)); + UnsafeAppendNextOffset(); UnsafeAppendToBitmap(true); return Status::OK(); } @@ -193,8 +193,7 @@ class BaseBinaryBuilder values.begin(), values.end(), 0ULL, [](uint64_t sum, const std::string& str) { return sum + str.size(); }); ARROW_RETURN_NOT_OK(Reserve(values.size())); - ARROW_RETURN_NOT_OK(value_data_builder_.Reserve(total_length)); - ARROW_RETURN_NOT_OK(offsets_builder_.Reserve(values.size())); + ARROW_RETURN_NOT_OK(ReserveData(total_length)); if (valid_bytes != NULLPTR) { for (std::size_t i = 0; i < values.size(); ++i) { @@ -288,13 +287,16 @@ class BaseBinaryBuilder auto bitmap = array.GetValues(0, 0); auto offsets = array.GetValues(1); auto data = array.GetValues(2, 0); + auto total_length = offsets[offset + length] - offsets[offset]; + ARROW_RETURN_NOT_OK(Reserve(length)); + ARROW_RETURN_NOT_OK(ReserveData(total_length)); for (int64_t i = 0; i < length; i++) { if (!bitmap || bit_util::GetBit(bitmap, array.offset + offset + i)) { const offset_type start = offsets[offset + i]; const offset_type end = offsets[offset + i + 1]; - ARROW_RETURN_NOT_OK(Append(data + start, end - start)); + UnsafeAppend(data + start, end - start); } else { - ARROW_RETURN_NOT_OK(AppendNull()); + UnsafeAppendNull(); } } return Status::OK(); diff --git a/include/arrow/array/builder_nested.h b/include/arrow/array/builder_nested.h index 74f7c04..d0b17c2 100644 --- a/include/arrow/array/builder_nested.h +++ b/include/arrow/array/builder_nested.h @@ -63,7 +63,7 @@ class BaseListBuilder : public ArrayBuilder { : BaseListBuilder(pool, value_builder, list(value_builder->type()), alignment) {} Status Resize(int64_t capacity) override { - if (capacity > maximum_elements()) { + if (ARROW_PREDICT_FALSE(capacity > maximum_elements())) { return Status::CapacityError("List array cannot reserve space for more than ", maximum_elements(), " got ", capacity); } @@ -99,14 +99,14 @@ class BaseListBuilder : public ArrayBuilder { Status Append(bool is_valid = true) { ARROW_RETURN_NOT_OK(Reserve(1)); UnsafeAppendToBitmap(is_valid); - return AppendNextOffset(); + UnsafeAppendNextOffset(); + return Status::OK(); } Status AppendNull() final { return Append(false); } Status AppendNulls(int64_t length) final { ARROW_RETURN_NOT_OK(Reserve(length)); - ARROW_RETURN_NOT_OK(ValidateOverflow(0)); UnsafeAppendToBitmap(length, false); const int64_t num_values = value_builder_->length(); for (int64_t i = 0; i < length; ++i) { @@ -119,7 +119,6 @@ class BaseListBuilder : public ArrayBuilder { Status AppendEmptyValues(int64_t length) final { ARROW_RETURN_NOT_OK(Reserve(length)); - ARROW_RETURN_NOT_OK(ValidateOverflow(0)); UnsafeAppendToBitmap(length, true); const int64_t num_values = value_builder_->length(); for (int64_t i = 0; i < length; ++i) { @@ -133,17 +132,17 @@ class BaseListBuilder : public ArrayBuilder { const offset_type* offsets = array.GetValues(1); const bool all_valid = !array.MayHaveLogicalNulls(); const uint8_t* validity = array.HasValidityBitmap() ? 
array.buffers[0].data : NULLPTR; + ARROW_RETURN_NOT_OK(Reserve(length)); for (int64_t row = offset; row < offset + length; row++) { const bool is_valid = all_valid || (validity && bit_util::GetBit(validity, array.offset + row)) || array.IsValid(row); + UnsafeAppendToBitmap(is_valid); + UnsafeAppendNextOffset(); if (is_valid) { - ARROW_RETURN_NOT_OK(Append()); int64_t slot_length = offsets[row + 1] - offsets[row]; ARROW_RETURN_NOT_OK(value_builder_->AppendArraySlice(array.child_data[0], offsets[row], slot_length)); - } else { - ARROW_RETURN_NOT_OK(AppendNull()); } } return Status::OK(); @@ -202,6 +201,11 @@ class BaseListBuilder : public ArrayBuilder { const int64_t num_values = value_builder_->length(); return offsets_builder_.Append(static_cast(num_values)); } + + void UnsafeAppendNextOffset() { + const int64_t num_values = value_builder_->length(); + offsets_builder_.UnsafeAppend(static_cast(num_values)); + } }; /// \class ListBuilder diff --git a/include/arrow/array/builder_run_end.h b/include/arrow/array/builder_run_end.h index 9764c57..ac92efb 100644 --- a/include/arrow/array/builder_run_end.h +++ b/include/arrow/array/builder_run_end.h @@ -273,7 +273,7 @@ class ARROW_EXPORT RunEndEncodedBuilder : public ArrayBuilder { // Pre-condition: !value_run_builder_.has_open_run() template - Status DoAppendArray(const ArraySpan& to_append); + Status DoAppendArraySlice(const ArraySpan& array, int64_t offset, int64_t length); template Status DoAppendRunEnd(int64_t run_end); diff --git a/include/arrow/array/data.h b/include/arrow/array/data.h index 27a30ad..82a6e73 100644 --- a/include/arrow/array/data.h +++ b/include/arrow/array/data.h @@ -527,7 +527,8 @@ struct ARROW_EXPORT ArraySpan { int64_t ComputeLogicalNullCount() const; private: - friend bool internal::IsNullRunEndEncoded(const ArrayData& span, int64_t i); + ARROW_FRIEND_EXPORT friend bool internal::IsNullRunEndEncoded(const ArrayData& span, + int64_t i); bool IsNullSparseUnion(int64_t i) const; bool IsNullDenseUnion(int64_t i) const; diff --git a/include/arrow/array/util.h b/include/arrow/array/util.h index 6e6c61b..9f34af0 100644 --- a/include/arrow/array/util.h +++ b/include/arrow/array/util.h @@ -73,10 +73,11 @@ namespace internal { /// are not swapped by this function and should be handled separately. /// /// \param[in] data the array contents +/// \param[in] pool the memory pool to allocate memory from /// \return the resulting ArrayData whose elements were swapped ARROW_EXPORT Result> SwapEndianArrayData( - const std::shared_ptr& data); + const std::shared_ptr& data, MemoryPool* pool = default_memory_pool()); /// Given a number of ArrayVectors, treat each ArrayVector as the /// chunks of a chunked array. Then rechunk each ArrayVector such that diff --git a/include/arrow/buffer.h b/include/arrow/buffer.h index 9270c4d..65f1abd 100644 --- a/include/arrow/buffer.h +++ b/include/arrow/buffer.h @@ -63,7 +63,11 @@ class ARROW_EXPORT Buffer { Buffer(const uint8_t* data, int64_t size, std::shared_ptr mm, std::shared_ptr parent = NULLPTR) - : is_mutable_(false), data_(data), size_(size), capacity_(size), parent_(parent) { + : is_mutable_(false), + data_(data), + size_(size), + capacity_(size), + parent_(std::move(parent)) { SetMemoryManager(std::move(mm)); } diff --git a/include/arrow/c/abi.h b/include/arrow/c/abi.h index d58417e..6abe866 100644 --- a/include/arrow/c/abi.h +++ b/include/arrow/c/abi.h @@ -15,10 +15,27 @@ // specific language governing permissions and limitations // under the License. 
+/// \file abi.h Arrow C Data Interface +/// +/// The Arrow C Data interface defines a very small, stable set +/// of C definitions which can be easily copied into any project's +/// source code and vendored to be used for columnar data interchange +/// in the Arrow format. For non-C/C++ languages and runtimes, +/// it should be almost as easy to translate the C definitions into +/// the corresponding C FFI declarations. +/// +/// Applications and libraries can therefore work with Arrow memory +/// without necessarily using the Arrow libraries or reinventing +/// the wheel. Developers can choose between tight integration +/// with the Arrow software project or minimal integration with +/// the Arrow format only. + #pragma once #include +// Spec and documentation: https://arrow.apache.org/docs/format/CDataInterface.html + #ifdef __cplusplus extern "C" { #endif @@ -65,6 +82,61 @@ struct ArrowArray { #endif // ARROW_C_DATA_INTERFACE +#ifndef ARROW_C_DEVICE_DATA_INTERFACE +#define ARROW_C_DEVICE_DATA_INTERFACE + +// Spec and Documentation: https://arrow.apache.org/docs/format/CDeviceDataInterface.html + +// DeviceType for the allocated memory +typedef int32_t ArrowDeviceType; + +// CPU device, same as using ArrowArray directly +#define ARROW_DEVICE_CPU 1 +// CUDA GPU Device +#define ARROW_DEVICE_CUDA 2 +// Pinned CUDA CPU memory by cudaMallocHost +#define ARROW_DEVICE_CUDA_HOST 3 +// OpenCL Device +#define ARROW_DEVICE_OPENCL 4 +// Vulkan buffer for next-gen graphics +#define ARROW_DEVICE_VULKAN 7 +// Metal for Apple GPU +#define ARROW_DEVICE_METAL 8 +// Verilog simulator buffer +#define ARROW_DEVICE_VPI 9 +// ROCm GPUs for AMD GPUs +#define ARROW_DEVICE_ROCM 10 +// Pinned ROCm CPU memory allocated by hipMallocHost +#define ARROW_DEVICE_ROCM_HOST 11 +// Reserved for extension +#define ARROW_DEVICE_EXT_DEV 12 +// CUDA managed/unified memory allocated by cudaMallocManaged +#define ARROW_DEVICE_CUDA_MANAGED 13 +// unified shared memory allocated on a oneAPI non-partitioned device. +#define ARROW_DEVICE_ONEAPI 14 +// GPU support for next-gen WebGPU standard +#define ARROW_DEVICE_WEBGPU 15 +// Qualcomm Hexagon DSP +#define ARROW_DEVICE_HEXAGON 16 + +struct ArrowDeviceArray { + // the Allocated Array + // + // the buffers in the array (along with the buffers of any + // children) are what is allocated on the device. + struct ArrowArray array; + // The device id to identify a specific device + int64_t device_id; + // The type of device which can access this memory. + ArrowDeviceType device_type; + // An event-like object to synchronize on if needed. + void* sync_event; + // Reserved bytes for future expansion. + int64_t reserved[3]; +}; + +#endif // ARROW_C_DEVICE_DATA_INTERFACE + #ifndef ARROW_C_STREAM_INTERFACE #define ARROW_C_STREAM_INTERFACE @@ -106,6 +178,56 @@ struct ArrowArrayStream { #endif // ARROW_C_STREAM_INTERFACE +#ifndef ARROW_C_DEVICE_STREAM_INTERFACE +#define ARROW_C_DEVICE_STREAM_INTERFACE + +// Equivalent to ArrowArrayStream, but for ArrowDeviceArrays. +// +// This stream is intended to provide a stream of data on a single +// device, if a producer wants data to be produced on multiple devices +// then multiple streams should be provided. One per device. +struct ArrowDeviceArrayStream { + // The device that this stream produces data on. + ArrowDeviceType device_type; + + // Callback to get the stream schema + // (will be the same for all arrays in the stream). + // + // Return value 0 if successful, an `errno`-compatible error code otherwise. 
+ // + // If successful, the ArrowSchema must be released independently from the stream. + // The schema should be accessible via CPU memory. + int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowDeviceArray must be released independently from the stream. + int (*get_next)(struct ArrowDeviceArrayStream* self, struct ArrowDeviceArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowDeviceArrayStream* self); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowDeviceArrayStream* self); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_DEVICE_STREAM_INTERFACE + #ifdef __cplusplus } #endif diff --git a/include/arrow/c/helpers.h b/include/arrow/c/helpers.h index a5c1f6f..a24f272 100644 --- a/include/arrow/c/helpers.h +++ b/include/arrow/c/helpers.h @@ -17,11 +17,20 @@ #pragma once -#include +#include +#include #include #include "arrow/c/abi.h" +#define ARROW_C_ASSERT(condition, msg) \ + do { \ + if (!(condition)) { \ + fprintf(stderr, "%s:%d:: %s", __FILE__, __LINE__, (msg)); \ + abort(); \ + } \ + } while (0) + #ifdef __cplusplus extern "C" { #endif @@ -51,7 +60,8 @@ inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dest) { inline void ArrowSchemaRelease(struct ArrowSchema* schema) { if (!ArrowSchemaIsReleased(schema)) { schema->release(schema); - assert(ArrowSchemaIsReleased(schema)); + ARROW_C_ASSERT(ArrowSchemaIsReleased(schema), + "ArrowSchemaRelease did not cleanup release callback"); } } @@ -78,7 +88,8 @@ inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* dest) { inline void ArrowArrayRelease(struct ArrowArray* array) { if (!ArrowArrayIsReleased(array)) { array->release(array); - assert(ArrowArrayIsReleased(array)); + ARROW_C_ASSERT(ArrowArrayIsReleased(array), + "ArrowArrayRelease did not cleanup release callback"); } } @@ -108,7 +119,8 @@ inline void ArrowArrayStreamMove(struct ArrowArrayStream* src, inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) { if (!ArrowArrayStreamIsReleased(stream)) { stream->release(stream); - assert(ArrowArrayStreamIsReleased(stream)); + ARROW_C_ASSERT(ArrowArrayStreamIsReleased(stream), + "ArrowArrayStreamRelease did not cleanup release callback"); } } diff --git a/include/arrow/compute/api.h b/include/arrow/compute/api.h index 7e7ec36..5b5dfdf 100644 --- a/include/arrow/compute/api.h +++ b/include/arrow/compute/api.h @@ -33,11 +33,6 @@ #include "arrow/compute/registry.h" // IWYU pragma: export #include "arrow/datum.h" // IWYU pragma: export -/// \defgroup execnode-expressions Utilities for creating expressions to -/// use in execution plans -/// @{ -/// @} - #include "arrow/compute/expression.h" // IWYU pragma: export /// \defgroup execnode-row Utilities for working with data in a row-major format @@ 
-46,7 +41,7 @@ #include "arrow/compute/row/grouper.h" // IWYU pragma: export -/// \defgroup execnode-components Components associated with ExecBatch +/// \defgroup acero-internals Acero internals, useful for those extending Acero /// @{ /// @} diff --git a/include/arrow/compute/api_aggregate.h b/include/arrow/compute/api_aggregate.h index 97c6542..8f45f61 100644 --- a/include/arrow/compute/api_aggregate.h +++ b/include/arrow/compute/api_aggregate.h @@ -284,6 +284,36 @@ Result Sum( const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Calculate the first value of an array +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see ScalarAggregateOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the computed first as Scalar +/// +/// \since 13.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result First( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); + +/// \brief Calculate the last value of an array +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see ScalarAggregateOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the computed last as a Scalar +/// +/// \since 13.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result Last( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); + /// \brief Calculate the min / max of a numeric array /// /// This function returns both the min and max as a struct scalar, with type diff --git a/include/arrow/compute/api_vector.h b/include/arrow/compute/api_vector.h index 2ec1cf9..c85db1a 100644 --- a/include/arrow/compute/api_vector.h +++ b/include/arrow/compute/api_vector.h @@ -210,25 +210,39 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions { NullPlacement null_placement; }; -/// \brief Options for cumulative sum function -class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions { +/// \brief Options for cumulative functions +/// \note Also aliased as CumulativeSumOptions for backward compatibility +class ARROW_EXPORT CumulativeOptions : public FunctionOptions { public: - explicit CumulativeSumOptions(double start = 0, bool skip_nulls = false, - bool check_overflow = false); - explicit CumulativeSumOptions(std::shared_ptr start, bool skip_nulls = false, - bool check_overflow = false); - static constexpr char const kTypeName[] = "CumulativeSumOptions"; - static CumulativeSumOptions Defaults() { return CumulativeSumOptions(); } - - /// Optional starting value for cumulative operation computation - std::shared_ptr start; + explicit CumulativeOptions(bool skip_nulls = false); + explicit CumulativeOptions(double start, bool skip_nulls = false); + explicit CumulativeOptions(std::shared_ptr start, bool skip_nulls = false); + static constexpr char const kTypeName[] = "CumulativeOptions"; + static CumulativeOptions Defaults() { return CumulativeOptions(); } + + /// Optional starting value for cumulative operation computation, default depends on the + /// operation and input type. + /// - sum: 0 + /// - prod: 1 + /// - min: maximum of the input type + /// - max: minimum of the input type + std::optional> start; /// If true, nulls in the input are ignored and produce a corresponding null output. 
/// When false, the first null encountered is propagated through the remaining output. bool skip_nulls = false; +}; +using CumulativeSumOptions = CumulativeOptions; // For backward compatibility + +/// \brief Options for pairwise functions +class ARROW_EXPORT PairwiseOptions : public FunctionOptions { + public: + explicit PairwiseOptions(int64_t periods = 1); + static constexpr char const kTypeName[] = "PairwiseOptions"; + static PairwiseOptions Defaults() { return PairwiseOptions(); } - /// When true, returns an Invalid Status when overflow is detected - bool check_overflow = false; + /// Periods to shift for applying the binary operation, accepts negative values. + int64_t periods = 1; }; /// @} @@ -259,12 +273,18 @@ namespace internal { // These internal functions are implemented in kernels/vector_selection.cc /// \brief Return the number of selected indices in the boolean filter +/// +/// \param filter a plain or run-end encoded boolean array with or without nulls +/// \param null_selection how to handle nulls in the filter ARROW_EXPORT int64_t GetFilterOutputSize(const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection); /// \brief Compute uint64 selection indices for use with Take given a boolean /// filter +/// +/// \param filter a plain or run-end encoded boolean array with or without nulls +/// \param null_selection how to handle nulls in the filter ARROW_EXPORT Result> GetTakeIndices( const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection, @@ -597,12 +617,72 @@ Result RunEndEncode( ARROW_EXPORT Result RunEndDecode(const Datum& value, ExecContext* ctx = NULLPTR); +/// \brief Compute the cumulative sum of an array-like object +/// +/// \param[in] values array-like input +/// \param[in] options configures cumulative sum behavior +/// \param[in] check_overflow whether to check for overflow, if true, return Invalid +/// status on overflow, otherwise wrap around on overflow +/// \param[in] ctx the function execution context, optional ARROW_EXPORT Result CumulativeSum( - const Datum& values, - const CumulativeSumOptions& options = CumulativeSumOptions::Defaults(), + const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), + bool check_overflow = false, ExecContext* ctx = NULLPTR); + +/// \brief Compute the cumulative product of an array-like object +/// +/// \param[in] values array-like input +/// \param[in] options configures cumulative prod behavior +/// \param[in] check_overflow whether to check for overflow, if true, return Invalid +/// status on overflow, otherwise wrap around on overflow +/// \param[in] ctx the function execution context, optional +ARROW_EXPORT +Result CumulativeProd( + const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), + bool check_overflow = false, ExecContext* ctx = NULLPTR); + +/// \brief Compute the cumulative max of an array-like object +/// +/// \param[in] values array-like input +/// \param[in] options configures cumulative max behavior +/// \param[in] ctx the function execution context, optional +ARROW_EXPORT +Result CumulativeMax( + const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), + ExecContext* ctx = NULLPTR); + +/// \brief Compute the cumulative min of an array-like object +/// +/// \param[in] values array-like input +/// \param[in] options configures cumulative min behavior +/// \param[in] ctx the function execution context, optional +ARROW_EXPORT +Result CumulativeMin( + const Datum& values, const 
 
+/// \brief Return the first order difference of an array.
+///
+/// Computes the first order difference of an array, i.e.
+///   output[i] = input[i] - input[i - p]  if i >= p
+///   output[i] = null                     otherwise
+/// where p is the period. For example, with p = 1,
+///   Diff([1, 4, 9, 10, 15]) = [null, 3, 5, 1, 5].
+/// With p = 2,
+///   Diff([1, 4, 9, 10, 15]) = [null, null, 8, 6, 6]
+/// p can also be negative, in which case the diff is computed in
+/// the opposite direction.
+/// \param[in] array array input
+/// \param[in] options options, specifying overflow behavior and period
+/// \param[in] check_overflow whether to return error on overflow
+/// \param[in] ctx the function execution context, optional
+/// \return result as array
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> PairwiseDiff(const Array& array,
+                                            const PairwiseOptions& options,
+                                            bool check_overflow = false,
+                                            ExecContext* ctx = NULLPTR);
+
 // ----------------------------------------------------------------------
 // Deprecated functions
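A short sketch of PairwiseDiff matching the p = 2 example in the doc comment above (illustrative; the builder values are taken from that example):

    #include <iostream>
    #include <memory>

    #include "arrow/api.h"
    #include "arrow/compute/api.h"

    // Sketch: first order difference with a period of 2.
    arrow::Status Diff2() {
      arrow::Int64Builder builder;
      ARROW_RETURN_NOT_OK(builder.AppendValues({1, 4, 9, 10, 15}));
      std::shared_ptr<arrow::Array> values;
      ARROW_RETURN_NOT_OK(builder.Finish(&values));

      arrow::compute::PairwiseOptions options(/*periods=*/2);
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> diff,
                            arrow::compute::PairwiseDiff(*values, options));
      std::cout << diff->ToString() << std::endl;  // expect [null, null, 8, 6, 6]
      return arrow::Status::OK();
    }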
diff --git a/include/arrow/compute/exec.h b/include/arrow/compute/exec.h
index d583f0e..3fbefe4 100644
--- a/include/arrow/compute/exec.h
+++ b/include/arrow/compute/exec.h
@@ -168,7 +168,7 @@ constexpr int64_t kUnsequencedIndex = -1;
 /// than is desirable for this class. Microbenchmarks would help determine for
 /// sure. See ARROW-8928.
 
-/// \addtogroup execnode-components
+/// \addtogroup acero-internals
 /// @{
 
 struct ARROW_EXPORT ExecBatch {
@@ -265,6 +265,13 @@ inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equal
 
 ARROW_EXPORT void PrintTo(const ExecBatch&, std::ostream*);
 
+/// @}
+
+/// \defgroup compute-internals Utilities for calling functions, useful for those
+/// extending the function registry
+///
+/// @{
+
 struct ExecValue {
   ArraySpan array = {};
   const Scalar* scalar = NULLPTR;
@@ -349,6 +356,9 @@ struct ARROW_EXPORT ExecResult {
   const std::shared_ptr<ArrayData>& array_data() const {
     return std::get<std::shared_ptr<ArrayData>>(this->value);
   }
+  ArrayData* array_data_mutable() {
+    return std::get<std::shared_ptr<ArrayData>>(this->value).get();
+  }
   bool is_array_data() const { return this->value.index() == 1; }
 };
@@ -414,8 +424,6 @@ struct ARROW_EXPORT ExecSpan {
   std::vector<ExecValue> values;
 };
 
-/// @}
-
 /// \defgroup compute-call-function One-shot calls to compute functions
 ///
 /// @{
diff --git a/include/arrow/compute/expression.h b/include/arrow/compute/expression.h
index c9c7b0e..9a36a6d 100644
--- a/include/arrow/compute/expression.h
+++ b/include/arrow/compute/expression.h
@@ -33,7 +33,7 @@ namespace arrow {
 namespace compute {
 
-/// \defgroup expression-core Expressions to describe transformations in execution plans
+/// \defgroup expression-core Expressions to describe data transformations
 ///
 /// @{
 
@@ -94,7 +94,11 @@ class ARROW_EXPORT Expression {
   bool IsNullLiteral() const;
 
   /// Return true if this expression could evaluate to true. Will return true for any
-  /// unbound, non-boolean, or unsimplified Expressions
+  /// unbound or non-boolean Expressions. IsSatisfiable does not (currently) do any
+  /// canonicalization or simplification of the expression, so even Expressions
+  /// which are unsatisfiable may spuriously return `true` here. This function is
+  /// intended for use in predicate pushdown where a filter expression is simplified
+  /// by a guarantee, so it assumes that trying to simplify again would be redundant.
   bool IsSatisfiable() const;
 
   // XXX someday
@@ -256,7 +260,7 @@ Result<std::shared_ptr<Buffer>> Serialize(const Expression&);
 ARROW_EXPORT
 Result<Expression> Deserialize(std::shared_ptr<Buffer>);
 
-/// \defgroup expression-convenience Functions convenient expression creation
+/// \defgroup expression-convenience Helpers for convenient expression creation
 ///
 /// @{
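The predicate-pushdown workflow described in the IsSatisfiable comment above can be sketched as follows. This is illustrative only: the schema, field name and literal values are invented, while Bind and SimplifyWithGuarantee are the existing helpers from expression.h:

    #include <iostream>

    #include "arrow/api.h"
    #include "arrow/compute/api.h"
    #include "arrow/compute/expression.h"

    // Sketch: simplify a filter under a partition guarantee, then test satisfiability.
    arrow::Status CheckFilter() {
      namespace cp = arrow::compute;
      auto schema = arrow::schema({arrow::field("x", arrow::int32())});

      // Filter requested by the user, and a guarantee known from partitioning.
      cp::Expression filter = cp::equal(cp::field_ref("x"), cp::literal(7));
      cp::Expression guarantee = cp::equal(cp::field_ref("x"), cp::literal(3));

      ARROW_ASSIGN_OR_RAISE(cp::Expression bound, filter.Bind(*schema));
      ARROW_ASSIGN_OR_RAISE(cp::Expression simplified,
                            cp::SimplifyWithGuarantee(bound, guarantee));

      // The simplified filter folds to a false literal, so the fragment can be skipped.
      std::cout << simplified.ToString() << " satisfiable: " << std::boolalpha
                << simplified.IsSatisfiable() << std::endl;
      return arrow::Status::OK();
    }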
diff --git a/include/arrow/compute/kernel.h b/include/arrow/compute/kernel.h
index a642130..5b5b571 100644
--- a/include/arrow/compute/kernel.h
+++ b/include/arrow/compute/kernel.h
@@ -152,6 +152,12 @@ ARROW_EXPORT std::shared_ptr<TypeMatcher> RunEndInteger();
 ARROW_EXPORT std::shared_ptr<TypeMatcher> RunEndEncoded(
     std::shared_ptr<TypeMatcher> value_type_matcher);
 
+/// \brief Match run-end encoded types that use any valid run-end type and
+/// encode specific value types
+///
+/// @param[in] value_type_id a type id that the type of the values field should match
+ARROW_EXPORT std::shared_ptr<TypeMatcher> RunEndEncoded(Type::type value_type_id);
+
 /// \brief Match run-end encoded types that encode specific run-end and value types
 ///
 /// @param[in] run_end_type_matcher a matcher that is applied to the run_ends field
@@ -277,14 +283,16 @@ class ARROW_EXPORT OutputType {
   ///
   /// This function SHOULD _not_ be used to check for arity, that is to be
   /// performed one or more layers above.
-  using Resolver = Result<TypeHolder> (*)(KernelContext*, const std::vector<TypeHolder>&);
+  using Resolver =
+      std::function<Result<TypeHolder>(KernelContext*, const std::vector<TypeHolder>&)>;
 
   /// \brief Output an exact type
   OutputType(std::shared_ptr<DataType> type)  // NOLINT implicit construction
       : kind_(FIXED), type_(std::move(type)) {}
 
   /// \brief Output a computed type depending on actual input types
-  OutputType(Resolver resolver)  // NOLINT implicit construction
+  template <typename Fn>
+  OutputType(Fn resolver)  // NOLINT implicit construction
       : kind_(COMPUTED), resolver_(std::move(resolver)) {}
 
   OutputType(const OutputType& other) {
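Because Resolver is now a std::function rather than a bare function pointer, a computed output type can be written as a capturing lambda. A sketch under that assumption (TypeHolder is the existing helper from arrow/type.h; the fallback parameter is invented for illustration):

    #include <memory>
    #include <vector>

    #include "arrow/compute/kernel.h"
    #include "arrow/result.h"
    #include "arrow/type.h"

    // Sketch: resolve to the first input type, or to a captured fallback type.
    arrow::compute::OutputType MakeResolver(std::shared_ptr<arrow::DataType> fallback) {
      return arrow::compute::OutputType(
          [fallback](arrow::compute::KernelContext*,
                     const std::vector<arrow::TypeHolder>& in_types)
              -> arrow::Result<arrow::TypeHolder> {
            if (in_types.empty()) return arrow::TypeHolder(fallback);
            return in_types[0];
          });
    }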
@@ -644,22 +652,22 @@ using ScalarAggregateFinalize = Status (*)(KernelContext*, Datum*);
 /// * finalize: produces the end result of the aggregation using the
 ///   KernelState in the KernelContext.
 struct ARROW_EXPORT ScalarAggregateKernel : public Kernel {
-  ScalarAggregateKernel() = default;
-
   ScalarAggregateKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
                         ScalarAggregateConsume consume, ScalarAggregateMerge merge,
-                        ScalarAggregateFinalize finalize)
+                        ScalarAggregateFinalize finalize, const bool ordered)
       : Kernel(std::move(sig), std::move(init)),
         consume(consume),
         merge(merge),
-        finalize(finalize) {}
+        finalize(finalize),
+        ordered(ordered) {}
 
   ScalarAggregateKernel(std::vector<InputType> in_types, OutputType out_type,
                         KernelInit init, ScalarAggregateConsume consume,
-                        ScalarAggregateMerge merge, ScalarAggregateFinalize finalize)
+                        ScalarAggregateMerge merge, ScalarAggregateFinalize finalize,
+                        const bool ordered)
       : ScalarAggregateKernel(
             KernelSignature::Make(std::move(in_types), std::move(out_type)),
-            std::move(init), consume, merge, finalize) {}
+            std::move(init), consume, merge, finalize, ordered) {}
 
   /// \brief Merge a vector of KernelStates into a single KernelState.
   /// The merged state will be returned and will be set on the KernelContext.
@@ -670,6 +678,14 @@ struct ARROW_EXPORT ScalarAggregateKernel : public Kernel {
   ScalarAggregateConsume consume;
   ScalarAggregateMerge merge;
   ScalarAggregateFinalize finalize;
+  /// \brief Whether this kernel requires ordering
+  /// Some aggregations, such as "first", require some kind of input order. The
+  /// order can be implicit, e.g., the order of the input data, or explicit, e.g.
+  /// the ordering specified with a window aggregation.
+  /// The caller of the aggregate kernel is responsible for passing data in some
+  /// defined order to the kernel. The flag here is a way for the kernel to tell
+  /// the caller that data passed to the kernel must be defined in some order.
+  bool ordered = false;
 };
 
 // ----------------------------------------------------------------------
@@ -699,25 +715,31 @@ struct ARROW_EXPORT HashAggregateKernel : public Kernel {
 
   HashAggregateKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
                       HashAggregateResize resize, HashAggregateConsume consume,
-                      HashAggregateMerge merge, HashAggregateFinalize finalize)
+                      HashAggregateMerge merge, HashAggregateFinalize finalize,
+                      const bool ordered)
       : Kernel(std::move(sig), std::move(init)),
         resize(resize),
         consume(consume),
         merge(merge),
-        finalize(finalize) {}
+        finalize(finalize),
+        ordered(ordered) {}
 
   HashAggregateKernel(std::vector<InputType> in_types, OutputType out_type,
                       KernelInit init, HashAggregateConsume consume,
                       HashAggregateResize resize, HashAggregateMerge merge,
-                      HashAggregateFinalize finalize)
+                      HashAggregateFinalize finalize, const bool ordered)
       : HashAggregateKernel(
             KernelSignature::Make(std::move(in_types), std::move(out_type)),
-            std::move(init), resize, consume, merge, finalize) {}
+            std::move(init), resize, consume, merge, finalize, ordered) {}
 
   HashAggregateResize resize;
   HashAggregateConsume consume;
   HashAggregateMerge merge;
   HashAggregateFinalize finalize;
+  /// @brief whether the summarizer requires ordering
+  /// This is similar to ScalarAggregateKernel. See ScalarAggregateKernel
+  /// for detailed doc of this variable.
+  bool ordered = false;
 };
 
 }  // namespace compute
diff --git a/include/arrow/compute/key_map.h b/include/arrow/compute/key_map.h
index 5e40b3d..7ab4847 100644
--- a/include/arrow/compute/key_map.h
+++ b/include/arrow/compute/key_map.h
@@ -17,13 +17,13 @@
 
 #pragma once
 
+#include
 #include "arrow/compute/util.h"
-#include "arrow/compute/util_internal.h"
-#include "arrow/memory_pool.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/type_fwd.h"
 
 namespace arrow {
 namespace compute {
@@ -80,9 +80,11 @@ class ARROW_EXPORT SwissTable {
 
   void num_inserted(uint32_t i) { num_inserted_ = i; }
 
-  uint8_t* blocks() const { return blocks_; }
+  uint8_t* blocks() const { return blocks_->mutable_data(); }
 
-  uint32_t* hashes() const { return hashes_; }
+  uint32_t* hashes() const {
+    return reinterpret_cast<uint32_t*>(hashes_->mutable_data());
+  }
 
   /// \brief Extract group id for a given slot in a given block.
   ///
@@ -226,12 +228,12 @@ class ARROW_EXPORT SwissTable {
   // ---------------------------------------------------
   // * Empty bucket has value 0x80. Non-empty bucket has highest bit set to 0.
   //
-  uint8_t* blocks_;
+  std::shared_ptr blocks_;
 
   // Array of hashes of values inserted into slots.
   // Undefined if the corresponding slot is empty.
   // There is 64B padding at the end.
-  uint32_t* hashes_;
+  std::shared_ptr hashes_;
 
   int64_t hardware_flags_;
   MemoryPool* pool_;
@@ -243,8 +245,8 @@ uint64_t SwissTable::extract_group_id(const uint8_t* block_ptr, int slot,
   // bytes. We assume here that the number of bits is rounded up to 8, 16, 32 or 64. In
   // that case we can extract group id using aligned 64-bit word access.
   int num_group_id_bits = static_cast<int>(ARROW_POPCOUNT64(group_id_mask));
-  ARROW_DCHECK(num_group_id_bits == 8 || num_group_id_bits == 16 ||
-               num_group_id_bits == 32 || num_group_id_bits == 64);
+  assert(num_group_id_bits == 8 || num_group_id_bits == 16 || num_group_id_bits == 32 ||
+         num_group_id_bits == 64);
 
   int bit_offset = slot * num_group_id_bits;
   const uint64_t* group_id_bytes =
@@ -260,8 +262,8 @@ void SwissTable::insert_into_empty_slot(uint32_t slot_id, uint32_t hash,
   // We assume here that the number of bits is rounded up to 8, 16, 32 or 64.
   // In that case we can insert group id value using aligned 64-bit word access.
-  ARROW_DCHECK(num_groupid_bits == 8 || num_groupid_bits == 16 ||
-               num_groupid_bits == 32 || num_groupid_bits == 64);
+  assert(num_groupid_bits == 8 || num_groupid_bits == 16 || num_groupid_bits == 32 ||
+         num_groupid_bits == 64);
 
   const uint64_t num_block_bytes = (8 + num_groupid_bits);
   constexpr uint64_t stamp_mask = 0x7f;
@@ -270,13 +272,13 @@ void SwissTable::insert_into_empty_slot(uint32_t slot_id, uint32_t hash,
   int stamp =
       static_cast<int>((hash >> (bits_hash_ - log_blocks_ - bits_stamp_)) & stamp_mask);
   uint64_t block_id = slot_id >> 3;
-  uint8_t* blockbase = blocks_ + num_block_bytes * block_id;
+  uint8_t* blockbase = blocks_->mutable_data() + num_block_bytes * block_id;
   blockbase[7 - start_slot] = static_cast<uint8_t>(stamp);
   int groupid_bit_offset = static_cast<int>(start_slot * num_groupid_bits);
 
   // Block status bytes should start at an address aligned to 8 bytes
-  ARROW_DCHECK((reinterpret_cast<uint64_t>(blockbase) & 7) == 0);
+  assert((reinterpret_cast<uint64_t>(blockbase) & 7) == 0);
   uint64_t* ptr = reinterpret_cast<uint64_t*>(blockbase) + 1 + (groupid_bit_offset >> 6);
   *ptr |= (static_cast<uint64_t>(group_id) << (groupid_bit_offset & 63));
 }
diff --git a/include/arrow/compute/registry.h b/include/arrow/compute/registry.h
index a7eb4bc..afd9f20 100644
--- a/include/arrow/compute/registry.h
+++ b/include/arrow/compute/registry.h
@@ -55,7 +55,7 @@ class ARROW_EXPORT FunctionRegistry {
   /// \brief Construct a new nested registry with the given parent.
   ///
   /// Most users only need to use the global registry. The returned registry never changes
-  /// its parent, even when an operation allows overwritting.
+  /// its parent, even when an operation allows overwriting.
   static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
 
   /// \brief Check whether a new function can be added to the registry.
diff --git a/include/arrow/compute/util.h b/include/arrow/compute/util.h
index 6e1bb79..489139e 100644
--- a/include/arrow/compute/util.h
+++ b/include/arrow/compute/util.h
@@ -139,69 +139,55 @@ class TempVectorHolder {
   uint32_t num_elements_;
 };
 
-class ARROW_EXPORT bit_util {
- public:
-  static void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
-                              const int num_bits, const uint8_t* bits, int* num_indexes,
-                              uint16_t* indexes, int bit_offset = 0);
+namespace bit_util {
 
-  static void bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
+ARROW_EXPORT void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
                                   const int num_bits, const uint8_t* bits,
-                                  const uint16_t* input_indexes, int* num_indexes,
-                                  uint16_t* indexes, int bit_offset = 0);
+                                  int* num_indexes, uint16_t* indexes,
+                                  int bit_offset = 0);
 
-  // Input and output indexes may be pointing to the same data (in-place filtering).
-  static void bits_split_indexes(int64_t hardware_flags, const int num_bits,
-                                 const uint8_t* bits, int* num_indexes_bit0,
-                                 uint16_t* indexes_bit0, uint16_t* indexes_bit1,
-                                 int bit_offset = 0);
+ARROW_EXPORT void bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
+                                      const int num_bits, const uint8_t* bits,
+                                      const uint16_t* input_indexes, int* num_indexes,
+                                      uint16_t* indexes, int bit_offset = 0);
 
-  // Bit 1 is replaced with byte 0xFF.
-  static void bits_to_bytes(int64_t hardware_flags, const int num_bits,
-                            const uint8_t* bits, uint8_t* bytes, int bit_offset = 0);
+// Input and output indexes may be pointing to the same data (in-place filtering).
+ARROW_EXPORT void bits_split_indexes(int64_t hardware_flags, const int num_bits,
+                                     const uint8_t* bits, int* num_indexes_bit0,
+                                     uint16_t* indexes_bit0, uint16_t* indexes_bit1,
+                                     int bit_offset = 0);
 
-  // Return highest bit of each byte.
-  static void bytes_to_bits(int64_t hardware_flags, const int num_bits,
-                            const uint8_t* bytes, uint8_t* bits, int bit_offset = 0);
+// Bit 1 is replaced with byte 0xFF.
+ARROW_EXPORT void bits_to_bytes(int64_t hardware_flags, const int num_bits,
+                                const uint8_t* bits, uint8_t* bytes, int bit_offset = 0);
 
-  static bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
-                                 uint32_t num_bytes);
+// Return highest bit of each byte.
+ARROW_EXPORT void bytes_to_bits(int64_t hardware_flags, const int num_bits,
+                                const uint8_t* bytes, uint8_t* bits, int bit_offset = 0);
 
- private:
-  inline static uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes);
-  inline static void SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value);
-  inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index,
-                                            int* num_indexes, uint16_t* indexes);
-  inline static void bits_filter_indexes_helper(uint64_t word,
-                                                const uint16_t* input_indexes,
-                                                int* num_indexes, uint16_t* indexes);
-  template
-  static void bits_to_indexes_internal(int64_t hardware_flags, const int num_bits,
-                                       const uint8_t* bits, const uint16_t* input_indexes,
-                                       int* num_indexes, uint16_t* indexes,
-                                       uint16_t base_index = 0);
+ARROW_EXPORT bool are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
+                                     uint32_t num_bytes);
 
 #if defined(ARROW_HAVE_AVX2)
-  static void bits_to_indexes_avx2(int bit_to_search, const int num_bits,
-                                   const uint8_t* bits, int* num_indexes,
-                                   uint16_t* indexes, uint16_t base_index = 0);
-  static void bits_filter_indexes_avx2(int bit_to_search, const int num_bits,
-                                       const uint8_t* bits, const uint16_t* input_indexes,
-                                       int* num_indexes, uint16_t* indexes);
-  template
-  static void bits_to_indexes_imp_avx2(const int num_bits, const uint8_t* bits,
-                                       int* num_indexes, uint16_t* indexes,
-                                       uint16_t base_index = 0);
-  template
-  static void bits_filter_indexes_imp_avx2(const int num_bits, const uint8_t* bits,
+
+namespace avx2 {
+ARROW_EXPORT void bits_filter_indexes_avx2(int bit_to_search, const int num_bits,
+                                           const uint8_t* bits,
                                            const uint16_t* input_indexes,
                                            int* num_indexes, uint16_t* indexes);
-  static void bits_to_bytes_avx2(const int num_bits, const uint8_t* bits, uint8_t* bytes);
-  static void bytes_to_bits_avx2(const int num_bits, const uint8_t* bytes, uint8_t* bits);
-  static bool are_all_bytes_zero_avx2(const uint8_t* bytes, uint32_t num_bytes);
+ARROW_EXPORT void bits_to_indexes_avx2(int bit_to_search, const int num_bits,
+                                       const uint8_t* bits, int* num_indexes,
+                                       uint16_t* indexes, uint16_t base_index = 0);
+ARROW_EXPORT void bits_to_bytes_avx2(const int num_bits, const uint8_t* bits,
+                                     uint8_t* bytes);
+ARROW_EXPORT void bytes_to_bits_avx2(const int num_bits, const uint8_t* bytes,
+                                     uint8_t* bits);
+ARROW_EXPORT bool are_all_bytes_zero_avx2(const uint8_t* bytes, uint32_t num_bytes);
+}  // namespace avx2
+
 #endif
-};
+
+}  // namespace bit_util
 
 }  // namespace util
 
 namespace compute {
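With bit_util now a namespace of free functions, callers spell e.g. arrow::util::bit_util::bits_to_bytes directly. These are internal utilities, so treat the following as a rough sketch only; hardware_flags = 0 selects the portable (non-AVX2) path and the buffers are deliberately over-sized to stay clear of any word-sized access inside the implementation:

    #include <cstdint>
    #include <iostream>

    #include "arrow/compute/util.h"

    // Sketch: expand a small bitmap into one byte per bit (0xFF for set bits).
    void ExpandBits() {
      const uint8_t bits[8] = {0b00001101};  // bits 0, 2 and 3 set, LSB first
      uint8_t bytes[64] = {};                // generously padded output buffer

      arrow::util::bit_util::bits_to_bytes(/*hardware_flags=*/0, /*num_bits=*/8, bits,
                                           bytes);
      for (int i = 0; i < 8; ++i) {
        std::cout << (bytes[i] ? 1 : 0);
      }
      std::cout << std::endl;  // expected: 10110000, assuming LSB-first bit order
    }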
diff --git a/include/arrow/dataset/api.h b/include/arrow/dataset/api.h
index 6554dfc..c2ebd9d 100644
--- a/include/arrow/dataset/api.h
+++ b/include/arrow/dataset/api.h
@@ -26,6 +26,9 @@ #ifdef ARROW_CSV
 #include "arrow/dataset/file_csv.h"
 #endif
+#ifdef ARROW_JSON
+#include "arrow/dataset/file_json.h"
+#endif
 #include "arrow/dataset/file_ipc.h"
 #ifdef ARROW_ORC
 #include "arrow/dataset/file_orc.h"
diff --git a/include/arrow/dataset/partition.h b/include/arrow/dataset/partition.h
index b122047..315a3d3 100644
--- a/include/arrow/dataset/partition.h
+++ b/include/arrow/dataset/partition.h
@@ -187,6 +187,8 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
 
   const ArrayVector& dictionaries() const { return dictionaries_; }
 
+  SegmentEncoding segment_encoding() const { return options_.segment_encoding; }
+
   bool Equals(const Partitioning& other) const override;
 
  protected:
diff --git a/include/arrow/dataset/type_fwd.h b/include/arrow/dataset/type_fwd.h
index a7ea8d6..d58781e 100644
--- a/include/arrow/dataset/type_fwd.h
+++ b/include/arrow/dataset/type_fwd.h
@@ -72,6 +72,11 @@ class CsvFileWriter;
 class CsvFileWriteOptions;
 struct CsvFragmentScanOptions;
 
+class JsonFileFormat;
+class JsonFileWriter;
+class JsonFileWriteOptions;
+struct JsonFragmentScanOptions;
+
 class IpcFileFormat;
 class IpcFileWriter;
 class IpcFileWriteOptions;
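A sketch of scanning newline-delimited JSON files as a dataset, assuming the library was built with ARROW_JSON (see the #ifdef above); the /data/events path is hypothetical:

    #include <memory>

    #include "arrow/dataset/api.h"
    #include "arrow/filesystem/api.h"

    // Sketch: discover *.json files and read them into a single Table.
    arrow::Result<std::shared_ptr<arrow::Table>> ReadJsonDataset() {
      auto fs = std::make_shared<arrow::fs::LocalFileSystem>();

      arrow::fs::FileSelector selector;
      selector.base_dir = "/data/events";
      selector.recursive = true;

      auto format = std::make_shared<arrow::dataset::JsonFileFormat>();
      ARROW_ASSIGN_OR_RAISE(auto factory,
                            arrow::dataset::FileSystemDatasetFactory::Make(
                                fs, selector, format,
                                arrow::dataset::FileSystemFactoryOptions{}));
      ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
      ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
      ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
      return scanner->ToTable();
    }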
diff --git a/include/arrow/datum.h b/include/arrow/datum.h
index 1d6d87a..57ae373 100644
--- a/include/arrow/datum.h
+++ b/include/arrow/datum.h
@@ -43,14 +43,19 @@ class Table;
 
 /// \class Datum
 /// \brief Variant type for various Arrow C++ data structures
 struct ARROW_EXPORT Datum {
+  /// \brief The kind of datum stored
   enum Kind { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE };
 
+  /// \brief A placeholder type to represent empty datum
   struct Empty {};
 
-  // Datums variants may have a length. This special value indicate that the
-  // current variant does not have a length.
+  /// \brief Datum variants may have a length. This special value indicates that the
+  /// current variant does not have a length.
   static constexpr int64_t kUnknownLength = -1;
 
+  /// \brief Storage of the actual datum.
+  ///
+  /// Note: For arrays, ArrayData is stored instead of Array for easier processing
   std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
               std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
               std::shared_ptr<Table>>
      value;
@@ -64,28 +69,49 @@ Datum(Datum&& other) = default;
   Datum& operator=(Datum&& other) = default;
 
+  /// \brief Construct from a Scalar
   Datum(std::shared_ptr<Scalar> value)  // NOLINT implicit conversion
       : value(std::move(value)) {}
 
+  /// \brief Construct from an ArrayData
   Datum(std::shared_ptr<ArrayData> value)  // NOLINT implicit conversion
       : value(std::move(value)) {}
 
+  /// \brief Construct from an ArrayData
   Datum(ArrayData arg)  // NOLINT implicit conversion
       : value(std::make_shared<ArrayData>(std::move(arg))) {}
 
-  Datum(const Array& value);  // NOLINT implicit conversion
+  /// \brief Construct from an Array
+  Datum(const Array& value);  // NOLINT implicit conversion
+
+  /// \brief Construct from an Array
   Datum(const std::shared_ptr<Array>& value);  // NOLINT implicit conversion
+
+  /// \brief Construct from a ChunkedArray
   Datum(std::shared_ptr<ChunkedArray> value);  // NOLINT implicit conversion
-  Datum(std::shared_ptr<RecordBatch> value);  // NOLINT implicit conversion
-  Datum(std::shared_ptr<Table> value);  // NOLINT implicit conversion
 
-  // Explicit constructors from const-refs. Can be expensive, prefer the
-  // shared_ptr constructors
+  /// \brief Construct from a RecordBatch
+  Datum(std::shared_ptr<RecordBatch> value);  // NOLINT implicit conversion
+
+  /// \brief Construct from a Table
+  Datum(std::shared_ptr<Table> value);  // NOLINT implicit conversion
+
+  /// \brief Construct from a ChunkedArray.
+  ///
+  /// This can be expensive, prefer the shared_ptr<ChunkedArray> constructor
   explicit Datum(const ChunkedArray& value);
+
+  /// \brief Construct from a RecordBatch.
+  ///
+  /// This can be expensive, prefer the shared_ptr<RecordBatch> constructor
   explicit Datum(const RecordBatch& value);
+
+  /// \brief Construct from a Table.
+  ///
+  /// This can be expensive, prefer the shared_ptr<Table> constructor
   explicit Datum(const Table& value);
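A small sketch of the constructors documented in this header (illustrative; the array and table arguments are assumed to come from elsewhere):

    #include <memory>

    #include "arrow/api.h"
    #include "arrow/datum.h"

    // Sketch: the various Datum constructors in action.
    void MakeDatums(const std::shared_ptr<arrow::Array>& array,
                    const std::shared_ptr<arrow::Table>& table) {
      arrow::Datum from_array(array);   // stored as ArrayData internally
      arrow::Datum from_table(table);   // cheap: shares ownership of the Table
      arrow::Datum from_scalar(arrow::MakeScalar(int64_t{7}));  // Scalar
      arrow::Datum from_int(int32_t{42});  // convenience constructor -> int32 scalar

      (void)from_array;
      (void)from_table;
      (void)from_scalar;
      (void)from_int;
    }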
 
-  // Cast from subtypes of Array or Scalar to Datum
+  /// \brief Cast from concrete subtypes of Array or Scalar to Datum
   template <typename T, bool IsArray = std::is_base_of_v<Array, T>,
             bool IsScalar = std::is_base_of_v<Scalar, T>,
             typename = enable_if_t<IsArray || IsScalar>>
@@ -93,7 +119,7 @@ struct ARROW_EXPORT Datum {
       : Datum(std::shared_ptr<typename std::conditional<IsArray, Array, Scalar>::type>(
             std::move(value))) {}
 
-  // Cast from subtypes of Array or Scalar to Datum
+  /// \brief Cast from concrete subtypes of Array or Scalar to Datum
   template <typename T, typename TV = typename std::remove_reference<T>::type,
             bool IsArray = std::is_base_of_v<Array, T>,
             bool IsScalar = std::is_base_of_v<Scalar, T>,
@@ -101,32 +127,48 @@ Datum(T&& value)  // NOLINT implicit conversion
       : Datum(std::make_shared<TV>(std::forward<T>(value))) {}
 
-  // Many Scalars are copyable, let that happen
+  /// \brief Copy from concrete subtypes of Scalar.
+  ///
+  /// The concrete scalar type must be copyable (not all of them are).
   template <typename T, typename = enable_if_t<std::is_base_of_v<Scalar, T>>>
   Datum(const T& value)  // NOLINT implicit conversion
       : Datum(std::make_shared<T>(value)) {}
 
   // Convenience constructors
+  /// \brief Convenience constructor storing a bool scalar.
   explicit Datum(bool value);
+  /// \brief Convenience constructor storing an int8 scalar.
   explicit Datum(int8_t value);
+  /// \brief Convenience constructor storing a uint8 scalar.
   explicit Datum(uint8_t value);
+  /// \brief Convenience constructor storing an int16 scalar.
   explicit Datum(int16_t value);
+  /// \brief Convenience constructor storing a uint16 scalar.
   explicit Datum(uint16_t value);
+  /// \brief Convenience constructor storing an int32 scalar.
   explicit Datum(int32_t value);
+  /// \brief Convenience constructor storing a uint32 scalar.
   explicit Datum(uint32_t value);
+  /// \brief Convenience constructor storing an int64 scalar.
   explicit Datum(int64_t value);
+  /// \brief Convenience constructor storing a uint64 scalar.
   explicit Datum(uint64_t value);
+  /// \brief Convenience constructor storing a float scalar.
   explicit Datum(float value);
+  /// \brief Convenience constructor storing a double scalar.
   explicit Datum(double value);
+  /// \brief Convenience constructor storing a string scalar.
   explicit Datum(std::string value);
+  /// \brief Convenience constructor storing a string scalar.
   explicit Datum(const char* value);
 
-  // Forward to convenience constructors for a DurationScalar from std::chrono::duration
+  /// \brief Convenience constructor for a DurationScalar from std::chrono::duration
   template