Fixed guide and improved examples (#1247)
jorgecarleitao committed Sep 12, 2022
1 parent 212cdf3 commit 7c69aa8
Showing 10 changed files with 83 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -49,6 +49,6 @@ jobs:
         uses: peaceiris/actions-gh-pages@v3.7.3
         with:
           personal_token: ${{ secrets.GITHUB_TOKEN }}
-          publish_dir: ./target/doc/arrow2
+          publish_dir: ./target/doc
           destination_dir: ./${{github.ref_name}}/docs
           keep_files: false
2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ implementation.
 
 Check out [the guide](https://jorgecarleitao.github.io/arrow2/main/guide)
 for a general introduction on how to use this crate, and
-[API docs](https://jorgecarleitao.github.io/arrow2/main/docs)
+[API docs](https://jorgecarleitao.github.io/arrow2/main/docs/arrow2)
 for a detailed documentation of each of its APIs.
 
 ## Features
60 changes: 45 additions & 15 deletions examples/ipc_file_mmap.rs
@@ -1,13 +1,13 @@
 //! Example showing how to memory map an Arrow IPC file into a [`Chunk`].
 use std::sync::Arc;
 
-use arrow2::error::Result;
-use arrow2::io::ipc::read;
-use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
+use arrow2::array::{Array, BooleanArray};
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{Field, Schema};
+use arrow2::error::Error;
 
-// Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which
-// usually `Arc<Mmap>` supports. Here we mock it
-#[derive(Clone)]
+// Arrow2 requires something that implements `AsRef<[u8]>`, which
+// `Mmap` supports. Here we mock it
 struct Mmap(Vec<u8>);
 
 impl AsRef<[u8]> for Mmap {
@@ -17,19 +17,49 @@ impl AsRef<[u8]> for Mmap {
     }
 }
 
-fn main() -> Result<()> {
+// Auxiliary function to write an arrow file
+// This function is guaranteed to produce a valid arrow file
+fn write(
+    chunks: &[Chunk<Box<dyn Array>>],
+    schema: &Schema,
+    ipc_fields: Option<Vec<arrow2::io::ipc::IpcField>>,
+    compression: Option<arrow2::io::ipc::write::Compression>,
+) -> Result<Vec<u8>, Error> {
+    let result = vec![];
+    let options = arrow2::io::ipc::write::WriteOptions { compression };
+    let mut writer =
+        arrow2::io::ipc::write::FileWriter::try_new(result, schema, ipc_fields.clone(), options)?;
+    for chunk in chunks {
+        writer.write(chunk, ipc_fields.as_ref().map(|x| x.as_ref()))?;
+    }
+    writer.finish()?;
+    Ok(writer.into_inner())
+}
+
+fn check_round_trip(array: Box<dyn Array>) -> Result<(), Error> {
+    let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
+    let columns = Chunk::try_new(vec![array.clone()])?;
 
-    // given a mmap
-    let mmap = Arc::new(Mmap(vec![]));
+    let data = Arc::new(write(&[columns], &schema, None, None)?);
 
-    // read the metadata
-    let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
+    // we first read the files' metadata
+    let metadata =
+        arrow2::io::ipc::read::read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?;
 
-    // mmap the dictionaries
-    let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
+    // next we mmap the dictionaries
+    // Safety: `write` above guarantees that this is a valid Arrow IPC file
+    let dictionaries =
+        unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };
 
-    // and finally mmap a chunk (0 in this case).
-    let chunk = unsafe { mmap_unchecked(&metadata, &dictionaries, mmap, 0) }?;
-
-    println!("{chunk:?}");
+    // Safety: `write` above guarantees that this is a valid Arrow IPC file
+    let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, &dictionaries, data, 0)? };
+    assert_eq!(new_array.into_arrays()[0], array);
     Ok(())
 }
+
+fn main() -> Result<(), Error> {
+    let array = BooleanArray::from([None, None, Some(true)]).boxed();
+    check_round_trip(array)
+}
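
For context, not part of this commit: the example mocks the memory map with a `Vec<u8>` so it stays self-contained. Against a real file, the same API can be driven by an actual memory map. A minimal sketch, assuming the `memmap2` crate (a dependency the example itself does not use) and an existing Arrow IPC file at `path`:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow2::error::Error;
use arrow2::io::ipc::read;

// Sketch only: `memmap2` is an assumed dependency here.
fn mmap_ipc_file(path: &str) -> Result<(), Error> {
    let file = File::open(path)?;
    // Safety: the file must not be truncated or mutated while mapped.
    let data = Arc::new(unsafe { memmap2::Mmap::map(&file)? });

    // Read the file's metadata from the mapped bytes.
    let metadata = read::read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?;

    // Safety: both calls require `path` to be a valid, untruncated Arrow IPC file.
    let dictionaries =
        unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };
    let chunk = unsafe { arrow2::mmap::mmap_unchecked(&metadata, &dictionaries, data, 0)? };

    println!("{chunk:?}");
    Ok(())
}
```

The `_unchecked` suffixes mark the safety contract: nothing validates the IPC bytes, which is why the example above constructs its own input with `write` to guarantee validity.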
31 changes: 25 additions & 6 deletions guide/src/README.md
@@ -5,10 +5,29 @@ interoperability with the arrow format.
 
 The typical use-case for this library is to perform CPU and memory-intensive analytics in a format that supports heterogeneous data structures, null values, and IPC and FFI interfaces across languages.
 
-Arrow2 is divided into 5 main parts:
+Arrow2 is divided in 5 main APIs:
 
-* a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions;
-* a [high-level API](./high_level.md) to operate with arrow arrays;
-* a [metadata API](./metadata.md) to declare and operate with logical types and metadata;
-* a [compute API](./compute.md) with operators to operate over arrays;
-* an [IO API](./io/README.md) with interfaces to read from, and write to, other formats.
+* a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions
+* a [high-level API](./high_level.md) to operate with arrow arrays
+* a [metadata API](./metadata.md) to declare and operate with logical types and metadata
+* a [compute API](./compute.md) with operators to operate over arrays
+* an IO API with interfaces to read from, and write to, other formats
+    * Arrow
+        * [Read files](./io/ipc_read.md)
+        * [Read streams](./io/ipc_stream_read.md)
+        * [Memory map files](./io/ipc_mmap.md)
+        * [Write](./io/ipc_write.md)
+    * CSV
+        * [Read](./io/csv_read.md)
+        * [Write](./io/csv_write.md)
+    * Parquet
+        * [Read](./io/parquet_read.md)
+        * [Write](./io/parquet_write.md)
+    * JSON and NDJSON
+        * [Read](./io/json_read.md)
+        * [Write](./io/json_write.md)
+    * Avro
+        * [Read](./io/avro_read.md)
+        * [Write](./io/avro_write.md)
+    * ODBC
+        * [Read and write](./io/odbc.md)
2 changes: 1 addition & 1 deletion guide/src/high_level.md
@@ -279,5 +279,5 @@ Below is a complete example of how to operate on a `Box<dyn Array>` without
 extra allocations.
 
 ```rust,ignore
-{{#include ../examples/cow.rs}}
+{{#include ../../examples/cow.rs}}
 ```
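
The included `cow.rs` is not shown in this diff. The general idea behind copy-on-write on shared data, independent of arrow2's specific API, is std's `Arc` uniqueness check: mutate in place only when there is a single owner, and allocate a copy otherwise. A generic sketch in plain std, with a hypothetical `double_in_place` helper:

```rust
use std::sync::Arc;

// Sketch of the copy-on-write pattern: no allocation when uniquely owned.
fn double_in_place(values: &mut Arc<Vec<i32>>) {
    if let Some(inner) = Arc::get_mut(values) {
        // Uniquely owned: mutate in place, no extra allocation.
        for v in inner.iter_mut() {
            *v *= 2;
        }
    } else {
        // Shared: clone first (the "copy" in copy-on-write), then swap in.
        let mut inner = values.as_ref().clone();
        for v in inner.iter_mut() {
            *v *= 2;
        }
        *values = Arc::new(inner);
    }
}

fn main() {
    let mut values = Arc::new(vec![1, 2, 3]);
    double_in_place(&mut values); // refcount is 1, so this mutates in place
    assert_eq!(values.as_ref(), &[2, 4, 6]);
}
```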
11 changes: 0 additions & 11 deletions guide/src/io/README.md
@@ -1,12 +1 @@
 # IO
-
-This crate offers optional features that enable interoperability with different formats:
-
-* Arrow (`io_ipc`)
-* CSV (`io_csv`)
-* Parquet (`io_parquet`)
-* JSON and NDJSON (`io_json`)
-* Avro (`io_avro` and `io_avro_async`)
-* ODBC-compliant databases (`io_odbc`)
-
-In this section you can find a guide and examples for each one of them.
2 changes: 1 addition & 1 deletion guide/src/io/odbc.md
@@ -4,5 +4,5 @@ When compiled with feature `io_odbc`, this crate can be used to read from, and w
 any [ODBC](https://en.wikipedia.org/wiki/Open_Database_Connectivity) interface:
 
 ```rust
-{{#include ../../../examples/odbc.rs}}
+{{#include ../../../examples/io_odbc.rs}}
 ```
9 changes: 1 addition & 8 deletions guide/src/io/parquet_read.md
@@ -19,13 +19,6 @@ Here is how to read a single column chunk from a single row group:
 The example above minimizes memory usage at the expense of mixing IO and CPU tasks
 on the same thread, which may hurt performance if one of them is a bottleneck.
 
-For single-threaded reading, buffers used to read and decompress pages can be re-used.
-This create offers an API that encapsulates the above logic:
-
-```rust
-{{#include ../../../examples/parquet_read_record.rs}}
-```
-
 ### Parallelism decoupling of CPU from IO
 
 One important aspect of the pages created by the iterator above is that they can cross
@@ -38,7 +31,7 @@ and that it is advantageous to have a single thread performing all IO-intensive
 by delegating all CPU-intensive tasks to separate threads.
 
 ```rust
-{{#include ../../../examples/parquet_read_parallel.rs}}
+{{#include ../../../examples/parquet_read_parallel/src/main.rs}}
 ```
 
 This can of course be reversed; in configurations where IO is bounded (e.g. when a
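
The `parquet_read_parallel` example referenced above is not shown in this diff. As a minimal sketch of the decoupling the guide describes — one thread performing all IO and feeding a channel, with decoding on another thread — where `read_page` and `decode_page` are hypothetical stand-ins for the real read and decode steps:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical IO step: in real code this reads compressed bytes from disk.
fn read_page(i: usize) -> Vec<u8> {
    vec![i as u8; 1024]
}

// Hypothetical CPU step: in real code this decompresses and deserializes.
fn decode_page(page: Vec<u8>) -> usize {
    page.iter().map(|&b| b as usize).sum()
}

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // A single thread performs all IO-intensive work, in sequence.
    let io = thread::spawn(move || {
        for i in 0..8 {
            tx.send(read_page(i)).unwrap();
        }
    });

    // CPU-intensive work happens on a separate thread (or a pool).
    let cpu = thread::spawn(move || {
        for page in rx {
            let _ = decode_page(page);
        }
    });

    io.join().unwrap();
    cpu.join().unwrap();
}
```

The point of the split is that the channel lets IO and decoding overlap instead of serializing on a single thread.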
12 changes: 6 additions & 6 deletions src/io/ipc/write/common.rs
@@ -172,14 +172,14 @@ fn encode_dictionary(
 }
 
 pub fn encode_chunk(
-    columns: &Chunk<Box<dyn Array>>,
+    chunk: &Chunk<Box<dyn Array>>,
     fields: &[IpcField],
     dictionary_tracker: &mut DictionaryTracker,
     options: &WriteOptions,
 ) -> Result<(Vec<EncodedData>, EncodedData)> {
     let mut encoded_dictionaries = vec![];
 
-    for (field, array) in fields.iter().zip(columns.as_ref()) {
+    for (field, array) in fields.iter().zip(chunk.as_ref()) {
         encode_dictionary(
             field,
             array.as_ref(),
@@ -189,7 +189,7 @@ pub fn encode_chunk(
         )?;
     }
 
-    let encoded_message = columns_to_bytes(columns, options);
+    let encoded_message = chunk_to_bytes(chunk, options);
 
     Ok((encoded_dictionaries, encoded_message))
 }
@@ -213,12 +213,12 @@ fn serialize_compression(
 
 /// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the
 /// other for the batch's data
-fn columns_to_bytes(columns: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> EncodedData {
+fn chunk_to_bytes(chunk: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> EncodedData {
     let mut nodes: Vec<arrow_format::ipc::FieldNode> = vec![];
     let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![];
     let mut arrow_data: Vec<u8> = vec![];
     let mut offset = 0;
-    for array in columns.arrays() {
+    for array in chunk.arrays() {
         write(
             array.as_ref(),
             &mut buffers,
@@ -236,7 +236,7 @@ fn columns_to_bytes(columns: &Chunk<Box<dyn Array>>, options: &WriteOptions) ->
         version: arrow_format::ipc::MetadataVersion::V5,
         header: Some(arrow_format::ipc::MessageHeader::RecordBatch(Box::new(
             arrow_format::ipc::RecordBatch {
-                length: columns.len() as i64,
+                length: chunk.len() as i64,
                 nodes: Some(nodes),
                 buffers: Some(buffers),
                 compression,
4 changes: 2 additions & 2 deletions src/io/ipc/write/writer.rs
@@ -118,7 +118,7 @@ impl<W: Write> FileWriter<W> {
     /// Writes [`Chunk`] to the file
     pub fn write(
         &mut self,
-        columns: &Chunk<Box<dyn Array>>,
+        chunk: &Chunk<Box<dyn Array>>,
         ipc_fields: Option<&[IpcField]>,
     ) -> Result<()> {
         if self.state != State::Started {
@@ -134,7 +134,7 @@ impl<W: Write> FileWriter<W> {
         };
 
         let (encoded_dictionaries, encoded_message) = encode_chunk(
-            columns,
+            chunk,
             ipc_fields,
             &mut self.dictionary_tracker,
             &self.options,
