#  import libraries

In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import random
import glob
import cv2
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from keras.preprocessing.image import  ImageDataGenerator
from keras.layers import Dense,Flatten,BatchNormalization,MaxPooling2D,Conv2D,Input
from keras.callbacks import EarlyStopping,ModelCheckpoint
from keras import Model
from sklearn.metrics import classification_report , confusion_matrix

# get images from dataset files

In [None]:
# Collect the image paths one class at a time. The files of every split
# directory (train / test / val) are pooled into a single list per class.
train_normal = glob.glob('/kaggle/input/pneumonia-chest-x-ray-dataset/train/normal/*.jpeg')
test_normal = glob.glob('/kaggle/input/pneumonia-chest-x-ray-dataset/test/normal/*.jpeg')
val_normal = glob.glob('/kaggle/input/pneumonia-chest-x-ray-dataset/val/normal/*.jpeg')
normal = [*train_normal, *test_normal, *val_normal]

train_pneumonia = glob.glob('/kaggle/input/pneumonia-chest-x-ray-dataset/train/pneumonia/*.jpeg')
test_pneumonia = glob.glob('/kaggle/input/pneumonia-chest-x-ray-dataset/test/pneumonia/*.jpeg')
val_pneumonia = glob.glob('/kaggle/input/pneumonia-chest-x-ray-dataset/val/pneumonia/*.jpeg')
pneumonia = [*train_pneumonia, *test_pneumonia, *val_pneumonia]

# Shuffle each pooled list in place (normal first, then pneumonia).
# No seed is set in this cell, so the order can differ between runs.
for paths in (normal, pneumonia):
    random.shuffle(paths)

print('number of normal x-rays = {}'.format(len(normal)))
print('number of pneumonia x-rays = {}'.format(len(pneumonia)))

In [None]:
# Re-split each shuffled class list: the first 80% of the paths go to
# training, the remainder to testing.
normal_cut = int(len(normal) * 0.8)
train_normal, test_normal = normal[:normal_cut], normal[normal_cut:]

pneumonia_cut = int(len(pneumonia) * 0.8)
train_pneumonia, test_pneumonia = pneumonia[:pneumonia_cut], pneumonia[pneumonia_cut:]

# plot a sample of each category

In [None]:
def plot_images(data, image_title, num_of_images):
    """Show ``num_of_images`` randomly chosen images from ``data`` in one row.

    Args:
        data: Either a list of image file paths (each picked path is loaded
            with ``plt.imread``) or another indexable collection whose items
            are passed to ``plt.imshow`` as they are.
        image_title: Title drawn above every subplot.
        num_of_images: Number of subplots to draw. Every pick is an
            independent ``random.choice``, so the same image can appear
            more than once.
    """
    plt.figure(figsize=(15, 10))
    # isinstance instead of an exact type comparison, so list subclasses are
    # also recognised as collections of paths.
    needs_loading = isinstance(data, list)
    for position in range(1, num_of_images + 1):
        random_image = random.choice(data)
        if needs_loading:
            random_image = plt.imread(random_image)
        plt.subplot(1, num_of_images, position)
        plt.imshow(random_image)
        plt.axis('off')
        plt.title(image_title)

    plt.show()

# Preview five randomly picked files of each class.
for paths, title in ((normal, 'Normal'), (pneumonia, 'Pneumonia')):
    plot_images(paths, title, 5)

# plot categories distribution

In [None]:
def plt_pie(data, labels, colors, title):
    """Draw a donut-style pie chart and display it.

    Args:
        data: Wedge sizes, forwarded to ``plt.pie``.
        labels: One label per wedge.
        colors: One colour per wedge.
        title: Figure title (rendered at font size 22).
    """
    plt.figure(figsize=(20, 10))
    plt.pie(data, labels=labels, colors=colors, autopct='%1.1f%%')
    # A white disc drawn over the centre turns the pie into a donut.
    hole = plt.Circle((0, 0), 0.7, color='white')
    plt.gcf().gca().add_artist(hole)
    plt.title(title, fontsize=22)
    plt.show()
    
# Class sizes of the training split before any balancing.
labels = ['normal', 'pneumonia']
data = [len(paths) for paths in (train_normal, train_pneumonia)]

plt_pie(data, labels, colors=['blue', 'red'], title='Categories distribution')

# define function to apply Ben Graham's preprocessing method

In [None]:
def convert_image(image):
    """Convert an image to HSV and subtract its blurred version from it.

    The input is interpreted as BGR (``cv2.COLOR_BGR2HSV``). The result is
    ``4 * hsv - 4 * blurred + 128``, as computed by ``cv2.addWeighted``.

    NOTE(review): the weighting is applied to the HSV channels rather than
    to the BGR image itself; confirm this is the intended variant.
    """
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    # Kernel size (0, 0) lets OpenCV derive the kernel from the sigma value.
    blurred = cv2.GaussianBlur(hsv, (0, 0), 256/10)
    return cv2.addWeighted(hsv, 4, blurred, -4, 128)

# define function to apply color jitter to images

In [None]:
def _shift_channel(channel, value):
    """Add ``value`` to an 8-bit channel, saturating at 0 and 255."""
    # Widen before adding so the sum can neither wrap around nor depend on
    # how NumPy promotes a scalar against a uint8 array.
    shifted = channel.astype(np.int16) + value
    return np.clip(shifted, 0, 255).astype(np.uint8)


def colorjitter(img, cj_type="b"):
    """Return a colour-jittered copy of a BGR image.

    Args:
        img: Image array; for ``"b"`` and ``"s"`` it is converted with
            ``cv2.COLOR_BGR2HSV`` and back.
        cj_type: Kind of jitter to apply:
            ``"b"`` - brightness: shift the V channel by a random offset;
            ``"s"`` - saturation: shift the S channel by a random offset;
            ``"c"`` - contrast: rescale the pixel values by a random factor.

    Returns:
        The jittered image.

    Raises:
        ValueError: If ``cj_type`` is not one of ``"b"``, ``"s"``, ``"c"``.

    The brightness/saturation offset is drawn with ``np.random`` while the
    contrast amount is drawn with the ``random`` module.
    """
    if cj_type in ("b", "s"):
        # Plain Python int: the offset is then never a typed NumPy scalar.
        value = int(np.random.choice(np.array([-50, -40, -30, 30, 40, 50])))
        h, s, v = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2HSV))
        if cj_type == "b":
            v = _shift_channel(v, value)
        else:
            s = _shift_channel(s, value)
        return cv2.cvtColor(cv2.merge((h, s, v)), cv2.COLOR_HSV2BGR)

    if cj_type == "c":
        brightness = 10
        contrast = random.randint(40, 100)
        # Computed in a wider type, then clipped back into the 8-bit range.
        adjusted = np.int16(img) * (contrast/127+1) - contrast + brightness
        return np.uint8(np.clip(adjusted, 0, 255))

    raise ValueError(
        "cj_type must be one of 'b', 's' or 'c', got {!r}".format(cj_type)
    )
    

# define functions to balance the smaller class against the larger one using ColorJitter

In [None]:
def _load_resized(path, size=(256, 256)):
    """Read the image at ``path`` with OpenCV and resize it to ``size``."""
    img = cv2.imread(path)
    if img is None:
        # cv2.imread reports an unreadable file by returning None.
        raise ValueError('could not read image: {}'.format(path))
    return cv2.resize(img, size)


def balance_data(low_data, high_data):
    """Oversample ``low_data`` until it has as many images as ``high_data``.

    Every path in ``low_data`` is loaded once, resized to 256x256 and passed
    through ``convert_image``. While the result is still shorter than
    ``high_data``, the paths are cycled again and a colour-jittered copy
    (random choice of brightness / saturation / contrast) is appended. The
    jittered copies also go through ``convert_image`` so that every returned
    image has had the same preprocessing.

    Args:
        low_data: Image paths of the smaller class.
        high_data: Collection whose length is the target size.

    Returns:
        ``np.ndarray`` holding ``max(len(low_data), len(high_data))`` images.

    Raises:
        ValueError: If ``low_data`` is empty or an image cannot be read.
    """
    if len(low_data) == 0:
        raise ValueError('low_data must contain at least one image path')

    target = len(high_data)
    balanced = [convert_image(_load_resized(path)) for path in low_data]

    while len(balanced) < target:
        for path in low_data:
            # Checked before appending so the result never exceeds the target.
            if len(balanced) >= target:
                break
            jittered = colorjitter(_load_resized(path),
                                   random.choice(['b', 's', 'c']))
            balanced.append(convert_image(jittered))
    return np.array(balanced)

In [None]:
def convert_links_to_images(data):
    """Load every image path in ``data`` into one array.

    Each file is read with ``cv2.imread``, resized to 256x256 and passed
    through ``convert_image``; the results are stacked with ``np.array``
    in the order of ``data``.
    """
    return np.array([
        convert_image(cv2.resize(cv2.imread(path), (256, 256)))
        for path in data
    ])

In [None]:
# The normal training paths are oversampled up to the pneumonia count; the
# other three path lists are only loaded and preprocessed.
train_normal_images = balance_data(low_data=train_normal, high_data=train_pneumonia)
train_pneumonia_images, test_normal_images, test_pneumonia_images = (
    convert_links_to_images(paths)
    for paths in (train_pneumonia, test_normal, test_pneumonia)
)

# plot distribution of train data after balancing

In [None]:
# Class sizes of the training arrays once the oversampling has been applied.
balanced_counts = [len(train_normal_images), len(train_pneumonia_images)]
plt_pie(balanced_counts, labels, ['blue', 'red'],
        'Train distribution after balncing data')

# plot sample from each category after balancing

In [None]:
# Preview five random preprocessed training images per class.
plot_images(data=train_normal_images, image_title='Normal', num_of_images=5)
plot_images(data=train_pneumonia_images, image_title='Pneumonia', num_of_images=5)

# get train and test data

In [None]:
# Features: normal images first, pneumonia images after them.
X_train = np.concatenate((train_normal_images, train_pneumonia_images), axis=0)
X_test = np.concatenate((test_normal_images, test_pneumonia_images), axis=0)

# Labels in the same order: 0 for every normal image, 1 for every pneumonia image.
y_train = np.concatenate((np.zeros(len(train_normal_images)),
                          np.ones(len(train_pneumonia_images))))
y_test = np.concatenate((np.zeros(len(test_normal_images)),
                         np.ones(len(test_pneumonia_images))))

for _array in (X_train, y_train, X_test, y_test):
    print(_array.shape)
#------------------------free some memory---------------
# The per-class arrays have been copied into X_*/y_*, so drop their names.
del train_normal_images, train_pneumonia_images, test_normal_images, test_pneumonia_images

# create datagenerator for train and for test

In [None]:
# Training generator: scale pixel values by 1/255 and apply random geometric
# augmentation (rotation, shifts, shear, zoom, horizontal flip).
# The scale factor is written 1. / 255; `.1/255` would evaluate to 0.1/255.
train_datagen = ImageDataGenerator(rescale=1. / 255,
                                   rotation_range=40,
                                   width_shift_range=0.2,
                                   height_shift_range=0.2,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True,
                                   fill_mode='nearest')

# Test generator: same rescaling, no augmentation.
test_datagen = ImageDataGenerator(rescale=1. / 255)

train_generator = train_datagen.flow(X_train, y_train, batch_size=16, shuffle=True)
# shuffle=False keeps the batches in the order of X_test / y_test.
test_generator = test_datagen.flow(X_test, y_test, batch_size=16, shuffle=False)
#---------------------------free some memory--------------------
# Drop the notebook-level names of the arrays handed to the generators.
# y_test is not deleted.
X_train, y_train, X_test = None, None, None
del X_train, y_train, X_test

# get inception_v3 pre trained model

In [None]:
# InceptionV3 with ImageNet weights and without its classification top,
# built for 256x256 three-channel inputs.
inception = keras.applications.inception_v3.InceptionV3(
    weights='imagenet',
    include_top=False,
    input_shape=(256, 256, 3),
)
# Freeze the pretrained model.
inception.trainable = False
# Tap the network at the 'mixed8' layer rather than at its final output.
last_output = inception.get_layer('mixed8').output

# build full-model and fit model

In [None]:
# Classification head on top of the 'mixed8' feature map: two dense layers,
# each followed by batch normalisation, then a single sigmoid unit.
x = Flatten()(last_output)
x = Dense(64, activation='relu')(x)
x = BatchNormalization()(x)
x = Dense(32, activation='relu')(x)
x = BatchNormalization()(x)
x = Dense(1, activation='sigmoid')(x)

model = Model(inputs=inception.input, outputs=x)

# Stop after 7 epochs without val_loss improvement (restoring the best
# weights) and keep the best-val_loss checkpoint in model.h5.
callbacks = [EarlyStopping(monitor='val_loss', patience=7, restore_best_weights=True, verbose=1),
             ModelCheckpoint(filepath='model.h5', monitor='val_loss', save_best_only=True)]

model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# `callbacks` is already a list, so it is passed as is (not wrapped in
# another list).
# NOTE(review): test_generator is used as validation data here, so early
# stopping and checkpoint selection are driven by the test split.
history = model.fit(train_generator, epochs=50, validation_data=test_generator, callbacks=callbacks)
# Reload the checkpoint with the lowest val_loss written by ModelCheckpoint.
model.load_weights('model.h5')

# plot loss and accuracy

In [None]:
# Per-epoch curves recorded by model.fit (the name `tarin_acc` is kept as is).
train_loss, val_loss = history.history['loss'], history.history['val_loss']
tarin_acc, val_acc = history.history['accuracy'], history.history['val_accuracy']

# One figure per metric: training curve in blue, validation curve in red.
for figure_title, train_curve, train_label, val_curve, val_label in (
    ('model loss', train_loss, 'training loss', val_loss, 'validation loss'),
    ('model accuracy', tarin_acc, 'training accuracy', val_acc, 'validation accuracy'),
):
    plt.plot(range(len(train_curve)), train_curve, c='b', label=train_label)
    plt.plot(range(len(val_curve)), val_curve, c='r', label=val_label)
    plt.title(figure_title, loc='right')
    plt.legend()
    plt.show()

# get predicted output

In [None]:
# Sigmoid outputs flattened to one dimension, then thresholded at 0.5 into
# 0.0 / 1.0 labels stored as float16.
probabilities = model.predict(test_generator).reshape(-1)
y_pred = (probabilities >= 0.5).astype(np.float16)

# print confusion matrix and classification report


In [None]:
# Confusion matrix as an annotated heatmap (integer counts), followed by the
# text classification report.
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d')
print(classification_report(y_test, y_pred))