Skip to content

Commit

Permalink
csv: cast columns concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 7, 2022
1 parent 152ced8 commit 814eac5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
3 changes: 1 addition & 2 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,7 @@ where
df = parse_dates(df, &*fixed_schema)
}

// TODO: parallelize this?
cast_columns(&mut df, &to_cast_local)?;
cast_columns(&mut df, &to_cast_local, true)?;
Ok(df)
}
}
Expand Down
52 changes: 35 additions & 17 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,39 @@ use std::fmt;
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicUsize, Arc};

pub(crate) fn cast_columns(df: &mut DataFrame, to_cast: &[&Field]) -> Result<()> {
// cast to the original dtypes in the schema
for fld in to_cast {
use DataType::*;
df.may_apply(fld.name(), |s| match (s.dtype(), fld.data_type()) {
#[cfg(feature = "temporal")]
(Utf8, Date) => s.utf8().unwrap().as_date(None).map(|ca| ca.into_series()),
#[cfg(feature = "temporal")]
(Utf8, Datetime(tu, _)) => s
.utf8()
.unwrap()
.as_datetime(None, *tu)
.map(|ca| ca.into_series()),
(_, dt) => s.cast(dt),
})?;
pub(crate) fn cast_columns(df: &mut DataFrame, to_cast: &[&Field], parallel: bool) -> Result<()> {
use DataType::*;

let cast_fn = |s: &Series, fld: &Field| match (s.dtype(), fld.data_type()) {
#[cfg(feature = "temporal")]
(Utf8, Date) => s.utf8().unwrap().as_date(None).map(|ca| ca.into_series()),
#[cfg(feature = "temporal")]
(Utf8, Datetime(tu, _)) => s
.utf8()
.unwrap()
.as_datetime(None, *tu)
.map(|ca| ca.into_series()),
(_, dt) => s.cast(dt),
};

if parallel {
let cols = df
.get_columns()
.iter()
.map(|s| {
if let Some(fld) = to_cast.iter().find(|fld| fld.name().as_str() == s.name()) {
cast_fn(s, fld)
} else {
Ok(s.clone())
}
})
.collect::<Result<Vec<_>>>()?;
*df = DataFrame::new_no_checks(cols)
} else {
// cast to the original dtypes in the schema
for fld in to_cast {
df.may_apply(fld.name(), |s| cast_fn(s, fld))?;
}
}
Ok(())
}
Expand Down Expand Up @@ -503,7 +521,7 @@ impl<'a> CoreReader<'a> {
}

df.map(|mut df| {
cast_columns(&mut df, self.to_cast)?;
cast_columns(&mut df, self.to_cast, false)?;
Ok(df)
})
.transpose()
Expand Down Expand Up @@ -581,7 +599,7 @@ impl<'a> CoreReader<'a> {
.collect::<Result<_>>()?,
);

cast_columns(&mut df, self.to_cast)?;
cast_columns(&mut df, self.to_cast, false)?;
Ok(df)
})
.collect::<Result<Vec<_>>>()
Expand Down

0 comments on commit 814eac5

Please sign in to comment.