In [None]:
import logging
import os

# ---- logging configuration -------------------------------------------
# Timestamped, level-tagged messages; everything from INFO upwards is shown.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)
# ----------------------------------------------------------------------

In [None]:
import numpy as np
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt

In [None]:
# Dataset: chest x-ray images from
# https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia/
# If the extracted folder is missing but the downloaded archive is present,
# the archive is extracted; otherwise the user is asked to download it.
import zipfile


def ensure_dataset(zip_path='chest-xray-pneumonia.zip', target_dir='chest_xray'):
    """Make sure the extracted dataset folder exists, unzipping it if needed.

    Parameters
    ----------
    zip_path : str
        Path of the downloaded archive.
    target_dir : str
        Folder expected to exist after extraction. The archive is extracted
        into the parent of this folder (the current directory for the
        default value).

    Returns
    -------
    bool
        True when ``target_dir`` exists on return, False otherwise.
    """
    if os.path.exists(target_dir):
        return True

    if not os.path.exists(zip_path):
        logging.warning('please download the dataset from https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia/')
        return False

    logging.info('unzipping the dataset file')
    extract_root = os.path.dirname(os.path.abspath(target_dir))
    try:
        # zipfile works on every platform and reports failures as exceptions,
        # unlike shelling out to `unzip`.
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(extract_root)
    except (zipfile.BadZipFile, OSError) as err:
        # Keep the archive: deleting it after a failed extraction would force
        # a fresh download.
        logging.error('unzipping failed, keeping {}: {}'.format(zip_path, err))
        return False
    logging.info('unzipping is done')

    # The archive is only removed once extraction has succeeded.
    os.remove(zip_path)
    return os.path.exists(target_dir)


# Assigned to a name so the cell does not echo the boolean as its output.
dataset_ready = ensure_dataset()

In [None]:
def create_data_from_image(dir, data_type, main, size):
    """Load every image under ``dir/data_type/<category>`` as a resized gray image.

    Parameters
    ----------
    dir : str
        Root folder of the dataset. (The name shadows the builtin ``dir``; it
        is kept so existing keyword callers keep working.)
    data_type : str
        Sub-folder to read, e.g. ``'train'`` or ``'test'``.
    main : list of str
        Category folder names. The position of a name in this list is the
        integer label given to its images.
    size : int
        Each image is resized to ``size`` x ``size`` pixels.

    Returns
    -------
    data : list
        One ``[image_array, label]`` pair per successfully loaded image.
    occurrence : list of int
        Number of loaded images per label, one entry per category in ``main``.
    """
    data = []
    # One counter per category. A fixed three-slot list would silently lose
    # every image of a fourth category.
    occurrence = [0] * len(main)

    for label, cat in enumerate(main):
        logging.info('reading images for category {}'.format(cat))
        path = os.path.join(dir, data_type, cat)

        # Sorted so the loading order does not depend on the file system.
        for img in sorted(os.listdir(path)):
            # cv2.imread signals an unreadable file by returning None rather
            # than by raising.
            img_array = cv2.imread(os.path.join(path, img), cv2.IMREAD_GRAYSCALE)
            if img_array is None:
                logging.warning('error reading {}'.format(img))
                continue

            try:
                img_array = cv2.resize(img_array, (size, size))
            except cv2.error:
                # Only OpenCV failures are skipped; anything else (including
                # KeyboardInterrupt) is allowed to propagate.
                logging.warning('error reading {}'.format(img))
                continue

            occurrence[label] += 1
            data.append([img_array, label])

    logging.info('reading images done')
    return data, occurrence

In [None]:
# Dataset location and preprocessing settings for the training split.
directory = 'chest_xray'
data_type = 'train'   # either 'train' or 'test'
main_category = ['NORMAL', 'PNEUMONIA']
image_size = 200

# Read the training images; labels follow the order of main_category
# (0: NORMAL, 1: PNEUMONIA).
training_data, occurrence = create_data_from_image(
    dir=directory,
    data_type=data_type,
    main=main_category,
    size=image_size,
)

In [None]:
import random

# Fixed seed so the shuffled order (and therefore the samples that end up in
# the validation split) is the same on every fresh run.
SHUFFLE_SEED = 42

# number of images with labels 0, 1
print('Normal:', occurrence[0], 'Pneumonia:', occurrence[1])

total_images = sum(occurrence)
if total_images == 0:
    # Fail with a clear message instead of a ZeroDivisionError below.
    raise ValueError('no training images were loaded; check the dataset folder')

# Each label's weight is its share of the loaded images.
# NOTE(review): this gives the more frequent class the larger weight, which is
# the opposite of the usual inverse-frequency weighting - confirm it is intended.
weight = [float(count) / total_images for count in occurrence]
print('Normal:', weight[0], 'Pneumonia:', weight[1])
class_weight = {0: weight[0], 1: weight[1]}

# The images were read one category after another, so they must be shuffled
# before training. A private Random instance leaves the global RNG untouched.
random.Random(SHUFFLE_SEED).shuffle(training_data)

In [None]:
# Separate features and labels.
# The labels are turned into a NumPy array right away: recent tf.keras
# versions refuse to pair an ndarray of inputs with a plain Python list of
# integer targets in model.fit.
X = [features for features, _ in training_data]
y = np.array([label for _, label in training_data], dtype=int)

In [None]:
# TensorFlow needs one NumPy array rather than a list of images.
# Target shape: (samples, height, width, channels) - the -1 lets NumPy infer
# the number of samples and the trailing 1 is the single gray-scale channel.
X = np.reshape(np.array(X), (-1, image_size, image_size, 1))


In [None]:
# save the training data so a later run can skip re-reading all images
# you need at least 8GB of ram for this
import pickle

# The output folder is created on demand; open() would fail without it.
os.makedirs('trainings', exist_ok=True)

# `with` closes each file even if pickling raises part-way through.
with open('trainings/X_2labels.pickle', 'wb') as pickle_out:
    pickle.dump(X, pickle_out)

with open('trainings/y_2labels.pickle', 'wb') as pickle_out:
    pickle.dump(y, pickle_out)

In [None]:
# uncomment the following to read X and y in case we want to re-run from here
# this avoids the need to re-read all images
# (only load pickles you created yourself: unpickling can execute arbitrary code)
# import pickle
# X = pickle.load(open('trainings/X_2labels.pickle', 'rb'))
# y = pickle.load(open('trainings/y_2labels.pickle', 'rb'))

# Labels restored from an older pickle may still be a plain Python list;
# np.asarray is a no-op when y is already an array.
y = np.asarray(y)

# Scale the gray levels from 0..255 to 0..1.
# The dtype guard keeps the cell safe to re-run: once X holds floats it is left
# alone instead of being divided by 255 a second time.
# (assumes X still has the integer pixel dtype produced when the images were
# read - TODO confirm for externally prepared pickles)
# float32 halves the memory of the default float64 result.
if np.issubdtype(X.dtype, np.integer):
    X = X.astype(np.float32) / 255.0

In [None]:
# create test data
# The settings are repeated here so this cell also works when the training
# data was restored from the pickles instead of being re-read from disk.
directory = 'chest_xray'
data_type = 'test'   # choose between train and test
main_category = ['NORMAL', 'PNEUMONIA']
image_size = 200

# Read the test images; labels follow main_category order (0: NORMAL, 1: PNEUMONIA).
# The per-class counts get their own name so the training-set `occurrence`
# is not overwritten.
test_data, test_occurrence = create_data_from_image(directory, data_type, main_category, image_size)

# Separate features and labels. Both become NumPy arrays: recent tf.keras
# versions do not accept a plain Python list of integer targets next to an
# ndarray of inputs.
X_test = np.array([features for features, _ in test_data]).reshape(-1, image_size, image_size, 1)
y_test = np.array([label for _, label in test_data], dtype=int)

# Scale gray levels to 0..1; float32 halves the memory of the float64 default.
X_test = X_test.astype(np.float32) / 255.0

In [None]:
# import libraries to create neural networks
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.callbacks import TensorBoard

In [None]:
import time

# Run identifier used when naming saved artifacts: a fixed architecture tag
# followed by the current Unix time in whole seconds.
run_started = int(time.time())
NAME = 'xray-2_labels-4_layers-64x32-{}'.format(run_started)
# tensorboard = TensorBoard(log_dir='trainings/{}'.format(NAME))
# os.system("tensorboard --logdir='trainings/'")

In [None]:
def create_model(X, num_classes=2):
    """Build and compile the small convolutional classifier.

    Architecture: two strided 4x4 convolutions (64 then 32 filters, stride 3,
    ReLU), a 64-unit dense layer (ReLU) and a softmax output layer.

    Parameters
    ----------
    X : array-like with a ``shape`` attribute
        Only ``X.shape[1:]`` is used, as the input shape of the first layer.
    num_classes : int, optional
        Number of units in the softmax output layer. Defaults to 2, matching
        the two labels used in this notebook.

    Returns
    -------
    The compiled model. Its summary is printed as a side effect.
    """
    model = Sequential()

    # layer 1 - the stride of 3 does the down-sampling, no pooling layer is used
    model.add(Conv2D(64, (4, 4), strides=(3, 3), input_shape=X.shape[1:]))
    model.add(Activation('relu'))

    # layer 2
    model.add(Conv2D(32, (4, 4), strides=(3, 3)))
    model.add(Activation('relu'))

    # layer 3
    model.add(Flatten())
    model.add(Dense(64))
    model.add(Activation('relu'))

    # output layer: one unit per label, softmax turns them into probabilities
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))

    # the sparse loss takes integer labels directly (no one-hot encoding)
    model.compile(loss='sparse_categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    # summary() prints the table itself and returns None, so wrapping it in
    # print() would add a stray "None" line.
    model.summary()
    return model

In [None]:
# Build the network; the input shape is taken from the training array.
model = create_model(X=X)

In [None]:
# When the data was imported (e.g. restored from the pickles) the class
# weights have to be set by hand. Note that this assignment also replaces any
# class_weight computed in an earlier cell.
class_weight = {0: 0.25, 1: 0.75}

# Train for 10 epochs in batches of 100, holding out 10% of the samples for
# validation.
fit_options = dict(batch_size=100, epochs=10, class_weight=class_weight, validation_split=0.1)
model.fit(X, y, **fit_options)
# model.fit(X, y, batch_size=100, epochs=20, class_weight=class_weight, validation_split=0.1, callbacks=[tensorboard])

In [None]:
# Continue training the same model for another 10 epochs, this time reporting
# validation metrics on the separate test set instead of a split of the
# training data.
model.fit(
    X,
    y,
    batch_size=100,
    epochs=10,
    class_weight=class_weight,
    validation_data=(X_test, y_test),
)

In [None]:
# Save the trained network as an HDF5 file named after this run.
# model.save stores the whole model, not only the weights.
# The output folder is created on demand so saving does not fail on a fresh
# checkout.
os.makedirs('trainings', exist_ok=True)
model.save("trainings/{}.h5".format(NAME))

In [None]:
# # test previously saved weights
# # create the model
# model_test = create_model(X_test)

In [None]:
# # load saved weights from an .h5 file chosen in a file dialog
# import tkinter as tk
# from tkinter import filedialog

# # file dialogue initialization
# root = tk.Tk()
# root.withdraw()

# file_path = filedialog.askopenfilename(filetypes=[("Model Weights", ".h5")])
# model_test.load_weights(file_path)

In [None]:
# # evaluate the model
# loss,acc = model_test.evaluate(X_test, y_test, verbose=2)
# print("Restored model, accuracy: {:5.2f}%".format(100*acc))