In [1]:
import os
from time import time
from typing import Optional

import pandas as pd
from tqdm import tqdm as progressbar

from utils.constant import *

print("Loading Datasets...")

# os.listdir returns entries in arbitrary order; sorting the CSV names makes
# the order of `datasets` (and of everything derived from it) reproducible.
# Filtering before wrapping in the progress bar makes the bar count only the
# files that are actually read.
csv_filenames = sorted(
    filename for filename in os.listdir(DIR_RAW) if filename.endswith(".csv")
)

# One DataFrame per CSV file found directly inside DIR_RAW.
datasets = [
    pd.read_csv(os.path.join(DIR_RAW, filename), low_memory=False)
    for filename in progressbar(csv_filenames)
]

print("Datasets loaded.")

Loading Datasets...


100%|██████████| 6/6 [00:09<00:00,  1.64s/it]

Datasets loaded.





In [2]:
def cleaning_datasets(datasets: list[pd.DataFrame], columns_to_keep: list[str] = ["Territorio", "SEXISTAT1", "ETA1", "TIME", "Value"]) -> list[pd.DataFrame]:
    """Return cleaned copies of the given DataFrames.

    For every frame the function:

    1. keeps only ``columns_to_keep``;
    2. drops rows where ``SEXISTAT1`` equals 9, ``ETA1`` equals ``'TOTAL'``
       or ``Territorio`` is not listed in ``ITA_STATE``;
    3. converts the ``ETA1`` labels to integers (``'Y_GE100'`` becomes 100,
       for every other label the ``'Y'`` characters are removed before
       parsing) and drops rows whose label cannot be parsed as a number.

    Neither the input list nor its frames are modified, so the function can be
    called repeatedly on the same raw data.

    Args:
        datasets: raw DataFrames; each must contain the columns used above.
        columns_to_keep: columns retained in the output (never mutated here).

    Returns:
        A new list with one cleaned DataFrame per input frame, in input order.

    Raises:
        ValueError: if a parsed ``ETA1`` value is not a whole number.
    """
    cleaned: list[pd.DataFrame] = []

    for dataset in progressbar(datasets, desc="Pulizia dei Dataset"):
        # Keep only the requested columns.
        subset = dataset[columns_to_keep]

        # Drop unwanted records.
        keep_rows = (
            (subset['SEXISTAT1'] != 9)
            & (subset['ETA1'] != 'TOTAL')
            & (subset['Territorio'].isin(ITA_STATE))
        )
        subset = subset.loc[keep_rows]

        # Turn the age labels into numbers; unparseable labels become NaN.
        labels = subset['ETA1'].replace({'Y_GE100': '100'}).str.replace('Y', '', regex=False)
        parsed = pd.to_numeric(labels, errors='coerce')
        parseable = parsed.notna()
        ages = parsed[parseable]

        # Coercion yields float64 as soon as one label fails to parse, so a
        # dtype check alone would reject data whose bad rows were just dropped.
        # Check the values instead, then cast explicitly.
        if not (ages % 1 == 0).all():
            raise ValueError("La colonna ETA1 contiene ancora tipi non int")

        # Explicit copy: the assignment below must not target a view of the
        # caller's frame.
        subset = subset.loc[parseable].copy()
        subset['ETA1'] = ages.astype('int64').to_numpy()

        cleaned.append(subset)

    return cleaned



# Run the cleaning step on every loaded dataset; the two prints bracket the
# step in the cell output.
print("Cleaning Datasets...")

cleaned_datasets = cleaning_datasets(datasets)

print("Datasets cleaned.")

Cleaning Datasets...


Pulizia dei Dataset: 6it [00:01,  3.74it/s]

Datasets cleaned.





In [3]:
def age_range(dataset: pd.DataFrame, age_bins: list[int] = [0, 9, 19, 29, 39, 49, 59, 69, 79, 89, 99, 109]) -> pd.DataFrame:
    """Replace the per-age rows' ``Value``/``ETA1`` columns with per-band totals.

    ``ETA1`` is binned with ``pd.cut`` into left-closed intervals
    ``[edge_i, edge_i+1)``. ``Value`` is then summed per
    ``(TIME, SEXISTAT1, band)`` and spread into one column per band named
    ``Age_<left>_<right>``; bands without data are kept and filled with 0.
    These totals are left-joined back onto ``dataset`` (minus ``Value`` and
    ``ETA1``) on ``TIME`` and ``SEXISTAT1``, so the result has one row per
    input row.

    NOTE(review): with the default edges the bands are [0, 9), [9, 19), ...
    rather than whole decades - confirm this is intended.
    NOTE(review): ``Territorio`` is not part of the grouping key, so the sums
    span every territory present in ``dataset`` - confirm this is intended.

    Args:
        dataset: frame with at least ``TIME``, ``SEXISTAT1``, ``ETA1`` and
            ``Value`` columns.
        age_bins: increasing band edges passed to ``pd.cut``.

    Returns:
        ``dataset`` without ``Value``/``ETA1`` plus one ``Age_*`` column per band.
    """
    key_columns = ['TIME', 'SEXISTAT1']

    # right=False makes every band include its lower edge and exclude its upper one.
    bands = pd.cut(dataset['ETA1'], bins=age_bins, right=False)

    totals = dataset.pivot_table(
        values='Value',
        index=key_columns,
        columns=bands,
        aggfunc='sum',
        fill_value=0,
        observed=False,
    )
    totals = totals.reset_index()

    # After reset_index the key columns come first; the rest are Interval labels.
    band_labels = []
    for band in totals.columns[len(key_columns):]:
        band_labels.append(f"Age_{band.left}_{band.right}")
    totals.columns = key_columns + band_labels

    base = dataset.drop(columns=['Value', 'ETA1'])
    return pd.merge(base, totals, on=key_columns, how='left')


# Registry of feature-engineering steps: maps a step name to the function that
# implements it. apply_feature_engineering iterates over this dict and calls
# every registered function on each dataset.
features_function = {
    "age_range": age_range,
}

def apply_feature_engineering(datasets: list[pd.DataFrame]) -> list[pd.DataFrame]:
    """Apply every registered feature-engineering function to each dataset.

    Each frame is passed through all functions in ``features_function`` (in
    the dict's iteration order, each receiving the previous one's output) and
    duplicate rows are dropped from the final result.

    The input list is not modified: results are collected in a new list, so
    the caller's cleaned data stays available and the call can be repeated.

    Args:
        datasets: DataFrames to transform.

    Returns:
        A new list with one transformed, de-duplicated DataFrame per input
        frame, in input order.
    """
    engineered: list[pd.DataFrame] = []

    for dataset in progressbar(datasets, desc="Feature Engineering"):
        for feature_function in features_function.values():
            dataset = feature_function(dataset)
        # Drop duplicates once, after all feature functions have run.
        engineered.append(dataset.drop_duplicates())

    return engineered

# Apply the registered feature-engineering functions to the cleaned datasets.
print("Feature Engineering...")
featured_dataset = apply_feature_engineering(cleaned_datasets)
print("Feature Engineering done.")

Feature Engineering...


Feature Engineering: 100%|██████████| 6/6 [00:00<00:00, 10.99it/s]

Feature Engineering done.





In [4]:
def merge_datasets(datasets: list[pd.DataFrame]) -> pd.DataFrame:
    """Concatenate the DataFrames into one frame, sorted by year, with English column names.

    Steps, in order: row-wise concatenation with a fresh index, conversion of
    object/categorical/interval columns to strings, ascending sort on ``TIME``
    and renaming of ``Territorio``/``SEXISTAT1``/``TIME`` to
    ``Territory``/``Sex``/``Year``.

    Args:
        datasets: frames to merge; each must contain a ``TIME`` column.

    Returns:
        The merged, converted, sorted and renamed DataFrame.

    Raises:
        ValueError: raised by ``pd.concat`` when ``datasets`` is empty.
    """

    def rename_columns(dataset: pd.DataFrame) -> pd.DataFrame:
        """Rename the source column names to more recognisable English ones."""

        new_column_names = {
            "Territorio": "Territory",
            "SEXISTAT1": "Sex",
            "TIME": "Year"
        }

        return dataset.rename(columns=new_column_names)

    def convert_unsupported_types(dataset: pd.DataFrame) -> pd.DataFrame:
        """Convert object, categorical and interval columns to strings.

        Intended to make the frame writable to Parquet. Missing values stay
        missing in every branch instead of becoming text.
        """
        for col in dataset.columns:
            column = dataset[col]
            dtype = column.dtype
            if isinstance(dtype, pd.CategoricalDtype) or pd.api.types.is_object_dtype(column):
                # astype(str) alone would turn missing values into the literal
                # strings "nan"/"None"; put real nulls back in those positions.
                dataset[col] = column.astype(str).where(column.notna(), None)
            elif isinstance(dtype, pd.IntervalDtype):
                dataset[col] = column.apply(lambda x: str(x) if pd.notnull(x) else None)
        return dataset

    def sort_by_year(dataset: pd.DataFrame) -> pd.DataFrame:
        """Sort the frame by ``TIME`` ascending, coercing the column to numeric first if needed."""
        if not pd.api.types.is_numeric_dtype(dataset['TIME']):
            dataset['TIME'] = pd.to_numeric(dataset['TIME'], errors='coerce')
        # Stable sort: rows of the same year keep their concatenation order,
        # so the saved file is reproducible.
        return dataset.sort_values(by='TIME', ascending=True, kind='mergesort')

    merged_dataset = pd.concat(datasets, ignore_index=True)
    merged_dataset = convert_unsupported_types(merged_dataset)
    merged_dataset = sort_by_year(merged_dataset)
    merged_dataset = rename_columns(merged_dataset)

    return merged_dataset


def process_and_save(datasets: list[pd.DataFrame], output_dir: str = DIR_CLEANED, output_filename: Optional[str] = None, save_to_csv: bool = False):
    """Merge the datasets and save the result as Parquet and, optionally, CSV.

    Args:
        datasets: frames handed to ``merge_datasets``.
        output_dir: target directory; created if it does not exist.
        output_filename: file name without extension. When omitted, a name of
            the form ``merged_dataset<unix-timestamp>`` is generated at call
            time, so successive calls do not overwrite each other's output.
        save_to_csv: when true, a ``.csv`` copy is written next to the
            ``.parquet`` file.

    Returns:
        None. Errors raised while writing the files are caught and printed
        rather than propagated; errors from ``merge_datasets`` are not caught.
    """
    # A default written in the signature would be evaluated only once, when the
    # function is defined, freezing the timestamp for the whole kernel session.
    if output_filename is None:
        output_filename = f"merged_dataset{int(time())}"

    os.makedirs(output_dir, exist_ok=True)

    merged_dataset = merge_datasets(datasets)

    file_path = os.path.join(output_dir, output_filename)
    try:
        merged_dataset.to_parquet(file_path + ".parquet", index=False)
        if save_to_csv:
            merged_dataset.to_csv(file_path + ".csv", index=False)
            print(f"Dataset salvato come {file_path} in formato CSV e Parquet!")
        else:
            print(f"Dataset salvato come {file_path} in formato Parquet!")
    except Exception as e:
        # Still best-effort (no re-raise), but report what actually went wrong.
        print(f"Errore durante il salvataggio del dataset: {e}")


# Merge the feature-engineered datasets and write the result to disk;
# save_to_csv=True also requests a CSV copy in addition to the Parquet file.
print("Merging Datasets and Saving...")

process_and_save(featured_dataset, save_to_csv=True)

Merging Datasets and Saving...
Dataset salvato come ../data/cleaned/merged_dataset1726155302 in formato CSV e Parquet!
