# Libraries

In [9]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
# from keras_preprocessing.sequence import pad_sequences

import numpy as np
import os

# Preprocessing

In [13]:
# base path
directory_path = './labels_final'
# current directory
c_dir = os.getcwd()

# all actions
actions = np.array(sorted([folder for folder in os.listdir(directory_path) if os.path.isdir(os.path.join(directory_path, folder))])) # sorted to follow folder arrangement

# specific actions
# actions = np.array(['alligator', 'flower', 'kiss', 'listen', 'orange'])
# actions = np.array(['afternoon', 'house', 'again', 'open', 'kiss', 'sorry'])
# actions = np.array(sorted([folder for folder in os.listdir('./labels_new') if os.path.isdir(os.path.join(directory_path, folder))]))

In [14]:
# create a dictionary for int representation of actions
label_map = {label:num for num, label in enumerate(actions)}
label_map

{'all': 0,
 'base': 1,
 'bye': 2,
 'close': 3,
 'down': 4,
 'for': 5,
 'good': 6,
 'have': 7,
 'hello': 8,
 'how': 9,
 'if': 10,
 'listen': 11,
 'mad': 12,
 'nap': 13,
 'no': 14,
 'noisy': 15,
 'now': 16,
 'please': 17,
 'quiet': 18,
 'sad': 19,
 'show': 20,
 'thankyou': 21,
 'time': 22,
 'we': 23,
 'will': 24,
 'work': 25}

Note that at this point, we will not access the video folder, only the numpy folder.

In [49]:
import cv2
import os
import numpy as np
import mediapipe as mp
import pandas as pd
from Preprocessing.mp_support import mediapipe_detection, draw_landmarks, draw_styled_landmarks, extract_keypoints, extract_coordinates, prob_viz

class VideoPreprocessing():

    def __init__(self) -> None:
        self.savepath = "./Pipeline/video"
        self.clean_folder()
        pass

    def clean_folder(self) -> None:
        # Get a list of all files in the folder
        files = os.listdir(self.savepath)

        # Iterate over the files and remove each one
        for file in files:
            file_path = os.path.join(self.savepath, file)
            os.remove(file_path)

    def process_video(self, video_path) -> pd.DataFrame:
        self.extract_frames_data(video_path)
        return self.convert_to_df()

    def extract_frames_data(self, path) -> None:
        extract_coordinates(path, self.savepath)

    def convert_to_df(self) -> pd.DataFrame:

        # create empty data frame
        df = pd.DataFrame(columns=['frame', 'row_id', 'type', 'landmark_index', 'x', 'y', 'z'])
        
        frame_counter = 1 
        
        # for video
        for frame in os.listdir(self.savepath):
            
            frame_path = os.path.join(self.savepath, frame)
            
            with open(frame_path, 'rb') as file:
                npy_data = np.load(file)
            
                # scan through npy file and add data to df in batches of 3
                local_count = 1
                for j in range(0, 99, 3):
                    new_row = pd.Series({'frame': frame_counter, 'row_id': f"{frame_counter}-pose-{local_count}", 'type': 'pose', 'landmark_index': local_count, 'x': npy_data[j], 'y': npy_data[j+1], 'z': npy_data[j+2]})
                    
                    df = pd.concat([df, new_row.to_frame().T], ignore_index = True, axis = 0)
                    
                    local_count += 1
                
                local_count = 1
                for j in range(99, 162, 3):
                    new_row = pd.Series({'frame': frame_counter, 'row_id': f"{frame_counter}-left_hand-{local_count}", 'type': 'left_hand', 'landmark_index': local_count, 'x': npy_data[j], 'y': npy_data[j+1], 'z': npy_data[j+2]})
                    
                    df = pd.concat([df, new_row.to_frame().T], ignore_index = True, axis = 0)
                    
                    local_count += 1
                
                local_count = 1
                for j in range(162, 225, 3):
                    new_row = pd.Series({'frame': frame_counter, 'row_id': f"{frame_counter}-right_hand-{local_count}", 'type': 'right_hand', 'landmark_index': local_count, 'x': npy_data[j], 'y': npy_data[j+1], 'z': npy_data[j+2]})
                    
                    df = pd.concat([df, new_row.to_frame().T], ignore_index = True, axis = 0)
                    
                    local_count += 1
                
            frame_counter += 1
        
        return df

In [50]:
import sys
sys.path.append("d:\\SMU\\ml&applns")

from Preprocessing.DFTransformations import *
from Preprocessing.DataExtration import *
from Preprocessing.Average_parquet import Averager
from sklearn.preprocessing import LabelBinarizer, StandardScaler

import pandas as pd
import numpy as np
import warnings
import pickle

class Pipeline():

    def __init__(self, path_to_video) -> None:
        self.frames_target = 65
        self.duplicate = True
        self.remove_points = True

        with open('./Preprocessing/standard_scaler.pkl', 'rb') as f:
            self.scaler = pickle.load(f)

        data = VideoPreprocessing().process_video(path_to_video)
        self.data = Averager(data).average_pf()
        pass

In [51]:
path = "./Show/Show1.mp4"
Pipeline(path)

<__main__.Pipeline at 0x18513944910>

In [52]:
import os

sequences, labels = [], []  # sequence -> video, labels -> action

directory_path = "./Pipeline"
action = "/video"
action_counter = 0
window = []         # window -> single frame
no_frames_per_action = len(os.listdir(directory_path+action))
if no_frames_per_action >= 15 and no_frames_per_action < 150:
    action_counter += 1
    for frame_num in range(1, no_frames_per_action + 1):
        res = np.load(directory_path + action + f"/{frame_num}.npy")    # res -> coordinate key points
        window.append(res)
    sequences.append(window)
    labels.append(25)
print('-'*75)

---------------------------------------------------------------------------


In [53]:
# due to difference in number of frames, pad x and y
# x = np.array(pad_sequences(sequences, dtype = 'float', padding = 'post', value = 0))
x = np.array(sequences)
y = to_categorical(labels).astype(int)

# x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.2, stratify = y)

In [54]:

# Define the amount of padding you want for each dimension
pad_amount = [(0, 0), (290, 0), (0, 0)]  # Padding amount for each dimension

# Pad the array
padded_array = np.pad(x, pad_width=pad_amount, mode='constant', constant_values=0)


In [55]:
padded_array.shape

(1, 321, 225)

In [56]:
import tensorflow as tf

In [57]:
model = tf.keras.models.load_model("lstm_model_final.h5")

In [58]:
prediction = model.predict(padded_array)



In [59]:
np.argmax(prediction)

19