In [None]:
import keras
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, Add, MaxPooling2D, GlobalAveragePooling2D, Dense, Conv2DTranspose, Flatten, LeakyReLU, Reshape
from tensorflow.keras import Model
import matplotlib.pyplot as plt
import cv2
from tensorflow.keras.models import Sequential
import os
import pydicom
from pydicom.pixel_data_handlers.util import apply_voi_lut
from skimage import exposure
from IPython.display import clear_output

In [None]:
"""
Linux Paths for CheXpert Dataset

train_dir = os.path.abspath("/media/nicholasjprimiano/8A5C72285C720F67/ML_C/CheXpert/CheXpert-Keras-master/data/default_split/CheXpert-v1.0-small/CheXpert-v1.0-small/train.csv")
traindf=pd.read_csv(train_dir, dtype=str)

valid_dir = os.path.abspath("/media/nicholasjprimiano/8A5C72285C720F67/ML_C/CheXpert/CheXpert-Keras-master/data/default_split/CheXpert-v1.0-small/CheXpert-v1.0-small/valid.csv")
validdf=pd.read_csv(valid_dir, dtype=str)

for i in range(len(traindf)):
    traindf.iloc[i,0] = "/media/nicholasjprimiano/8A5C72285C720F67/ML_C/CheXpert/CheXpert-Keras-master/data/default_split/CheXpert-v1.0-small/" + traindf.iloc[i,0]"""
    
# Windows paths for the CheXpert dataset.
train_dir = os.path.abspath(r"C:/ML_C/CheXpert/CheXpert-Keras-master/data/default_split/CheXpert-v1.0-small/CheXpert-v1.0-small/train.csv")
traindf = pd.read_csv(train_dir, dtype=str)

# Prefix every entry of the first column (used later as the image path) with
# the dataset root. This is one vectorized string concatenation instead of a
# per-row .iloc read/write loop, which is very slow on a large frame.
path_column = traindf.columns[0]
traindf[path_column] = r"C:/ML_C/CheXpert/CheXpert-Keras-master/data/default_split/CheXpert-v1.0-small/" + traindf[path_column]

#valid_dir = os.path.abspath(r"C:/ML_C/CheXpert/CheXpert-Keras-master/data/default_split/CheXpert-v1.0-small/CheXpert-v1.0-small/valid.csv")
#validdf=pd.read_csv(valid_dir, dtype=str)

In [None]:
# Keep only AP (anterior-posterior) view x-rays. The view label is read from
# column 4, exactly as before, but the rows are selected with a boolean mask
# instead of appending one Series per matching row. The mask keeps the column
# layout even when no row matches, so the "Path" lookup below cannot fail on
# an empty selection.
ap_mask = traindf.iloc[:, 4] == "AP"
aptraindf = traindf[ap_mask]

# NOTE: the Pneumothorax-only filter is currently disabled; the first tenth of
# the AP images is used regardless of label. To restrict to positive cases use:
#   paths = aptraindf[aptraindf["Pneumothorax"] == "1.0"]["Path"].tolist()
subset_size = len(aptraindf) // 10
paths = aptraindf["Path"].iloc[:subset_size].tolist()

In [None]:
# Contrast normalization applied to every image loaded by get_imgs().
def normalize_xray(img):
    """Scale an image by its maximum and apply adaptive histogram equalization.

    Args:
        img: array-like grayscale image.

    Returns:
        The result of ``exposure.equalize_adapthist`` on ``img / max(img)``.
        For an image whose maximum is 0 (completely black) an all-zero float
        array of the same shape is returned instead, because dividing by the
        maximum would otherwise produce NaNs.
    """
    peak = np.max(img)
    if peak == 0:
        # Nothing to equalize, and img / 0 would fill the array with NaN.
        return np.zeros(np.shape(img), dtype=np.float64)
    return exposure.equalize_adapthist(img / peak)

In [None]:
# Load images resized to IMG_SIZE x IMG_SIZE.

IMG_SIZE = 256


def _load_xray(path):
    """Read one image file, resize it, convert it to grayscale and normalize it."""
    bgr = cv2.imread(path)
    if bgr is None:
        # cv2.imread signals a missing/unreadable file by returning None;
        # fail here with the offending path instead of deep inside cv2.resize.
        raise FileNotFoundError("Could not read image: {}".format(path))
    resized = cv2.resize(bgr, (IMG_SIZE, IMG_SIZE))
    gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
    # Return `gray` directly here to skip the contrast normalization.
    return normalize_xray(gray)


def get_imgs(paths):
    """Load every image in ``paths`` as a normalized grayscale array.

    Args:
        paths: iterable of image file paths.

    Returns:
        A list with one normalized IMG_SIZE x IMG_SIZE image per path, in
        the same order as ``paths``.

    Raises:
        FileNotFoundError: if any path cannot be read as an image.
    """
    return [_load_xray(path) for path in paths]

In [None]:
# X_train: array of images with values between 0 and 1, stored as float32.
# Converting with dtype=np.float32 in a single step avoids first building a
# full float64 array and then copying it with .astype().
X_train = np.asarray(get_imgs(paths), dtype=np.float32)

In [None]:
# Add a trailing channel axis to X_train and shift the pixel values from
# [0, 1] to [-1, 1] so they match the range of a tanh output.
X_train_dcgan = np.reshape(X_train, (-1, IMG_SIZE, IMG_SIZE, 1)) * 2. - 1.

In [None]:
"""tf.random.set_seed(42)
np.random.seed(42)

codings_size = 100

generator = keras.models.Sequential([
    keras.layers.Dense(8 * 8 * 1024, input_shape=[codings_size], activation="relu"),
    keras.layers.Reshape([8, 8, 1024]),
    keras.layers.ReLU(),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2DTranspose(512, kernel_size=5, strides=2, padding="SAME",activation="relu"),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2DTranspose(256, kernel_size=5, strides=2, padding="SAME",activation="relu"),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2DTranspose(128, kernel_size=5, strides=2, padding="SAME",activation="relu"),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2DTranspose(64, kernel_size=5, strides=2, padding="SAME",activation="relu"),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2D(1, kernel_size=5, strides=1, padding="SAME",
                                 activation="tanh"),
])
generator.summary()"""

In [None]:
"""tf.random.set_seed(42)
np.random.seed(42)

codings_size = 100

generator = keras.models.Sequential([
    keras.layers.Dense(16 * 16 * 128, input_shape=[codings_size]),
    keras.layers.Reshape([16, 16, 128]),

    keras.layers.Conv2DTranspose(128, kernel_size=5, strides=2, padding="SAME",
                                 activation="relu"),
    keras.layers.Dropout(0.5),

    keras.layers.Conv2DTranspose(64, kernel_size=5, strides=2, padding="SAME",
                                 activation="relu"),
    keras.layers.Dropout(0.5),
                                 
    keras.layers.Conv2DTranspose(1, kernel_size=5, strides=2, padding="SAME",
                                 activation="tanh"),
])
generator.summary()


discriminator = keras.models.Sequential([

    keras.layers.Conv2D(64, kernel_size=5, strides=1, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2),
                        input_shape=[128, 128, 1]),
    
    keras.layers.Conv2D(128, kernel_size=5, strides=2, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2)),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2D(256, kernel_size=5, strides=2, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2)),
    keras.layers.BatchNormalization(),

    keras.layers.Conv2D(512, kernel_size=5, strides=2, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2)),
    keras.layers.BatchNormalization(),

    keras.layers.Flatten(),
    keras.layers.Dense(1, activation="sigmoid")
])

discriminator.summary()"""

"""
discriminator = keras.models.Sequential([
    keras.layers.Conv2D(64, kernel_size=5, strides=2, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2),
                        input_shape=[128, 128, 1]),
    keras.layers.BatchNormalization(),
    keras.layers.Conv2D(128, kernel_size=5, strides=2, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2)),
    keras.layers.BatchNormalization(),
    keras.layers.Conv2D(256, kernel_size=5, strides=2, padding="SAME",
                        activation=keras.layers.LeakyReLU(0.2)),
    keras.layers.BatchNormalization(),
    keras.layers.Flatten(),
    keras.layers.Dense(1, activation="sigmoid")
])
"""

In [None]:
tf.random.set_seed(42)
np.random.seed(42)

codings_size = 100

# The generator doubles the spatial size three times (three UpSampling2D
# layers), so its first feature map must be IMG_SIZE / 8 on each side for the
# output to match the training images (32 for IMG_SIZE = 256).
assert IMG_SIZE % 8 == 0, "IMG_SIZE must be divisible by 8 for this generator"
gen_base_size = IMG_SIZE // 8

# Generator: noise vector -> IMG_SIZE x IMG_SIZE x 1 image in [-1, 1] (tanh).
generator = keras.models.Sequential([

    keras.layers.Dense(256 * gen_base_size * gen_base_size, activation="relu", input_shape=[codings_size]),
    keras.layers.Reshape((gen_base_size, gen_base_size, 256)),
    keras.layers.UpSampling2D(),
    keras.layers.Conv2D(128, kernel_size=3, padding="same"),
    keras.layers.BatchNormalization(momentum=0.8),
    keras.layers.Activation("relu"),
    keras.layers.UpSampling2D(),
    keras.layers.Conv2D(64, kernel_size=3, padding="same"),
    keras.layers.BatchNormalization(momentum=0.8),
    keras.layers.Activation("relu"),
    keras.layers.UpSampling2D(),
    keras.layers.Conv2D(1, kernel_size=3, padding="same"),
    keras.layers.Activation("tanh")

])
generator.summary()

# Discriminator: IMG_SIZE x IMG_SIZE x 1 image -> single sigmoid output.
# Every layer comes from `keras.layers`; the third BatchNormalization used to
# be the bare name imported from `tensorflow.keras.layers`, mixing two import
# paths inside one model.
discriminator = keras.models.Sequential([

    keras.layers.Conv2D(32, kernel_size=3, strides=2, input_shape=[IMG_SIZE, IMG_SIZE, 1], padding="same"),
    keras.layers.LeakyReLU(alpha=0.2),
    keras.layers.Dropout(0.25),
    keras.layers.Conv2D(64, kernel_size=3, strides=2, padding="same"),
    keras.layers.ZeroPadding2D(padding=((0,1),(0,1))),
    keras.layers.BatchNormalization(momentum=0.8),
    keras.layers.LeakyReLU(alpha=0.2),
    keras.layers.Dropout(0.25),
    keras.layers.Conv2D(128, kernel_size=3, strides=2, padding="same"),
    keras.layers.BatchNormalization(momentum=0.8),
    keras.layers.LeakyReLU(alpha=0.2),
    keras.layers.Dropout(0.25),
    keras.layers.Conv2D(256, kernel_size=3, strides=1, padding="same"),
    keras.layers.BatchNormalization(momentum=0.8),
    keras.layers.LeakyReLU(alpha=0.2),
    keras.layers.Dropout(0.25),
    keras.layers.Flatten(),
    keras.layers.Dense(1, activation='sigmoid')
])
discriminator.summary()

In [None]:
# Combined model: the generator's output is fed straight into the
# discriminator. train_gan() relies on the layers being in this order.
gan = keras.models.Sequential()
gan.add(generator)
gan.add(discriminator)

In [None]:
def label_smoothing(y_true, y_pred):
    """Binary cross-entropy loss computed with ``label_smoothing=0.2``.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted probabilities.

    Returns:
        Whatever ``tf.keras.losses.binary_crossentropy`` returns for these
        inputs with the smoothing factor applied.
    """
    smoothed_loss = tf.keras.losses.binary_crossentropy(
        y_true,
        y_pred,
        label_smoothing=0.2,
    )
    return smoothed_loss

In [None]:
# The discriminator uses Adam with a custom learning rate and betas (changed
# from the standard Adam settings) together with the label-smoothed loss.
discriminator_optimizer = tf.keras.optimizers.Adam(
    learning_rate= 0.0002,
    beta_1 = 0.7,
    beta_2 = 0.8,
)
discriminator.compile(loss=label_smoothing, optimizer=discriminator_optimizer)

# Mark the discriminator non-trainable before compiling the combined model.
discriminator.trainable = False
gan.compile(loss="binary_crossentropy", optimizer="Adam")

# Changed from 128 - smaller batches seem to yield better results.
batch_size = 8

# Input pipeline: slice the image array, shuffle with a 250-element buffer,
# group into full batches only, and prefetch one batch ahead.
dataset = (
    tf.data.Dataset.from_tensor_slices(X_train_dcgan)
    .shuffle(250)
    .batch(batch_size, drop_remainder=True)
    .prefetch(1)
)

In [None]:
def train_gan(gan, dataset, batch_size, codings_size, n_epochs=500):
    """Alternately train the discriminator and the generator of a GAN.

    After every epoch the last batch's losses are printed and one generated
    sample is displayed; the notebook output is cleared every 10th epoch.

    Args:
        gan: model whose ``layers`` unpack to exactly
            ``(generator, discriminator)``.
        dataset: iterable yielding batches of real images. Each batch must
            hold ``batch_size`` images, because the label tensors are built
            for that size.
        batch_size: number of real (and of generated) images per step.
        codings_size: length of the random noise vector fed to the generator.
        n_epochs: number of passes over ``dataset``.

    Raises:
        ValueError: if ``dataset`` yields no batches, e.g. when it holds fewer
            than ``batch_size`` samples and incomplete batches are dropped.
    """
    generator, discriminator = gan.layers

    # The labels are identical for every batch, so build them once up front:
    # fakes (0) followed by reals (1) for the discriminator, all "real" (1)
    # for the generator step.
    y1 = tf.constant([[0.]] * batch_size + [[1.]] * batch_size)
    y2 = tf.constant([[1.]] * batch_size)

    d_loss = g_loss = noise = None

    for epoch in range(n_epochs):
        print("Epoch {}/{}".format(epoch + 1, n_epochs))

        for X_batch in dataset:
            # phase 1 - training the discriminator
            noise = tf.random.normal(shape=[batch_size, codings_size])
            generated_images = generator(noise)

            X_fake_and_real = tf.concat([generated_images, X_batch], axis=0)
            discriminator.trainable = True
            d_loss = discriminator.train_on_batch(X_fake_and_real, y1)

            # phase 2 - training the generator
            noise = tf.random.normal(shape=[batch_size, codings_size])
            discriminator.trainable = False
            g_loss = gan.train_on_batch(noise, y2)

        if noise is None:
            # No batch was ever processed: there are no losses to report and
            # no noise to sample from. Fail with an explanation instead of an
            # UnboundLocalError further down.
            raise ValueError(
                "dataset yielded no batches; it needs at least batch_size "
                "samples"
            )

        if epoch % 10 == 0:
            clear_output(wait=True)

        print("g_loss", g_loss)
        print("d_loss", d_loss)

        # Show the first generated sample as a 2-D image (channel 0 of the
        # single-channel output) rather than an (H, W, 1) array.
        plt.imshow(generator(noise)[0, :, :, 0] + 1, cmap="gray")
        plt.show()


In [None]:
"""
#Bad attempt at splitting up fake and real batchs

def train_gan(gan, dataset, batch_size, codings_size, n_epochs=500):
    generator, discriminator = gan.layers
    
    for epoch in range(n_epochs):
        print("Epoch {}/{}".format(epoch + 1, n_epochs))   
           
        for X_batch in dataset:
            # phase 1 - training the discriminator
            noise = tf.random.normal(shape=[batch_size, codings_size])
            generated_images = generator(noise)    

            # generate 'fake' examples
            X_fake = generated_images
            y_fake = tf.constant([[0.]] * batch_size)
            
            # generate 'real' examples
            X_real =  X_batch
            y_real = tf.constant([[1.]] * batch_size)

            discriminator.trainable = True
            discriminator.train_on_batch(X_real, y_real)
            # update discriminator model weights
            d_loss = discriminator.train_on_batch(X_fake, y_fake)


            # phase 2 - training the generator
            noise = tf.random.normal(shape=[batch_size, codings_size])
            y2 = tf.constant([[1.]] * batch_size)
            discriminator.trainable = False
            g_loss = gan.train_on_batch(noise, y2)

        if epoch % 5 == 0:
            clear_output(wait=True)

        print("g_loss", g_loss)  
        print("d_loss", d_loss) 
        plt.imshow(generator(noise)[0,:,:,:] + 1, cmap="gray")
        #print(str(generator(noise)[0,:,:,:]))
        plt.show()"""

In [32]:
# Run the training loop with the default number of epochs.
train_gan(
    gan=gan,
    dataset=dataset,
    batch_size=batch_size,
    codings_size=codings_size,
)

KeyboardInterrupt: 