# mount drive

In [None]:
import os
from google.colab import drive

# Mount Google Drive and make the project folder the working directory.
drive.mount('/content/drive', force_remount=True)
# Absolute path: a relative chdir only works on the first run of this cell,
# because afterwards the working directory is already inside the Drive folder.
os.chdir("/content/drive/MyDrive/xray GAN/")
# Call listdir so the folder contents are displayed, not the function object.
os.listdir()

Mounted at /content/drive


<function posix.listdir>

# imports

In [None]:
from __future__ import print_function, division

from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import BatchNormalization, Activation, ZeroPadding2D, Embedding, Multiply, Concatenate, concatenate
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
from keras.layers import multiply
import cv2
import pandas as pd
from os import path as osp

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import sys

import numpy as np

from sklearn import preprocessing

# CGAN

In [None]:
pip install jdc

Collecting jdc
  Downloading https://files.pythonhosted.org/packages/5a/cb/9afea749985eef20f3160e8826a531c7502e40c35a038dfe49b67726e9a0/jdc-0.0.9-py2.py3-none-any.whl
Installing collected packages: jdc
Successfully installed jdc-0.0.9


In [None]:
import jdc

In [None]:
class CGAN():
  """Conditional GAN that generates 256x256 single-channel X-ray images.

  The generator maps (noise, class label) to an image; the discriminator maps
  (image, class label) to a real/fake probability. `combined` stacks the two
  with the discriminator frozen so that only the generator is updated through it.
  """

  # Dataset location on the mounted Drive and the metadata CSV inside it.
  TRAIN_PATH = '/content/drive/MyDrive/xray GAN/xray14/'
  DATA_FILE = 'data_entry.csv'
  # Findings used for training; rows with any other "Finding Labels" value are skipped.
  CLASSES = ('Effusion', 'Emphysema', 'Cardiomegaly', 'Mass')
  # Sub-folders searched for each image file: images_001/ ... images_012/.
  IMG_FOLDERS = tuple('images_%03d/' % i for i in range(1, 13))

  def __init__(self):
    """Build and compile the discriminator, the generator and the combined model."""
    # Input shape
    self.img_rows = 256
    self.img_cols = 256
    self.channels = 1
    self.img_shape = (self.img_rows, self.img_cols, self.channels)
    # NOTE(review): 5 label ids are reserved although CLASSES lists only 4
    # findings -- confirm whether a fifth class is intended.
    self.num_classes = 5
    self.latent_dim = 128

    optimizer = Adam(0.0002, 0.5)

    # Build and compile the discriminator
    self.discriminator = self.build_discriminator()
    self.discriminator.compile(loss='binary_crossentropy',
                               optimizer=optimizer,
                               metrics=['accuracy'])

    # Build the generator
    self.generator = self.build_generator()

    # The generator takes noise and a label as input and generates an image
    noise = Input(shape=(self.latent_dim,))
    label = Input(shape=(1,))
    img = self.generator([noise, label])

    # For the combined model we will only train the generator
    self.discriminator.trainable = False

    # The discriminator scores the generated image given the same label
    valid = self.discriminator([img, label])

    # The combined model (stacked generator and discriminator)
    # trains the generator to fool the discriminator
    self.combined = Model([noise, label], valid)
    self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

  def build_generator(self):
    """Return a Model mapping [noise, label] to a tanh-activated image.

    The label is embedded into a `latent_dim`-sized vector and multiplied
    element-wise with the noise before entering the convolutional stack.
    """
    model = Sequential()

    # 32x32 feature map, upsampled three times (x2 each) to 256x256
    model.add(Dense(128 * 32 * 32, activation="relu", input_dim=self.latent_dim))
    model.add(Reshape((32, 32, 128)))
    model.add(UpSampling2D())
    model.add(Conv2D(128, kernel_size=3, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(UpSampling2D())
    model.add(Conv2D(64, kernel_size=3, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(Conv2D(32, kernel_size=3, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(UpSampling2D())
    model.add(Conv2D(self.channels, kernel_size=3, padding="same"))
    model.add(Activation("tanh"))

    model.summary()

    noise = Input(shape=(self.latent_dim,))
    label = Input(shape=(1,), dtype='int32')
    label_embedding = Flatten()(Embedding(self.num_classes, self.latent_dim)(label))
    model_input = multiply([noise, label_embedding])
    img = model(model_input)

    return Model([noise, label], img)

  def build_discriminator(self):
    """Return a Model mapping [image, label] to a sigmoid real/fake score.

    The image goes through four convolution stages; the flattened features are
    concatenated with the raw label value and passed through a 512-unit hidden
    layer before the output unit.
    """
    img = Input(shape=self.img_shape)
    label = Input(shape=(1,), dtype='float32')
    c1 = Conv2D(32, kernel_size=3, strides=2, padding="same")(img)
    lr1 = LeakyReLU(alpha=0.2)(c1)
    d1 = Dropout(0.25)(lr1)
    c2 = Conv2D(64, kernel_size=3, strides=2, padding="same")(d1)
    zp1 = ZeroPadding2D(padding=((0, 1), (0, 1)))(c2)
    bn1 = BatchNormalization(momentum=0.8)(zp1)
    lr2 = LeakyReLU(alpha=0.2)(bn1)
    d2 = Dropout(0.25)(lr2)
    c3 = Conv2D(128, kernel_size=3, strides=2, padding="same")(d2)
    bn2 = BatchNormalization(momentum=0.8)(c3)
    lr3 = LeakyReLU(alpha=0.2)(bn2)
    d3 = Dropout(0.25)(lr3)
    c4 = Conv2D(256, kernel_size=3, strides=1, padding="same")(d3)
    bn3 = BatchNormalization(momentum=0.8)(c4)
    lr4 = LeakyReLU(alpha=0.2)(bn3)
    d4 = Dropout(0.25)(lr4)
    f = Flatten()(d4)
    concat = Concatenate()([f, label])
    hid = Dense(512, activation='relu')(concat)
    # The output must read the hidden layer; it was previously attached to
    # `concat`, which left the Dense(512) layer disconnected from the model.
    out = Dense(1, activation='sigmoid')(hid)
    return Model([img, label], out)

  def _load_training_data(self):
    """Load the images of the selected classes and their integer label ids.

    Returns:
      (x_train, y_train): float32 images shaped (N, img_rows, img_cols, 1) and
      label ids shaped (N, 1), where N is the number of images actually loaded.

    Raises:
      ValueError: if no image could be loaded.
    """
    classes = list(self.CLASSES)
    lb = preprocessing.LabelEncoder()
    lb.fit(classes)
    # The models consume numeric label ids, so map every class name to its code
    label_to_id = dict(zip(classes, lb.transform(classes)))

    metadata = pd.read_csv(self.TRAIN_PATH + self.DATA_FILE)
    # Keep only rows whose finding is exactly one of the selected classes
    metadata = metadata[metadata["Finding Labels"].isin(classes)]

    x_train = []
    y_train = []
    for _, row in metadata.iterrows():
      label = row["Finding Labels"]
      for folder in self.IMG_FOLDERS:
        img_file = self.TRAIN_PATH + folder + "images/" + row["Image Index"]
        if not osp.exists(img_file):
          continue
        image = cv2.imread(img_file)
        if image is None:
          # File exists but could not be decoded; skip it instead of crashing
          continue
        image = image[:, :, 0]  # keep a single channel
        arr = cv2.resize(image, (self.img_cols, self.img_rows))
        arr = arr.astype('float32')
        arr /= 255.0
        arr = arr - np.mean(arr)  # centre each image around zero

        x_train.append(arr)
        y_train.append(label_to_id[label])
        break  # image found; no need to probe the remaining folders

    print("shape of x train: {}".format(len(x_train)))
    if not x_train:
      raise ValueError("No training images found under " + self.TRAIN_PATH)

    # Let numpy infer the sample count: it is the number of loaded images,
    # not the number of CSV rows visited.
    x_train = np.asarray(x_train).reshape(-1, self.img_rows, self.img_cols, 1)
    y_train = np.asarray(y_train).reshape(-1, 1)
    return x_train, y_train

  def load_xrays(self, epochs=100, batch_size=1000, save_interval=20):
    """Load the X-ray dataset and train the GAN.

    Args:
      epochs: number of training iterations; each one performs a single
        discriminator update on a real and a generated batch, followed by a
        single generator update.
      batch_size: number of samples drawn per iteration.
      save_interval: kept for interface compatibility; periodic sampling via
        `sample_images` is currently disabled, so the value is not used.

    Raises:
      ValueError: if no training image could be loaded.
    """
    x_train, y_train = self._load_training_data()

    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    # Experience replay buffer, intended to reduce mode collapse
    exp_replay = []

    for epoch in range(epochs):
      # ---------------------
      #  Train Discriminator
      # ---------------------

      # Select a random batch of images (sampled with replacement)
      idx = np.random.randint(0, x_train.shape[0], batch_size)
      imgs, labels = x_train[idx], y_train[idx]

      # Sample noise as generator input
      noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

      # Generate a batch of new images conditioned on the real labels
      gen_imgs = self.generator.predict([noise, labels])

      # Train the discriminator
      d_loss_real = self.discriminator.train_on_batch([imgs, labels], valid)
      d_loss_fake = self.discriminator.train_on_batch([gen_imgs, labels], fake)
      d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

      # Noisy targets for the replay sample: values in (0.9, 1.0], with about
      # `noise_prop` of them flipped to [0.0, 0.1).
      # NOTE(review): these targets are close to 1 for *generated* images while
      # the batch update above uses 0 for them -- confirm the intended convention.
      noise_prop = 0.05
      gene_labels = np.ones((batch_size, 1)) - np.random.uniform(low=0.0, high=0.1, size=(batch_size, 1))
      flipped_idx = np.random.choice(np.arange(len(gene_labels)), size=int(noise_prop*len(gene_labels)))
      gene_labels[flipped_idx] = 1 - gene_labels[flipped_idx]

      # Store a random point for experience replay
      r_idx = np.random.randint(batch_size)
      exp_replay.append([gen_imgs[r_idx], labels[r_idx], gene_labels[r_idx]])

      # Once a full batch of past generations is collected, replay it through
      # the discriminator and start a new buffer. Training then continues
      # (the loop used to exit here).
      if len(exp_replay) == batch_size:
        replay_imgs = np.array([p[0] for p in exp_replay])
        replay_labels = np.array([p[1] for p in exp_replay])
        replay_targets = np.array([p[2] for p in exp_replay])
        self.discriminator.train_on_batch([replay_imgs, replay_labels], replay_targets)
        exp_replay = []

      # ---------------------
      #  Train Generator
      # ---------------------

      # Condition on randomly drawn label ids
      sampled_labels = np.random.randint(0, self.num_classes, batch_size).reshape(-1, 1)

      # Train the generator
      g_loss = self.combined.train_on_batch([noise, sampled_labels], valid)

      # Plot the progress
      print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

      # Periodic sampling is disabled; call self.sample_images(epoch) every
      # `save_interval` iterations here to re-enable it.

  def sample_images(self, epoch):
    """Generate an r x c grid of samples and save it as `<epoch>.png`.

    The figure is written to the `images/` folder under `TRAIN_PATH`, which is
    created if missing.
    """
    r, c = 5, 5
    noise = np.random.normal(0, 1, (r * c, self.latent_dim))
    # One label per grid cell, cycling through the label ids, so the label
    # batch has the same length as the noise batch.
    sampled_labels = (np.arange(r * c) % self.num_classes).reshape(-1, 1)

    gen_imgs = self.generator.predict([noise, sampled_labels])

    # Rescale images 0 - 1
    gen_imgs = 0.5 * gen_imgs + 0.5

    fig, axs = plt.subplots(r, c)
    cnt = 0
    for i in range(r):
      for j in range(c):
        axs[i,j].imshow(gen_imgs[cnt,:,:,0], cmap='gray')
        # The samples are X-ray classes, not digits
        axs[i,j].set_title("Class: %d" % sampled_labels[cnt, 0])
        axs[i,j].axis('off')
        cnt += 1

    out_dir = self.TRAIN_PATH + "images/"
    os.makedirs(out_dir, exist_ok=True)
    fig.savefig(out_dir + "%d.png" % epoch)
    plt.close(fig)


# running CGAN


In [None]:
if __name__ == '__main__':
  # Train the conditional GAN, then persist both networks.
  cgan = CGAN()
  cgan.load_xrays(epochs=100, batch_size=1000, save_interval=20)

  # Make sure the output folder exists before writing the model files
  os.makedirs('models', exist_ok=True)
  cgan.generator.save('models/gen.h5')
  cgan.discriminator.save('models/disc.h5')

  # Integer label ids for the classes (LabelEncoder yields integer codes,
  # not one-hot vectors; the variable name is kept for compatibility).
  lb = preprocessing.LabelEncoder()#Binarizer()

  classes = ['Effusion', 'Emphysema', 'Cardiomegaly','Mass']

  OHE_labels = lb.fit_transform(classes)

  # Generate and save sample image(s) for every class
  out_dir = "images-strong-conv/"
  os.makedirs(out_dir, exist_ok=True)
  fig, ax = plt.subplots()
  for label in OHE_labels:
    for num in range(1):
      nlab = np.asarray([label]).reshape(-1, 1)
      # Noise width must match the generator's latent dimension
      noise1 = np.random.normal(0, 1, (1, cgan.latent_dim))
      img = cgan.generator.predict([noise1, nlab])
      # A single sample is generated per call, so it is always at index 0
      ax.imshow(img[0,:,:,0], cmap='gray')
      fig.savefig(out_dir + str(label) + "-" + str(num) + ".png")
      ax.clear()
  # Release the figure once all samples are written
  plt.close(fig)


Model: "sequential_7"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_23 (Dense)             (None, 131072)            16908288  
_________________________________________________________________
reshape_7 (Reshape)          (None, 32, 32, 128)       0         
_________________________________________________________________
up_sampling2d_21 (UpSampling (None, 64, 64, 128)       0         
_________________________________________________________________
conv2d_60 (Conv2D)           (None, 64, 64, 128)       147584    
_________________________________________________________________
batch_normalization_45 (Batc (None, 64, 64, 128)       512       
_________________________________________________________________
activation_28 (Activation)   (None, 64, 64, 128)       0         
_________________________________________________________________
up_sampling2d_22 (UpSampling (None, 128, 128, 128)    

ValueError: ignored