In [3]:
import struct
import numpy as np
from itertools import islice
import pandas as pd
import os
import math
import matplotlib.pyplot as plt

## 1. Parse Bin to excel Classes

In [8]:
class FrameHeader:
    def __init__(self, binary):
        #self.magic_word = binary[0:8]
        self.version = int.from_bytes(binary[0:4], 'little')
        self.packet_length = int.from_bytes(binary[4:8], 'little')
        self.platform = int.from_bytes(binary[8:12], 'little')
        self.frame_number = int.from_bytes(binary[12:16], 'little')
        self.time_cpu_cycle = int.from_bytes(binary[16:20], 'little')
        self.num_del = int.from_bytes(binary[20:24], 'little')
        self.num_tlv = int.from_bytes(binary[24:28], 'little')
        self.subframe_no = int.from_bytes(binary[28:32], 'little')
        self.tlv = []
        ptr = 32
        for n in range(self.num_tlv):
            temp = TLV(binary[ptr:])
            self.tlv.append(temp)
            ptr += 8 + temp.tlv_header.length
        self.binary = binary
    
    def __str__(self):
        return "binary :"+str(self.binary)+"\nmagic word :"+str(b'\x02\x01\x04\x03\x06\x05\x08\x07')+"\nversion :"+str(self.version)+"\nframe_number :"+str(self.frame_number)+"\npacket_length :"+str(self.packet_length)+"\nnumber of TLV:"+str(self.num_tlv)

class TLVHeader: 
    def __init__(self, binary):
        self.type = int.from_bytes(binary[0:4], 'little')
        self.length = int.from_bytes(binary[4:8], 'little')

# if type = 301
class TLVPointCoord:
    def __init__(self, binary):
        xyzUnit = struct.unpack('<f', binary[0:4])[0]
        dopplerUnit = struct.unpack('<f', binary[4:8])[0]
        snrUnit = struct.unpack('<f', binary[8:12])[0]
        noiseUnit = struct.unpack('<f', binary[12:16])[0]
        # decompress all values
        self.x = []
        self.y = []
        self.z = []
        self.doppler = []
        self.snr = []
        self.noise = []
        # start pointer at 20, throw 4 bytes away
        ptr = 20
        while ptr < len(binary):
            ptr += 10
            self.x.append(int.from_bytes(binary[ptr:ptr+2], 'little') * xyzUnit)
            self.y.append(int.from_bytes(binary[ptr+2:ptr+4], 'little') * xyzUnit)
            self.z.append(int.from_bytes(binary[ptr+4:ptr+6], 'little') * xyzUnit)
            self.doppler.append(int.from_bytes(binary[ptr+6:ptr+8], 'little') * dopplerUnit)
            self.snr.append(int.from_bytes(binary[ptr+8:ptr+9], 'little') * snrUnit)
            self.noise.append(int.from_bytes(binary[ptr+9:ptr+10], 'little') * noiseUnit)
        #self.rest = binary[ptr:]
        
    def __str__(self):
        return "Service 301 \nx :"+str(self.x)+"\ny :"+str(self.y)+"\nz:"+str(self.z)+"\ndoppler:"+str(self.doppler)+"\nsnr:"+str(self.snr)+"\nnoise:"+str(self.noise)+"\nrest of it:"#+str(self.rest)

# if type = 302/303
class TLVTargetList:
    def __init__(self, binary):
        self.tid = []
        self.posX = []
        self.posY = []
        self.posZ = []
        self.velX = []
        self.velY = []
        self.velZ = []
        self.accX = []
        self.accY = []
        self.accZ = []
        self.gain = []
        self.confidence = []
        
        targetStruct = 'I27f'
        targetSize = struct.calcsize(targetStruct)
        try:
            targetData = struct.unpack(targetStruct,binary[:targetSize])
        except:
            print('ERROR: Target TLV parsing failed')
        # ignore extra values 
        while (len(binary)> targetSize):
            self.tid.append(targetData[0])
            self.posX.append(targetData[1])
            self.posY.append(targetData[2])
            self.posZ.append(targetData[3])
            self.velX.append(targetData[4])
            self.velY.append(targetData[5])
            self.velZ.append(targetData[6])
            self.accX.append(targetData[7])
            self.accY.append(targetData[8])
            self.accZ.append(targetData[9])
            self.gain.append(targetData[26])
            self.confidence.append(targetData[27])
            #throw EC away
            binary = binary[targetSize:]
            #self.re_len = len(binary)
        
    def __str__(self):
        return "Service 302/3 \ntid :"+str(self.tid)+"\nposX"+str(self.posX)+"\nposY"+str(self.posY)+"\nposZ"+str(self.posZ)+"\nvelX"+str(self.velX)+"\nvelY"+str(self.velY)+"\nvelZ"+str(self.velZ)+"\naccX"+str(self.accX)+"\naccY"+str(self.accY)+"\naccZ"+str(self.accZ)+"\ngain"+str(self.gain)+"\nconfidence"+str(self.confidence)#+"\nRemaining:"+str(self.re_len)

#empty type for error messages
class UnimplTLVService:
    def __init__(self, type):
        error = "TLV Service "+ str(type) + " has not been implemented yet"

class TLVTargetIndex: 
    def __init__(self, binary):
        self.index = []
        for b in binary:
            self.index.append(int(b))
    
    def __str__(self):
        return "Service 306 \nIndexes :" + str(self.index)
    
class TLVEnhancedPresenceIndication:
    def __init__(self, binary):
        pointStruct = '1b'  # While there are technically 2 bits per zone, we need to use at least 1 byte to represent
        pointStructSize = struct.calcsize(pointStruct)
        numZones = (binary[0]) # First byte in the TLV is the number of zones, the rest of it is the occupancy data
        self.zonePresence = [0]
        binary = binary[1:]
        zoneCount = 0
        while(zoneCount < numZones):
            try:
                idx = math.floor((zoneCount)/4)
                self.zonePresence.append(binary[idx] >> (((zoneCount) * 2) % 8) & 3)
                zoneCount = zoneCount + 1
            except:
                print('Error: Enhanced Presence Detection TLV Parser Failed')
                break
        binary = binary[pointStructSize:]
    

    def __str__(self):
        return "Zone array is :" + str(self.zonePresence)

class TLV:
    def __init__(self, binary):
        self.tlv_header = TLVHeader(binary[0:8])
        if self.tlv_header.type == 301:
            self.tlv_value = TLVPointCoord(binary[8:8+self.tlv_header.length])
        elif self.tlv_header.type == 308:# or self.tlv_header.type == 302 or self.tlv_header.type == 303:
        #elif self.tlv_header.type == 302 or self.tlv_header.type ==303:
            self.tlv_value = TLVTargetList(binary[8:8+self.tlv_header.length])
        elif self.tlv_header.type == 309:
        #elif self.tlv_header.type == 306:
            self.tlv_value = TLVTargetIndex(binary[8:8+self.tlv_header.length])
        elif self.tlv_header.type == 315:
            self.tlv_value = TLVEnhancedPresenceIndication(binary[8:8+self.tlv_header.length])
        else:
            self.tlv_value = UnimplTLVService(self.tlv_header.type)
            #print("Service out of range " ,self.tlv_header.type)



### Parse binaries in a folder to excel file

In [None]:
def parse_bin_file(file_path):
    # Parses a single binary file and yields parsed frames as DataFrames

    with open(file_path, "rb") as f:
        data = f.read()

    frames = data.split(b'\x02\x01\x04\x03\x06\x05\x08\x07')
    frames = frames[1:]


    for frame in frames:
        frame_header = FrameHeader(frame)
        df_frame = pd.DataFrame([])
        for tlv in frame_header.tlv:                    
            df_tlv = pd.DataFrame.from_dict(tlv.tlv_value.__dict__, orient="index")
            df_frame = pd.concat([df_frame, df_tlv])
        yield df_frame

# Folder containing the binary files
folder_path = "01_raw_bin/01_sitting_1000frames"

writer = pd.ExcelWriter("02_parsed/01_sitting_1000frames.xlsx", engine='openpyxl')

# Iterate through the binary files in the folder
sheet_count = 1  # Track sheet number starting from 1
for filename in os.listdir(folder_path):
    if filename.endswith(".bin"):
        file_path = os.path.join(folder_path, filename)
        print(f"Processing file: {file_path}")
        for frame_df in parse_bin_file(file_path):
            # Sheet name for each frame within a file
            frame_number = str(sheet_count)
            sheet_name = f"{filename} - Frame {frame_number}"
            frame_df.to_excel(writer, sheet_name=sheet_name)
            sheet_count += 1
            
# Save and close the Excel file
writer._save()
writer.close()

print("Completed")

## 2. Read excel file

In [12]:
excel_file = "02_parsed/01_sitting_1000frames.xlsx"
xl = pd.ExcelFile(excel_file)

frames_data = []

# Iterate over each sheet in the Excel file
for sheet_name in xl.sheet_names:
    df = xl.parse(sheet_name)
    
    # Extracting specific rows and excluding the first column and row
    frame_data = df.iloc[0:3, 1:].values
    
    # Remove columns where all values (excluding the first row) are either zero or NaN
    mask = ~np.all(np.isnan(frame_data) | (frame_data == 0), axis=0)
    frame_data = frame_data[:, mask]
    
    frames_data.append(frame_data)

## 3. Display dataset head

In [None]:
print("Data for Frame 1:")
frame_data = frames_data[0]  # Frame indexing starts from 0
print(frame_data)

## A. Data Collection
### A1. Read Parsed Data

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Function to load and clean data from an Excel file
def load_frame_data(excel_file):
    xl = pd.ExcelFile(excel_file)
    frame_data_list = []
    for sheet_name in xl.sheet_names:
        df = xl.parse(sheet_name)
        frame_data = df.iloc[0:3, 1:].values
        mask = ~np.all(np.isnan(frame_data) | (frame_data == 0), axis=0)
        frame_data = frame_data[:, mask]
        # Ensure that there are at least 3 rows (x, y, z)
        if frame_data.shape[0] == 3:
            frame_data_list.append(frame_data)
    return frame_data_list


# Load data for sitting, lying, and curling
excel_file_sitting = f"02_parsed/01_sitting_1000frames.xlsx"
excel_file_lying = f"02_parsed/01_lying_1000frames.xlsx"
excel_file_curling = f"02_parsed/01_curling_1000frames.xlsx"

frame_data_sitting = load_frame_data(excel_file_sitting)
frame_data_lying = load_frame_data(excel_file_lying)
frame_data_curling = load_frame_data(excel_file_curling)

'''
print("Sitting Data for Frame 1:")
frame_data = frame_data_sitting[0]
print(frame_data)

print("Lying Data for Frame 1:")
frame_data = frame_data_lying[0]
print(frame_data)

print("Curling Data for Frame 1:")
frame_data = frame_data_curling[0]
print(frame_data)
'''

### A2. Visualize

In [None]:
# Define the function to plot the data for a given frame range
def plot_xz_plane(frame_data_sitting, frame_data_lying, frame_data_curling, start_frame=1, end_frame=1000):
    fig, axes = plt.subplots(1, 3, figsize=(20, 7))

    # Plot sitting data
    for i in range(start_frame - 1, min(end_frame, len(frame_data_sitting))):
        frame_data = frame_data_sitting[i]
        x = frame_data[0]
        z = frame_data[2]
        axes[0].scatter(x, z, s=1)

    # Plot lying data
    for i in range(start_frame - 1, min(end_frame, len(frame_data_lying))):
        frame_data = frame_data_lying[i]
        x = frame_data[0]
        z = frame_data[2]
        axes[1].scatter(x, z, s=1)

    # Plot curling data
    for i in range(start_frame - 1, min(end_frame, len(frame_data_curling))):
        frame_data = frame_data_curling[i]
        x = frame_data[0]
        z = frame_data[2]
        axes[2].scatter(x, z, s=1)

    axes[0].set_title('Sitting')
    axes[1].set_title('Lying')
    axes[2].set_title('Curling')

    for ax in axes:
        ax.set_xlabel('X')
        ax.set_ylabel('Z')
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Plot the XZ plane for frames 1 to 1000
plot_xz_plane(frame_data_sitting, frame_data_lying, frame_data_curling, start_frame=1, end_frame=1000)

### A3. Data Labelling & Cleaning

- Selective Cropping x > 2 and z < 6

In [17]:
# Function to filter the data based on given conditions
def filter_frame_data(frame_data_list):
    filtered_data_list = []
    for frame_data in frame_data_list:
        x = frame_data[0]
        z = frame_data[2]
        mask = (x >= 0) & (x <= 2) & (z >= 6) & (z <= 9)
        filtered_frame_data = frame_data[:, mask]
        filtered_data_list.append(filtered_frame_data)
    return filtered_data_list

# Filter the data for sitting, lying, and curling
filtered_data_sitting = filter_frame_data(frame_data_sitting)
filtered_data_lying = filter_frame_data(frame_data_lying)
filtered_data_curling = filter_frame_data(frame_data_curling)

- display

In [None]:
# Function to plot the filtered data for a given frame range
def plot_filtered_xz_plane(filtered_data_sitting, filtered_data_lying, filtered_data_curling, start_frame=1, end_frame=1000):
    fig, axes = plt.subplots(1, 3, figsize=(20, 7))

    # Plot filtered sitting data
    for i in range(start_frame - 1, min(end_frame, len(filtered_data_sitting))):
        frame_data = filtered_data_sitting[i]
        x = frame_data[0]
        z = frame_data[2]
        axes[0].scatter(x, z, s=1)

    # Plot filtered lying data
    for i in range(start_frame - 1, min(end_frame, len(filtered_data_lying))):
        frame_data = filtered_data_lying[i]
        x = frame_data[0]
        z = frame_data[2]
        axes[1].scatter(x, z, s=1)

    # Plot filtered curling data
    for i in range(start_frame - 1, min(end_frame, len(filtered_data_curling))):
        frame_data = filtered_data_curling[i]
        x = frame_data[0]
        z = frame_data[2]
        axes[2].scatter(x, z, s=1)

    axes[0].set_title('Sitting')
    axes[1].set_title('Lying')
    axes[2].set_title('Curling')

    for ax in axes:
        ax.set_xlabel('X')
        ax.set_ylabel('Z')
        ax.set_xlim(0, 2)
        ax.set_ylim(6, 10)
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Plot the filtered XZ plane for frames 1 to 1000
plot_filtered_xz_plane(filtered_data_sitting, filtered_data_lying, filtered_data_curling, start_frame=1, end_frame=1000)


- DBSCAN clustering to remove outlier

In [20]:
from sklearn.cluster import DBSCAN

# Function to concatenate XZ data from all frames into a single array
def concatenate_frame_data(filtered_data_list):
    concatenated_data = []
    for frame_data in filtered_data_list:
        x = frame_data[0]
        z = frame_data[2]
        points = np.vstack((x, z)).T
        concatenated_data.append(points)
    return np.concatenate(concatenated_data, axis=0)

# Concatenate data for sitting, lying, and curling
concat_data_sitting = concatenate_frame_data(filtered_data_sitting)
concat_data_lying = concatenate_frame_data(filtered_data_lying)
concat_data_curling = concatenate_frame_data(filtered_data_curling)

# Perform DBSCAN clustering to remove outliers
def dbscan_clustering(data, eps=0.025, min_samples=5):
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(data)
    labels = clustering.labels_
    # Separate inliers and outliers
    inliers = data[labels != -1]
    outliers = data[labels == -1]
    return inliers, outliers

# Apply DBSCAN clustering on concatenated data
inliers_sitting, outliers_sitting = dbscan_clustering(concat_data_sitting)
inliers_lying, outliers_lying = dbscan_clustering(concat_data_lying)
inliers_curling, outliers_curling = dbscan_clustering(concat_data_curling)

- visualize clustering results

In [None]:
# Function to plot the filtered data with clustering results for a given frame range
def plot_clustered_xz_plane(inliers_sitting, inliers_lying, inliers_curling, start_frame=1, end_frame=1000):
    fig, axes = plt.subplots(1, 3, figsize=(20, 7))

    # Plot clustered sitting data
    axes[0].scatter(inliers_sitting[:, 0], inliers_sitting[:, 1], s=1)
    axes[0].set_title('Sitting')

    # Plot clustered lying data
    axes[1].scatter(inliers_lying[:, 0], inliers_lying[:, 1], s=1)
    axes[1].set_title('Lying')

    # Plot clustered curling data
    axes[2].scatter(inliers_curling[:, 0], inliers_curling[:, 1], s=1)
    axes[2].set_title('Curling')

    for ax in axes:
        ax.set_xlabel('X')
        ax.set_ylabel('Z')
        ax.set_xlim(0, 2)
        ax.set_ylim(6, 10)
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Plot the clustered XZ plane data
plot_clustered_xz_plane(inliers_sitting, inliers_lying, inliers_curling)

- Reassign Inlier Points to Original Frames

In [22]:
def reassign_inliers_to_frames(inliers, original_data):
    frame_data_list = [np.zeros((3, 0)) for _ in range(len(original_data))]
    for inlier in inliers:
        x, z = inlier
        for i, frame_data in enumerate(original_data):
            mask = (frame_data[0] == x) & (frame_data[2] == z)
            if np.any(mask):
                frame_data_list[i] = np.hstack((frame_data_list[i], frame_data[:, mask]))
                break
    return frame_data_list

# Reassign inliers to original frames
filtered_data_sitting_inliers = reassign_inliers_to_frames(inliers_sitting, filtered_data_sitting)
filtered_data_lying_inliers = reassign_inliers_to_frames(inliers_lying, filtered_data_lying)
filtered_data_curling_inliers = reassign_inliers_to_frames(inliers_curling, filtered_data_curling)

- Apply Sliding Window (window size of 5 and a shift of 1)

In [23]:
def apply_sliding_window(data_list, window_size=5, shift=1):
    sliding_window_data = []
    for i in range(0, len(data_list) - window_size + 1, shift):
        window_frames = data_list[i:i + window_size]
        combined_frame = np.hstack(window_frames)
        sliding_window_data.append(combined_frame)
    return sliding_window_data

# Apply sliding window on inlier data
sliding_window_data_sitting = apply_sliding_window(filtered_data_sitting_inliers)
sliding_window_data_lying = apply_sliding_window(filtered_data_lying_inliers)
sliding_window_data_curling = apply_sliding_window(filtered_data_curling_inliers)

- remove empty frames

In [None]:
def remove_empty_frames_after_sliding_window(sliding_window_data_list, activity_name):
    print(f"Initial number of frames in {activity_name} after sliding window: {len(sliding_window_data_list)}")
    filtered_frames = [frame for frame in sliding_window_data_list if frame.shape[1] > 0]
    print(f"Number of frames in {activity_name} after removing empty frames: {len(filtered_frames)}")
    return filtered_frames

# Remove empty frames after sliding window
nonempty_data_sitting = remove_empty_frames_after_sliding_window(sliding_window_data_sitting, "sitting")
nonempty_data_lying = remove_empty_frames_after_sliding_window(sliding_window_data_lying, "lying")
nonempty_data_curling = remove_empty_frames_after_sliding_window(sliding_window_data_curling, "curling")

### A4. Data normalization

In [25]:
def normalize_frames(filtered_frames):
    normalized_frames = []

    # Define the maximum sequence length
    max_length = max(len(frame[0]) for frame in filtered_frames)

    # Iterate over each frame in filtered_frames
    for frame in filtered_frames:
        # Extract X and Z values from the frame's data
        x_values = frame[0]
        z_values = frame[2]

        # Pad or truncate the sequences to match the maximum length
        padded_x_values = np.pad(x_values, (0, max_length - len(x_values)), mode='constant', constant_values=0)
        padded_z_values = np.pad(z_values, (0, max_length - len(z_values)), mode='constant', constant_values=0)

        # Normalize X and Z values within the specified range
        normalized_x = (padded_x_values - 0) / (2 - 0)  # Normalize within 0 to 2 range
        normalized_z = (padded_z_values - 6) / (10 - 6)  # Normalize within 6 to 10 range

        # Create a mask to filter out Z values above 0.8 and below 0.4
        mask = (normalized_z >= 0.4) & (normalized_z <= 0.8)

        # Apply the mask to the normalized values
        filtered_x = normalized_x[mask]
        filtered_z = normalized_z[mask]

        # Append filtered normalized X and Z values to the normalized frames
        normalized_frames.append([filtered_x, filtered_z])

    return normalized_frames

# Normalize the data for sitting, lying, and curling
normalized_data_sitting = normalize_frames(nonempty_data_sitting)
normalized_data_lying = normalize_frames(nonempty_data_lying)
normalized_data_curling = normalize_frames(nonempty_data_curling)

- Choose x random frames from each pose

In [26]:
import random

def select_random_frames(normalized_data, num_frames, seed=42):
    random.seed(seed)
    random.shuffle(normalized_data)
    return normalized_data[:num_frames]

# Select 300 random frames for each pose
selected_data_sitting = select_random_frames(normalized_data_sitting, num_frames=300)
selected_data_lying = select_random_frames(normalized_data_lying, num_frames=300)
selected_data_curling = select_random_frames(normalized_data_curling, num_frames=300)


- Visualize normalized data

In [None]:
# Function to plot the selected data for a given frame range
def plot_selected_xz_plane(selected_data_sitting, selected_data_lying, selected_data_curling):
    fig, axes = plt.subplots(1, 3, figsize=(20, 7))

    # Plot selected sitting data
    for frame_data in selected_data_sitting:
        x = frame_data[0]
        z = frame_data[1]
        axes[0].scatter(x, z, s=1)

    # Plot selected lying data
    for frame_data in selected_data_lying:
        x = frame_data[0]
        z = frame_data[1]
        axes[1].scatter(x, z, s=1)

    # Plot selected curling data
    for frame_data in selected_data_curling:
        x = frame_data[0]
        z = frame_data[1]
        axes[2].scatter(x, z, s=1)

    axes[0].set_title('Sitting')
    axes[1].set_title('Lying')
    axes[2].set_title('Curling')

    for ax in axes:
        ax.set_xlabel('X')
        ax.set_ylabel('Z')
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Plot the selected XZ plane data
plot_selected_xz_plane(selected_data_sitting, selected_data_lying, selected_data_curling)

### A6. Convert to image


In [32]:
import os
import numpy as np
import matplotlib.pyplot as plt

# Function to create occupancy grid image for each frame
def create_occupancy_grid_for_frames(slide_array, num_frames, output_folder):
    # Ensure the output subfolder exists
    os.makedirs(output_folder, exist_ok=True)

    # Iterate over each frame in the sequence
    for frame_index in range(num_frames):
        frame_data = slide_array[frame_index]

        # Define grid size
        grid_size = 100

        # Create occupancy grid
        occupancy_grid = np.zeros((grid_size, grid_size))

        # Scale normalized XZ coordinates to fit within the grid size
        x_values = np.clip(frame_data[0] * (grid_size - 1), 0, grid_size - 1)
        z_values = np.clip(frame_data[1] * (grid_size - 1), 0, grid_size - 1)

        # Update occupancy grid
        for x, z in zip(x_values, z_values):
            grid_x = int(x)
            grid_y = int(z)
            occupancy_grid[grid_y, grid_x] = 1

        # Create figure and plot occupancy grid
        fig, ax = plt.subplots(figsize=(grid_size * 1.3 / 100, grid_size * 1.3 / 100))
        ax.imshow(occupancy_grid, cmap='gray', origin='lower', extent=[0, grid_size, 0, grid_size])
        ax.axis('off')

        # Save the figure without margins
        plt.savefig(os.path.join(output_folder, f"frame_{frame_index + 1}.png"), bbox_inches='tight', pad_inches=0)
        plt.close(fig)  # Close the figure explicitly

# Specify the output folder where the images will be saved
output_folder = f"03_occupancy_grid/01_set"

# Ensure the output folder exists
os.makedirs(output_folder, exist_ok=True)

# Define number of frames for each dataset
num_frames_sitting = len(selected_data_sitting)
num_frames_lying = len(selected_data_lying)
num_frames_curling = len(selected_data_curling)

# Call the function to generate and save images for each dataset
create_occupancy_grid_for_frames(selected_data_sitting, num_frames_sitting, os.path.join(output_folder, "sitting"))
create_occupancy_grid_for_frames(selected_data_lying, num_frames_lying, os.path.join(output_folder, "lying"))
create_occupancy_grid_for_frames(selected_data_curling, num_frames_curling, os.path.join(output_folder, "curling"))
