## Imports

In [None]:
# Conv2D is spatial convolution over images
# MaxPooling2D is a max pooling operation for 2D spatial data
from tensorflow.keras.metrics import Precision, Recall, BinaryAccuracy
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, BatchNormalization
# sequential is good for when we have a single dataset and looking for single output
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.callbacks import EarlyStopping
import cv2
import tensorflow as tf
import os
import random
import numpy as np
from matplotlib import pyplot as plt
from PIL import Image
from sklearn.model_selection import StratifiedShuffleSplit
import seaborn as sns
from sklearn.metrics import confusion_matrix

## Set random seed

In [None]:
def set_seed(seed: int = 42) -> None:
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    tf.experimental.numpy.random.seed(seed)
    tf.keras.utils.set_random_seed(seed)
    # When running on the CuDNN backend, two further options must be set
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'
    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    # Set a fixed value for the hash seed
    os.environ["PYTHONHASHSEED"] = str(seed)


set_seed()

## Extracting data using opencv

In [None]:
image_directory = 'tonsildb/'
dataset = []
label = []

no_pha_images = os.listdir(image_directory+'no_pharyngitis')
for i, image_name in enumerate(no_pha_images):    #Remember enumerate method adds a counter and returns the enumerate object
    image = cv2.imread(image_directory + 'no_pharyngitis/' + image_name)
    image = Image.fromarray(image, 'RGB')
    image = image.resize((256, 256))
    image = np.array(image)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    dataset.append(image)
    label.append(0)


pha_images = os.listdir(image_directory + 'pharyngitis/')
for i, image_name in enumerate(pha_images):    #Remember enumerate method adds a counter and returns the enumerate object
    image = cv2.imread(image_directory + 'pharyngitis/' + image_name)
    image = Image.fromarray(image, 'RGB')
    image = image.resize((256, 256))
    image = np.array(image)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    dataset.append(image)
    label.append(1)
    
images_np = dataset
labels_np = label

## Augmenting the data functions

In [None]:
def brightness(img, low, high):
    value = random.uniform(low, high)
    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    hsv = np.array(hsv, dtype = np.float64)
    hsv[:,:,1] = hsv[:,:,1]*value
    hsv[:,:,1][hsv[:,:,1]>255]  = 255
    hsv[:,:,2] = hsv[:,:,2]*value 
    hsv[:,:,2][hsv[:,:,2]>255]  = 255
    hsv = np.array(hsv, dtype = np.uint8)
    img = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
    return img

def channel_shift(img, value):
    value = int(random.uniform(-value, value))
    img = img + value
    img[:,:,:][img[:,:,:]>255]  = 255
    img[:,:,:][img[:,:,:]<0]  = 0
    img = img.astype(np.uint8)
    return img


def horizontal_flip(img):    
    return cv2.flip(img, 1)
    

## Augmentation pipeline

In [None]:
def brightness_and_flip(image_np):
    image_np = horizontal_flip(image_np)
    image_np = brightness(image_np, 0.7, 1.2)
    return image_np


for i in range(len(images_np)):
    to_append = horizontal_flip(images_np[i])
    images_np.append(to_append)
    labels_np.append(labels_np[i])
    to_append = brightness(images_np[i], 0.7, 1.2)
    images_np.append(to_append)
    labels_np.append(labels_np[i])
    to_append = brightness_and_flip(images_np[i])
    images_np.append(to_append)
    labels_np.append(labels_np[i])
    
print(len(images_np))
print(len(labels_np))

## OpenCV pipeline functions

In [None]:
def print_image(image_np):
    plt.imshow(image_np/255.0)
    plt.show()

def print_image_grayscale(image_np, gray = False):
    if gray == True:
        plt.imshow(image_np/255.0, cmap='gray')
    else:
        plt.imshow(image_np)
    plt.show()

def combine_mask_with_image(mask, image_np):
    image_np = cv2.bitwise_and(image_np, image_np, mask = mask)
    return image_np


def apply_threshold_rgb(images_np, lower = np.array([0,0,0]), upper = np.array([255,255,255])):
    binary_np = cv2.inRange(images_np, lower, upper)
    return binary_np


def apply_threshold_grayscale(grayscale_np, lower=0, upper=255):
    binary_np = cv2.inRange(grayscale_np, lower, upper)
    return binary_np

def convert_to_grayscale(images_np):
    #gray_np = np.dot(images_np[..., :3], [0.2989, 0.5870, 0.1140])
    gray_np = cv2.cvtColor(images_np, cv2.COLOR_BGR2GRAY)
    return gray_np

def convert_rgb_to_ycbcr(image_np):
    image_np = image_np.astype(np.uint8)
    ycbcr_image = cv2.cvtColor(image_np, cv2.COLOR_RGB2YCrCb)
    return ycbcr_image

def resize_image(image_np, new_size):
    resize_image = cv2.resize(image_np, new_size)
    return resize_image

## Edge detection and smoothing functions

In [None]:
def apply_gaussian_blur(images_np):
    # second argument is kernel size. Larger the kernel size means wider distribution of weights
    # third argument is sigma - larger the sigma the more spread out the blur effect. Pixels far away have greater influence on final output
    # A larger kernel size can help to smooth out larger features in the image, while a larger sigma value can help to remove finer details and noise
    blurred_image_np = cv2.GaussianBlur(images_np, (7, 7), 15)
    return blurred_image_np

def calculate_gradient_mag_using_canny(grayscale_np, lower_thresh = 0, upper_thresh = 0):
    #second argument - grad mag threshold. Any edge with grad mag lower than this is discarded as weak edges
    #third argument - grad mag threshold but any edge with grad mag higher considered as strong edges
    grayscale_np = grayscale_np.astype(np.uint8)
    edges_np = cv2.Canny(grayscale_np,lower_thresh, upper_thresh)
    return edges_np

## Vision modelling and detecting shapes

In [None]:

def get_circles_from_edges(edges, min_radius = 0, max_radius = 100):
    #first argument is the binary image of edges
    #Second argument is variation of hough transform -
    #variants include cv2.HOUGH_STANDARD, cv2.HOUGH_PROBABALISTIC, cv2.HOUGH_GRADIENT
    #third argument is resolution of accumulator array - represents ratio between image resolution and resolution of accumulator array - determines granularity of search - smaller leads to more finer search
    #fourth argument is the minimum distance between centers of detected circles
    #param1 is the higher threshold for the canny edge detector
    #fourth argument is Min and max radius of circles found
    circles = cv2.HoughCircles(edges, cv2.HOUGH_GRADIENT, 1,20, param1=50, param2= 30, minRadius=min_radius, maxRadius=max_radius)
    
    if circles is not None:
        circles = np.round(circles[0, :]).astype("int")
        distances = np.sqrt((circles[:, 0] - edges.shape[1]/2)**2 + (circles[:, 1] - edges.shape[0]/2)**2)
        sorted_circles = circles[np.argsort(distances)]
        sorted_circles = np.expand_dims(sorted_circles, axis=0)
        return sorted_circles
    
    return circles

def create_square_mask(circle, image_np):
    print(circle)
    x, y = circle[:2]
    size = circle[2]
    mask = np.zeros(image_np.shape[:2], dtype=np.uint8)
    mask[y-size:y+size, x-size:x+size] = 1
    return mask

def crop_image(image_np, circle):
    square_size = circle[2] * 2
    x_start = circle[0]-circle[2]
    y_start = circle[1] - circle[2]
    cropped = image_np[y_start: y_start+square_size, x_start:x_start+square_size]
    return cropped

def draw_circle_on_image(image_np, circle):
    image_np = cv2.circle(image_np, (circle[0],circle[1]),circle[2],(0,255,0),2)
    print_image(image_np)
    return image_np

def create_and_apply_circle_mask(image_np, circles):
    if circles is not None:
        circle = circles[0][0]
        mask = np.zeros(image_np.shape[:2], dtype=np.uint8)
        cv2.circle(mask, (circle[0], circle[1]), circle[2], (255, 255, 255), -1)
        circle_result = cv2.bitwise_and(image_np, image_np, mask=mask)
        #print_image(circle_result)
        result = crop_image(circle_result, circle)
    else:
        result = np.zeros_like(image_np)
    return result

## Pipeline

In [None]:
for i in range(len(images_np)):
    
    blurred_np= apply_gaussian_blur(images_np[i])
    gray_np = convert_to_grayscale(blurred_np)
    grad_mag_np = calculate_gradient_mag_using_canny(gray_np, 0, 5)
    circles = get_circles_from_edges(grad_mag_np, 70, 100)
    cropped_image_np = create_and_apply_circle_mask(images_np[i],circles)
    images_np[i] = resize_image(cropped_image_np, (256,256))
    
    #print_image(images_np[i])
    images_np[i] = images_np[i].astype('float32')/255.0
    

## Splitting into train and test

In [None]:
#for i in range(len(images_np)):
#    images_np[i] = images_np[i]/255.0

stratified_split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_indices, test_indices = next(stratified_split.split(images_np, labels_np))

x_train = [images_np[i] for i in train_indices]
y_train = [labels_np[i] for i in train_indices]
x_test_list = [images_np[i] for i in test_indices]
y_test = [labels_np[i] for i in test_indices]

x_train = np.array(x_train)
x_test = np.array(x_test_list)
y_train = np.array(y_train).reshape(-1,1)
y_test = np.array(y_test).reshape(-1,1)

print(len(x_train))
print(len(x_test))
print(x_test.shape)
print(y_test.shape)

for i in x_test_list:
    #print_image(i)
    break

## Building the deep learning model

In [None]:
'''
model = ResNet50(include_top=False, weights='imagenet',
                 input_shape=(256, 256, 3), classes=2, pooling='avg')
resnet_model = Sequential()
resnet_model.add(model)
for layer in resnet_model.layers:
    layer.trainable = False
resnet_model.add(Dense(512, activation='relu'))
resnet_model.add(Dense(1, activation='sigmoid'))
resnet_model.compile('adam', loss=tf.losses.BinaryCrossentropy(),
                     metrics=['accuracy'])
resnet_model.summary()
'''

tf.random.set_seed(42)
np.random.seed(42)

callback = EarlyStopping(monitor='val_loss', patience=3)

model = Sequential()
# Conv2D(No. of filters, dimensions of filter, activation function, expected image size(only first time))
model.add(Conv2D(32, (3, 3), 1, activation='relu',
          padding="same", input_shape=(256, 256, 3)))
model.add(MaxPooling2D())

model.add(Conv2D(32, (3, 3), 1, activation='relu'))
model.add(MaxPooling2D())

model.add(Conv2D(32, (3, 3), 1, activation='relu'))
model.add(MaxPooling2D())
model.add(MaxPooling2D())

model.add(Conv2D(32, (3, 3), 1, activation='relu'))
model.add(MaxPooling2D())

model.add(Conv2D(32, (3, 3), 1, activation='relu', padding="same"))
model.add(MaxPooling2D())

model.add(Flatten())

model.add(Dense(512, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

model.compile('adam', loss=tf.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
model.summary()

In [None]:
del model

## Fitting and testing the model

In [None]:
'''
hist = model.fit(
    train_generator,
    epochs=20,
    validation_data=val_generator,
    verbose=1
)
'''
hist = model.fit(x_train,
    y_train,
    batch_size=64,
    epochs=20,
    validation_split = 0.2,
    verbose=1,
    callbacks = [callback]
)

### Graph for error and accuracy

In [None]:
fig = plt.figure()
plt.plot(hist.history['accuracy'], color='green', label='accuracy')
plt.plot(hist.history['val_accuracy'], color='black', label='val_accuracy')
fig.suptitle('accuracy')
plt.legend(loc='upper left')
plt.show()

fig2 = plt.figure()
plt.plot(hist.history['loss'], color='teal', label='loss')
plt.plot(hist.history['val_loss'], color='orange', label='val_loss')
fig.suptitle('loss')
plt.legend(loc='upper left')
plt.show()

## Importing existing model

In [None]:
file_path = os.path.abspath(os.path.join(
    'models', 'tonsil_detector.h5'))
model = tf.keras.models.load_model(file_path)

## Visualising each layer

In [None]:
img_to_vis = np.expand_dims(x_test[0], axis=0)
print(img_to_vis.shape)

conv_layer_index = [0,2,4,7,9]
outputs = [model.layers[i].output for i in conv_layer_index]
feature_model = Model(inputs=model.inputs, outputs=outputs)

feature_output = feature_model.predict(img_to_vis)

columns = 8
rows = 4
for ftr in feature_output:
    #pos = 1
    fig=plt.figure(figsize=(8,4))
    for i in range(1, columns*rows +1):
        fig =plt.subplot(rows, columns, i)
        fig.set_xticks([])  #Turn off axis
        fig.set_yticks([])
        plt.imshow(ftr[0, :, :, i-1], cmap='gray')
        #pos += 1
    plt.show()



## Evaluation on test set

In [None]:
y_preds = model.predict(x_test)
y_preds = np.round(y_preds)
cm = confusion_matrix(y_test, y_preds)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.show()

tn, fp, fn, tp = cm.ravel()

precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / (tp + tn + fp + fn)

print("Precision:", precision)
print("Recall:", recall)
print("Accuracy:", accuracy)



# Get misclassified indices
misclassified_idx = np.where(y_preds != y_test)[0]
misclassified_idx = misclassified_idx.tolist()
for i in misclassified_idx:
    print_image(x_test_list[i]*255.0)

## Saving the model

In [None]:
model.save(os.path.join('models', 'tonsil_detector.h5'))

## Building feature extractor model

In [None]:
feature_extractor = Sequential()
feature_extractor.add(Conv2D(32, (3, 3), 1, activation='relu',
          padding="same", input_shape=(256, 256, 3)))
feature_extractor.add(MaxPooling2D())

feature_extractor.add(Conv2D(32, (3, 3), 1, activation='relu'))
feature_extractor.add(MaxPooling2D())


feature_extractor.add(Conv2D(32, (3, 3), 1, activation='relu'))
feature_extractor.add(MaxPooling2D())
feature_extractor.add(MaxPooling2D())

feature_extractor.add(Conv2D(32, (3, 3), 1, activation='relu'))
feature_extractor.add(MaxPooling2D())

feature_extractor.add(Conv2D(32, (3, 3), 1, activation='relu', padding="same"))
feature_extractor.add(MaxPooling2D())

feature_extractor.add(Flatten())

In [None]:
del feature_extractor

## Random forest

In [None]:
from sklearn.ensemble import RandomForestClassifier
RF_model = RandomForestClassifier(n_estimators = 50, random_state = 42)

x_train_rf = feature_extractor.predict(x_train)
x_test_rf = feature_extractor.predict(x_test)

RF_model.fit(x_train_rf, y_train)
RF_pred = RF_model.predict(x_test_rf)

cm = confusion_matrix(y_test, RF_pred)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g')
fig.suptitle('accuracy')
plt.legend(loc='upper left')
plt.show()

tn, fp, fn, tp = cm.ravel()

precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / (tp + tn + fp + fn)

print("Precision:", precision)
print("Recall:", recall)
print("Accuracy:", accuracy)