In [2]:
import cv2
import numpy as np
import pandas as pd
import os
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt
import math

# COCO body-part names mapped to their heatmap channel index.
# The position of each name in the list below is its index (Nose=0 ... Background=18).
BODY_PARTS = {
    part_name: channel
    for channel, part_name in enumerate([
        "Nose", "Neck", "RShoulder", "RElbow", "RWrist",
        "LShoulder", "LElbow", "LWrist", "RHip", "RKnee",
        "RAnkle", "LHip", "LKnee", "LAnkle", "REye",
        "LEye", "REar", "LEar", "Background",
    ])
}

# Load pre-trained pose estimation model.
# The TensorFlow graph is read from "graph_opt.pb", a path relative to the
# current working directory, through OpenCV's DNN module. The resulting `net`
# is a module-level global that extract_pose_features() runs every frame through.
net = cv2.dnn.readNetFromTensorflow("graph_opt.pb")

# Euclidean distance helper used for the pose features
def calculate_distance(a, b):
    """Return the straight-line distance between two 2-D points.

    Args:
        a: Indexable whose first two items are the x and y coordinates.
        b: Indexable whose first two items are the x and y coordinates.

    Returns:
        The Euclidean distance between ``a`` and ``b`` as a float.
    """
    delta_x = a[0] - b[0]
    delta_y = a[1] - b[1]
    return math.sqrt(delta_x ** 2 + delta_y ** 2)

# Pose feature extraction for a single frame
def extract_pose_features(frame):
    """Turn one video frame into a fixed-length vector of keypoint distances.

    The frame is resized to a 368x368 blob, passed through the global pose
    network ``net``, and the peak of each of the first 19 heatmap channels is
    taken as that body part's location (scaled back to frame pixels). A part
    whose peak confidence is not above 0.1 is treated as undetected.

    Args:
        frame: Image array; ``frame.shape[0]`` is used as the height and
            ``frame.shape[1]`` as the width.

    Returns:
        A list with one entry per keypoint pair (10 entries): the pixel
        distance between the two keypoints, or ``0`` when either keypoint
        was not detected.
    """
    network_input_size = (368, 368)  # (width, height) expected by blobFromImage
    frame_h = frame.shape[0]
    frame_w = frame.shape[1]

    blob = cv2.dnn.blobFromImage(frame, 1.0, network_input_size, (127.5, 127.5, 127.5), swapRB=True, crop=False)
    net.setInput(blob)
    # Keep only the first 19 channels, one per BODY_PARTS entry.
    heatmaps = net.forward()[:, :19, :, :]
    map_h = heatmaps.shape[2]
    map_w = heatmaps.shape[3]

    min_confidence = 0.1
    points = []
    for part_idx in range(len(BODY_PARTS)):
        _, peak_conf, _, peak_loc = cv2.minMaxLoc(heatmaps[0, part_idx, :, :])
        # Rescale the heatmap peak to the original frame's pixel grid.
        px = (frame_w * peak_loc[0]) / map_w
        py = (frame_h * peak_loc[1]) / map_h
        points.append((int(px), int(py)) if peak_conf > min_confidence else None)

    # Features are distances between these keypoint pairs, in this order.
    keypoint_pairs = [
        ("RWrist", "LWrist"), ("RWrist", "Neck"), ("LWrist", "Neck"),
        ("Neck", "Nose"), ("Nose", "REye"), ("Nose", "LEye"),
        ("RHip", "RKnee"), ("RKnee", "RAnkle"), ("LHip", "LKnee"), ("LKnee", "LAnkle")
    ]

    def pair_distance(first_part, second_part):
        start = points[BODY_PARTS[first_part]]
        end = points[BODY_PARTS[second_part]]
        if start is None or end is None:
            return 0  # Use 0 if keypoints are not found
        return calculate_distance(start, end)

    return [pair_distance(first_part, second_part) for first_part, second_part in keypoint_pairs]

# Function to process a single video and corresponding CSV
def process_video(video_path, csv_path, fps=15):
    """Extract pose features from a video and pair them with per-frame labels.

    The CSV (read without a header) is flattened into one gesture value per
    video frame. The video is only processed when its reported frame count
    equals the number of gesture values. Frames are sampled every
    ``round(video_fps / fps)`` frames (at least every frame), and each sampled
    frame's features are paired with the gesture value *at that same frame
    index*, so features and labels stay aligned even when frames are skipped.

    Args:
        video_path: Path of the video file to read with OpenCV.
        csv_path: Path of the header-less CSV holding the gesture values.
        fps: Target sampling rate in frames per second.

    Returns:
        ``(features, labels)`` where ``features`` is a NumPy array with one
        row per sampled frame and ``labels`` holds the matching gesture
        values, or ``(None, None)`` when the frame count and the number of
        gesture values differ.

    Raises:
        ZeroDivisionError: If ``fps`` is 0.
    """
    # Flatten the dataframe columns to create a single sequence of gesture values
    gesture_values = pd.read_csv(csv_path, header=None).values.flatten()
    num_frames = len(gesture_values)

    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video: {video_path}, Frames in Video: {total_frames}, Gesture Values in CSV: {num_frames}")

        # Ignore if the number of frames and number of gesture values do not match
        if total_frames != num_frames:
            print(f"Skipping {video_path} due to mismatch in number of frames and gesture values.")
            return None, None

        # Query the FPS once. A backend that cannot report it returns 0, and a
        # low-FPS video could round the step to 0; both cases fall back to
        # using every frame instead of dividing by zero in the modulo below.
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        step = max(1, round(video_fps / fps)) if video_fps > 0 else 1

        features_list = []
        sampled_indices = []
        frame_num = 0
        # Never read past the last labelled frame, even if the container
        # holds more frames than CAP_PROP_FRAME_COUNT reported.
        while frame_num < num_frames:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_num % step == 0:
                features = extract_pose_features(frame)
                if features:
                    features_list.append(features)
                    # Remember which frame this was so its own label is used.
                    sampled_indices.append(frame_num)

            frame_num += 1
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

    labels = gesture_values[np.asarray(sampled_indices, dtype=int)]
    return np.array(features_list), labels

# Function to process all videos and CSV files in a folder
def process_all_videos_and_csvs(folder_path, fps=15, sequence_length=10):
    """Build sliding-window training sequences from every video/CSV pair.

    Each ``<name>.mp4`` in ``folder_path`` that has a sibling ``<name>.csv``
    is passed to ``process_video``. The per-frame features are cut into
    overlapping windows of ``sequence_length`` frames; each window is
    labelled with the gesture value of its last frame.

    Args:
        folder_path: Directory containing the ``.mp4`` and ``.csv`` files.
        fps: Sampling rate forwarded to ``process_video``.
        sequence_length: Number of consecutive feature rows per window.

    Returns:
        ``(sequences, labels)`` as NumPy arrays. Both are empty when no video
        yielded at least ``sequence_length`` feature rows.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    video_suffix = '.mp4'
    all_sequences = []
    all_labels = []

    # Sort the listing: os.listdir order is arbitrary, and a stable order
    # keeps the dataset (and therefore the seeded split) reproducible.
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith(video_suffix):
            continue

        video_path = os.path.join(folder_path, file_name)
        # Swap only the trailing extension; str.replace would also rewrite
        # any '.mp4' occurring earlier in the path (e.g. in a folder name).
        csv_path = video_path[:-len(video_suffix)] + '.csv'
        if not os.path.exists(csv_path):
            continue

        print(f"Processing: {file_name} and {os.path.basename(csv_path)}")
        video_features, video_labels = process_video(video_path, csv_path, fps)

        # Skip if the size does not match
        if video_features is None or video_labels is None:
            continue

        # "+ 1" so the final window, ending on the last frame, is included.
        for start in range(len(video_features) - sequence_length + 1):
            end = start + sequence_length
            all_sequences.append(video_features[start:end])
            all_labels.append(video_labels[end - 1])  # Label from the last frame in the sequence

    return np.array(all_sequences), np.array(all_labels)

# Path to the folder containing videos and CSV files
folder_path = 'train1'

# Process all videos and CSVs
sequences, labels = process_all_videos_and_csvs(folder_path)

if sequences.shape[0] > 0:
    # Normalize features: flatten (windows, steps, features) to 2-D for the
    # scaler, then restore the original 3-D layout.
    scaler = StandardScaler()
    num_features = sequences.shape[2]
    sequences = scaler.fit_transform(sequences.reshape(-1, num_features)).reshape(-1, sequences.shape[1], num_features)

    # Encode labels
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(labels)

    # Class bookkeeping comes from the encoder, not from whichever classes
    # happen to land in one side of the split.
    num_classes = len(label_encoder.classes_)
    class_ids = np.arange(num_classes)
    class_names = [str(class_value) for class_value in label_encoder.classes_]

    # Split the data into training and testing sets
    # NOTE(review): the windows overlap frame-by-frame, so a random split puts
    # near-identical windows on both sides and the scaler above was fitted on
    # all data; the reported test accuracy is likely optimistic. Consider
    # splitting by video instead.
    X_train, X_test, y_train, y_test = train_test_split(sequences, encoded_labels, test_size=0.2, random_state=42)

    # Build the LSTM model
    model = Sequential()
    model.add(LSTM(64, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=False))
    model.add(Dense(32, activation='relu'))
    # One output unit per encoded class, so every label index is in range even
    # if a class is missing from y_train.
    model.add(Dense(num_classes, activation='softmax'))

    # Compile the model
    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Train the model; keep the history so the curves can be plotted without refitting
    history = model.fit(X_train, y_train, epochs=30, batch_size=32, validation_data=(X_test, y_test))

    # Save the trained model
    model.save('gesture_recognition_model.keras')

    # Evaluate the model
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f"Test accuracy: {accuracy * 100:.2f}%")

    # Predict the labels for the test set
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Generate the confusion matrix; fixing `labels` keeps it num_classes x
    # num_classes so it always lines up with the tick labels below.
    conf_matrix = confusion_matrix(y_test, y_pred_classes, labels=class_ids)

    # Plot the confusion matrix
    plt.figure(figsize=(10, 7))
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    # Generate the classification report (target_names must be strings and
    # must match `labels` one-to-one)
    class_report = classification_report(y_test, y_pred_classes, labels=class_ids, target_names=class_names)
    print("Classification Report:\n", class_report)

else:
    print("No valid sequences were found during processing.")

Processing: 009.mp4 and 009.csv
Video: train1/009.mp4, Frames in Video: 6745, Gesture Values in CSV: 6745


KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt


def plot_training_curves(history):
    """Plot accuracy and loss curves from a Keras-style training history.

    Draws a two-panel figure: accuracy on the left, loss on the right. For
    each panel the training series and, when present, the ``val_``-prefixed
    validation series are plotted. Series missing from the history (e.g. no
    validation data was passed to ``fit``) are skipped instead of raising
    ``KeyError``.

    Args:
        history: Object whose ``history`` attribute is a dict mapping metric
            names ('accuracy', 'val_accuracy', 'loss', 'val_loss') to
            per-epoch value lists.
    """
    # (metric key, panel title, y-axis label) for each subplot, left to right
    panels = (
        ('accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'Model Loss', 'Loss'),
    )

    plt.figure(figsize=(14, 5))

    for position, (metric, title, axis_label) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)

        drew_any = False
        for key, prefix in ((metric, 'Training'), (f'val_{metric}', 'Validation')):
            values = history.history.get(key)
            if values is None:
                continue
            plt.plot(values, label=f'{prefix} {axis_label}')
            drew_any = True

        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(axis_label)
        if drew_any:
            # A legend with no labelled lines only produces a warning.
            plt.legend(loc='best')

    plt.tight_layout()
    plt.show()


# NOTE(review): this calls fit() on the `model` object already in the kernel,
# so if an earlier cell trained it, these epochs are added on top of that
# training — confirm this is intended.
history = model.fit(
    X_train,
    y_train,
    epochs=50,
    validation_data=(X_test, y_test),
)
plot_training_curves(history)

In [None]:
from sklearn.preprocessing import LabelEncoder
import numpy as np

# Define the custom gesture labels
# NOTE(review): this assumes encoded class index i corresponds to
# gesture_labels[i] (i.e. the encoded labels are 0..8 in this order) — confirm
# against the label values in the CSV files.
gesture_labels = ["Idle", "Stop", "Pass", "Turn left", "Left wait", "Turn right", "Change Lane", "Slow down", "Get off"]

# Manually assign these labels to the LabelEncoder
label_encoder = LabelEncoder()
label_encoder.classes_ = np.array(gesture_labels)

# Fixed list of class indices so every metric below covers all gestures,
# including ones that do not occur in this particular test split.
class_ids = np.arange(len(gesture_labels))

y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)  # Get the class with highest probability

# Calculate the accuracy score
accuracy = accuracy_score(y_test, y_pred_classes)
print(f"Test accuracy: {accuracy * 100:.2f}%")

# Generate the confusion matrix (raw); `labels` pins it to one row/column per
# gesture so it matches the tick labels used in the plot.
conf_matrix = confusion_matrix(y_test, y_pred_classes, labels=class_ids)

# Normalize each row to sum to 1. Rows for classes with no test samples stay
# at 0 instead of becoming NaN through a division by zero.
row_totals = conf_matrix.sum(axis=1, keepdims=True)
conf_matrix_normalized = np.divide(
    conf_matrix.astype('float'),
    row_totals,
    out=np.zeros(conf_matrix.shape, dtype=float),
    where=row_totals != 0,
)

# Plot the normalized confusion matrix (0-1 values)
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix_normalized, annot=True, fmt=".2f", cmap='Blues', xticklabels=gesture_labels, yticklabels=gesture_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Normalized Confusion Matrix')
plt.show()

# Generate the classification report
try:
    class_report = classification_report(y_test, y_pred_classes, labels=class_ids, target_names=gesture_labels)
    print("Classification Report:\n", class_report)
except ValueError as e:
    print(f"Error: {e}. Check that `y_test` and `gesture_labels` have matching labels.")

In [None]:
import matplotlib.pyplot as plt


def plot_training_curves(history):
    """Plot accuracy and loss curves from a Keras-style training history.

    Draws a two-panel figure: accuracy on the left, loss on the right. For
    each panel the training series and, when present, the ``val_``-prefixed
    validation series are plotted. Series missing from the history (e.g. no
    validation data was passed to ``fit``) are skipped instead of raising
    ``KeyError``.

    Args:
        history: Object whose ``history`` attribute is a dict mapping metric
            names ('accuracy', 'val_accuracy', 'loss', 'val_loss') to
            per-epoch value lists.
    """
    # (metric key, panel title, y-axis label) for each subplot, left to right
    panels = (
        ('accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'Model Loss', 'Loss'),
    )

    plt.figure(figsize=(14, 5))

    for position, (metric, title, axis_label) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)

        drew_any = False
        for key, prefix in ((metric, 'Training'), (f'val_{metric}', 'Validation')):
            values = history.history.get(key)
            if values is None:
                continue
            plt.plot(values, label=f'{prefix} {axis_label}')
            drew_any = True

        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(axis_label)
        if drew_any:
            # A legend with no labelled lines only produces a warning.
            plt.legend(loc='best')

    plt.tight_layout()
    plt.show()


# NOTE(review): this calls fit() on the `model` object already in the kernel,
# so if an earlier cell trained it, these epochs are added on top of that
# training — confirm this is intended.
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_test, y_test),
)
plot_training_curves(history)