In [None]:
!pip install wandb -qU
!pip install keras-tuner
import wandb
wandb.login()

In [None]:
import os
from typing import Dict
from google.colab import drive
import numpy as np
import pandas as pd
import keras
from keras import Sequential
from keras.layers import (
    Conv1D,
    MaxPooling1D,
    LSTM,
    Dense,
    Dropout,
    GlobalAveragePooling1D,
    BatchNormalization,
    Flatten,
)
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint, Callback
from keras.utils import to_categorical
from keras.metrics import Precision, Recall, AUC

import keras_tuner
import matplotlib.pyplot as plt
import yaml
import warnings

# Silence every Python warning for the rest of the session. Note that this also
# hides deprecation notices raised by the libraries imported above.
warnings.filterwarnings("ignore")

In [None]:
# Start a Weights & Biases run for this experiment. The `config` values are
# recorded with the run; nothing in this cell feeds them into the model or the
# tuner.
run = wandb.init(
    project="opportunity-sensors",
    config={
        "learning_rate": 0.001,
        "architecture": "CNN-LSTM",
        "dataset": "Opportunity",
        "epochs": 10,
    },
)

In [None]:
from wandb.integration.keras import WandbMetricsLogger, WandbModelCheckpoint

# Keras callbacks that report training progress to Weights & Biases.
wandb_callbacks = [
    WandbMetricsLogger(),
    # The `{epoch:02d}` placeholder puts the epoch number into the checkpoint file name.
    WandbModelCheckpoint(filepath="cnn_lstm{epoch:02d}.keras"),
    # TODO: define these callbacks elsewhere
    # EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
    # ModelCheckpoint(filepath="cnn_lstm{epoch:02d}", save_best_only=True),
]

# Pass this list through the `callbacks=` argument when training.

In [None]:
# Mount Google Drive so the files stored there are reachable from the Colab
# filesystem.
drive.mount("/content/drive")
# Directory the raw data files are read from.
data_path = "/content/drive/My Drive/opportunity_raw/"
# Directory holding the YAML files with column / feature names.
config_path = "/content/drive/My Drive/opportunity_config/"
# List the data directory as a quick check that the mount worked.
os.listdir(data_path)

In [None]:
def load_variable_names(filename):
    """
    Read a YAML file and return its parsed content.

    Params:
    - filename (str): Path of the YAML file to read.

    Returns whatever `yaml.safe_load` produces for the file.
    """
    with open(filename, "r") as yaml_stream:
        return yaml.safe_load(yaml_stream)

In [None]:
# Load the name lists that the data-loading cells rely on.
column_names = load_variable_names(
    os.path.join(config_path, "unique_column_names.yaml")
)
locomotion_set = load_variable_names(os.path.join(config_path, "locomotion_set.yaml"))
body_features = load_variable_names(os.path.join(config_path, "body_features.yaml"))

# Fail fast on a malformed config instead of hitting a KeyError deep inside the
# windowing code: both lists are used to index DataFrame columns, and the
# feature columns are looked up in a frame already restricted to
# `locomotion_set`.
assert isinstance(locomotion_set, list), "Yaml not loaded correctly"
assert isinstance(body_features, list), "Yaml not loaded correctly (body_features)"
assert set(body_features) <= set(
    locomotion_set
), "body_features must be a subset of locomotion_set"

Load data


In [None]:
def get_file(filepath: str):
    """
    Read one space-delimited data file and keep the `locomotion_set` columns.

    Params:
    - filepath (str): Path of the file to read.

    The file is read without a header row; its columns are labelled with the
    module-level `column_names` before the subset is taken.
    """
    full_frame = pd.read_csv(
        filepath,
        delimiter=" ",
        names=column_names,
        header=None,
    )
    locomotion_frame = full_frame[locomotion_set]
    return locomotion_frame

In [None]:
def remove_missing_values(df: pd.DataFrame, method="linear", order=None):
    """
    Fill missing values by interpolating along each column.

    Params:
    - df (pd.DataFrame): Frame to fill. An empty frame is returned unchanged.
    - method (str): Interpolation method forwarded to `DataFrame.interpolate`.
    - order (int): Order of the fit. Required by the "spline" and
      "polynomial" methods, ignored by every other method.

    Returns:
    - pd.DataFrame: The interpolated frame. Gaps are filled in both directions
      for every method, so leading and trailing gaps are handled the same way
      regardless of the method chosen.

    Raises:
    - ValueError: If an order-based method is requested without `order`.
    """
    # todo: add method for removing missing rows entirely
    # todo allow interpolation only if certain number of columns have missing values
    if df.empty:
        return df

    # `limit_direction="both"` is applied to every method; without it only gaps
    # that follow a valid value are filled and leading gaps stay NaN.
    interpolate_kwargs = {"method": method, "limit_direction": "both"}
    if method in ("spline", "polynomial"):
        if order is None:
            raise ValueError(f"method={method!r} requires an integer `order`")
        interpolate_kwargs["order"] = order
    return df.interpolate(**interpolate_kwargs)

In [None]:
def apply_sliding_window_combine(
    filepath: str, target_var: str = "Locomotion", window_size=16, overlap=0.5
):
    """
    Apply sliding window transforms to features and target of the given file.

    Params:
    - filepath (str): Path of the csv file that data is contained in.
    - target_var (str): Target variable chosen for the given csv file.
    - window_size (int): Size of one sliding window for the transform.
    - overlap (float): Fraction of overlap between two consecutive sliding
      windows; must be smaller than 1.

    Returns:
    - combined_data (np.ndarray): Feature windows with shape
      (n_windows, window_size, len(body_features)).
    - target (np.ndarray): Target windows with shape (n_windows, window_size).
      Both arrays have zero windows (but the shapes above) when the file holds
      fewer than `window_size` rows.

    Raises:
    - ValueError: If `window_size` is smaller than 1 or `overlap` is 1 or more.
    """
    # todo: error handling for no or multiple target variables.
    # todo: allow alternative saving as a pandas dataframe.
    if window_size < 1:
        raise ValueError(f"window_size must be at least 1, got {window_size}")
    if overlap >= 1:
        raise ValueError(f"overlap must be smaller than 1, got {overlap}")

    # The truncation can produce 0 for overlaps close to 1; a zero step would
    # never advance the window, so the step is clamped to one sample.
    shift_by = max(1, int(window_size * (1 - overlap)))

    df = get_file(filepath)
    # NOTE(review): the interpolation runs on every column, including the
    # target column - confirm that label columns never contain gaps.
    df = remove_missing_values(df)

    def sliding_window_processing(data, window_size, shift_by):
        """
        Cut `data` into windows along its first axis, return as np array.
        """
        data = np.asarray(data)
        if len(data) < window_size:
            # Keep the trailing dimensions so results from several files can
            # still be concatenated.
            return np.empty((0, window_size) + data.shape[1:], dtype=data.dtype)
        n_windows = (len(data) - window_size) // shift_by + 1
        window_starts = np.arange(n_windows) * shift_by
        # Row i of the index matrix lists the sample positions of window i.
        sample_index = window_starts[:, None] + np.arange(window_size)[None, :]
        return data[sample_index]

    # Window all feature columns in one pass; the feature axis stays last.
    combined_data = sliding_window_processing(
        df[list(body_features)].to_numpy(), window_size, shift_by
    )
    target = sliding_window_processing(
        df[target_var].to_numpy(), window_size, shift_by
    )

    return combined_data, target

In [None]:
# Test files according to the Opportunity challenge.
# TODO: one issue remains - subject-wise training needs to be checked as well.
test_filenames = [
    "S1-ADL4_sensors_data.txt",
    "S1-ADL5_sensors_data.txt",
    "S2-ADL4_sensors_data.txt",
    "S2-ADL5_sensors_data.txt",
    "S3-ADL4_sensors_data.txt",
    "S3-ADL5_sensors_data.txt",
    "S4-ADL4_sensors_data.txt",
    "S4-ADL5_sensors_data.txt",
]


combined_Xtrain = []
combined_train_target = []
combined_Xtest = []
combined_test_target = []


# `os.listdir` returns entries in arbitrary order; sorting makes the
# concatenated arrays come out in the same order on every run.
for filepath in sorted(os.listdir(data_path)):
    is_test_file = os.path.basename(filepath) in test_filenames
    print("test" if is_test_file else "train", filepath)
    file_windows, file_targets = apply_sliding_window_combine(
        os.path.join(data_path, filepath)
    )
    if is_test_file:
        print(file_windows.shape, file_targets.shape)
        combined_test_target.append(file_targets)
        combined_Xtest.append(file_windows)
    else:
        combined_train_target.append(file_targets)
        combined_Xtrain.append(file_windows)


# Stop with a readable message instead of an opaque concatenate error when a
# split received no file at all.
assert combined_Xtrain, f"No training files found in {data_path}"
assert combined_Xtest, f"No test files found in {data_path}"

# Shapes of (up to) the first two training files, as a sanity check.
print(*(file_windows.shape for file_windows in combined_Xtrain[:2]))
final_combined_train = np.concatenate(combined_Xtrain)
final_combined_test = np.concatenate(combined_Xtest)
final_combined_train_target = np.concatenate(combined_train_target)
final_combined_test_target = np.concatenate(combined_test_target)

# `np.concatenate` already returns fresh arrays, so no further copy is made.
X_train = final_combined_train
X_test = final_combined_test

In [None]:
def majority_voting_labels(y_train):
    """
    Reduce every window of labels to the single label that occurs most often.

    Label values greater than 2 are first shifted down by one. Each row is then
    replaced by its most frequent value; on a tie the smallest value wins.

    Params:
    - y_train (np.ndarray): 2-D array of non-negative integer labels, one row
      per window.

    Returns a 1-D array holding one label per row.
    """

    def most_frequent(window_labels):
        return np.bincount(window_labels).argmax()

    # Subtracting the boolean mask lowers exactly the entries above 2 by one.
    shifted_labels = y_train - (y_train > 2)
    return np.apply_along_axis(most_frequent, 1, shifted_labels)

In [None]:
# Number of classes expected after majority voting (label values 0..4).
NUM_CLASSES = 5

y_train_majority = majority_voting_labels(final_combined_train_target)
y_test_majority = majority_voting_labels(final_combined_test_target)

assert y_train_majority.max() < NUM_CLASSES, "Label Encoding is incorrect"
assert y_test_majority.max() < NUM_CLASSES, "Test label encoding is incorrect"

# Fix the one-hot width explicitly. Left to itself, `to_categorical` derives
# the width from the largest label in each array, so the train and test
# encodings could end up with different numbers of columns.
y_train_majority_encoded = to_categorical(y_train_majority, num_classes=NUM_CLASSES)
y_test_majority_encoded = to_categorical(y_test_majority, num_classes=NUM_CLASSES)

assert y_train_majority_encoded.shape[1] == 5, "Label Encoding is incorrect"
assert (
    y_test_majority_encoded.shape[1] == y_train_majority_encoded.shape[1]
), "Train and test label encodings differ in width"
assert not np.any(np.isnan(y_train_majority_encoded)), "y_train contains NaN values"
assert not np.any(np.isnan(X_train)), "X_train still has missing values"

In [None]:
y_train_classes, y_train_counts = np.unique(y_train_majority, return_counts=True)
y_test_classes, y_test_counts = np.unique(y_test_majority, return_counts=True)

# Two side-by-side panels: training split on the left, test split on the right.
fig, (train_ax, test_ax) = plt.subplots(1, 2, figsize=(12, 6))

train_ax.bar(y_train_classes, y_train_counts, color="blue", alpha=0.7)
train_ax.set_xlabel("Class")
train_ax.set_ylabel("Count")
train_ax.set_title("Class Distribution in Training Data")

test_ax.bar(y_test_classes, y_test_counts, color="green", alpha=0.7)
test_ax.set_xlabel("Class")
test_ax.set_ylabel("Count")
test_ax.set_title("Class Distribution in Test Data")

fig.tight_layout()
plt.show()

In [None]:
# Report the final array shapes; the model's input shape has to agree with the
# trailing two dimensions printed here.
print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")

In [None]:
def build_model(hp, input_shape=(16, 77), num_classes=5):
    """
    Build and compile the LSTM + Conv1D classifier for one tuner trial.

    Params:
    - hp: KerasTuner hyperparameter container. The values "kernel_size",
      "strides" and "learning_rate" are drawn from it.
    - input_shape (tuple): (timesteps, features) of a single input window.
      The default must agree with the windows built earlier in the notebook.
    - num_classes (int): Width of the softmax output layer.

    Returns:
    - The compiled `Sequential` model.
    """
    model = Sequential()
    model.add(LSTM(units=32, return_sequences=True, input_shape=input_shape))
    model.add(LSTM(units=32, return_sequences=True))

    model.add(
        Conv1D(
            filters=64,
            kernel_size=hp.Int("kernel_size", min_value=3, max_value=5, step=2),
            strides=hp.Int("strides", min_value=2, max_value=4, step=1),
            activation="relu",
        )
    )
    model.add(BatchNormalization())
    model.add(MaxPooling1D(pool_size=2, strides=2))
    # With 16 timesteps, a stride of 3 or 4 above leaves only one or two
    # timesteps after pooling. An unpadded kernel of size 3 cannot be applied
    # to that, so those trials could not even be built; "same" padding keeps
    # every combination in the search space valid.
    model.add(
        Conv1D(
            filters=128, kernel_size=3, strides=1, padding="same", activation="relu"
        )
    )
    model.add(GlobalAveragePooling1D())

    model.add(Dense(num_classes, activation="softmax"))
    optimizer = Adam(
        learning_rate=hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4])
    )

    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        # Explicit names keep the logged metric keys identical across trials;
        # unnamed metric objects get a running numeric suffix.
        metrics=[
            "accuracy",
            Precision(name="precision"),
            Recall(name="recall"),
            AUC(name="auc"),
        ],
    )
    return model

In [None]:
# Smoke test: build the model once with a fresh HyperParameters object to make
# sure the architecture can be constructed before starting the search.
build_model(hp=keras_tuner.HyperParameters())

In [None]:
def train_model(model, X_val, y_val, epochs=10, batch_size=64):
    """
    Placeholder for a training routine; not implemented yet.

    None of the arguments are used and the function always returns None.
    """
    return None

In [None]:
# Random-search tuner over the hyperparameters declared inside `build_model`.
tuner = keras_tuner.RandomSearch(
    hypermodel=build_model,
    # Trials are ranked by validation accuracy.
    objective="val_accuracy",
    # Number of hyperparameter combinations to try.
    max_trials=5,
    # Number of models trained per combination.
    executions_per_trial=3,
    # Start from scratch instead of resuming earlier results in `directory`.
    overwrite=True,
    directory="results",
    project_name="opportunity",
)

In [None]:
# Run the hyperparameter search on the training arrays. 20% of the training
# data is held out for validation, and the W&B callbacks defined earlier are
# attached to the training runs.
tuner.search(
    X_train,
    y_train_majority_encoded,
    epochs=10,
    validation_split=0.2,
    callbacks=wandb_callbacks,
)

In [None]:
# Print the hyperparameters the tuner searched over.
tuner.search_space_summary()

In [None]:
# Fetch the two best models from the search; only the first one is used below.
models = tuner.get_best_models(num_models=2)
best_model = models[0]
best_model.summary()

In [None]:
# Print a summary of the search results.
tuner.results_summary()

In [None]:
def evaluate_model(model, X_test, y_test, metrics):
    """
    Placeholder for model evaluation; not implemented yet.

    None of the arguments are used and the function always returns None.
    """
    return None

In [None]:
def convert_history_to_dict(history) -> Dict:
    """
    Placeholder for converting a training history into a dict; not implemented yet.

    The argument is not used and, despite the annotation, the function
    currently returns None.
    """
    return None

In [None]:
def plot_loss_curves(history_dict):
    """
    Placeholder for plotting loss curves; not implemented yet.

    The argument is not used and the function always returns None.
    """
    return None