From 7556ebc3d22467b77210103787fadcdc492c40f0 Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Thu, 21 Apr 2022 09:30:53 +0100
Subject: [PATCH] Fixed s3 example (#956)

---
 examples/s3/src/main.rs | 60 ++++++++++++++++++++++++++++-------------
 1 file changed, 41 insertions(+), 19 deletions(-)

diff --git a/examples/s3/src/main.rs b/examples/s3/src/main.rs
index fb544c34f5d..5aa24431dd9 100644
--- a/examples/s3/src/main.rs
+++ b/examples/s3/src/main.rs
@@ -1,10 +1,7 @@
 use arrow2::array::{Array, Int64Array};
-use arrow2::datatypes::DataType;
 use arrow2::error::Result;
-use arrow2::io::parquet::read::{
-    decompress, get_page_stream, page_stream_to_array, read_metadata_async,
-};
-use futures::{future::BoxFuture, StreamExt};
+use arrow2::io::parquet::read;
+use futures::future::BoxFuture;
 use range_reader::{RangeOutput, RangedAsyncReader};
 use s3::Bucket;
 
@@ -19,6 +16,7 @@ async fn main() -> Result<()> {
     let length = data.content_length.unwrap() as usize;
     println!("total size in bytes: {}", length);
 
+    // this maps a range (start, length) to a s3 ranged-bytes request
     let range_get = Box::new(move |start: u64, length: usize| {
         let bucket = bucket.clone();
         let path = path.clone();
@@ -39,26 +37,50 @@
     });
 
     // at least 4kb per s3 request. Adjust to your liking.
-    let mut reader = RangedAsyncReader::new(length, 4 * 1024, range_get);
+    let min_request_size = 4 * 1024;
 
-    let metadata = read_metadata_async(&mut reader).await?;
+    // this factory is used to create multiple http requests concurrently. Each
+    // task will get its own `RangedAsyncReader`.
+    let reader_factory = || {
+        Box::pin(futures::future::ready(Ok(RangedAsyncReader::new(
+            length,
+            min_request_size,
+            range_get.clone(),
+        )))) as BoxFuture<'static, std::result::Result<RangedAsyncReader, std::io::Error>>
+    };
 
-    // metadata
-    println!("{}", metadata.num_rows);
+    // we need one reader to read the files' metadata
+    let mut reader = reader_factory().await?;
 
-    // pages of the first row group and first column
-    // This is IO bounded and SHOULD be done in a shared thread pool (e.g. Tokio)
-    let column_metadata = &metadata.row_groups[0].columns()[0];
-    let pages = get_page_stream(column_metadata, &mut reader, None, vec![]).await?;
+    let metadata = read::read_metadata_async(&mut reader).await?;
 
-    // decompress the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. Rayon)
-    let pages = pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
+    let schema = read::infer_schema(&metadata)?;
 
-    // deserialize the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. Rayon)
-    let array =
-        page_stream_to_array(pages, &metadata.row_groups[0].columns()[0], DataType::Int64).await?;
+    println!("total number of rows: {}", metadata.num_rows);
+    println!("Infered Arrow schema: {:#?}", schema);
 
-    let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
+    // let's read the first row group only. Iterate over them to your liking
+    let group = &metadata.row_groups[0];
+
+    // no chunk size in deserializing
+    let chunk_size = None;
+
+    // this is IO-bounded (and issues a join, thus the reader_factory)
+    let column_chunks =
+        read::read_columns_many_async(reader_factory, group, schema.fields, chunk_size).await?;
+
+    // this is CPU-bounded and should be sent to a separate thread-pool.
+    // We do it here for simplicity
+    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+    let chunks = chunks.collect::<Result<Vec<_>>>()?;
+
+    // this is a single chunk because chunk_size is `None`
+    let chunk = &chunks[0];
+
+    let array = chunk.arrays()[0]
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap();
 
     // ... and have fun with it.
     println!("len: {}", array.len());
    println!("null_count: {}", array.null_count());
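The patch deserializes only the first row group; its comment says "Iterate over them
to your liking". A minimal sketch of that iteration, assuming the bindings from the
patch and that `reader_factory` and `schema.fields` can be cloned per iteration; this
is an illustration, not part of the commit:

    // sketch: deserialize every row group instead of only the first one
    for group in &metadata.row_groups {
        // the factory is cloned so each row group issues its own s3 requests
        let column_chunks = read::read_columns_many_async(
            reader_factory.clone(),
            group,
            schema.fields.clone(),
            None,
        )
        .await?;
        let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None)
            .collect::<Result<Vec<_>>>()?;
        println!("row group deserialized into {} chunk(s)", chunks.len());
    }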
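The patch also keeps the CPU-bounded `RowGroupDeserializer` on the async task "for
simplicity". A minimal sketch of sending it to a separate thread-pool with
`tokio::task::spawn_blocking`, assuming the example runs under a tokio runtime and
that the column iterators are `Send`; it would replace the two `let chunks = ...`
lines in the patch:

    // sketch: run the CPU-bounded deserialization on tokio's blocking pool
    // instead of the async executor; `column_chunks` and `group` as above
    let num_rows = group.num_rows() as usize;
    let chunks = tokio::task::spawn_blocking(move || {
        read::RowGroupDeserializer::new(column_chunks, num_rows, None)
            .collect::<Result<Vec<_>>>()
    })
    .await
    .expect("deserialization task panicked")?;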