Skip to content

Commit

Permalink
Update arrow (#1565)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 21, 2021
1 parent ee779f0 commit 50a93c7
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 23 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "0b3756802ed239505bf72661907dc064e73b9382", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b62184d32a56465f145e4692e1a90fc4fe418d82", default-features = false }
#arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch="dev", default-features = false }
#arrow = { package = "arrow2", version = "0.5.3", default-features=false}
thiserror = "^1.0"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ docs-selection = [
]

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "0b3756802ed239505bf72661907dc064e73b9382", default-features = false, features=["compute"] }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b62184d32a56465f145e4692e1a90fc4fe418d82", default-features = false, features=["compute"] }
#arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", default-features = false, features=["compute"], branch="dev" }
#arrow = { package = "arrow2", version="0.5.3", default-features = false, features=["compute"]}
polars-arrow = {version = "0.16.0", path = "../polars-arrow"}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ temporal = ["polars-core/dtype-date", "polars-core/dtype-datetime"]
private = []

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "0b3756802ed239505bf72661907dc064e73b9382", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b62184d32a56465f145e4692e1a90fc4fe418d82", default-features = false }
#arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", default-features = false, branch="dev"}
#arrow = { package = "arrow2", version="0.5.3", --default-features=false }
polars-core = {version = "0.16.0", path = "../polars-core", features = ["private"], default-features=false}
Expand Down
65 changes: 50 additions & 15 deletions polars/polars-io/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::prelude::*;
use crate::{PhysicalIoExpr, ScanAggregation};
use arrow::datatypes::PhysicalType;
use arrow::io::parquet::write::{array_to_pages, DynIter, Encoding};
use arrow::io::parquet::{read, write};
use arrow::error::ArrowError;
use arrow::io::parquet::write::{array_to_pages, DynIter, DynStreamingIterator, Encoding};
use arrow::io::parquet::{
read,
write::{self, *},
};
use polars_core::prelude::*;
use rayon::prelude::*;
use std::collections::VecDeque;
use std::io::{Read, Seek, Write};
use std::sync::Arc;

Expand Down Expand Up @@ -119,6 +124,34 @@ where
}
}

struct Bla {
columns: VecDeque<CompressedPage>,
current: Option<CompressedPage>,
}

impl Bla {
pub fn new(columns: VecDeque<CompressedPage>) -> Self {
Self {
columns,
current: None,
}
}
}

impl FallibleStreamingIterator for Bla {
type Item = CompressedPage;
type Error = ArrowError;

fn advance(&mut self) -> ArrowResult<()> {
self.current = self.columns.pop_front();
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
self.current.as_ref()
}
}

/// Write a DataFrame to parquet format
///
/// # Example
Expand Down Expand Up @@ -187,20 +220,22 @@ where
.zip(parquet_schema_iter.columns().par_iter())
.zip(encodings.par_iter())
.map(|((array, descriptor), encoding)| {
let array = array.clone();

let pages =
array_to_pages(array, descriptor.clone(), options, *encoding).unwrap();
pages.collect::<Vec<_>>()
let encoded_pages =
array_to_pages(array.as_ref(), descriptor.clone(), options, *encoding)?;
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
})
.collect::<ArrowResult<VecDeque<_>>>()
})
.collect::<Vec<_>>();

let out = write::DynIter::new(columns.into_iter().map(|column| {
// one parquet page per array.
// we could use `array.slice()` to split it based on some number of rows.
Ok(DynIter::new(column.into_iter()))
}));
ArrowResult::Ok(out)
.collect::<ArrowResult<Vec<VecDeque<CompressedPage>>>>()?;

let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
ArrowResult::Ok(row_group)
});

write::write_file(
Expand Down
34 changes: 29 additions & 5 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 50a93c7

Please sign in to comment.