# ACTION RECOGNITION

### Import and Install Dependencies

In [None]:
!pip install tensorflow opencv-python mediapipe scikit-learn matplotlib

In [1]:
import cv2      #video capture
import numpy as np    #data manuplation
import os    #easier file paths
from matplotlib import pyplot as pltb    #for stats graphs
import time    #sleeps between frames we collect
import mediapipe as mp    #key points of face, arm etc.

### Keypoints using MP Holistic

In [2]:
# Short aliases for the MediaPipe solution modules used throughout the notebook.
mp_holistic = mp.solutions.holistic     # holistic module: provides Holistic, POSE_CONNECTIONS and HAND_CONNECTIONS used below
mp_drawing = mp.solutions.drawing_utils    # drawing helpers: draw_landmarks and DrawingSpec
mp_face = mp.solutions.face_mesh    # face-mesh module: provides FACEMESH_TESSELATION for the face connections

In [3]:
def mediapipe_detection(image, model):
    """Run ``model.process`` on a BGR frame and hand back a BGR frame.

    The frame is converted BGR -> RGB before the model sees it and converted
    RGB -> BGR again afterwards.

    Args:
        image: frame in BGR channel order.
        model: object exposing ``process(rgb_image)``.

    Returns:
        ``(bgr_image, results)`` where ``results`` is whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Mark the buffer read-only for the duration of the prediction, then
    # restore write access.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [4]:
def draw_landmarks(image, results):
    """Draw face, pose and both hand landmark sets with the default style.

    Each landmark list from ``results`` is passed to
    ``mp_drawing.draw_landmarks`` together with its connection set.
    Nothing is returned.
    """
    overlays = (
        (results.face_landmarks, mp_face.FACEMESH_TESSELATION),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmark_list, connections in overlays:
        mp_drawing.draw_landmarks(image, landmark_list, connections)

In [5]:
def draw_styled_landmarks(image, results):
    """Draw face, pose and both hand landmark sets with custom colours.

    Each landmark list from ``results`` is passed to
    ``mp_drawing.draw_landmarks`` with the same pair of drawing specs:
    one for the landmark points and one for the connections between them.
    Nothing is returned.

    Args:
        image: frame handed to ``mp_drawing.draw_landmarks``.
        results: object exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # Build the two specs once instead of once per body part.
    landmark_spec = mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1)
    # The middle channel used to be 356, which is outside the 0-255 range of
    # an 8-bit colour channel; 255 is the largest valid value.
    connection_spec = mp_drawing.DrawingSpec(color=(80, 255, 121), thickness=1, circle_radius=1)

    overlays = (
        (results.face_landmarks, mp_face.FACEMESH_TESSELATION),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmark_list, connections in overlays:
        mp_drawing.draw_landmarks(image, landmark_list, connections,
                                  landmark_spec, connection_spec)

In [None]:
from google.colab.patches import cv2_imshow

In [None]:
#VIDEO CAPTURE
# Live preview: read webcam frames, run the holistic model, draw the styled
# landmarks and display each frame until 'q' is pressed or the feed ends.
cam = cv2.VideoCapture(0)   # open the default webcam
try:
    # min_detection_confidence: threshold for the initial detection
    # min_tracking_confidence: threshold for tracking after detection
    with mp_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence = 0.5) as holistic:
        while cam.isOpened():    # loop while the webcam is available
            ret, frame = cam.read()    # (success flag, frame)
            if not ret:    # no frame delivered: stop instead of crashing in cvtColor
                break

            image, results = mediapipe_detection(frame, holistic)     # make detection
            # print(results)

            # Was `draw_style_landmarks`, a name that is never defined (NameError).
            draw_styled_landmarks(image, results)

            # google.colab's cv2_imshow takes only the image, not a window title.
            cv2_imshow(image)

            # break gracefully when 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    cam.release()    # always free the webcam, even if a frame fails to process
# cv2.destroyAllWindows()     # closes all OpenCV windows

In [None]:
# Inspect the left-hand landmarks of the most recent `results` object.
# NOTE(review): this raises AttributeError when left_hand_landmarks is None
# (extract_keypoints guards against that case) — presumably that means no
# left hand was detected in the last frame; confirm.
results.left_hand_landmarks.landmark   #list
# Names presumably copied from an attribute listing of `results` — confirm:
# count
# face_landmarks
# index
# left_hand_landmarks
# mro
# pose_landmarks
# right_hand_landmarks

### Extract Keypoint Values

In [6]:
def extract_keypoints(results):
    """Flatten all detected landmarks into one 1-D feature vector.

    Layout of the returned array (a missing part is filled with zeros):
        pose        33 landmarks x (x, y, z, visibility)
        face        468 landmarks x (x, y, z)
        left hand   21 landmarks x (x, y, z)
        right hand  21 landmarks x (x, y, z)

    Args:
        results: object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each is
            either falsy or has a ``landmark`` iterable.

    Returns:
        np.ndarray: concatenation in the order pose, face, left hand,
        right hand.
    """
    def _flatten(landmark_list, fields, count):
        # Zero placeholder of the expected size when the part was not detected.
        if not landmark_list:
            return np.zeros(count * len(fields))
        rows = [[getattr(point, field) for field in fields]
                for point in landmark_list.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    pose = _flatten(results.pose_landmarks, xyz + ('visibility',), 33)
    rh = _flatten(results.right_hand_landmarks, xyz, 21)
    lh = _flatten(results.left_hand_landmarks, xyz, 21)
    face = _flatten(results.face_landmarks, xyz, 468)
    return np.concatenate([pose, face, lh, rh])


### Setup Folders for Collection

In [14]:
# Root folder for the exported keypoint arrays.
# Local alternative: DATA_PATH = os.path.join('MP_Data')
DATA_PATH = '/content/MP_Data'

# Actions (class names) to detect; the position in this array is the class index.
actions = np.array([
    'Hello', 'Thanks', 'I_love_you', 'Yes', 'No', 'Please',
    'Sorry', 'Help', 'You', 'Me', 'Stop',
])

no_sequence = 30    # number of sequences (videos) per action
sequence_length = 30    # number of frames per sequence


In [None]:
# Create DATA_PATH/<action>/<sequence>/ for every action and sequence index.
for action in actions:
    for sequence in range(no_sequence):
        # exist_ok=True makes re-running the cell harmless, while genuine
        # failures (e.g. missing permissions) are no longer silently swallowed
        # by a bare `except: pass`.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

### Collect Keypoint Values for Training and Testing

In [None]:
# Record `no_sequence` sequences of `sequence_length` frames for every action
# and save the keypoints of each frame to
# DATA_PATH/<action>/<sequence>/<frame_num>.npy.
cam = cv2.VideoCapture(0)   # open the default webcam
stop_requested = False      # set when 'q' is pressed or the camera stops delivering frames
try:
    # min_detection_confidence: threshold for the initial detection
    # min_tracking_confidence: threshold for tracking after detection
    with mp_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence = 0.5) as holistic:
        for action in actions:    # every action
            for sequence in range(no_sequence):    # every sequence (video) of that action
                for frame_num in range(sequence_length):    # every frame of that sequence
                    ret, frame = cam.read()    # (success flag, frame)
                    if not ret:    # camera failed: stop instead of crashing in cvtColor
                        stop_requested = True
                        break

                    image, results = mediapipe_detection(frame, holistic)     # make detection
                    # Was `draw_style_landmarks`, a name that is never defined (NameError).
                    draw_styled_landmarks(image, results)

                    # Status line shown on every frame (fixes the missing space
                    # in the former 'Collectingframes' variant).
                    cv2.putText(image, 'Collecting frames for {} Video number {}'.format(action, sequence),
                                (15, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                    if frame_num == 0:
                        # NOTE(review): font scale 5 with thickness 1 looks unusually
                        # large for a banner — confirm these two values are not swapped.
                        cv2.putText(image, 'STARTING COLLECTION', (50, 100),
                                    cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 1, cv2.LINE_AA)
                        # Show the banner BEFORE pausing; previously the pause ran
                        # before imshow, so the banner was not on screen during it.
                        cv2.imshow('OpenCV Feed', image)
                        cv2.waitKey(2500)    # pause so the user can get into position
                    else:
                        cv2.imshow('OpenCV Feed', image)

                    keypoints = extract_keypoints(results)    # flatten landmarks of this frame
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                    np.save(npy_path, keypoints)    # np.save appends the .npy extension

                    # 'q' aborts the whole collection, not just the current sequence
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop_requested = True
                        break
                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    cam.release()    # always free the webcam
    cv2.destroyAllWindows()     # close the preview window

### Preprocess Data and Create Labels and Features

In [7]:
from sklearn.model_selection import train_test_split    #for splitting data for test and train
from tensorflow.keras.utils import to_categorical    #to convert to one-hot encoding

In [None]:
# Map each action name to its integer class index (its position in `actions`).
label_map = {name: index for index, name in enumerate(actions)}

In [None]:
label_map

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load every saved frame back from disk and assemble features and labels:
# `sequences` gets one list of frame arrays per video, `labels` the class
# index of that video.
sequences, labels = [], []
for action in actions:
    for sequence in range(no_sequence):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        # one video = the frames 0 .. sequence_length-1 of this folder, in order
        window = [np.load(os.path.join(sequence_dir, f"{frame_num}.npy"))
                  for frame_num in range(sequence_length)]
        sequences.append(window)
        labels.append(label_map[action])

X = np.array(sequences)     # all videos stacked into one array
y = to_categorical(labels).astype(int)     # one-hot encoded labels

# Cache both arrays so the per-frame files do not have to be re-read.
np.savez_compressed("X_y_data.npz", X=X, y=y)

In [None]:
# Reload the cached feature/label arrays. The context manager closes the
# .npz archive once both arrays have been read; previously the file handle
# stayed open for the rest of the session.
with np.load("X_y_data.npz") as data:
    X = data["X"]
    y = data["y"]

In [None]:
# Hold out 5% of the sequences for testing; the rest is used for training.
# NOTE(review): no random_state is passed, so presumably the split differs
# between runs — confirm whether reproducible results are needed.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.05)

In [None]:
y_test

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Convert to tensors
def _as_tensors(features, one_hot_labels):
    # float32 features plus integer class ids recovered from the one-hot rows
    feature_tensor = torch.tensor(features, dtype=torch.float32)
    label_tensor = torch.tensor(np.argmax(one_hot_labels, axis=1), dtype=torch.long)
    return feature_tensor, label_tensor

X_train_tensor, y_train_tensor = _as_tensors(X_train, y_train)
X_test_tensor, y_test_tensor = _as_tensors(X_test, y_test)
print(X_train_tensor.shape)
print(X_train_tensor)

# Datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
print(test_dataset)

# DataLoaders: only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16)


### Build and Train Encoder Transformer model

In [8]:
import torch     #core pytorch lib
import torch.nn as nn     #layers like linear, transformer, ReLu etc.

In [9]:
class SignLanguageTransformer(nn.Module):
    """Transformer-encoder classifier for sequences of per-frame keypoint vectors.

    Pipeline: project every frame to ``d_model`` features, add a learnable
    positional embedding, run a stack of transformer encoder layers, average
    over the frame axis and classify the pooled vector with a small MLP.

    Args:
        input_dim: number of features per frame.
        seq_len: maximum number of frames per sequence (size of the
            positional embedding).
        num_classes: number of output classes (actions).
        d_model: size of each frame embedding inside the transformer.
        nhead: number of attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        dim_feedforward: hidden size of the feed-forward block in each layer.
        dropout: dropout rate used in the encoder layers and the classifier.
    """

    def __init__(self, input_dim = 1662, seq_len = 30, num_classes = 11, d_model = 512, nhead = 4, num_layers = 2, dim_feedforward = 1024, dropout = 0.1):
        super().__init__()    # registers the sub-modules/parameters with PyTorch
        # per-frame projection: input_dim -> d_model
        self.input_proj = nn.Linear(input_dim, d_model)
        # learnable positional embedding, one vector per frame position
        self.pos_embedding = nn.Parameter(torch.randn(1, seq_len, d_model))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model = d_model,
            nhead = nhead,
            dim_feedforward = dim_feedforward,
            dropout = dropout,
            batch_first = True     # tensors are (batch, frames, features)
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers = num_layers)
        # averages the frame axis down to a single vector per sequence
        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 256),     # d_model -> 256
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, num_classes)  # 256 -> num_classes
        )

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: tensor of shape (batch, frames, input_dim) with
                ``frames <= seq_len``.

        Returns:
            Tensor of shape (batch, num_classes) holding unnormalised class
            scores (logits).
        """
        frames = x.size(1)
        # Slice the positional embedding to the actual sequence length so that
        # inputs shorter than seq_len are accepted; for frames == seq_len this
        # is the full embedding, exactly as before.
        x = self.input_proj(x) + self.pos_embedding[:, :frames]    # (batch, frames, d_model)
        x = self.transformer(x)    # self-attention across the frames
        x = x.permute(0, 2, 1)     # (batch, d_model, frames): AdaptiveAvgPool1d pools the last axis
        x = self.global_pool(x).squeeze(2)    # (batch, d_model)
        return self.classifier(x)    # (batch, num_classes)

In [10]:
import torch.nn.functional as F     # functional API of torch.nn

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

model = SignLanguageTransformer()
model = model.to(device)    # move the weights to the selected device

criterion = nn.CrossEntropyLoss()   # loss function

# Adam (Adaptive Moment Estimation) over all model weights, learning rate 0.0001
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [None]:
import time

num_epochs = 100

# One pass over train_loader per epoch; prints the summed batch loss, the
# training accuracy and the wall-clock time of the epoch.
for epoch in range(num_epochs):
    model.train()
    epoch_start = time.time()
    epoch_loss = 0
    correct = total = 0

    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)

        # standard step: clear gradients, forward, loss, backward, update
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        predicted = torch.max(outputs, 1)[1]    # index of the highest score per sample
        correct += (predicted == batch_y).sum().item()
        total += batch_y.size(0)

    train_acc = 100 * correct / total
    elapsed = time.time() - epoch_start
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Train Accuracy: {train_acc:.2f}%, Time: {elapsed:.2f}s")


### Make Predictions & Evaluation

In [None]:
# Accuracy on the held-out test set, with the model in eval mode and
# gradients disabled.
model.eval()
correct = total = 0

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        outputs = model(batch_X)
        predicted = torch.max(outputs, 1)[1]    # index of the highest score per sample
        correct += (predicted == batch_y).sum().item()
        total += batch_y.size(0)

test_acc = 100 * correct / total
print(f"Test Accuracy: {test_acc:.2f}%")

from sklearn.metrics import confusion_matrix, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# Collect predictions and true labels over the test set, then plot the
# confusion matrix as an annotated heatmap.
all_preds, all_labels = [], []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch.to(device))
        preds = torch.argmax(outputs, dim=1)

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

cm = confusion_matrix(all_labels, all_preds)

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=actions, yticklabels=actions, ax=ax)
ax.set_xlabel("Predicted")
ax.set_ylabel("Actual")
ax.set_title("Confusion Matrix")
plt.show()



### Save Weights

In [16]:
# torch.save(model.state_dict(), 'sign_lang_model_weights_100.00_final.pth')

# Build a fresh model and restore previously saved weights. The checkpoint is
# mapped to CPU while loading; load_state_dict then copies the values into the
# model's own parameters.
model = SignLanguageTransformer().to(device)
saved_weights = torch.load('sign_lang_model_weights_94.12.pth', map_location=torch.device('cpu'))
model.load_state_dict(saved_weights)
model.eval()

SignLanguageTransformer(
  (input_proj): Linear(in_features=1662, out_features=512, bias=True)
  (transformer): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
        )
        (linear1): Linear(in_features=512, out_features=1024, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=1024, out_features=512, bias=True)
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (global_pool): AdaptiveAvgPool1d(output_size=1)
  (classifier): Sequential(
    (0): Linear(in_features=512, out_features=256, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.1, inplace=False)

In [None]:
# !unzip /content/MP_Data.zip -d /content/

### Test in Real Time

In [12]:
from scipy import stats
# One bar colour per action, indexed by class id in prob_viz (11 entries for
# the 11 actions).
# NOTE(review): the colour names below describe each tuple read as (R, G, B).
# The frames drawn on come back from mediapipe_detection in BGR order, so
# presumably red and blue appear swapped on screen — confirm.
colors = [
    (245, 117, 16),   # Orange
    (117, 245, 16),   # Lime
    (16, 117, 245),   # Blue
    (255, 0, 0),      # Red
    (0, 255, 0),      # Green
    (0, 0, 255),      # Dark Blue
    (255, 255, 0),    # Yellow
    (255, 0, 255),    # Magenta
    (0, 255, 255),    # Cyan
    (128, 0, 128),    # Purple
    (255, 165, 0)     # Dark Orange
]

def prob_viz(res, actions, input_frame, colors):
    """Draw one horizontal bar plus label per class onto a copy of the frame.

    Args:
        res: iterable of per-class scores; a score ``p`` produces a bar that
            is ``int(p * 100)`` pixels wide.
        actions: class names, indexed like ``res``.
        input_frame: frame to annotate; it is copied, not modified.
        colors: non-empty sequence of colour tuples. When there are more
            classes than colours the palette is cycled instead of raising
            IndexError.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        top = 60 + num * 40    # each class gets its own 40-pixel row
        color = colors[num % len(colors)]
        cv2.rectangle(output_frame, (0, top), (int(prob * 100), top + 30), color, -1)
        cv2.putText(output_frame, actions[num], (0, top + 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    return output_frame


In [17]:
import cv2
import numpy as np
import torch
import time
from scipy import stats
import mediapipe as mp
import matplotlib.pyplot as plt

# Real-time detection loop: keep a sliding window of the latest
# `sequence_length` frames, classify it, and append the predicted action to
# the on-screen sentence once the prediction is stable and confident.
from collections import deque

DEBOUNCE_FRAMES = 10    # consecutive identical predictions required to accept one
MAX_WORDS = 5           # number of recognised words kept on screen

sequence = deque(maxlen=sequence_length)     # sliding window of keypoint vectors
sentence = []
predictions = deque(maxlen=DEBOUNCE_FRAMES)  # bounded: only the recent history is needed
threshold = 0.5

cap = cv2.VideoCapture(0)
prev_time = time.time()

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Detect landmarks
            image, results = mediapipe_detection(frame, holistic)
            draw_styled_landmarks(image, results)

            # Extract keypoints & extend the sliding window (old frames drop out automatically)
            sequence.append(extract_keypoints(results))

            # Predict once the window is full
            if len(sequence) == sequence.maxlen:
                window = np.expand_dims(np.array(list(sequence)), axis=0)
                input_tensor = torch.tensor(window, dtype=torch.float32).to(device)
                with torch.no_grad():
                    logits = model(input_tensor)
                    # The model returns raw logits; softmax turns them into
                    # probabilities so that the 0.5 threshold and the bar widths
                    # in prob_viz are meaningful.
                    res = torch.softmax(logits, dim=1)[0].cpu().numpy()

                predicted_class = int(np.argmax(res))
                predictions.append(predicted_class)

                # Debouncing: accept only if the last DEBOUNCE_FRAMES predictions
                # all agree. (The former np.unique(...)[0] test only compared
                # against the smallest class id seen in that window.)
                stable = (len(predictions) == predictions.maxlen
                          and all(p == predicted_class for p in predictions))
                if stable and res[predicted_class] > threshold:
                    if not sentence or actions[predicted_class] != sentence[-1]:
                        sentence.append(actions[predicted_class])

                sentence = sentence[-MAX_WORDS:]
            else:
                res = np.zeros(len(actions))  # empty bars until the window is full

            # Visualize predictions
            image = prob_viz(res, actions, image, colors)

            # Display sentence
            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3,30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # FPS (optional); the lower bound avoids division by zero when two
            # timestamps coincide
            curr_time = time.time()
            fps = 1 / max(curr_time - prev_time, 1e-6)
            prev_time = curr_time
            cv2.putText(image, f'FPS: {int(fps)}', (500, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # Show final frame
            cv2.imshow('Sign Language Detection', image)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even after an error.
    cap.release()
    cv2.destroyAllWindows()