In [None]:
import os, re
import numpy as np
from PIL import Image, ImageFilter
from skimage.morphology import skeletonize, dilation, square

# Paths for the four categories (update these as needed)
healthy_spiral_dir = "HealthySpiral"
healthy_meander_dir = "HealthyMeander"
patient_spiral_dir = "PatientSpiral"
patient_meander_dir = "PatientMeander"

# Helper function: Extract binary masks for HT and ET from an image
def extract_traces(image):
    """
    Given a PIL Image, separate it into Handwriting Trace (HT) and Exam Trace (ET) binary masks.
    Returns two boolean numpy arrays: ht_mask, et_mask.
    """
    # Optionally apply a blur to reduce noise (median blur for HT, mean blur for ET as per paper)
    # image = image.filter(ImageFilter.MedianFilter(size=5))   # for HT smoothing (if needed)
    # image = image.filter(ImageFilter.BLUR)                  # for ET smoothing (if needed)
    img_array = np.array(image.convert('RGB'))  # ensure RGB
    # Split color channels
    R, G, B = img_array[:,:,0], img_array[:,:,1], img_array[:,:,2]
    # Threshold for ET (exam template trace - usually black): pixel is part of ET if it is dark in all channels
    et_mask = (R < 90) & (G < 90) & (B < 90)    # dark (near black) pixels [oai_citation_attribution:10‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=I%20represents%20each%20pixel%20,b)
    # Threshold for HT (handwritten trace): pixel is part of HT if it is not pure white and not part of ET.
    # We include colored/darker strokes while excluding the black template:
    ht_mask = (R < 200) & (G < 200) & (B < 200)  # pixel is somewhat dark in at least one channel [oai_citation_attribution:11‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=Consider%20I%20to%20represent%20a,b)
    ht_mask = ht_mask & (~et_mask)              # exclude those identified as ET (black) [oai_citation_attribution:12‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=three%20color%20channels%3A%20red%2C%20green%2C,b)
    # Optional: apply dilation to connect any breaks in ET (template) lines (paper used 4x4 dilation) [oai_citation_attribution:13‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=%7B%200%20if%20I%28r%29%20,90)
    et_mask = dilation(et_mask, square(4))
    # Ensure boolean type
    ht_mask = ht_mask.astype(bool)
    et_mask = et_mask.astype(bool)
    return ht_mask, et_mask

# Load images and assign labels and grouping by patient
data_images = []    # list of feature vectors for each image
data_labels = []    # list of class labels (0=Healthy, 1=PD) for each image
data_patient_ids = []  # list of patient IDs corresponding to each image (for grouping)
# Regex to identify patient (or exam) ID from filename
id_pattern = re.compile(r'^(\d+)-')  # matches numeric ID at start of filename (before hyphen)

# Process each category folder
categories = [
    (healthy_spiral_dir, 0), (healthy_meander_dir, 0),
    (patient_spiral_dir, 1), (patient_meander_dir, 1)
]
for folder, label in categories:
    for filename in os.listdir(folder):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(folder, filename)
            image = Image.open(img_path).convert('L')  # convert to grayscale (L mode)
            # Extract HT and ET traces
            ht_mask, et_mask = extract_traces(Image.open(img_path))
            # Determine a patient/exam ID from filename for grouping
            m = id_pattern.match(filename)
            if m:
                patient_id = f"ID_{m.group(1)}"
            else:
                # Fallback: use the first numeric sequence in filename (or filename itself if none)
                nums = re.findall(r'\d+', filename)
                patient_id = f"ID_{nums[0]}" if nums else filename
            # Store image data and labels for feature extraction
            data_patient_ids.append(patient_id)
            data_labels.append(label)
            # We will compute features in the next section after obtaining the trace masks
            data_images.append((ht_mask, et_mask))

In [None]:
import math

# Helper: skeletonize mask and extract ordered path of coordinates
def skeleton_to_path(mask):
    """Convert a binary mask to a skeleton and return an ordered list of (y,x) coordinates along the skeleton."""
    # Skeletonize to one-pixel width trace
    skel = skeletonize(mask)
    coords = np.argwhere(skel)
    coords_set = {tuple(p) for p in coords}
    if not coords_set:
        return []
    # Identify largest connected component in skeleton (in case of any stray pixels)
    # We use BFS to find connected components
    largest_comp = set()
    while coords_set:
        comp = set()
        stack = [coords_set.pop()]
        comp.add(stack[0])
        while stack:
            cy, cx = stack.pop()
            # check 8-connected neighbors
            for dy in (-1,0,1):
                for dx in (-1,0,1):
                    if dy == 0 and dx == 0:
                        continue
                    ny, nx = cy+dy, cx+dx
                    if (ny, nx) in coords_set:
                        coords_set.remove((ny, nx))
                        comp.add((ny, nx))
                        stack.append((ny, nx))
        if len(comp) > len(largest_comp):
            largest_comp = comp
    coords_list = list(largest_comp)
    # Find endpoints (points with only one neighbor in skeleton)
    neighbors8 = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
    end_points = []
    for p in coords_list:
        count = 0
        for dy, dx in neighbors8:
            if (p[0]+dy, p[1]+dx) in largest_comp:
                count += 1
        if count == 1:
            end_points.append(p)
    # Pick a start point (an endpoint if available, else arbitrary)
    start = end_points[0] if end_points else coords_list[0]
    # Order the skeleton points by walking from the start
    ordered_path = [start]
    visited = {tuple(start)}
    current = tuple(start)
    while True:
        next_pt = None
        for dy, dx in neighbors8:
            nbr = (current[0]+dy, current[1]+dx)
            if nbr in largest_comp and nbr not in visited:
                next_pt = nbr
                break
        if next_pt is None:
            break
        ordered_path.append(next_pt)
        visited.add(next_pt)
        current = next_pt
    return ordered_path

# Helper: resample path to N points evenly spaced by arc length
def resample_path(path, N):
    """Resample a polyline (list of (y,x) points) to have N points evenly spaced by arc length."""
    if len(path) == 0:
        return []
    # Compute cumulative distances along the path
    cumd = [0.0]
    for i in range(1, len(path)):
        y0,x0 = path[i-1]
        y1,x1 = path[i]
        dist = math.hypot(x1-x0, y1-y0)
        cumd.append(cumd[-1] + dist)
    total_length = cumd[-1]
    if total_length == 0:
        # All points the same
        return [path[0]] * N
    # sample target distances
    target_distances = [i*total_length/(N-1) for i in range(N)]
    target_distances[-1] = total_length  # ensure last exactly matches
    resampled = []
    j = 0
    for td in target_distances:
        # advance j until cumd[j] <= td < cumd[j+1]
        while j < len(cumd)-1 and cumd[j] < td:
            j += 1
        if cumd[j] == td or j == 0:
            # exact match or at start
            y, x = path[j]
        else:
            # interpolate between j-1 and j
            y0,x0 = path[j-1]; d0 = cumd[j-1]
            y1,x1 = path[j];   d1 = cumd[j]
            if d1 - d0 > 0:
                t = (td - d0) / (d1 - d0)
            else:
                t = 0
            y = y0 + t * (y1 - y0)
            x = x0 + t * (x1 - x0)
        resampled.append((y, x))
    return resampled

# Compute features for each image
feature_matrix = []
for (ht_mask, et_mask), patient_id in zip(data_images, data_patient_ids):
    # Skeletonize and extract ordered trace paths
    ht_path = skeleton_to_path(ht_mask)
    et_path = skeleton_to_path(et_mask)
    # If either path is empty (e.g., failed extraction), skip this image
    if len(ht_path) < 2 or len(et_path) < 2:
        feature_matrix.append([0]*9)  # placeholder (or continue to skip)
        continue
    # Resample both traces to the same number of points (based on the longer trace for resolution)
    N = max(len(ht_path), len(et_path), 300)  # use at least 300 points for resolution
    ht_path_res = resample_path(ht_path, N)
    et_path_res = resample_path(et_path, N)
    # Compute center of shape (we use the bounding box center of ET as approximation of shape center)
    et_coords = np.array(et_path_res)
    min_y, max_y = et_coords[:,0].min(), et_coords[:,0].max()
    min_x, max_x = et_coords[:,1].min(), et_coords[:,1].max()
    center_y, center_x = (min_y + max_y)/2.0, (min_x + max_x)/2.0
    # Compute radius for each sampled point
    radii_ht = []
    radii_et = []
    for (y_ht, x_ht), (y_et, x_et) in zip(ht_path_res, et_path_res):
        r_ht = math.hypot(x_ht - center_x, y_ht - center_y)
        r_et = math.hypot(x_et - center_x, y_et - center_y)
        radii_ht.append(r_ht)
        radii_et.append(r_et)
    radii_ht = np.array(radii_ht)
    radii_et = np.array(radii_et)
    diff = radii_ht - radii_et
    # Feature F1: RMS difference between radii
    F1 = math.sqrt(np.mean(diff**2))  # RMS [oai_citation_attribution:27‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=F1%3A%20The%20root%20mean%20square,as%20the%20length%20of%20the)
    # F2: max |difference|
    F2 = np.max(np.abs(diff))
    # F3: min |difference|
    F3 = np.min(np.abs(diff))
    # F4: standard deviation of differences
    F4 = np.std(diff, ddof=0)
    # F5: Mean Relative Tremor (mean |r_HT[i] - r_HT[i-D]|, D=10) [oai_citation_attribution:28‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=individual%E2%80%99s%20HT,imizing%20the%20PD%20detection%20rate) [oai_citation_attribution:29‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=%28%7Cr%20i)
    D = 10
    if len(radii_ht) > D:
        tremor_diffs = []
        for i in range(D, len(radii_ht)):
            tremor_diffs.append(abs(radii_ht[i] - radii_ht[i-D]))
        F5 = np.mean(tremor_diffs)
    else:
        F5 = 0.0
    # F6: max ET radius
    F6 = np.max(radii_et)
    # F7: min ET radius
    F7 = np.min(radii_et)
    # F8: standard deviation of HT radii
    F8 = np.std(radii_ht, ddof=0)
    # F9: count of sign changes in (r_HT - r_ET) [oai_citation_attribution:30‡file-ppssohspef7crohxhfnw4j](file://file-PPSsohSpeF7cRoHxHfnW4J#:~:text=F8%3A%20Standard%20Deviation%20of%20ET,values)
    sign_diff = np.sign(diff)
    # Treat zeros as no sign for change counting
    sign_diff[sign_diff==0] = 1  # or could also skip zeros
    changes = 0
    for i in range(1, len(sign_diff)):
        if sign_diff[i] != sign_diff[i-1]:
            changes += 1
    F9 = changes
    features = [F1, F2, F3, F4, F5, F6, F7, F8, F9]
    feature_matrix.append(features)

feature_matrix = np.array(feature_matrix, dtype=float)

# Normalize features (z-score normalization per feature)
feat_mean = feature_matrix.mean(axis=0)
feat_std = feature_matrix.std(axis=0)
feature_matrix_norm = (feature_matrix - feat_mean) / (feat_std + 1e-8)

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

# Prepare dataset grouping by patient for train-test split
patient_ids = np.array(data_patient_ids)
labels = np.array(data_labels)
unique_patients = np.unique(patient_ids)
# Determine a label per patient (all images of a patient share same class label)
patient_label_map = {}
for pid in unique_patients:
    # label of first occurrence of this patient
    pid_label = labels[patient_ids == pid][0]
    patient_label_map[pid] = pid_label
patient_labels = np.array([patient_label_map[pid] for pid in unique_patients])

# Train-test split at patient level (15% patients for test)
# Handle case where we have classes with only one sample - don't use stratify in that case
if np.min(np.bincount(patient_labels)) < 2:
    train_pids, test_pids = train_test_split(unique_patients, test_size=0.15, random_state=42)
    print("Warning: Using non-stratified split because some classes have only one sample")
else:
    train_pids, test_pids = train_test_split(unique_patients, test_size=0.15, stratify=patient_labels, random_state=42)
train_mask = np.isin(patient_ids, train_pids)
test_mask  = np.isin(patient_ids, test_pids)
X_train = feature_matrix_norm[train_mask]
y_train = labels[train_mask]
X_test = feature_matrix_norm[test_mask]
y_test = labels[test_mask]

# Define hyperparameter grids for each model
# SVM with RBF (and try linear, poly for completeness) – tune kernel, C, gamma, class_weight
svm_param_grid = [
    {'kernel': ['linear'], 
     'C': [0.1, 1, 10, 100], 
     'class_weight': [None, 'balanced']},
    {'kernel': ['rbf', 'poly'], 
     'C': [0.1, 1, 10, 100], 
     'gamma': ['scale', 'auto', 0.01, 0.1, 1], 
     'class_weight': [None, 'balanced']}
]
svm = SVC(probability=True)  # SVM model (we enable probability estimates for thresholding/AUC)
svm_grid_search = GridSearchCV(svm, svm_param_grid, cv=10, scoring='accuracy', n_jobs=-1)
svm_grid_search.fit(X_train, y_train)
best_svm = svm_grid_search.best_estimator_

# Logistic Regression with Elastic Net – tune C, l1_ratio, class_weight, fit_intercept
logreg_param_grid = {
    'C': [0.01, 0.1, 1, 10],
    'l1_ratio': [0.25, 0.5, 0.75, 1.0],  # 1.0 = L1 (lasso), 0.0 = L2 (ridge)
    'class_weight': [None, 'balanced'],
    'fit_intercept': [True, False],
    'solver': ['saga'],
    'penalty': ['elasticnet']
}
logreg = LogisticRegression(max_iter=10000)
logreg_grid_search = GridSearchCV(logreg, logreg_param_grid, cv=10, scoring='accuracy', n_jobs=-1)
logreg_grid_search.fit(X_train, y_train)
best_logreg = logreg_grid_search.best_estimator_

print("Best SVM parameters:", svm_grid_search.best_params_)
print("Best Logistic Regression parameters:", logreg_grid_search.best_params_)

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, precision_score

# 1. Evaluate on test images (image-level) – this part is fine
y_pred_svm = best_svm.predict(X_test)
y_pred_lr = best_logreg.predict(X_test)
y_prob_svm = best_svm.predict_proba(X_test)[:,1]  # Probability of class 1
y_prob_lr = best_logreg.predict_proba(X_test)[:,1]

acc_svm = accuracy_score(y_test, y_pred_svm)
acc_lr = accuracy_score(y_test, y_pred_lr)
auc_svm = roc_auc_score(y_test, y_prob_svm)
auc_lr = roc_auc_score(y_test, y_prob_lr)
tn_svm, fp_svm, fn_svm, tp_svm = confusion_matrix(y_test, y_pred_svm).ravel()
tn_lr, fp_lr, fn_lr, tp_lr = confusion_matrix(y_test, y_pred_lr).ravel()

sens_svm = tp_svm / (tp_svm + fn_svm) if tp_svm+fn_svm > 0 else 0.0
spec_svm = tn_svm / (tn_svm + fp_svm) if tn_svm+fp_svm > 0 else 0.0
sens_lr = tp_lr / (tp_lr + fn_lr) if tp_lr+fn_lr > 0 else 0.0
spec_lr = tn_lr / (tn_lr + fp_lr) if tn_lr+fp_lr > 0 else 0.0
ppv_svm = precision_score(y_test, y_pred_svm, pos_label=1)
ppv_lr = precision_score(y_test, y_pred_lr, pos_label=1)

print(f"SVM Image-level Accuracy: {acc_svm:.3f}, AUC: {auc_svm:.3f}, Sensitivity: {sens_svm:.3f}, Specificity: {spec_svm:.3f}")
print(f"LogReg Image-level Accuracy: {acc_lr:.3f}, AUC: {auc_lr:.3f}, Sensitivity: {sens_lr:.3f}, Specificity: {spec_lr:.3f}")

# 2. Build a map from global indices to local test indices
test_indices = np.where(test_mask)[0]  # global indices for test samples
global_to_test_index = {g_idx: i for i, g_idx in enumerate(test_indices)}

# 3. Aggregate to patient-level by majority vote
test_patient_preds_svm = {}
test_patient_preds_lr = {}

for pid in test_pids:
    true_label = patient_label_map[pid]
    # these are global indices
    img_indices = np.where((patient_ids == pid) & test_mask)[0]
    if len(img_indices) == 0:
        continue
    # convert to local test indices
    local_indices = [global_to_test_index[g_idx] for g_idx in img_indices]
    
    # count votes
    pred_votes_svm = np.sum(y_pred_svm[local_indices] == 1)
    pred_votes_lr = np.sum(y_pred_lr[local_indices] == 1)
    
    # majority vote
    patient_pred_svm = 1 if pred_votes_svm > (len(local_indices) / 2) else 0
    patient_pred_lr = 1 if pred_votes_lr > (len(local_indices) / 2) else 0
    
    test_patient_preds_svm[pid] = (true_label, patient_pred_svm)
    test_patient_preds_lr[pid] = (true_label, patient_pred_lr)

# 4. Calculate patient-level accuracy and confusion
tp_svm = fp_svm = tn_svm = fn_svm = 0
tp_lr = fp_lr = tn_lr = fn_lr = 0

for pid, (true_label, pred_label) in test_patient_preds_svm.items():
    if true_label == 1 and pred_label == 1:
        tp_svm += 1
    elif true_label == 1 and pred_label == 0:
        fn_svm += 1
    elif true_label == 0 and pred_label == 1:
        fp_svm += 1
    elif true_label == 0 and pred_label == 0:
        tn_svm += 1

for pid, (true_label, pred_label) in test_patient_preds_lr.items():
    if true_label == 1 and pred_label == 1:
        tp_lr += 1
    elif true_label == 1 and pred_label == 0:
        fn_lr += 1
    elif true_label == 0 and pred_label == 1:
        fp_lr += 1
    elif true_label == 0 and pred_label == 0:
        tn_lr += 1

patient_acc_svm = (tp_svm + tn_svm) / (tp_svm+tn_svm+fp_svm+fn_svm)
patient_acc_lr = (tp_lr + tn_lr) / (tp_lr+tn_lr+fp_lr+fn_lr)

sens_svm = tp_svm / (tp_svm + fn_svm) if (tp_svm+fn_svm) else 0
spec_svm = tn_svm / (tn_svm + fp_svm) if (tn_svm+fp_svm) else 0
sens_lr = tp_lr / (tp_lr + fn_lr) if (tp_lr+fn_lr) else 0
spec_lr = tn_lr / (tn_lr + fp_lr) if (tn_lr+fp_lr) else 0

print(f"SVM Patient-level Accuracy: {patient_acc_svm:.3f}, Sensitivity: {sens_svm:.3f}, Specificity: {spec_svm:.3f}")
print(f"LogReg Patient-level Accuracy: {patient_acc_lr:.3f}, Sensitivity: {sens_lr:.3f}, Specificity: {spec_lr:.3f}")

In [None]:
# Evaluate combined models on spiral vs meander test images
# Identify test indices for spirals and meanders (assuming file path or patient_id can tell shape; here we use the folder info stored in mask arrays)
# We know the order in data_images corresponds to categories in the same order we loaded.
test_idx = np.where(test_mask)[0]
# Reconstruct shape type from original category loading order (we appended in known sequence: healthy_spiral, healthy_meander, patient_spiral, patient_meander)
shapes = []
for idx in test_idx:
    # We can infer shape by the index position from loading sequence:
    # (Not elegant; ideally file names or a separate array indicating shape)
    # For simplicity, assume if file came from a spiral folder, we tagged it in its name or order.
    # Here, we'll just check the corresponding original mask tuple in data_images: 
    # if it came from healthy_spiral_dir or patient_spiral_dir, it was loaded before any meander in each half.
    shapes.append('spiral' if idx < len(os.listdir(healthy_spiral_dir)) + len(os.listdir(patient_spiral_dir)) else 'meander')
# Now compute accuracy for each shape subset
spiral_idx = [i for i,sh in zip(test_idx, shapes) if sh == 'spiral']
meander_idx = [i for i,sh in zip(test_idx, shapes) if sh == 'meander']
acc_svm_spiral = accuracy_score(y_test[[np.where(test_idx==i)[0][0] for i in spiral_idx]], y_pred_svm[[np.where(test_idx==i)[0][0] for i in spiral_idx]]) if spiral_idx else None
acc_svm_meander = accuracy_score(y_test[[np.where(test_idx==i)[0][0] for i in meander_idx]], y_pred_svm[[np.where(test_idx==i)[0][0] for i in meander_idx]]) if meander_idx else None
acc_lr_spiral = accuracy_score(y_test[[np.where(test_idx==i)[0][0] for i in spiral_idx]], y_pred_lr[[np.where(test_idx==i)[0][0] for i in spiral_idx]]) if spiral_idx else None
acc_lr_meander = accuracy_score(y_test[[np.where(test_idx==i)[0][0] for i in meander_idx]], y_pred_lr[[np.where(test_idx==i)[0][0] for i in meander_idx]]) if meander_idx else None

print(f"SVM Accuracy on Spiral drawings: {acc_svm_spiral:.3f}, on Meander drawings: {acc_svm_meander:.3f}")
print(f"LogReg Accuracy on Spiral drawings: {acc_lr_spiral:.3f}, on Meander drawings: {acc_lr_meander:.3f}")