In [None]:
import random
import resource
import sys
import time
from datetime import datetime, timedelta
from random import randint

import pandas as pd
import polars as pl
import polars.selectors as cs

### Split a string column of a Polars DataFrame into several columns based on a separator

The goal of this section is to create several columns from a single column that contains multiple substrings joined by a separator, for example "cat/dog/parrot" split on "/".

This section expands on the Stack Overflow answer: <https://stackoverflow.com/questions/73699500/python-polars-split-string-column-into-many-columns-by-delimiter>

In [None]:
# Create a small example dataframe. The column to split contains a single value,
# several "/"-separated values, a null and an empty string.
df = pl.DataFrame(
    {
        "my_str": ["cat", "cat/dog", None, "", "cat/dog/aardvark/mouse/frog"],
        "another_column": [1, 2, 3, 4, 5],
    }
)

# Separator used to split the string column. The variable name keeps its
# original spelling because the next cell refers to it.
string_sepator = "/"
column_to_split = "my_str"
# Additional columns (other than the string column) to carry into the result.
other_cols_to_keep = ["another_column"]
# Prefix of the generated columns: with prefix "name" the new columns are
# "name_00", "name_01", ... (underscore + two-digit zero-padded position).
new_str_column_prefix = column_to_split
df

In [None]:
# Split `column_to_split` on `string_sepator` and spread the pieces over numbered
# columns "<prefix>_00", "<prefix>_01", ... (one column per position).
# Positions a row does not have are filled with "".
splitted_str_df = (
    df
    # Row id used to pivot the pieces of each original row back together.
    .with_row_count("id")
    .with_columns(pl.col(column_to_split).str.split(string_sepator).alias("split_str"))
    # One row per piece.
    .explode("split_str")
    .with_columns(
        # Position of the piece inside its original row, zero-padded so the
        # generated column names sort lexicographically.
        (new_str_column_prefix + "_" + pl.int_range(0, pl.count()).cast(pl.Utf8).str.zfill(2))
        .over("id")
        .alias("col_nm")
    )
    .pivot(
        # Use the configured column name rather than a hard-coded "my_str", so
        # changing `column_to_split` keeps this cell working.
        index=["id", column_to_split] + other_cols_to_keep,
        values="split_str",
        columns="col_nm",
    )
    # cs.starts_with matches the prefix literally, so a prefix containing regex
    # metacharacters cannot break the column selection.
    .with_columns(cs.starts_with(f"{new_str_column_prefix}_").fill_null(""))
)

splitted_str_df

## Apply an expression to all columns

In [None]:
# pl.all() expands to every column, so a single expression casts each column
# to string and appends "_test" to every value.
df.select(pl.all().cast(pl.Utf8) + "_test")

## `with_columns` with a list comprehension

In [None]:
# Add a "<name>_test" copy of every column: each value is cast to string and
# suffixed with "_test". The expressions are passed as positional arguments.
splitted_str_df.with_columns(
    *(
        (pl.col(name).cast(str) + "_test").alias(name + "_test")
        for name in splitted_str_df.columns
    )
)

## Polars selectors

In [None]:
df = pl.DataFrame(
    {
        "w": ["xx", "yy", "xx", "yy", "xx"],
        "x": [1, 2, 1, 4, -2],
        "y": [3.0, 4.5, 1.0, 2.5, -2.0],
        "z": ["a", "b", "a", "b", "b"],
    },
)
# Selectors pick columns by dtype: group by every string column and sum every
# numeric column. The grouping selector is passed positionally (works across
# polars versions), and maintain_order=True keeps groups in order of first
# appearance so the displayed output is reproducible.
df.group_by(cs.string(), maintain_order=True).agg(cs.numeric().sum())


## Polars fold

In [None]:
# DataFrame.fold reduces the columns horizontally with a binary function;
# adding them yields the row-wise sum of a, b and c as a single Series.
df = pl.DataFrame({"a": [2, 1, 3], "b": [1, 2, 3], "c": [1.0, 2.0, 3.0]})

df.fold(lambda left, right: left + right)

## Transform a column using a mapping dict

In [None]:
# Lookup table from country code to country name.
country_code_dict = {
    "CA": "Canada",
    "DE": "Germany",
    "FR": "France",
    None: "Not specified",
}

df = (
    pl.DataFrame({"country_code": ["FR", None, "ES", "DE"]})
    .with_row_count()
    .with_columns(
        # No `default` is passed, so codes absent from the dict (e.g. "ES")
        # become null; pass default=... to choose another fallback.
        # NOTE(review): confirm that the `None` key really matches null codes,
        # and note that `map_dict` is deprecated in later polars releases in
        # favour of `replace`.
        pl.col("country_code").map_dict(country_code_dict).alias("remapped")
    )
)

df

## Test a condition on multiple columns, keep only rows that satisfy the condition

In [None]:
# Allowed values per column: a row satisfies the condition only when every
# listed column holds one of its allowed values.
col_dict = {"country_code": ["FR", "DE"], "remapped": ["France", "test"]}

# Single boolean column "keep_row": AND of the per-column is_in checks.
my_test = df.select(
    pl.all_horizontal(
        [pl.col(col).is_in(values) for col, values in col_dict.items()]
    ).alias("keep_row")
)

# Original frame plus the flag column.
df_flagged = pl.concat([df, my_test], how="horizontal")

# Keep only the rows that satisfy every condition; rows whose flag is null
# (e.g. from a null input value) are dropped by filter as well.
df_flagged.filter(pl.col("keep_row"))

## Handle a non-relational (long-format) dataframe

Given a non-relational dataframe made of several relational dataframes concatenated vertically (long format): the name of each original column is stored in the `columns_id_column` column, and the corresponding value is stored in the `value_column` column. The steps below turn it back into a wide, relational dataframe:

1) Create a column, initialized to null, for every unique value in `columns_id_column`
2) In every row, fill the column that corresponds to that row's measurement
3) Remove the columns that do not reach a minimum number of values
4) Do a merge-as-of join (`join_asof`) between the features dataset and the labels

In [None]:
# Fix the random seed so the synthetic measurements and labels are identical on
# every Restart & Run All (`random` and `timedelta` are imported in the top cell).
SEED = 42
random.seed(SEED)

N_MEASUREMENTS = 3000

# Long-format measurements: one row per timestamp, cycling through fields A, B, C.
data = {
    "_time": [datetime(2023, 1, 1) + timedelta(days=i) for i in range(N_MEASUREMENTS)],
    "_field": ["A", "B", "C"] * (N_MEASUREMENTS // 3),
    "_value": [randint(1, 100) for _ in range(N_MEASUREMENTS)],
}

df = pl.DataFrame(
    data,
    schema={
        "_time": datetime,
        "_field": str,
        "_value": int,
    },
)

# Label end times are anchored on the first measurement time.
common_time = df["_time"][0]

# 100 label end times before the first measurement and 100 after it.
end_time_values = (
    [common_time - timedelta(days=i) for i in range(1, 101)]
    + [common_time + timedelta(days=i) for i in range(1, 101)]
)

additional_data = {
    "end_time": end_time_values,
    # one random binary label per end time
    "label": [randint(0, 1) for _ in end_time_values],
}

# Label dataframe: the right-hand side of the join_asof further below.
df_box_lotti = pl.DataFrame(additional_data)

In [None]:
# perf_counter is the monotonic clock intended for measuring durations.
old_time = time.perf_counter()

time_column = "_time"
columns_id_column = "_field"
value_column = "_value"
# time column name for the right dataframe in join_asof
right_time_column = "end_time"
# One wide column per distinct field name. maintain_order=True makes the
# column order deterministic (plain unique() does not guarantee an order).
var_fisiche = df[columns_id_column].unique(maintain_order=True).to_list()
# Minimum number of non-null values a field must have to be kept in the
# final dataframe.
min_num_var_fisiche = 20

# For every field, create a column that holds the row's value when the row's
# field equals that column's name, and null otherwise (when/then without
# otherwise yields null). Each row therefore has exactly one non-null field column.
# NOTE(review): rows that share a timestamp are not merged in this cell; if
# several fields can be measured at the same time, a group_by on time_column
# is still required — TODO confirm against the real data.
df_relazionale = df.with_columns(
    [
        pl.when(pl.col(columns_id_column) == var)
        .then(pl.col(value_column))
        .alias(var)
        for var in var_fisiche
    ]
)

# Report elapsed time and peak memory. ru_maxrss is reported in kilobytes on
# Linux but in bytes on macOS, so normalise to bytes before converting to GB.
usage = resource.getrusage(resource.RUSAGE_SELF)
max_rss_bytes = usage.ru_maxrss if sys.platform == "darwin" else usage.ru_maxrss * 1024
time_record = f"Time create row data for var fisiche: {round(time.perf_counter() - old_time, 2)}, memory usage: {round(max_rss_bytes / 1024**3, 2)}GB"
print(time_record)
old_time = time.perf_counter()

# Number of non-null values in every field column.
numerosita_var_fisiche = (
    df_relazionale.select(var_fisiche)
    .select(pl.all().is_not_null().sum())
    .to_dicts()[0]
)
# Keep fields that reach the minimum; >= matches the "minimum number of
# appearances" definition (a field with exactly the minimum is kept).
var_fisiche_to_select = [
    key
    for key, count in numerosita_var_fisiche.items()
    if count >= min_num_var_fisiche
]

df_relazionale_ridotto = df_relazionale.select(
    [time_column] + var_fisiche_to_select
)

In [None]:
# Earliest measurement time. Labels whose end_time precedes it can never be
# matched by the forward join_asof in the next cell.
min_var_fisiche_time = df_relazionale_ridotto[time_column].min()

# join_asof requires both frames to be sorted on their join keys. sort() should
# already mark the key as sorted; set_sorted() makes the sortedness flag explicit.
df_box_lotti = df_box_lotti.sort(right_time_column).set_sorted(right_time_column)
df_relazionale_ridotto = df_relazionale_ridotto.sort(time_column).set_sorted(time_column)




In [None]:
# Compute the final dataset with an as-of join: every row of the left
# (measurement) frame gets the first row of the right (label) frame whose
# right_time_column is >= its time_column (strategy="forward").
# Labels ending before the first measurement can never match a forward join,
# so they are dropped up front. Using >= keeps a label that ends exactly at
# the first measurement time.
complete_df = df_relazionale_ridotto.join_asof(
    df_box_lotti.filter(pl.col(right_time_column) >= min_var_fisiche_time),
    left_on=time_column,
    right_on=right_time_column,
    strategy="forward",
)


### Group by with a list comprehension

In [None]:
# Aggregate to one row per (LOT_NUMBER, BOX_NUMBER): keep the first value of the
# per-box descriptive columns and the mean of every selected physical variable.
# NOTE(review): LOT_NUMBER, BOX_NUMBER, CONFORMITY, NC_REASON and NC_DESCRIPTION
# belong to the real dataset; the synthetic data built above does not contain
# them, so this cell fails on the synthetic example — TODO confirm the input.
group_keys = ["LOT_NUMBER", "BOX_NUMBER"]
complete_df = complete_df.group_by(group_keys, maintain_order=True).agg(
    [
        pl.col("CONFORMITY").first(),
        pl.col("NC_REASON").first(),
        pl.col("NC_DESCRIPTION").first(),
    ]
    + [
        # mean() already returns null for an all-null group, so no
        # when/then guard on the non-null count is needed.
        pl.col(col).mean().cast(float).alias(f"mean_{col}")
        for col in var_fisiche_to_select
        # Skip the grouping keys; the legacy "BOX_number" exclusion is kept too.
        if col not in group_keys and col != "BOX_number"
    ]
)