# Heartbeat Classification

## 1. Setup

### 1.1. Library Imports

In [None]:
import random
from pathlib import Path

from src.features import feature_extraction

import dask.dataframe as dd
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_sample_weight
from xgboost import XGBClassifier

### 1.2. Configuration

In [None]:
# Project root; every other path below is derived from it.
ROOT = Path(".")

# Input CSV files.
PATH_TO_DATA = ROOT / "data"
PATH_TO_X = PATH_TO_DATA / "X_train.csv"
PATH_TO_Y = PATH_TO_DATA / "y_train.csv"

# Cache file for the extracted features, plus the options used when writing it.
PATH_TO_FEATURES = PATH_TO_DATA / "features.csv"
ENCODING = "utf-8"
COMPRESSION = None


##################################################
# Reproducibility
##################################################
SEED = 3
random.seed(SEED)
RS_NUMPY = np.random.RandomState(SEED)


##################################################
# Others
##################################################
SAMPLING_FREQ = 300  # Hz


def feature_extract(signal):
    """Extract features from a single signal at the configured sampling rate.

    Thin wrapper that forwards ``signal`` to ``feature_extraction`` together
    with ``SAMPLING_FREQ``, so callers only have to supply the signal.

    Defined with ``def`` rather than a name-bound lambda so the callable has
    a real ``__name__`` in tracebacks and can carry this docstring.

    Args:
        signal: One signal to process; passed through unchanged.

    Returns:
        Whatever ``feature_extraction(signal, SAMPLING_FREQ)`` returns.
    """
    return feature_extraction(signal, SAMPLING_FREQ)

## 2. Data Preparation

### 2.1. Feature Extraction

In [None]:
features = None

if not PATH_TO_FEATURES.exists():
    # No cached features yet: compute them from the raw signals.
    X = pd.read_csv(PATH_TO_X, index_col=0)

    # Sanity check: there must be at least one row to process.
    assert not X.empty, "X is empty."

    # Run the extractor once on the first row to learn the output labels,
    # then declare every output column as float64 for Dask's `meta`.
    columns = feature_extract(X.iloc[0]).index.to_list()
    meta = dict.fromkeys(columns, "float64")

    # Apply the extractor row by row, with the frame split into 4 Dask partitions.
    dX = dd.from_pandas(X, npartitions=4)
    features_dask = dX.map_partitions(
        lambda df: df.apply(feature_extract, axis=1),
        meta=meta,
    )
    features = features_dask.compute()

    # Persist the result so later runs can skip the extraction.
    features.to_csv(PATH_TO_FEATURES, encoding=ENCODING, compression=COMPRESSION)
else:
    # Reuse the previously computed features.
    features = pd.read_csv(PATH_TO_FEATURES, index_col=0)


assert features is not None, "Error: `features` DataFrame incorrectly initialized."

### 2.2. Dataset Creation

In [None]:
##################################################
# Loading the Labels
##################################################
# Load labels, using the first CSV column as the index.
y = pd.read_csv(PATH_TO_Y, index_col=0)


# Sanity checks.
# Check for non-empty DataFrames.
assert not y.empty, "y is empty."

# Check for no NaN values in target variable.
assert y.isnull().sum().sum() == 0, "y contains NaN values."

# Check for correct number of classes.
assert len(y["y"].unique()) == 4, "Incorrect number of classes."

# Check for matching row numbers.
assert features.shape[0] == y.shape[0], "X and y have different numbers of rows."

# Check for matching row order. `train_test_split` pairs rows by position,
# so equal row counts alone do not guarantee that each feature row is
# matched with its own label.
assert features.index.equals(y.index), "X and y indices are not aligned."


##################################################
# Data Split: Train and Test Dataset Creation
##################################################
# Pass the integer seed rather than the shared `RS_NUMPY` instance: a
# RandomState object is advanced by every call, so re-running this cell
# would otherwise produce a different split each time.
X_train, X_test, y_train, y_test = train_test_split(
    features, y, test_size=0.1, random_state=SEED, stratify=y
)

### 2.3. Imputation of Missing Values

We impute missing values in both the train and test datasets with the per-feature median computed on the train dataset only. We prefer this to computing a separate estimator for each set because the train dataset is substantially larger and is assumed to be representative of the overall population, so its median is a more reliable estimate than one calculated from the smaller test dataset. It also keeps test-set statistics out of the preprocessing step.



In [None]:
# Column-wise medians of the training split, ignoring missing values.
median_estimator = X_train.median(axis=0, skipna=True)

# Fill the gaps in both splits with those training medians.
X_train, X_test = (
    split.fillna(median_estimator, axis=0) for split in (X_train, X_test)
)

## 3. Training Process

In [None]:
# Per-sample weights from the "balanced" class-weight scheme, to handle class imbalance.
sample_weights = compute_sample_weight(y=y_train, class_weight="balanced")


# Fit an XGBoost classifier on the training split, weighting each sample.
xgb = XGBClassifier(
    random_state=SEED,
    n_jobs=-1,
    use_label_encoder=False,
)
xgb.fit(X_train, y_train, sample_weight=sample_weights)

## 4. Evaluation

In [None]:
# Predict labels for the test split with the fitted classifier.
y_test_pred = xgb.predict(X_test)

# Micro-averaged F1 between the true and predicted test labels.
score_test = f1_score(
    y_true=y_test,
    y_pred=y_test_pred,
    average="micro",
)
print(f"F1 test set: {score_test}")