# Análisis automático de alteraciones en la escritura manuscrita debidas al Párkinson

In [None]:
import numpy as np
import os
import pickle
import random
import shutil
import sys
import time
import zipfile
import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)

#### Para uso en Colab. La estructura de archivos esperada se explica en la memoria.
Se recomienda ejecutar el proyecto localmente porque para entrenar la red neuronal convolucional y generar las predicciones se hace uso de la CPU.

In [None]:
%%script false --no-raise-error
# Colab. Necesario para subir PaHaW.zip, los módulos y otros archivos.
from google.colab import files
files.upload()

In [None]:
%%script false --no-raise-error
# Colab. Se extrae PaHaW.zip
try:
    with zipfile.ZipFile("PaHaW.zip") as zip_pahaw:
        zip_pahaw.extractall()
        print("Se ha extraído PaHaW.zip")
except:
    print("Error al extraer PaHaW.zip")

In [None]:
%%script false --no-raise-error
# Colab (opcional). Se extrae generated.zip
try:
    with zipfile.ZipFile("generated.zip") as zip_generated:
        zip_generated.extractall()
        print("Se ha extraído generated.zip")
except:
    print("Error al extraer generated.zip")

In [None]:
import pahaw_loader
import nn

#### Valores relevantes del proyecto.

In [None]:
# Ruta para cargar (si existe) o guardar el modelo entrenado.
model_file_path = os.path.join("generated", "default.obj")

training_epochs = 30  # En la memoria se usó 30 en todos los casos.
random_seed = 17  # En la memoria se usó 17 en todos los casos.
task_number = 2  # El trabajo es compatible con las tareas 2, 3 y 4.
clipping_side_size = 111  # Ha de ser impar y mayor de 71. Se recomienda 111.
clipping_jump_size = 1  # Si es mayor de 1 hay puntos que pueden quedar sin predicción.

In [None]:
# Primero se cargan los sujetos y las tareas.
subjects_pd_status_years, subjects_tasks = pahaw_loader.load()

# Para las configuraciones 2, 3 y 4.
worst_pd_id_list = [4, 7, 9, 17, 18, 19, 53, 55]
best_h_id_list = [30, 49, 52, 67, 69, 70, 84, 87]

# Para la configuración 4.
outliers_false_negatives = [20, 24, 33, 43, 44, 74, 75, 78]
outliers_false_positives = [28, 57, 83, 91]

#### Selección de sujetos para entrenamiento y test de la red neuronal convolucional.

In [None]:
# %%script false --no-raise-error
# Para la configuración 1. Se usan todos los sujetos PaHaW en base a su estado PD.
h_id_list = []
pd_id_list = []

subjects_id_list = list(subjects_tasks.keys())

for subject_id in subjects_id_list:
    if subjects_pd_status_years[subject_id][0] == 0:
        h_id_list.append(subject_id)
    else:
        pd_id_list.append(subject_id)

random.Random(random_seed).shuffle(h_id_list)
random.Random(random_seed).shuffle(pd_id_list)

# Hay 75 sujetos en total, 62 para entrenamiento y 13 para test.
test_id_list = h_id_list[:7]
train_id_list = h_id_list[7:]
test_id_list.extend(pd_id_list[:6])
train_id_list.extend(pd_id_list[6:])

In [None]:
%%script false --no-raise-error
# Para la configuración 2. Se usan los 16 sujetos seleccionados según las
# características PD visibles y todos los sujetos PaHaW en base a su estado PD.
h_id_list = []
pd_id_list = []

subjects_id_list = list(subjects_tasks.keys())

for subject_id in subjects_id_list:
    if subjects_pd_status_years[subject_id][0] == 0:
        h_id_list.append(subject_id)
    else:
        pd_id_list.append(subject_id)

for subject_id in worst_pd_id_list:
    pd_id_list.remove(subject_id)

for subject_id in best_h_id_list:
    h_id_list.remove(subject_id)

random.Random(random_seed).shuffle(worst_pd_id_list)
random.Random(random_seed).shuffle(best_h_id_list)
random.Random(random_seed).shuffle(h_id_list)
random.Random(random_seed).shuffle(pd_id_list)

# Hay 75 sujetos en total, 62 para entrenamiento y 13 para test.
test_id_list = worst_pd_id_list[:4]
train_id_list = worst_pd_id_list[4:]
test_id_list.extend(best_h_id_list[:4])
train_id_list.extend(best_h_id_list[4:])
test_id_list.extend(h_id_list[:3])
train_id_list.extend(h_id_list[3:])
test_id_list.extend(pd_id_list[:2])
train_id_list.extend(pd_id_list[2:])

In [None]:
%%script false --no-raise-error
# Para las configuraciones 3 y 4. Se usan únicamente los 16 sujetos seleccionados.
random.Random(random_seed).shuffle(worst_pd_id_list)
random.Random(random_seed).shuffle(best_h_id_list)

# Hay 16 sujetos en total, 14 para entrenamiento y 2 para test.
test_id_list = worst_pd_id_list[:1]
train_id_list = worst_pd_id_list[1:]

test_id_list.extend(best_h_id_list[:1])
train_id_list.extend(best_h_id_list[1:])

#### Generación de los recortes para los grupos de entrenamiento y test.

In [None]:
# Ahora se generan todos los recortes y los diccionarios para entrenamiento y test.
print(
    f"Generando los recortes para train ({len(train_id_list)}) y test ({len(test_id_list)})."
)
start_time = time.time()

train_id_list.sort()
test_id_list.sort()

# Las imágenes de los recortes se guardan en un directorio cuyo nombre depende de la tarea, del tamaño
# de salto y del tamaño de recorte. De esta manera pueden ser usadas en ejecuciones posteriores.
subjects_id_list = train_id_list + test_id_list
for subject_id in subjects_id_list:
    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        letters_set.generate_clippings(clipping_side_size, clipping_jump_size)

# Las imágenes de los recortes generados se copian a los directorios
# de entrenamiento y test en base al grupo al que pertenece el sujeto.
shutil.rmtree(os.path.join("generated", "test"), ignore_errors=True)
os.makedirs(os.path.join("generated", "test"))
shutil.rmtree(os.path.join("generated", "train"), ignore_errors=True)
os.makedirs(os.path.join("generated", "train"))

test_clippings = {}
test_clippings_pd = {}
train_clippings = {}
train_clippings_pd = {}

for subject_id in test_id_list:
    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        for clipping in letters_set.clippings_list:
            test_clippings[clipping.name] = clipping
            test_clippings_pd[clipping.name] = subjects_pd_status_years[subject_id][0]
            clipping.copy_clipping("test")

for subject_id in train_id_list:
    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        for clipping in letters_set.clippings_list:
            train_clippings[clipping.name] = clipping
            train_clippings_pd[clipping.name] = subjects_pd_status_years[subject_id][0]
            clipping.copy_clipping("train")

end_time = time.time()
print(f"Recortes generados en {(end_time - start_time):.2f} segundos.")

#### Entrenamiento de la red neuronal convolucional.

In [None]:
# Aquí se entrena el modelo si es necesario.
train_dataset = nn.ClippingsDataset("train", train_clippings_pd, train_clippings)
train_loader = nn.DataLoader(train_dataset, batch_size=10, shuffle=True)
test_dataset = nn.ClippingsDataset("test", test_clippings_pd, test_clippings)
test_loader = nn.DataLoader(test_dataset, batch_size=10, shuffle=False)

# Si el modelo existe en disco se intenta cargar y se salta el entrenamiento.
if os.path.exists(model_file_path):
    file = open(model_file_path, "rb")
    model = pickle.load(file)
    file.close()
    if model.check(
        task_number,
        clipping_side_size,
        clipping_jump_size,
        training_epochs,
        train_id_list,
    ):
        # Si el modelo en la ruta es el esperado, se muestran los resultados del entrenamiento.
        print("Modelo cargado con éxito:")
        for results_i in model.train_results:
            if (results_i["epoch"] + 1) % 5 == 0:
                print(
                    f"Epoch: {results_i['epoch'] + 1}/{len(model.train_results)} | Trai"
                    f"n loss: {results_i['train_loss']:.4f}, Train accuracy: "
                    f"{(results_i['train_accuracy'] * 100):.2f} | Test loss: "
                    f"{results_i['test_loss']:.4f}, Test accuracy: "
                    f"{(results_i['test_accuracy'] * 100):.2f}"
                )

    # Si el modelo no es el esperado, se para la ejecución y se muestra el error.
    else:
        print("Error.")
        print(f"Modelo esperado:")
        print(
            f"Tarea: {task_number} | Tamaño: {clipping_side_size} | Salto: "
            f"{clipping_jump_size} | Epochs: {training_epochs} | Sujetos: "
            f"{train_id_list}"
        )
        print(f"Modelo cargado:")
        print(
            f"Tarea: {model.task_number} | Tamaño: {model.clipping_side_size} | Salto"
            f": {model.clipping_jump_size} | Epochs: {model.training_epochs} | Sujeto"
            f"s: {model.train_id_list}"
        )
        print("Operación abortada.")
        sys.exit()

# Si no hay un modelo en la ruta, se entrena uno nuevo.
else:
    model = nn.ConvolutionalNetwork(
        task_number,
        clipping_side_size,
        clipping_jump_size,
        training_epochs,
        train_id_list,
    )
    print("Empieza el entrenamiento:")
    start_time = time.time()

    # Se obtiene el modelo y los datos del entrenamiento.
    model = nn.train(model, training_epochs, train_loader, test_loader)
    end_time = time.time()

    print(f"Modelo entrenado en {(end_time - start_time):.2f} segundos.")

    # Se guarda el modelo para no tener que repetir el entrenamiento.
    file = open(model_file_path, "wb")
    pickle.dump(model, file)
    file.close()

#### Se generan las predicciones sobre los sujetos deseados.

In [None]:
# Una vez entrenado el modelo se puede usar para generar predicciones.
# Posibles valores, entre otros: list(subjects_tasks.keys()), [1, 2, 3], subjects_id_list, test_id_list
predict_subjects = list(subjects_tasks.keys())

# Se descartan los sujetos que no existen y los que se han usado para entrenamiento.
target_subjects = []
for subject_id in predict_subjects:
    if subject_id not in list(subjects_tasks.keys()):
        print(f"El sujeto {subject_id} no existe, descartado.")

    # Comentar si se quiere realizar la predicción sobre algún sujeto de entrenamiento.
    elif subject_id in train_id_list:
        print(f"El sujeto {subject_id} se ha usado en el entrenamiento, descartado.")

    # Para la configuración 4, se descartan los valores atípicos.
    # elif subject_id in (outliers_false_negatives or outliers_false_positives):
    #     print(f"El sujeto {subject_id} es un valor atípico en la predicción, descartado.")

    else:
        target_subjects.append(subject_id)

# Se generan los recortes de los sujetos y los diccionarios para realizar la predicción.
print("Generando los recortes para realizar las predicciones")
start_time = time.time()

# Las imágenes de los recortes se guardan en un directorio cuyo nombre depende de la tarea, del tamaño
# de salto y del tamaño de recorte. De esta manera pueden ser usadas en ejecuciones posteriores.
for subject_id in target_subjects:
    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        letters_set.generate_clippings(clipping_side_size, clipping_jump_size)

# Las imágenes de los recortes generados se copian al directorio temp para realizar las predicciones.
shutil.rmtree(os.path.join("generated", "temp"), ignore_errors=True)
os.makedirs(os.path.join("generated", "temp"))

temp_clippings = {}
temp_clippings_pd = {}

for subject_id in target_subjects:
    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        for clipping in letters_set.clippings_list:
            temp_clippings[clipping.name] = clipping
            temp_clippings_pd[clipping.name] = subjects_pd_status_years[subject_id][0]
            clipping.copy_clipping("temp")

end_time = time.time()
print(f"Recortes generados en {(end_time - start_time):.2f} segundos.")

# Se generan las predicciones.
print("Generando las predicciones.")
start_time = time.time()

temp_dataset = nn.ClippingsDataset("temp", temp_clippings_pd, temp_clippings)
nn.predict(model, temp_dataset, clipping_side_size)

# Se generan las predicciones a nivel de trazo, conjunto de letras y tarea.
for subject_id in target_subjects:
    subjects_tasks[subject_id][task_number].generate_prediction_results()

end_time = time.time()
print(f"Predicciones generadas en {(end_time - start_time):.2f} segundos.")

#### Se generan los resultados.

In [None]:
# Por último se genera el plot de las tareas sobre las que
# se ha realizado la predicción y se guardan los resultados.
shutil.rmtree(os.path.join("results", f"{task_number}_predicted_by_id"), ignore_errors=True)
os.makedirs(os.path.join("results", f"{task_number}_predicted_by_id"))

px = 1 / plt.rcParams["figure.dpi"]

for subject_id in target_subjects:
    fig, ax = plt.subplots()
    fig.set_figwidth(1200 * px)
    ax.set_aspect("equal", "box")
    ax.axis("off")
    plt.tight_layout()

    colors = "tab:green", "tab:red"
    ls_predicted_text = "H", "PD"
    task_predicted_text = "\nPredicción: Sano.", "\nPredicción: sufre PD."
    no_predicted_coordinates = False

    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        # Primero se genera el plot de los trazos originales en base a la predicción
        # de la letra. De esta manera no quedan huecos entre los nuevos trazos.
        for stroke in letters_set.strokes_list:
            stroke_x_list = stroke.get_x_coordinates_list()
            stroke_y_list = stroke.get_y_coordinates_list()
            ax.plot(
                stroke_x_list, stroke_y_list, color=colors[letters_set.pd_predicted]
            )

        # Se superponen los trazos de la predicción.
        for stroke in letters_set.predicted_strokes_list:
            stroke_x_list = stroke.get_x_coordinates_list()
            stroke_y_list = stroke.get_y_coordinates_list()
            ax.plot(stroke_x_list, stroke_y_list, color=colors[stroke.pd_predicted])

        # Se muestran las coordenadas sin predicción.
        for coordinate in letters_set.unpredicted_coordinates_list:
            ax.scatter(coordinate[0], coordinate[1], color="tab:purple")

        if len(letters_set.unpredicted_coordinates_list) > 0:
            no_predicted_coordinates = True

        # Se muestra la predicción a nivel de conjunto de letras.
        text_coordinates = letters_set.get_text_coordinates()
        plt.text(
            text_coordinates[0],
            text_coordinates[1],
            ls_predicted_text[letters_set.pd_predicted],
            color=colors[letters_set.pd_predicted],
        )

    title_string = f"Tarea {task_number} del sujeto {subject_id} ("
    if subjects_pd_status_years[subject_id][0] == 1:
        title_string += (
            f"sufre PD desde hace {subjects_pd_status_years[subject_id][1]} años)."
        )
    else:
        title_string += "sano)"

    title_string += task_predicted_text[
        subjects_tasks[subject_id][task_number].pd_predicted
    ]
    plt.suptitle(title_string)

    h_line = mlines.Line2D([], [], color="tab:green", label="Sano")
    pd_line = mlines.Line2D([], [], color="tab:red", label="PD")
    np_point = mlines.Line2D(
        [], [], color="tab:purple", marker="o", markersize=10, label="Punto sin recorte"
    )

    if no_predicted_coordinates:
        fig.legend(handles=[h_line, pd_line, np_point], loc="upper left")
    else:
        fig.legend(handles=[h_line, pd_line], loc="upper left")

    plt.savefig(
        os.path.join("results", f"{task_number}_predicted_by_id", f"{subject_id}.png")
    )
    plt.close()

output_file_path = os.path.join("results", f"{task_number}_prediction_results.txt")
output_file = open(output_file_path, "a", encoding="utf-8")

# Generación de la matriz de confusión de conjuntos y tareas.
# Corrección, precisión, recall, F1-score
real_value = []
predicted_value = []

for subject_id in target_subjects:
    if subjects_pd_status_years[subject_id][0] == 0:
        real_value.append("H")
    else:
        real_value.append("PD")
    if subjects_tasks[subject_id][task_number].pd_predicted == 0:
        predicted_value.append("H")
    else:
        predicted_value.append("PD")

real_value = np.array(real_value)
predicted_value = np.array(predicted_value)

c_matrix = confusion_matrix(real_value, predicted_value)

sns.heatmap(c_matrix, annot=True, fmt="g", xticklabels=["H", "PD"], yticklabels=["H", "PD"])  # type: ignore
plt.ylabel("Predicción", fontsize=13)
plt.xlabel("Valor real", fontsize=13)
plt.title("Matriz de confusión (tareas)", fontsize=17)

plt.savefig(os.path.join("results", f"{task_number}_tasks_confusion_matrix.png"))
plt.close()

output_file.write(
    f"Semilla: {random_seed} | Tamaño: {clipping_side_size}\nTareas: Corrección: "
    f"{accuracy_score(real_value, predicted_value)*100:.2f}% | Precisión: "
    f"{precision_score(real_value, predicted_value, pos_label='PD')*100:.2f}% | Recall: "
    f"{recall_score(real_value, predicted_value, pos_label='PD')*100:.2f}% | "
    f"F1-score: {f1_score(real_value, predicted_value, pos_label='PD')*100:.2f}%\n"
)

real_value = []
predicted_value = []

for subject_id in target_subjects:
    for letters_set in subjects_tasks[subject_id][task_number].letters_sets_list:
        if subjects_pd_status_years[subject_id][0] == 0:
            real_value.append("H")
        else:
            real_value.append("PD")
        if letters_set.pd_predicted == 0:
            predicted_value.append("H")
        else:
            predicted_value.append("PD")

real_value = np.array(real_value)
predicted_value = np.array(predicted_value)

c_matrix = confusion_matrix(real_value, predicted_value)

sns.heatmap(c_matrix, annot=True, fmt="g", xticklabels=["H", "PD"], yticklabels=["H", "PD"])  # type: ignore
plt.ylabel("Predicción", fontsize=13)
plt.xlabel("Valor real", fontsize=13)
plt.title("Matriz de confusión (letras)", fontsize=17)

plt.savefig(os.path.join("results", f"{task_number}_letters_confusion_matrix.png"))
plt.close()

output_file.write(
    f"Conjuntos: Corrección: {accuracy_score(real_value, predicted_value)*100:.2f}% | "
    f"Precisión: {precision_score(real_value, predicted_value, pos_label='PD')*100:.2f}% | "
    f"Recall: {recall_score(real_value, predicted_value, pos_label='PD')*100:.2f}% | "
    f"F1-score: {f1_score(real_value, predicted_value, pos_label='PD')*100:.2f}%\n"
)
output_file.close()

In [None]:
%%script false --no-raise-error
# Colab. Para poder descargar los resultados.
shutil.make_archive("results", "zip", "results")