In [1]:
from keras import backend as K
from keras.preprocessing.image import ImageDataGenerator,load_img, img_to_array
from keras.models import Sequential, Model
from keras.layers import GlobalAveragePooling2D, Dense, Conv2D
from keras.layers import Activation, Dropout, Flatten, Dense, AvgPool2D, MaxPool2D, MaxPooling2D
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.optimizers import Adam, SGD, RMSprop

import tensorflow as tf

import os
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

In [None]:
DATASET_DIR = r"D:\user\DICOM_images"

In [None]:
category_list = os.listdir(DATASET_DIR)
category_list

In [None]:
IMG_W = 256
IMG_H = 256
CHANNELS = 3

INPUT_SHAPE = (IMG_W, IMG_H, CHANNELS)
EPOCHS = 10
BATCH_SIZE = 32

In [None]:
from PIL import Image

for root, dirs, files in os.walk(DATASET_DIR):
    rootpath = os.path.join(os.path.abspath(DATASET_DIR), root)

    for file in files:
        try:
            filepath = os.path.join(rootpath, file)
            img = Image.open(filepath)
            img_resize = img.resize((IMG_W, IMG_H))
            img_resize.save(filepath)
            print(filepath + " SUCCESS")
        except:
            print(filepath + " Image Error")

In [None]:
from imgaug import augmenters as iaa 
import cv2

def load_images_from_folder(folder): 
    images = [] 
    for filename in os.listdir(folder): 
        img = cv2.imread(os.path.join(folder, filename)) 
        if img is not None: 
            images.append(img) 
    return images

def write_images(folder, name, images): 
    for i in range(0, len(images)): 
        cv2.imwrite(folder + "\\" + name + "{}.png".format(i), images[i])
    print("image saving complete")
        
def augmentations(images):       
    seq1 = iaa.Sequential([
        iaa.AverageBlur(k=(2, 7)),
        iaa.MedianBlur(k = (3, 11))
    ])
    seq2 = iaa.Sequential([
        iaa.Fliplr(0.5),
        iaa.GaussianBlur(sigma=(0, 3.0))
    ])
    seq3 = iaa.Sequential([
        iaa.AdditiveGaussianNoise(scale=0.05*255),
        iaa.Affine(translate_px={"x": (1, 5)})
    ])
    seq4 = iaa.Sequential([
        iaa.Affine(
            scale={"x": (0.8, 1.2), "y": (0.8, 1.2)},
            rotate=(-25, 25),
            shear=(-8, 8)
        )
    ], random_order=True)
    print("image augmentation beginning")
    img1 = seq1.augment_images(images)
    print("sequence 1 completed......")
    img2 = seq2.augment_images(images)
    print("sequence 2 completed......")
    img3 = seq3.augment_images(images)
    print("sequence 3 completed......")
    img4 = seq4.augment_images(images)
    print("sequence 4 completed......")
    print("proceed to next augmentations")
    list = [img1, img2, img3, img4]
    
    return list

In [None]:
for category in category_list:
    category_images_path = os.path.join(DATASET_DIR, category)
    while len(os.listdir(category_images_path)) < 3000:
        images = load_images_from_folder(category_images_path)
        augmented_images = augmentations(images)
        for i in range(len(augmented_images)):
            write_images(category_images_path, "Augmented_", augmented_images[i])

In [None]:
train_datagen = ImageDataGenerator(
    rotation_range = 10,
    brightness_range = [0.8, 1.2],
    shear_range = 0.01,
    zoom_range = [0.9, 1.1],
    validation_split = 0.25,
    preprocessing_function = preprocess_input
)
 
val_datagen = ImageDataGenerator(
    validation_split = 0.25,
    preprocessing_function = preprocess_input
)

In [None]:
train_gen = train_datagen.flow_from_directory(
    DATASET_DIR,
    target_size = (IMG_W, IMG_H),
    class_mode = 'categorical',
    batch_size = BATCH_SIZE,
    shuffle = True,
    subset = 'training'
)
 
val_gen = val_datagen.flow_from_directory(
    DATASET_DIR,
    target_size = (IMG_W, IMG_H),
    class_mode = 'categorical',
    batch_size = BATCH_SIZE,
    shuffle = False,
    subset = 'validation'
)

In [None]:
base_model = VGG16(input_shape=INPUT_SHAPE, weights="imagenet",
                  include_top=False)

x = base_model.output
x = GlobalAveragePooling2D()(x)
output = Dense(len(category_list), activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=output)

In [None]:
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(),
              optimizer='rmsprop',
              metrics=['accuracy'])

In [None]:
model.summary()

In [None]:
from keras.callbacks import ModelCheckpoint
from keras.callbacks import TensorBoard
from keras.callbacks import EarlyStopping

MODEL_SAVE_FOLDER_PATH = "./model"
LOG_SAVE_FOLDER_PATH = "./logs"
if not os.path.exists(MODEL_SAVE_FOLDER_PATH):
    os.mkdir(MODEL_SAVE_FOLDER_PATH)
if not os.path.exists(LOG_SAVE_FOLDER_PATH):
    os.mkdir(LOG_SAVE_FOLDER_PATH)
model_path = MODEL_SAVE_FOLDER_PATH + "\\" + "model.h5"

history = model.fit_generator(
    train_gen,
    steps_per_epoch = train_gen.samples // BATCH_SIZE,
    validation_data = val_gen, 
    validation_steps = val_gen.samples // BATCH_SIZE,
    epochs = EPOCHS,
    callbacks = [
        ModelCheckpoint(filepath=model_path, monitor="val_loss", verbose=1, save_best_only=True),
        TensorBoard(log_dir=LOG_SAVE_FOLDER_PATH, histogram_freq=0, write_graph=True, write_images=True)
    ])

In [None]:
plt.plot(history.history["accuracy"])
plt.plot(history.history["val_accuracy"])
plt.title("model accuracy")
plt.ylabel("accuracy")
plt.xlabel("epoch")
plt.legend(["train", "test"], loc="upper left")
plt.show()

plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
from keras.models import load_model
    
model.save("model.h5")
model.save_weights("model_weight.h5")
print("Saved model to disk")

In [None]:
print("training_accuracy", history.history['accuracy'][-1])
print("validation_accuracy", history.history['val_accuracy'][-1])

In [None]:
label = validation_generator.classes
print(label)

In [None]:
pred = model.predict(validation_generator)
predicted_class_indices = np.argmax(pred, axis=1)
labels = (validation_generator.class_indices)  
labels2 = dict((v, k) for k,v in labels.items())
predictions = [labels2[k] for k in predicted_class_indices]
print(predicted_class_indices)
print(labels)
print(predictions)

In [None]:
from sklearn.metrics import confusion_matrix

cf = confusion_matrix(predicted_class_indices, label)

In [None]:
exp_series = pd.Series(label)
pred_series = pd.Series(predicted_class_indices)
pd.crosstab(exp_series, pred_series, rownames=["Actual"], colnames=["Predicted"], margins=True)