read parquet files: #61

ritchie46 committed Aug 27, 2020
1 parent 2dd5a51 commit 8a7ca91
Showing 9 changed files with 120 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -7,7 +7,7 @@ script:

after_success: |
cd .. && \
cargo doc --no-deps --package polars \
cargo doc --no-deps --all-features --package polars \
&& echo '<meta http-equiv=refresh content=0;url=polars/index.html>' > target/doc/index.html && \
sudo pip install ghp-import && \
ghp-import -n target/doc && \
8 changes: 5 additions & 3 deletions README.md
@@ -30,7 +30,7 @@ structs in that order. For `ChunkedArray` a lot of functionality is also defined

## Functionality

### Read and write CSV/ JSON
### Read and write CSV | JSON | IPC | Parquet

```rust
use polars::prelude::*;
@@ -159,7 +159,9 @@ Additional cargo features:

* `pretty` (default)
- pretty printing of DataFrames
* `temporal (default)`
- Conversions between Chrono and Polars for temporal data
* `simd`
- SIMD operations
* `temporal`
- Conversions between [Chrono](https://docs.rs/chrono/latest/chrono/) and Polars for temporal data
* `parquet_ser`
- Read Apache Parquet format
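Since `parquet_ser` is not part of the default feature set, a consuming crate has to opt in explicitly. A minimal sketch of a consumer's `Cargo.toml`; the version number is a placeholder, not taken from this commit:

```toml
[dependencies]
# Enable the optional Parquet reader on top of the default features.
# "0.7" is a placeholder version; use whichever release ships `parquet_ser`.
polars = { version = "0.7", features = ["parquet_ser"] }
```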
4 changes: 3 additions & 1 deletion polars/Cargo.toml
@@ -15,6 +15,7 @@ pretty = ["prettytable-rs"]
simd = ["arrow/packed_simd"]
docs = []
temporal = ["chrono"]
parquet_ser = ["parquet"]
default = ["pretty", "docs", "temporal"]

[dependencies]
@@ -28,4 +29,5 @@ rayon = "^1.3.1"
prettytable-rs = { version="^0.8.0", features=["win_crlf"], optional = true, default_features = false}
crossbeam = "^0.7"
chrono = {version = "^0.4.13", optional = true}
enum_dispatch = "^0.3.2"
enum_dispatch = "^0.3.2"
parquet = {version = "1.0.1", optional = true}
3 changes: 3 additions & 0 deletions polars/src/error.rs
@@ -26,6 +26,9 @@ pub enum PolarsError {
NoData,
#[error("Memory should be 64 byte aligned")]
MemoryNotAligned,
#[cfg(feature = "parquet_ser")]
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),
}

pub type Result<T> = std::result::Result<T, PolarsError>;
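Because the new variant derives `#[from]`, the `?` operator lifts any `parquet::errors::ParquetError` into `PolarsError` automatically. A minimal sketch of inspecting the variant at a call site, assuming `PolarsError` is in scope via the prelude and the consuming crate forwards the same feature flag:

```rust
use polars::prelude::*;

// Hypothetical helper that classifies an error for logging.
fn describe(err: &PolarsError) -> &'static str {
    match err {
        PolarsError::NoData => "no data",
        // This arm mirrors the cfg-gate on the variant itself and only
        // compiles when the feature is enabled.
        #[cfg(feature = "parquet_ser")]
        PolarsError::ParquetError(_) => "low-level Parquet failure",
        _ => "another Polars error",
    }
}
```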
12 changes: 6 additions & 6 deletions polars/src/frame/ser/ipc.rs
@@ -86,13 +86,13 @@
/// Write a DataFrame to Arrow's IPC format
pub struct IPCWriter<'a, W> {
writer: &'a mut W,
buffer_size: usize,
batch_size: usize,
}

impl<'a, W> IPCWriter<'a, W> {
/// Set the size of the write buffers. Buffer size is the amount of rows written at once.
pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
self.buffer_size = buffer_size;
/// Set the batch size. The batch size is the number of rows written at once.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
}
@@ -104,14 +104,14 @@
fn new(writer: &'a mut W) -> Self {
IPCWriter {
writer,
buffer_size: 1000,
batch_size: 1000,
}
}

fn finish(self, df: &mut DataFrame) -> Result<()> {
let mut ipc_writer = ArrowIPCFileWriter::try_new(self.writer, &df.schema)?;

let iter = df.iter_record_batches(self.buffer_size);
let iter = df.iter_record_batches(self.batch_size);

for batch in iter {
ipc_writer.write(&batch)?
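The rename is purely cosmetic; the writer still splits the DataFrame into record batches of the given size. A usage sketch of the renamed builder, assuming `IPCWriter` and its writer trait are reachable through the prelude (the `?` on `File::create` relies on the same `std::io::Error` conversion the parquet module's doc example uses below):

```rust
use polars::prelude::*;
use std::fs::File;

fn write_ipc(df: &mut DataFrame) -> Result<()> {
    // "frame.ipc" is an example path.
    let mut file = File::create("frame.ipc")?;
    IPCWriter::new(&mut file)
        .with_batch_size(500) // write 500 rows per record batch
        .finish(df)
}
```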
2 changes: 2 additions & 0 deletions polars/src/frame/ser/mod.rs
@@ -1,6 +1,8 @@
pub mod csv;
pub mod ipc;
pub mod json;
#[cfg(feature = "parquet_ser")]
pub mod parquet;
use crate::prelude::*;
use arrow::{
csv::Reader as ArrowCsvReader, error::Result as ArrowResult, json::Reader as ArrowJsonReader,
93 changes: 93 additions & 0 deletions polars/src/frame/ser/parquet.rs
@@ -0,0 +1,93 @@
//! # Reading Apache Parquet files.
//!
//! ## Example
//!
//! ```rust
//! use polars::prelude::*;
//! use std::fs::File;
//!
//! fn example() -> Result<DataFrame> {
//! let r = File::open("some_file.parquet")?;
//! let reader = ParquetReader::new(r);
//! reader.finish()
//! }
//! ```
//!
use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::prelude::*;
use arrow::record_batch::RecordBatchReader;
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReader, ArrowReader as ParquetArrowReader,
ParquetFileArrowReader,
};
use parquet::file::reader::SerializedFileReader;
use std::io::{Read, Seek};
use std::rc::Rc;
use std::sync::Arc;

/// Read Apache Parquet format into a DataFrame.
pub struct ParquetReader<R> {
reader: R,
rechunk: bool,
batch_size: usize,
}

impl ArrowReader for ParquetRecordBatchReader {
fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
self.next_batch()
}

fn schema(&self) -> Arc<Schema> {
<Self as RecordBatchReader>::schema(self)
}
}

impl<R> ParquetReader<R> {
/// Set the batch size. The batch size is the number of rows read at once.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
}

impl<R> SerReader<R> for ParquetReader<R>
where
R: 'static + Read + Seek + parquet::file::reader::Length + parquet::file::reader::TryClone,
{
fn new(reader: R) -> Self {
ParquetReader {
reader,
rechunk: true,
batch_size: 2048,
}
}

fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}

fn finish(self) -> Result<DataFrame> {
let rechunk = self.rechunk;

let file_reader = Rc::new(SerializedFileReader::new(self.reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let record_reader = arrow_reader.get_record_reader(self.batch_size)?;
finish_reader(record_reader, rechunk)
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;
use std::fs::File;

#[test]
fn test_parquet() {
let r = File::open("data/simple.parquet").unwrap();
let reader = ParquetReader::new(r);
let df = reader.finish().unwrap();
assert_eq!(df.columns(), ["a", "b"]);
assert_eq!(df.shape(), (3, 2));
}
}
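Combining the builder options above: a read with a custom batch size and rechunking disabled would look roughly like this, following the module's own doc example with a placeholder path:

```rust
use polars::prelude::*;
use std::fs::File;

fn read_custom() -> Result<DataFrame> {
    let file = File::open("some_file.parquet")?;
    ParquetReader::new(file)
        .with_batch_size(4096) // decode 4096 rows per record batch
        .set_rechunk(false)    // keep one chunk per batch instead of merging
        .finish()
}
```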
8 changes: 5 additions & 3 deletions polars/src/lib.rs
@@ -29,7 +29,7 @@
//! * [the csv module](frame/ser/csv/index.html)
//! * [the json module](frame/ser/json/index.html)
//! * [the IPC module](frame/ser/ipc/index.html)
//!
//! * [the parquet module](frame/ser/parquet/index.html)
//!
//! ## Joins
//!
@@ -149,10 +149,12 @@
//!
//! * `pretty` (default)
//! - pretty printing of DataFrames
//! * `temporal (default)`
//! - Conversions between Chrono and Polars for temporal data
//! * `simd`
//! - SIMD operations
//! * `temporal`
//! - Conversions between Chrono and Polars for temporal data
//! * `parquet_ser`
//! - Read Apache Parquet format
#![allow(dead_code)]
#![feature(iterator_fold_self)]
#[macro_use]
2 changes: 2 additions & 0 deletions polars/src/prelude.rs
@@ -47,3 +47,5 @@ pub(crate) fn create_df() -> DataFrame {
let s1 = Series::new("temp", [22.1, 19.9, 7., 2., 3.].as_ref());
DataFrame::new(vec![s0, s1]).unwrap()
}
#[cfg(feature = "parquet_ser")]
pub use crate::frame::ser::parquet::ParquetReader;
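Because the re-export is cfg-gated, downstream code that has to build both with and without Parquet support needs a gate of its own. A sketch where `my_parquet` is a hypothetical feature of the consuming crate that forwards to `polars/parquet_ser`:

```rust
// Hypothetical consumer-side gating. In the consumer's Cargo.toml:
//   [features]
//   my_parquet = ["polars/parquet_ser"]
#[cfg(feature = "my_parquet")]
mod parquet_io {
    use polars::prelude::*;
    use std::fs::File;

    pub fn load(path: &str) -> Result<DataFrame> {
        let file = File::open(path)?;
        ParquetReader::new(file).finish()
    }
}
```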
