In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression,RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [2]:
import mediapipe as mp
import cv2

In [3]:
# Short aliases for the two MediaPipe solution modules.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [None]:
# infer.py
import cv2
import numpy as np
import joblib
import warnings
from sklearn.exceptions import ConvergenceWarning
from tensorflow.keras.models import load_model, Model
import mediapipe as mp
from collections import deque
import time  # for sign‐label debounce

# ==== SILENCE CONVERGENCE WARNINGS ====
# Installs a process-wide "ignore" filter for scikit-learn's ConvergenceWarning.
# NOTE(review): the original header attributed these warnings to the HMM;
# confirm which model actually emits them.
warnings.filterwarnings(action="ignore", category=ConvergenceWarning)

# ====== LOAD ARTIFACTS ======
# All paths are relative to the current working directory.
# NOTE: joblib.load unpickles its input, so only load files you trust.

# --- sign-recognition artifacts ---
sign_lstm = load_model('sign_lstm_deeper.keras')
sign_le = joblib.load('sign_label_encoder.pkl')
sign_scaler = joblib.load('sign_scaler.pkl')
sign_pca = joblib.load('sign_pca.pkl')
sign_ghmm = joblib.load('sign_ghmm.pkl')
sign_rf = joblib.load('sign_rf.pkl')

# Sub-model that maps the LSTM's input to the output of its layer named
# 'feature_reducer'; used to obtain intermediate features instead of the
# network's final output.
feat_extractor = Model(
    inputs=sign_lstm.input,
    outputs=sign_lstm.get_layer('feature_reducer').output,
)

# --- face-expression artifacts ---
face_le = joblib.load('face_label_encoder.pkl')
face_scaler = joblib.load('face_scaler.pkl')
face_rf = joblib.load('face_rf.pkl')

# ====== CONFIG ======
# Number of consecutive frames collected before the sign pipeline runs.
SEQUENCE_LENGTH = 30

# Landmark counts handed to extract_landmarks for the pose, each hand, and
# the face respectively.
POSE_LM = 33
HAND_LM = 21
FACE_LM = 468

# How many seconds the most recent sign label stays on screen.
SIGN_DISPLAY_DURATION = 5
# MediaPipe module aliases: drawing helpers and the Holistic solution.
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

# ====== HELPERS ======
def extract_landmarks(lm_list, expected_count):
    """Flatten a landmark list into a fixed-length feature vector.

    Args:
        lm_list: Object exposing a ``.landmark`` iterable whose items carry
            ``x``, ``y``, ``z`` and ``visibility`` attributes, or a falsy
            value (e.g. ``None``) when nothing was detected.
        expected_count: Number of landmarks the caller expects.

    Returns:
        A list of exactly ``expected_count * 4`` values laid out as
        ``[x, y, z, visibility, x, y, z, visibility, ...]``. NaN components
        are replaced by ``0.0``. Missing landmarks are zero-padded and any
        landmarks beyond ``expected_count`` are dropped, so the length never
        depends on how many landmarks were actually supplied.
    """
    target_len = expected_count * 4
    if not lm_list:
        return [0.0] * target_len

    vals = []
    for lm in lm_list.landmark:
        for component in (lm.x, lm.y, lm.z, lm.visibility):
            # NaN is the only value that compares unequal to itself.
            vals.append(component if component == component else 0.0)

    # Pad when fewer landmarks than expected were supplied.
    if len(vals) < target_len:
        vals.extend([0.0] * (target_len - len(vals)))

    # Drop extras so downstream scalers/arrays always see the same width.
    return vals[:target_len]

# ====== REAL-TIME LOOP ======
# Reads webcam frames, runs MediaPipe Holistic on each one, and overlays the
# most recent sign prediction together with the current face-expression
# prediction. Press 'q' in the preview window to stop.
sign_buffer      = deque(maxlen=SEQUENCE_LENGTH)  # per-frame pose + hand features
last_sign_time   = 0    # time.time() of the last accepted sign update
last_sign_label  = ""   # most recently accepted sign label

cap = cv2.VideoCapture(0)
# speed up by lowering resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH,  640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

if not cap.isOpened():
    # Otherwise the loop below is skipped without any indication of why.
    print("Could not open camera 0; no frames will be processed.")

# try/finally guarantees the camera and preview window are released even when
# a frame raises or the kernel is interrupted; otherwise the device stays
# locked until the kernel is restarted.
try:
    with mp_holistic.Holistic(min_detection_confidence=0.6,
                              min_tracking_confidence=0.6) as holistic:
        print("Starting inference. Press 'q' to quit.")
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Mirror the image, then convert BGR -> RGB for MediaPipe.
            frame = cv2.flip(frame, 1)
            rgb   = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = holistic.process(rgb)

            # draw landmarks (optional)
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(frame, results.pose_landmarks,
                                          mp_holistic.POSE_CONNECTIONS)
            if results.left_hand_landmarks:
                mp_drawing.draw_landmarks(frame, results.left_hand_landmarks,
                                          mp_holistic.HAND_CONNECTIONS)
            if results.right_hand_landmarks:
                mp_drawing.draw_landmarks(frame, results.right_hand_landmarks,
                                          mp_holistic.HAND_CONNECTIONS)

            # extract the landmarks that feed the sign pipeline
            p  = extract_landmarks(results.pose_landmarks,  POSE_LM)
            lh = extract_landmarks(results.left_hand_landmarks, HAND_LM)
            rh = extract_landmarks(results.right_hand_landmarks, HAND_LM)

            # buffer for sign sequence (face landmarks are not part of it)
            feats = p + lh + rh
            sign_buffer.append(feats)

            current_time = time.time()

            # run sign pipeline when buffer full
            if len(sign_buffer) == SEQUENCE_LENGTH:
                # Add a leading batch axis: (1, SEQUENCE_LENGTH, n_features).
                Xs = np.array(sign_buffer, dtype=np.float32)[None, ...]
                nr, _, nf = Xs.shape
                # The scaler works on 2-D input, so flatten frames, scale,
                # then restore the sequence shape.
                Xflat     = Xs.reshape(-1, nf)
                Xs_scaled = sign_scaler.transform(Xflat).reshape(nr, SEQUENCE_LENGTH, nf)

                # extract LSTM features
                lstmf    = feat_extractor.predict_on_batch(Xs_scaled)
                pca_feat = sign_pca.transform(lstmf)
                # sign_ghmm's prediction (presumably an HMM hidden state -
                # confirm) is appended as one extra feature column.
                hs        = sign_ghmm.predict(pca_feat).reshape(-1, 1)
                comb_feat = np.hstack((lstmf, hs))

                pred_enc  = sign_rf.predict(comb_feat)
                new_label = sign_le.inverse_transform(pred_enc)[0]
                # update label if changed or held expired
                # NOTE(review): a repeated identical prediction inside the hold
                # window does not refresh last_sign_time, so a continuously
                # held sign can blank out until the next prediction - confirm
                # this is the intended debounce behaviour.
                if new_label != last_sign_label or (current_time - last_sign_time) > SIGN_DISPLAY_DURATION:
                    last_sign_label = new_label
                    last_sign_time  = current_time

                # Start a fresh, non-overlapping window for the next prediction.
                sign_buffer.clear()

            # decide whether to show the last sign
            if (current_time - last_sign_time) < SIGN_DISPLAY_DURATION:
                sign_label = last_sign_label
            else:
                sign_label = ""

            # face-expression pipeline only if face detected
            if results.face_landmarks:
                # Extracted here rather than on every frame: the vector is
                # only consumed when a face is present.
                f       = extract_landmarks(results.face_landmarks, FACE_LM)
                Xf      = np.array(f, dtype=np.float32).reshape(1, -1)
                Xf_s    = face_scaler.transform(Xf)
                face_enc = face_rf.predict(Xf_s)
                face_label = face_le.inverse_transform(face_enc)[0]
            else:
                face_label = ""

            # display combined result
            # ASCII placeholder: cv2.putText's Hershey fonts draw unsupported
            # symbols (such as the Unicode ellipsis) as question marks.
            display_text = f"{sign_label or '...'} ({face_label or '...'})"
            cv2.putText(frame, display_text,
                        (10, 40), cv2.FONT_HERSHEY_SIMPLEX,
                        1, (0, 255, 0), 2)

            cv2.imshow("Gesture + Expression Recognition", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()
