In [127]:
import pandas as pd
import numpy as np
import cv2
import mediapipe as mp
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score, precision_recall_curve, auc, confusion_matrix, log_loss
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import logging

In [128]:
# Set random seed for reproducibility
np.random.seed(42)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

In [129]:
# Load dataset
df = pd.read_csv("face_mimic_df.csv")

In [130]:
# Define features
features = ['AU_01_t12', 'AU_06_t12', 'AU_12_t12', 'AU_04_t13', 'AU_07_t13', 
            'AU_09_t13', 'AU_01_t14', 'AU_02_t14', 'AU_04_t14', 'age', 'gender']
X = df[features].dropna()
y = df.loc[X.index, 'diagnosed']

In [132]:
# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [133]:
# Apply SMOTE
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_scaled, y)

In [135]:
# Initialize Random Forest model
rf_model = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced', max_depth=10, min_samples_split=5)


In [136]:
# Perform stratified 5-fold cross-validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = {'accuracy': 'accuracy', 'f1': 'f1', 'precision': 'precision', 'recall': 'recall', 'roc_auc': 'roc_auc'}


In [137]:
train_accuracies, val_accuracies = [], []
train_losses, val_losses = [], []
y_true_val, y_pred_val, y_pred_proba_val = [], [], []

In [138]:
for train_idx, val_idx in cv.split(X_resampled, y_resampled):
    X_train, X_val = X_resampled[train_idx], X_resampled[val_idx]
    y_train, y_val = y_resampled[train_idx], y_resampled[val_idx]
    rf_model.fit(X_train, y_train)
    y_train_pred = rf_model.predict(X_train)
    y_val_pred = rf_model.predict(X_val)
    y_train_proba = rf_model.predict_proba(X_train)
    y_val_proba = rf_model.predict_proba(X_val)
    train_accuracies.append(accuracy_score(y_train, y_train_pred))
    val_accuracies.append(accuracy_score(y_val, y_val_pred))
    train_losses.append(log_loss(y_train, y_train_proba))
    val_losses.append(log_loss(y_val, y_val_proba))
    y_true_val.extend(y_val)
    y_pred_val.extend(y_val_pred)
    y_pred_proba_val.extend(y_val_proba[:, 1])

precision, recall, _ = precision_recall_curve(y_true_val, y_pred_proba_val)
auprc = auc(recall, precision)

In [139]:
from sklearn.model_selection import StratifiedKFold, train_test_split, cross_validate


In [112]:
print("Cross-Validation Results (SMOTE-balanced data):")
print(f"Mean Training Accuracy: {np.mean(train_accuracies):.4f} (±{np.std(train_accuracies):.4f})")
print(f"Mean Validation Accuracy: {np.mean(val_accuracies):.4f} (±{np.std(val_accuracies):.4f})")
print(f"Mean Training Log Loss: {np.mean(train_losses):.4f} (±{np.std(train_losses):.4f})")
print(f"Mean Validation Log Loss: {np.mean(val_losses):.4f} (±{np.std(val_losses):.4f})")
print(f"Mean F1 Score: {np.mean(cross_validate(rf_model, X_resampled, y_resampled, cv=cv, scoring='f1')['test_score']):.4f}")
print(f"Mean Precision: {np.mean(cross_validate(rf_model, X_resampled, y_resampled, cv=cv, scoring='precision')['test_score']):.4f}")
print(f"Mean Recall: {np.mean(cross_validate(rf_model, X_resampled, y_resampled, cv=cv, scoring='recall')['test_score']):.4f}")
print(f"Mean ROC AUC: {np.mean(cross_validate(rf_model, X_resampled, y_resampled, cv=cv, scoring='roc_auc')['test_score']):.4f}")
print(f"AUPRC: {auprc:.4f}")

Cross-Validation Results (SMOTE-balanced data):
Mean Training Accuracy: 0.9786 (±0.0019)
Mean Validation Accuracy: 0.9187 (±0.0199)
Mean Training Log Loss: 0.1621 (±0.0066)
Mean Validation Log Loss: 0.2848 (±0.0173)
Mean F1 Score: 0.9220
Mean Precision: 0.8890
Mean Recall: 0.9583
Mean ROC AUC: 0.9762
AUPRC: 0.9732


In [140]:
# Split for test set evaluation
X_train_val, X_test, y_train_val, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42, stratify=y)
rf_model.fit(X_train_val, y_train_val)
y_pred_test = rf_model.predict(X_test)
y_pred_proba_test = rf_model.predict_proba(X_test)[:, 1]
precision_test, recall_test, _ = precision_recall_curve(y_test, y_pred_proba_test)
auprc_test = auc(recall_test, precision_test)

In [114]:
print("\nPerformance on Test Data:")
print(f"Accuracy: {accuracy_score(y_test, y_pred_test):.4f}")
print(f"F1 Score: {f1_score(y_test, y_pred_test):.4f}")
print(f"Precision: {precision_score(y_test, y_pred_test):.4f}")
print(f"Recall: {recall_score(y_test, y_pred_test):.4f}")
print(f"ROC AUC: {roc_auc_score(y_test, y_pred_proba_test):.4f}")
print(f"AUPRC: {auprc_test:.4f}")


Performance on Test Data:
Accuracy: 0.9327
F1 Score: 0.0000
Precision: 0.0000
Recall: 0.0000
ROC AUC: 0.5214
AUPRC: 0.0717


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [205]:
plt.figure(figsize=(8, 6))
plt.plot(range(1, 6), train_accuracies, label='Training Accuracy', marker='o')
plt.plot(range(1, 6), val_accuracies, label='Validation Accuracy', marker='o')
plt.xlabel('Fold')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy Across Folds')
plt.legend()
plt.grid(True)
plt.savefig('accuracy_plot.png', dpi=300)
plt.close()

In [206]:
plt.figure(figsize=(8, 6))
plt.plot(range(1, 6), train_losses, label='Training Log Loss', marker='o')
plt.plot(range(1, 6), val_losses, label='Validation Log Loss', marker='o')
plt.xlabel('Fold')
plt.ylabel('Log Loss')
plt.title('Training and Validation Log Loss Across Folds')
plt.legend()
plt.grid(True)
plt.savefig('loss_plot.png', dpi=300)
plt.close()

In [207]:
cm = confusion_matrix(y_test, y_pred_test)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Healthy', 'PD'], yticklabels=['Healthy', 'PD'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Test Data)')
plt.savefig('confusion_matrix.png', dpi=300)
plt.close()

In [141]:
with open('rf_model.pkl', 'wb') as f:
    pickle.dump(rf_model, f)
with open('scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)

In [142]:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5)


In [143]:
def extract_au_features(landmarks, frame):
    if not landmarks:
        return np.zeros(9)
    
    landmarks = landmarks.landmark
    left_eye = landmarks[159]
    left_brow = landmarks[70]
    right_eye = landmarks[386]
    right_brow = landmarks[300]
    cheek_left = landmarks[205]
    lip_corner_left = landmarks[61]
    lip_corner_right = landmarks[291]
    outer_brow_left = landmarks[66]
    nose_tip = landmarks[6]
    eye_distance = np.abs(left_eye.y - right_eye.y)  # Baseline normalization
    
    au_01 = np.mean([np.abs(left_brow.y - left_eye.y), np.abs(right_brow.y - right_eye.y)]) / max(0.01, eye_distance)
    au_06 = np.abs(cheek_left.y - left_eye.y) / max(0.01, eye_distance)
    au_12 = np.abs(lip_corner_left.x - lip_corner_right.x) / max(0.01, frame.shape[1])
    au_04 = np.abs(left_brow.y - right_brow.y) / max(0.01, eye_distance)
    au_07 = np.abs(left_eye.y - landmarks[145].y) / max(0.01, eye_distance)
    au_09 = np.abs(nose_tip.y - landmarks[197].y) / max(0.01, eye_distance)
    au_02 = np.abs(outer_brow_left.y - left_eye.y) / max(0.01, eye_distance)
    
    return np.array([au_01, au_06, au_12, au_04, au_07, au_09, au_01, au_02, au_04])

In [144]:
def process_image(image_path):
    frame = cv2.imread(image_path)
    if frame is None:
        print(f"Error: Could not load image {image_path}")
        return None, None, frame
    
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(frame_rgb)
    
    pred = 0
    pred_proba = 0.0
    if not results or not results.multi_face_landmarks:
        logging.info(f"Image {image_path} - No face detected, defaulting to Healthy")
    else:
        au_features = extract_au_features(results.multi_face_landmarks[0], frame)
        input_features = np.array([au_features[0], au_features[1], au_features[2], au_features[3], 
                                  au_features[4], au_features[5], au_features[6], au_features[7], 
                                  au_features[8], user_age, user_gender])  # Explicit order
        input_scaled = scaler.transform([input_features])  # Ensure correct shape
        pred_proba = rf_model.predict_proba(input_scaled)[0][1]
        pred = 1 if pred_proba > 0.05 else 0  # Lowered threshold
        logging.info(f"Image {image_path} - Prediction Probability: {pred_proba:.4f}, AU Values: {au_features}")
    
    label = "Healthy"  # Always show Healthy without detection
    if results and results.multi_face_landmarks and pred == 1:
        label = "PD"
    return label, pred_proba, frame

In [145]:
import os

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

Directory pd_images.jpg not found. Proceeding with webcam.


In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

Directory pd_images.jpg not found. Proceeding with webcam.


In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "pd_images/001.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

NotADirectoryError: [WinError 267] The directory name is invalid: 'pd_images/001.jpg'

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 0  # 1 for male, 0 for female (adjust as needed)
image_dir = "/pd_images/002.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

In [None]:
# Process images (e.g., from a directory of PD patient images)
user_age = 60  # or set to the appropriate value
user_gender = 1  # 1 for male, 0 for female (adjust as needed)
image_dir = "/pd_images/004.jpg"  # Update this to your directory with PD images
if os.path.exists(image_dir):
    for image_file in os.listdir(image_dir):
        if image_file.endswith(('.jpg', '.jpeg', '.png')):
            label, pred_proba, frame = process_image(os.path.join(image_dir, image_file))
            print(f"Image {image_file}: Status: {label} (Probability: {pred_proba:.2f})")
            cv2.putText(frame, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0) if label == "Healthy" else (255, 0, 0), 2)
            cv2.imshow('PD Detection', frame)
            if cv2.waitKey(2000) & 0xFF == ord('q'):  # Display for 2 seconds
                break
else:
    print(f"Directory {image_dir} not found. Proceeding with webcam.")

In [121]:
# Webcam processing
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Error: Could not open webcam.")
    exit()

In [122]:
au_buffer = []
max_buffer_size = 150
user_age = 60
user_gender = 1

mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1)

custom_connections = [
    (70, 71), (71, 63), (63, 105), (105, 70),
    (300, 299), (299, 293), (293, 334), (334, 300),
    (70, 300),
    (205, 147), (147, 187), (187, 205),
    (425, 367), (367, 407), (407, 425),
    (205, 425),
    (61, 291),
    (66, 107), (107, 297), (297, 332), (332, 66),
    (55, 285), (285, 296), (296, 54), (54, 55),
    (66, 55),
    (159, 145), (145, 386), (386, 374), (374, 159),
    (159, 386),
    (6, 197), (197, 168), (168, 6),
    (66, 55), (55, 107), (107, 285), (285, 66),
    (66, 285)
]

au_names = ['AU_01_t12', 'AU_06_t12', 'AU_12_t12', 'AU_04_t13', 'AU_07_t13', 
            'AU_09_t13', 'AU_01_t14', 'AU_02_t14', 'AU_04_t14']

In [123]:
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        print("Error: Could not read frame.")
        break
    
    frame_with_text = frame.copy()
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(frame_rgb)
    
    au_features = np.zeros(9)
    if results and results.multi_face_landmarks:
        au_features = extract_au_features(results.multi_face_landmarks[0], frame)
    
    au_buffer.append(au_features)
    if len(au_buffer) > max_buffer_size:
        au_buffer.pop(0)
    
    pred = 0
    pred_proba = 0.0
    avg_au = np.zeros(9)
    if au_buffer and len(au_buffer) >= 1:
        avg_au = np.mean(au_buffer, axis=0)
        # Ensure feature order matches training data
        input_features = np.array([avg_au[0], avg_au[1], avg_au[2], avg_au[3], 
                                 avg_au[4], avg_au[5], avg_au[6], avg_au[7], 
                                 avg_au[8], user_age, user_gender])  # Explicit order matching features
        input_scaled = scaler.transform([input_features])  # Use the full 11 features
        pred_proba = rf_model.predict_proba(input_scaled)[0][1]
        pred = 1 if pred_proba > 0.15 else 0  # Increased threshold to 0.15
        logging.info(f"Webcam - Prediction Probability: {pred_proba:.4f}, AU Values: {avg_au}")
    else:
        logging.info("Webcam - No face detected")
    
    label = "PD" if pred == 1 else "Healthy"
    color = (255, 0, 0) if pred == 1 else (0, 255, 0)
    cv2.putText(frame_with_text, f"Status: {label} ({pred_proba:.2f})", (10, 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
    
    au_colors = [(0, 255, 0) if v > 0.01 else (0, 0, 255) for v in avg_au]
    for i, (au_name, au_value) in enumerate(zip(au_names, avg_au)):
        color = au_colors[i % len(au_colors)]
        cv2.putText(frame_with_text, f"{au_name}: {au_value:.4f}", (10, 60 + i * 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
    
    if results and results.multi_face_landmarks:
        frame_with_lines = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
        for connection in custom_connections:
            start_point = results.multi_face_landmarks[0].landmark[connection[0]]
            end_point = results.multi_face_landmarks[0].landmark[connection[1]]
            start_pos = (int(start_point.x * frame_with_lines.shape[1]), int(start_point.y * frame_with_lines.shape[0]))
            end_pos = (int(end_point.x * frame_with_lines.shape[1]), int(end_point.y * frame_with_lines.shape[0]))
            if (0 <= start_pos[0] < frame_with_lines.shape[1] and 0 <= start_pos[1] < frame_with_lines.shape[0] and
                0 <= end_pos[0] < frame_with_lines.shape[1] and 0 <= end_pos[1] < frame_with_lines.shape[0]):
                cv2.line(frame_with_lines, start_pos, end_pos, (0, 255, 0), thickness=1)
                cv2.circle(frame_with_lines, start_pos, 1, (0, 255, 0), -1)
                cv2.circle(frame_with_lines, end_pos, 1, (0, 255, 0), -1)
        frame_with_text = cv2.addWeighted(frame_with_text, 1, frame_with_lines, 1, 0)
    
    cv2.imshow('PD Detection', frame_with_text)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

2025-07-20 23:19:53,758 - Webcam - Prediction Probability: 0.0562, AU Values: [0. 0. 0. 0. 0. 0. 0. 0. 0.]
2025-07-20 23:19:53,832 - Webcam - Prediction Probability: 0.1335, AU Values: [1.58063173e+00 6.53698146e+00 7.17374031e-05 3.66616249e-01
 1.30769610e+00 1.30822659e+00 1.58063173e+00 1.71054900e+00
 3.66616249e-01]
2025-07-20 23:19:53,851 - Webcam - Prediction Probability: 0.1364, AU Values: [2.05758711e+00 8.68033568e+00 9.94510173e-05 4.91970778e-01
 1.75963243e+00 1.74217224e+00 2.05758711e+00 2.27669676e+00
 4.91970778e-01]
2025-07-20 23:19:53,892 - Webcam - Prediction Probability: 0.1303, AU Values: [2.36394107e+00 9.62092280e+00 1.13364356e-04 4.38347459e-01
 1.77790672e+00 1.93954855e+00 2.36394107e+00 2.65044570e+00
 4.38347459e-01]
2025-07-20 23:19:53,917 - Webcam - Prediction Probability: 0.1303, AU Values: [2.51987219e+00 1.01777947e+01 1.22559909e-04 4.24513817e-01
 1.80160761e+00 2.05970287e+00 2.51987219e+00 2.85706401e+00
 4.24513817e-01]
2025-07-20 23:19:53,958 -

In [124]:
cap.release()
cv2.destroyAllWindows()
face_mesh.close()

In [125]:
np.save('au_buffer.npy', np.array(au_buffer))