# Convolutional Neural Network

In [None]:
%pip install numpy scikit-learn pandas boto3 matplotlib seaborn python-dotenv tensorflow

In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from io import BytesIO
from pathlib import Path
import pandas as pd
import seaborn as sn
import numpy as np
import joblib
import dotenv
import boto3
import logging
import os
import uuid
import sys
import glob
from random import shuffle
from tqdm import tqdm
from datetime import datetime, timezone
import tensorflow
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing import image
from keras.models import Model, Sequential, load_model
from keras.layers import Dense, Dropout, Flatten
import keras
from keras.utils import *
from keras.callbacks import EarlyStopping
from keras import backend as K

In [None]:
# Location of the .env file; the DOTENV_PATH environment variable overrides
# the default relative path.
DOTENV_PATH = os.getenv('DOTENV_PATH', './../.env')

# Report (without failing) when load_dotenv signals that nothing was loaded.
env_loaded = dotenv.load_dotenv(dotenv_path=DOTENV_PATH)
if not env_loaded:
    print(f'no environment have been loaded from .env path \"{DOTENV_PATH}\"')

In [None]:
# --- Fixed notebook settings -------------------------------------------------
LOG_LEVEL = 'INFO'
SKIP_DOWNLOAD = False

# --- Local paths (overridable through the environment) -----------------------
LOCAL_DATASET_PATH = os.getenv('LOCAL_DATASET_PATH', '')
TMP_DIR = os.getenv('TMP_DIR', '/tmp/pink-twins')

# --- Optional pre-existing artefacts to import from S3 -----------------------
IMPORTED_DATASET_S3_KEY = os.getenv('IMPORTED_DATASET_S3_KEY', '')
IMPORTED_CNN_S3_KEY = os.getenv('IMPORTED_CNN_S3_KEY', '')

# --- S3 bucket layout and credentials ----------------------------------------
# Note: the bucket name is read from BUCKET_NAME and the models folder from
# S3_MODELS_BUCKET_FOLDER.
S3_BUCKET_NAME = os.getenv('BUCKET_NAME', 'pink-twins-bucket')
S3_IMAGES_BUCKET_FOLDER = os.getenv('S3_IMAGES_BUCKET_FOLDER', '')
S3_BUCKET_FOLDER = os.getenv('S3_MODELS_BUCKET_FOLDER', '')
S3_ACCESS_KEY_ID = os.getenv('S3_ACCESS_KEY_ID', '')
S3_SECRET_ACCESS_KEY = os.getenv('S3_SECRET_ACCESS_KEY', '')

# --- Model publication -------------------------------------------------------
# Enabled only when the variable equals "true" (case-insensitive).
_push_flag = os.getenv('PUSH_MODEL_DUMP_TO_S3_ENABLED', 'true')
PUSH_MODEL_DUMP_TO_S3_ENABLED = _push_flag.lower() == 'true'
AUTHOR = os.getenv('AUTHOR', 'undefined')

In [None]:
# Create the temporary working directory (and any missing parents); an
# already existing directory is left untouched.
os.makedirs(TMP_DIR, exist_ok=True)

In [None]:
# Configure the root logger: "<LEVEL> | <timestamp> | <message>" on stdout.
import time

# The date format below ends with a literal "Z" (the UTC designator), so the
# timestamps must be rendered in UTC rather than the default local time.
logging.Formatter.converter = time.gmtime

logging.basicConfig(
    format="%(levelname)s | %(asctime)s | %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
    encoding='utf-8',
    level=logging.getLevelName(LOG_LEVEL),
    stream=sys.stdout,
)

In [None]:
def download_images_from_class(cl: str, s3, s3_bucket: str, s3_folder: str, dest_folder: str):
    """Download every image of one class from S3 into ``dest_folder/cl``.

    Files that already exist locally are not downloaded again.

    Args:
        cl: Name of the class, i.e. the sub-folder under ``s3_folder``.
        s3: S3 client exposing ``get_paginator`` and ``download_file``
            (a boto3 S3 client in this notebook).
        s3_bucket: Name of the bucket to read from.
        s3_folder: Bucket folder that contains one sub-folder per class.
        dest_folder: Local directory under which the class folder is created.
    """
    # The trailing slash keeps a class such as "cat" from also matching the
    # objects stored under "cats/".
    cl_prefix = f"{s3_folder}/{cl}/"
    Path(f"{dest_folder}/{cl}").mkdir(parents=True, exist_ok=True)

    # A single list_objects_v2 call returns a limited page of keys, so walk
    # every page to avoid silently truncating large classes.
    keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=cl_prefix):
        keys.extend(obj['Key'] for obj in page.get('Contents', []))

    # Download each object (image) from the bucket
    for key in tqdm(keys, desc=f"download images from class {cl}"):
        filename = key.split("/")[-1]

        # Keys ending with "/" are folder placeholders, not images
        if filename == '':
            continue

        dest_filepath = f"{dest_folder}/{cl}/{filename}"

        if not os.path.exists(dest_filepath):
            # Use the listed key itself rather than rebuilding it from the
            # file name, so the request always targets an existing object.
            s3.download_file(Bucket=s3_bucket, Key=key, Filename=dest_filepath)

# Download the images of every class found under the images folder of the
# bucket, unless the download step is explicitly skipped.
if not SKIP_DOWNLOAD:
    dest_folder = f'{TMP_DIR}/cnn-images'

    # Create dir where images will be downloaded if it not already exists
    Path(dest_folder).mkdir(parents=True, exist_ok=True)

    try:
        s3 = boto3.client('s3', aws_access_key_id=S3_ACCESS_KEY_ID, aws_secret_access_key=S3_SECRET_ACCESS_KEY)
        rsp = s3.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=f'{S3_IMAGES_BUCKET_FOLDER}/', Delimiter="/")

        # Each common prefix has the form "<images folder>/<class>/". The class
        # name is its last non-empty segment, which stays correct when the
        # images folder itself contains several path levels.
        name_of_classes = [obj["Prefix"] for obj in rsp.get("CommonPrefixes", [])]
        classes = [prefix.rstrip("/").split("/")[-1] for prefix in name_of_classes]

        if not classes:
            logging.warning(
                f'no class folder found in bucket {S3_BUCKET_NAME}/{S3_IMAGES_BUCKET_FOLDER}')

        for cl in tqdm(classes, desc=f"download all class images from bucket {S3_BUCKET_NAME}/{S3_IMAGES_BUCKET_FOLDER}"):
            download_images_from_class(cl, s3, S3_BUCKET_NAME, S3_IMAGES_BUCKET_FOLDER, dest_folder)
    except Exception as e:
        # Best effort: images already present locally can still be used below
        print(f"Error: {e}")

In [None]:
# Build the image tensor `x` and the one-hot labels `y` from the downloaded
# class folders, then shuffle and split them into train and test sets.
os.chdir(f'{TMP_DIR}/cnn-images')
# Keep directories only and sort them so that the class -> label index mapping
# is the same on every run (os.listdir gives no ordering guarantee).
classes = sorted(entry for entry in os.listdir() if os.path.isdir(entry))

itemPerClass = 200  # maximum number of images read per class
nbClasses = len(classes)
npix = 224  # images are resized to npix x npix

classLabel = 0
batches = []
y = []

for cl in classes:
    print("Reading: "+cl+" images")
    listImages = sorted(glob.glob(cl+'/*'))[:itemPerClass]
    for pathImg in listImages:
        img = image.load_img(pathImg, target_size=(npix, npix))
        im = image.img_to_array(img)
        im = np.expand_dims(im, axis=0)
        im = preprocess_input(im)
        batches.append(im)
    # One label per image actually read: a class may hold fewer than
    # itemPerClass images, and x and y must stay aligned.
    y += [classLabel]*len(listImages)
    classLabel += 1

# Concatenate once at the end instead of re-copying the whole array for
# every single image.
if batches:
    x = np.concatenate(batches, axis=0)
else:
    x = np.empty(shape=(0, npix, npix, 3))

# Real number of samples (not nbClasses*itemPerClass, which over-counts when a
# class is short of images).
N = len(y)
y = tensorflow.keras.utils.to_categorical(y, nbClasses)

# Shuffle images and labels with the same permutation
ind_list = list(range(N))
shuffle(ind_list)
xNew  = x[ind_list, :,:,:]
yNew  = y[ind_list,]

pTrain = int(0.7*N) # 70% of the samples for training, the remaining 30% for testing
xTrain = xNew[:pTrain]
xTest  = xNew[pTrain:]

yTrain = yNew[:pTrain]
yTest  = yNew[pTrain:]

In [None]:
#Functions to calculate the recall, precision, and f1 score for the model 

def recall_m(y_true, y_pred):
    """Return the recall of ``y_pred`` with respect to ``y_true``.

    Both the element-wise product and the ground truth are clipped to
    [0, 1] and rounded before being summed, so the ratio is
    (rounded true positives) / (rounded actual positives). The backend
    epsilon is added to the denominator so it is never exactly zero.
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    positive_mask = K.round(K.clip(y_true, 0, 1))
    return K.sum(hit_mask) / (K.sum(positive_mask) + K.epsilon())

def precision_m(y_true, y_pred):
    """Return the precision of ``y_pred`` with respect to ``y_true``.

    Both the element-wise product and the predictions are clipped to
    [0, 1] and rounded before being summed, so the ratio is
    (rounded true positives) / (rounded predicted positives). The backend
    epsilon is added to the denominator so it is never exactly zero.
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    predicted_mask = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hit_mask) / (K.sum(predicted_mask) + K.epsilon())

def f1_m(y_true, y_pred):
    """Return the F1 score: the harmonic mean of precision_m and recall_m.

    The backend epsilon is added to the denominator so it is never
    exactly zero.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    harmonic_half = (p * r) / (p + r + K.epsilon())
    return 2 * harmonic_half

In [None]:
# Either import an already trained model from S3 (when IMPORTED_CNN_S3_KEY is
# set) or build a classifier on top of VGG16 and train it on the local images.
if IMPORTED_CNN_S3_KEY != '':
    try:
        # Create an S3 client
        s3 = boto3.client('s3', aws_access_key_id=S3_ACCESS_KEY_ID, aws_secret_access_key=S3_SECRET_ACCESS_KEY)
        imported_model_id = IMPORTED_CNN_S3_KEY.split('/')[-1]
        imported_model_file = f'{TMP_DIR}/{imported_model_id}'

        # Download the dump file from S3
        s3.download_file(Bucket=S3_BUCKET_NAME, Key=IMPORTED_CNN_S3_KEY,
            Filename=imported_model_file)

        # The model is compiled with the custom metrics defined in this
        # notebook, so they must be supplied for deserialization to resolve
        # them by name.
        model = load_model(imported_model_file, custom_objects={
            'f1_m': f1_m,
            'precision_m': precision_m,
            'recall_m': recall_m,
        })

    except Exception as err:
        logging.fatal(f'failed to load model {IMPORTED_CNN_S3_KEY} from S3 bucket: {err}')
        # Without a model the following cells cannot run; stop here with the
        # real cause instead of a later NameError on `model`.
        raise
else:
    # Pre-trained convolutional base, without its original classification head
    VGGmodel = VGG16(weights='imagenet', include_top=False, input_shape=(npix, npix, 3))

    # New classification head on top of the VGG16 features
    model = Sequential()
    model.add(VGGmodel)
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(nbClasses, activation='softmax'))

    # global network
    model = Model(inputs=model.input, outputs=model.output)
    model.summary()

    # training
    # NOTE(review): patience (20) is larger than the number of epochs (17), so
    # this callback cannot stop the training early with the current values.
    ourCallback = EarlyStopping(monitor='val_accuracy', min_delta=0.0001, patience=20, verbose=0,
                                mode='auto', baseline=None, restore_best_weights=False)

    # training part I: training only the classification part (the end)
    for layer in VGGmodel.layers:
        layer.trainable = False
    # The head is a softmax over one-hot encoded classes, which calls for the
    # categorical cross-entropy (binary cross-entropy treats every output
    # unit as an independent yes/no problem).
    model.compile(optimizer='rmsprop', loss='categorical_crossentropy',
                  metrics=['accuracy', f1_m, precision_m, recall_m])

    model.fit(xTrain, yTrain, epochs=17, batch_size=32, validation_split=0.2, callbacks=[ourCallback],
              verbose=1)  # the number of epochs is kept low to prevent too long a run


In [None]:
# Evaluate the model on the test split and print, as percentages, the four
# values that follow index 0 of the evaluation result (index 0 is skipped,
# presumably because it holds the loss).
score = model.evaluate(xTest, yTest, verbose=0)
for metric_idx in range(1, 5):
    print("%s: %.2f%%" % (model.metrics_names[metric_idx], score[metric_idx]*100,))

In [None]:
# Predict the class of every test image and build the confusion matrix
# (rows: true class index, columns: predicted class index).
yPred = model.predict(xTest)

# Turn the per-class scores / one-hot labels into class indices
yPredV = yPred.argmax(axis=1)
yTestV = yTest.argmax(axis=1)
# Non-zero entries mark the test samples that were wrongly classified
err = yTestV - yPredV

num_classes = len(classes)
class_ids = list(range(num_classes))

# Pass the labels explicitly so the matrix is always num_classes x num_classes,
# even when a class is absent from both the test labels and the predictions;
# otherwise the DataFrame below would be given a smaller matrix than its index.
cm = confusion_matrix(yTestV, yPredV, labels=class_ids)

df_cm = pd.DataFrame(cm, index=[str(i) for i in class_ids], columns=[str(i) for i in class_ids])

In [None]:
# Render the confusion matrix as a heatmap with the counts written in the cells
heatmap_figsize = (5, 4)
plt.figure(figsize=heatmap_figsize)
sn.heatmap(df_cm, annot=True)
plt.show()

In [None]:
# Save the freshly trained model and push the dump to S3. Nothing is pushed
# when the model was itself imported from S3 or when pushing is disabled.
if IMPORTED_CNN_S3_KEY == '' and PUSH_MODEL_DUMP_TO_S3_ENABLED:
    # Unique identifier used for both the local file name and the S3 key
    model_id = uuid.uuid4()

    try:
        tmp_file = f'{TMP_DIR}/{model_id}.h5'
        # Join only the non-empty segments so that an unset (or slash
        # terminated) models folder does not produce a key with a leading or
        # doubled "/".
        key_parts = [S3_BUCKET_FOLDER.strip('/'), 'cnn', 'model', f'{model_id}.h5']
        key = '/'.join(part for part in key_parts if part)
        model.save(tmp_file)

        s3 = boto3.client('s3', aws_access_key_id=S3_ACCESS_KEY_ID, aws_secret_access_key=S3_SECRET_ACCESS_KEY)
        s3.upload_file(
            Bucket=S3_BUCKET_NAME,
            Key=key,
            Filename=tmp_file,
            ExtraArgs={
                # Provenance stored next to the object
                'Metadata': {
                    'author': AUTHOR,
                    'date': datetime.now(timezone.utc).astimezone().isoformat(),
                    'training_dataset_key': IMPORTED_DATASET_S3_KEY,
                },
            },
        )
        logging.info(f'successfully pushed model dump as: {key}')

    except Exception as err:
        # Best effort: a failed push is reported but does not stop the notebook
        logging.fatal(f'failed to push complete model dump: {err}')
