csv parser: cast on the threadpool instead of when done parsing (#2154)
ritchie46 committed Dec 24, 2021
1 parent 8f9d351 commit cf866eb
Showing 2 changed files with 36 additions and 21 deletions.
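The gist of the change: the Utf8 -> Date/Datetime casts driven by the schema overrides used to run serially on the finished DataFrame after CoreReader::as_df(); the fields to cast are now handed to CoreReader::new, which applies them per chunk inside the closures that are collected in parallel, so the casting work runs on the thread pool while chunks are still being parsed. Below is a minimal sketch of that pattern, using rayon directly with made-up chunk types and function names — it is not the polars API, only an illustration of moving a post-processing step into the parallel workers.

// Illustrative only: hypothetical Chunk/parse_chunk/cast_chunk stand-ins,
// rayon assumed as a dependency.
use rayon::prelude::*;

#[derive(Debug)]
struct Chunk(Vec<String>);

fn parse_chunk(raw: &str) -> Chunk {
    Chunk(raw.split(',').map(str::to_owned).collect())
}

// Stand-in for the dtype casts; a trivial transformation keeps the sketch runnable.
fn cast_chunk(chunk: &mut Chunk) {
    for v in &mut chunk.0 {
        *v = v.to_uppercase();
    }
}

fn main() {
    let raw_chunks = vec!["a,b,c", "d,e,f", "g,h,i"];

    // Before this commit: parse in parallel, then cast serially on the combined result.
    let mut before: Vec<Chunk> = raw_chunks.par_iter().map(|raw| parse_chunk(raw)).collect();
    before.iter_mut().for_each(cast_chunk);

    // After this commit: cast inside each worker closure, so it also runs on the thread pool.
    let after: Vec<Chunk> = raw_chunks
        .par_iter()
        .map(|raw| {
            let mut chunk = parse_chunk(raw);
            cast_chunk(&mut chunk);
            chunk
        })
        .collect();

    assert_eq!(format!("{:?}", before), format!("{:?}", after));
}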
21 changes: 3 additions & 18 deletions polars/polars-io/src/csv.rs
@@ -508,25 +508,9 @@ where
             self.null_values,
             self.predicate,
             self.aggregate,
+            &to_cast,
         )?;
-        let mut df = csv_reader.as_df()?;
-
-        // cast to the original dtypes in the schema
-        for fld in to_cast {
-            use DataType::*;
-            df.may_apply(fld.name(), |s| match (s.dtype(), fld.data_type()) {
-                #[cfg(feature = "temporal")]
-                (Utf8, Date) => s.utf8().unwrap().as_date(None).map(|ca| ca.into_series()),
-                #[cfg(feature = "temporal")]
-                (Utf8, Datetime) => s
-                    .utf8()
-                    .unwrap()
-                    .as_datetime(None)
-                    .map(|ca| ca.into_series()),
-                (_, dt) => s.cast(dt),
-            })?;
-        }
-        df
+        csv_reader.as_df()?
     } else {
         let reader_bytes = get_reader_bytes(&mut self.reader)?;
         let mut csv_reader = CoreReader::new(
@@ -552,6 +536,7 @@ where
             self.null_values,
             self.predicate,
             self.aggregate,
+            &[],
         )?;
         csv_reader.as_df()?
     };
36 changes: 33 additions & 3 deletions polars/polars-io/src/csv_core/csv.rs
@@ -14,6 +14,25 @@ use std::fmt;
 use std::sync::atomic::Ordering;
 use std::sync::{atomic::AtomicUsize, Arc};
 
+pub fn to_cast(df: &mut DataFrame, to_cast: &[&Field]) -> Result<()> {
+    // cast to the original dtypes in the schema
+    for fld in to_cast {
+        use DataType::*;
+        df.may_apply(fld.name(), |s| match (s.dtype(), fld.data_type()) {
+            #[cfg(feature = "temporal")]
+            (Utf8, Date) => s.utf8().unwrap().as_date(None).map(|ca| ca.into_series()),
+            #[cfg(feature = "temporal")]
+            (Utf8, Datetime) => s
+                .utf8()
+                .unwrap()
+                .as_datetime(None)
+                .map(|ca| ca.into_series()),
+            (_, dt) => s.cast(dt),
+        })?;
+    }
+    Ok(())
+}
+
 /// CSV file reader
 pub(crate) struct CoreReader<'a> {
     reader_bytes: Option<ReaderBytes<'a>>,
@@ -38,6 +57,7 @@ pub(crate) struct CoreReader<'a> {
     null_values: Option<Vec<String>>,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     aggregate: Option<&'a [ScanAggregation]>,
+    to_cast: &'a [&'a Field],
 }
 
 impl<'a> fmt::Debug for CoreReader<'a> {
@@ -123,6 +143,7 @@ impl<'a> CoreReader<'a> {
         null_values: Option<NullValues>,
         predicate: Option<Arc<dyn PhysicalIoExpr>>,
         aggregate: Option<&'a [ScanAggregation]>,
+        to_cast: &'a [&'a Field],
     ) -> Result<CoreReader<'a>> {
         #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
         let mut reader_bytes = reader_bytes;
@@ -215,6 +236,7 @@ impl<'a> CoreReader<'a> {
             null_values,
             predicate,
             aggregate,
+            to_cast,
         })
     }
 
@@ -478,7 +500,11 @@ impl<'a> CoreReader<'a> {
                     }
                 }
 
-                Ok(df)
+                df.map(|mut df| {
+                    to_cast(&mut df, self.to_cast)?;
+                    Ok(df)
+                })
+                .transpose()
             })
             .collect::<Result<Vec<_>>>()
         })?;
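The Ok(df) that this hunk replaces sat at the end of the per-chunk closure, and df is evidently an Option here — otherwise .map followed by .transpose() would not type-check — so the cast is threaded through Option::map with a fallible closure, and transpose() turns the resulting Option<Result<...>> back into the Result<Option<...>> the closure returns. A small self-contained sketch of just that plumbing, with made-up names and types (not polars code):

// Hypothetical stand-ins mirroring the Option/Result plumbing above.
fn fallible_cast(v: &mut i32) -> Result<(), String> {
    if *v < 0 {
        return Err("negative".to_string());
    }
    *v *= 2;
    Ok(())
}

fn process(maybe: Option<i32>) -> Result<Option<i32>, String> {
    maybe
        .map(|mut v| {
            // Apply the fallible step, then hand the value back wrapped in Ok.
            fallible_cast(&mut v)?;
            Ok(v)
        })
        // Option<Result<i32, String>> -> Result<Option<i32>, String>
        .transpose()
}

fn main() {
    assert_eq!(process(Some(3)), Ok(Some(6)));
    assert_eq!(process(None), Ok(None));
    assert!(process(Some(-1)).is_err());
}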
@@ -545,12 +571,16 @@ impl<'a> CoreReader<'a> {
                         usize::MAX,
                     )?;
                 }
-                Ok(DataFrame::new_no_checks(
+
+                let mut df = DataFrame::new_no_checks(
                     buffers
                         .into_iter()
                         .map(|buf| buf.into_series())
                         .collect::<Result<_>>()?,
-                ))
+                );
+
+                to_cast(&mut df, self.to_cast)?;
+                Ok(df)
             })
             .collect::<Result<Vec<_>>>()
         })?;
