In [None]:
import os
import numpy as np
from PIL import Image
from sklearn.preprocessing import OneHotEncoder
import matplotlib
from matplotlib import pyplot as plt
from tqdm import tqdm
import pandas as pd
# !pip install opencv-python-headless
import cv2
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import keras
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam, legacy

In [None]:
# Labeling-result exports, keyed by hemorrhage-type abbreviation.
csv_files = {
    "EPH": 'Hemorrhage Segmentation Project/Results_Epidural Hemorrhage Detection_2020-11-16_21.31.26.148.csv',
    "IVH": 'Hemorrhage Segmentation Project/Results_Brain Hemorrhage Tracing_2020-09-28_15.21.52.597.csv',
    "IPH": 'Hemorrhage Segmentation Project/Results_Intraparenchymal Hemorrhage Detection_2020-11-16_21.39.31.268.csv',
    "SDH": 'Hemorrhage Segmentation Project/Results_Subdural Hemorrhage Detection_2020-11-16_21.37.19.745.csv',
    "SAH": 'Hemorrhage Segmentation Project/Results_Subarachnoid Hemorrhage Detection_2020-11-16_21.36.18.668.csv',
    "MCH": 'Hemorrhage Segmentation Project/Results_Multiple Hemorrhage Detection_2020-11-16_21.36.24.018.csv'
}

# For every export, collect the distinct 'Origin' values of the rows whose
# 'Labeling State' is either 'Labeled' or 'Gold Standard'.
filtered_files = {}
for key, csv_file in csv_files.items():
    df = pd.read_csv(csv_file)
    is_reviewed = df['Labeling State'].isin(['Labeled', 'Gold Standard'])
    df_filtered = df.loc[is_reviewed]
    filtered_files[key] = set(df_filtered['Origin'].values)

In [None]:

# Root folder of the rendered images; main_loader reads
# <file_dir>/<ICH type>/<window>/ for each class and window.
file_dir = 'XN1 Data/renders/'

# Class names; a class's integer label is its index in this list.
ICH_types = ['normal', 'epidural', 'subarachnoid', 'intraparenchymal', 'subdural', 'intraventricular', 'multi']

# Window sub-folders that exist under every class folder.
windows = ["brain_bone_window", "brain_window", "max_contrast_window", "subdural_window"]

# One-hot encoder fitted on the integer labels 0..6, one per ICH_types entry.
encoder = OneHotEncoder()
encoder.fit([[class_id] for class_id in range(len(ICH_types))])

# Shape (height, width, channels) that source images are checked against.
target_shape = (512, 512, 3)

In [None]:
def load_images_from_directory(directory, label, target_shape, file_max=None, filter_set=None):
    """Load the images in ``directory`` as 256x256 grayscale arrays.

    Args:
        directory: Folder whose entries are each opened as an image.
        label: Not used by this function; kept so existing callers keep working.
        target_shape: Expected ``(height, width, ...)`` of a source image.
            Images with a different height/width are first resized to it.
        file_max: Consider at most this many directory entries, taken in
            sorted filename order *before* ``filter_set`` is applied.
            ``None`` means every entry.
        filter_set: Optional collection of filenames to keep. ``None``
            disables filtering; an empty collection keeps nothing.

    Returns:
        ``(images, count)`` where ``images`` is a list of 2-D grayscale
        arrays and ``count`` is ``len(images)``.
    """
    # os.listdir returns entries in arbitrary order. Sorting makes the
    # file_max slice deterministic and gives directories that hold the same
    # filenames (one per window) the same ordering, so samples stay aligned
    # when the windows are later stacked as channels.
    filenames = sorted(os.listdir(directory))
    if file_max is None:
        file_max = len(filenames)

    expected_size = (target_shape[1], target_shape[0])  # PIL sizes are (width, height)
    train_data = []

    for filename in tqdm(filenames[0:file_max], desc=f'Loading images from {directory}', unit='file'):
        # `is not None` so that an empty filter set excludes everything
        # instead of silently disabling the filter.
        if filter_set is not None and filename not in filter_set:
            continue

        img_path = os.path.join(directory, filename)
        # The context manager closes the file handle; convert() returns an
        # in-memory RGB copy, so grayscale/palette/RGBA inputs also reach
        # cv2.cvtColor as a 3-channel array.
        with Image.open(img_path) as source:
            img = source.convert('RGB')

        if img.size != expected_size:
            img = img.resize(expected_size, Image.LANCZOS)  # Resize to match target shape

        img = img.resize((256, 256), Image.LANCZOS)  # Downsample to 256x256
        grayscale_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2GRAY)
        train_data.append(grayscale_img)

    return train_data, len(train_data)


def main_loader(directory, windows, label, X_train_dict, y_train, target_shape, file_max=None, filter_set=None):
    """Load one class's images for every window and build its one-hot labels.

    Args:
        directory: Root folder; images are read from
            ``directory/ICH_types[label]/<window>``.
        windows: Window sub-folder names to load; also keys of ``X_train_dict``.
        label: Integer class id (index into ``ICH_types``).
        X_train_dict: Mapping window -> list (or array) of images gathered so
            far. Newly loaded images are appended and every entry is
            converted to a numpy array. The dict is modified in place.
        y_train: Labels gathered so far. New labels are appended to them;
            an empty value yields just the new labels.
        target_shape, file_max, filter_set: Forwarded unchanged to
            ``load_images_from_directory``.

    Returns:
        ``(X_train_dict, y_train)`` with ``y_train`` as a 2-D one-hot array.

    Raises:
        ValueError: If the windows did not yield the same number of images;
            the per-window arrays could not be stacked sample-by-sample.
    """
    # Load everything first so a count mismatch is reported before
    # X_train_dict is touched.
    loaded = {}
    for slide in windows:
        dir_path = os.path.join(directory, ICH_types[label], slide)
        print(f"Loading images for {slide}...")
        data, _ = load_images_from_directory(dir_path, label, target_shape, file_max, filter_set)
        loaded[slide] = data

    counts = {slide: len(images) for slide, images in loaded.items()}
    if len(set(counts.values())) > 1:
        raise ValueError(
            f"Windows returned different numbers of images for label {label}: {counts}"
        )
    n_new = next(iter(counts.values()), 0)  # 0 when `windows` is empty

    for slide, images in loaded.items():
        existing = X_train_dict[slide]
        # After an earlier call the entry is already an ndarray, which has
        # no .extend; turn it back into a list of images first.
        if not isinstance(existing, list):
            existing = list(existing)
        existing.extend(images)
        X_train_dict[slide] = existing

    # Convert lists to numpy arrays
    print("Converting lists to numpy arrays:")
    for window in tqdm(windows, desc='Converting lists', unit='window'):
        X_train_dict[window] = np.array(X_train_dict[window])

    # One identical one-hot row per newly loaded sample.
    sample = encoder.transform([[label]]).toarray()
    new_labels = np.tile(sample, (n_new, 1))

    # Keep labels from earlier calls, mirroring how X_train_dict accumulates.
    if y_train is not None and len(y_train) > 0:
        y_train = np.concatenate((np.asarray(y_train), new_labels), axis=0)
    else:
        y_train = new_labels

    return X_train_dict, y_train


def stack_slices(X_train_dict):
    """Combine the per-window image arrays into one multi-channel array.

    The arrays stored under the four window keys are stacked along a new
    axis at index 3, in this fixed channel order: brain_bone_window,
    brain_window, subdural_window, max_contrast_window.

    Args:
        X_train_dict: Mapping window name -> array of images for that window.

    Returns:
        The stacked numpy array.
    """
    channel_order = (
        'brain_bone_window',
        'brain_window',
        'subdural_window',
        'max_contrast_window',
    )
    per_window = [
        X_train_dict[name]
        for name in tqdm(channel_order, desc='Stacking slices', unit='window')
    ]
    return np.stack(per_window, axis=3)

In [None]:
# Class 0 ('normal'): at most the first 2000 directory entries, no label filter.
NRM_train_dict = {window: [] for window in windows}
NRM_label = []
NRM_train_dict, NRM_label = main_loader(file_dir, windows, 0, NRM_train_dict, NRM_label, target_shape, 2000)
NRM_train = stack_slices(NRM_train_dict)

# Class 1 ('epidural'): only files listed in the filtered 'EPH' labeling export.
EDH_train_dict = {window: [] for window in windows}
EDH_label = []
EDH_train_dict, EDH_label = main_loader(file_dir, windows, 1, EDH_train_dict, EDH_label, target_shape, filter_set=filtered_files['EPH'])
EDH_train = stack_slices(EDH_train_dict)


X_train = np.concatenate((NRM_train, EDH_train), axis=0)  # Add other hemorrhage types as needed
y_train = np.concatenate((NRM_label, EDH_label), axis=0)


# 80/20 train/validation split. All three calls are seeded so a
# Restart & Run All reproduces the same ordering.
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
X_train, y_train = shuffle(X_train, y_train, random_state=42)
X_valid, y_valid = shuffle(X_valid, y_valid, random_state=42)

# Define and compile the model: four Conv -> ReLU -> MaxPool stages followed
# by two dense layers and a 7-way softmax (one output per ICH_types entry).
# NOTE(review): the inputs appear to be raw grayscale intensities with no
# rescaling applied anywhere above -- consider normalising; confirm intent.
model = models.Sequential()
model.add(layers.Conv2D(32, kernel_size=(3, 3), input_shape=(256, 256, 4), padding="same"))
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Conv2D(64, kernel_size=(3, 3), padding="same"))
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Conv2D(128, kernel_size=(3, 3), padding="same"))
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Conv2D(256, kernel_size=(3, 3), padding="same"))
model.add(layers.Activation('relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Flatten())
model.add(layers.Dense(1024))
model.add(layers.Activation('relu'))
model.add(layers.Dense(512))
model.add(layers.Activation('relu'))
model.add(layers.Dense(7, activation="softmax"))
model.summary()

# `Adam` is imported directly from tensorflow.keras.optimizers; the name
# `optimizers` is never imported, so `optimizers.Adam()` raised NameError.
model.compile(loss="categorical_crossentropy", optimizer=Adam(), metrics=["accuracy"])
model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=10)