In [None]:
# change directory to root folder
# %cd "C:\Users\New Asus\Documents\FIT4701_2025_Sem1\SHD-HAR-Dataset\SHD-HAR-Dataset-main"

In [None]:
import pandas as pd

In [None]:
import re
from math import sqrt, atan2

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import collections

In [None]:
import cv2

In [None]:
import csv

In [None]:
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.decomposition import PCA

In [None]:
import os

In [None]:
class ESP32:
    """Parse ESP32 Wi-Fi Channel State Information (CSI) obtained using ESP32 CSI Toolkit by Hernandez and Bulut.
    ESP32 CSI Toolkit: https://stevenmhernandez.github.io/ESP32-CSI-Tool/

    The processing methods return ``self`` so they can be chained, e.g.
    ``ESP32(path).filter_by_sig_mode(1).get_csi().get_amplitude_from_csi().amplitude``.

    Attributes set along the way:
        csi_df:    DataFrame read from ``csi_file`` (possibly filtered).
        csi_data:  integer array built by ``get_csi`` (one row per packet).
        amplitude: array built by ``get_amplitude_from_csi``.
        phase:     array built by ``get_phase_from_csi``.
    """

    # View README.md for more information on null subcarriers
    # The values are column indices into the raw CSI vector of each packet.
    NULL_SUBCARRIERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 64, 65, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 382, 383]

    def __init__(self, csi_file):
        """Remember the path of the RAW CSI file and load it immediately.

        Args:
            csi_file: path (or file-like object) accepted by ``pandas.read_csv``.
        """
        self.csi_file = csi_file
        self.__read_file()

    def __read_file(self):
        """Read RAW CSI file (.csv) using Pandas and store it in ``self.csi_df``
        """
        self.csi_df = pd.read_csv(self.csi_file)

    def seek_file(self):
        """Return the (possibly filtered) RAW CSI dataframe
        """
        return self.csi_df

    def filter_by_sig_mode(self, sig_mode):
        """Filter CSI data by signal mode (replaces ``self.csi_df`` with the filtered rows)
        Args:
            sig_mode (int):
            0 : Non - High Throughput Signals (non-HT)
            1 : High Throughput Signals (HT)
        Returns:
            self, to allow chaining.
        """
        self.csi_df = self.csi_df.loc[self.csi_df['sig_mode'] == sig_mode]
        return self

    def get_csi(self):
        """Read CSI string as Numpy array and store it in ``self.csi_data``

        The CSI data collected by ESP32 contains channel frequency responses (CFR) represented by two signed bytes (imaginary, real) for each sub-carriers index
        The length (bytes) of the CSI sequency depends on the CFR type
        CFR consist of legacy long training field (LLTF), high-throughput LTF (HT-LTF), and space- time block code HT-LTF (STBC-HT-LTF)
        Ref: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-guides/wifi.html#wi-fi-channel-state-information

        NOTE: Not all 3 fields may be present (as represented in table and configuration)

        Returns:
            self, to allow chaining. When ``csi_df`` has no rows, ``csi_data``
            is an empty 1-D array.
        """
        raw_csi_data = self.csi_df['data'].copy()
        # Each cell looks like "[v0,v1,...]": drop the brackets/spaces at both
        # ends and parse the comma-separated integers.
        csi_data = np.array([np.fromstring(csi_datum.strip('[ ]'), dtype=int, sep = ',') for csi_datum in raw_csi_data])
        self.csi_data = csi_data
        return self

    # NOTE: Currently does not provide support for all signal subcarrier types
    def remove_null_subcarriers(self):
        """Remove NULL subcarriers from CSI

        Only packets of width 128 or 384 values are handled; any other layout
        (including an empty capture) is left untouched.

        Returns:
            self, to allow chaining.
        """
        csi = self.csi_data

        # An empty capture (e.g. every packet was filtered out) is 1-D and has
        # no column axis, so there is nothing to remove.
        if csi.ndim < 2:
            return self

        width = csi.shape[1]
        # Non-HT Signals (20 Mhz) - non STBC
        if width == 128:
            # The first 24 entries are exactly the indices below 128.
            null_columns = self.NULL_SUBCARRIERS[:24]
        # HT Signals (40 Mhz) - non STBC
        elif width == 384:
            null_columns = self.NULL_SUBCARRIERS
        else:
            return self

        self.csi_data = np.delete(csi, null_columns, axis=1)
        return self

    def get_amplitude_from_csi(self):
        """Calculate the Amplitude (or Magnitude) from CSI
        Ref: https://farside.ph.utexas.edu/teaching/315/Waveshtml/node88.html

        Every packet row is read as interleaved pairs (even index, odd index)
        and the amplitude is sqrt(even**2 + odd**2). Stored in ``self.amplitude``.

        Returns:
            self, to allow chaining.
        """
        amplitude = np.array([np.sqrt(data[::2]**2 + data[1::2]**2) for data in self.csi_data])
        self.amplitude = amplitude
        return self

    def get_phase_from_csi(self):
        """Calculate the Phase from CSI
        Ref: https://farside.ph.utexas.edu/teaching/315/Waveshtml/node88.html

        Every packet row is read as interleaved pairs (even index, odd index)
        and the phase is arctan2(even, odd), i.e. arctan2(imaginary, real) for
        the (imaginary, real) byte order described in ``get_csi``.
        Stored in ``self.phase``.

        Returns:
            self, to allow chaining.
        """
        phase = np.array([np.arctan2(data[::2], data[1::2]) for data in self.csi_data])
        self.phase = phase
        return self

In [None]:
def amplitude_plot(amp, start_stamp = 0, num_packets = 1000, plot_name = ""):
    """
    plotting function for visualizing subcarrier amplitude per packet
    amp: csi amplitude array (2-D: one row per packet, one column per subcarrier)
    start_stamp: start plot from which packet
    num_packets: plot how many packets; if fewer packets are available after
                 start_stamp, only the available ones are drawn
    plot_name: name of activity being plotted
    """
    # number of subcarriers
    num_lines = amp.shape[1]

    # setup color map: one evenly spaced position on the HSV map per subcarrier
    cmap = plt.cm.hsv
    norm = np.linspace(0, 1, num_lines)
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(0, 1))

    # setup plot (always draw on a fresh figure)
    plt.figure(figsize=(15,10))
    # NOTE: the int32 cast drops the fractional part of every amplitude value
    values = np.asarray(amp, dtype=np.int32)

    # Slice once; the x axis is sized from the slice so a short capture
    # cannot cause an x/y length mismatch.
    window = values[start_stamp:start_stamp+num_packets]
    packets = range(window.shape[0])

    # plot each subcarrier
    for i in range(num_lines):
        plt.plot(packets, window[:, i], color= sm.to_rgba(norm[i]), linewidth = 0.5)

    # attach labels and limitations
    plt.xlabel("Packet")
    plt.ylabel("Amplitude")
    plt.xlim(0, num_packets)
    plt.title(f"Amplitude-Packet plot ({plot_name})")
    plt.show()

In [None]:
def apply_pca(X, variance_threshold=0.95):
    """
    Apply PCA and keep the smallest number of leading components (k) whose
    cumulative explained variance reaches a variance threshold.

    Args:
        X: Tensor of shape (n_samples, n_features), your input data.
        variance_threshold: Float, target cumulative variance (e.g., 0.95 for 95%).

    Returns:
        reduced_X: Tensor of shape (n_samples, k), the centered data projected
        onto the first k principal directions. (k itself is not returned.)
    """
    # Center the data
    mean = tf.reduce_mean(X, axis=0)
    centered_X = X - mean

    # Perform SVD (TensorFlow returns the singular values first, then u and v)
    s, u, v = tf.linalg.svd(centered_X, full_matrices=False)

    # Compute explained variance ratio
    explained_variance_ratio = s**2 / tf.reduce_sum(s**2)

    # Compute cumulative explained variance
    cumulative_variance = tf.cumsum(explained_variance_ratio)

    # The cumulative sum never decreases, so the number of entries still below
    # the threshold is the index of the first entry that reaches it; add one to
    # turn that index into a component count.
    below_threshold = tf.cast(cumulative_variance < variance_threshold, tf.int32)
    k = tf.reduce_sum(below_threshold) + 1

    # Reduce the data to k components. If round-off keeps the cumulative sum
    # below the threshold everywhere (e.g. threshold 1.0), k is one past the
    # number of components and the slice simply keeps all of them.
    reduced_X = tf.matmul(centered_X, v[:, :k])

    return reduced_X

In [None]:
def csv_amplitude(directory):
    """
    function to convert directory of raw csv files to amplitude csv files

    For every folder under `directory` that contains .csv files, an
    `amplitude` sub-folder is created next to them and one amplitude CSV
    (same file name) is written per raw file. Only packets with sig_mode 1
    are kept and null subcarriers are removed before the amplitude is computed.

    directory: directory containing raw csv files
    """
    # Walk through the directory and its subdirectories
    for dirpath, dirnames, filenames in os.walk(directory):
        # Never descend into output folders written by an earlier run; their
        # files are amplitude CSVs, not raw CSI captures.
        dirnames[:] = [name for name in dirnames if name != 'amplitude']

        # Filter out CSV files
        csv_files = [filename for filename in filenames if filename.endswith('.csv')]

        if not csv_files:  # Proceed only if CSV files are found in the current directory
            continue

        # Create the output directory if it doesn't exist
        output_directory = os.path.join(dirpath, 'amplitude')
        os.makedirs(output_directory, exist_ok=True)

        # Process each CSV file
        for csv_file in csv_files:
            # Construct the full path to the CSV file
            csv_file_path = os.path.join(dirpath, csv_file)

            # Construct the output CSV file path
            output_csv_file = os.path.join(output_directory, os.path.splitext(csv_file)[0] + ".csv")

            # Load the matrix from the CSV file
            matrix = (ESP32(csv_file_path)
                      .filter_by_sig_mode(1)
                      .get_csi()
                      .remove_null_subcarriers()
                      .get_amplitude_from_csi().amplitude)

            # Adjust the decimals according to your needs
            matrix = np.round(matrix, decimals = 5)

            # Write the matrix to the output CSV file, one packet per row
            with open(output_csv_file, 'w', newline='') as csvfile:
                csv.writer(csvfile).writerows(matrix)

In [None]:
def directory_to_heatmap(directory, dim = 64):
    """
    function to convert amplitude files to heatmap jpg

    For every folder `<parent>/<name>` under `directory` that contains .csv
    files, one .jpg per CSV is written to `<parent>_picture/<name>`.

    directory: amplitude file directory
    dim: length and width the data matrix is resized to before plotting
    """
    # Walk through the directory and its subdirectories
    for dirpath, _, filenames in os.walk(directory):
        # Filter out CSV files
        csv_files = [filename for filename in filenames if filename.endswith('.csv')]

        if not csv_files:  # Proceed only if CSV files are found in the current directory
            continue

        # Create the output directory if it doesn't exist
        parent_dir, folder_name = os.path.split(dirpath)
        # Indicate progress
        print(f"{folder_name} start")
        output_directory = os.path.join(parent_dir + "_picture", folder_name)
        os.makedirs(output_directory, exist_ok=True)

        # Process each CSV file
        for csv_file in csv_files:
            # Construct the full path to the CSV file
            csv_file_path = os.path.join(dirpath, csv_file)

            # Construct the output image path
            output_image = os.path.join(output_directory, os.path.splitext(csv_file)[0] + ".jpg")

            # Load the matrix from the CSV file (no header row)
            matrix = pd.read_csv(csv_file_path, header=None).astype(float)

            # Normalize data (MinMaxScaler rescales every column on its own)
            scaled_matrix = MinMaxScaler().fit_transform(matrix)

            # Resize data using OpenCV
            resized_matrix = cv2.resize(scaled_matrix, (dim, dim))

            # float32 image for plotting. No augmentation is applied, so a
            # plain cast is all that is needed here.
            heatmap_image = resized_matrix.astype(np.float32)

            # Plot and save heatmap
            plt.imshow(heatmap_image, cmap='hot', interpolation='nearest')
            plt.axis('off')
            plt.savefig(output_image, bbox_inches='tight')  # Save as .jpg file
            plt.close()  # release the figure so memory does not grow per file
        # Indicate progress
        print(f"{folder_name} done!")

In [None]:
def process_amplitude(directory):
    """
    function to apply PCA (keeping 95% of the variance) to a directory of
    numeric csv files

    For every folder under `directory` that contains .csv files, an
    `amplitude` sub-folder is created next to them and one PCA-reduced CSV
    (same file name) is written per input file.

    directory: directory containing header-less, comma-separated numeric csv files
    """
    # Walk through the directory and its subdirectories
    for dirpath, dirnames, filenames in os.walk(directory):
        # Never descend into output folders written by an earlier run, or the
        # already-reduced files would be reduced again.
        dirnames[:] = [name for name in dirnames if name != 'amplitude']

        # Filter out CSV files
        csv_files = [filename for filename in filenames if filename.endswith('.csv')]

        if not csv_files:  # Proceed only if CSV files are found in the current directory
            continue

        # Create the output directory if it doesn't exist
        output_directory = os.path.join(dirpath, 'amplitude')
        os.makedirs(output_directory, exist_ok=True)

        # Process each CSV file
        for csv_file in csv_files:
            # Construct the full path to the CSV file
            csv_file_path = os.path.join(dirpath, csv_file)

            # Construct the output CSV file path
            output_csv_file = os.path.join(output_directory, os.path.splitext(csv_file)[0] + ".csv")

            # Load as float64, then work in float32 (single precision)
            matrix = np.loadtxt(csv_file_path, delimiter=',').astype(np.float32)

            # A fractional n_components asks scikit-learn to keep as many
            # components as needed to explain that share of the variance.
            pca = PCA(n_components=0.95)
            matrix = pca.fit_transform(matrix)

            # Write the reduced matrix to the output CSV file, one sample per row
            with open(output_csv_file, 'w', newline='') as csvfile:
                csv.writer(csvfile).writerows(matrix)

In [None]:
# process_amplitude(r"C:\Users\New Asus\Documents\FIT4701_2025_Sem1\resources\SHD-HAR-Dataset\SHD-HAR-Dataset-main\amplitude\front")

In [None]:
# csv_file_path = r"C:\Users\New Asus\Documents\FIT4701_2025_Sem1\resources\SHD-HAR-Dataset\SHD-HAR-Dataset-main\amplitude\clap_subset\amplitude\clap1.csv"

# # Load the matrix from the CSV file
# matrix = pd.read_csv(csv_file_path, header=None) 
# matrix = matrix.astype(float)
# # Get the amplitude array
# num_packets = min(1000, matrix.shape[0])
# amplitude_plot(matrix, start_stamp=0, num_packets=num_packets, plot_name="Example Plot")


In [None]:
# csv_file_path = r"C:\Users\New Asus\Documents\FIT4701_2025_Sem1\resources\SHD-HAR-Dataset\SHD-HAR-Dataset-main\amplitude\pca_front\amplitude\clap1.csv"

# # Load the matrix from the CSV file
# matrix = pd.read_csv(csv_file_path, header=None) 
# matrix = matrix.astype(float)
# # Get the amplitude array
# # amp = esp32.amplitude
# # print(amp)
# num_packets = min(1000, matrix.shape[0])
# amplitude_plot(matrix, start_stamp=0, num_packets=num_packets, plot_name="Example Plot")

In [None]:
# esp32 = ESP32(r"C:\Users\New Asus\Documents\FIT4701_2025_Sem1\resources\SHD-HAR-Dataset\SHD-HAR-Dataset-main\raw\front\jump\jump1.csv")

# # Read and filter the dataset (optional)
# esp32.get_csi()  # Extract CSI data
# esp32.remove_null_subcarriers()  # Remove null subcarriers
# esp32.get_amplitude_from_csi()  # Compute amplitude

# # Get the amplitude array
# amp = esp32.amplitude
# print(amp)
# num_packets = min(1000, amp.shape[0])
# amplitude_plot(amp, start_stamp=0, num_packets=num_packets, plot_name="Example Plot")

In [None]:
# esp32 = ESP32(r"C:\Users\New Asus\Documents\FIT4701_2025_Sem1\resources\SHD-HAR-Dataset\SHD-HAR-Dataset-main\raw\front\clap\clap1.csv")

# # Read and filter the dataset (optional)
# esp32.get_csi()  # Extract CSI data
# esp32.remove_null_subcarriers()  # Remove null subcarriers
# esp32.get_amplitude_from_csi()  # Compute amplitude

# # Get the amplitude array
# amp = esp32.amplitude
# print(amp)

## References: https://github.com/StatQuest/pca_demo/blob/master/pca_demo.py
# pca = PCA() # create a PCA object
# pca.fit(amp) # do the math
# pca_data = pca.transform(amp) # get PCA coordinates for scaled_data

# per_var = np.round(pca.explained_variance_ratio_* 100, decimals=1)
# labels = ['PC' + str(x) for x in range(1, len(per_var)+1)]

# plt.figure(figsize=(100, 6))
# plt.bar(x=range(1,len(per_var)+1), height=per_var, tick_label=labels)
# plt.ylabel('Percentage of Explained Variance')
# plt.xlabel('Principal Component')
# plt.title('Scree Plot')
# plt.show()

In [None]:
# Generate heatmap images for every CSV folder found under the directory below.
# NOTE(review): machine-specific absolute path - adjust before running elsewhere.
directory_to_heatmap(
    directory=r"C:\Users\New Asus\Documents\FIT4701_2025_Sem1\training_img_dataset\front_dataset\front_micro\pca_training"
)