# PATIENT MOVEMENT ANALYSIS FOR PARKINSON'S DISEASE SEVERITY PREDICTION

### This project is a video classification model for Parkinson's disease severity using gait analysis. 

## SETUP

In [None]:
import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.metrics import classification_report, confusion_matrix
import os
import cv2
from optuna import create_study
from optuna.samplers import TPESampler  # Import TPESampler

## LOAD KEYPOINTS

This function loads the keypoints from the JSON files. It returns the keypoints as a list.<br>
<br>
The keypoints have been extracted using the AlphaPose pose estimator and the Halpe Full-Body Human Keypoints Dataset.

In [None]:
def load_keypoints_from_json(json_file):
    with open(json_file, 'r') as f:
        data = json.load(f)

    keypoints_list = []
    for element in data:
        keypoints = np.array(element['keypoints']).reshape(-1, 3)
        keypoints_list.append(keypoints)

    return keypoints_list

## FEATURE EXTRACTION

The feature extraction function takes in a list of keypoints as a parameter. <br>

The features extracted for Parkinson's disease severity prediction are the following: <br>

- Distance between the nose and the approximate center of the feet, indicating posture.
- Average confidence score of the wrists, indicating arm steadiness (tremors).
- Average distance between the nose and the wrists, indicating arm swing.
- Ratio of the distances between the nose and the left and right wrists, indicating arm swing symmetry.
- Maximum height of the ankles, indicating step height.
- Distance between the left and right ankles, indicating step length.

In [None]:
def extract_features(keypoints_list):
    total_nose_to_foot_distance = 0
    total_wrist_steadiness = 0
    total_nose_to_wrist_distance = 0
    total_arm_swing_symmetry = 0
    max_ankle_height = 0
    total_step_length = 0
    num_frames = 0

    #indices of keypoints
    nose_idx = 0
    left_wrist_idx = 9
    right_wrist_idx = 10
    left_ankle_idx = 15
    right_ankle_idx = 16

    for keypoints in keypoints_list:
        if keypoints.shape[0] < 17:
            continue
       
        nose = keypoints[nose_idx]
        left_wrist = keypoints[left_wrist_idx]
        right_wrist = keypoints[right_wrist_idx]
        left_ankle = keypoints[left_ankle_idx]
        right_ankle = keypoints[right_ankle_idx]

        # features 
        nose_to_foot_distance = np.linalg.norm(nose[:2] - ((left_ankle[:2] + right_ankle[:2]) / 2))
        wrist_steadiness = np.mean([left_wrist[2], right_wrist[2]])
        nose_to_wrist_distance = np.mean([np.linalg.norm(nose[:2] - left_wrist[:2]), np.linalg.norm(nose[:2] - right_wrist[:2])])
        arm_swing_symmetry = np.abs(np.linalg.norm(nose[:2] - left_wrist[:2]) / np.linalg.norm(nose[:2] - right_wrist[:2]))
        max_ankle_height = np.max([left_ankle[1], right_ankle[1]])
        step_length = np.linalg.norm(left_ankle[:2] - right_ankle[:2])

        #Update Values
        total_nose_to_foot_distance += nose_to_foot_distance
        total_wrist_steadiness += wrist_steadiness
        total_nose_to_wrist_distance += nose_to_wrist_distance
        total_arm_swing_symmetry += arm_swing_symmetry
        max_ankle_height = max(max_ankle_height, np.max([left_ankle[1], right_ankle[1]]))
        total_step_length += step_length
        num_frames += 1

    if num_frames > 0:
        avg_nose_to_foot_distance = total_nose_to_foot_distance / num_frames
        avg_wrist_steadiness = total_wrist_steadiness / num_frames
        avg_nose_to_wrist_distance = total_nose_to_wrist_distance / num_frames
        avg_arm_swing_symmetry = total_arm_swing_symmetry / num_frames
        avg_step_length = total_step_length / num_frames
    else:
        return None
    
    return np.array([
        avg_nose_to_foot_distance,
        avg_wrist_steadiness,
        avg_nose_to_wrist_distance,
        avg_arm_swing_symmetry,
        max_ankle_height,
        avg_step_length
    ])

## LOAD FEATURES

This function loads the keypoints from all the json files and calls the extract_features() function on each file. <br>

It returns a list of lists of features and a list of their respective labels.

In [None]:
def load_features(data_dir):
    features = []
    labels = []

    for severity in ['NORMAL', 'MILD', 'MODERATE', 'SEVERE']:
        dir_path = os.path.join(data_dir, severity)

        for json_file in os.listdir(dir_path):
            file_path = os.path.join(dir_path, json_file)
            keypoints = load_keypoints_from_json(file_path)
            #filtered_keypoints = filter_keypoints_by_confidence(keypoints)

            if len(keypoints) > 0:
                feature = extract_features(keypoints)
                if feature is not None:
                    features.append(feature)
                labels.append(severity)

    return np.array(features), np.array(labels)

Create model function

In [None]:
def create_model(n_neurons, dropout_rate):
  model = Sequential()
  model.add(Dense(n_neurons, input_dim=6, activation='relu'))
  model.add(Dropout(dropout_rate))
  model.add(Dense(n_neurons, activation='relu'))
  model.add(Dropout(dropout_rate))
  model.add(Dense(4, activation='softmax'))

  model.compile(loss='categorical_crossentropy',
                optimizer='adam',
                metrics=['accuracy'])

  return model


features, labels = load_features('JSON/')

In [None]:
# One-hot encode the labels
label_dict = {'NORMAL': 0, 'MILD': 1, 'MODERATE': 2, 'SEVERE': 3}
# Use label_dict to convert labels to integers
labels = np.array([label_dict[label] for label in labels.flatten()])
labels_one_hot = tf.keras.utils.to_categorical(labels, num_classes=4)

# Split data into training sets and testing sets
X_train, X_test, y_train, y_test = train_test_split(features, labels_one_hot, test_size=0.2, random_state=42)


In [None]:
def objective(trial):
  # Suggest hyperparameters using trial object
  n_neurons = trial.suggest_int("n_neurons", 32, 128)  # Define search range
  dropout_rate = trial.suggest_float("dropout_rate", 0.2, 0.5) 

  model = create_model(n_neurons, dropout_rate)

  model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

  model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test))

  _, accuracy = model.evaluate(X_test, y_test)
  return accuracy

In [None]:


study = create_study(direction="maximize") 


In [None]:
study.optimize(objective, n_trials=100)

In [None]:
best_trial = study.best_trial
best_params = best_trial.params

In [None]:
model = create_model(best_params["n_neurons"], best_params["dropout_rate"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test))

Train the model

Evaluate the model

In [None]:
# Evaluate the model
scores = model.evaluate(X_test, y_test, verbose=0)
print("CNN Error: %.2f%%" % (100-scores[1]*100))

# Predict the testing set
y_pred = np.argmax(model.predict(X_test), axis=-1)
y_test = np.argmax(y_test, axis=-1)

# Print classification report and confusion matrix
print(classification_report(y_test, y_pred))
print(confusion_matrix(y_test, y_pred))

In [None]:
def predict_single_instance(json_file, model):
  """
  Loads keypoints from a JSON file, extracts features, and predicts the severity class.

  Args:
      json_file: Path to the JSON file containing keypoints.
      model: Trained model for prediction.

  Returns:
      Predicted severity class (string).
  """
  # Load keypoints from JSON
  keypoints = load_keypoints_from_json(json_file)

  # Extract features from keypoints
  features = extract_features(keypoints)

  # Ensure features are extracted successfully
  if features is None:
      return "Failed to extract features"

  # Reshape features for model prediction (might be necessary)
  features = np.expand_dims(features, axis=0)  # Add an extra dimension for batch size

  # Predict class probabilities
  predictions = model.predict(features)

  # Get the predicted class index with the highest probability
  predicted_class_index = np.argmax(predictions, axis=1)[0]

  # Convert class index to severity label using the label dictionary
  predicted_class = list(label_dict.keys())[predicted_class_index]

  return predicted_class

# Example usage (replace 'path/to/your/file.json' with your actual file path)
new_data_file = "results.json"
predicted_severity = predict_single_instance(new_data_file, model)
print(f"Predicted severity for {new_data_file}: {predicted_severity}")

## Video Analysis with OpenCV