Skip to content

Commit

Permalink
improve melt function (#2799)
Browse files Browse the repository at this point in the history
* improve melt function

* update lazy melt schema
  • Loading branch information
ritchie46 committed Mar 1, 2022
1 parent 4a31e0a commit 56e1f28
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 44 deletions.
30 changes: 30 additions & 0 deletions polars/polars-arrow/src/kernels/concatenate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use arrow::array::growable::make_growable;
use arrow::array::{Array, ArrayRef};
use arrow::error::{ArrowError, Result};
use std::sync::Arc;

/// Concatenate multiple [`Array`]s of the same type into a single [`Array`].
///
/// This does not check that the arrays' types match; the caller must ensure
/// all chunks share the same `DataType`, otherwise `make_growable` misbehaves.
///
/// # Errors
/// Returns `ArrowError::InvalidArgumentError` when `arrays` is empty.
pub fn concatenate_owned_unchecked(arrays: &[ArrayRef]) -> Result<Arc<dyn Array>> {
    if arrays.is_empty() {
        return Err(ArrowError::InvalidArgumentError(
            "concat requires input of at least one array".to_string(),
        ));
    }
    // Borrow every chunk once and pre-compute the total output length so the
    // growable buffer is allocated exactly once (no re-grow while extending).
    let arrays_ref = arrays.iter().map(|a| &**a).collect::<Vec<_>>();
    let capacity: usize = arrays_ref.iter().map(|a| a.len()).sum();

    // `use_validity = false`: validity is taken from the source arrays themselves.
    let mut mutable = make_growable(&arrays_ref, false, capacity);

    // Append each source array in full, in order.
    for (i, array) in arrays_ref.iter().enumerate() {
        mutable.extend(i, 0, array.len())
    }

    Ok(mutable.as_arc())
}
1 change: 1 addition & 0 deletions polars/polars-arrow/src/kernels/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use arrow::array::BooleanArray;
use arrow::bitmap::utils::BitChunks;
use std::iter::Enumerate;
pub mod concatenate;
pub mod ewm;
pub mod float;
pub mod list;
Expand Down
128 changes: 105 additions & 23 deletions polars/polars-core/src/frame/explode.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::chunked_array::ops::explode::offsets_to_indexes;
use crate::prelude::*;
use crate::utils::accumulate_dataframes_vertical_unchecked;
use crate::utils::get_supertype;
use arrow::buffer::Buffer;
use polars_arrow::kernels::concatenate::concatenate_owned_unchecked;

fn get_exploded(series: &Series) -> Result<(Series, Buffer<i64>)> {
match series.dtype() {
Expand Down Expand Up @@ -135,6 +136,8 @@ impl DataFrame {
/// * `id_vars` - String slice that represent the columns to use as id variables.
/// * `value_vars` - String slice that represent the columns to use as value variables.
///
/// If `value_vars` is empty all columns that are not in `id_vars` will be used.
///
/// ```ignore
/// # use polars_core::prelude::*;
/// let df = df!("A" => &["a", "b", "a"],
Expand Down Expand Up @@ -180,33 +183,90 @@ impl DataFrame {
/// | "a" | 5 | "D" | 6 |
/// +-----+-----+----------+-------+
/// ```
pub fn melt<I, J, S>(&self, id_vars: I, value_vars: J) -> Result<Self>
pub fn melt<I, J>(&self, id_vars: I, value_vars: J) -> Result<Self>
where
I: IntoIterator<Item = S>,
J: IntoIterator<Item = S>,
S: AsRef<str>,
I: IntoVec<String>,
J: IntoVec<String>,
{
let ids = self.select(id_vars)?;
let id_vars = id_vars.into_vec();
let value_vars = value_vars.into_vec();
self.melt2(id_vars, value_vars)
}

/// Similar to melt, but without generics. This may be easier if you want to pass
/// an empty `id_vars` or empty `value_vars`.
pub fn melt2(&self, id_vars: Vec<String>, mut value_vars: Vec<String>) -> Result<Self> {
let len = self.height();

let value_vars = value_vars.into_iter();
// if value vars is empty we take all columns that are not in id_vars.
if value_vars.is_empty() {
let id_vars_set = PlHashSet::from_iter(id_vars.iter().map(|s| s.as_str()));
value_vars = self
.get_columns()
.iter()
.filter_map(|s| {
if id_vars_set.contains(s.name()) {
None
} else {
Some(s.name().to_string())
}
})
.collect();
}

let mut dataframe_chunks = Vec::with_capacity(value_vars.size_hint().0);
// values will all be placed in single column, so we must find their supertype
let schema = self.schema();
let mut iter = value_vars.iter().map(|v| {
schema
.get(v)
.ok_or_else(|| PolarsError::NotFound(v.clone()))
});
let mut st = iter.next().unwrap()?.clone();
for dt in iter {
st = get_supertype(&st, dt?)?;
}

for value_column_name in value_vars {
let value_column_name = value_column_name.as_ref();
let variable_col = Utf8Chunked::full("variable", value_column_name, len).into_series();
let mut value_col = self.column(value_column_name)?.clone();
value_col.rename("value");
let values_len = value_vars.iter().map(|name| name.len()).sum::<usize>();

let mut df_chunk = ids.clone();
df_chunk.hstack_mut(&[variable_col, value_col])?;
dataframe_chunks.push(df_chunk)
// The column name of the variable that is melted
let mut variable_col = MutableUtf8Array::<i64>::with_capacities(
len * value_vars.len() + 1,
len * values_len + 1,
);
// prepare ids
let ids_ = self.select(id_vars)?;
let mut ids = ids_.clone();
if ids.width() > 0 {
for _ in 0..value_vars.len() - 1 {
ids.vstack_mut_unchecked(&ids_)
}
}
ids.as_single_chunk_par();
drop(ids_);

let mut df = accumulate_dataframes_vertical_unchecked(dataframe_chunks)?;
df.rechunk();
Ok(df)
let mut values = Vec::with_capacity(value_vars.len());

for value_column_name in &value_vars {
variable_col.extend_trusted_len_values(std::iter::repeat(value_column_name).take(len));
let value_col = self.column(value_column_name)?.cast(&st)?;
values.extend_from_slice(value_col.chunks())
}
let values_arr = concatenate_owned_unchecked(&values)?;
// Safety
// The given dtype is correct
let values =
unsafe { Series::from_chunks_and_dtype_unchecked("value", vec![values_arr], &st) };

let variable_col = variable_col.into_arc();
// Safety
// The given dtype is correct
let variables = unsafe {
Series::from_chunks_and_dtype_unchecked("variable", vec![variable_col], &DataType::Utf8)
};

ids.hstack_mut(&[variables, values])?;

Ok(ids)
}
}

Expand Down Expand Up @@ -267,18 +327,40 @@ mod test {

#[test]
#[cfg_attr(miri, ignore)]
fn test_melt() {
fn test_melt() -> Result<()> {
let df = df!("A" => &["a", "b", "a"],
"B" => &[1, 3, 5],
"C" => &[10, 11, 12],
"D" => &[2, 4, 6]
)
.unwrap();

let melted = df.melt(&["A", "B"], &["C", "D"]).unwrap();
let melted = df.melt(&["A", "B"], &["C", "D"])?;
assert_eq!(
Vec::from(melted.column("value").unwrap().i32().unwrap()),
Vec::from(melted.column("value")?.i32()?),
&[Some(10), Some(11), Some(12), Some(2), Some(4), Some(6)]
)
);

let melted = df.melt2(vec![], vec![]).unwrap();
let value = melted.column("value")?;
// utf8 because of supertype
let value = value.utf8()?;
let value = value.into_no_null_iter().collect::<Vec<_>>();
assert_eq!(
value,
&["a", "b", "a", "1", "3", "5", "10", "11", "12", "2", "4", "6"]
);

let melted = df.melt2(vec!["A".into()], vec![]).unwrap();
let value = melted.column("value")?;
let value = value.i32()?;
let value = value.into_no_null_iter().collect::<Vec<_>>();
assert_eq!(value, &[1, 3, 5, 10, 11, 12, 2, 4, 6]);
let variable = melted.column("variable")?;
let variable = variable.utf8()?;
let variable = variable.into_no_null_iter().collect::<Vec<_>>();
assert_eq!(variable, &["B", "B", "B", "C", "C", "C", "D", "D", "D"]);
assert!(melted.column("A").is_ok());
Ok(())
}
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
}

pub fn melt(self, id_vars: Arc<Vec<String>>, value_vars: Arc<Vec<String>>) -> Self {
let schema = det_melt_schema(&value_vars, self.schema());
let schema = det_melt_schema(&id_vars, &value_vars, self.schema());

let lp = ALogicalPlan::Melt {
input: self.root,
Expand Down
54 changes: 39 additions & 15 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::prelude::*;
use crate::utils;
use crate::utils::{combine_predicates_expr, has_expr};
use polars_core::prelude::*;
use polars_core::utils::get_supertype;
use polars_io::csv::CsvEncoding;
#[cfg(feature = "csv-file")]
use polars_io::csv_core::utils::infer_file_schema;
Expand Down Expand Up @@ -338,7 +339,7 @@ impl LogicalPlanBuilder {
}

pub fn melt(self, id_vars: Arc<Vec<String>>, value_vars: Arc<Vec<String>>) -> Self {
let schema = det_melt_schema(&value_vars, self.0.schema());
let schema = det_melt_schema(&id_vars, &value_vars, self.0.schema());
LogicalPlan::Melt {
input: Box::new(self.0),
id_vars,
Expand Down Expand Up @@ -439,18 +440,41 @@ impl LogicalPlanBuilder {
}
}

/// Determine the output schema of a `melt` without executing it.
///
/// NOTE(review): this indexes `value_vars[0]`, so it panics when `value_vars`
/// is empty, and the "value" dtype is taken from the *first* value column
/// only — melting columns with mixed dtypes yields a schema that does not
/// account for a supertype.
pub(crate) fn det_melt_schema(value_vars: &[String], input_schema: &Schema) -> SchemaRef {
    // Keep every input column that is not melted into the "value" column.
    let mut fields = input_schema
        .iter_fields()
        .filter(|field| !value_vars.contains(field.name()))
        .collect::<Vec<_>>();

    // Two extra output columns are appended below: "variable" and "value".
    fields.reserve(2);

    // Dtype of the first value column stands in for all value columns.
    let value_dtype = input_schema.get(&value_vars[0]).expect("field not found");

    fields.push(Field::new("variable", DataType::Utf8));
    fields.push(Field::new("value", value_dtype.clone()));

    Arc::new(Schema::from(fields))
}
pub(crate) fn det_melt_schema(
id_vars: &[String],
value_vars: &[String],
input_schema: &Schema,
) -> SchemaRef {
let mut new_schema = Schema::from(
id_vars
.iter()
.map(|id| Field::new(id, input_schema.get(id).unwrap().clone())),
);
new_schema.with_column("variable".to_string(), DataType::Utf8);

// We need to determine the supertype of all value columns.
let mut st = None;

// take all columns that are not in `id_vars` as `value_var`
if value_vars.is_empty() {
let id_vars = PlHashSet::from_iter(id_vars);
for (name, dtype) in input_schema.iter() {
if !id_vars.contains(name) {
match &st {
None => st = Some(dtype.clone()),
Some(st_) => st = Some(get_supertype(st_, dtype).unwrap()),
}
}
}
} else {
for name in value_vars {
let dtype = input_schema.get(name).unwrap();
match &st {
None => st = Some(dtype.clone()),
Some(st_) => st = Some(get_supertype(st_, dtype).unwrap()),
}
}
}
new_schema.with_column("value".to_string(), st.unwrap());
Arc::new(new_schema)
}
16 changes: 14 additions & 2 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3728,10 +3728,17 @@ def pivot(
)

def melt(
self, id_vars: Union[List[str], str], value_vars: Union[List[str], str]
self,
id_vars: Optional[Union[List[str], str]] = None,
value_vars: Optional[Union[List[str], str]] = None,
) -> "DataFrame":
"""
Unpivot DataFrame to long format.
Unpivot a DataFrame from wide to long format, optionally leaving identifiers set.
This function is useful to massage a DataFrame into a format where one or more columns are identifier variables
(id_vars), while all other columns, considered measured variables (value_vars), are “unpivoted” to the row axis,
leaving just two non-identifier columns, ‘variable’ and ‘value’.
Parameters
----------
Expand All @@ -3740,6 +3747,7 @@ def melt(
value_vars
Columns to use as value (measured) variables; these are "unpivoted" to the row axis.
If `value_vars` is empty all columns that are not in `id_vars` will be used.
Examples
--------
Expand Down Expand Up @@ -3776,6 +3784,10 @@ def melt(
value_vars = [value_vars]
if isinstance(id_vars, str):
id_vars = [id_vars]
if value_vars is None:
value_vars = []
if id_vars is None:
id_vars = []
return wrap_df(self._df.melt(id_vars, value_vars))

def shift(self, periods: int) -> "DataFrame":
Expand Down
15 changes: 13 additions & 2 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1767,17 +1767,24 @@ def drop_nulls(self, subset: Optional[Union[List[str], str]] = None) -> "LazyFra
return wrap_ldf(self._ldf.drop_nulls(subset))

def melt(
self, id_vars: Union[str, List[str]], value_vars: Union[str, List[str]]
self,
id_vars: Optional[Union[str, List[str]]] = None,
value_vars: Optional[Union[str, List[str]]] = None,
) -> "LazyFrame":
"""
Unpivot DataFrame to long format.
Unpivot a DataFrame from wide to long format, optionally leaving identifiers set.
This function is useful to massage a DataFrame into a format where one or more columns are identifier variables
(id_vars), while all other columns, considered measured variables (value_vars), are “unpivoted” to the row axis,
leaving just two non-identifier columns, ‘variable’ and ‘value’.
Parameters
----------
id_vars
Columns to use as identifier variables.
value_vars
Columns to use as value (measured) variables; these are "unpivoted" to the row axis.
If `value_vars` is empty all columns that are not in `id_vars` will be used.
Examples
--------
Expand Down Expand Up @@ -1814,6 +1821,10 @@ def melt(
value_vars = [value_vars]
if isinstance(id_vars, str):
id_vars = [id_vars]
if value_vars is None:
value_vars = []
if id_vars is None:
id_vars = []
return wrap_ldf(self._ldf.melt(id_vars, value_vars))

def map(
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ impl PyDataFrame {
PyDataFrame::new(self.df.clone())
}

pub fn melt(&self, id_vars: Vec<&str>, value_vars: Vec<&str>) -> PyResult<Self> {
pub fn melt(&self, id_vars: Vec<String>, value_vars: Vec<String>) -> PyResult<Self> {
let df = self
.df
.melt(id_vars, value_vars)
Expand Down

0 comments on commit 56e1f28

Please sign in to comment.