In [3]:
import matplotlib.pyplot as plt
import multiprocessing
import numpy as np
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, udf, greatest, least, struct
from pyspark.sql.types import *
import random
import scipy.signal as signal
import scipy.io
import warnings

warnings.filterwarnings('ignore')

## Load & Explore Data

### Perform EDA & Pre-processing on random sample of 3 activities

In [4]:
# Load exercise dataset 
exercise_dataset = scipy.io.loadmat('../bigfiles/exercise_data.50.0000_singleonly.mat', struct_as_record=False)

# Load activities and data full objects
exercise_constants = exercise_dataset['exerciseConstants'][0][0].activities
subject_data = exercise_dataset['subject_data']

# extract activities names into an array
all_activities = []
for act in exercise_constants[0]:
    all_activities.append(act[0])

In [5]:
activities_to_remove = ['Tap Left Device', 'Tap Right Device', 'Device on Table', 
'Non-Exercise', 'Note', 'Unlisted Exercise', 'Rest',
'Arm Band Adjustment', '<Initial Activity>', 'Invalid', 'Arm straight up', 
'Tap IMU Device', 'Triceps extension (lying down) (left arm)',
'Triceps extension (lying down) (right arm)', 'Alternating Dumbbell Curl']

# Remove activities from all_activities that aren't exercises or have too <2 sets in the dataset
for activity in activities_to_remove:
    all_activities.remove(activity)

In [6]:
# Only take data from 3 random exercises as POC of this data processing pipeline
activities_to_process = random.choices(all_activities, k=3)
print(activities_to_process)

['Dip', 'Overhead Triceps Extension', 'Band Pull-Down Row']


In [7]:
# define dictionaries for accelerometer and gyroscope data
activities_accelerometer_data_dict = { activities_to_process[0]: [], activities_to_process[1]: [], activities_to_process[2]: []}
activities_gyroscope_data_dict = { activities_to_process[0]: [], activities_to_process[1]: [], activities_to_process[2]: []}
rep_counts_actual = []

# iterate over subject data to search for those activities and save the data related
for data_item in subject_data:
    for x in data_item:
        if len(x) > 0:
            if x[0] is not None and len(x[0]) > 0:
                data_activity_name = x[0,0].activityName[0]
                data_activity_reps = x[0,0].activityReps[0]
                data_item_accelDataMatrix = x[0,0].data[0,0].accelDataMatrix
                data_item_gyroDataMatrix = x[0,0].data[0,0].gyroDataMatrix
                if data_activity_name in activities_to_process:
                    activities_accelerometer_data_dict[data_activity_name].append(data_item_accelDataMatrix)
                    activities_gyroscope_data_dict[data_activity_name].append(data_item_gyroDataMatrix)
                    rep_counts_actual.append(data_activity_reps)

In [9]:
# Turn activities_gyroscope_data_dict and activities_accelerometer_data_dict into a spark dataframe
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext
sc.setLogLevel("ERROR")

# Create schema for signal data
schema = StructType([
    StructField("overall_set_num", IntegerType(), True),
    StructField("activity_name", StringType(), True),
    StructField("activity_set_num", IntegerType(), True),
    StructField("x", ArrayType(), True),
    StructField("y", ArrayType(), True),
    StructField("z", ArrayType(), True)
])

# Initialize empty dataframes
gyroscope_df = ss.createDataFrame(sc.emptyRDD(), schema)
accelerometer_df = ss.createDataFrame(sc.emptyRDD(), schema)
gyroscope_data = []
accelerometer_data = []

# Iterate over activities and append data to dataframes
overall_set_num = 0
for activity in activities_to_process:
    for activity_set_num, activity_set in enumerate(activities_gyroscope_data_dict[activity]):
        x = [time_point[1] for time_point in activity_set]
        y = [time_point[2] for time_point in activity_set]
        z = [time_point[3] for time_point in activity_set]
        data_row = (overall_set_num, str(activity), activity_set_num, x, y, z)
        gyroscope_data.append(data_row)
        overall_set_num += 1
    overall_set_num -= (activity_set_num+1)
    for activity_set_num, activity_set in enumerate(activities_accelerometer_data_dict[activity]):
        x = [time_point[1] for time_point in activity_set]
        y = [time_point[2] for time_point in activity_set]
        z = [time_point[3] for time_point in activity_set]
        data_row = (overall_set_num, str(activity), activity_set_num, x, y, z)
        accelerometer_data.append(data_row)
        overall_set_num += 1

# Create dataframes directly from the lists of data rows
gyroscope_df = ss.createDataFrame(gyroscope_data, schema)
accelerometer_df = ss.createDataFrame(accelerometer_data, schema)

23/06/27 10:13:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


TypeError: __init__() missing 1 required positional argument: 'elementType'

In [10]:
# subtract 1 from every value in accelerometer_df.overall_set_num
accelerometer_df = accelerometer_df.withColumn("overall_set_num", accelerometer_df.overall_set_num - 1)

NameError: name 'accelerometer_df' is not defined

In [None]:
assert(accelerometer_df.count() == gyroscope_df.count())
assert(accelerometer_df.agg({"overall_set_num": "max"}).collect()[0][0] == gyroscope_df.agg({"overall_set_num": "max"}).collect()[0][0])

In [None]:
# save dataframes to csv
gyroscope_df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("bigfiles/gyroscope_data0624.csv")
accelerometer_df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("bigfiles/accelerometer_data0624.csv")

# load dataframes from csv
#gyroscope_df = ss.read.csv("bigfiles/gyroscope_data.csv", header=True, inferSchema=True)
#accelerometer_df = ss.read.csv("bigfiles/accelerometer_data.csv", header=True, inferSchema=True)


### Visualizations

#### Accelerometer Measurements chart (only the first result of exercises per activity has been taken)

In [None]:
# Data to graph
for activity in activities_to_process:
       t, x, y, z = [], [], [], []

       for data_activity in activities_accelerometer_data_dict[activity][0]: # take only the values corresponding to the results of the first excersise
              t.append(data_activity[0]) # time value
              x.append(data_activity[1]) # X value
              y.append(data_activity[2]) # Y value
              z.append(data_activity[3]) # Z value

       fig, ax = plt.subplots()
       ax.plot(t, x, label = 'X')
       ax.plot(t, y, label = 'Y')
       ax.plot(t, z, label = 'Z')

       ax.set(xlabel='Time (seconds)', ylabel='Acceleration output (g)', title=activity)
       ax.grid()

       fig.tight_layout()
       fig.set_size_inches(25, 5)

       plt.legend()
       plt.show()

#### Gyroscope Measurements chart (only the first result of exercises per activity has been taken)

In [None]:
# Data to graph
for activity in activities_to_process:
       t, x, y, z = [], [], [], []

       for data_activity in activities_gyroscope_data_dict[activity][0]: # take only the values corresponding to the results of the first excersise
              t.append(data_activity[0]) # time value
              x.append(data_activity[1]) # X value
              y.append(data_activity[2]) # Y value
              z.append(data_activity[3]) # Z value

       fig, ax = plt.subplots()
       ax.plot(t, x, label = 'X')
       ax.plot(t, y, label = 'Y')
       ax.plot(t, z, label = 'Z')

       ax.set(xlabel='Time (seconds)', ylabel='Gyroscope output (g)', title=activity)
       ax.grid()

       fig.tight_layout()
       fig.set_size_inches(25, 5)

       plt.legend()
       plt.show()

## Recognition Pre-processing

In [None]:
k = 1
for i in range(k):
    for col_prefix in ['x', 'y', 'z']:
        gyroscope_df = gyroscope_df.withColumn(col_prefix + '_lag_' + str(i), lag(col(col_prefix), i).over(w))
        accelerometer_df = accelerometer_df.withColumn(col_prefix + '_lag_' + str(i), lag(col(col_prefix), i).over(w))

## Segmentation Pre-Processing
Given data points containing x,y,z, and time, smooth this data with a Butterworth low-pass filter (-60dB at 20Hz), then windowed into 5-second windows sliding at 1/50 of a second (i.e., each 5s window shares 4.8s of data with the previous window). Then perform PCA to get the computed features:
aX, aYZPC1, and gPC1
- aX: the X-axis accelerometer signal
- aYZPC1: the projection of only the Y and Z axes onto the first principal component of those two axes. This indicates the movement perpendicular to the user's arm, which allows us to learn about the Y and Z axes regardless of the unknown rotation of the the accelerometer on the armband
- gPC1: the projection of the three-dimensional accelerometer signal (x,y,z) onto its first principal component

In [None]:
def process_set(data_source, source_name, overall_set_num):
    window_duration = 5  # Window duration in seconds
    sampling_rate = 50  # Sampling rate in Hz
    window_size = int(window_duration * sampling_rate) # 250
    overlap = 10 # distinct 10 points 200ms, shared 4.8 seconds => 240 overlap

    # initialize schemas for accelerometer and gyroscope pyspark dataframes
    # accelerometer features: aYZPC and aX
    # gyroscope features: gPC
    aX_cols = [f"x{i}" for i in range(window_size)]
    aYZPC1_cols = [f"aYZPC{i}" for i in range(window_size)]
    gPC1_cols = [f"gPC{i}" for i in range(window_size)]

    common_fields = [StructField("window_index", IntegerType(), True),
        StructField("overall_set_num", IntegerType(), True),
        StructField("activity_name", StringType(), True),
        StructField("activity_set_num", IntegerType(), True)]

    yz_signal_cols = ['y','z']
    xyz_signal_cols = ['x'] + yz_signal_cols
    yzfields = [StructField(f"y", FloatType(), True), 
        StructField(f"z", FloatType(), True)]
    
    # set the correct schema depending on the source_name
    if source_name == "accelerometer":
        pca_schema = StructType(yzfields)
        input_cols = yz_signal_cols
        output_schema = StructType(common_fields + [StructField(col, FloatType(), True) for col in aX_cols + aYZPC1_cols])
    elif source_name == "gyroscope":
        pca_schema = StructType(yzfields + \
            [StructField(f"x", FloatType(), True)])
        input_cols = xyz_signal_cols
        output_schema = StructType(common_fields + [StructField(col, FloatType(), True) for col in gPC1_cols])
        
    # get current set information
    set_data = data_source.filter(data_source.overall_set_num == overall_set_num)
    activity_name = set_data.select('activity_name').first()[0]
    activity_set_num = set_data.select('activity_set_num').first()[0]

    # apply butterworth lowpass filter to x, y, z columns
    x = list(set_data.select(set_data['x']).toPandas()['x'])
    y = list(set_data.select(set_data['y']).toPandas()['y'])
    z = list(set_data.select(set_data['z']).toPandas()['z'])
    smoothed_x = apply_butterworth_lowpass(x)
    smoothed_y = apply_butterworth_lowpass(y)
    smoothed_z = apply_butterworth_lowpass(z)

    # Slide the window over to compute features for each 5 second interval/window
    result_table = []
    for i in range(0, len(smoothed_x) - window_size + 1, overlap):
        # Get the windowed data that we'll use to perform PCA
        individual_window_table = {col: [] for col in input_cols} # initialize empty table
        if source_name == "gyroscope": # to compute gPC1 with all 3 dimensions (x,y,z)
            individual_window_table["x"] = smoothed_x[i:i+window_size]
        individual_window_table["y"] = smoothed_y[i:i+window_size]
        individual_window_table["z"] = smoothed_z[i:i+window_size]

        # Create dict individual_window_table to pandas DF
        individual_window_table = pd.DataFrame.from_dict(individual_window_table)
        # Convert the pandas DataFrame to a PySpark DataFrame
        pyspark_window_df = ss.createDataFrame(individual_window_table, pca_schema)

        # Combine the input columns into a single vector column
        assembler = VectorAssembler(inputCols=input_cols, outputCol='features_vect')
        assembled_df = assembler.transform(pyspark_window_df)

        # Perform PCA and keep only the first principal component
        pca = PCA(k=1, inputCol='features_vect', outputCol='pca_features')
        pca_model = pca.fit(assembled_df)
        pca_result = pca_model.transform(assembled_df).select('pca_features')

        # Flatten the pca_result DataFrame so that each element is a floatType and then transpose it
        pca_result = pca_result.rdd.map(lambda x: [float(element) for element in x.pca_features.toArray()]) \
            .flatMap(lambda x: x).collect()

        # Append the pca_row and other columns to the result_table
        # TODO: maybe using smoothed_x would be better than raw x, try this to see if it improves performance
        if source_name == "accelerometer":
            result_table.append([i, overall_set_num, activity_name, activity_set_num] + x[i:i+window_size] + pca_result)
        elif source_name == "gyroscope":
            result_table.append([i, overall_set_num, activity_name, activity_set_num] + pca_result)

    # Write result_table to a file
    filename = f'bigfiles/{source_name}/result_table_{overall_set_num}_0625.csv'
    with open(filename, 'w') as file:
        writer = csv.writer(file)
        writer.writerows(result_table)

    return filename



In [None]:
def preprocess(data_source, source_name):
    # Iterate over all distinct set_nums in the data table
    # So that we can process 1 exercise set's signal in all 4 dimensions (x, y, z, t) at a time
    # Use all of the available CPU cores to preprocess all of the sets in parallel
    overall_set_nums = data_source.select('overall_set_num').distinct().rdd.flatMap(lambda x: x).collect()
    num_processes = multiprocessing.cpu_count()  
    pool = multiprocessing.Pool(processes=num_processes)
    output_filenames = pool.map(process_set, overall_set_nums)
    return output_filenames

## Join both dataframes
(TODO)

In [None]:
# join gyroscope_features and accelerometer_features by overall_set_num, window_index, activity_name
joined_sparkdf = gyroscope_features.join(
    accelerometer_features,
    (gyroscope_features["overall_set_num"] == accelerometer_features["overall_set_num"]) &
    (gyroscope_features["window_index"] == accelerometer_features["window_index"]) &
    (gyroscope_features["activity_name"] == accelerometer_features["activity_name"]),
    "inner"
)

In [None]:
joined_sparkdf.count()

## Recognition features (computed for each signal)
20 features computed over each of 3 derived signals (aX, aYZPC1, and gPC1)
- Autocorrelation bins: 5 evenly-spaced bins of the 5- second autocorrelation – summed per bin (5 features).
- RMS: The root-mean-square amplitude of the signal.
- Power bands: The magnitude of the power spectrum in 10 bands spaced linearly from 0.1-25Hz (10 features).
- Mean, standard deviation, kurtosis, interquartile range (4 features).

In [None]:

get_variance = udf(lambda row: np.variance([x for x in row]), IntegerType())

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))

new_df.show()

In [None]:
# TODO: change to use column prefix

def get_autocorrelation_bins(set_df, col):
    # Compute the autocorrelation for current column (col)
    window = Window.orderBy("signal")
    set_df = set_df.withColumn("lag", lag(col("signal")).over(window))
    lags = np.array(set_df.select("lag").rdd.flatMap(lambda x: x).collect())
    signals = np.array(set_df.select("signal").rdd.flatMap(lambda x: x).collect())
    autocorrelation = np.correlate(signals, lags, mode="full")

    # Bin autocorrelation values
    num_bins = 5
    ac_length = len(autocorrelation)

    # Compute the lag interval between consecutive bins
    lag_interval = ac_length / num_bins

    # Create an array to store the sums for each bin
    bin_sums = np.zeros(num_bins)

    # Iterate over the autocorrelation values and add to the corresponding bin
    for i, value in enumerate(autocorrelation):
        bin_index = int(i // lag_interval)
        if bin_index < num_bins:
            bin_sums[bin_index] += value

    return bin_sums

def get_rms(set_df, col):
    # Calculate the RMS amplitude
    rms_df = set_df.select(sqrt(avg(abs(col("signal").cast("double")) ** 2)).alias("rms_amplitude"))
    return rms_df.first()["rms_amplitude"]

def get_mean(set_df, col):
    # Calculate the mean
    mean_df = set_df.select(avg(col("signal")).alias("mean"))
    return mean_df.first()["mean"]

def get_stddev(set_df, col):
    # Calculate the standard deviation
    stddev_df = set_df.select(stddev(col("signal")).alias("stddev"))
    return stddev_df.first()["stddev"]

def get_stddev(row):
    data = [float(x.strip()) for x in row.split(",")]
    return np.variance(data)

def get_kurtosis(row):
    
    return scipy.stats.kurtosis

def row_to_list

def get_interquartile_range(set_df, col):
    # Calculate the interquartile range
    quartiles = set_df.approxQuantile(col, [0.25, 0.75], 0.0)
    iqr = quartiles[1] - quartiles[0]
    return iqr

def get_powerbands(set_df, col):
    # Define the frequency bands
    lower_bounds = [0.1, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 17.5, 20.0, 22.5]
    upper_bounds = [2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 17.5, 20.0, 22.5, 25.0]

    # Convert frequency column to vector column
    vectorAssembler = VectorAssembler(inputCols=["frequency"], outputCol="features")
    set_df = vectorAssembler.transform(set_df)

    # Define a user-defined function (UDF) to compute the power spectrum
    def compute_power_spectrum(features):
        power_spectrum = [value * value for value in features]
        return Vectors.dense(power_spectrum)

    compute_power_spectrum_udf = udf(compute_power_spectrum, DoubleType())

    # Compute the power spectrum and create a new column
    df = df.withColumn("power_spectrum", compute_power_spectrum_udf("features"))

    # Define bucketizer splits based on the frequency bands
    bucketizer = Bucketizer(splits=[lower_bounds, upper_bounds], inputCol="frequency", outputCol="bucket")

    # Apply bucketizer and compute the sum of power spectrum within each band
    df = bucketizer.transform(df)
    result = df.groupby("bucket").agg({"power_spectrum": "sum"}).orderBy("bucket")

    # Show the computed power bands
    result.show()

    return result

In [None]:
def get_signal_features(window, signal_prefix):
    features = []
    features.extend(get_autocorrelation_bins(window, signal_prefix)) # 5 features
    features.append(get_rms(window, signal_prefix)) # 1 feature
    features.append(get_mean(window, signal_prefix)) # 1 feature
    features.append(get_stddev(window, signal_prefix)) # 1 feature
    features.append(get_kurtosis(window, signal_prefix)) # 1 feature
    features.extend(get_interquartile_range(window, signal_prefix)) # 2 features
    features.extend(get_powerbands(window, signal_prefix)) # 10 features
    return features

def get_signal_computed_feature_names(signal_prefix):
    names = [f"{signal_prefix}_autocorrelation_bin{ab}" for ab in range(5)]
    names.append(f"{signal_prefix}_rms_amplitude")
    names.append(f"{signal_prefix}_mean")
    names.append(f"{signal_prefix}_stddev")
    names.append(f"{signal_prefix}_kurtosis")
    names.append(f"{signal_prefix}_interquartile_range_start")
    names.append(f"{signal_prefix}_interquartile_range_end")
    names.extend([f"{signal_prefix}_powerband{pb}" for pb in range(10)])
    return names

def get_signal_schema(signal_prefix):
    return [StructField(name, FloatType(), True)
        for name in get_signal_computed_feature_names(signal_prefix)]

In [None]:
# Compute the 20 signal features for each signal for every 5 second window (row)
derived_signals = ['aYZPC1', 'aX', 'gPC1']
for signal_prefix in derived_signals:
    signal_columns = [f"{signal_prefix}{i}" for i in range(250)]
    schema = get_signal_computed_feature_names(signal_prefix)
    udf_get_signal_features = udf(get_signal_features, schema) # Register the UDF
    for i, col_name in enumerate(get_signal_computed_feature_names(signal_prefix)):
        joined_sparkdf = joined_sparkdf.withColumn(col_name, udf_get_signal_features(*signal_columns)[i])