Skip to content

Commit

Permalink
improve transpose performance (#2466)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 25, 2022
1 parent c15ecdd commit 201c65a
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 71 deletions.
29 changes: 2 additions & 27 deletions polars/polars-core/src/frame/groupby/pivot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use rayon::prelude::*;
use std::cmp::Ordering;

use crate::frame::groupby::{GroupsIndicator, GroupsProxy};
use crate::frame::row::AnyValueBuffer;
use crate::POOL;
#[cfg(feature = "dtype-date")]
use arrow::temporal_conversions::date32_to_date;
Expand Down Expand Up @@ -249,6 +248,7 @@ impl DataFrame {
})
.collect::<Vec<_>>()
});
let results = DataFrame::new_no_checks(result_columns);

let mut dtype = self
.column(&values[column_index])
Expand All @@ -261,32 +261,7 @@ impl DataFrame {
(_, PivotAgg::Count) => dtype = DataType::UInt32,
_ => {}
}
let len = result_columns.len();
let mut buffers = (0..unique_vals.len())
.map(|_| {
let buf: AnyValueBuffer = (&dtype, len).into();
buf
})
.collect::<Vec<_>>();

// this is very expensive. A lot of cache misses here.
// This is the part that is performance critical.
result_columns.iter().for_each(|s| {
s.iter().zip(buffers.iter_mut()).for_each(|(av, buf)| {
let _out = buf.add(av);
debug_assert!(_out.is_some());
});
});
let cols = buffers
.into_iter()
.enumerate()
.map(|(i, buf)| {
let mut s = buf.into_series();
s.rename(&format!("{i}"));
s
})
.collect::<Vec<_>>();
let mut out = DataFrame::new_no_checks(cols);
let mut out = results.transpose_from_dtype(&dtype).unwrap();

// add the headers based on the unique vals
let headers = unique_vals.cast(&DataType::Utf8).unwrap();
Expand Down
14 changes: 11 additions & 3 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rayon::prelude::*;

use crate::chunked_array::ops::unique::is_unique_helper;
use crate::prelude::*;
use crate::utils::{accumulate_dataframes_horizontal, split_ca, split_df, NoNull};
use crate::utils::{accumulate_dataframes_horizontal, get_supertype, split_ca, split_df, NoNull};

#[cfg(feature = "dataframe_arithmetic")]
mod arithmetic;
Expand Down Expand Up @@ -515,7 +515,7 @@ impl DataFrame {
/// # use polars_core::prelude::*;
/// let venus_air: DataFrame = df!("Element" => &["Carbon dioxide", "Nitrogen"],
/// "Fraction" => &[0.965, 0.035])?;
///
///
/// assert_eq!(venus_air.dtypes(), &[DataType::Utf8, DataType::Float64]);
/// # Ok::<(), PolarsError>(())
/// ```
Expand Down Expand Up @@ -548,7 +548,7 @@ impl DataFrame {
/// # use polars_core::prelude::*;
/// let earth: DataFrame = df!("Surface type" => &["Water", "Land"],
/// "Fraction" => &[0.708, 0.292])?;
///
///
/// let f1: Field = Field::new("Surface type", DataType::Utf8);
/// let f2: Field = Field::new("Fraction", DataType::Float64);
///
Expand Down Expand Up @@ -2711,6 +2711,14 @@ impl DataFrame {
}
Ok(acc_ca.rechunk())
}

/// Get the supertype of the columns in this DataFrame
pub fn get_supertype(&self) -> Option<Result<DataType>> {
self.columns
.iter()
.map(|s| Ok(s.dtype().clone()))
.reduce(|acc, b| get_supertype(&acc?, &b.unwrap()))
}
}

pub struct RecordBatchIter<'a> {
Expand Down
241 changes: 200 additions & 41 deletions polars/polars-core/src/frame/row.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::chunked_array::builder::get_list_builder;
use crate::prelude::*;
use crate::utils::get_supertype;
use crate::POOL;
use arrow::bitmap::Bitmap;
use rayon::prelude::*;
use std::fmt::{Debug, Formatter};

#[derive(Debug, Clone, PartialEq, Default)]
Expand Down Expand Up @@ -110,53 +112,67 @@ impl DataFrame {
Self::from_rows_and_schema(rows, &schema)
}

pub(crate) fn transpose_from_dtype(&self, dtype: &DataType) -> Result<DataFrame> {
let new_width = self.height();
let new_height = self.width();

match dtype {
#[cfg(feature = "dtype-i8")]
DataType::Int8 => numeric_transpose::<Int8Type>(&self.columns),
#[cfg(feature = "dtype-i16")]
DataType::Int16 => numeric_transpose::<Int16Type>(&self.columns),
DataType::Int32 => numeric_transpose::<Int32Type>(&self.columns),
DataType::Int64 => numeric_transpose::<Int64Type>(&self.columns),
#[cfg(feature = "dtype-u8")]
DataType::UInt8 => numeric_transpose::<UInt8Type>(&self.columns),
#[cfg(feature = "dtype-u16")]
DataType::UInt16 => numeric_transpose::<UInt16Type>(&self.columns),
DataType::UInt32 => numeric_transpose::<UInt32Type>(&self.columns),
DataType::UInt64 => numeric_transpose::<UInt64Type>(&self.columns),
DataType::Float32 => numeric_transpose::<Float32Type>(&self.columns),
DataType::Float64 => numeric_transpose::<Float64Type>(&self.columns),
_ => {
let mut buffers = (0..new_width)
.map(|_| {
let buf: AnyValueBuffer = (dtype, new_height).into();
buf
})
.collect::<Vec<_>>();

// this is very expensive. A lot of cache misses here.
// This is the part that is performance critical.
self.columns.iter().for_each(|s| {
let s = s.cast(dtype).unwrap();
s.iter().zip(buffers.iter_mut()).for_each(|(av, buf)| {
let _out = buf.add(av);
debug_assert!(_out.is_some());
});
});
let cols = buffers
.into_iter()
.enumerate()
.map(|(i, buf)| {
let mut s = buf.into_series();
s.rename(&format!("column_{i}"));
s
})
.collect::<Vec<_>>();
Ok(DataFrame::new_no_checks(cols))
}
}
}

#[cfg_attr(docsrs, doc(cfg(feature = "rows")))]
/// Transpose a DataFrame. This is a very expensive operation.
pub fn transpose(&self) -> Result<DataFrame> {
// TODO: if needed we could optimize this to specialized builders. Now we use AnyValue even though
// we cast all Series to same dtype. So we could patter match en create at once.

let height = self.height();
if height == 0 || self.width() == 0 {
let width = self.width();
if height == 0 || width == 0 {
return Err(PolarsError::NoData("empty dataframe".into()));
}

let dtype = self
.columns
.iter()
.map(|s| Ok(s.dtype().clone()))
.reduce(|a, b| get_supertype(&a?, &b?))
.unwrap()?;

let schema = Schema::new(
(0..height)
.map(|i| Field::new(format!("column_{}", i).as_ref(), dtype.clone()))
.collect(),
);

let row_container = vec![AnyValue::Null; height];
let mut row = Row(row_container);
let row_ptr = &row as *const Row;

let columns = self
.columns
.iter()
.map(|s| s.cast(&dtype))
.collect::<Result<Vec<_>>>()?;

let iter = columns.iter().map(|s| {
(0..s.len()).zip(row.0.iter_mut()).for_each(|(i, av)| {
// Safety:
// we iterate over the length of s, so we are in bounds
unsafe { *av = s.get_unchecked(i) };
});
// borrow checker does not allow row borrow, so we deref from raw ptr.
// we do all this to amortize allocs
// Safety:
// row is still alive
unsafe { &*row_ptr }
});
Self::from_rows_iter_and_schema(iter, &schema)
let dtype = self.get_supertype().unwrap()?;
self.transpose_from_dtype(&dtype)
}
}

Expand Down Expand Up @@ -375,3 +391,146 @@ impl From<(&DataType, usize)> for AnyValueBuffer {
}
}
}

fn numeric_transpose<T>(cols: &[Series]) -> Result<DataFrame>
where
T: PolarsNumericType,
ChunkedArray<T>: IntoSeries,
{
let new_width = cols[0].len();
let new_height = cols.len();

let has_nulls = cols.iter().any(|s| s.null_count() > 0);

let values_buf: Vec<Vec<T::Native>> = (0..new_width)
.map(|_| Vec::with_capacity(new_height))
.collect();
let validity_buf: Vec<_> = if has_nulls {
// we first use bools instead of bits, because we can access these in parallel without aliasing
(0..new_width).map(|_| vec![true; new_height]).collect()
} else {
(0..new_width).map(|_| vec![]).collect()
};

POOL.install(|| {
cols.iter().enumerate().for_each(|(row_idx, s)| {
let s = s.cast(&T::get_dtype()).unwrap();
let ca = s.unpack::<T>().unwrap();

// Safety
// we access in parallel, but every access is unique, so we don't break aliasing rules
// we also ensured we allocated enough memory, so we never reallocate and thus
// the pointers remain valid.
if has_nulls {
for (col_idx, opt_v) in ca.into_iter().enumerate() {
match opt_v {
None => unsafe {
let column = validity_buf.get_unchecked(col_idx);
let el_ptr = column.as_ptr() as *mut bool;
*el_ptr.add(row_idx) = false;
},
Some(v) => unsafe {
let column = values_buf.get_unchecked(col_idx);
let el_ptr = column.as_ptr() as *mut T::Native;
*el_ptr.add(row_idx) = v;
},
}
}
} else {
for (col_idx, v) in ca.into_no_null_iter().enumerate() {
unsafe {
let column = values_buf.get(col_idx).unwrap();
let el_ptr = column.as_ptr() as *mut T::Native;
*el_ptr.add(row_idx) = v;
}
}
}
})
});

let series = POOL.install(|| {
values_buf
.into_par_iter()
.zip(validity_buf)
.enumerate()
.map(|(i, (mut values, validity))| {
// Safety:
// all values are written we can now set len
unsafe {
values.set_len(new_height);
}

let validity = if has_nulls {
let validity = Bitmap::from_trusted_len_iter(validity.iter().copied());
if validity.null_count() > 0 {
Some(validity)
} else {
None
}
} else {
None
};

let arr = PrimitiveArray::<T::Native>::from_data(
T::get_dtype().to_arrow(),
values.into(),
validity,
);
let name = format!("column_{}", i);
ChunkedArray::<T>::new_from_chunks(&name, vec![Arc::new(arr) as ArrayRef])
.into_series()
})
.collect()
});

Ok(DataFrame::new_no_checks(series))
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_transpose() -> Result<()> {
let df = df![
"a" => [1, 2, 3],
"b" => [10, 20, 30],
]?;

let out = df.transpose()?;
let expected = df![
"column_0" => [1, 10],
"column_1" => [2, 20],
"column_2" => [3, 30],

]?;
assert!(out.frame_equal_missing(&expected));

let df = df![
"a" => [Some(1), None, Some(3)],
"b" => [Some(10), Some(20), None],
]?;
let out = df.transpose()?;
let expected = df![
"column_0" => [1, 10],
"column_1" => [None, Some(20)],
"column_2" => [Some(3), None],

]?;
assert!(out.frame_equal_missing(&expected));

let df = df![
"a" => ["a", "b", "c"],
"b" => [Some(10), Some(20), None],
]?;
let out = df.transpose()?;
let expected = df![
"column_0" => ["a", "10"],
"column_1" => ["b", "20"],
"column_2" => [Some("c"), None],

]?;
assert!(out.frame_equal_missing(&expected));
Ok(())
}
}

0 comments on commit 201c65a

Please sign in to comment.