### Portfolio

# Learning of Structured Data

### – Sheet 3 –

This assignment was created by:
- Mark Erokhin (mark.erokhin@study.thws.de) - k68532;
- Anastasiia Demidova   (anastasiia.demidova@study.thws.de) - k67881.

## Exercise 1: (Data) Loading & Visualization

In [1]:
#import nesessary liblraries:
import os
import csv
import random
import matplotlib.pyplot as plt
import numpy as np
import subprocess

In [2]:
path_train="./data/train"
path_test="./data/test"
valid_actions = {"boxing", "drums", "guitar", "rowing", "violin"}
keypoint_names = {
    "0": "Nose       ", "1": "Neck     ", "2": "RShoulder  ", "3": "RElbow      ", "4": "RWrist     ",
    "5": "LShoulder", "6": "LElbow  ", "7": "LWrist    ", "8": "MidHip  ", "9": "RHip       ",
    "10": "RKnee     ", "11": "RAnkle ", "12": "LHip      ", "13": "LKnee     ", "14": "LAnkle ",
    "15": "REye      ", "16": "LEye      ", "17": "REar      ", "18": "LEar      ", "19": "LBigToe",
    "20": "LSmallToe", "21": "LHeel   ", "22": "RBigToe", "23": "RSmallToe", "24": "RHeel  "
}

In [6]:
# class initialization:

class Dataset:
    def __init__(self, path_train, path_test):
        self.data = {"train": {}, "test": {}}
        self._load_data(path_train, "train")
        self._load_data(path_test, "test")
        
    # loads the .csv files in data/train/ and data/test/
    def _load_data(self, directory, dataset_type):
        for file_name in os.listdir(directory): # use listdir to get filenames in nesessary folder
            # print(file_name)
            if file_name.endswith(".csv"): # take files only with .csv extantion
                subject, action, iteration = self._parse_file_name(file_name)
                if action not in valid_actions: # check that isn't used invalid action
                    raise ValueError(f"Invalid action '{action}' in file '{file_name}'. Allowed actions: {valid_actions}")

                file_path = os.path.join(directory, file_name) # create the path to the nesessary file
                sequence = self._load_csv(file_path)

                if subject not in self.data[dataset_type]:
                    self.data[dataset_type][subject] = {}
                if action not in self.data[dataset_type][subject]:
                    self.data[dataset_type][subject][action] = {}

                self.data[dataset_type][subject][action][iteration] = sequence # add sequence in the data object

    # get subject, action, and iteration from filename
    def _parse_file_name(self, file_name):
        base_name = file_name.rsplit(".", 1)[0]
        subject, action, iteration = base_name.split("_")
        return subject, action, iteration

    # read nesessary file row by row
    def _load_csv(self, file_path):
        sequence = []
        with open(file_path, "r") as f:
            reader = csv.reader(f)
            for row in reader:
                sequence.append([float(value) for value in row])
        return sequence
    
    # create column names to dislay:
    def get_column_names(self):
            columns = []
            for i in range(len(keypoint_names)):
                keypoint = keypoint_names[str(i)]
                columns.extend([f"X {keypoint}", f"Y {keypoint}", f"C {keypoint}"])
            return columns
    
    # Ensure all numbers have the same format
    def format_sequence(self, seq):
        formatted_seq = []
        for row in seq:
            formatted_row = [f"{value:.6e}" for value in row]
            formatted_seq.append(formatted_row)
        return formatted_seq
    
    def getSequence(self, dataset, subject, action, iteration):
        try:
            seq = self.data[dataset][subject][action][iteration]
            label = f"{subject}_{action}_{iteration}.csv"
            columns = self.get_column_names()
            formatted_seq = self.format_sequence(seq)
            return seq, label, columns, formatted_seq
        except KeyError:
            raise ValueError(f"Sequence not found for {dataset}/{subject}_{action}_{iteration}.csv")

    def random(self, dataset):
        try:
            subject = random.choice(list(self.data[dataset].keys()))
            action = random.choice(list(self.data[dataset][subject].keys()))
            iteration = random.choice(list(self.data[dataset][subject][action].keys()))
            return self.getSequence(dataset, subject, action, iteration)
        except IndexError:
            raise ValueError(f"No data available in {dataset} dataset")
        
# Visualization ------------------------------------------------------------------------------------------ 
    def visualize(self, dataset, subject, action, iteration, frame, output_dir="visualizations"):
        seq, _, _, _ = self.getSequence(dataset, subject, action, iteration)
        frame_data = seq[frame]

        # Validate that frame_data has a length divisible by 3
        if len(frame_data) % 3 != 0:
            raise ValueError(f"Frame data length ({len(frame_data)}) is not divisible by 3. Ensure the data format is [X, Y, C] triplets.")
        
        x_coords = frame_data[::3]
        y_coords = frame_data[1::3]
        confidences = frame_data[2::3]

        # Skeleton connections
        skeleton = [
            (0, 15), (0, 16), (0, 1), (1, 2), (1, 5), (2, 3), (3, 4),
            (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (10, 11), (8, 12),
            (12, 13), (13, 14), (11, 24), (11, 22), (14, 19), (14, 21), 
            (17,15), (16,18), (19, 20), (22, 23)
        ]

        plt.figure(figsize=(8, 8))
        # Draw skeleton connections
        for joint1, joint2 in skeleton:
            if confidences[joint1] > 0.5 and confidences[joint2] > 0.5:
                plt.plot(
                    [x_coords[joint1], x_coords[joint2]],
                    [y_coords[joint1], y_coords[joint2]],
                    color=plt.cm.jet(joint1 / len(keypoint_names)),  # Gradient color
                    linewidth=2
                )

        # Scatter keypoints
        plt.scatter(x_coords, y_coords, c=confidences, cmap="viridis", s=100)

        # Annotate keypoints
        for i, name in keypoint_names.items():
            idx = int(i)
            if confidences[idx] > 0.5:  # Only annotate confident keypoints
                plt.text(x_coords[idx], y_coords[idx], name, fontsize=9, ha="right", va="bottom")

        plt.gca().invert_yaxis()
        plt.title(f"{subject} - {action} - Iteration {iteration} - Frame {frame}")
        plt.axis("equal")

        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"{subject}_{action}_{iteration}_frame_{frame}.png")
        plt.savefig(output_path)
        plt.close()


    def save_video(self, dataset, subject, action, iteration, output_video="output.mp4", frame_rate=30):
        output_dir = "temp_frames"
        os.makedirs(output_dir, exist_ok=True)

        seq, _, _, _ = self.getSequence(dataset, subject, action, iteration)
        for frame in range(len(seq)):
            self.visualize(dataset, subject, action, iteration, frame, output_dir=output_dir)

        subprocess.run([
            "ffmpeg", "-y", "-framerate", str(frame_rate), "-i",
            os.path.join(output_dir, f"{subject}_{action}_{iteration}_frame_%d.png"),
            "-c:v", "libx264", "-pix_fmt", "yuv420p", output_video
        ])

        for file_name in os.listdir(output_dir):
            os.remove(os.path.join(output_dir, file_name))
        os.rmdir(output_dir)



In [4]:

if __name__ == "__main__":
    dataset = Dataset(path_train, path_test)
    seq, label, columns, formatted_seq = dataset.getSequence("train", "p0", "boxing", "01")
    print("Sequence Label:", label)
    print(columns)
    for row in formatted_seq:
        print(row)

    random_seq, random_label, columns, formatted_rand_seq = dataset.random("train")
    print("Random Sequence Label:", random_label)
    print(columns)
    for row in formatted_rand_seq:
        print(row)
        


Sequence Label: p0_boxing_01.csv
['X Nose       ', 'Y Nose       ', 'C Nose       ', 'X Neck     ', 'Y Neck     ', 'C Neck     ', 'X RShoulder  ', 'Y RShoulder  ', 'C RShoulder  ', 'X RElbow      ', 'Y RElbow      ', 'C RElbow      ', 'X RWrist     ', 'Y RWrist     ', 'C RWrist     ', 'X LShoulder', 'Y LShoulder', 'C LShoulder', 'X LElbow  ', 'Y LElbow  ', 'C LElbow  ', 'X LWrist    ', 'Y LWrist    ', 'C LWrist    ', 'X MidHip  ', 'Y MidHip  ', 'C MidHip  ', 'X RHip       ', 'Y RHip       ', 'C RHip       ', 'X RKnee     ', 'Y RKnee     ', 'C RKnee     ', 'X RAnkle ', 'Y RAnkle ', 'C RAnkle ', 'X LHip      ', 'Y LHip      ', 'C LHip      ', 'X LKnee     ', 'Y LKnee     ', 'C LKnee     ', 'X LAnkle ', 'Y LAnkle ', 'C LAnkle ', 'X REye      ', 'Y REye      ', 'C REye      ', 'X LEye      ', 'Y LEye      ', 'C LEye      ', 'X REar      ', 'Y REar      ', 'C REar      ', 'X LEar      ', 'Y LEar      ', 'C LEar      ', 'X LBigToe', 'Y LBigToe', 'C LBigToe', 'X LSmallToe', 'Y LSmallToe', 'C 

In [7]:
if __name__ == "__main__":
    vizualization = Dataset(path_train, path_test)
    vizualization.save_video("train", "p0", "boxing", "01", output_video="p0_boxing_01.mp4")

ValueError: Frame data length (79) is not divisible by 3. Ensure the data format is [X, Y, C] triplets.