Skip to content

Commit

Permalink
Migrated parquet and ipc.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and ritchie46 committed Jun 25, 2021
1 parent 80f0791 commit 58fd5e5
Show file tree
Hide file tree
Showing 18 changed files with 132 additions and 160 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "d5bf08d52294688869e1e594c5cacd99663fd94b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "7f2c3c1b8eab991be8b2bf37288036b8010fa02c", default-features = false }
thiserror = "^1.0"
num = "^0.4"
26 changes: 0 additions & 26 deletions polars/polars-arrow/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,29 +169,3 @@ pub trait ListFromIter {
}
}
impl ListFromIter for ListArray<i64> {}

#[cfg(test)]
mod test {
use super::*;
use arrow::array::{Array, Int32Array};

#[test]
fn test_is_null() {
let arr = Int32Array::from(vec![Some(0), None, Some(2)]);
let a: &dyn Array = &arr;
assert_eq!(
a.is_null_mask()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>(),
&[false, true, false]
);
assert_eq!(
a.is_not_null_mask()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>(),
&[true, false, true]
);
}
}
6 changes: 3 additions & 3 deletions polars/polars-arrow/src/kernels/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ pub fn set_at_idx_no_null<T, I>(
set_value: T,
data_type: DataType,
) -> Result<PrimitiveArray<T>>
where
T: NativeType,
I: IntoIterator<Item = usize>,
where
T: NativeType,
I: IntoIterator<Item = usize>,
{
let mut buf = MutableBuffer::with_capacity(array.len());
buf.extend_from_slice(array.values().as_slice());
Expand Down
22 changes: 11 additions & 11 deletions polars/polars-arrow/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub struct TrustMyLength<I: Iterator<Item = J>, J> {
}

impl<I, J> TrustMyLength<I, J>
where
I: Iterator<Item = J>,
where
I: Iterator<Item = J>,
{
#[inline]
pub fn new(iter: I, len: usize) -> Self {
Expand All @@ -17,8 +17,8 @@ impl<I, J> TrustMyLength<I, J>
}

impl<I, J> Iterator for TrustMyLength<I, J>
where
I: Iterator<Item = J>,
where
I: Iterator<Item = J>,
{
type Item = J;

Expand All @@ -35,8 +35,8 @@ impl<I, J> Iterator for TrustMyLength<I, J>
impl<I, J> ExactSizeIterator for TrustMyLength<I, J> where I: Iterator<Item = J> {}

impl<I, J> DoubleEndedIterator for TrustMyLength<I, J>
where
I: Iterator<Item = J> + DoubleEndedIterator,
where
I: Iterator<Item = J> + DoubleEndedIterator,
{
#[inline]
fn next_back(&mut self) -> Option<Self::Item> {
Expand All @@ -56,17 +56,17 @@ unsafe impl<I, J> arrow::trusted_len::TrustedLen for TrustMyLength<I, J> where I

pub trait CustomIterTools: Iterator {
fn fold_first_<F>(mut self, f: F) -> Option<Self::Item>
where
Self: Sized,
F: FnMut(Self::Item, Self::Item) -> Self::Item,
where
Self: Sized,
F: FnMut(Self::Item, Self::Item) -> Self::Item,
{
let first = self.next()?;
Some(self.fold(first, f))
}

fn trust_my_length(self, length: usize) -> TrustMyLength<Self, Self::Item>
where
Self: Sized,
where
Self: Sized,
{
TrustMyLength::new(self, length)
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ docs-selection = [
]

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "d5bf08d52294688869e1e594c5cacd99663fd94b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "7f2c3c1b8eab991be8b2bf37288036b8010fa02c", default-features = false }
polars-arrow = {version = "0.14.2", path = "../polars-arrow"}
thiserror = "1.0"
num = "^0.4"
Expand Down
9 changes: 7 additions & 2 deletions polars/polars-core/src/chunked_array/ops/repeat_by.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use super::RepeatBy;
use crate::prelude::*;
use arrow::array::LargeListArray;
use arrow::array::ListArray;
use polars_arrow::array::ListFromIter;

type LargeListArray = ListArray<i64>;

impl<T> RepeatBy for ChunkedArray<T>
where
T: PolarsNumericType,
Expand All @@ -18,7 +20,10 @@ where
ListChunked::new_from_chunks(
self.name(),
vec![Arc::new(unsafe {
LargeListArray::from_iter_primitive_trusted_len::<T, _, _>(iter)
LargeListArray::from_iter_primitive_trusted_len::<T::Native, _, _>(
iter,
T::get_dtype().to_arrow(),
)
})],
)
}
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-core/src/chunked_array/upstream_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::prelude::*;
use crate::utils::NoNull;
use crate::utils::{get_iter_capacity, CustomIterTools};
use arrow::array::{BooleanArray, PrimitiveArray, Utf8Array};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::buffer::MutableBuffer;
use polars_arrow::utils::TrustMyLength;
use rayon::iter::{FromParallelIterator, IntoParallelIterator};
Expand Down Expand Up @@ -206,6 +205,9 @@ where
#[cfg(feature = "object")]
impl<T: PolarsObject> FromIterator<Option<T>> for ObjectChunked<T> {
fn from_iter<I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
use arrow::bitmap::Bitmap;
use arrow::bitmap::MutableBitmap;

let iter = iter.into_iter();
let size = iter.size_hint().0;
let mut null_mask_builder = MutableBitmap::with_capacity(size);
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ csv-file = ["csv", "csv-core", "memmap", "fast-float", "lexical", "arrow/io_csv"
fmt = ["polars-core/plain_fmt"]

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "d5bf08d52294688869e1e594c5cacd99663fd94b" }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "7f2c3c1b8eab991be8b2bf37288036b8010fa02c" }
polars-core = {version = "0.14.2", path = "../polars-core", features = ["private"], default-features=false}
polars-arrow = {version = "0.14.2", path = "../polars-arrow"}
csv = {version="1.1", optional=true}
lexical = {version = "5.2", optional = true}
num_cpus = "1.13.0"
Expand Down
24 changes: 14 additions & 10 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
use crate::csv_core::csv::{build_csv_reader, SequentialReader};
use crate::utils::to_arrow_compatible_df;
use crate::{SerReader, SerWriter};
pub use arrow::io::csv::write::WriterBuilder;
pub use arrow::io::csv::write;
use polars_core::prelude::*;
use std::fs::File;
use std::io::{Read, Seek, Write};
Expand All @@ -69,7 +69,9 @@ pub struct CsvWriter<'a, W: Write> {
/// File or Stream handler
buffer: &'a mut W,
/// Builds an Arrow CSV Writer
writer_builder: WriterBuilder,
writer_builder: write::WriterBuilder,
/// arrow specific options
options: write::SerializeOptions,
}

impl<'a, W> SerWriter<'a, W> for CsvWriter<'a, W>
Expand All @@ -79,17 +81,19 @@ where
fn new(buffer: &'a mut W) -> Self {
CsvWriter {
buffer,
writer_builder: WriterBuilder::new(),
writer_builder: write::WriterBuilder::new(),
options: write::SerializeOptions::default(),
}
}

fn finish(self, df: &DataFrame) -> Result<()> {
let df = to_arrow_compatible_df(df);
let mut csv_writer = self.writer_builder.build(self.buffer);
let mut writer = self.writer_builder.from_writer(self.buffer);

let iter = df.iter_record_batches();
write::write_header(&mut writer, &df.schema().to_arrow())?;
for batch in iter {
csv_writer.write(&batch)?
write::write_batch(&mut writer, &batch, &self.options)?;
}
Ok(())
}
Expand All @@ -101,31 +105,31 @@ where
{
/// Set whether to write headers
pub fn has_headers(mut self, has_headers: bool) -> Self {
self.writer_builder = self.writer_builder.has_headers(has_headers);
self.writer_builder.has_headers(has_headers);
self
}

/// Set the CSV file's column delimiter as a byte character
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
self.writer_builder = self.writer_builder.with_delimiter(delimiter);
self.writer_builder.delimiter(delimiter);
self
}

/// Set the CSV file's date format
pub fn with_date_format(mut self, format: String) -> Self {
self.writer_builder = self.writer_builder.with_date_format(format);
self.options.date_format = format;
self
}

/// Set the CSV file's time format
pub fn with_time_format(mut self, format: String) -> Self {
self.writer_builder = self.writer_builder.with_time_format(format);
self.options.time_format = format;
self
}

/// Set the CSV file's timestamp format array in
pub fn with_timestamp_format(mut self, format: String) -> Self {
self.writer_builder = self.writer_builder.with_timestamp_format(format);
self.options.timestamp_format = format;
self
}

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv_core/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trait ToPolarsError: Debug {
impl ToPolarsError for lexical::Error {}
impl ToPolarsError for fast_float::Error {}

pub(crate) trait PrimitiveParser: ArrowPrimitiveType {
pub(crate) trait PrimitiveParser: PolarsPrimitiveType {
fn parse(bytes: &[u8]) -> Result<Self::Native>;
}

Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::csv_core::{buffer::*, parser::*};
use crate::PhysicalIoExpr;
use crate::ScanAggregation;
use csv::ByteRecordsIntoIter;
use polars_arrow::array::ValueSize;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{prelude::*, POOL};
use rayon::prelude::*;
Expand Down
19 changes: 8 additions & 11 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::prelude::*;
use crate::utils::to_arrow_compatible_df;
use arrow::io::ipc::{
read::FileReader as ArrowIPCFileReader, write::FileWriter as ArrowIPCFileWriter,
};
use arrow::io::ipc::{read, write};
use polars_core::prelude::*;
use std::io::{Read, Seek, Write};
use std::sync::Arc;
Expand Down Expand Up @@ -65,7 +63,7 @@ pub struct IpcReader<R> {
rechunk: bool,
}

impl<R> ArrowReader for ArrowIPCFileReader<R>
impl<'a, R> ArrowReader for read::FileReader<'a, R>
where
R: Read + Seek,
{
Expand All @@ -74,7 +72,7 @@ where
}

fn schema(&self) -> Arc<Schema> {
Arc::new((&*self.schema()).into())
Arc::new((&**self.schema()).into())
}
}

Expand All @@ -93,9 +91,10 @@ where
self
}

fn finish(self) -> Result<DataFrame> {
fn finish(mut self) -> Result<DataFrame> {
let rechunk = self.rechunk;
let ipc_reader = ArrowIPCFileReader::try_new(self.reader)?;
let metadata = read::read_file_metadata(&mut self.reader)?;
let ipc_reader = read::FileReader::new(&mut self.reader, metadata);
finish_reader(ipc_reader, rechunk, None, None, None)
}
}
Expand Down Expand Up @@ -132,7 +131,7 @@ where

fn finish(self, df: &DataFrame) -> Result<()> {
let df = to_arrow_compatible_df(df);
let mut ipc_writer = ArrowIPCFileWriter::try_new(self.writer, &df.schema().to_arrow())?;
let mut ipc_writer = write::FileWriter::try_new(self.writer, &df.schema().to_arrow())?;

let iter = df.iter_record_batches();

Expand All @@ -156,9 +155,7 @@ mod test {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = create_df();

IpcWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
IpcWriter::new(&mut buf).finish(&df).expect("ipc writer");

buf.set_position(0);

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<R: Read> ArrowReader for ArrowJsonReader<R> {
}

fn schema(&self) -> Arc<Schema> {
Arc::new((&*self.schema()).into())
Arc::new((&**self.schema()).into())
}
}

Expand Down

0 comments on commit 58fd5e5

Please sign in to comment.