Skip to content

Commit

Permalink
unnest operation (#2848)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 7, 2022
1 parent 5a5c993 commit 3e1071d
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 2 deletions.
33 changes: 33 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2859,6 +2859,39 @@ impl DataFrame {
.map(|s| Ok(s.dtype().clone()))
.reduce(|acc, b| get_supertype(&acc?, &b.unwrap()))
}

/// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
/// inserted as columns.
#[cfg(feature = "dtype-struct")]
#[cfg_attr(docsrs, doc(cfg(feature = "dtype-struct")))]
pub fn unnest<I: IntoVec<String>>(&self, cols: I) -> Result<DataFrame> {
let cols = cols.into_vec();
self.unnest_impl(cols.into_iter().collect())
}

#[cfg(feature = "dtype-struct")]
fn unnest_impl(&self, cols: PlHashSet<String>) -> Result<DataFrame> {
let mut new_cols = Vec::with_capacity(std::cmp::min(self.width() * 2, self.width() + 128));
let mut count = 0;
for s in &self.columns {
if cols.contains(s.name()) {
let ca = s.struct_()?;
new_cols.extend_from_slice(ca.fields());
count += 1;
} else {
new_cols.push(s.clone())
}
}
if count != cols.len() {
// one or more columns not found
// the code below will return an error with the missing name
let schema = self.schema();
for col in cols {
let _ = schema.get(&col).ok_or(PolarsError::NotFound(col))?;
}
}
DataFrame::new(new_cols)
}
}

pub struct RecordBatchIter<'a> {
Expand Down
10 changes: 9 additions & 1 deletion polars/polars-lazy/src/dsl/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ impl StructNameSpace {
.collect();
DataType::Struct(fields)
}
_ => dt.clone(),
// The types will be incorrect, but its better than nothing
// we can get an incorrect type with python lambdas, because we only know return type when running
// the query
dt => DataType::Struct(
names2
.iter()
.map(|name| Field::new(name, dt.clone()))
.collect(),
),
}),
)
.with_fmt("struct.rename_fields")
Expand Down
36 changes: 36 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,42 @@ impl LazyFrame {
}
}
}

/// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
/// inserted as columns.
#[cfg(feature = "dtype-struct")]
#[cfg_attr(docsrs, doc(cfg(feature = "dtype-struct")))]
pub fn unnest<I: IntoVec<String>>(self, cols: I) -> Self {
let cols = cols.into_vec();
self.unnest_impl(cols.into_iter().collect())
}

#[cfg(feature = "dtype-struct")]
fn unnest_impl(self, cols: PlHashSet<String>) -> Self {
let schema = self.schema();

let mut new_schema = Schema::with_capacity(schema.len() * 2);
for (name, dtype) in schema.iter() {
if cols.contains(name) {
if let DataType::Struct(flds) = dtype {
for fld in flds {
new_schema.with_column(fld.name().clone(), fld.data_type().clone())
}
} else {
// todo: return lazy error here.
panic!("expected struct dtype")
}
} else {
new_schema.with_column(name.clone(), dtype.clone())
}
}
self.map(
move |df| df.unnest(&cols),
Some(AllowedOptimizations::default()),
Some(new_schema),
Some("unnest"),
)
}
}

/// Utility struct for lazy groupby operation.
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ impl JoinExec {

impl Executor for JoinExec {
fn execute<'a>(&'a mut self, state: &'a ExecutionState) -> Result<DataFrame> {
if state.verbose {
eprintln!("join parallel: {}", self.parallel);
};
let mut input_left = self.input_left.take().unwrap();
let mut input_right = self.input_right.take().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion py-polars/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ test: venv
$(PYTHON) -m pytest tests

test-with-cov: venv
@cd tests && ../$(PYTHON) -m pytest --cov=polars --cov-fail-under=91 --import-mode=importlib
@cd tests && ../$(PYTHON) -m pytest --cov=polars --cov-fail-under=90 --import-mode=importlib

doctest:
cd tests && ../$(PYTHON) run_doc_examples.py
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Manipulation/ selection
DataFrame.interpolate
DataFrame.transpose
DataFrame.upsample
DataFrame.unnest

Apply
-----
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/lazyframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Manipulation/ selection
LazyFrame.sort
LazyFrame.melt
LazyFrame.interpolate
LazyFrame.unnest

Apply
-----
Expand Down
22 changes: 22 additions & 0 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4746,8 +4746,30 @@ def is_empty(self) -> bool:
return self.height == 0

def to_struct(self, name: str) -> "pli.Series":
"""
Convert a ``DataFrame`` to a ``Series`` of type ``Struct``
Parameters
----------
name
Name for the struct Series
"""
return pli.wrap_s(self._df.to_struct(name))

def unnest(self, names: Union[str, List[str]]) -> "DataFrame":
"""
Decompose a struct into its fields. The fields will be inserted in to the `DataFrame` on the
location of the `struct` type.
Parameters
----------
names
Names of the struct columns that will be decomposed by its fields
"""
if isinstance(names, str):
names = [names]
return wrap_df(self._df.unnest(names))


class RollingGroupBy:
"""
Expand Down
14 changes: 14 additions & 0 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,20 @@ def interpolate(self) -> "LazyFrame":
"""
return self.select(pli.col("*").interpolate())

def unnest(self, names: Union[str, List[str]]) -> "LazyFrame":
"""
Decompose a struct into its fields. The fields will be inserted in to the `DataFrame` on the
location of the `struct` type.
Parameters
----------
names
Names of the struct columns that will be decomposed by its fields
"""
if isinstance(names, str):
names = [names]
return wrap_ldf(self._ldf.unnest(names))


class LazyGroupBy:
"""
Expand Down
5 changes: 5 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,11 @@ impl PyDataFrame {
let s = self.df.clone().into_struct(name);
s.into_series().into()
}

pub fn unnest(&self, names: Vec<String>) -> PyResult<Self> {
let df = self.df.unnest(names).map_err(PyPolarsErr::from)?;
Ok(df.into())
}
}

fn finish_groupby(gb: GroupBy, agg: &str) -> PyResult<PyDataFrame> {
Expand Down
4 changes: 4 additions & 0 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,4 +650,8 @@ impl PyLazyFrame {
pub fn columns(&self) -> Vec<String> {
self.ldf.schema().iter_names().cloned().collect()
}

pub fn unnest(&self, cols: Vec<String>) -> PyLazyFrame {
self.ldf.clone().unnest(cols).into()
}
}
40 changes: 40 additions & 0 deletions py-polars/tests/test_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,43 @@ def test_rename_fields() -> None:
"a",
"b",
]


def struct_unnesting() -> None:
df = pl.DataFrame({"a": [1, 2]})
out = df.select(
[
pl.all().alias("a_original"),
pl.col("a")
.apply(lambda x: (x, x * 2, x % 2 == 0))
.struct.rename_fields(["a", "a_squared", "mod2eq0"])
.alias("foo"),
]
).unnest("foo")

expected = pl.DataFrame(
{
"a_original": [1, 2],
"a": [1, 2],
"a_squared": [2, 4],
"mod2eq0": [False, True],
}
)

assert out.frame_equal(expected)

out = (
df.lazy()
.select(
[
pl.all().alias("a_original"),
pl.col("a")
.apply(lambda x: (x, x * 2, x % 2 == 0))
.struct.rename_fields(["a", "a_squared", "mod2eq0"])
.alias("foo"),
]
)
.unnest("foo")
.collect()
)
out.frame_equal(expected)

0 comments on commit 3e1071d

Please sign in to comment.