# In [1]:
import os

import mlflow
import numpy as np
import pandas as pd
from beartype.typing import List, Dict
from sklearn.pipeline import Pipeline

# In [2]:
# Get current working directory
import sys
current_path = os.getcwd()
sys.path.append(os.path.join(current_path, '..'))

# In [3]:
from roug_ml.utl.evaluation.eval_utl import calc_loss_acc_val
from roug_ml.utl.evaluation.multiclass import compute_multiclass_confusion_matrix
from roug_ml.utl.mlflow_utils import get_best_run
from roug_ml.utl.paths_utl import create_dir
from roug_ml.models.hyperoptimization import generate_param_grid_with_different_size_layers, \
    get_or_create_experiment, get_best_run_from_hyperoptim
from roug_ml.utl.processing.filters import LowPassFilter
from roug_ml.utl.processing.signal_processing import SignalProcessor
from roug_ml.utl.transforms.features_management import FeatureFlattener
from roug_ml.utl.set_seed import set_seed

set_seed(42)

from roug_ml.models.hyperoptimization import parallele_hyper_optim
from roug_ml.utl.parameter_utils import restructure_dict

from roug_ml.utl.etl.data_extraction import extract_activity_data_from_users
from roug_ml.models.pipelines.pipelines import NNTorch

from processing.dataset_processing import covert_3a_from_pandas_to_dict
from roug_ml.configs.data_paths import M_HEALTH_PATH
from roug_ml.configs.data_labels import M_HEALTH_ACTIVITIES_LABELS

# Output recorded for the import cell above:
# AttributeError: module 'tensorflow.compat.v2.__internal__' has no attribute 'register_load_context_function'

# In [None]:


# Lookup table of the loss-function identifiers that can be placed in the hyperparameter grid
# (read by MHealtAnalysis.generate_params). Each key maps to the identifier string itself.
loss_functions = dict(categorical_crossentropy="categorical_crossentropy")


def filter_data_dict(data_dict: dict, subjects: list) -> dict:
    """
    Keep only the entries of data_dict that belong to the requested subjects.

    Every value of data_dict is indexed in parallel with data_dict['user']: position i of each
    value is kept when data_dict['user'][i] is one of subjects. The input is not modified.
    :param data_dict: The input dictionary; its 'user' entry drives the selection.
    :param subjects: The list of subjects to keep.
    returns: A new dictionary with the same keys, holding only the selected positions.
    """
    if not data_dict:
        # Nothing to filter, so the 'user' entry is never looked up.
        return {}

    # Positions whose owner is one of the requested subjects; shared by every key.
    selected = [idx for idx, owner in enumerate(data_dict['user']) if owner in subjects]

    result = {}
    for name, column in data_dict.items():
        result[name] = [column[idx] for idx in selected]
    return result


class MHealtAnalysis:
    """
    Activity-recognition experiment on the accelerometer logs stored under M_HEALTH_PATH.

    The pipeline implemented by run() is:
        - read the '.log' files and keep the accelerometer, label and subject columns,
        - group the signals per activity label and per subject,
        - split every activity recording into a train portion and a test portion,
        - keep disjoint groups of subjects for training and validation,
        - extract windowed features, optimise (or reload) a network and evaluate it.
    """

    def __init__(self) -> None:
        """
        Set the experiment configuration, create the results directory and bind the MLflow
        experiment.
        """
        self.list_of_positions = ['chest', 'ankle', 'right_arm']
        # self.list_of_positions = ['right_arm']
        self.number_of_point_per_activity = 511  # TODO: automatic

        self.features_to_extract = ['mean', 'std', 'rms', 'max', 'min', 'var']

        # Number of parallel workers handed to the hyperparameter search. Currently a single
        # worker; the alternative is to use every core but one:
        # multiprocessing.cpu_count() - 1
        self.num_workers = 1
        self.framework = 'torch'
        # self.framework = 'tf'

        # model params: keys that describe the network itself vs. the remaining training options.
        self.nn_params_keys = ["activations", "in_nn", "input_shape", "output_shape"]
        self.other_keys = ["batch_size", "cost_function", "learning_rate", "metrics", "n_epochs",
                           "nn_key"]

        self.results_path = os.path.join(M_HEALTH_PATH, '../models')
        create_dir(self.results_path)

        # Single place for the tracking server address; run() reuses it.
        self.mlflow_tracking_uri = "http://localhost:8000"
        mlflow.set_tracking_uri(self.mlflow_tracking_uri)
        self.mlflow_experiment_name = 'run_non_parallel_shu_5'
        self.mlflow_experiment_id = get_or_create_experiment(self.mlflow_experiment_name)

        # True: run the hyperparameter search; False: reuse the best run stored in MLflow.
        self.re_optimize = True

    def run(self):
        """
        Execute the whole experiment: load and reshape the data, build the train/validation
        sets, optimise (or reload) the model, then print the validation score and compute the
        multiclass confusion matrix.
        """
        # Load data
        dataset = self.read_data(M_HEALTH_PATH)

        # Process data to right format: one conversion per accelerometer position, each reading
        # its own three signal columns. Labels are in column 23 and the subject id in column 24.
        position_columns = {
            'chest': [0, 1, 2],
            'ankle': [5, 6, 7],
            'right_arm': [14, 15, 16],
        }
        per_position = {
            position: covert_3a_from_pandas_to_dict(in_dataset=dataset,
                                                    in_label_col=23,
                                                    in_user_col=24,
                                                    in_3a_col=columns)
            for position, columns in position_columns.items()
        }

        # Labels and users are taken from the chest conversion only.
        dataset_dict = {
            'y_num': per_position['chest']['y_num'],
            'y_onehot': per_position['chest']['y_onehot'],
            'x_chest': per_position['chest']['x'],
            'x_ankle': per_position['ankle']['x'],
            'x_right_arm': per_position['right_arm']['x'],
            'User': per_position['chest']['User']
        }

        # Create dataset
        final_data_set = self.extract_data_during_activity(
            in_num_of_users=10,
            in_dataset_dict=dataset_dict,
            in_accelerometer_position=self.list_of_positions)

        data_set_train, data_set_test = self.split_data_for_train_test_portion_of_pa(
            in_data=final_data_set, in_points_in_each_set=self.number_of_point_per_activity,
            in_accelerometer_position=self.list_of_positions)

        # Split by patients. Subject ids are 1-based ('subject1' ... 'subject10', see
        # extract_data_during_activity), so the first seven subjects train the model and the
        # last three validate it. The previous 0-based ranges asked for a non-existent
        # 'subject0' and never used 'subject10'.
        train_subjects = [f'subject{i}' for i in range(1, 8)]
        data_set_train = filter_data_dict(data_set_train, train_subjects)

        test_subjects = [f'subject{i}' for i in range(8, 11)]
        data_set_test = filter_data_dict(data_set_test, test_subjects)

        key_position = 'x_' + 'right_arm'
        # key_position = 'x_' + 'chest'
        # key_position = 'x_' + 'ankle'

        x_train, y_train_oh = self.create_x_y(data_set_train, in_key=key_position + '_train',
                                              in_features_to_extract=self.features_to_extract,
                                              sample_freq=50,
                                              window_size=1,
                                              cutoff_frequency=5
                                              )

        x_val, y_val_oh = self.create_x_y(data_set_test, in_key=key_position + '_test',
                                          in_features_to_extract=self.features_to_extract,
                                          sample_freq=50,
                                          window_size=1,
                                          cutoff_frequency=5)

        # Reshape the input data to match the expected shape of the model (adds a trailing
        # axis of size 1).
        x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
        x_val = x_val.reshape(x_val.shape[0], x_val.shape[1], 1)

        y = y_train_oh
        y_val = y_val_oh

        if self.re_optimize:
            # Optimize and get best run
            params = self.generate_params(x_train=x_train)
            results = self.hyperoptimize(x_train, y, x_val, y_val, params)
            best_params, _, best_run_id = get_best_run_from_hyperoptim(results)
        else:
            # Get the best run from mlflow
            mlflow.set_tracking_uri(self.mlflow_tracking_uri)
            best_run_id, best_params = get_best_run(self.mlflow_experiment_name, "val_accuracy")

            best_params = restructure_dict(best_params, self.nn_params_keys, self.other_keys,
                                           in_from_mlflow=True)

        # Load the best model
        best_model = mlflow.pytorch.load_model("runs:/{}/models".format(best_run_id))

        # Wrap the stored network in the same pipeline object used for training.
        pipeline_torch = Pipeline(steps=[('NN', NNTorch(**best_params))])
        pipeline_torch.named_steps['NN'].nn_model = best_model

        predictions = pipeline_torch.predict(x_val)
        # Convert one-hot encoded targets to class indices
        y_val_binary = np.argmax(y_val, axis=1)
        val_acc = calc_loss_acc_val(predictions, y_val_binary)
        print(val_acc)
        compute_multiclass_confusion_matrix(targets=y_val_binary,
                                            outputs=predictions,
                                            class_labels=M_HEALTH_ACTIVITIES_LABELS
                                            )

    def generate_params(self, x_train):
        """
        This function generates the list of parameters for the hyperparameter optimization.

        :param x_train: training inputs; only the size of its second axis is read, and it is
         used as the network input shape
        :return: list_params: list of dictionaries, where each dictionary contains a unique
         combination of hyperparameters
        """
        # Define hyperparameters
        nn_key = ['CNN']
        input_shape = [np.asarray(x_train).shape[1]]
        output_shape = [13]
        batch_size = [  # 32, 64,
            128]
        cost_function = [loss_functions['categorical_crossentropy']]
        learning_rate = [  # 0.01,
            0.001]
        n_epochs = [  # 300, 150, 140, 130, 120,
            100]
        metrics = ['accuracy']
        layer_sizes = [  # [200, 300], [200, 300, 100], [200, 300, 400, 100], [50, 300, 100, 50],
            # [100, 200, 200, 100], [50, 100, 100, 100, 50],
            [100, 200, 100, 200, 100]
        ]
        activations = [
            # ['relu', 'relu'], ['relu', 'tanh'], ['relu', 'relu', 'relu'],
            # ['tanh', 'tanh', 'tanh', 'tanh'], ['relu', 'relu', 'relu', 'relu'],
            # ['relu', 'relu', 'relu', 'relu', 'tanh'],
            ['relu', 'relu', 'relu', 'relu', 'relu']
        ]
        cnn_filters = [1, 3, 5, 10]

        list_params = generate_param_grid_with_different_size_layers(nn_key, input_shape,
                                                                     output_shape, batch_size,
                                                                     cost_function, learning_rate,
                                                                     n_epochs, metrics,
                                                                     layer_sizes, activations,
                                                                     cnn_filters)

        # Regroup every flat combination into network keys vs. the other training keys.
        list_params = [
            restructure_dict(params, self.nn_params_keys, self.other_keys, in_from_mlflow=False) for
            params in list_params]

        return list_params

    def hyperoptimize(self, x_train, y, x_val, y_val, list_params):
        """
        This function performs hyperparameter optimization by delegating to
        parallele_hyper_optim with self.num_workers workers.

        :param x_train: input training data
        :param y: training targets
        :param x_val: input validation data
        :param y_val: validation targets
        :param list_params: list of dictionaries, where each dictionary contains a unique
        combination of hyperparameters

        :return: results: whatever parallele_hyper_optim returns; run() hands it to
         get_best_run_from_hyperoptim to obtain the best parameters, accuracy and run ID
        """
        # NOTE(review): the experiment *id* is passed through a parameter named
        # in_mlflow_experiment_name - confirm which one the helper expects.
        results = parallele_hyper_optim(in_num_workers=self.num_workers,
                                        x_train=x_train,
                                        y=y,
                                        x_val=x_val,
                                        y_val=y_val,
                                        param_grid_outer=list_params,
                                        in_framework=self.framework,
                                        model_save_path=self.results_path,
                                        in_mlflow_experiment_name=self.mlflow_experiment_id
                                        )

        return results

    @staticmethod
    def create_x_y(final_data_set: dict,
                   in_key: str,
                   in_features_to_extract: list,
                   sample_freq: int,
                   window_size: float,
                   cutoff_frequency: int):
        """
        Prepare the data by generating the x_data and y_hot_data arrays.

        Side effect: the flattened features are also stored in final_data_set['new_feat'].
        :param final_data_set: The final data set containing the input data; its 'y' entry
                provides the targets.
        :param in_key: The key to access the input data in the final_data_set.
        :param in_features_to_extract: A list of features to extract from the data. It is handed
                to FeatureFlattener together with the windowed features of final_data_set[in_key].
        :param sample_freq: The sample frequency in Hz.
        :param window_size: The size of the window in seconds.
        :param cutoff_frequency: The cutoff frequency for low-pass filtering.

        Returns:
            tuple: A tuple containing the x_data and y_hot_data arrays.
        """

        # Window length expressed in samples (seconds divided by the sampling period).
        sampling_rate = 1 / sample_freq
        points_for_mean = int(window_size / sampling_rate)

        dataset_features = []
        for x in final_data_set[in_key]:
            # Stride equals the window length.
            sp = SignalProcessor(input_signal=x,
                                 in_filter=(LowPassFilter, {'cutoff_frequency': cutoff_frequency}),
                                 in_window_size=points_for_mean,
                                 in_stride=points_for_mean
                                 )
            # presumably apply_filter stores its result in sp.output_signal, which is read
            # below - verify against SignalProcessor.
            sp.apply_filter(in_signal=sp.input_signal)
            # sp.calibrate_data(in_signal=sp.output_signal)
            features = sp.extract_windowed_features(in_signal=sp.output_signal)
            dataset_features.append(features)

        flattener = FeatureFlattener(final_data_set, in_features_to_extract)
        new_feat = flattener.run(dataset_features)

        # Kept on the input dictionary as well, as callers may read it from there.
        final_data_set['new_feat'] = np.asarray(new_feat)

        x_data = final_data_set['new_feat']
        y_hot_data = np.asarray(final_data_set['y'])

        return x_data, y_hot_data

    @staticmethod
    def read_data(in_path: str):
        """
        Read every '.log' file found directly inside in_path and stack them in one DataFrame.

        Files are processed in sorted name order so the result does not depend on the order in
        which the file system lists them. Only columns 0, 1, 2, 5, 6, 7, 14, 15, 16 and 23 of
        each file are kept, plus a new column 24 holding the subject id taken from the file name.
        :param in_path: the directory that contains the '.log' files
        :return: the concatenated DataFrame
        :raises FileNotFoundError: if in_path cannot be listed
        :raises ValueError: if in_path contains no '.log' file
        """
        walked = next(os.walk(in_path), None)
        if walked is None:
            # os.walk yields nothing for a missing/unreadable directory.
            raise FileNotFoundError(f"Data directory not found or not readable: {in_path}")
        _, _, filenames = walked

        total_df = []
        for file_x in sorted(filenames):
            if not file_x.endswith(".log"):
                continue
            # Subject id = file name without its first 8 characters and its 4-character
            # extension (assumes names such as 'mHealth_subject1.log' - TODO confirm).
            subject_id = file_x[8:-4]
            print(subject_id)
            path_to_read = os.path.join(in_path, file_x)
            input_df = pd.read_csv(path_to_read, delimiter="\t", header=None)
            input_df[24] = subject_id
            total_df.append(input_df[[0, 1, 2, 5, 6, 7, 14, 15, 16, 23, 24]])

        if not total_df:
            raise ValueError(f"No '.log' files found in {in_path}")
        return pd.concat(total_df, axis=0)

    @staticmethod
    def _split_train_test(data: np.ndarray, in_points_in_each_set: int):
        """
        Splits the data into a training set and a test set.

        The training part is the first in_points_in_each_set samples. The test part is the
        slice [in_points_in_each_set + 1 : 2 * in_points_in_each_set + 1].

        Parameters:
        data (np.ndarray): The data to be split (any sliceable sequence works).
        in_points_in_each_set (int): Number of points in each dataset.

        Returns:
        tuple: Contains arrays for training data and test data.
        """
        train_data = data[0:in_points_in_each_set]
        # NOTE(review): the sample at index in_points_in_each_set is left out of both sets -
        # confirm whether this one-sample gap is intentional.
        test_data = data[in_points_in_each_set + 1: 2 * in_points_in_each_set + 1]
        return train_data, test_data

    def extract_data_during_activity(self,
                                     in_num_of_users: int = 10,
                                     in_dataset_dict: Dict = None,
                                     in_accelerometer_position: List[str] = None) -> Dict:
        """
        Collect, for every label present in the dataset and every user, the signals of each
        accelerometer position.
        :param in_num_of_users: number of patients; users are named 'subject1' ...
         'subject<in_num_of_users>'
        :param in_dataset_dict: dictionary with data; its 'y_num' entry lists the labels
        :param in_accelerometer_position: position of accelerometer in the body; defaults to
         self.list_of_positions
        :return: dictionary final_data_set with keys 'y_label', 'y_onehot', 'user' and one
         'x_<position>' key per position, all filled in the same (label, user) order.
        """
        if in_accelerometer_position is None:
            in_accelerometer_position = self.list_of_positions

        data_keys = [f"x_{key}" for key in in_accelerometer_position]
        final_data_set = {key: [] for key in ['y_label', 'y_onehot', 'user', *data_keys]}

        # The user list does not depend on the label, so it is built once.
        list_user = ['subject' + str(i + 1) for i in range(in_num_of_users)]

        list_of_classes = in_dataset_dict['y_num']
        for label in np.unique(list_of_classes):
            for user_i in list_user:
                user_data_dict = extract_activity_data_from_users(
                    user_i, label, in_dataset_dict, self.list_of_positions)

                for key in in_accelerometer_position:
                    final_data_set[f"x_{key}"].append(user_data_dict[f"{key}_label"])

                final_data_set['y_label'].append(label)
                final_data_set['y_onehot'].append(user_data_dict['y_onehot'][0])
                final_data_set['user'].append(user_i)
        return final_data_set

    def split_data_for_train_test_portion_of_pa(self,
                                                in_data: Dict,
                                                in_points_in_each_set: int = int(1022 / 2),
                                                in_accelerometer_position=None) -> tuple:
        """
        Split data for training and test sets. in_points_in_each_set for training and
        in_points_in_each_set for testing. Data is already a dict of lists where x[0] is a set of 3a
        data that corresponds to label y_label[0] and its one-hot version (y_onehot[0]). Therefore,
        the only variable that is processed (split) here is x.
        :param in_data: dictionary with data; must hold 'y_label', 'y_onehot', 'user' and one
         'x_<position>' entry per requested position
        :param in_points_in_each_set: int, number of points in each of the two sets
        :param in_accelerometer_position: positions of the accelerometer in the body, WITHOUT
         the 'x_' prefix (e.g. 'chest'); defaults to self.list_of_positions
        :return: tuple (data_set_train, data_set_test). Both dictionaries hold the keys 'y',
         'y_label', 'y_onehot' and 'user' (the same list objects in both), followed by the
         'x_<position>_train' or 'x_<position>_test' lists respectively.
        """
        # TODO: the first part of every activity period is used for training and the following
        #  part of the same period for testing. Improve this

        if in_accelerometer_position is None:
            # Bare position names: the 'x_' prefix is added below.
            in_accelerometer_position = self.list_of_positions

        train_keys = [f"x_{key}_train" for key in in_accelerometer_position]
        test_keys = [f"x_{key}_test" for key in in_accelerometer_position]
        split_signals = {key: [] for key in [*train_keys, *test_keys]}

        for key in in_accelerometer_position:
            for data in in_data[f"x_{key}"]:
                train_data, test_data = self._split_train_test(data, in_points_in_each_set)
                split_signals[f"x_{key}_train"].append(train_data)
                split_signals[f"x_{key}_test"].append(test_data)

        # Labels are the same for the training and the test set; 'y' is an alias of 'y_onehot'.
        labels = {
            'y': in_data['y_onehot'],
            'y_label': in_data['y_label'],
            'y_onehot': in_data['y_onehot'],
            'user': in_data['user'],
        }

        data_set_train = {**labels, **{key: split_signals[key] for key in train_keys}}
        data_set_test = {**labels, **{key: split_signals[key] for key in test_keys}}

        return data_set_train, data_set_test


if __name__ == '__main__':
    # Script entry point: build the analysis with its default configuration and execute it.
    MHealtAnalysis().run()