In [1]:
import mediapipe as mp
import cv2
import pyrealsense2 as rs
import lightgbm as lgb
import datetime as dt 
import time
import csv
import os
import numpy as np
import pandas as pd
import logging
import pickle 
import sys
from math import pi
import serial
from time import sleep
import random
from sklearn.cluster import KMeans as kmc
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression as lr, RidgeClassifier as rc 
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier as rsc, GradientBoostingClassifier as gbc 
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
mp_drawing=mp.solutions.drawing_utils
mp_holistic=mp.solutions.holistic

Dataset Creator

In [21]:
class_name = input("Enter the class name: ")

# Set up CSV file path
CSV_PATH = "collected_data.csv"

# Create CSV with headers if not exists
if not os.path.exists(CSV_PATH):
    headers = ["class_name"] + \
              [f"pose_{i}_{c}" for i in range(33) for c in ['x', 'y', 'z', 'visibility']] + \
              [f"lh_{i}_{c}" for i in range(21) for c in ['x', 'y', 'z']] + \
              [f"rh_{i}_{c}" for i in range(21) for c in ['x', 'y', 'z']] + \
              [f"face_nose_{c}" for c in ['x', 'y', 'z']] + \
              [f"face_eye_l_{c}" for c in ['x', 'y', 'z']] + \
              [f"face_eye_r_{c}" for c in ['x', 'y', 'z']]

    df = pd.DataFrame(columns=headers)
    df.to_csv(CSV_PATH, index=False)

# 🟢 SET UP REALSENSE CAMERA
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 15)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 15)
# config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15)
# config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15)
profile = pipeline.start(config)

align = rs.align(rs.stream.color)  # Align depth & color frames
depth_sensor = profile.get_device().first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()
clipping_distance = 3 / depth_scale  # Objects beyond 2m will be ignored

# Start data collection
start_time = time.time()
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while True:
        frames = pipeline.wait_for_frames()
        aligned_frames = align.process(frames)
        depth_frame = aligned_frames.get_depth_frame()
        color_frame = aligned_frames.get_color_frame()

        if not depth_frame or not color_frame:
            continue

        # Convert frames
        color_image = np.asanyarray(color_frame.get_data())

        # Flip for natural mirror effect
        color_image = cv2.flip(color_image, 1)
        rgb_frame = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)

        # Make detections
        results = holistic.process(rgb_frame)

        # Store landmark data
        landmarks = [class_name]

        # 1️⃣ Extract Pose Landmarks
        if results.pose_landmarks:
            pose_landmarks = results.pose_landmarks.landmark
            pose_data = [[lmk.x, lmk.y, lmk.z, lmk.visibility] for lmk in pose_landmarks]
        else:
            pose_data = np.zeros((33, 4))

        landmarks.extend(np.array(pose_data).flatten())

        # 2️⃣ Extract Left Hand Landmarks
        if results.left_hand_landmarks:
            left_hand_landmarks = results.left_hand_landmarks.landmark
            left_hand_data = [[lmk.x, lmk.y, lmk.z] for lmk in left_hand_landmarks]
        else:
            left_hand_data = np.zeros((21, 3))

        landmarks.extend(np.array(left_hand_data).flatten())

        # 3️⃣ Extract Right Hand Landmarks
        if results.right_hand_landmarks:
            right_hand_landmarks = results.right_hand_landmarks.landmark
            right_hand_data = [[lmk.x, lmk.y, lmk.z] for lmk in right_hand_landmarks]
        else:
            right_hand_data = np.zeros((21, 3))

        landmarks.extend(np.array(right_hand_data).flatten())

        # 4️⃣ Extract **ONLY** 3 Facial Landmarks (Nose, Left Eye, Right Eye)
        if results.face_landmarks:
            face_landmarks = results.face_landmarks.landmark
            selected_face_data = [
                [face_landmarks[1].x, face_landmarks[1].y, face_landmarks[1].z],  # Nose
                [face_landmarks[33].x, face_landmarks[33].y, face_landmarks[33].z],  # Left Eye
                [face_landmarks[263].x, face_landmarks[263].y, face_landmarks[263].z],  # Right Eye
            ]
        else:
            selected_face_data = np.zeros((3, 3))

        landmarks.extend(np.array(selected_face_data).flatten())

        # Save data to CSV
        df = pd.DataFrame([landmarks])
        df.to_csv(CSV_PATH, mode='a', header=False, index=False)

        # 🟢 DRAW LANDMARKS ON FRAME
        if results.pose_landmarks:
            mp_drawing.draw_landmarks(color_image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
        if results.left_hand_landmarks:
            mp_drawing.draw_landmarks(color_image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        if results.right_hand_landmarks:
            mp_drawing.draw_landmarks(color_image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

        # 🟢 ONLY DRAW NOSE + EYES, NOT FULL FACE
        if results.face_landmarks:
            keypoints = [1, 33, 263]  # Nose, Left Eye, Right Eye
            for idx in keypoints:
                landmark = results.face_landmarks.landmark[idx]
                h, w, _ = color_image.shape
                x, y = int(landmark.x * w), int(landmark.y * h)
                cv2.circle(color_image, (x, y), 4, (0, 255, 0), -1)
        # Show countdown timer
        elapsed_time = time.time() - start_time
        remaining_time = max(0, 30 - elapsed_time)
        cv2.putText(color_image, f"Time Left: {int(remaining_time)}s", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        cv2.imshow("RealSense Landmark Detection", color_image)

        # Stop after 30 seconds or if 'q' is pressed
        if elapsed_time >= 30 or cv2.waitKey(1) & 0xFF == ord('q'):
            break

# Cleanup
pipeline.stop()
cv2.destroyAllWindows()

RuntimeError: No device connected

Reading Dataset 

-K Means Clustering

In [10]:

df=pd.read_csv('collected_data.csv')
df.head()
df.tail()
X=df.drop('class_name', axis=1)
Y=df['class_name']
scaler=StandardScaler()
X_scaled=scaler.fit_transform(X)
kmeans=kmc(n_clusters=10, random_state=125)
clusters=kmeans.fit_predict(X_scaled)
df['Clusters']=clusters
df.to_csv("collected_dataV2.csv", index=False)

  super()._check_params_vs_input(X, default_n_init=10)


Pipeline & Training

In [None]:
data=pd.read_csv("coords_NonV2.csv")
X=data.drop('class', axis=1)
Y=data['class']
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
corr_matrix = X.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap="coolwarm")
plt.show()


In [5]:
pipelines={
    'lr':make_pipeline(StandardScaler(), lr()), 
    'rc':make_pipeline(StandardScaler(), rc()),
    'rsc':make_pipeline(StandardScaler(), rsc()), 
    'lgb': make_pipeline(StandardScaler(), lgb.LGBMClassifier())
}

In [6]:
fit_models={}
for algo, pipeline, in pipelines.items():
    model=pipeline.fit(X_train, Y_train)
    fit_models[algo]=model 

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.430129 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 407756
[LightGBM] [Info] Number of data points in the train set: 14618, number of used features: 1600
[LightGBM] [Info] Start training from score -3.166762
[LightGBM] [Info] Start training from score -3.906429
[LightGBM] [Info] Start training from score -3.173277
[LightGBM] [Info] Start training from score -3.549754
[LightGBM] [Info] Start training from score -3.141120
[LightGBM] [Info] Start training from score -3.153859
[LightGBM] [Info] Start training from score -3.239123
[LightGBM] [Info] Start training from score -3.216689
[LightGBM] [Info] Start training from score -3.155462
[LightGBM] [Info] Start training from score -3.199768
[LightGBM] [Info] Start training from score -3.176550
[LightGBM] [Info] Start training from score -3.397646
[LightGBM] [Info] Start training from score -3.184780
[Ligh

In [12]:
for algo, model in fit_models.items():
    Y_hat=model.predict(X_test)
    print(algo, accuracy_score(Y_test, Y_hat))
    
fit_models['lgb'].predict(X_test)

lr 0.9969904240766074
rc 0.9882352941176471
rsc 0.9975376196990424
lgb 0.9969904240766074


array(['Collect_pen', 'Release', 'Collect_tape', ..., 'Left', 'Down',
       'Collect_tape'], dtype=object)

Saving The model


In [13]:
with open('body_language_lgbm.pkl', 'wb') as f:
    pickle.dump(fit_models['lgb'], f)

Loading and using the model for predictions

In [14]:
with open('body_language_lgbm.pkl', 'rb') as f:
    model=pickle.load(f)

In [9]:

threshold = 0.8
font = cv2.FONT_HERSHEY_COMPLEX
org = (20,100)
fontScale = .5
thickness = 1 
color = (0,150,255)
realsense_ctx = rs.context()
connected_devices = [] # List of serial numbers for present cameras
for i in range(len(realsense_ctx.devices)):
        detected_camera =  realsense_ctx.devices[i].get_info(rs.camera_info.serial_number)
        print(f"{detected_camera}")
        connected_devices.append(detected_camera)
        
device = connected_devices[0] # In this example we are only using one camera    
pipeline = rs.pipeline()    
config = rs.config()    
background_removed_color = 153 # Grey

config.enable_device(device)
stream_res_x = 640
stream_res_y = 480
stream_fps = 30
config.enable_stream(rs.stream.depth, stream_res_x, stream_res_y, rs.format.z16, stream_fps)
config.enable_stream(rs.stream.color, stream_res_x, stream_res_y, rs.format.bgr8, stream_fps)
profile = pipeline.start(config)
align_to = rs.stream.color
align = rs.align(align_to)
# ====== Get depth Scale ======
depth_sensor = profile.get_device().first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()
# ====== Set clipping distance ======
clipping_distance_in_meters = 2
clipping_distance = clipping_distance_in_meters / depth_scale
LEFT_EYE=153
RIGHT_EYE=380
NOSE=1
# Initiate holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    while True:
        start_time = dt.datetime.today().timestamp() # Necessary for FPS calculations

                # Get and align frames
        frames = pipeline.wait_for_frames()
        aligned_frames = align.process(frames)
        aligned_depth_frame = aligned_frames.get_depth_frame()
        color_frame = aligned_frames.get_color_frame()

        if not aligned_depth_frame or not color_frame:
            continue

            # Process images
        # Wait for the next frame from the RealSense camera
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        color_frame = frames.get_color_frame()

        if not depth_frame or not color_frame:
            continue

        # Convert the depth frame to a numpy array
        depth_image = np.asanyarray(depth_frame.get_data())

        # Convert the color frame to a numpy array
        color_image = np.asanyarray(color_frame.get_data())
        color_image = cv2.flip(color_image, 1) 
        
        # Apply colormap on depth image (for visualization)
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
            
        image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
                       # Image is no longer writeable
        
        # Recolor Feed
               
               
        
        # Make Detections
        results = holistic.process(image)
        # print(results.face_landmarks)
        
        # face_landmarks, pose_landmarks, left_hand_landmarks, right_hand_landmarks
        
        # Recolor image back to BGR for rendering
        image.flags.writeable = True   
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # 1. Draw face landmarks
        if results.face_landmarks:
            h, w, _ = image.shape
            for idx in [LEFT_EYE, RIGHT_EYE, NOSE]:
                landmark = results.face_landmarks.landmark[idx]
                x, y = int(landmark.x * w), int(landmark.y * h)
                cv2.circle(image, (x, y), 5, (0, 255, 0), -1)
        
        # 2. Right hand
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                                 )

        # 3. Left Hand
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                 )

        # 4. Pose Detections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                 )

        try:
            # Extract Pose landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            
            # Extract Face landmarks
            face = results.face_landmarks.landmark
            face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())

            # Extract Right hand landmarks
            right_hand = results.right_hand_landmarks.landmark
            right_hand_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in right_hand]).flatten())
            
             # Extract Left hand landmarks
            left_hand = results.left_hand_landmarks.landmark
            left_hand_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in left_hand]).flatten())

            # Concate rows
            row = pose_row+face_row+right_hand_row+left_hand_row
            # row = pose_row+face_row+left_hand_row
            
            
        
            print(type(model))
            X = pd.DataFrame([row])
            body_language_class = model.predict(X)[0]
            body_language_prob = model.predict_proba(X)[0]
            #print(body_language_class, body_language_prob)
            max_body_language_prob = round(body_language_prob[np.argmax(body_language_prob)])
            
            if max_body_language_prob > threshold:
                # Grab ear coords
                coords = tuple(np.multiply(
                                np.array(
                                    (results.pose_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_EAR].x, 
                                    results.pose_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_EAR].y))
                            , [640,480]).astype(int))
                

                cv2.rectangle(image, 
                            (coords[0], coords[1]+5), 
                            (coords[0]+len(body_language_class)*20, coords[1]-30), 
                            (245, 117, 16), -1)
                cv2.putText(image, body_language_class, coords, 
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                
                # Get status box
                cv2.rectangle(image, (0,0), (250, 60), (245, 117, 16), -1)
                
                # Display Class
                cv2.putText(image, 'CLASS'
                            , (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
                cv2.putText(image, body_language_class.split(' ')[0]
                            , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                
                # Display Probability
                cv2.putText(image, 'PROB'
                            , (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
                cv2.putText(image, str(round(body_language_prob[np.argmax(body_language_prob)],2))
                            , (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                
        except:
            pass
                        
        cv2.imshow('Raw Webcam Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
print(type(model))
pipeline.stop()
cv2.destroyAllWindows()

819312070630


I0000 00:00:1741745557.152718    3324 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1741745557.156661    7054 gl_context.cc:357] GL version: 3.2 (OpenGL ES 3.2 Mesa 21.2.6), renderer: NV134
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


KeyboardInterrupt: 