In [1]:
import pandas as pd
import json
import polars as pl
import pathlib
from typing import Type
import datetime

# Character encoding name (presumably for the text data files -- confirm against its users).
ENCODING = 'utf-8'

# All locations are resolved from the parent of the current working directory,
# so they depend on where the kernel was started.
_DATA_DIR: pathlib.Path = pathlib.Path().absolute().parent / "data"
_COMPETITION_DIR: pathlib.Path = _DATA_DIR / "predict-energy-behavior-of-prosumers"

# Source CSV holding the client records.
INPUT_PATH: pathlib.Path = _COMPETITION_DIR / "client.csv"
# JSON file mapping county ids to county names.
COUNTY_MAP_PATH: pathlib.Path = _COMPETITION_DIR / "county_id_to_name_map.json"
# Root directory under which the per-day client parquet files are written.
BASE_PATH: pathlib.Path = _DATA_DIR / "raw" / "clients"

# Human-readable label for each numeric product type code.
product_type_map: dict[int, str] = {
    0: "Combined",
    1: "Fixed",
    2: "General service",
    3: "Spot",
}


In [2]:
def preprocess_product_type(data: pl.DataFrame) -> pl.DataFrame:
    """Replace the numeric product type codes with their labels.

    Each value of the ``product_type`` column is looked up in the module-level
    ``product_type_map``. Values that have no entry in the map are replaced by
    the default label ``"General service"``.

    Args:
        data (pl.DataFrame): Frame containing a ``product_type`` column.

    Returns:
        pl.DataFrame: A new frame in which ``product_type`` holds the mapped labels.
    """
    product_type_label = pl.col("product_type").replace(product_type_map, default="General service")
    return data.with_columns(product_type_label)


def preprocess_county(data: pl.DataFrame) -> pl.DataFrame:
    """Replace the county ids in the ``county`` column with county names.

    The id-to-name mapping is read from the JSON file at ``COUNTY_MAP_PATH`` on
    every call. Ids that have no entry in the mapping are replaced by
    ``"Unknown"``.

    Args:
        data (pl.DataFrame): Frame containing a ``county`` column of county ids.

    Returns:
        pl.DataFrame: A new frame in which ``county`` holds the mapped names.

    Raises:
        OSError: If the mapping file cannot be opened.
        ValueError: If the file is not valid JSON or a key is not an integer id.
    """
    # Open with an explicit encoding so the decoded names do not depend on the
    # platform's locale default.
    with COUNTY_MAP_PATH.open("r", encoding=ENCODING) as f:
        raw_county_map: dict[str, str] = json.load(f)

    # JSON object keys are always strings. Convert them to int so the mapping
    # really is dict[int, str] and matches an integer ``county`` column.
    # Assumes every key in the file is an integer id -- TODO confirm.
    county_id_to_name_map: dict[int, str] = {
        int(county_id): county_name for county_id, county_name in raw_county_map.items()
    }

    return data.with_columns([pl.col("county").replace(county_id_to_name_map, default="Unknown")])


def preprocess_date(data: pl.DataFrame) -> pl.DataFrame:
    """Parse the string ``date`` column into dates.

    The values of ``date`` are parsed with the ``%Y-%m-%d`` format
    (for example ``"2022-01-31"``).

    Args:
        data (pl.DataFrame): Frame containing a string ``date`` column.

    Returns:
        pl.DataFrame: A new frame in which ``date`` holds the parsed dates.
    """
    parsed_date = pl.col("date").str.to_date("%Y-%m-%d")
    return data.with_columns(parsed_date)


def preprocess_is_business(data: pl.DataFrame) -> pl.DataFrame:
    """Turn the ``is_business`` flag column into a boolean column.

    The column is cast to ``pl.Boolean``, so a flag of 0 becomes ``False`` and
    a flag of 1 becomes ``True``.

    Args:
        data (pl.DataFrame): Frame containing an ``is_business`` column.

    Returns:
        pl.DataFrame: A new frame in which ``is_business`` is boolean.
    """
    is_business_flag = pl.col("is_business").cast(pl.Boolean)
    return data.with_columns(is_business_flag)


def preprocess_clients_columns(data: pl.DataFrame) -> pl.DataFrame:
    """Apply the column-level preprocessing steps to a clients frame.

    The steps run in this order:

    1. ``preprocess_product_type``
    2. ``preprocess_county``
    3. ``preprocess_is_business``

    The ``date`` column is not touched here; this function never calls
    ``preprocess_date``.

    Args:
        data (pl.DataFrame): Clients frame containing the ``product_type``,
            ``county`` and ``is_business`` columns.

    Returns:
        pl.DataFrame: The preprocessed frame.
    """
    with_product_type = preprocess_product_type(data)
    with_county = preprocess_county(with_product_type)
    return preprocess_is_business(with_county)


def set_column_types(data: pl.DataFrame) -> pl.DataFrame:
    """Downcast the 64-bit numeric columns of a frame to 32 bits.

    Columns of dtype ``pl.Int64`` are cast to ``pl.Int32`` and columns of dtype
    ``pl.Float64`` are cast to ``pl.Float32``. Every other column is cast to
    the dtype it already has, i.e. left as it is.

    Args:
        data (pl.DataFrame): Frame whose column types should be set.

    Returns:
        pl.DataFrame: A new frame with the updated column types.
    """
    narrowed_dtypes: dict[Type[pl.Int64 | pl.Float64], Type[pl.Int32 | pl.Float32]] = {
        pl.Int64: pl.Int32,
        pl.Float64: pl.Float32,
    }

    casts = []
    for column_name, current_dtype in data.schema.items():
        # Fall back to the current dtype when no narrower one is registered.
        target_dtype = narrowed_dtypes.get(current_dtype, current_dtype)
        casts.append(pl.col(column_name).cast(target_dtype))

    return data.with_columns(casts)


def fetch_clients_at_day(day: datetime.date) -> pl.DataFrame:
    """Load the clients CSV and keep only the rows dated ``day``.

    The whole file at ``INPUT_PATH`` is read on every call, its ``date`` column
    is parsed by ``preprocess_date``, and the rows whose date equals ``day``
    are returned.

    Args:
        day (datetime.date): The day whose client rows should be returned.

    Returns:
        pl.DataFrame: The client rows for ``day``, with ``date`` already parsed.
            The frame is empty when the file has no rows for that day.
    """
    # Read straight into polars instead of going through pandas and converting
    # the frame row by row with iterrows(), which is slow and adds nothing.
    data = pl.read_csv(INPUT_PATH)
    return preprocess_date(data).filter(pl.col("date") == day)

def create_base_output_path(date: datetime.date) -> pathlib.Path:
    """Build the parquet output path of the clients data for one date.

    The path has the form ``BASE_PATH/<year>/<month>/<day>/clients.parquet``.
    Month and day are written without zero padding (e.g. ``2021/9/1``).

    Args:
        date (datetime.date): The date the output file belongs to.

    Returns:
        pathlib.Path: The path of the parquet file for ``date``.
    """
    date_parts = (str(date.year), str(date.month), str(date.day))
    return BASE_PATH.joinpath(*date_parts, "clients.parquet")


def save_clients_to_datalake(data: "pl.DataFrame", output_path: pathlib.Path) -> bool:
    """Save client data to the datalake as a parquet file.

    The parent directories of ``output_path`` are created when missing. The
    frame is first written to a temporary sibling file and only renamed to
    ``output_path`` once the write has finished, so a failed write never
    leaves a partial file at ``output_path``.

    Args:
        data (pl.DataFrame): The client data to be saved.
        output_path (pathlib.Path): Destination of the parquet file.

    Returns:
        bool: True if the data is successfully saved, False otherwise. On
            failure the error is printed instead of being raised.
    """
    tmp_path = output_path.with_name(output_path.name + ".tmp")
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        data.write_parquet(tmp_path)
        # Rename only after a complete write; callers use the existence of
        # output_path to decide whether a day has already been processed.
        tmp_path.replace(output_path)
        return True
    except Exception as e:
        print(e)
        # Best-effort cleanup of the partial temporary file.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        return False

In [3]:
# Process every day in the inclusive range [start_date, end_date]. A day whose
# output file already exists is skipped, so the cell can be re-run safely.
start_date: datetime.date = datetime.date(year=2021, month=9, day=1)
end_date: datetime.date = datetime.date(year=2023, month=5, day=29)
days_to_process: datetime.timedelta = end_date - start_date

processed_dates = []
unprocessed_dates = []

for day_offset in range(days_to_process.days + 1):
    process_date: datetime.date = start_date + datetime.timedelta(days=day_offset)
    output_path: pathlib.Path = create_base_output_path(process_date)

    if not output_path.exists():
        print(f"Processing {process_date}")

        # Fetch the day's rows, narrow the numeric dtypes, then map the columns.
        clients: pl.DataFrame = fetch_clients_at_day(day=process_date)
        clients = set_column_types(clients)
        clients = preprocess_clients_columns(clients)

        data_saved = save_clients_to_datalake(clients, output_path)
        if data_saved:
            processed_dates.append(process_date)
        else:
            unprocessed_dates.append(process_date)
    else:
        print(f"{process_date} already processed")

# Days whose save failed; shown as the cell's output.
unprocessed_dates

2021-09-01 already processed
2021-09-02 already processed
2021-09-03 already processed
2021-09-04 already processed
2021-09-05 already processed
2021-09-06 already processed
2021-09-07 already processed
2021-09-08 already processed
2021-09-09 already processed
2021-09-10 already processed
2021-09-11 already processed
2021-09-12 already processed
2021-09-13 already processed
2021-09-14 already processed
2021-09-15 already processed
2021-09-16 already processed
2021-09-17 already processed
2021-09-18 already processed
2021-09-19 already processed
2021-09-20 already processed
2021-09-21 already processed
2021-09-22 already processed
2021-09-23 already processed
2021-09-24 already processed
2021-09-25 already processed
2021-09-26 already processed
2021-09-27 already processed
2021-09-28 already processed
2021-09-29 already processed
2021-09-30 already processed
2021-10-01 already processed
2021-10-02 already processed
2021-10-03 already processed
2021-10-04 already processed
2021-10-05 alr

[]