Skip to content

Commit

Permalink
Add: Pandas & Apache Arrow document exports
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvardanian committed Aug 15, 2022
1 parent a8274f0 commit 4802207
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 75 deletions.
75 changes: 45 additions & 30 deletions include/ukv/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@
* https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
*/
#pragma once
#ifdef __cplusplus
extern "C" {
#endif

#include <inttypes.h> // `int64_t`
#include <stdlib.h> // `malloc`

#include "ukv/docs.h"

// if __has_include ("arrow/c/abi.h")
// #define ARROW_C_DATA_INTERFACE 1
// #define ARROW_C_STREAM_INTERFACE 1
// #endif

#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE

Expand Down Expand Up @@ -62,6 +73,33 @@ struct ArrowArrayStream {

#endif // ARROW_C_STREAM_INTERFACE

/**
 * @brief Release callback for an `ArrowSchema` whose `children` array was
 *        allocated with `malloc`.
 *
 * Releases every child that has not been released yet, frees the child
 * structures and the `children` pointer array, then marks the schema as
 * released by clearing its `release` member.
 *
 * NOTE(review): assumes every child structure is an individual `malloc`
 * allocation, as the TODO in `ukv_to_arrow_schema` suggests - confirm.
 *
 * @param[in,out] schema Schema to release. Must not be NULL.
 */
static void release_malloced_schema(struct ArrowSchema* schema) {
    for (int64_t i = 0; i < schema->n_children; ++i) {
        struct ArrowSchema* child = schema->children[i];
        // A slot may be empty if allocating that child failed.
        if (child == NULL)
            continue;
        if (child->release != NULL)
            child->release(child);
        // The callback above only frees what the child owns,
        // the child structure itself still has to be returned.
        free(child);
    }
    free(schema->children);
    // Mark as released
    schema->release = NULL;
}

/**
 * @brief Release callback for an `ArrowArray` whose `children` and `buffers`
 *        arrays were allocated with `malloc`.
 *
 * Releases every child that has not been released yet, frees the child
 * structures, the `children` pointer array and the `buffers` pointer array,
 * then marks the array as released by clearing its `release` member.
 * The memory the buffers point to is intentionally left untouched.
 *
 * NOTE(review): assumes every child structure is an individual `malloc`
 * allocation, as the TODO in `ukv_to_arrow_schema` suggests - confirm.
 *
 * @param[in,out] array Array to release. Must not be NULL.
 */
static void release_malloced_array(struct ArrowArray* array) {
    // Free children
    for (int64_t i = 0; i < array->n_children; ++i) {
        struct ArrowArray* child = array->children[i];
        // A slot may be empty if allocating that child failed.
        if (child == NULL)
            continue;
        if (child->release != NULL)
            child->release(child);
        // The callback above only frees what the child owns,
        // the child structure itself still has to be returned.
        free(child);
    }
    free(array->children);
    // Freeing buffers can be avoided, UKV still owns those regions
    // while the connection is alive and hasn't been reused for any other
    // requests.
    // for (int64_t i = 0; i < array->n_buffers; ++i)
    //     free((void*)array->buffers[i]);
    free(array->buffers);
    // Mark as released
    array->release = NULL;
}

static void ukv_to_arrow_schema( //
ukv_size_t const docs_count,
ukv_size_t const fields_count,
Expand Down Expand Up @@ -92,7 +130,7 @@ static void ukv_to_arrow_schema( //
array->buffers[0] = NULL; // no nulls, so bitmap can be omitted
array->children = malloc(sizeof(struct ArrowArray*) * array->n_children);

// Allocate sub-schemas and sub-arays
// Allocate sub-schemas and sub-arrays
// TODO: Don't malloc every child schema/array separately,
// use `private_data` member for that.
for (ukv_size_t field_idx = 0; field_idx != fields_count; ++field_idx)
Expand All @@ -106,7 +144,7 @@ static void ukv_to_arrow_column( //
ukv_str_view_t const field_name,
ukv_type_t const field_type,

ukv_val_ptr_t const column_validities,
ukv_1x8_t const* column_validities,
ukv_val_len_t const column_offsets,
ukv_val_ptr_t const column_contents,

Expand Down Expand Up @@ -187,33 +225,6 @@ static void ukv_to_arrow_column( //
}
}

/**
 * @brief Release callback for an `ArrowSchema` whose `children` array was
 *        allocated with `malloc`.
 *
 * Releases every child that has not been released yet, frees the child
 * structures and the `children` pointer array, then marks the schema as
 * released by clearing its `release` member.
 *
 * NOTE(review): assumes every child structure is an individual `malloc`
 * allocation, as the TODO in `ukv_to_arrow_schema` suggests - confirm.
 *
 * @param[in,out] schema Schema to release. Must not be NULL.
 */
static void release_malloced_schema(struct ArrowSchema* schema) {
    for (int64_t i = 0; i < schema->n_children; ++i) {
        struct ArrowSchema* child = schema->children[i];
        // A slot may be empty if allocating that child failed.
        if (child == NULL)
            continue;
        if (child->release != NULL)
            child->release(child);
        // The callback above only frees what the child owns,
        // the child structure itself still has to be returned.
        free(child);
    }
    free(schema->children);
    // Mark as released
    schema->release = NULL;
}

/**
 * @brief Release callback for an `ArrowArray` whose `children` and `buffers`
 *        arrays were allocated with `malloc`.
 *
 * Releases every child that has not been released yet, frees the child
 * structures, the `children` pointer array and the `buffers` pointer array,
 * then marks the array as released by clearing its `release` member.
 * The memory the buffers point to is intentionally left untouched.
 *
 * NOTE(review): assumes every child structure is an individual `malloc`
 * allocation, as the TODO in `ukv_to_arrow_schema` suggests - confirm.
 *
 * @param[in,out] array Array to release. Must not be NULL.
 */
static void release_malloced_array(struct ArrowArray* array) {
    // Free children
    for (int64_t i = 0; i < array->n_children; ++i) {
        struct ArrowArray* child = array->children[i];
        // A slot may be empty if allocating that child failed.
        if (child == NULL)
            continue;
        if (child->release != NULL)
            child->release(child);
        // The callback above only frees what the child owns,
        // the child structure itself still has to be returned.
        free(child);
    }
    free(array->children);
    // Freeing buffers can be avoided, UKV still owns those regions
    // while the connection is alive and hasn't been reused for any other
    // requests.
    // for (int64_t i = 0; i < array->n_buffers; ++i)
    //     free((void*)array->buffers[i]);
    free(array->buffers);
    // Mark as released
    array->release = NULL;
}

/**
*
* @param[in] collections Can have 0, 1 or `fields_count` elements.
Expand All @@ -240,4 +251,8 @@ static void ukv_to_arrow_stream( //

struct ArrowArrayStream* stream,
ukv_arena_t* arena) {
}
}

#ifdef __cplusplus
} /* end extern "C" */
#endif
8 changes: 8 additions & 0 deletions include/ukv/cpp/ranges.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ element_at transform_reduce_n(iterator_at begin, std::size_t n, element_at init,
return init;
}

/**
 * @brief Applies `transform` to each of the first `n` elements reachable from
 *        `begin` and writes the results sequentially through `output`.
 *
 * @param begin     Iterator to the first input element.
 * @param n         Number of elements to process; nothing is read or written when zero.
 * @param output    Iterator to the first destination slot.
 * @param transform Callable invoked as `transform(*begin)` for every element.
 * @return The output iterator advanced past the last written element.
 */
template <typename transform_at, typename output_iterator_at, typename iterator_at>
output_iterator_at transform_n(iterator_at begin, std::size_t n, output_iterator_at output, transform_at transform) {
    for (std::size_t i = 0; i != n; ++i, ++begin, ++output)
        *output = transform(*begin);
    return output;
}

template <typename element_at, typename iterator_at>
element_at reduce_n(iterator_at begin, std::size_t n, element_at init) {
return transform_reduce_n(begin, n, init, [](auto x) { return x; });
Expand Down Expand Up @@ -458,6 +465,7 @@ class strings_tape_iterator_t {
return {remaining_count_ - 1, current_ + std::strlen(current_) + 1};
}

/// Dereferences to the string the iterator is currently positioned at.
ukv_str_view_t operator*() const noexcept {
    return current_;
}
/// True once the count of remaining strings has dropped to zero.
bool is_end() const noexcept {
    return remaining_count_ == 0;
}
/// Count of strings that remain to be visited.
ukv_size_t size() const noexcept {
    return remaining_count_;
}
};
Expand Down
4 changes: 4 additions & 0 deletions include/ukv/cpp/table_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ class column_view_t {
else
return {validities_, conversions_, collisions_, reinterpret_cast<element_at*>(scalars_), count_, name_};
}

/// Raw pointer to the validity data backing this column.
ukv_1x8_t* validities() const noexcept {
    return validities_;
}
/// Raw pointer to the offsets backing this column.
ukv_val_len_t* offsets() const noexcept {
    return offsets_;
}
/// Start of the column's contents: the scalars buffer when it is set, the tape otherwise.
ukv_val_ptr_t contents() const noexcept {
    // Spelled out in full: the two-operand `a ?: b` form is a GNU extension,
    // not ISO C++, and is rejected by MSVC and by pedantic builds.
    return scalars_ ? scalars_ : tape_;
}
};

/**
Expand Down
9 changes: 5 additions & 4 deletions python/pybind.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ struct py_col_t {
ukv_arena_t* member_arena() noexcept { return native.member_arena(); }
/**
 * @brief Builds the option flags for operations issued through this collection.
 *
 * Starts from `ukv_options_default_k` and, when `py_txn_ptr.lock()` yields a
 * live transaction, adds the read-tracking and write-flushing flags according
 * to that transaction's `track_reads` and `flush_writes` settings.
 *
 * @return The combined flags, or just `ukv_options_default_k` when no
 *         transaction is attached.
 */
ukv_options_t options() noexcept {
    auto txn_ptr = py_txn_ptr.lock();
    // `lock()` may come back empty; dereferencing it then would be undefined
    // behavior, so fall back to the defaults before touching the transaction.
    if (!txn_ptr)
        return ukv_options_default_k;

    return static_cast<ukv_options_t>( //
        ukv_options_default_k |        //
        (txn_ptr->track_reads ? ukv_option_read_track_k : ukv_options_default_k) |
        (txn_ptr->flush_writes ? ukv_option_write_flush_k : ukv_options_default_k));
}
};

Expand Down

0 comments on commit 4802207

Please sign in to comment.