In [None]:
import cv2
import numpy as np
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import tensorflow as tf
import os
import random
from collections import deque


from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential,load_model
from tensorflow.keras.utils import to_categorical , plot_model
from tensorflow.keras.callbacks import EarlyStopping,ModelCheckpoint


In [None]:
# MediaPipe Holistic solution module; Holistic models are created from it in later cells
mp_holistic = mp.solutions.holistic
# MediaPipe drawing helpers used to render landmarks onto frames
mp_drawing = mp.solutions.drawing_utils


In [None]:
import zipfile
# Specify the path to the zip file
zip_file_path = 'Dataset.zip'

# Specify the directory where you want to extract the files
extracted_dir = 'extracted_data/'

In [None]:
# Create the extraction directory if it doesn't exist yet
os.makedirs(extracted_dir, exist_ok=True)

# Unpack every member of the zip archive into the extraction directory
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_dir)

# Top-level entries now present in the extraction directory
extracted_files = os.listdir(extracted_dir)

# Report each top-level entry (nothing is processed here, the path is only printed)
for file_name in extracted_files:
    file_path = os.path.join(extracted_dir, file_name)
    # Placeholder for per-file work; currently just a log line
    print(f"Processing file: {file_path}")

In [None]:
# Directory that holds one sub-folder of videos per exercise class
ucf50_dir = os.path.join(extracted_dir,'Dataset')

# Keep only sub-directories: a stray file (e.g. an OS metadata file) would make the
# later os.listdir(<class folder>) calls fail. Sort because os.listdir returns
# entries in arbitrary order.
exercises = sorted(
    entry for entry in os.listdir(ucf50_dir)
    if os.path.isdir(os.path.join(ucf50_dir, entry))
)

# Print the names of the exercise folders
for exercise_folder in exercises:
    print(exercise_folder)

In [None]:
# Root folder containing one sub-folder of videos per class
DATA_PATH = 'extracted_data/Dataset/'
# Class names; Create_dataset uses the position of a name in this array as its integer label
ACTIONS = np.array(['JumpingJack','PullUps','PushUps','Squats','Situp'])
SEQUENCE_LENGTH = 30 # number of frames sampled from every video
# Display the configured data path as the cell output
DATA_PATH

In [None]:
# Show the first frame of one random video for up to five randomly chosen classes.
# random.sample raises ValueError when asked for more items than are available,
# so cap the sample size at the number of class folders.
preview_count = min(5, len(exercises))
random_range = random.sample(range(len(exercises)),preview_count)
plt.figure(figsize=(50,50))

for counter,random_index in enumerate(random_range,1):
    selected_class_name = exercises[random_index]
    class_dir = os.path.join(DATA_PATH,selected_class_name)
    # Entries that are not folders cannot be listed; skip them
    if not os.path.isdir(class_dir):
        continue
    video_names = os.listdir(class_dir)
    # random.choice raises IndexError on an empty folder
    if not video_names:
        continue
    video_file = random.choice(video_names)
    video_reader = cv2.VideoCapture(os.path.join(class_dir,video_file))
    try:
        ok,bgr_frame = video_reader.read()
    finally:
        # Always free the capture handle, even if read() raises
        video_reader.release()

    # read() returns (False, None) for unreadable files; cvtColor would fail on None
    if not ok:
        print(f'Could not read a frame from {os.path.join(class_dir,video_file)}')
        continue

    rgb_frame = cv2.cvtColor(bgr_frame,cv2.COLOR_BGR2RGB)
    cv2.putText(rgb_frame,selected_class_name,(10,30),cv2.FONT_HERSHEY_SIMPLEX,1,(255,0,0),2)
    plt.subplot(5,4,counter)
    plt.imshow(rgb_frame)
    plt.axis('off')



In [None]:
# Print the number of videos per class and track the size of the smallest class.
minimum_class = 100000000000000
for cls in ACTIONS:
    # List the class folder once and reuse the count for both the minimum and the report
    class_video_count = len(os.listdir(os.path.join(DATA_PATH,cls)))
    minimum_class = min(minimum_class,class_video_count)
    print(f'class name : {cls} | {class_video_count}')

In [None]:
# Use only the pose landmarks, to keep the feature vector as dense as possible
def extract_keypoints(results):
    """Flatten the pose landmarks of a detection result into one feature vector.

    Args:
        results: object exposing ``pose_landmarks``; when that attribute is
            truthy, its ``landmark`` sequence is read and each item must
            provide ``x``, ``y``, ``z`` and ``visibility``.

    Returns:
        1-D numpy array with four values per landmark in landmark order, or a
        zero vector of length 33*4 when ``pose_landmarks`` is falsy.
    """
    detected_pose = results.pose_landmarks
    if not detected_pose:
        # No pose found: return an all-zero placeholder of length 33*4
        return np.zeros(33*4)

    rows = []
    for landmark in detected_pose.landmark:
        rows.append([landmark.x, landmark.y, landmark.z, landmark.visibility])
    return np.array(rows).flatten()


In [None]:
def mediapipe_detection(image,model):
    """Run ``model`` on a BGR frame and return the frame together with the result.

    Args:
        image: frame in BGR channel order (it is converted with COLOR_BGR2RGB).
        model: object whose ``process(rgb_image)`` method performs the detection.

    Returns:
        Tuple ``(bgr_image, results)``: the frame converted back to BGR and
        whatever ``model.process`` returned.
    """
    rgb_image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)

    # The RGB copy is read-only while the model processes it, then writable again
    rgb_image.flags.writeable = False
    results = model.process(rgb_image)
    rgb_image.flags.writeable = True

    bgr_image = cv2.cvtColor(rgb_image,cv2.COLOR_RGB2BGR)
    return bgr_image,results

In [None]:
# Use only the pose landmarks, to keep the feature vector as dense as possible
def extract_keypoints(results):
    """Flatten the pose landmarks of a detection result into one feature vector.

    Returns a 1-D array with (x, y, z, visibility) per landmark, or a zero
    vector of length 33*4 when ``results.pose_landmarks`` is falsy.
    """
    # NOTE(review): this cell repeats, statement for statement, the
    # extract_keypoints definition from an earlier cell; one of the two can be removed.
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    return np.array(pose)


In [None]:
def mediapipe_detection(image,model):
    """Run ``model.process`` on a BGR frame; return ``(bgr_image, results)``."""
    # NOTE(review): this cell repeats, statement for statement, the
    # mediapipe_detection definition from an earlier cell; one of the two can be removed.
    image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)  # the model is fed the RGB conversion
    image.flags.writeable = False  # read-only while the model runs
    results = model.process(image) #make prediction
    image.flags.writeable = True
    image = cv2.cvtColor(image,cv2.COLOR_RGB2BGR)  # back to BGR for the caller
    return image,results 


In [None]:
def draw_landmarks(image,results):
    """Draw the detected pose landmarks and their connections onto ``image``.

    Args:
        image: frame passed to ``mp_drawing.draw_landmarks`` as the drawing target.
        results: detection result whose ``pose_landmarks`` are drawn.

    Only the pose is drawn; hand landmarks are not rendered.
    """
    pose_landmarks = results.pose_landmarks
    pose_connections = mp_holistic.POSE_CONNECTIONS
    mp_drawing.draw_landmarks(image,pose_landmarks,pose_connections)

In [None]:
def frames_extraction(video_path):
    """Sample up to SEQUENCE_LENGTH evenly spaced frames and extract pose keypoints.

    Args:
        video_path: path of the video file to open with ``cv2.VideoCapture``.

    Returns:
        List with one keypoint vector (see ``extract_keypoints``) per frame that
        could be read. It is shorter than SEQUENCE_LENGTH, possibly empty, when
        the video cannot be opened or a frame read fails.
    """
    keypoints_list = []
    video_reader = cv2.VideoCapture(video_path)
    try:
        # An unopenable file yields no frames; skip building the detection model
        if not video_reader.isOpened():
            return keypoints_list

        video_frames_num = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        # Distance between sampled frames, never smaller than one frame
        skip_frames_window = max(video_frames_num // SEQUENCE_LENGTH, 1)

        with mp_holistic.Holistic(min_detection_confidence=0.5,min_tracking_confidence=0.5) as holistic:
            for frame_counter in range(SEQUENCE_LENGTH):
                # Jump straight to the next sampled frame position
                video_reader.set(cv2.CAP_PROP_POS_FRAMES,frame_counter*skip_frames_window)

                ok,frame = video_reader.read()
                if not ok:
                    break

                _,result = mediapipe_detection(image=frame,model=holistic)
                keypoints_list.append(extract_keypoints(result))
    finally:
        # Release the capture even when detection raises
        video_reader.release()

    return keypoints_list


In [None]:
def Create_dataset(data_path,Classes_List):
    """Extract keypoint sequences for every video of every class.

    Args:
        data_path: root folder containing one sub-folder per class.
        Classes_List: class folder names; the index of a name is its label.

    Returns:
        Tuple ``(features, labels, video_paths)`` of equally long lists:
        ``features[i]`` is the list of SEQUENCE_LENGTH keypoint vectors of
        video ``video_paths[i]`` and ``labels[i]`` its integer class index.
        Videos yielding fewer than SEQUENCE_LENGTH frames are left out.
    """
    features = []
    labels = []
    video_paths = []
    for class_index,class_name in enumerate(Classes_List):
        print(f'Extracting Data of Class: {class_name}')
        class_dir = os.path.join(data_path,class_name)
        skipped_videos = 0
        # os.listdir returns entries in arbitrary order; sorting keeps the sample
        # order (and therefore any seeded split of the result) reproducible
        for filename in sorted(os.listdir(class_dir)):
            video_path = os.path.join(class_dir,filename)
            frames = frames_extraction(video_path=video_path)
            if len(frames) == SEQUENCE_LENGTH:
                features.append(frames)
                labels.append(class_index)
                video_paths.append(video_path)
            else:
                skipped_videos += 1
        # Make silent data loss visible
        if skipped_videos:
            print(f'Skipped {skipped_videos} video(s) of class {class_name} with fewer than {SEQUENCE_LENGTH} readable frames')

    return features, labels, video_paths

In [None]:
features,labels,video_paths = Create_dataset(data_path=DATA_PATH,Classes_List=ACTIONS)

In [None]:
import pickle

In [1]:
#features

In [18]:
features = np.array(features) #no. of videos , seq_len,number of features

In [None]:
features.shape

In [None]:
with open('features.pkl', 'wb') as file:
    pickle.dump(features, file)

In [None]:
# Reload the feature array written by the previous cell.
# NOTE(review): pickle.load can execute arbitrary code; only load a features.pkl
# that this notebook produced itself.
with open('features.pkl', 'rb') as file:
    features = pickle.load(file)

In [None]:
with open('labels.pkl', 'wb') as file:
    pickle.dump(labels, file)

In [None]:
# Reload the label list written by the previous cell.
# NOTE(review): pickle.load can execute arbitrary code; only load a labels.pkl
# that this notebook produced itself.
with open('labels.pkl', 'rb') as file:
    labels = pickle.load(file)

In [24]:
# Fix the one-hot width to the number of classes. Without num_classes,
# to_categorical infers the width from the largest label present, which would be
# narrower than the model's output layer if the last class contributed no videos.
one_hot = to_categorical(labels, num_classes=len(ACTIONS))

In [None]:
one_hot

In [26]:
# Hold out 15% of the samples as the test set; rows are shuffled with a fixed
# seed so the same input order always yields the same split
features_train , features_test , labels_train , labels_test = train_test_split(
    features,
    one_hot,
    test_size=0.15,
    shuffle=True,
    random_state=27
)

In [None]:
print(f'train features shape : {features_train.shape} | labels train shape : {labels_train.shape}')

In [None]:
print(f'test features shape : {features_test.shape} | labels test shape : {labels_test.shape}')

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM,Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
pip install keras-tuner

In [None]:
import keras_tuner

In [None]:
pip install seaborn

In [51]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from keras.callbacks import Callback

class ConfusionMatrixCallback(Callback):
    """Keras callback that plots a confusion-matrix heatmap after every epoch.

    Args:
        model: model whose predictions on ``features`` are evaluated.
        features: input samples passed to ``model.predict``.
        labels: one-hot encoded ground truth, one row per sample.
        class_names: optional tick labels, one per column of ``labels``.
    """

    def __init__(self, model, features, labels, class_names=None):
        super().__init__()
        # Keras manages the ``model`` attribute of callbacks itself (via
        # set_model); assigning ``self.model`` here collides with that and,
        # presumably, raises AttributeError on Keras versions that expose it as a
        # property without a setter (TODO confirm for the installed version).
        # Keep the reference under a private name.
        self._cm_model = model
        self.features = features
        self.labels = labels
        self.class_names = class_names

    def on_epoch_end(self, epoch, logs=None):
        """Predict on the stored samples and show the confusion matrix."""
        predictions = np.argmax(self._cm_model.predict(self.features), axis=1)
        true_labels = np.argmax(self.labels, axis=1)
        # Pin the label set so the matrix always has one row/column per class,
        # even when a class is absent from both the predictions and the truth
        class_ids = np.arange(np.shape(self.labels)[1])
        cm = confusion_matrix(true_labels, predictions, labels=class_ids)
        tick_labels = 'auto' if self.class_names is None else list(self.class_names)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, cmap='Blues', fmt='g',
                    xticklabels=tick_labels, yticklabels=tick_labels)
        plt.xlabel('Predicted labels')
        plt.ylabel('True labels')
        plt.title('Confusion Matrix')
        plt.show()

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import keras_tuner

def create_LSTM_model(hp):
    """Build and compile the stacked-LSTM classifier for one tuner trial.

    Args:
        hp: KerasTuner hyperparameter container; registers ``activation``
            (shared by the three LSTM layers), ``units`` and ``units_2`` (sizes
            of the two hidden Dense layers) and ``lr`` (Adam learning rate).

    Returns:
        Compiled ``Sequential`` model with a softmax output of one unit per
        entry in ACTIONS, using categorical cross-entropy loss.
    """
    # One activation choice is shared by all LSTM layers; sample it once
    lstm_activation = hp.Choice('activation', ['relu', 'tanh'])

    # Each Dense layer needs its own hyperparameter name: registering 'units'
    # twice makes the second call return the first value, so the [8, 16, 32]
    # space would never be searched.
    dense_units_1 = hp.Choice('units', [16, 32, 64])
    dense_units_2 = hp.Choice('units_2', [8, 16, 32])

    model = Sequential()
    # Input is SEQUENCE_LENGTH frames of 33 landmarks * 4 values each
    model.add(LSTM(64, return_sequences=True, activation=lstm_activation, input_shape=(SEQUENCE_LENGTH, 33 * 4)))
    model.add(LSTM(128, return_sequences=True, activation=lstm_activation))
    model.add(LSTM(64, return_sequences=False, activation=lstm_activation))
    model.add(Dense(dense_units_1, activation='relu'))
    model.add(Dense(dense_units_2, activation='relu'))
    model.add(Dense(ACTIONS.shape[0], activation='softmax'))

    learning_rate = hp.Float("lr", min_value=1e-4, max_value=1e-2, sampling="log", default=1e-3)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    model.summary()
    return model



In [None]:
# Random search over the hyperparameters registered in create_LSTM_model:
# at most 5 trials, compared by validation loss
tuner = keras_tuner.RandomSearch(
    create_LSTM_model,
    objective='val_loss',
    max_trials=5)


In [None]:
# Run the search: 5 epochs per trial, with 20% of the training data held out for validation
tuner.search(features_train, labels_train, epochs=5,validation_split=0.20,verbose=1)
# Keep the first model returned by get_best_models()
best_model = tuner.get_best_models()[0]

In [88]:
history = best_model.fit(features_train, labels_train, epochs=5, validation_split=0.20, verbose=1)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [None]:
print(best_model.__doc__)

In [None]:
# The original call passed `confusion_matrix_callback`, a name that is not defined
# in the visible notebook cells (NameError). Model.evaluate() also fires only the
# on_test_* hooks, so an on_epoch_end-based callback would not plot anything here.
# Evaluate first, then build and plot the confusion matrix explicitly.
test_metrics = best_model.evaluate(features_test,labels_test)

test_predictions = np.argmax(best_model.predict(features_test), axis=1)
test_true_labels = np.argmax(labels_test, axis=1)
# Pin the label set so the matrix has one row/column per class
test_confusion = confusion_matrix(test_true_labels, test_predictions, labels=np.arange(len(ACTIONS)))

plt.figure(figsize=(8, 6))
sns.heatmap(test_confusion, annot=True, cmap='Blues', fmt='g', xticklabels=ACTIONS, yticklabels=ACTIONS)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

# Show the evaluation metrics as the cell output, as the original call did
test_metrics

In [None]:
import matplotlib.pyplot as plt

# Evaluate the tuned model on the held-out test set; the first two entries of the
# result are read as loss and accuracy.
evaluation_results = best_model.evaluate(features_test, labels_test)
loss, accuracy = evaluation_results[0], evaluation_results[1]

# Per-epoch curves recorded by the preceding fit() call
training_log = history.history
loss_values = training_log['loss']
val_loss_values = training_log['val_loss']
accuracy_values = training_log['accuracy']
val_accuracy_values = training_log['val_accuracy']
epochs = range(1, len(loss_values) + 1)

# One figure, two panels: loss on the left, accuracy on the right
fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(10, 5))

loss_ax.plot(epochs, loss_values, label='Training loss')
loss_ax.plot(epochs, val_loss_values, label='Validation loss')
loss_ax.set_title('Training and validation loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

accuracy_ax.plot(epochs, accuracy_values, label='Training accuracy')
accuracy_ax.plot(epochs, val_accuracy_values, label='Validation accuracy')
accuracy_ax.set_title('Training and validation accuracy')
accuracy_ax.set_xlabel('Epochs')
accuracy_ax.set_ylabel('Accuracy')
accuracy_ax.legend()

fig.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
from keras.models import load_model

# Reload the saved model when it exists. The cell that writes 'BestModel.keras'
# comes later in the notebook, so on a first top-to-bottom run the file is not
# there yet; in that case keep using the in-memory `best_model`.
saved_model_path = 'BestModel.keras'
if os.path.exists(saved_model_path):
    best_model = load_model(saved_model_path)
else:
    print(f'{saved_model_path} not found; evaluating the in-memory best_model instead')

# Evaluate the model on test data; the first two entries are read as loss and accuracy
evaluation_results = best_model.evaluate(features_test, labels_test)
loss = evaluation_results[0]
accuracy = evaluation_results[1]

# Plotting the evaluation results
plt.figure(figsize=(8, 6))

# Plot loss and accuracy as two bars on the same axes
plt.bar(['Loss'], [loss], color='blue', label='Loss')
plt.bar(['Accuracy'], [accuracy], color='green', label='Accuracy')

plt.title('Evaluation Results')
plt.ylabel('Value')
plt.legend()
plt.show()


In [None]:
tuner.search_space_summary()

In [None]:
best_model.save('BestModel.keras')

In [None]:
def predict_on_webcam(model, sequence_length=30, camera_index=0):
    """Classify the action seen by a webcam in real time until 'q' is pressed.

    Keeps a sliding window of the last ``sequence_length`` keypoint vectors and,
    once the window is full, predicts on it after every new frame. The latest
    predicted class name is drawn onto each displayed frame.

    Args:
        model: trained model whose ``predict`` accepts an array of shape
            (1, sequence_length, number of keypoint values).
        sequence_length: number of consecutive frames per prediction window.
        camera_index: device index passed to ``cv2.VideoCapture``.
    """
    video_capture = cv2.VideoCapture(camera_index)

    # Most recent prediction; stays empty until the first full window
    predicted_class_name = ''
    # Bounded deque: appending to a full window drops the oldest frame automatically
    frame_buffer = deque(maxlen=sequence_length)

    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while True:
                ok, frame = video_capture.read()

                # Stop when the camera delivers no frame
                if not ok:
                    break

                _ , result = mediapipe_detection(image=frame, model=holistic)
                # extract_keypoints always returns a vector (zeros when no pose
                # is detected), so every frame enters the window
                frame_buffer.append(extract_keypoints(result))

                if len(frame_buffer) == sequence_length:
                    # Add the batch dimension expected by model.predict
                    frame_sequence = np.array(list(frame_buffer))[np.newaxis, ...]
                    # verbose=0 avoids one progress line per frame
                    predicted_labels_probabilities = model.predict(frame_sequence, verbose=0)[0]
                    predicted_label = np.argmax(predicted_labels_probabilities)
                    predicted_class_name = ACTIONS[predicted_label]

                # Write predicted class name on top of the frame.
                cv2.putText(frame, 'Result:', (0, 115), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(frame, predicted_class_name, (int(frame.shape[1] / 4), 115), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

                cv2.imshow('Real-time Action Recognition', frame)

                # Check for 'q' key press to quit.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        # Free the camera and close the windows even if an error interrupts the loop
        video_capture.release()
        cv2.destroyAllWindows()


In [60]:
# Call the function
predict_on_webcam(best_model)

