In [None]:
import cv2 as cv
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers
from matplotlib import pyplot as plt
import os
import json
import urllib.request

In [None]:
def show_image(img):
    """Draw a single image in a new 4x4 inch matplotlib figure.

    Args:
        img: Image data accepted by ``plt.imshow``.
    """
    side = 4
    plt.figure(figsize=(side, side))
    # 'equal' keeps pixels square instead of stretching them to the axes.
    plt.imshow(img, aspect="equal")

def load_file(file):
    """Read a JSON document from disk and return the decoded Python object.

    Args:
        file: Path of the JSON file to read.

    Returns:
        The value produced by the JSON decoder (dict, list, str, ...).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of the platform default encoding,
    # so non-ASCII text is read the same way on every OS.
    with open(file, "r", encoding="utf-8") as f:
        return json.load(f)

def segment_mask(obj, name, assert_annotations=False):
    """Rasterize all polygon annotations called ``name`` into one mask.

    Args:
        obj: Decoded annotation record. Reads ``obj["image"]["height"]``,
            ``obj["image"]["width"]`` and ``obj["annotations"]``.
        name: Annotation name to select.
        assert_annotations: When true, a record without any matching polygon
            annotation is reported as a failure instead of an empty mask.

    Returns:
        ``(False, None)`` when ``assert_annotations`` is set and nothing
        matched; otherwise ``(True, mask)`` where ``mask`` is a
        ``(height, width, 3)`` uint8 array, 255 inside the polygons and 0
        elsewhere.
    """
    height, width = obj["image"]["height"], obj["image"]["width"]
    mask = np.zeros((height, width, 3), np.uint8)

    # Keep only annotations with the requested name that carry a polygon.
    matching = [
        entry
        for entry in obj["annotations"]
        if entry["name"] == name and entry.get("polygon")
    ]
    if assert_annotations and not matching:
        return False, None

    for entry in matching:
        vertices = np.array([[pt["x"], pt["y"]] for pt in entry["polygon"]["path"]])
        # fillPoly expects integer coordinates and a list of polygons.
        mask = cv.fillPoly(mask, pts=np.int32([vertices]), color=(255, 255, 255))
    return True, mask

def download_image(obj):
    """Return the image for an annotation record, downloading it on a cache miss.

    The image is cached under ``../xray/images/<filename>``. If that file
    already exists it is read directly; otherwise it is fetched from
    ``obj["image"]["url"]`` and stored there first.

    Args:
        obj: Decoded annotation record. Reads ``obj["image"]["filename"]`` and,
            on a cache miss, ``obj["image"]["url"]``.

    Returns:
        The result of ``cv.imread`` on the cached file, converted with
        ``np.uint8``.

    Raises:
        urllib.error.URLError: If the download fails.
        OSError: If the cache file or directory cannot be written.
    """
    path = "../xray/images/" + obj["image"]["filename"]
    if not os.path.exists(path):
        url = obj["image"]["url"]
        print("Downloading {}...".format(url))
        # The cache directory may not exist on a fresh checkout.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # Close the HTTP response deterministically, and read the whole body
        # before touching the cache so a failed transfer does not leave a
        # truncated file that would later be mistaken for a valid cache hit.
        with urllib.request.urlopen(url) as r:
            data = r.read()
        with open(path, "wb") as f:
            f.write(data)
    return np.uint8(cv.imread(path))

def show_images(images):
    """Show a sequence of images side by side in one large figure.

    The images are placed left to right in the first row of a two-row grid
    with ``len(images)`` columns, rendered with the gray colormap and with
    both axes hidden.

    Args:
        images: Sequence of images accepted by ``plt.imshow``.
    """
    count = len(images)
    plt.figure(figsize=(80, 80))
    for position, picture in enumerate(images, start=1):
        axes = plt.subplot(2, count, position)
        plt.imshow(picture, cmap="gray")
        # Ticks and labels carry no information for image thumbnails.
        axes.get_xaxis().set_visible(False)
        axes.get_yaxis().set_visible(False)
    plt.show()

In [None]:
# os.listdir returns entries in arbitrary order. Sort them so that the subset
# taken from `files` and the sequential train/test split further down are
# reproducible from one run (and one machine) to the next.
files = ["../xray/data/" + x for x in sorted(os.listdir("../xray/data"))]

In [None]:
# Only the first 1000 annotation files are used.
# Remove this cell to run the notebook on the whole dataset.
files = files[:1000]

In [None]:
# Collect (and print) every distinct annotation name that has a polygon,
# in order of first appearance across the annotation files.
labels = []
for file in files:
    annotations = load_file(file)["annotations"]
    names = [entry["name"] for entry in annotations if entry.get("polygon")]
    for name in names:
        if name in labels:
            continue
        labels.append(name)
        print(name)

In [None]:
# Build the dataset: X holds the grayscale images, y the matching "Lung"
# masks. Both are 128x128 float32 arrays scaled to the [0, 1] range.
X = []
y = []

for file in files:
    obj = load_file(file)
    # Records without a "Lung" polygon are skipped entirely.
    has_lung, lung_mask = segment_mask(obj, "Lung", True)
    if not has_lung:
        continue

    # Input image: BGR -> gray -> 128x128 -> [0, 1].
    image = cv.cvtColor(download_image(obj), cv.COLOR_BGR2GRAY)
    image = cv.resize(image, (128, 128), interpolation=cv.INTER_AREA)
    image = np.float32(image) / 255

    # Target mask: same treatment as the image so the two stay aligned.
    lung_mask = cv.cvtColor(lung_mask, cv.COLOR_BGR2GRAY)
    lung_mask = cv.resize(lung_mask, (128, 128), interpolation=cv.INTER_AREA)
    lung_mask = np.float32(lung_mask) / 255

    X.append(image)
    y.append(lung_mask)

In [None]:
# Stack the per-sample arrays, then append a trailing channel axis of size 1
# so both tensors are (samples, height, width, 1).
X = np.array(X)
y = np.array(y)
X = np.reshape(X, (X.shape[0], X.shape[1], X.shape[2], 1))
y = np.reshape(y, (y.shape[0], y.shape[1], y.shape[2], 1))

In [None]:
# Input tensor shape after the reshape: (samples, height, width, 1).
X.shape

In [None]:
# Target tensor shape after the reshape: (samples, height, width, 1).
y.shape

In [None]:
# Sequential 70/30 split: the first 70% of the samples are used for training,
# the remainder is held out. No shuffling happens here.
split = int(len(X) * 0.7)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

In [None]:
# Preview the first `num` samples as a weighted blend of image and mask
# (weight `alpha` on the image, `1 - alpha` on the mask).
num = 10
alpha = 0.1
blend = X[:num] * alpha + y[:num] * (1 - alpha)
show_images(blend / 2)

In [None]:
# Minimal convolutional baseline: conv -> 2x downsample -> 2x upsample -> sigmoid conv.
# NOTE: when the notebook is run top to bottom, a later cell rebinds `model`
# to the network returned by get_model(), so this baseline is not the model
# that gets compiled and trained.
input_img = layers.Input(shape=(X.shape[1], X.shape[2], X.shape[3]))  # adapt this if using `channels_first` image data format

features = layers.Conv2D(filters=8, kernel_size=(3, 3), activation='relu', padding='same')(input_img)
features = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(features)
features = layers.UpSampling2D(size=(2, 2))(features)
decoded = layers.Conv2D(filters=1, kernel_size=(3, 3), activation='sigmoid', padding='same')(features)
model = keras.Model(inputs=input_img, outputs=decoded)

In [None]:
def get_model(input_shape=None, num_outputs=None):
    """Build the encoder/decoder segmentation network with residual connections.

    The encoder halves the spatial size four times (a stride-2 entry
    convolution followed by three stride-2 pooling blocks) and the decoder
    doubles it four times, ending in a sigmoid convolution that yields one
    probability map per output channel.

    Args:
        input_shape: ``(height, width, channels)`` of one input sample. When
            omitted, it is taken from the notebook-level array ``X`` (the
            original behaviour).
        num_outputs: Number of output channels. When omitted, it is taken from
            the last axis of the notebook-level array ``y`` (the original
            behaviour).

    Returns:
        An uncompiled ``keras.Model`` mapping inputs to sigmoid outputs.

    NOTE(review): because of the four halvings/doublings, height and width
    presumably need to be multiples of 16 for the output size to equal the
    input size (the 128x128 data used in this notebook qualifies) - confirm
    before using other sizes.
    """
    # Fall back to the notebook globals so existing `get_model()` calls keep
    # working unchanged.
    if input_shape is None:
        input_shape = (X.shape[1], X.shape[2], X.shape[3])
    if num_outputs is None:
        num_outputs = y.shape[3]

    inputs = keras.Input(shape=input_shape)

    ### [First half of the network: downsampling inputs] ###

    # Entry block
    x = layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    # Blocks 1, 2, 3 are identical apart from the feature depth.
    for filters in [8, 16, 32]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual: the 1x1 stride-2 convolution matches both the
        # spatial size and the channel count of the pooled branch.
        residual = layers.Conv2D(filters, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    ### [Second half of the network: upsampling inputs] ###

    for filters in [32, 16, 8, 4]:
        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.UpSampling2D(2)(x)

        # Project residual: upsample the skip branch, then use a 1x1
        # convolution to match the channel count of the main branch.
        residual = layers.UpSampling2D(2)(previous_block_activation)
        residual = layers.Conv2D(filters, 1, padding="same")(residual)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    # Add a per-pixel classification layer
    outputs = layers.Conv2D(num_outputs, 3, activation="sigmoid", padding="same")(x)

    # Define the model
    model = keras.Model(inputs, outputs)
    return model


# Free up RAM in case the model definition cells were run multiple times
keras.backend.clear_session()

# Build the model. This rebinds `model`, replacing any model that an earlier
# cell assigned to that name, and prints a layer-by-layer summary.
model = get_model()
model.summary()

In [None]:
# Alternative optimizer configuration (disabled):
# optimizer = optimizers.Adadelta(learning_rate=1.0, rho=0.95, epsilon=10e-6)

# Adam with binary cross-entropy, applied to the sigmoid outputs of the model.
model.compile(optimizer='adam', loss='binary_crossentropy')

In [None]:
# Train for 10 epochs in mini-batches of 8, reshuffling the training samples
# every epoch. The held-out split is used only to report validation loss.
model.fit(
    x=X_train,
    y=y_train,
    batch_size=8,
    epochs=10,
    shuffle=True,
    validation_data=(X_test, y_test),
)

In [None]:
# Run the model on the held-out samples; y_pred holds its sigmoid outputs.
y_pred = model.predict(X_test)

In [None]:
# Show, for the first `num` held-out samples: the ground-truth masks, the
# predicted masks, and their absolute difference.
num = 20
for batch in (
    y_test[:num],
    y_pred[:num],
    np.abs(y_test[:num] - y_pred[:num]),
):
    show_images(batch)

In [None]:
# Overlay each predicted mask on its input image. The first two channels of
# the prediction are zeroed, so the mask is added to the last (blue) channel
# of the RGB image only.
# Cap the count at the number of available test samples: a fixed range(32)
# raises IndexError when the held-out split has fewer than 32 images.
overlay_count = min(32, len(X_test))
plt.figure(figsize=(20, 20))
for i in range(overlay_count):
    color = cv.cvtColor(X_test[i], cv.COLOR_GRAY2RGB)
    prediction = cv.cvtColor(y_pred[i], cv.COLOR_GRAY2RGB)
    prediction[:, :, 0:2] = 0
    ax = plt.subplot(8, 8, i + 1)
    # Clip because image + mask can exceed the [0, 1] range imshow expects.
    plt.imshow(np.clip(color + prediction, 0, 1))
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

In [None]:
# Show each input image weighted pixel-wise by its predicted mask, using the
# "rainbow" colormap.
# Cap the count at the number of available test samples: a fixed range(32)
# raises IndexError when the held-out split has fewer than 32 images.
masked_count = min(32, len(X_test))
plt.figure(figsize=(50, 50))
for i in range(masked_count):
    # The product is a new array, so no explicit copy of X_test[i] is needed.
    image = X_test[i] * y_pred[i]
    ax = plt.subplot(8, 8, i + 1)
    plt.imshow(image, cmap="rainbow")
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()