In [14]:
import os

from PIL import Image
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy.ndimage as nd
import SimpleITK as sitk
import skimage.color as color

In [15]:
csv_path = "/data/dataset_medium.csv"
dataset_path = "/data/ANHIR_Data"
output_path = "/data/ANHIR_Parsed_1024_Original"

In [16]:
output_max_size = 1024

In [17]:
def load_pair(case_id, dataset_path, load_masks=False):
    base_path = os.path.join(dataset_path, str(case_id))
    source_path = os.path.join(base_path, "source.mha")
    target_path = os.path.join(base_path, "target.mha")
    if load_masks:
        source_mask_path = os.path.join(base_path, "source_mask.mha")
        target_mask_path = os.path.join(base_path, "target_mask.mha")
    source_landmarks_path = os.path.join(base_path, "source_landmarks.csv")
    target_landmarks_path = os.path.join(base_path, "target_landmarks.csv")

    source = sitk.GetArrayFromImage(sitk.ReadImage(source_path))
    target = sitk.GetArrayFromImage(sitk.ReadImage(target_path))
    if load_masks:
        source_mask = sitk.GetArrayFromImage(sitk.ReadImage(source_mask_path))
        target_mask = sitk.GetArrayFromImage(sitk.ReadImage(target_mask_path))
    source_landmarks = pd.read_csv(source_landmarks_path).to_numpy()[:, 1:]
    try:
        status = "training"
        target_landmarks = pd.read_csv(target_landmarks_path).to_numpy()[:, 1:]
    except:
        status = "evaluation"
        target_landmarks = None
    if load_masks:
        return source, target, source_landmarks, target_landmarks, status, source_mask, target_mask
    else:
        return source, target, source_landmarks, target_landmarks, status,


def resample_image(image, resample_factor):
    if len(image.shape) == 3:
        y_size, x_size, _ = image.shape
        grid_y, grid_x, grid_z = np.mgrid[0:y_size:resample_factor, 0:x_size:resample_factor, 0:3]
        resampled_image = nd.map_coordinates(image, [grid_y, grid_x, grid_z], cval=0, order=3)
    else:
        y_size, x_size = image.shape
        grid_y, grid_x = np.mgrid[0:y_size:resample_factor, 0:x_size:resample_factor]
        resampled_image = nd.map_coordinates(image, [grid_y, grid_x], cval=0, order=3)
    return resampled_image


def normalize(image):
    min_val = np.min(image, axis=(0, 1))
    max_val = np.max(image, axis=(0, 1))
    return np.where(max_val != min_val, (image - min_val) / (max_val - min_val), image)


def load_landmarks(landmarks_path):
    landmarks = pd.read_csv(landmarks_path)
    landmarks = landmarks.to_numpy()[:, 1:]
    return landmarks


def pad_landmarks(landmarks, old_shape, new_shape):
    new_landmarks = landmarks.copy()
    new_landmarks[:, 0] += int(np.floor((new_shape[1] - old_shape[1])/2))
    new_landmarks[:, 1] += int(np.floor((new_shape[0] - old_shape[0])/2))
    return new_landmarks


def pad_images_np(source, target):
    if len(source.shape) == 3:
        y_size_source, x_size_source, _ = source.shape
    else:
        y_size_source, x_size_source = source.shape

    if len(target.shape) == 3:
        y_size_target, x_size_target, _ = target.shape
    else:
        y_size_target, x_size_target = target.shape
    
    new_y_size = max(y_size_source, y_size_target)
    new_x_size = max(x_size_source, x_size_target)
    new_shape = (new_y_size, new_x_size)

    padded_source = pad_single(source, new_shape)
    padded_target = pad_single(target, new_shape)
    return padded_source, padded_target


def pad_single(image, new_shape):
    if len(image.shape) == 3:
        y_size, x_size, _ = image.shape
        y_pad = ((new_shape[0] - y_size) // 2, (new_shape[0] - y_size) // 2)
        x_pad = ((new_shape[1] - x_size) // 2, (new_shape[1] - x_size) // 2)
        z_pad = (0, 0)
        pad_width = (y_pad, x_pad, z_pad)
    else:
        y_size, x_size = image.shape
        y_pad = ((new_shape[0] - y_size) // 2, (new_shape[0] - y_size) // 2)
        x_pad = ((new_shape[1] - x_size) // 2, (new_shape[1] - x_size) // 2)
        pad_width = (y_pad, x_pad)

    new_image = np.pad(image, pad_width, constant_values=0)
    return new_image


def resample_landmarks(landmarks, resample_ratio):
    new_landmarks = landmarks / resample_ratio
    return new_landmarks


def save_landmarks(landmarks, landmarks_path):
    df = pd.DataFrame(landmarks)
    df.to_csv(landmarks_path)

In [18]:
csv_file = pd.read_csv(csv_path)
current_case = None ,csv_file.iloc[0]

In [7]:
current_case[1]

Unnamed: 0                                          0
Image diagonal [pixels]                       17997.0
Image size [pixels]                     (16308, 7612)
Source image                COAD_01/scale-25pc/S1.jpg
Source landmarks            COAD_01/scale-25pc/S1.csv
Target image                COAD_01/scale-25pc/HE.jpg
Target landmarks            COAD_01/scale-25pc/HE.csv
status                                     evaluation
Warped target landmarks                           NaN
Warped source landmarks                           NaN
Execution time [minutes]                          NaN
Name: 0, dtype: object

In [None]:
current_id = current_case[1]['Unnamed: 0']
size = current_case[1]['Image size [pixels]']
diagonal = int(current_case[1]['Image diagonal [pixels]'])
y_size, x_size = int(size.split(",")[0][1:]), int(size.split(",")[1][:-1])
source_path = current_case[1]['Source image']
target_path = current_case[1]['Target image']
source_landmarks_path = current_case[1]['Source landmarks']
target_landmarks_path = current_case[1]['Target landmarks']
status = current_case[1]['status']

source_path = os.path.join(dataset_path, source_path)
target_path = os.path.join(dataset_path, target_path)
source_landmarks_path = os.path.join(dataset_path, source_landmarks_path)
target_landmarks_path = os.path.join(dataset_path, target_landmarks_path)

source_landmarks = load_landmarks(source_landmarks_path)

if status == "training":
    target_landmarks = load_landmarks(target_landmarks_path)

source = cv2.cvtColor(cv2.imread(source_path), cv2.COLOR_BGR2RGB)
target = cv2.cvtColor(cv2.imread(target_path), cv2.COLOR_BGR2RGB)

source = 1 - normalize(source)
target = 1 - normalize(target)

padded_source, padded_target = pad_images_np(source, target)
padded_source_landmarks = pad_landmarks(source_landmarks, source.shape, padded_source.shape)

if status == "training":
    padded_target_landmarks = pad_landmarks(target_landmarks, target.shape, padded_target.shape)

resample_factor = np.max(padded_source.shape) / output_max_size
gaussian_sigma = resample_factor / 1.25

smoothed_source = nd.gaussian_filter(padded_source, gaussian_sigma)
smoothed_target = nd.gaussian_filter(padded_target, gaussian_sigma)

resampled_source = resample_image(smoothed_source, resample_factor)
resampled_target = resample_image(smoothed_target, resample_factor)
resampled_source_landmarks = resample_landmarks(padded_source_landmarks, resample_factor)

if status == "training":
    resampled_target_landmarks = resample_landmarks(padded_target_landmarks, resample_factor)

# save jpeg
if not os.path.isdir(os.path.dirname(os.path.join(output_path, str(current_id)))):
    os.makedirs(os.path.join(os.path.join(output_path, str(current_id))))

to_save_source_jpg_path = os.path.join(output_path, str(current_id), "source.jpg")
to_save_target_jpg_path = os.path.join(output_path, str(current_id), "target.jpg")

to_save_source_jpg = cv2.cvtColor((resampled_source * 255).astype(np.uint8), cv2.COLOR_RGB2BGR)
to_save_target_jpg = cv2.cvtColor((resampled_target * 255).astype(np.uint8), cv2.COLOR_RGB2BGR)

# to_save_source_jpg = cv2.cvtColor(resampled_source, cv2.COLOR_RGB2BGR)
# to_save_target_jpg = cv2.cvtColor(resampled_target, cv2.COLOR_RGB2BGR)

cv2.imwrite(to_save_source_jpg_path, to_save_source_jpg)
cv2.imwrite(to_save_target_jpg_path, to_save_target_jpg)

: 

In [22]:
_, axs = plt.subplots(1, 2, figsize=(15,10))
axs[0].imshow(source, cmap='gray')
axs[0].set_title("Fixed Image")
axs[1].imshow(target, cmap='gray')
axs[1].set_title("Moving Image")
plt.show()

: 