Skip to content

Commit

Permalink
Serialize/Deserialize LazyFrames/Logical plans (#3244)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 27, 2022
1 parent d7751a8 commit d798cae
Show file tree
Hide file tree
Showing 25 changed files with 234 additions and 7 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ default = [
ndarray = ["polars-core/ndarray"]
# serde support for dataframes and series
serde = ["polars-core/serde"]
serde-lazy = ["polars-core/serde-lazy", "polars-lazy/serde"]
serde-lazy = ["polars-core/serde-lazy", "polars-lazy/serde", "polars-time/serde", "polars-io/serde"]
parquet = ["polars-io", "polars-core/parquet", "polars-lazy/parquet", "polars-io/parquet"]
lazy = ["polars-core/lazy", "polars-lazy", "polars-lazy/compile"]
# commented out until UB is fixed
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ parquet = ["arrow/io_parquet"]
# scale to terrabytes?
bigidx = ["polars-arrow/bigidx"]

serde-lazy = ["serde", "polars-arrow/serde"]
serde-lazy = ["serde", "polars-arrow/serde", "indexmap/serde"]

docs-selection = [
"ndarray",
Expand Down
6 changes: 6 additions & 0 deletions polars/polars-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl PolarsFloatType for Float32Type {}
impl PolarsFloatType for Float64Type {}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AnyValue<'a> {
Null,
/// A binary true or false.
Expand Down Expand Up @@ -235,6 +236,7 @@ pub enum AnyValue<'a> {
/// A 64-bit date representing the elapsed time since UNIX epoch (1970-01-01)
/// in nanoseconds (64 bits).
#[cfg(feature = "dtype-datetime")]
#[cfg_attr(feature = "serde", serde(skip))]
Datetime(i64, TimeUnit, &'a Option<TimeZone>),
// A 64-bit integer representing difference between date-times in [`TimeUnit`]
#[cfg(feature = "dtype-duration")]
Expand All @@ -243,15 +245,19 @@ pub enum AnyValue<'a> {
#[cfg(feature = "dtype-time")]
Time(i64),
#[cfg(feature = "dtype-categorical")]
#[cfg_attr(feature = "serde", serde(skip))]
Categorical(u32, &'a RevMapping),
/// Nested type, contains arrays that are filled with one of the datetypes.
List(Series),
#[cfg(feature = "object")]
/// Can be used to fmt and implements Any, so can be downcasted to the proper value type.
#[cfg_attr(feature = "serde", serde(skip))]
Object(&'a dyn PolarsObjectSafe),
#[cfg(feature = "dtype-struct")]
#[cfg_attr(feature = "serde", serde(skip))]
Struct(Vec<AnyValue<'a>>, &'a [Field]),
#[cfg(feature = "dtype-struct")]
#[cfg_attr(feature = "serde", serde(skip))]
StructOwned(Box<(Vec<AnyValue<'a>>, Vec<Field>)>),
/// A UTF8 encoded string type.
Utf8Owned(String),
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-core/src/frame/asof_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ use crate::prelude::*;
use crate::utils::slice_slice;
use asof::*;
use num::Bounded;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "'de: 'static")))]
pub struct AsOfOptions {
pub strategy: AsofStrategy,
/// A tolerance in the same unit as the asof column
Expand Down Expand Up @@ -38,6 +42,7 @@ fn check_asof_columns(a: &Series, b: &Series) -> Result<()> {
}

#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AsofStrategy {
/// selects the last row in the right DataFrame whose ‘on’ key is less than or equal to the left’s key
Backward,
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/frame/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::utils::get_supertype;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::buffer::Buffer;
use polars_arrow::kernels::concatenate::concatenate_owned_unchecked;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

fn get_exploded(series: &Series) -> Result<(Series, Buffer<i64>)> {
match series.dtype() {
Expand All @@ -17,6 +19,7 @@ fn get_exploded(series: &Series) -> Result<(Series, Buffer<i64>)> {

/// Arguments for `[DataFrame::melt]` function
#[derive(Clone, Default, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct MeltArgs {
pub id_vars: Vec<String>,
pub value_vars: Vec<String>,
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use arrow::Either;
#[cfg(feature = "chunked_ids")]
use std::borrow::Cow;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use single_keys::*;
use single_keys_inner::*;
use single_keys_left::*;
Expand Down Expand Up @@ -113,11 +115,13 @@ pub(crate) fn check_categorical_src(l: &DataType, r: &DataType) -> Result<()> {
}

#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum JoinType {
Left,
Inner,
Outer,
#[cfg(feature = "asof_join")]
#[cfg_attr(feature = "serde", serde(skip))]
AsOf(AsOfOptions),
Cross,
#[cfg(feature = "semi_anti_join")]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum NullStrategy {
}

#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum UniqueKeepStrategy {
First,
Last,
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ use crate::prelude::*;
use indexmap::IndexMap;
use std::fmt::{Debug, Formatter};

#[cfg(feature = "serde-lazy")]
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Eq, Clone, Default)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
#[cfg_attr(
all(feature = "serde-lazy", feature = "object"),
serde(bound(deserialize = "'de: 'static"))
)]
pub struct Schema {
inner: PlIndexMap<String, DataType>,
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ polars-time = { version = "0.20.0", path = "../polars-time", default-features =
polars-utils = { version = "0.20.0", path = "../polars-utils" }
rayon = "1.5"
regex = "1.5"
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
simdutf8 = "0.1"

Expand Down
4 changes: 4 additions & 0 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ use polars_core::prelude::*;
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -144,6 +146,7 @@ where
}

#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CsvEncoding {
/// Utf8 encoding
Utf8,
Expand All @@ -152,6 +155,7 @@ pub enum CsvEncoding {
}

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum NullValues {
/// A single value that's used for all columns
AllColumns(String),
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-io/src/options.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RowCount {
pub name: String,
pub offset: u32,
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/dsl/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ impl AsRef<Expr> for AggExpr {
#[derive(Clone, PartialEq)]
#[must_use]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "'de: 'static")))]
#[cfg_attr(
all(feature = "serde", feature = "object"),
serde(bound(deserialize = "'de: 'static"))
)]
pub enum Expr {
Alias(Box<Expr>, Arc<str>),
Column(Arc<str>),
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::logical_plan::optimizer::{
predicate_pushdown::PredicatePushDown, projection_pushdown::ProjectionPushDown,
};
use crate::physical_plan::state::ExecutionState;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[cfg(any(feature = "parquet", feature = "csv-file"))]
use crate::prelude::aggregate_scan_projections::agg_projection;
Expand All @@ -46,6 +48,7 @@ use polars_core::frame::explode::MeltArgs;
use polars_io::RowCount;

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct JoinOptions {
pub allow_parallel: bool,
pub force_parallel: bool,
Expand Down Expand Up @@ -83,7 +86,7 @@ impl IntoLazy for DataFrame {
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
pub(crate) logical_plan: LogicalPlan,
pub logical_plan: LogicalPlan,
pub(crate) opt_state: OptState,
}

Expand Down
11 changes: 11 additions & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub(crate) use builder::*;
pub use lit::*;
use polars_core::frame::explode::MeltArgs;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

// Will be set/ unset in the fetch operation to communicate overwriting the number of rows to scan.
thread_local! {pub(crate) static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None)}

Expand All @@ -39,6 +42,11 @@ pub enum Context {

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(
all(feature = "serde", feature = "object"),
serde(bound(deserialize = "'de: 'static"))
)]
pub enum LogicalPlan {
/// Filter on a boolean mask
Selection {
Expand Down Expand Up @@ -104,6 +112,7 @@ pub enum LogicalPlan {
keys: Arc<Vec<Expr>>,
aggs: Vec<Expr>,
schema: SchemaRef,
#[cfg_attr(feature = "serde", serde(skip))]
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
options: GroupbyOptions,
Expand Down Expand Up @@ -153,6 +162,7 @@ pub enum LogicalPlan {
schema: SchemaRef,
},
/// A User Defined Function
#[cfg_attr(feature = "serde", serde(skip))]
Udf {
input: Box<LogicalPlan>,
function: Arc<dyn DataFrameUdf>,
Expand All @@ -164,6 +174,7 @@ pub enum LogicalPlan {
options: UnionOptions,
},
/// Catches errors and throws them later
#[cfg_attr(feature = "serde", serde(skip))]
Error {
input: Box<LogicalPlan>,
err: Arc<Mutex<Option<PolarsError>>>,
Expand Down
7 changes: 7 additions & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use polars_io::RowCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvParserOptions {
pub(crate) delimiter: u8,
pub(crate) comment_char: Option<u8>,
Expand All @@ -25,6 +26,7 @@ pub struct CsvParserOptions {
}
#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetOptions {
pub(crate) n_rows: Option<usize>,
pub(crate) with_columns: Option<Vec<String>>,
Expand All @@ -34,6 +36,7 @@ pub struct ParquetOptions {
}

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions {
pub n_rows: Option<usize>,
pub with_columns: Option<Vec<String>>,
Expand All @@ -42,20 +45,23 @@ pub struct IpcScanOptions {
}

#[derive(Clone, Debug, Copy, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnionOptions {
pub(crate) slice: bool,
pub(crate) slice_offset: i64,
pub(crate) slice_len: IdxSize,
}

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GroupbyOptions {
pub(crate) dynamic: Option<DynamicGroupOptions>,
pub(crate) rolling: Option<RollingGroupOptions>,
pub(crate) slice: Option<(i64, usize)>,
}

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DistinctOptions {
pub(crate) subset: Option<Arc<Vec<String>>>,
pub(crate) maintain_order: bool,
Expand Down Expand Up @@ -133,6 +139,7 @@ pub struct LogicalPlanUdfOptions {
}

#[derive(Clone, PartialEq, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SortArguments {
pub(crate) reverse: Vec<bool>,
// Can only be true in case of a single column.
Expand Down
1 change: 1 addition & 0 deletions polars/polars-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ chrono = "0.4"
lexical = { version = "6", default-features = false, features = ["std", "parse-floats", "parse-integers"] }
polars-arrow = { version = "0.20.0", path = "../polars-arrow", features = ["compute"] }
polars-core = { version = "0.20.0", path = "../polars-core", features = ["temporal", "dtype-date", "dtype-datetime"] }
serde = { version = "1", features = ["derive"], optional = true }

[features]
dtype-date = []
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-time/src/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ use polars_core::frame::groupby::GroupsProxy;
use polars_core::prelude::*;
use polars_core::POOL;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[repr(transparent)]
struct Wrap<T>(pub T);

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DynamicGroupOptions {
/// Time or index column
pub index_column: String,
Expand All @@ -26,6 +30,7 @@ pub struct DynamicGroupOptions {
}

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RollingGroupOptions {
/// Time or index column
pub index_column: String,
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-time/src/windows/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use polars_core::prelude::{
use polars_core::utils::arrow::temporal_conversions::NANOSECONDS;
use std::ops::Mul;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Duration {
// the number of months for the duration
months: i64,
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-time/src/windows/groupby.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::prelude::*;
use polars_arrow::utils::CustomIterTools;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ClosedWindow {
Left,
Right,
Expand Down

0 comments on commit d798cae

Please sign in to comment.