# AIDL_B_AS01 - Signal Processing, Pattern Recognition and Machine Learning

## Human Activity Recognition Using Smartphones

**Dataset:** https://archive.ics.uci.edu/ml/datasets/human+activity+recognition+using+smartphones

**Goal:** Recognize human activity (6 classes) from sensor readings

---
1. DATA INSPECTION AND ACQUISITION
---

In [None]:
import numpy as np
import os

import random

import matplotlib.pyplot as plt
import numpy as np

In [None]:
DATA_PATH = "./data_set/UCI HAR Dataset/"

SIGNAL_FILES = [
    "body_acc_x",
    "total_acc_x",
    "body_gyro_x" # cant find exact match "body_gyro_acc_x"
]

ACTIVITY_NAMES = {
    1: 'WALKING', 2: 'WALKING_UPSTAIRS', 3: 'WALKING_DOWNSTAIRS',
    4: 'SITTING', 5: 'STANDING', 6: 'LYING'
}

In [None]:
def load_signals(data_path, dataset_type, signal_files):
    """
    Loads specified time-series signals for a given dataset type (train or test).

    Args:
        data_path (str): The base path to the data directory.
        dataset_type (str): 'train' or 'test'.
        signal_files (list): List of base names for the signal files (e.g., 'body_acc_x').

    Returns:
        dict: A dictionary where keys are signal names and values are 
              NumPy arrays of shape (N_instances, 128).
    """
    signals = {}
    
    # Path to the Inertial Signals folder
    signals_dir = os.path.join(data_path, dataset_type, 'Inertial Signals')
    
    print(f"--- Loading {dataset_type.upper()} Signals from: {signals_dir} ---")
    
    for signal_name in signal_files:
        # Construct the full filename
        filename = f"{signal_name}_{dataset_type}.txt"
        file_path = os.path.join(signals_dir, filename)
        
        try:
            data_matrix = np.loadtxt(file_path, dtype=np.float32)
            signals[signal_name] = data_matrix
            print(f"Loaded {filename}: Shape {data_matrix.shape}")
        except FileNotFoundError:
            print(f"ERROR: File not found at {file_path}. Skipping.")
        
    return signals


def load_labels(data_path, dataset_type):
    """
    Loads the activity labels for a given dataset type
    """
    labels_dir = os.path.join(data_path, dataset_type)
    filename = f"y_{dataset_type}.txt"
    file_path = os.path.join(labels_dir, filename)
    
    try:
        # Labels are single integers per line
        labels = np.loadtxt(file_path, dtype=np.int32)
        print(f"Loaded {filename}: Shape {labels.shape}")
        return labels
    except FileNotFoundError:
        print(f"ERROR: Label file not found at {file_path}. Cannot proceed.")
        return None

In [None]:
# Load Train Data and Labels
train_signals = load_signals(DATA_PATH, 'train', SIGNAL_FILES)
train_labels = load_labels(DATA_PATH, 'train')

# Load Test Data and Labels
test_signals = load_signals(DATA_PATH, 'test', SIGNAL_FILES)
test_labels = load_labels(DATA_PATH, 'test')

# Verify the data
if all(s in train_signals for s in SIGNAL_FILES) and train_labels is not None:
    expected_train_shape = (7352, 128) # from pdf
    first_signal_key = SIGNAL_FILES[0]
    
    if train_signals[first_signal_key].shape == expected_train_shape:
        print("\nTraining data dimensions verified successfully!")
    else:
        print(f"\nWARNING: Expected training signal shape {expected_train_shape}, but got {train_signals[first_signal_key].shape}")

In [None]:
# Data visualization and exploration

sensor_key = 'body_acc_x'
data_to_plot = train_signals[sensor_key]
sample_indices = {}

for label_code in range(1, 7):
    # Find the index of the first instance that matches the current label
    # np.where returns a tuple, we take the first element (the array of indices) 
    # and then the first index in that array.
    index = np.where(train_labels == label_code)[0][0]
    sample_indices[label_code] = index

In [None]:
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
fig.suptitle(f'Sample Time Series for Sensor: {sensor_key.upper()}', fontsize=16)
axes = axes.flatten() # Flatten the 3x2 grid of axes for easy iteration

# Time axis for 128 samples
time = np.arange(128) 

for i, (label_code, index) in enumerate(sample_indices.items()):
    signal_vector = data_to_plot[index, :]
    activity_name = ACTIVITY_NAMES[label_code]
    
    # Plotting the signal
    axes[i].plot(time, signal_vector, linewidth=1.5)
    axes[i].set_title(f'Activity {label_code}: {activity_name}')
    axes[i].set_xlabel('Sample Number (n)')
    axes[i].set_ylabel('Amplitude')
    axes[i].grid(True, alpha=0.5)

plt.tight_layout(rect=[0, 0.03, 1, 0.96]) # Adjust layout to fit suptitle
plt.show()

---
2. YULE-WALKER LINEAR PREDICTION
---

Discussion on High Order $p$ (Theoretical Question)You are asked: What happens if $p$ is too high? 

When the predictor order $p$ is increased beyond the actual order of the underlying process:

Overfitting: The model starts to fit the noise (the random variations/artifacts) in the signal in addition to the deterministic, underlying pattern. Essentially, the model tries to memorize the training data too well

Increased Variance/Instability: The coefficients become less stable (higher variance). Since the Yule-Walker method is non-iterative and assumes the true signal is AR, a very high order means the autocorrelation matrix becomes close to singular (ill-conditioned), leading to unreliable coefficient estimates

Noisy Prediction: The predicted signal, while having a very low training error (the error you are minimizing), will perform poorly on new, unseen data. It will incorporate noise that is irrelevant to the true signal structure. For your classification task, high $p$ will lead to feature vectors that are highly sensitive to noise, reducing classification accuracy

In [None]:
# --- Helper Functions for Yule-Walker ---

def calculate_autocorrelation(signal, p):
    """
    Calculates the biased autocorrelation estimate r[k] for lags 0 up to p.

    Args:
        signal (np.array): The 1D input signal (zero-mean assumption, though not strictly required for this estimate).
        p (int): The maximum lag (order) to calculate for.

    Returns:
        np.array: Autocorrelation vector r[0], r[1], ..., r[p]
    """
    N = len(signal)
    # Using 'full' mode and taking the positive lags provides a robust way
    # to estimate the autocorrelation. We normalize by N (biased estimate).
    r_full = np.correlate(signal, signal, mode='full') / N
    
    # Extract only the positive lags: r[0] is at index N-1, up to r[p]
    r = r_full[N-1 : N-1 + p + 1]
    return r

def solve_yule_walker(r, p):
    """
    Solves the Yule-Walker equations R_p * a = -r_p for the LPC coefficients 'a'.

    Args:
        r (np.array): Autocorrelation vector r[0] to r[p].
        p (int): The order of the predictor.

    Returns:
        np.array: The LPC coefficients a = [a1, a2, ..., ap].
    """
    # 1. Construct the R_p Toeplitz matrix (p x p)
    # R_p = [r[|i-j|]] where i, j are 1 to p.
    # The elements are taken from r[0] to r[p-1].
    R = np.zeros((p, p))
    for i in range(p):
        for j in range(p):
            R[i, j] = r[np.abs(i - j)]

    # 2. Construct the -r_p vector (p x 1)
    # r_p = [r[1], r[2], ..., r[p]]
    r_p = -r[1 : p + 1]

    # 3. Solve the linear system R * a = r_p
    # a_coeffs will be [a1, a2, ..., ap]
    try:
        a_coeffs = np.linalg.solve(R, r_p)
    except np.linalg.LinAlgError:
        # If the matrix R is singular or ill-conditioned (rare, but possible)
        print("Warning: R matrix is singular. Using least-squares approximation.")
        a_coeffs = np.linalg.lstsq(R, r_p, rcond=None)[0]

    return a_coeffs

def calculate_prediction_error_and_signal(signal, a, p):
    """
    Calculates the Mean Squared Error (MSE) of the p-order linear prediction
    and returns the full predicted signal vector.
    
    Args:
        signal (np.array): The 1D input signal (length N).
        a (np.array): The LPC coefficients [a1, a2, ..., ap] (length p).
        p (int): The order of the predictor.

    Returns:
        tuple: (Mean Squared Error (MSE), Predicted Signal Vector)
    """
    N = len(signal)
    predicted_signal = np.zeros(N)
    
    # We use explicit indices for prediction: x_hat[n] = -sum_{k=1}^{p} a_k * x[n-k]
    for n in range(p, N):
        # The coefficients 'a' are [a1, a2, ..., ap].
        
        # We need the p previous samples: x[n-1], x[n-2], ..., x[n-p].
        # 1. Get the slice of p samples: signal[n-p] up to signal[n-1]
        #    This is the slice signal[n-p : n] (forward slice, length p).
        # 2. Reverse the slice to align with the coefficients: [::-1]
        #    This yields [x[n-1], x[n-2], ..., x[n-p]].
        past_samples = signal[n-p : n][::-1] 
        
        # Calculate the predicted sample
        prediction = -np.sum(a * past_samples)
        predicted_signal[n] = prediction
        
    # Calculate error (MSE) only on the predicted part (from index p to N)
    original_predicted_part = signal[p:]
    prediction_actual_part = predicted_signal[p:]
    
    error = original_predicted_part - prediction_actual_part
    mse = np.mean(error**2)
    
    return mse, predicted_signal

In [None]:
data_to_analyze = train_signals['body_acc_x']

N_SAMPLES = data_to_analyze.shape[1] # 128

In [None]:
P_RANGE = range(2, 21, 2) # Test range p=2, 4, 6, ..., 20
N_RANDOM_SAMPLES = 10 # Average over 10 samples
all_average_errors = {}
random_indices = random.sample(range(data_to_analyze.shape[0]), N_RANDOM_SAMPLES)

print("--- Step 1: Finding Optimal Predictor Order p ---")

for p in P_RANGE:
    current_p_errors = []
    
    for idx in random_indices:
        signal = data_to_analyze[idx, :]
        
        # Calculate coefficients
        r_coeffs = calculate_autocorrelation(signal, p)
        a_coeffs = solve_yule_walker(r_coeffs, p)
        
        # Calculate error
        mse, _ = calculate_prediction_error_and_signal(signal, a_coeffs, p)
        current_p_errors.append(mse)
        
    avg_error = np.mean(current_p_errors)
    all_average_errors[p] = avg_error
    print(f"Order p={p}: Average MSE = {avg_error:.6f}")

# Determine the best p
BEST_P = min(all_average_errors, key=all_average_errors.get)
MIN_AVG_ERROR = all_average_errors[BEST_P]

print(f"\nOptimal Predictor Order: BEST_P = {BEST_P}")
print(f"Minimum Average MSE: {MIN_AVG_ERROR:.6f}")

In [None]:
PLOT_INDEX = random_indices[0] # Use the first random sample for the plot
signal_to_plot = data_to_analyze[PLOT_INDEX, :]

# Recalculate coefficients and prediction for the best p
r_best = calculate_autocorrelation(signal_to_plot, BEST_P)
a_best = solve_yule_walker(r_best, BEST_P)
_, predicted_signal_best = calculate_prediction_error_and_signal(signal_to_plot, a_best, BEST_P)

plt.figure(figsize=(12, 6))
time_axis = np.arange(N_SAMPLES)
plt.plot(time_axis, signal_to_plot, label='Original Data (x[n])', color='C0', linewidth=2)
plt.plot(time_axis[BEST_P:], predicted_signal_best[BEST_P:], 
         label=f'Predicted Data (x̂[n], Order p={BEST_P})', color='C1', linestyle='--', linewidth=2)
plt.axvline(x=BEST_P, color='gray', linestyle=':', label='Prediction Start')

plt.title(f'Linear Prediction (Yule-Walker) for Best Order p={BEST_P}')
plt.xlabel('Sample Number (n)')
plt.ylabel('Amplitude')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

#### 2.5 Analysis Questions

##### Q1: What happens when p is too high?

Overfitting and Noise Modeling

A high-order model gains too many degrees of freedom, allowing it to fit not only the underlying, predictable structure of the signal (the actual dynamics of the human activity) but also the random, non-repeating fluctuations and noise inherent in that specific 128-sample segment. For instance, the model might try to fit a small, unique spike in the signal that is purely noise, rather than a genuine periodic movement

Poor Generalization

While a high $p$ might lead to a very low prediction error on the 10 training segments used to find the best $p$ (especially if $p$ approaches $N=128$), the resulting LPC coefficients ($\mathbf{a}$) will be highly sensitive to those specific noise patterns. This means the model loses its ability to generalize to new, unseen segments of the same activity, leading to higher prediction errors when tested on the entire dataset. A model that performs perfectly on training data but poorly on test data is the definition of overfitting

Computational and Stability Issues

Increasing $p$ significantly increases the size of the Toeplitz autocorrelation matrix Rp

##### Q2: How is linear prediction related to AR modeling? When is the output of a linear predictor an AR process?

Linear Prediction is the method you use to find the numerical rules (the AR coefficients) that best describe how a signal's past values predict its future values. If we assume a signal is a song for which we try to predict the next note based on the previous: the AR model is the rulebook of the music ("next note is always a mix of previous five notes with little randomness") --> true weights and the Linear Prediction is the tool we use to figure out that rulebook by studying a piece of music --> finding the correct weights that minimize the error.

The Linear Prediction coefficients define the mathematical Auto Regressive model.


---
3. KNN ALGORITHM IMPLEMENTATION
---

In [None]:
# KNN classifier from scratch
 
def knn_classifier(X_train, y_train, X_test, k=3):
    """
    Implement KNN classification on a set of test data.
    
    Args:
        X_train (np.array): Training feature vectors (N_train, D_features)
        y_train (np.array): Training labels (N_train,)
        X_test (np.array): Testing feature vectors (N_test, D_features)
        k (int): Number of nearest neighbors to consider.

    Returns:
        np.array: Predicted labels for the test set (N_test,)
    """
    y_pred = []
    
    # Iterate through every instance in the test set
    for test_instance in X_test:
        distances = []
        
        # Step 1: Calculate distance to ALL training instances
        for i, train_instance in enumerate(X_train):
            dist = euclidean_distance(test_instance, train_instance)
            # Store the distance and the corresponding training label
            distances.append((dist, y_train[i]))
            
        # Step 2: Find the K-Nearest Neighbors
        # Sort the list based on distance (the first element of the tuple)
        distances.sort(key=lambda x: x[0])
        
        # Get the labels of the K closest neighbors
        neighbors = distances[:k]
        neighbor_labels = [label for (dist, label) in neighbors]
        
        # Step 3: Perform Majority Vote
        # Use Counter to find the most common label (class)
        # Counter.most_common(1) returns [(most_common_label, count)]
        most_common_label = Counter(neighbor_labels).most_common(1)[0][0]
        y_pred.append(most_common_label)
        
    return np.array(y_pred)

def euclidean_distance(x1, x2):
    """
    Calculate Euclidean distance between two vectors (x1 and x2)
    using numpy for efficiency.
    
    The Euclidean distance is defined as: sqrt(sum((x1_k - x2_k)^2))
    """
    # Use numpy's efficient broadcasting and vector operations
    squared_diff = (x1 - x2) ** 2
    distance = np.sqrt(np.sum(squared_diff))
    return distance

In [None]:
# - Total accuracy = (# correctly classified) / (# total instances) * 100%

def calculate_accuracy(y_true, y_pred):
    """
    Calculates the total classification accuracy (in percent).
    """
    correct_predictions = np.sum(y_true == y_pred)
    total_instances = len(y_true)
    accuracy = (correct_predictions / total_instances) * 100
    return accuracy

---
4. KNN BENCHMARK (RAW DATA)
---

In [None]:
# 4.1 Perform KNN on raw data
# - Use all 3 sensor files (body_acc_x, total_acc_x, body_gyro_x)
# - Set K=3
# - Use Euclidean distance metric


In [None]:
# 4.2 Report baseline accuracy
# - Calculate and report total classification accuracy
# - This serves as benchmark for comparison


---
5. KNN DATA PRE-PROCESSING (NORMALIZATION)
---

In [None]:
# 5.1 Implement normalization methods
# Options to consider:
#   - Zero mean and unit standard deviation (z-score)
#   - Range normalization to [0, 1] (min-max scaling)
#   - Global mean subtraction and global std division

def normalize_zscore(X_train, X_test):
    """
    Z-score normalization
    """
    pass

def normalize_minmax(X_train, X_test):
    """
    Min-max normalization to [0, 1]
    """
    pass


In [None]:
# 5.2 Apply normalization to training and test data
# - Normalize both train and test sets consistently


In [None]:
# 5.3 Perform KNN with K=3 on normalized data
# - Calculate classification accuracy


5.4 Comparison with Benchmark

Discuss improvement (or lack thereof) compared to raw data:

---
6. SELECTING MOST IMPORTANT TRAINING SET
---

In [None]:
# 6.1 Analyze importance of each sensor type
# - body_acc_x: Body acceleration (x-axis)
# - total_acc_x: Total acceleration (x-axis)
# - body_gyro_x: Body gyroscope (x-axis)


6.2 Selection and Justification

Selected two most important files:
1. 
2. 

Justification:

In [None]:
# 6.3 Create reduced training and test sets
# - Use only the selected two sensor types


---
7. KNN FEATURES DESIGN
---

In [None]:
# 7.1 Feature Configuration A: LPC Coefficients
# - Use Linear Prediction Coding coefficients from step 2
# - Extract p coefficients (from optimal order)
# - Maximum feature vector length: 32

def extract_lpc_features(X, order):
    """
    Extract LPC coefficients as features
    """
    pass


In [None]:
# 7.2 Feature Configuration B: Time Domain Features
# Possible features:
#   - Mean, Standard deviation, Variance
#   - Minimum, Maximum, Range
#   - Median, Quartiles
#   - Zero-crossing rate
#   - Mean crossing rate
#   - Skewness, Kurtosis
#   - Energy, Entropy
#   - Signal magnitude area

def extract_time_features(X):
    """
    Extract time domain features
    """
    pass


In [None]:
# 7.3 Feature Configuration C: Frequency Domain Features
# - Apply FFT to each signal
# - Extract features from frequency spectrum:
#   * Spectral energy
#   * Spectral entropy
#   * Dominant frequency
#   * Spectral centroid
#   * Power spectral density features
#   * Frequency bands energy

def extract_frequency_features(X):
    """
    Extract frequency domain features using FFT
    """
    pass


In [None]:
# 7.4 Combined Feature Configurations
# - Experiment with combinations of A, B, C
# - Design at least 3 different configurations
# - Keep total feature vector length ≤ 32


In [None]:
# 7.5 Evaluate each configuration
# - Run KNN (K=3) on each feature set
# - Report classification accuracy
# - Compare and select best configuration


7.6 Theoretical Question

Q: Is frequency domain analysis (FFT) related to spectrograms? How are they similar/different?

*Answer:*

---
8. KNN FINE-TUNING (HYPERPARAMETER OPTIMIZATION)
---

In [None]:
# 8.1 Test different values of K
# - Try K values: 1, 3, 5, 7, 9, 11, 13, 15, etc.
# - Use best feature configuration from step 7


In [None]:
# 8.2 Plot K vs Accuracy
# - Visualize effect of K on classification accuracy


8.3 Analysis and Discussion

Q1: What happens when K increases? Why?

*Answer:*

Q2: What happens when K decreases? Why?

*Answer:*

Q3: How is this related to underfitting/overfitting?

*Answer:*

Q4: Why select odd values of K?

*Answer:*

In [None]:
# 8.4 Report optimal K value
# - Select K that gives best accuracy
# - Report final classification performance


---
9. RESULTS SUMMARY AND CONCLUSIONS
---

In [None]:
# 9.1 Summary table of all experiments
# - Benchmark accuracy (raw data)
# - Normalized data accuracy
# - Accuracy with different feature configurations
# - Accuracy with different K values


9.2 Best Configuration

Optimal Configuration:
- Normalization method: 
- Feature set: 
- K value: 
- Final accuracy: 

9.3 Insights and Conclusions

Key Findings:

Feature Importance Observations:

Lessons Learned: