In [31]:
import pandas as pd
import numpy as np

## Load static pose for dataset and test data

In [32]:
df_static_dataset = pd.read_csv("./test_data/dataset_static_pose.csv", sep=" ")
df_static_test_data = pd.read_csv("./test_data/test_data_static_pose.csv", sep=" ")

original_column_names = df_static_dataset.columns.to_list()

# Convert columns to integers
df_static_dataset.columns = list(range(len(df_static_dataset.columns)))
df_static_test_data.columns = list(range(len(df_static_test_data.columns)))

## Statistical analysis

In [11]:
# Analyze the dataframes by producing the range, mean, and standard deviation of each column
print("Static Pose Dataset:")
print(df_static_dataset.describe())
print("\nStatic Pose Test Data:")
print(df_static_test_data.describe())

Static Pose Dataset:
                0           1            2           3           4    \
count    151.000000  151.000000   151.000000  151.000000  151.000000   
mean   47499.668874  -64.397351   938.059603 -404.046358  -18.245033   
std     1457.800246   35.351205    24.414267   37.578600   30.726529   
min    45000.000000 -193.000000   882.000000 -511.000000  -83.000000   
25%    46249.500000  -82.500000   923.000000 -427.000000  -34.000000   
50%    47500.000000  -62.000000   939.000000 -405.000000  -18.000000   
75%    48749.500000  -48.500000   953.000000 -377.500000   -5.500000   
max    50000.000000  125.000000  1054.000000 -274.000000  129.000000   

               5           6           7            8           9    ...  \
count   151.000000  151.000000  151.000000   151.000000  151.000000  ...   
mean    988.443709  262.854305  150.496689  1001.516556   85.066225  ...   
std      17.619927   16.082039   25.113841    18.102248   19.844619  ...   
min     948.000000  211.00

In [12]:
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from scipy.spatial import procrustes

# Assuming df_static_dataset and df_static_test_data are your dataframes with accelerometer data
test_vector_columns = ["8 Acc LUA^ accX", "9 Acc LUA^ accY", "10 Acc LUA^ accZ"]


def normalize_data(dataset, test_data, columns):
    """Normalize test data to have the same mean and standard deviation as the dataset."""
    mean_dataset = dataset[columns].mean()
    std_dataset = dataset[columns].std()

    test_data_normalized = (test_data[columns] - test_data[columns].mean()) / test_data[
        columns
    ].std()
    test_data_normalized = test_data_normalized * std_dataset + mean_dataset

    return pd.DataFrame(test_data_normalized, columns=columns)


def covariance_alignment(dataset, test_data, columns):
    """Align the covariance matrices of the test data to the dataset."""
    cov_matrix_dataset = np.cov(dataset[columns], rowvar=False)
    cov_matrix_test = np.cov(test_data[columns], rowvar=False)

    eigvals_dataset, eigvecs_dataset = np.linalg.eigh(cov_matrix_dataset)
    eigvals_test, eigvecs_test = np.linalg.eigh(cov_matrix_test)

    rotation_matrix_cov = eigvecs_dataset @ eigvecs_test.T

    aligned_cov_test_data = test_data[columns].values @ rotation_matrix_cov

    return pd.DataFrame(aligned_cov_test_data, columns=columns)


def pca_alignment(dataset, test_data, columns):
    """Align the PCA components of the test data to the dataset."""
    pca_dataset = PCA(n_components=3)
    pca_test = PCA(n_components=3)

    pca_dataset.fit(dataset[columns])
    pca_test.fit(test_data[columns])

    components_dataset = pca_dataset.components_
    components_test = pca_test.components_

    rotation_matrix_pca = components_dataset.T @ components_test

    aligned_pca_test_data = test_data[columns].values @ rotation_matrix_pca.T

    return pd.DataFrame(aligned_pca_test_data, columns=columns)


def procrustes_analysis(dataset, test_data, columns):
    """Perform Procrustes Analysis to align the test data to the dataset."""
    num_rows_test = test_data.shape[0]

    # Downsample the dataset to match the number of rows in the test data
    if dataset.shape[0] > num_rows_test:
        dataset_sample = dataset.sample(n=num_rows_test, random_state=42)
    else:
        dataset_sample = dataset

    mtx1 = dataset_sample[columns].values
    mtx2 = test_data[columns].values

    # Perform Procrustes Analysis
    mtx1, mtx2, disparity = procrustes(mtx1, mtx2)

    # Convert aligned data back to DataFrame
    aligned_test_data = pd.DataFrame(mtx2, columns=columns)

    return aligned_test_data


# Main processing function with options for each step
def process_data(
    dataset,
    test_data,
    columns,
    normalize=True,
    covariance_align=True,
    pca_align=True,
    procrustes_align=True,
    final_normalize=True,
):
    if normalize:
        test_data = normalize_data(dataset, test_data, columns)

    if covariance_align:
        test_data = covariance_alignment(dataset, test_data, columns)

    if pca_align:
        test_data = pca_alignment(dataset, test_data, columns)

    if procrustes_align:
        test_data = procrustes_analysis(dataset, test_data, columns)

    if final_normalize:
        test_data = normalize_data(dataset, test_data, columns)

    return test_data


# Process the data with all steps
df_processed_test_data = process_data(
    df_static_dataset, df_static_test_data, test_vector_columns
)

# Process the data with only normalization
df_processed_test_data_norm_only = process_data(
    df_static_dataset,
    df_static_test_data,
    test_vector_columns,
    covariance_align=False,
    pca_align=False,
    procrustes_align=False,
    final_normalize=False,
)

KeyError: "None of [Index(['8 Acc LUA^ accX', '9 Acc LUA^ accY', '10 Acc LUA^ accZ'], dtype='object')] are in the [columns]"

## Plot dataset compared to test data

In [None]:
# Use plotly to visualize the aligned data compared to the dataset and test data
import plotly.express as px


def plot_data(data, title):
    fig = px.line(data, title=title)
    fig.update_traces(mode="lines", hovertemplate="%{y}")
    fig.update_layout(hovermode="x unified")
    fig.show()


df_new_test_data: pd.DataFrame
df_processed_new_test_data: pd.DataFrame
df_processed_new_test_data_no_procrustes: pd.DataFrame

plot_data(df_static_dataset[test_vector_columns], "Dataset Accelerometer Data")
plot_data(df_static_test_data[test_vector_columns], "Test Accelerometer Data")
plot_data(df_processed_test_data_norm_only, "Normalized Test Accelerometer Data")
plot_data(df_processed_test_data, "Normalized, Cov & PCA & Procrustes Aligned Test Accelerometer Data")
plot_data(df_new_test_data[test_vector_columns], "New Test Accelerometer Data")
plot_data(df_processed_new_test_data, "Transformed New Test Accelerometer Data")
plot_data(df_processed_new_test_data_no_procrustes, "Transformed (No Procrustes) New Test Accelerometer Data")

In [None]:
print("Static Pose Dataset:")
print(df_static_dataset[test_vector_columns].describe())
print("\nNew Test Data:")
print(df_new_test_data[test_vector_columns].describe())
print("\nTransformed New Test Data:")
print(df_processed_new_test_data.describe())

Static Pose Dataset:
       8 Acc LUA^ accX  9 Acc LUA^ accY  10 Acc LUA^ accZ
count       151.000000       151.000000        151.000000
mean        150.496689      1001.516556         85.066225
std          25.113841        18.102248         19.844619
min          90.000000       959.000000         16.000000
25%         133.000000       989.000000         70.500000
50%         149.000000      1001.000000         86.000000
75%         167.500000      1012.000000         99.000000
max         224.000000      1062.000000        131.000000

New Test Data:
       8 Acc LUA^ accX  9 Acc LUA^ accY  10 Acc LUA^ accZ
count       671.000000       671.000000        671.000000
mean         -0.034277         5.661699        317.378539
std         328.047048       288.553666        154.259424
min       -2794.000000     -2959.000000      -1319.000000
25%         -50.500000       -37.000000        240.000000
50%           2.000000        -3.000000        341.000000
75%          38.500000        39.50

## Transformation parameter computation

### Step 1: Compute and Save Transformation Parameters

In [38]:
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from scipy.spatial import procrustes
import pickle

test_vector_columns = [4,5,6, 7,8,9, 10,11,12, 13,14,15, 16,17,18,  22,23,24, 37,38,39,43,44,45, 56,57,58,  84,85,86, 97]
# test_vector_columns = ["8 Acc LUA^ accX", "9 Acc LUA^ accY", "10 Acc LUA^ accZ"]


def compute_normalization_params(dataset, columns):
    mean_dataset = dataset[columns].mean()
    std_dataset = dataset[columns].std()
    return mean_dataset, std_dataset


def compute_covariance_alignment_params(dataset, columns):
    cov_matrix_dataset = np.cov(dataset[columns], rowvar=False)
    eigvals_dataset, eigvecs_dataset = np.linalg.eigh(cov_matrix_dataset)
    return eigvecs_dataset


def compute_pca_alignment_params(dataset, columns):
    pca_dataset = PCA(n_components=len(columns))
    pca_dataset.fit(dataset[columns])
    components_dataset = pca_dataset.components_
    return components_dataset


def compute_procrustes_params(dataset, columns):
    return dataset[columns]


# Save transformation parameters
def save_transformation_params(dataset, test_data, columns, filepath):
    params = {}
    params["normalization"] = compute_normalization_params(dataset, columns)
    params["covariance"] = compute_covariance_alignment_params(dataset, columns)
    params["pca"] = compute_pca_alignment_params(dataset, columns)
    params["procrustes"] = compute_procrustes_params(dataset, columns)

    with open(filepath, "wb") as f:
        pickle.dump(params, f)

save_transformation_params(
    df_static_dataset, df_static_test_data, test_vector_columns, "./pickled/transformation_params.pkl"
)

### Step 2: Apply Saved Transformation Parameters to New Test Data

In [52]:
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from scipy.spatial import procrustes
import pickle

test_vector_columns = [4,5,6, 7,8,9, 10,11,12, 13,14,15, 16,17,18,  22,23,24, 37,38,39,43,44,45, 56,57,58,  84,85,86, 97]
# test_vector_columns = ["8 Acc LUA^ accX", "9 Acc LUA^ accY", "10 Acc LUA^ accZ"]


def normalize_data_with_params(test_data, columns, mean_dataset, std_dataset):
    test_data_normalized = (test_data[columns] - test_data[columns].mean()) / test_data[columns].std()
    test_data_normalized = test_data_normalized * std_dataset + mean_dataset
    return pd.DataFrame(test_data_normalized, columns=columns)


def covariance_alignment_with_params(test_data, columns, eigvecs_dataset):
    cov_matrix_test = np.cov(test_data[columns], rowvar=False)
    eigvals_test, eigvecs_test = np.linalg.eigh(cov_matrix_test)
    rotation_matrix_cov = eigvecs_dataset @ eigvecs_test.T
    aligned_cov_test_data = test_data[columns].values @ rotation_matrix_cov
    return pd.DataFrame(aligned_cov_test_data, columns=columns)


def pca_alignment_with_params(test_data, columns, components_dataset):
    n_components = components_dataset.shape[0]
    pca_test = PCA(n_components=n_components)
    pca_test.fit(test_data[columns])
    components_test = pca_test.components_
    rotation_matrix_pca = components_dataset.T @ components_test
    aligned_pca_test_data = test_data[columns].values @ rotation_matrix_pca.T
    return pd.DataFrame(aligned_pca_test_data, columns=columns)


def upsample_data(test_data, target_size):
    """Upsample the test data to the target size by duplication."""
    num_rows_test = test_data.shape[0]

    if num_rows_test < target_size:
        num_duplicates = target_size // num_rows_test
        remainder = target_size % num_rows_test

        upsampled_data = pd.concat([test_data] * num_duplicates, ignore_index=True)
        if remainder > 0:
            upsampled_data = pd.concat(
                [upsampled_data, test_data.sample(n=remainder, random_state=42)],
                ignore_index=True,
            )
        return upsampled_data
    else:
        return test_data


def procrustes_analysis_with_params(test_data, columns, dataset):
    num_rows_test = test_data.shape[0]

    if dataset.shape[0] > num_rows_test:
        dataset_sample = dataset.sample(n=num_rows_test, random_state=42)
    elif dataset.shape[0] < num_rows_test:
        dataset_sample = upsample_data(dataset, num_rows_test)
    else:
        dataset_sample = dataset

    mtx1 = dataset_sample.values
    mtx2 = test_data[columns].values
    mtx1, mtx2, disparity = procrustes(mtx1, mtx2)
    aligned_test_data = pd.DataFrame(mtx2, columns=columns)
    return aligned_test_data


def apply_transformation_params(
    test_data,
    columns,
    filepath,
    normalize=True,
    covariance_align=True,
    pca_align=True,
    procrustes_align=True,
    final_normalize=True,
):
    with open(filepath, "rb") as f:
        params = pickle.load(f)

    if normalize:
        mean_dataset, std_dataset = params["normalization"]
        test_data = normalize_data_with_params(test_data, columns, mean_dataset, std_dataset)

    if covariance_align:
        eigvecs_dataset = params["covariance"]
        test_data = covariance_alignment_with_params(test_data, columns, eigvecs_dataset)

    if pca_align:
        components_dataset = params["pca"]
        test_data = pca_alignment_with_params(test_data, columns, components_dataset)

    if procrustes_align:
        dataset_sample = params["procrustes"]
        test_data = procrustes_analysis_with_params(test_data, columns, dataset_sample)

    if final_normalize:
        mean_dataset, std_dataset = params["normalization"]
        test_data = normalize_data_with_params(test_data, columns, mean_dataset, std_dataset)

    return test_data


TEST_DATA_FILE = 'test_data_22_05.csv'

# Load new test data
df_new_test_data = pd.read_csv(f'./test_data/{TEST_DATA_FILE}', sep=" ")
df_new_test_data.columns = list(range(len(df_new_test_data.columns)))
df_new_test_data.set_index(0, inplace=True)

# df_new_test_data_time = df_new_test_data.index

# Extract the columns before interpolating/filling
df_new_test_data = df_new_test_data[test_vector_columns]

# Assuming your DataFrame is named df
# df_new_test_data = df_new_test_data.interpolate(method='linear')
df_new_test_data = df_new_test_data.ffill()

# Drop rows that have NaN values in any of the test vector columns
df_new_test_data = df_new_test_data.dropna(subset=test_vector_columns)

df_processed_new_test_data = apply_transformation_params(
    df_new_test_data,
    test_vector_columns,
    "./pickled/transformation_params.pkl",
    normalize=False,
    # covariance_align=False,
    # pca_align=False,
    procrustes_align=False,
    final_normalize=False,
)

# Insert time column
df_processed_new_test_data.insert(0, "1 MILLISEC", df_new_test_data.index.values)
df_processed_new_test_data.set_index('1 MILLISEC', inplace=True)

# Change type for all columns to Int64
# df_processed_new_test_data.apply(pd.to_numeric, errors='coerce').astype('Int64')
# df_processed_new_test_data = pd.to_numeric(df_processed_new_test_data, errors='coerce').astype('Int64')

# Convert columns to strings
df_processed_new_test_data.columns = [original_column_names[i] for i in range(len(original_column_names)) if i in test_vector_columns]
df_processed_new_test_data.to_csv(f'./test_data/processed_{TEST_DATA_FILE}', sep=' ')