Skip to content

Commit

Permalink
feat: struct -> json encoding expression (#12583)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 20, 2023
1 parent ddf459c commit f9ec2bc
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -120,7 +120,7 @@ impl StructChunked {
}

#[inline]
pub(crate) fn chunks(&self) -> &Vec<ArrayRef> {
pub fn chunks(&self) -> &Vec<ArrayRef> {
&self.chunks
}

Expand Down
1 change: 1 addition & 0 deletions crates/polars-json/src/json/write/mod.rs
Expand Up @@ -12,6 +12,7 @@ pub use fallible_streaming_iterator::*;
use polars_error::{PolarsError, PolarsResult};
pub(crate) use serialize::new_serializer;
use serialize::serialize;
pub use utf8::serialize_to_utf8;

/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid JSON
/// # Implementation
Expand Down
16 changes: 3 additions & 13 deletions crates/polars-json/src/json/write/serialize.rs
Expand Up @@ -8,10 +8,9 @@ use arrow::offset::Offset;
#[cfg(feature = "chrono-tz")]
use arrow::temporal_conversions::parse_offset_tz;
use arrow::temporal_conversions::{
date32_to_date, date64_to_date, duration_ms_to_duration, duration_ns_to_duration,
duration_s_to_duration, duration_us_to_duration, parse_offset, timestamp_ms_to_datetime,
timestamp_ns_to_datetime, timestamp_s_to_datetime, timestamp_to_datetime,
timestamp_us_to_datetime,
date32_to_date, duration_ms_to_duration, duration_ns_to_duration, duration_s_to_duration,
duration_us_to_duration, parse_offset, timestamp_ms_to_datetime, timestamp_ns_to_datetime,
timestamp_s_to_datetime, timestamp_to_datetime, timestamp_us_to_datetime,
};
use arrow::types::NativeType;
use chrono::{Duration, NaiveDate, NaiveDateTime};
Expand Down Expand Up @@ -419,9 +418,6 @@ pub(crate) fn new_serializer<'a>(
ArrowDataType::FixedSizeList(_, _) => {
fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::List(_) => {
list_serializer::<i32>(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::LargeList(_) => {
list_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
},
Expand All @@ -443,12 +439,6 @@ pub(crate) fn new_serializer<'a>(
offset,
take,
),
ArrowDataType::Date64 => date_serializer(
array.as_any().downcast_ref().unwrap(),
date64_to_date,
offset,
take,
),
ArrowDataType::Timestamp(tu, None) => {
let convert = match tu {
TimeUnit::Nanosecond => timestamp_ns_to_datetime,
Expand Down
14 changes: 14 additions & 0 deletions crates/polars-json/src/json/write/utf8.rs
@@ -1,6 +1,10 @@
// Adapted from https://github.com/serde-rs/json/blob/f901012df66811354cb1d490ad59480d8fdf77b5/src/ser.rs
use std::io;

use arrow::array::{Array, MutableUtf8ValuesArray, Utf8Array};

use crate::json::write::new_serializer;

pub fn write_str<W>(writer: &mut W, value: &str) -> io::Result<()>
where
W: io::Write,
Expand Down Expand Up @@ -136,3 +140,13 @@ where

writer.write_all(s)
}

pub fn serialize_to_utf8(array: &dyn Array) -> Utf8Array<i64> {
let mut values = MutableUtf8ValuesArray::<i64>::with_capacity(array.len());
let mut serializer = new_serializer(array, 0, usize::MAX);

while let Some(v) = serializer.next() {
unsafe { values.push(std::str::from_utf8_unchecked(v)) }
}
values.into()
}
3 changes: 2 additions & 1 deletion crates/polars-plan/Cargo.toml
Expand Up @@ -16,6 +16,7 @@ libloading = { version = "0.8.0", optional = true }
polars-core = { workspace = true, features = ["lazy", "zip_with", "random"] }
polars-ffi = { workspace = true, optional = true }
polars-io = { workspace = true, features = ["lazy"] }
polars-json = { workspace = true, optional = true }
polars-ops = { workspace = true, features = ["zip_with"] }
polars-parquet = { workspace = true, optional = true }
polars-time = { workspace = true, optional = true }
Expand Down Expand Up @@ -56,7 +57,7 @@ parquet = ["polars-io/parquet", "polars-parquet"]
async = ["polars-io/async"]
cloud = ["async", "polars-io/cloud"]
ipc = ["polars-io/ipc"]
json = ["polars-io/json"]
json = ["polars-io/json", "polars-json"]
csv = ["polars-io/csv"]
temporal = ["polars-core/temporal", "dtype-date", "dtype-datetime", "dtype-time"]
# debugging purposes
Expand Down
20 changes: 20 additions & 0 deletions crates/polars-plan/src/dsl/function_expr/struct_.rs
Expand Up @@ -9,6 +9,8 @@ pub enum StructFunction {
FieldByIndex(i64),
FieldByName(Arc<str>),
RenameFields(Arc<Vec<String>>),
#[cfg(feature = "json")]
JsonEncode,
}

impl StructFunction {
Expand Down Expand Up @@ -58,6 +60,8 @@ impl StructFunction {
.collect(),
),
}),
#[cfg(feature = "json")]
JsonEncode => mapper.with_dtype(DataType::Utf8),
}
}
}
Expand All @@ -69,6 +73,8 @@ impl Display for StructFunction {
FieldByIndex(index) => write!(f, "struct.field_by_index({index})"),
FieldByName(name) => write!(f, "struct.field_by_name({name})"),
RenameFields(names) => write!(f, "struct.rename_fields({:?})", names),
#[cfg(feature = "json")]
JsonEncode => write!(f, "struct.to_json"),
}
}
}
Expand All @@ -80,6 +86,8 @@ impl From<StructFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
FieldByIndex(index) => map!(struct_::get_by_index, index),
FieldByName(name) => map!(struct_::get_by_name, name.clone()),
RenameFields(names) => map!(struct_::rename_fields, names.clone()),
#[cfg(feature = "json")]
JsonEncode => map!(struct_::to_json),
}
}
}
Expand Down Expand Up @@ -111,3 +119,15 @@ pub(super) fn rename_fields(s: &Series, names: Arc<Vec<String>>) -> PolarsResult
.collect::<Vec<_>>();
StructChunked::new(ca.name(), &fields).map(|ca| ca.into_series())
}

#[cfg(feature = "json")]
pub(super) fn to_json(s: &Series) -> PolarsResult<Series> {
let ca = s.struct_()?;

let iter = ca
.chunks()
.iter()
.map(|arr| polars_json::json::write::serialize_to_utf8(arr.as_ref()));

Ok(Utf8Chunked::from_chunk_iter(ca.name(), iter).into_series())
}
6 changes: 6 additions & 0 deletions crates/polars-plan/src/dsl/struct_.rs
Expand Up @@ -35,4 +35,10 @@ impl StructNameSpace {
Arc::from(names),
)))
}

#[cfg(feature = "json")]
pub fn json_encode(self) -> Expr {
self.0
.map_private(FunctionExpr::StructExpr(StructFunction::JsonEncode))
}
}
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/expressions/struct.rst
Expand Up @@ -10,4 +10,5 @@ The following methods are available under the `expr.struct` attribute.
:template: autosummary/accessor_method.rst

Expr.struct.field
Expr.struct.json_encode
Expr.struct.rename_fields
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/series/struct.rst
Expand Up @@ -18,4 +18,5 @@ The following methods are available under the `Series.struct` attribute.
:template: autosummary/accessor_attribute.rst

Series.struct.fields
Series.struct.json_encode
Series.struct.schema
22 changes: 22 additions & 0 deletions py-polars/polars/expr/struct.py
Expand Up @@ -150,3 +150,25 @@ def rename_fields(self, names: Sequence[str]) -> Expr:
"""
return wrap_expr(self._pyexpr.struct_rename_fields(names))

def json_encode(self) -> Expr:
"""
Convert this struct to a string column with json values.
Examples
--------
>>> pl.DataFrame(
... {"a": [{"a": [1, 2], "b": [45]}, {"a": [9, 1, 3], "b": None}]}
... ).with_columns(pl.col("a").struct.json_encode().alias("encoded"))
shape: (2, 2)
┌──────────────────┬────────────────────────┐
│ a ┆ encoded │
│ --- ┆ --- │
│ struct[2] ┆ str │
╞══════════════════╪════════════════════════╡
│ {[1, 2],[45]} ┆ {"a":[1,2],"b":[45]} │
│ {[9, 1, 3],null} ┆ {"a":[9,1,3],"b":null} │
└──────────────────┴────────────────────────┘
"""
return wrap_expr(self._pyexpr.struct_json_encode())
17 changes: 17 additions & 0 deletions py-polars/polars/series/struct.py
Expand Up @@ -92,3 +92,20 @@ def unnest(self) -> DataFrame:
"""
return wrap_df(self._s.struct_unnest())

def json_encode(self) -> Series:
"""
Convert this struct to a string column with json values.
Examples
--------
>>> s = pl.Series("a", [{"a": [1, 2], "b": [45]}, {"a": [9, 1, 3], "b": None}])
>>> s.struct.json_encode()
shape: (2,)
Series: 'a' [str]
[
"{"a":[1,2],"b"…
"{"a":[9,1,3],"…
]
"""
4 changes: 4 additions & 0 deletions py-polars/src/expr/struct.rs
Expand Up @@ -15,4 +15,8 @@ impl PyExpr {
fn struct_rename_fields(&self, names: Vec<String>) -> Self {
self.inner.clone().struct_().rename_fields(names).into()
}

fn struct_json_encode(&self) -> Self {
self.inner.clone().struct_().json_encode().into()
}
}
11 changes: 11 additions & 0 deletions py-polars/tests/unit/namespaces/test_struct.py
Expand Up @@ -28,3 +28,14 @@ def test_rename_fields() -> None:
"a",
"b",
]


def test_struct_json_encode() -> None:
assert pl.DataFrame(
{"a": [{"a": [1, 2], "b": [45]}, {"a": [9, 1, 3], "b": None}]}
).with_columns(pl.col("a").struct.json_encode().alias("encoded")).to_dict(
as_series=False
) == {
"a": [{"a": [1, 2], "b": [45]}, {"a": [9, 1, 3], "b": None}],
"encoded": ['{"a":[1,2],"b":[45]}', '{"a":[9,1,3],"b":null}'],
}

0 comments on commit f9ec2bc

Please sign in to comment.