
# Preprocessing Pipeline

## NeuroSense Analytics 

### v0.1.0

In [1]:
import numpy as np
import polars as pl
from datetime import date
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

The `DataPreprocessor` class automates the import workflow: it handles data import, datatype casting, min-max scaling, one-hot encoding, MICE imputation, and joining feature data with survey data, and returns a Polars DataFrame or NumPy array.  

Output is in the shape of `batch_size`, `time_steps`, `features` for LSTM processing.  

## DataPreprocessor  

> ### Parameters:  

- `path`: *`str`*  
Data directory to use (INS_W1 = `DATA_PATH_1`, INS_W2 = `DATA_PATH_2`, INS_W3 = `DATA_PATH_3`, INS_W4 = `DATA_PATH_4`). The path must end with a trailing `/`.  

- `imputer_max_iter`: *`int`, default = `10`*  
Max amount of iterations for IterativeImputer.  

- `imputer_random_state`: *`int`, default = `42`*  
Imputer random state.  

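- `nearest_features`: *`int`, default = `None`*  
Number of other features used to estimate the missing values of each feature (passed to `IterativeImputer` as `n_nearest_features`). `None` uses all features.  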

- `strategy`: *`{'mean', 'median', 'most_frequent', 'constant'}`, default = `'mean'`*  
Initial imputation strategy (passed to `IterativeImputer` as `initial_strategy`).  

- `impute`: *`bool`, default = `True`*  
Whether or not to run imputation.  

- `exclude_history`: *`bool`, default = `True`*  
Whether or not to exclude 14- and 7-day histories during preprocessing.

> ### Functions:  

- `import_csv_feature_data`:  

    - `file_name`: *`str`*   
    Name of the CSV file to preprocess, without the `.csv` extension  

> ### Example Usage:   

`preprocessor_INS_W1 = DataPreprocessor(DATA_PATH_1, imputer_max_iter=20,impute=True)`


In [82]:
# Imports the feature and survey CSV files, cleans and scales them, and
# prepares them for analysis (including encoding of categorical variables).

# Root directories of the four INS-W datasets. Each directory holds the
# feature data and the survey data for that dataset.
DATA_PATH_1, DATA_PATH_2, DATA_PATH_3, DATA_PATH_4 = (
    "./csv_data/INS-W_1/",
    "./csv_data/INS-W_2/",
    "./csv_data/INS-W_3/",
    "./csv_data/INS-W_4/",
)

# The MinMax scaler is created per DataPreprocessor instance (see __init__).


class DataPreprocessor:
    """Load, clean, min-max scale and optionally impute CSV data with Polars.

    Files are read from ``path + "FeatureData/"`` and ``path + "SurveyData/"``.
    Every import method prints a message and returns an empty
    ``pl.DataFrame`` when loading or processing fails, so callers can test
    ``df.is_empty()`` instead of handling exceptions.

    NOTE: a single ``MinMaxScaler`` and a single ``IterativeImputer`` are
    shared by all methods and are re-fitted on every call, so after a call
    they only hold the statistics of the most recently processed file.
    """

    # Survey files whose non-key columns are min-max scaled on import.
    _SCALED_SURVEYS = ("ema", "post", "pre")

    def __init__(self, path: str, imputer_max_iter: int = 10, imputer_random_state: int = 42, nearest_features: int | None = None, strategy: str = "mean", exclude_history: bool = True, impute: bool = True):
        """Store the configuration and build the scaler and imputer.

        Args:
            path: Dataset directory; it is concatenated directly with
                ``"FeatureData/"`` / ``"SurveyData/"``, so it must end with
                a path separator.
            imputer_max_iter: ``max_iter`` of the ``IterativeImputer``.
            imputer_random_state: ``random_state`` of the imputer.
            nearest_features: ``n_nearest_features`` of the imputer.
            strategy: ``initial_strategy`` of the imputer.
            exclude_history: Drop columns whose names end in ``14dhist`` or
                ``7dhist`` when importing feature data.
            impute: Scale and impute feature data on import. When ``False``
                feature data is returned cleaned but unscaled.
        """
        self.path = path
        self.exclude_history = exclude_history
        self.scaler = MinMaxScaler()
        self.impute = impute
        self.imputer = IterativeImputer(
            max_iter=imputer_max_iter,
            random_state=imputer_random_state,
            n_nearest_features=nearest_features,
            initial_strategy=strategy,
        )

    def _scan_csv(self, relative_path: str) -> pl.LazyFrame:
        """Build the lazy query shared by every import method.

        Casts ``date`` to ``pl.Date``, drops the column named ``""``
        (presumably the unnamed index column of the export - confirm),
        strips the ``INS-W_`` prefix from ``pid`` and casts it to ``Int32``.
        """
        return (
            pl.scan_csv(self.path + relative_path)
            .cast({"date": pl.Date})
            .drop("")
            .with_columns(pl.col("pid").str.replace_all("INS-W_", ""))
            .cast({"pid": pl.Int32})
        )

    def _min_max_scale(self, values: pl.DataFrame) -> pl.DataFrame:
        """Re-fit the shared scaler on ``values`` and return the scaled frame
        with the original column names."""
        return pl.from_numpy(
            self.scaler.fit_transform(values), schema=values.columns
        )

    def import_csv_feature_data(self, file_name: str) -> pl.DataFrame:
        """Import ``FeatureData/<file_name>.csv``.

        String columns are always dropped; 14-/7-day history columns are
        dropped when ``exclude_history`` is set. When ``impute`` is set, all
        columns except ``pid`` and ``date`` are min-max scaled and then
        imputed. If imputation fails, the scaled but un-imputed data is
        returned, which is what the printed message announces.

        Returns:
            The processed frame (``pid`` and ``date`` first), or an empty
            ``pl.DataFrame`` if loading or scaling fails.
        """
        try:
            query = self._scan_csv("FeatureData/" + file_name + ".csv").select(
                pl.exclude(pl.String)
            )
            if self.exclude_history:
                query = query.select(pl.exclude(["^.*14dhist$", "^.*7dhist$"]))
            data = query.collect()

            if not self.impute:
                return data

            keys = data.select(["pid", "date"])
            scaled = self._min_max_scale(data.select(pl.exclude(["pid", "date"])))
            try:
                self.imputer.fit(scaled)
                imputed = pl.from_numpy(
                    self.imputer.transform(scaled), schema=scaled.columns
                )
            except Exception as e:
                # Best-effort fallback: keep the scaled values so the output
                # stays on the same scale as successfully imputed files.
                print(f"Error in imputation, returning scaled data without imputation: {e}")
                return keys.hstack(scaled)
            return keys.hstack(imputed)

        except Exception as e:
            print(f"Error importing feature data from {self.path + 'FeatureData/' + file_name}: {e}")
            return pl.DataFrame()

    def import_csv_survey_data(self, file_name: str) -> pl.DataFrame:
        """Import ``SurveyData/<file_name>.csv``.

        For the ``ema``, ``post`` and ``pre`` surveys every column except
        ``pid`` and ``date`` is min-max scaled. Any other survey file is
        returned cleaned but unscaled, so the method never returns ``None``.

        Returns:
            The processed frame, or an empty ``pl.DataFrame`` on failure.
        """
        try:
            data = self._scan_csv("SurveyData/" + file_name + ".csv").collect()
            if file_name not in self._SCALED_SURVEYS:
                return data
            keys = data.select(["pid", "date"])
            scaled = self._min_max_scale(data.select(pl.exclude(["pid", "date"])))
            return keys.hstack(scaled)
        except Exception as e:
            print(f"Error importing survey data from {self.path + 'SurveyData/' + file_name}: {e}")
            return pl.DataFrame()

    def import_dep_endterm(self) -> pl.DataFrame:
        """Import ``SurveyData/dep_endterm.csv``.

        All columns except ``pid``, ``date`` and ``dep`` are min-max scaled,
        then ``dep`` is one-hot encoded with ``to_dummies``.

        Returns:
            The processed frame, or an empty ``pl.DataFrame`` on failure.
        """
        try:
            data = self._scan_csv("SurveyData/dep_endterm.csv").collect()
            labels = data.select(["pid", "date", "dep"])
            scaled = self._min_max_scale(
                data.select(pl.exclude(["pid", "date", "dep"]))
            )
            return labels.hstack(scaled).to_dummies("dep")
        except Exception as e:
            print(f"Error importing endterm data from {self.path + 'SurveyData/dep_endterm.csv'}: {e}")
            return pl.DataFrame()

    def merge_survey_to_feature(self, feature_data: pl.DataFrame, survey_data: pl.DataFrame, on: list[str] | None = None) -> pl.DataFrame:
        """Inner-join survey data onto feature data.

        Args:
            feature_data: Left frame of the join.
            survey_data: Right frame of the join.
            on: Join key columns. Defaults to ``["pid"]``, which pairs every
                feature row of a participant with every survey row of that
                participant; pass ``["pid", "date"]`` to align rows per day.

        Returns:
            The joined frame, or an empty ``pl.DataFrame`` if the join fails.
        """
        join_keys = ["pid"] if on is None else on
        try:
            return feature_data.join(survey_data, on=join_keys, how="inner")
        except Exception as e:
            print(f"Error merging feature and survey data: {e}")
            return pl.DataFrame()

# INS-W_1

In [79]:
# Preprocessor for the INS-W_1 dataset: imputation enabled, up to 20 imputer iterations.
preprocessor_INS_W1 = DataPreprocessor(
    DATA_PATH_1,
    imputer_max_iter=20,
    impute=True,
)