In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import torch.nn as nn
import torch.optim as optim
import cv2
import mediapipe as mp
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
import math

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [3]:
angles = pd.read_csv('angles.csv')
landmarks = pd.read_csv('landmarks.csv')
xyz_distances = pd.read_csv('xyz_distances.csv')
labels = pd.read_csv('labels.csv') 

In [4]:
data = pd.concat([angles, landmarks, xyz_distances], axis=1)
data

Unnamed: 0,vid_id,frame_order,right_elbow_right_shoulder_right_hip,left_elbow_left_shoulder_left_hip,right_knee_mid_hip_left_knee,right_hip_right_knee_right_ankle,left_hip_left_knee_left_ankle,right_wrist_right_elbow_right_shoulder,left_wrist_left_elbow_left_shoulder,vid_id.1,...,z_left_wrist_right_wrist,x_left_ankle_right_ankle,y_left_ankle_right_ankle,z_left_ankle_right_ankle,x_left_hip_avg_left_wrist_left_ankle,y_left_hip_avg_left_wrist_left_ankle,z_left_hip_avg_left_wrist_left_ankle,x_right_hip_avg_right_wrist_right_ankle,y_right_hip_avg_right_wrist_right_ankle,z_right_hip_avg_right_wrist_right_ankle
0,0,0,16.926802,7.667874,18.982162,112.747505,112.62553,112.099300,101.05565,0,...,-14.432587,-9.223137,-1.071556,-2.174812,-3.164901,-34.542374,-2.345901,4.737422,-33.934704,0.018932
1,0,1,14.199318,8.954973,18.966124,109.707190,109.76263,110.645454,102.00027,0,...,-13.150227,-9.346758,-1.539425,-2.993454,-3.677829,-34.017323,-6.151936,4.967526,-32.812057,-3.735701
2,0,2,18.065800,10.315741,17.527954,114.562100,112.08965,113.340350,104.09502,0,...,-8.034512,-9.144169,-1.128830,-0.910919,-4.326368,-33.580940,-3.725380,5.561866,-32.919320,-3.424566
3,0,3,23.270214,17.336140,17.195545,117.674810,115.43172,114.634530,107.38297,0,...,-4.689800,-9.452256,-1.272339,-0.120029,-5.293504,-34.349266,-0.320938,6.583301,-33.541080,-1.500698
4,0,4,22.831680,13.822096,17.355429,117.536720,117.96766,112.306390,98.39078,0,...,-5.634483,-9.299101,-0.966522,-0.713718,-5.696359,-30.211918,-0.966287,7.416256,-30.537058,-2.617974
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
83917,447,296,107.305360,90.115130,51.515130,134.348110,147.88246,136.570850,123.31597,447,...,28.049961,19.578148,-0.674988,16.383720,10.704135,-9.279395,-29.094950,-6.808452,-8.885754,-35.941513
83918,447,297,100.225420,94.343120,53.821750,146.845400,163.03261,140.316000,140.18736,447,...,20.149796,18.968037,-1.072559,20.114521,10.303999,-8.118381,-32.593430,-6.124963,-8.289589,-38.209377
83919,447,298,100.277790,89.232956,49.927440,137.958940,152.91435,136.715000,128.46019,447,...,22.095844,18.483020,-0.213051,12.890191,10.183672,-6.712150,-28.004406,-6.046250,-7.904314,-31.676674
83920,447,299,99.322820,87.257810,49.641770,148.995380,167.36041,138.741460,134.61838,447,...,22.606620,18.282482,-0.910675,13.242300,10.493062,-6.295994,-29.168915,-5.701420,-7.739532,-33.408203


In [5]:
data = data.loc[:, ~data.columns.duplicated()]

In [6]:
data_with_labels = pd.merge(data, labels, on='vid_id', how='inner')

In [7]:
X = data_with_labels.drop(columns=['class', 'vid_id'])  # Drop non-numeric columns
y = data_with_labels['class'] 

In [8]:
y = pd.get_dummies(y).values 

In [9]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [10]:
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [11]:
X_train = torch.tensor(X_train).float()
X_test = torch.tensor(X_test).float()
y_train = torch.tensor(y_train).float()
y_test = torch.tensor(y_test).float()

In [12]:
class PoseDataset(Dataset):
    def __init__(self, X, y, seq_length=30):
        self.X = X
        self.y = y
        self.seq_length = seq_length

    def __len__(self):
        return len(self.X) - self.seq_length + 1

    def __getitem__(self, idx):
        sequence = self.X[idx:idx + self.seq_length]
        if sequence.size(0) == self.seq_length:
            return sequence.view(self.seq_length, -1), self.y[idx + self.seq_length - 1]
        else:
            raise IndexError("Invalid sequence length")


In [13]:
train_dataset = PoseDataset(X_train, y_train)
test_dataset = PoseDataset(X_test, y_test)

In [14]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32,shuffle=False)


In [15]:
class PoseTransformer(nn.Module):
    def __init__(self, num_features=155, seq_length=30, d_model=256, nhead=4, num_layers=4, num_classes=5):
        super(PoseTransformer, self).__init__()
        self.embedding = nn.Linear(num_features, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=0.3)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x):
        batch_size = x.size(0)
        sequence_length = x.size(1)
        num_features = x.size(2)

        x = x.view(batch_size, sequence_length, num_features)
        x = self.embedding(x)
        x = x.permute(1, 0, 2)
        x = self.transformer_encoder(x)
        x = self.fc(x.mean(dim=0))
        return x

In [16]:
model = PoseTransformer(num_features=X_train.size(1), seq_length=30, num_classes=y_train.shape[1])
model = model.to(device) 
# Use CrossEntropyLoss for multi-class classification
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)



In [17]:
print(f'Total number of batches: {len(train_loader)}')


Total number of batches: 2098


In [18]:
print(f'Train Dataset Length: {len(train_dataset)}')
print(f'Test Dataset Length: {len(test_dataset)}')


Train Dataset Length: 67108
Test Dataset Length: 16756


In [19]:
torch.cuda.empty_cache()

In [20]:
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, labels.argmax(dim=1))
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            true_labels = labels.argmax(dim=1)
            correct += (predicted == true_labels).sum().item()
            total += true_labels.size(0)

        avg_loss = running_loss / len(train_loader)
        accuracy = correct / total
        scheduler.step()
        print(f'Epoch [{epoch + 1}/{num_epochs}] - loss: {avg_loss:.4f} - accuracy: {accuracy * 100:.2f}%')

In [21]:
train_model(model, train_loader, criterion, optimizer, num_epochs=10)
torch.save(model.state_dict(), 'model_weights.pth')
print("Model weights saved to model_weights.pth")

  attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)


Epoch [1/10] - loss: 1.6106 - accuracy: 22.00%
Epoch [2/10] - loss: 1.5978 - accuracy: 22.60%
Epoch [3/10] - loss: 1.5972 - accuracy: 22.51%
Epoch [4/10] - loss: 1.5970 - accuracy: 22.61%
Epoch [5/10] - loss: 1.5970 - accuracy: 22.68%
Epoch [6/10] - loss: 1.5965 - accuracy: 22.76%
Epoch [7/10] - loss: 1.5965 - accuracy: 22.70%
Epoch [8/10] - loss: 1.5965 - accuracy: 22.54%
Epoch [9/10] - loss: 1.5964 - accuracy: 22.67%
Epoch [10/10] - loss: 1.5965 - accuracy: 22.96%
Model weights saved to model_weights.pth


In [22]:
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    y_true = []
    y_pred = []
    with torch.no_grad():
        for sequences, labels in test_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            _, predicted = torch.max(outputs, 1)
            true_labels = labels.argmax(dim=1)
            correct += (predicted == true_labels).sum().item()
            total += true_labels.size(0)
            y_true.extend(true_labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')
    print(classification_report(y_true, y_pred, target_names=['Push-up', 'Pull-up', 'Sit-up', 'Jumping Jack', 'Squat']))


In [23]:
evaluate_model(model, test_loader)

Test Accuracy: 22.19%
              precision    recall  f1-score   support

     Push-up       0.22      1.00      0.36      3718
     Pull-up       0.00      0.00      0.00      3818
      Sit-up       0.00      0.00      0.00      3484
Jumping Jack       0.00      0.00      0.00      3383
       Squat       0.00      0.00      0.00      2353

    accuracy                           0.22     16756
   macro avg       0.04      0.20      0.07     16756
weighted avg       0.05      0.22      0.08     16756



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [30]:
def calculate_angle(landmark1, landmark2, landmark3):
    a = np.array([landmark1.x, landmark1.y])  # First point
    b = np.array([landmark2.x, landmark2.y])  # Middle point (vertex)
    c = np.array([landmark3.x, landmark3.y])  # Third point

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(np.degrees(radians))

    if angle > 180.0:
        angle = 360 - angle
    return angle


In [32]:
# Function to provide real-time pose correction feedback
def provide_feedback(landmarks, mp_pose):
    feedback = []

    # Check squat position (left and right knees)
    left_knee_angle = calculate_angle(
        landmarks[mp_pose.PoseLandmark.LEFT_HIP],
        landmarks[mp_pose.PoseLandmark.LEFT_KNEE],
        landmarks[mp_pose.PoseLandmark.LEFT_ANKLE]
    )
    right_knee_angle = calculate_angle(
        landmarks[mp_pose.PoseLandmark.RIGHT_HIP],
        landmarks[mp_pose.PoseLandmark.RIGHT_KNEE],
        landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE]
    )

    # Squat correction logic
    if left_knee_angle > 100 or right_knee_angle > 100:
        feedback.append("Bend your knees more.")
    elif left_knee_angle < 70 or right_knee_angle < 70:
        feedback.append("Straighten your legs slightly.")

    # Back posture (spine check using shoulders and hips)
    left_shoulder_angle = calculate_angle(
        landmarks[mp_pose.PoseLandmark.LEFT_HIP],
        landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER],
        landmarks[mp_pose.PoseLandmark.LEFT_ELBOW]
    )
    right_shoulder_angle = calculate_angle(
        landmarks[mp_pose.PoseLandmark.RIGHT_HIP],
        landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER],
        landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW]
    )

    if left_shoulder_angle < 160 or right_shoulder_angle < 160:
        feedback.append("Keep your back straight.")
    

    return feedback



In [None]:
def run_camera(model):
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose()
    mp_drawing = mp.solutions.drawing_utils

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open camera.")
        return

    exercise_count = 0
    current_exercise = "Squats"
    is_squat = False
    last_status = ""
    correction_feedback = []

    # Define threshold for correct squat angle (knee)
    squat_angle_threshold = 90  # Angle less than 90 degrees means a deep enough squat

    # Define minimum visibility threshold for reliable landmark detection
    visibility_threshold = 0.6

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame.")
            break

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)

        if results.pose_landmarks:
            mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            landmarks = results.pose_landmarks.landmark

            # Check if landmarks for knees, hips, and ankles are visible with high confidence
            if (landmarks[mp_pose.PoseLandmark.LEFT_HIP].visibility > visibility_threshold and
                landmarks[mp_pose.PoseLandmark.LEFT_KNEE].visibility > visibility_threshold and
                landmarks[mp_pose.PoseLandmark.LEFT_ANKLE].visibility > visibility_threshold and
                landmarks[mp_pose.PoseLandmark.RIGHT_HIP].visibility > visibility_threshold and
                landmarks[mp_pose.PoseLandmark.RIGHT_KNEE].visibility > visibility_threshold and
                landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE].visibility > visibility_threshold):

                # Calculate left and right knee angles using hip, knee, and ankle landmarks
                left_knee_angle = calculate_angle(
                    landmarks[mp_pose.PoseLandmark.LEFT_HIP],
                    landmarks[mp_pose.PoseLandmark.LEFT_KNEE],
                    landmarks[mp_pose.PoseLandmark.LEFT_ANKLE]
                )
                right_knee_angle = calculate_angle(
                    landmarks[mp_pose.PoseLandmark.RIGHT_HIP],
                    landmarks[mp_pose.PoseLandmark.RIGHT_KNEE],
                    landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE]
                )

                # Provide real-time feedback for corrections
                correction_feedback = provide_feedback(landmarks, mp_pose)

                # Squat counting logic (if needed)
                if left_knee_angle < squat_angle_threshold and right_knee_angle < squat_angle_threshold:
                    last_status = 'Correct Squat'
                    if not is_squat:
                        exercise_count += 1  # Increment squat count
                        is_squat = True
                else:
                    last_status = 'Incomplete Squat'
                    is_squat = False
            else:
                last_status = 'Landmarks not detected clearly'
                correction_feedback = ['Ensure all body parts are visible.']

        else:
            last_status = 'No landmarks detected'

        # Display the exercise, count, and pose correction feedback on the frame
        cv2.putText(frame, f'Exercise: {current_exercise}', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f'Count: {exercise_count}', (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, last_status, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if last_status == 'Correct Squat' else (0, 0, 255), 2)

        # Display feedback on the screen
        for i, feedback in enumerate(correction_feedback):
            cv2.putText(frame, feedback, (50, 250 + i*50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

        cv2.imshow('Pose Correction', frame)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# Run camera feedback
run_camera(model)
