### Feature based classification

In this file, we aim to classify the recorded activities using traditional methods, including preprocessing, feature extraction, and classification.

Loading relevant libraries:

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GroupKFold
from sklearn.metrics import accuracy_score, classification_report
from scipy.signal import find_peaks
from scipy.stats import entropy

Loading dataset:

In [6]:
x = np.load('scaled_spec_resampled_array.npy')
y = np.load('labels_array.npy')-1
subjects = np.load('subjects_array.npy')

print(f"X shape: {x.shape}")
print(f"Y shape: {y.shape}")
print(f"Subjects shape: {subjects.shape}")

num_samples = x.shape[0]

X shape: (1754, 2048, 80)
Y shape: (1754,)
Subjects shape: (1754,)


Setting parameters:

In [7]:
nfft = 2048
fs = 128000
f_lo = -64000.0
f_hi = 63937.5

freq_axis = np.linspace(f_lo, f_hi, nfft)
print(len(freq_axis))

2048


Denoising and feature extraction based on this paper:

Y. Kim and H. Ling, "Human Activity Classification Based on Micro-Doppler Signatures Using a Support Vector Machine," in IEEE Transactions on Geoscience and Remote Sensing, vol. 47, no. 5, pp. 1328-1337, May 2009

Features extracted:

(1) Torso Doppler frequency: Indicates the speed of the human subject.

(2) Total bandwidth (BW) of the Doppler signal: Relates to the speed of limb motions.

(3) Offset of the total Doppler signal: Measures asymmetry between forward and backward limb motions.

(4) BW without micro-Dopplers: Represents the Doppler bandwidth of the torso alone.

(5) Normalized standard deviation (STD) of the Doppler signal strength: Related to the dynamic range of the motion.

(6) Period of limb motion: Corresponds to the swing rate of arms and legs.


Also added 2 other features:

(7) High envelope standard deviation: The std of the peak frequency can indicate different types of motion. For example, running might show more variability compared to walking.

(8) Entropy: How dispersed is the energy across frequencies.

In [8]:
def denoise_and_extract_features(frequencies, spectrogram, noise_threshold):
    # Initialize feature variables
    torso_doppler_frequency = []
    total_bw_doppler = []
    offset_total_doppler = []
    bw_without_micro_dopplers = []
    normalized_std_doppler = []
    entropies = []
    high_envelope = []
    low_envelope = []

    spec_denoised = np.where(spectrogram < noise_threshold, noise_threshold, spectrogram)
    for t_index in range(spec_denoised.shape[1]):

        column = spec_denoised[:, t_index]
        
        if np.all(column == noise_threshold):   # If the entire column is noise
            high_freq = 0
            low_freq = 0
            peak_freq = 0
            total_bw = 0
            offset_total = 0
            normalized_std = 0
            entr = 0
        else:   # If the column contains signal
            high_freq = frequencies[column > noise_threshold][-1]
            low_freq = frequencies[column > noise_threshold][0]
            # Torso Doppler Frequency (1)
            peak_freq = frequencies[np.argmax(column)]
            # Total Bandwidth of the Doppler Signal (2)
            total_bw = high_freq-low_freq
            # Offset of the Total Doppler (3)
            offset_total = (high_freq + low_freq) / 2
            # Normalized STD of the Doppler Signal Strength (5)
            std_doppler = np.std(column)
            mean_doppler = np.mean(column)
            normalized_std = std_doppler / mean_doppler if mean_doppler != 0 else 0
            # New feature: Entropy (8)
            entr = entropy(column)

        high_envelope.append(high_freq)
        low_envelope.append(low_freq)
        torso_doppler_frequency.append(peak_freq)
        total_bw_doppler.append(total_bw)
        offset_total_doppler.append(offset_total)
        normalized_std_doppler.append(normalized_std)
        entropies.append(entr)

    # Bandwidth Without Micro-Dopplers (4)
    bw_without_micro_dopplers = np.mean(np.array(sorted(high_envelope)[-5:]) - np.array(sorted(low_envelope)[:5]))

    # Period of Limb Motion (6)
    peaks, _ = find_peaks(high_envelope, height=np.nanmean(high_envelope))
    peak_intervals = np.diff(peaks)
    periods = peak_intervals / fs
    mean_period = np.mean(periods) if len(periods) > 0 else 0

    # New feature: Peak frequency standard deviation (7)
    peak_freq_variation = np.std(high_envelope)

    return [np.mean(torso_doppler_frequency),
            np.mean(total_bw_doppler),
            np.mean(offset_total_doppler),
            bw_without_micro_dopplers,
            np.mean(normalized_std_doppler),
            mean_period,
            np.mean(entropies),
            peak_freq_variation]

Now we extract these features from the training set:

In [9]:
noise_threshold = -83  # dBm (based on the paper's noise threshold)
features = []
for i in range(num_samples):
    features.append(denoise_and_extract_features(freq_axis, x[i,:,:], noise_threshold))
features = np.array(features)
print(features.shape)

(1754, 8)


Now that we have extracted the features, it is time to split the dataset. Due to the limited dataset size, we chose to perform 5-fold cross-validation to obtain a more accurate estimate of each model's performance.

We also note that each of these 5 folds contains activities performed by different subjects, with no overlap across folds. In other words, in each iteration, the test set contains activities from subjects not used in training. This approach simulates a realistic scenario, as in the real world, the subjects in the test set are not seen by the model during training.

It's also important to note that the number of activities performed varies across subjects, i.e. some subjects have performed more repetitions than others (see code block below). We account for this variation when splitting the data, resulting in 5 roughly equal folds.

In [10]:
# Number of activities different per subject
unique, counts = np.unique(subjects, return_counts=True)
subject_counts = dict(zip(unique, counts))
sorted_subject_counts = dict(sorted(subject_counts.items(), key=lambda item: item[1], reverse=True))
for key, value in sorted_subject_counts.items():
    print(f"Subject {key} performed {value} activities")

Subject 8 performed 36 activities
Subject 14 performed 36 activities
Subject 57 performed 36 activities
Subject 30 performed 34 activities
Subject 53 performed 34 activities
Subject 28 performed 33 activities
Subject 29 performed 33 activities
Subject 31 performed 33 activities
Subject 32 performed 33 activities
Subject 33 performed 33 activities
Subject 34 performed 33 activities
Subject 36 performed 33 activities
Subject 37 performed 33 activities
Subject 38 performed 33 activities
Subject 39 performed 33 activities
Subject 40 performed 33 activities
Subject 41 performed 33 activities
Subject 43 performed 33 activities
Subject 44 performed 33 activities
Subject 45 performed 33 activities
Subject 46 performed 33 activities
Subject 47 performed 33 activities
Subject 50 performed 33 activities
Subject 51 performed 33 activities
Subject 52 performed 33 activities
Subject 55 performed 33 activities
Subject 56 performed 33 activities
Subject 35 performed 32 activities
Subject 54 performed 

Split the data in roughly 5 equal folds, ensuring that each subject is only in one fold:

In [11]:
gkf = GroupKFold(n_splits=5)
splits = list(gkf.split(x, y, groups=subjects))

# Verification:
for i, (train_index, test_index) in enumerate(splits):
    print(f"Fold {i+1}:")
    print(f"Length of training set: {len(train_index)} samples")
    print(f"Length of testing set (fold): {len(test_index)} samples")
    print(f"Unique subjects in test set: {np.unique(subjects[test_index])}")

Fold 1:
Length of training set: 1398 samples
Length of testing set (fold): 356 samples
Unique subjects in test set: [ 4 16 20 23 25 28 32 38 42 46 54 57 58 60 67]
Fold 2:
Length of training set: 1401 samples
Length of testing set (fold): 353 samples
Unique subjects in test set: [ 5  8  9 19 21 31 35 39 45 49 52 61 64 68 72]
Fold 3:
Length of training set: 1405 samples
Length of testing set (fold): 349 samples
Unique subjects in test set: [ 2  3  7 11 14 18 26 29 37 44 55 65 70 71]
Fold 4:
Length of training set: 1406 samples
Length of testing set (fold): 348 samples
Unique subjects in test set: [ 6 12 17 22 34 36 43 47 48 53 56 62 63 69]
Fold 5:
Length of training set: 1406 samples
Length of testing set (fold): 348 samples
Unique subjects in test set: [ 1 10 13 15 24 27 30 33 40 41 50 51 59 66]


Let's move on to the classification. We try KNN, Random Forests, XGBoost and SVM

In [12]:
# KNN classifier
k_values = np.arange(1,11)
accuracies = np.zeros((5, len(k_values)))
for i, (train_index, test_index) in enumerate(splits):
    x_train, x_test = features[train_index], features[test_index]
    y_train, y_test = y[train_index], y[test_index]
    print(f"Fold {i+1}...")
    for k in k_values:
        knn = KNeighborsClassifier(n_neighbors=k)
        knn.fit(x_train, y_train)
        y_pred = knn.predict(x_test)
        acc = accuracy_score(y_test,y_pred)
        # print('k:',k,'accuracy:',acc)
        accuracies[i,k-1] = acc
print("Done!")

# Print the mean accuracy for each k
for k, accuracy in enumerate(np.mean(accuracies, axis=0)):
    print(f"For k = {k+1}, KNN mean accuracy = {100*accuracy:.2f}%")

Fold 1...
Fold 2...
Fold 3...
Fold 4...
Fold 5...
Done!
For k = 1, KNN mean accuracy = 75.55%
For k = 2, KNN mean accuracy = 74.63%
For k = 3, KNN mean accuracy = 76.00%
For k = 4, KNN mean accuracy = 77.37%
For k = 5, KNN mean accuracy = 77.54%
For k = 6, KNN mean accuracy = 77.66%
For k = 7, KNN mean accuracy = 77.60%
For k = 8, KNN mean accuracy = 77.31%
For k = 9, KNN mean accuracy = 77.15%
For k = 10, KNN mean accuracy = 77.26%


In [13]:
# Random Forest
tree_values = [10, 100, 250, 500, 1000]
accuracies = np.zeros((5, len(tree_values)))
for i, (train_index, test_index) in enumerate(splits):
    x_train, x_test = features[train_index], features[test_index]
    y_train, y_test = y[train_index], y[test_index]
    print(f"Fold {i+1}...")
    for j, trees in enumerate(tree_values):
        rf = RandomForestClassifier(n_estimators=trees)
        rf.fit(x_train, y_train)
        y_pred = rf.predict(x_test)
        acc = accuracy_score(y_test, y_pred)
        accuracies[i,j] = acc
print("Done!")

# Print the mean accuracy for each tree number
for j, accuracy in enumerate(np.mean(accuracies, axis=0)):
    print(f"For {tree_values[j]} trees, RF mean accuracy = {100*accuracy:.2f}%")
# print(classification_report(y_test, y_pred, target_names=['Walking', 'Sitting Down', 'Standing Up', 'Picking up an Object', 'Drinking Water', 'Falling']))

Fold 1...
Fold 2...
Fold 3...
Fold 4...
Fold 5...
Done!
For 10 trees, RF mean accuracy = 81.25%
For 100 trees, RF mean accuracy = 83.82%
For 250 trees, RF mean accuracy = 84.21%
For 500 trees, RF mean accuracy = 84.04%
For 1000 trees, RF mean accuracy = 83.99%


In [14]:
# XGBoost
tree_values = [10, 100, 250, 500, 1000]
accuracies = np.zeros((5, len(tree_values)))
for i, (train_index, test_index) in enumerate(splits):
    x_train, x_test = features[train_index], features[test_index]
    y_train, y_test = y[train_index], y[test_index]
    print(f"Fold {i+1}...")
    for j, trees in enumerate(tree_values):
        xgb = XGBClassifier(n_estimators=trees)
        xgb.fit(x_train, y_train)
        y_pred = xgb.predict(x_test)
        acc = accuracy_score(y_test, y_pred)
        accuracies[i,j] = acc
print("Done!")

# Print the mean accuracy for each tree number
for j, accuracy in enumerate(np.mean(accuracies, axis=0)):
    print(f"For {tree_values[j]} trees, XGBoost mean accuracy = {100*accuracy:.2f}%")
# print(classification_report(y_test, y_pred, target_names=['Walking', 'Sitting Down', 'Standing Up', 'Picking up an Object', 'Drinking Water', 'Falling']))

Fold 1...
Fold 2...
Fold 3...
Fold 4...
Fold 5...
Done!
For 10 trees, XGBoost mean accuracy = 84.45%
For 100 trees, XGBoost mean accuracy = 83.53%
For 250 trees, XGBoost mean accuracy = 83.59%
For 500 trees, XGBoost mean accuracy = 83.47%
For 1000 trees, XGBoost mean accuracy = 83.47%


In [16]:
# SVM
kernels = ['linear', 'poly', 'rbf', 'sigmoid']
accuracies = np.zeros((5, len(kernels)))
for i, (train_index, test_index) in enumerate(splits):
    x_train, x_test = features[train_index], features[test_index]
    y_train, y_test = y[train_index], y[test_index]
    print(f"Fold {i+1}...")
    scaler = StandardScaler()
    x_train_scaled = scaler.fit_transform(x_train)
    x_test_scaled = scaler.transform(x_test)
    for j, kernel in enumerate(kernels):
        svc = SVC(kernel=kernels[j], random_state=42)
        svc.fit(x_train_scaled, y_train)
        y_pred = svc.predict(x_test_scaled)
        acc = accuracy_score(y_test, y_pred)
        accuracies[i,j] = acc
print("Done!")

# Print the mean accuracy for each kernel
for j, accuracy in enumerate(np.mean(accuracies, axis=0)):
    print(f"For {kernels[j]} kernel, SVM mean accuracy = {100*accuracy:.2f}%")
# print(classification_report(y_test, y_pred, target_names=['Walking', 'Sitting Down', 'Standing Up', 'Picking up an Object', 'Drinking Water', 'Falling']))

Fold 1...
Fold 2...
Fold 3...
Fold 4...
Fold 5...
Done!
For linear kernel, SVM mean accuracy = 80.73%
For poly kernel, SVM mean accuracy = 69.39%
For rbf kernel, SVM mean accuracy = 83.14%
For sigmoid kernel, SVM mean accuracy = 60.11%
