Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added example
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Oct 31, 2021
1 parent c236700 commit d020b4d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -83,7 +83,8 @@ flate2 = "1"
doc-comment = "0.3"
crossbeam-channel = "0.5.1"
# used to test async readers
tokio = { version = "1", features = ["macros", "rt"] }
tokio = { version = "1", features = ["macros", "rt", "fs"] }
tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }

Expand Down
36 changes: 36 additions & 0 deletions examples/csv_read_async.rs
@@ -0,0 +1,36 @@
use std::sync::Arc;

use futures::io::Cursor;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::array::*;
use arrow2::error::Result;
use arrow2::io::csv::read_async::*;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Take the CSV path from the first CLI argument; fail with a clear
    // usage message instead of the opaque index panic `&args[1]` gives.
    let file_path = std::env::args()
        .nth(1)
        .expect("usage: csv_read_async <path-to-csv>");

    // Open the file with tokio and adapt it (via `tokio_util::compat`) to
    // the `futures` I/O traits that `csv-async` expects.
    let file = File::open(file_path).await?.compat();

    let mut reader = AsyncReaderBuilder::new().create_reader(file);

    // Infer the schema from the file's contents (no row limit, has headers).
    let schema = Arc::new(infer_schema(&mut reader, None, true, &infer).await?);

    // Read up to 100 rows into pre-allocated byte records; `rows_read`
    // tells us how many were actually filled.
    let mut rows = vec![ByteRecord::default(); 100];
    let rows_read = read_rows(&mut reader, 0, &mut rows).await?;

    // Deserialize only the records that were read into an Arrow batch.
    // NOTE(review): in a real application this CPU-bound step should run on
    // a separate thread so it does not block the async executor.
    let batch = deserialize_batch(
        &rows[..rows_read],
        schema.fields(),
        None,
        0,
        deserialize_column,
    )?;
    println!("{}", batch.column(0));
    Ok(())
}
13 changes: 13 additions & 0 deletions guide/src/io/csv_reader.md
Expand Up @@ -30,6 +30,19 @@ thereby maximizing IO throughput. The example below shows how to do just that:
{{#include ../../../examples/csv_read_parallel.rs}}
```

## Async

This crate also supports reading a CSV file asynchronously through the `csv-async` crate.
The example below demonstrates this:

```rust
{{#include ../../../examples/csv_read_async.rs}}
```

Note that the deserialization _should_ be performed on a separate thread so that it does
not block the executor (see [this article](https://ryhl.io/blog/async-what-is-blocking/) for
background); this example does not show that.

## Customization

In the code above, `parser` and `infer` allow for customization: they declare
Expand Down
9 changes: 4 additions & 5 deletions src/io/csv/mod.rs
@@ -1,17 +1,16 @@
#![deny(missing_docs)]
//! Transfer data between the Arrow memory format and CSV (comma-separated values).
//! Convert data between the Arrow and CSV (comma-separated values).

use crate::error::ArrowError;

pub use csv::Error as CSVError;

#[cfg(any(feature = "io_csv_read_async", feature = "io_csv_read"))]
mod read_utils;
#[cfg(any(feature = "io_csv_read_async", feature = "io_csv_read"))]
mod utils;

impl From<CSVError> for ArrowError {
fn from(error: CSVError) -> Self {
#[cfg(any(feature = "io_csv_read", feature = "io_csv_write"))]
impl From<csv::Error> for ArrowError {
fn from(error: csv::Error) -> Self {
ArrowError::External("".to_string(), Box::new(error))
}
}
Expand Down
15 changes: 13 additions & 2 deletions src/io/mod.rs
@@ -1,7 +1,18 @@
//! Contains modules to interface with other formats such as [`csv`],
//! [`parquet`], [`json`], [`ipc`], [`mod@print`] and [`avro`].
#[cfg(any(feature = "io_csv_read", feature = "io_csv_write"))]
#[cfg_attr(docsrs, doc(cfg(feature = "io_csv")))]
#[cfg(any(
feature = "io_csv_read",
feature = "io_csv_read_async",
feature = "io_csv_write",
))]
#[cfg_attr(
docsrs,
doc(cfg(any(
feature = "io_csv_read",
feature = "io_csv_read_async",
feature = "io_csv_write",
)))
)]
pub mod csv;

#[cfg(feature = "io_json")]
Expand Down

0 comments on commit d020b4d

Please sign in to comment.