In [None]:
import tensorflow as tf
from tensorflow import keras

import numpy as np

import skimage
from skimage.io import imread, imshow
from skimage.transform import resize
from sklearn import cluster

import matplotlib.pyplot as plt

import os

### LOAD TRAINING DATA

In [None]:
# CHANGE THIS VARIABLE TO CHANGE PATH TO ALL TRAINING IMAGES
# MAKE SURE ALL FILES IN SPECIFIED DIRECTORY ARE IMAGE FILES OR OTHER FOLDERS
train_set_images = './pipeline_outputs/data/train/'

In [None]:
img_file_paths = [f for f in os.listdir(train_set_images) if os.path.isfile(os.path.join(train_set_images, f))]
img_file_paths = [train_set_images + f for f in img_file_paths]
img_file_paths.sort()

In [None]:
NUM_IMAGES = len(img_file_paths)
IMG_WIDTH = 256
IMG_HEIGHT = 256

X_train = np.zeros( (NUM_IMAGES, IMG_WIDTH, IMG_HEIGHT, 1), dtype=np.float32 )
y_train = np.zeros( (NUM_IMAGES, IMG_WIDTH, IMG_HEIGHT, 1), dtype=np.float32 )

for img, i in zip(img_file_paths, range(NUM_IMAGES)):
    img_curr = imread(img)
    img_curr = resize(img_curr, (IMG_WIDTH, IMG_HEIGHT, 1), mode="constant", preserve_range=True)
    X_train[i] = img_curr
    
    
    
    x_ray = (np.int64(imread(img))).reshape((-1,1))
    k_m = cluster.KMeans(n_clusters=5, n_init=10)
    k_m.fit(x_ray)
    
    values, labels = np.int64(k_m.cluster_centers_.squeeze()), np.int64(k_m.labels_)
    img_segm = np.choose(labels, values)
    img_segm.shape = (256, 256, 1)
    
    pix_val = 0.0
    for value in (np.unique(img_segm)):
        img_segm = np.where(img_segm == value, pix_val, img_segm)
        pix_val += 0.25
        
    img_segm = np.where(img_segm > 0.5, 1, img_segm)
    img_segm = np.where(img_segm <= 0.5, 0, img_segm)
    img_segm = np.uint8(img_segm)
    y_train[i] = img_segm

In [None]:
imshow(X_train[10], cmap=plt.cm.gray)

In [None]:
imshow(y_train[10], cmap=plt.cm.gray)

### U-NET MODEL

In [None]:
inputs = keras.layers.Input( (256, 256, 1) )
s = keras.layers.Lambda(lambda x: x / 255.)(inputs)

# contracting path
c1 = keras.layers.Conv2D(64, (5,5), activation="relu", kernel_initializer="he_normal", padding="same")(s)
c1 = keras.layers.Dropout(0.2)(c1)
c1 = keras.layers.Conv2D(64, (5,5), activation="relu", kernel_initializer="he_normal", padding="same")(c1)
p1 = keras.layers.MaxPooling2D( (2,2) )(c1)

c2 = keras.layers.Conv2D(96, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(p1)
c2 = keras.layers.Dropout(0.2)(c2)
c2 = keras.layers.Conv2D(96, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c2)
p2 = keras.layers.MaxPooling2D( (2,2) )(c2)

c3 = keras.layers.Conv2D(128, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(p2)
c3 = keras.layers.Dropout(0.2)(c3)
c3 = keras.layers.Conv2D(128, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c3)
p3 = keras.layers.MaxPooling2D( (2,2) )(c3)

c4 = keras.layers.Conv2D(256, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(p3)
c4 = keras.layers.Dropout(0.2)(c4)
c4 = keras.layers.Conv2D(256, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c4)
p4 = keras.layers.MaxPooling2D( (2,2) )(c4)

c5 = keras.layers.Conv2D(512, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(p4)
c5 = keras.layers.Dropout(0.2)(c5)
c5 = keras.layers.Conv2D(512, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c5)

# expansive path
u6 = keras.layers.Conv2DTranspose(256, (2,2), strides=(2,2), padding="same")(c5)
u6 = keras.layers.concatenate([u6, c4])
c6 = keras.layers.Conv2D(256, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(u6)
c6 = keras.layers.Dropout(0.2)(c6)
c6 = keras.layers.Conv2D(256, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c6)

u7 = keras.layers.Conv2DTranspose(128, (2,2), strides=(2,2), padding="same")(c6)
u7 = keras.layers.concatenate([u7, c3])
c7 = keras.layers.Conv2D(128, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(u7)
c7 = keras.layers.Dropout(0.2)(c7)
c7 = keras.layers.Conv2D(128, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c7)

u8 = keras.layers.Conv2DTranspose(96, (2,2), strides=(2,2), padding="same")(c7)
u8 = keras.layers.concatenate([u8, c2])
c8 = keras.layers.Conv2D(96, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(u8)
c8 = keras.layers.Dropout(0.1)(c8)
c8 = keras.layers.Conv2D(96, (3,3), activation="relu", kernel_initializer="he_normal", padding="same")(c8)

u9 = keras.layers.Conv2DTranspose(64, (2,2), strides=(2,2), padding="same")(c8)
u9 = keras.layers.concatenate([u9, c1], axis=3)
c9 = keras.layers.Conv2D(64, (5,5), activation="relu", kernel_initializer="he_normal", padding="same")(u9)
c9 = keras.layers.Dropout(0.1)(c9)
c9 = keras.layers.Conv2D(64, (5,5), activation="relu", kernel_initializer="he_normal", padding="same")(c9)

outputs = keras.layers.Conv2D(1, (1,1), activation="sigmoid")(c9)

In [None]:
model = keras.Model(inputs=[inputs], outputs=[outputs])
model.compile(optimizer="adam", loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

In [None]:
checkpointer = keras.callbacks.ModelCheckpoint('DDUNET_train_custom_loss_2.h5', verbose=1, save_best_only=True)

callbacks = [
    keras.callbacks.EarlyStopping(monitor='val_loss', patience=4),
    keras.callbacks.TensorBoard(log_dir='logs_ddunet_custom_loss_2'),
    checkpointer
]

In [None]:
model.fit(X_train, y_train, validation_split=0.3, batch_size=10, epochs=30, callbacks=callbacks)

In [None]:
results = model.predict(X_train, verbose=1)

### TRAINING SET RESULTS

In [None]:
SAMPLE_NUMBER = 8

i, (im1, im2, im3, im4) = plt.subplots(1, 4, sharey=False)
i.set_figwidth(20)

im1.title.set_text("Original X-Ray")
im1.imshow(X_train[SAMPLE_NUMBER])

im2.title.set_text("K-Means Clustered X-Ray")
im2.imshow(y_train[SAMPLE_NUMBER])

im3.title.set_text("U-Net Sigmoid Output")
im3.imshow(results[SAMPLE_NUMBER])

test_img = results[SAMPLE_NUMBER]
thresh = np.mean(test_img)
test_img = np.where(test_img >= thresh, 1, 0)
im4.title.set_text("Processed U-Net Output")
im4.imshow(test_img)

### LOAD TESTING SET

In [None]:
# CHANGE THIS VARIABLE TO CHANGE PATH TO ALL TESTING IMAGES
# MAKE SURE ALL FILES IN SPECIFIED DIRECTORY ARE IMAGE FILES OR OTHER FOLDERS
test_set_images = './pipeline_outputs/data/test/'

In [None]:
test_file_paths = [f for f in os.listdir(test_set_images) if os.path.isfile(os.path.join(test_set_images, f))]
test_file_paths = [test_set_images + f for f in test_file_paths]
test_file_paths.sort()

In [None]:
NUM_IMAGES = len(test_file_paths)
IMG_WIDTH = 256
IMG_HEIGHT = 256

X_test = np.zeros( (NUM_IMAGES, IMG_WIDTH, IMG_HEIGHT, 1), dtype=np.float32 )

for img, i in zip(test_file_paths, range(NUM_IMAGES)):
    img_curr = imread(img)
    img_curr = resize(img_curr, (IMG_WIDTH, IMG_HEIGHT, 1), mode="constant", preserve_range=True)
    X_test[i] = img_curr

### TESTING SET RESULTS

In [None]:
test_results = model.predict(X_test, verbose=1)

In [None]:
SAMPLE_NUMBER = 8

i, (im1, im2, im3) = plt.subplots(1, 3, sharey=False)
i.set_figwidth(20)

im1.title.set_text("Original X-Ray")
im1.imshow(X_test[SAMPLE_NUMBER])

im2.title.set_text("U-Net Sigmoid Output")
im2.imshow(test_results[SAMPLE_NUMBER])

test_img = test_results[SAMPLE_NUMBER]
thresh = np.mean(test_img)
test_img = np.where(test_img >= thresh, 1, 0)
im3.title.set_text("Processed U-Net Output")
im3.imshow(test_img)

In [None]:
model.save("UNET_MODEL.h5")