This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit
Fixed s3 example (#956)
jorgecarleitao committed Apr 21, 2022
1 parent 8480258 commit 7556ebc
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions examples/s3/src/main.rs
@@ -1,10 +1,7 @@
 use arrow2::array::{Array, Int64Array};
-use arrow2::datatypes::DataType;
 use arrow2::error::Result;
-use arrow2::io::parquet::read::{
-    decompress, get_page_stream, page_stream_to_array, read_metadata_async,
-};
-use futures::{future::BoxFuture, StreamExt};
+use arrow2::io::parquet::read;
+use futures::future::BoxFuture;
 use range_reader::{RangeOutput, RangedAsyncReader};
 use s3::Bucket;

@@ -19,6 +16,7 @@ async fn main() -> Result<()> {
     let length = data.content_length.unwrap() as usize;
     println!("total size in bytes: {}", length);
 
+    // this maps a range (start, length) to an s3 ranged-bytes request
     let range_get = Box::new(move |start: u64, length: usize| {
         let bucket = bucket.clone();
         let path = path.clone();
@@ -39,26 +37,50 @@
     });
 
     // at least 4kb per s3 request. Adjust to your liking.
-    let mut reader = RangedAsyncReader::new(length, 4 * 1024, range_get);
+    let min_request_size = 4 * 1024;
 
-    let metadata = read_metadata_async(&mut reader).await?;
+    // this factory is used to create multiple http requests concurrently. Each
+    // task will get its own `RangedAsyncReader`.
+    let reader_factory = || {
+        Box::pin(futures::future::ready(Ok(RangedAsyncReader::new(
+            length,
+            min_request_size,
+            range_get.clone(),
+        )))) as BoxFuture<'static, std::result::Result<RangedAsyncReader, std::io::Error>>
+    };
 
-    // metadata
-    println!("{}", metadata.num_rows);
+    // we need one reader to read the file's metadata
+    let mut reader = reader_factory().await?;
 
-    // pages of the first row group and first column
-    // This is IO bounded and SHOULD be done in a shared thread pool (e.g. Tokio)
-    let column_metadata = &metadata.row_groups[0].columns()[0];
-    let pages = get_page_stream(column_metadata, &mut reader, None, vec![]).await?;
+    let metadata = read::read_metadata_async(&mut reader).await?;
 
-    // decompress the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. Rayon)
-    let pages = pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
+    let schema = read::infer_schema(&metadata)?;
 
-    // deserialize the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. Rayon)
-    let array =
-        page_stream_to_array(pages, &metadata.row_groups[0].columns()[0], DataType::Int64).await?;
+    println!("total number of rows: {}", metadata.num_rows);
+    println!("Inferred Arrow schema: {:#?}", schema);
 
-    let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
+    // let's read the first row group only. Iterate over the others to your liking.
+    let group = &metadata.row_groups[0];
+
+    // no chunk size in deserializing
+    let chunk_size = None;
+
+    // this is IO-bounded (and issues a join, thus the reader_factory)
+    let column_chunks =
+        read::read_columns_many_async(reader_factory, group, schema.fields, chunk_size).await?;
+
+    // this is CPU-bounded and should be sent to a separate thread pool.
+    // We do it here for simplicity.
+    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+    let chunks = chunks.collect::<Result<Vec<_>>>()?;
+
+    // this is a single chunk because chunk_size is `None`
+    let chunk = &chunks[0];
+
+    let array = chunk.arrays()[0]
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap();
     // ... and have fun with it.
     println!("len: {}", array.len());
     println!("null_count: {}", array.null_count());
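The new comments flag the `RowGroupDeserializer` step as CPU-bounded and run it inline only for simplicity. Below is a minimal sketch of offloading that step, assuming the tokio runtime the example already uses and that the column-chunk iterators can be moved to another thread; `column_chunks` and `group` are the variables from the example, and the snippet is illustrative rather than part of the commit.

// hypothetical drop-in for the two `let chunks = ...` lines above
let num_rows = group.num_rows() as usize;
let chunks = tokio::task::spawn_blocking(move || {
    // decode on tokio's blocking thread pool instead of the async reactor
    read::RowGroupDeserializer::new(column_chunks, num_rows, None)
        .collect::<Result<Vec<_>>>()
})
.await
.expect("deserialization task panicked")?;

`spawn_blocking` keeps the IO-bound s3 requests responsive while the decoding runs on a dedicated blocking thread.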
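The example deserializes only the first row group; the comment invites iterating over the rest. A sketch of that loop under the same assumptions, also not part of the commit: it reuses `reader_factory` on every iteration (which should work here because the closure only borrows its captures) and clones `schema.fields` per row group.

// hypothetical extension: read every row group sequentially
for group in &metadata.row_groups {
    // IO-bounded: fetch this row group's column chunks concurrently
    let column_chunks =
        read::read_columns_many_async(reader_factory, group, schema.fields.clone(), None).await?;

    // CPU-bounded: deserialize the chunks (see the note above about thread pools)
    let deserializer =
        read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
    for maybe_chunk in deserializer {
        let chunk = maybe_chunk?;
        println!("read {} rows across {} columns", chunk.len(), chunk.arrays().len());
    }
}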
