In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import numpy as np
import os
from typing import *

from cvmt.utils import (load_yaml_params, nested_dict_to_easydict)
from cvmt.utils import (
    img_coord_2_cartesian_coord,
    translate_landmarks,
    rotate_landmarks,
    plot_landmarks,
    normalize_coords,
    plot_image_and_vertebral_landmarks,
)

from cvmt.ml.utils import download_wandb_model_checkpoint
from cvmt.ml.trainer import create_dataloader, max_indices_4d_tensor
from cvmt.inference.inference import load_pretrained_model_eval_mode
import torch
from sklearn.cluster import KMeans, DBSCAN, SpectralClustering
import matplotlib.pyplot as plt

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, MinMaxScaler

In [None]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 100)

In [None]:
def post_process_vertebral_landmarks(
    landmarks: np.ndarray,
    swap_x_y: bool = False,
    plot: bool = False,
    retrieve_orig_position: bool = False,
) -> np.ndarray:
    """Rotate, translate, and normalize the vertebral landmarks. The
    normalization is done with respect to the distance between the 2
    points at the left and right of the base of each shape.
    """
    c2, c3, c4 = landmarks[0:3].copy(), landmarks[3:8].copy(), landmarks[8:].copy()
    c2_cart = img_coord_2_cartesian_coord(c2, swap_x_y) 
    c2_ref_index_tr = 0
    c2_trns = translate_landmarks(c2_cart, ref_index=c2_ref_index_tr)
    c2_ref_index_rot = 2
    c2_trns_rot = rotate_landmarks(c2_trns, ref_index=c2_ref_index_rot)
    c2_trns_rot_nr = 2
    c2_trns_rot_n = normalize_coords(
        landmarks=c2_trns_rot,
        ref_index=c2_trns_rot_nr,
        height_wise=False,
    )
    if plot:
        plot_landmarks(c2_trns_rot_n)
    c3_cart = img_coord_2_cartesian_coord(c3, swap_x_y)
    c3_ref_index_tr = 1
    c3_trns = translate_landmarks(c3_cart, ref_index=c3_ref_index_tr)
    c3_ref_index_rot = 3
    c3_trns_rot = rotate_landmarks(c3_trns, ref_index=c3_ref_index_rot)
    c3_ref_index_nr = 3
    c3_trns_rot_n = normalize_coords(
        landmarks=c3_trns_rot,
        ref_index=c3_ref_index_nr,
        height_wise=False,
    )
    if plot:
        plot_landmarks(c3_trns_rot_n)
    c4_cart = img_coord_2_cartesian_coord(c4, swap_x_y)
    c4_ref_index_tr = 1
    c4_trns = translate_landmarks(c4_cart, ref_index=c4_ref_index_tr)
    c4_ref_index_rot = 3
    c4_trns_rot = rotate_landmarks(c4_trns, ref_index=c4_ref_index_rot)
    c4_ref_index_nr = 3
    c4_trns_rot_n = normalize_coords(
        landmarks=c4_trns_rot,
        ref_index=c4_ref_index_nr,
        height_wise=False,
    )
    if plot:
        plot_landmarks(c4_trns_rot_n)
    # retrieve the original distance of the vertebrae
    if retrieve_orig_position:
        c24_cart_dist = (
            np.abs(
                c2_cart[c2_ref_index_tr, 1] - c4_cart[c4_ref_index_tr, 1]
            )/c4_trns_rot[c4_ref_index_nr, 0]
        )
        c34_cart_dist = (
            np.abs(
                c3_cart[c3_ref_index_tr, 1] - c4_cart[c4_ref_index_tr, 1]
            )/c4_trns_rot[c4_ref_index_nr, 0]
        )
        c2_trns_rot_n[:,1] = c2_trns_rot_n[:,1] + c24_cart_dist
        c3_trns_rot_n[:,1] = c3_trns_rot_n[:,1] + c34_cart_dist
    normalized_landmarks = np.vstack((c2_trns_rot_n, c3_trns_rot_n, c4_trns_rot_n))
    return normalized_landmarks

In [None]:
def cartesian_to_polar(coords):
    """Transform cartesian coordiantes to polar coordinates."""
    r = np.sqrt(coords[..., 0]**2 + coords[..., 1]**2)
    theta = np.arctan2(coords[..., 1], coords[..., 0])
    return np.stack((r, theta), axis=-1)


def polygon_center_of_mass(
    arr: np.ndarray,
) -> np.ndarray:
    """Calculate mean of the coordinate arrays along axis 0"""
    return np.mean(arr, axis=0)


def vertebral_center_of_masses(
    arr: np.ndarray,
) -> np.ndarray:
    """Calculate center of masses of each of the three vertebrae"""
    cm_c2 = polygon_center_of_mass(arr[:3,:])
    cm_c3 = polygon_center_of_mass(arr[3:8,:])
    cm_c4 = polygon_center_of_mass(arr[8:,:])
    return np.stack((cm_c2, cm_c3, cm_c4), axis=0)


def vertebral_features(
    arr: np.ndarray,
) -> Tuple[float]:
    """Calculate the 7 features described below,
        - the heights of C3 and C4 vertical edges (4 features)
        - the concavity of C2, C3, and C4 (3 features)
    """
    c2_conc = arr[1,1]
    c3_conc = arr[5,1]
    c4_conc = arr[10,1]
    
    c3_1 = arr[3,1] - arr[4,1]
    c3_2 = arr[7,1] - arr[6,1]
    
    c4_1 = arr[8,1] - arr[9,1]
    c4_2 = arr[12,1] - arr[11,1]
    return c2_conc, c3_conc, c4_conc, c3_1, c3_2, c4_1, c4_2

# Stage Clustering

In this notebook, we see how we can utilize the nominal patterns of the different stages that were
reported by McNamara and Franchi into a clustering tasks. The nominal patterns serve as the
characteristics of the cluster centers.

In [None]:
os.chdir("../../")
!source configs/.env

## Load parameters

In [None]:
CONFIG_PARAMS_PATH = "configs/params.yaml"

params = nested_dict_to_easydict(
    load_yaml_params(CONFIG_PARAMS_PATH)
)

## Load model

In [None]:
checkpoint_path, model_id = download_wandb_model_checkpoint(
    wandb_checkpoint_uri= params.VERIFY.WANDB_CHECKPOINT_REFERENCE_NAME
)
print(checkpoint_path)

In [None]:
use_pretrain = True

task_config = params.TRAIN.V_LANDMARK_TASK
task_id = task_config.TASK_ID

loss_name = params.TRAIN.LOSS_NAME
model_params = params.MODEL.PARAMS
transforms_params = params.INFERENCE.TRANSFORMS

In [None]:
model, device = load_pretrained_model_eval_mode(
    model_params=model_params,
    use_pretrain=use_pretrain,
    checkpoint_path=checkpoint_path,
    task_id=task_id,
    loss_name=loss_name,
)

## Load and process cluster centers

In [None]:
cluster_centers = []
cluster_center_features = []
for i in range(1,7):
    cs_df = pd.read_csv(
        os.path.join(
            params.INTERMEDIATE_DATA_DIRECTORY, "stages_nominal_patterns", f"cs{i}.csv"
        ),
        header=None,
        names=['index', 'x', 'y']
    )
    cs = cs_df.iloc[:, 1:].to_numpy()
    normalized_landmarks = post_process_vertebral_landmarks(
        landmarks=cs, swap_x_y=False, plot=False, retrieve_orig_position=False,
    )
    cluster_centers.append(normalized_landmarks)
    # cluster_centers_cms.append(vertebral_center_of_masses(normalized_landmarks))
    cluster_center_features.append(vertebral_features(normalized_landmarks))

In [None]:
print(np.array(cluster_center_features))

In [None]:
scaler = MinMaxScaler()
cluster_centers_features_normalized = scaler.fit_transform(cluster_center_features)

In [None]:
print(cluster_centers_features_normalized)

## Load training and validation set data, predict, and process the landmarks

In [None]:
# train dataloader
train_dataloader = create_dataloader(
    task_id=task_id,
    batch_size=1,
    split='train',
    shuffle=False,
    params=params,
    sampler_n_samples=None,
)
# val dataloader
val_dataloader = create_dataloader(
    task_id=task_id,
    batch_size=1,
    split='val',
    shuffle=False,
    params=params,
    sampler_n_samples=None,
)

In [None]:
train_set = []
train_set_features = []
for i, batch in enumerate(train_dataloader):
    images, targets = batch['image'], batch['v_landmarks']
    images = images.to(device)
    targets = targets.to(device)
    # Pass images through the model
    with torch.no_grad():
        lmks = model(images, task_id=task_id)
    # turn heatmaps to coordinates
    lmks = max_indices_4d_tensor(lmks)
    lmks = lmks.squeeze()
    lmks = lmks.cpu().numpy()
    # process coordinates
    normalized_landmarks = post_process_vertebral_landmarks(
        landmarks=lmks, swap_x_y=True, plot=False,retrieve_orig_position=False,)
    train_set.append(normalized_landmarks)
    # train_set_cms.append(vertebral_center_of_masses(normalized_landmarks))
    train_set_features.append(vertebral_features(normalized_landmarks))

In [None]:
val_set = []
val_set_features = []
for i, batch in enumerate(val_dataloader):
    images, targets = batch['image'], batch['v_landmarks']
    images = images.to(device)
    targets = targets.to(device)
    # Pass images through the model
    with torch.no_grad():
        lmks = model(images, task_id=task_id)
    # turn heatmaps to coordinates
    lmks = max_indices_4d_tensor(lmks)
    lmks = lmks.squeeze()
    lmks = lmks.cpu().numpy()
    # process coordinates
    normalized_landmarks = post_process_vertebral_landmarks(
        landmarks=lmks, swap_x_y=True, plot=False,retrieve_orig_position=False,)
    val_set.append(normalized_landmarks)
    # val_set_cms.append(vertebral_center_of_masses(normalized_landmarks))
    val_set_features.append(vertebral_features(normalized_landmarks))

## flatten coordinate arrays

### coordinates of the landmarks

### center of masses

#### discard the x coordinate values of center of masses

# Polar coordinates

# Fit to PCA

# Fit PCA coordinates to the KMeans

# Fit raw flattened coordinates to the KMeans

# Fit clusters' center of masses to KMeans

# Fit features to KMeans

In [None]:
scaler = MinMaxScaler()
train_set_features_normalized = scaler.fit_transform(train_set_features)
val_set_features_normalized = scaler.transform(val_set_features)

In [None]:
# Get the number of features
num_features = train_set_features_normalized.shape[1]

# Create subplots
fig, axes = plt.subplots(nrows=num_features, ncols=1, figsize=(8, 2*num_features))

# Plot each feature separately
for i in range(num_features):
    axes[i].hist(train_set_features_normalized[:, i], bins=30, color='blue', alpha=0.7)
    axes[i].set_title(f'Feature {i + 1}')
    axes[i].set_xlabel('Value')
    axes[i].set_ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# Create DBSCAN object with initial centers
dbscan = SpectralClustering(
    n_clusters=6,
    assign_labels='discretize',
    random_state=0,
)

# Fit the model to your data
dbscan.fit(train_set_features_normalized)

In [None]:
train_set_clusters = dbscan.fit_predict(train_set_features_normalized)
np.unique(train_set_clusters, return_counts=True)

In [None]:
val_set_clusters = dbscan.fit_predict(val_set_features_normalized)
np.unique(val_set_clusters, return_counts=True)

## Plot landmarks and corresponding clusters

In [None]:
# val dataloader
val_dataloader = create_dataloader(
    task_id=task_id,
    batch_size=1,
    split='val',
    shuffle=False,
    params=params,
    sampler_n_samples=None,
)

for i, batch in enumerate(val_dataloader):
    images, targets = batch['image'], batch['v_landmarks']
    images = images.to(device)
    with torch.no_grad():
        lmks = model(images, task_id=task_id)
    # turn heatmaps to coordinates
    image = images.detach().cpu().numpy()[0,0,...]
    lmks = max_indices_4d_tensor(lmks)
    lmks = lmks.squeeze()
    lmks = lmks.cpu().numpy()
    lmks_flipped = np.flip(lmks.copy(),1)
    lmks_flipped[:,1] = -1 * lmks_flipped[:,1]
    clss = val_set_clusters[i]
    if i % 10 == 0:
        print(clss+1)
        plot_landmarks(lmks_flipped)
        plot_image_and_vertebral_landmarks(
            img_name="",
            model_id="",
            landmarks=lmks,
            image=image,
        )


In [None]:
display(pd.DataFrame(val_set_clusters),)