## Import libraries

In [None]:
import tensorflow as tf
from tensorflow.keras.utils import Sequence, to_categorical
from tensorflow import keras
from keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
import numpy as np
import cv2
import glob, glob2
import albumentations as A
import time
import random
import os
from sklearn.model_selection import train_test_split
import shutil

In [None]:
# Show the GPU devices TensorFlow can see (an empty list means no GPU is attached).
tf.config.list_physical_devices('GPU')

In [None]:
!pip install wandb

In [None]:
import wandb 
from wandb.keras import WandbCallback

!wandb login

## Prepare dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%cd /content/drive/MyDrive/capstone-project/data/zip

In [None]:
!cp pushups.zip /content/

In [None]:
!unzip /content/pushups.zip -d /content/dataset

In [None]:
!rm /content/pushups.zip

In [None]:
# Push-up phase to train on; it selects the `{TYPE}` sub-folder when the dataset
# is globbed below. Switch the assignment to train the other phase.
# TYPE = 'up'
TYPE = 'down'

In [None]:
# Class folder names. The position of a name in this list is its class index
# (and the position of the 1 in its one-hot label).
class_names = ['right', 'wrong']
# Derived from class_names so the size of the classification head cannot drift
# out of sync with the label list.
num_classes = len(class_names)

In [None]:
# Root of the unzipped dataset; globbed below as <root>/*/<TYPE>/<class>/<image>.
dataset_folder = '/content/dataset/pushups/horizontal'
# Drive folder under which the training checkpoints are written.
save_path = '/content/drive/MyDrive/capstone-project/models/wandb_pushups'
# Network input size; also forms the spatial part of the backbone input_shape.
img_size = (224, 224)

In [None]:
# Collect every image of the selected push-up phase.
#
# The previous pattern `*[.png|.jpg]` was a glob *character class*: it matched
# any file whose last character is one of ". p n g | j" (so also e.g. *.zip or
# *.json), not files with a .png/.jpg extension. Filter on the real extension
# instead; '.jpeg' is kept because the old pattern happened to include it.
#
# The list is sorted because glob order is filesystem dependent, and the seeded
# train/test split further down is only repeatable for a stable input order.
all_files = sorted(
    path
    for path in glob.glob(f'{dataset_folder}/*/{TYPE}/*/*')
    if path.lower().endswith(('.png', '.jpg', '.jpeg'))
)

In [None]:
len(all_files)

In [None]:
# Number of images per class for the selected phase.
# `[.png|.jpg]` is a glob character class (it matches on the last character
# only), so the extension is checked explicitly instead.
for cls_name in class_names:
  cls_files = [
      path
      for path in glob.glob(f'{dataset_folder}/*/{TYPE}/{cls_name}/*')
      if path.lower().endswith(('.png', '.jpg', '.jpeg'))
  ]
  print(cls_name, len(cls_files))

In [None]:
# Report every original file that also exists as a " - Copy" duplicate.
# The check uses the exact ' - Copy' marker that is stripped below; testing for
# the bare word 'Copy' made a file such as 'Copy_01.jpg' report itself.
# Membership is tested against a set so the scan stays linear in the file count.
known_files = set(all_files)
for file_name in all_files:
  if ' - Copy' in file_name:
    org = file_name.replace(' - Copy', '')
    if org in known_files:
      print(org)
## Augmentation

In [None]:
# Albumentations pipeline used to augment an image before it is resized to the
# network input size. Every step fires independently with its own probability p.
# NOTE(review): A.IAAPerspective and A.IAAAdditiveGaussianNoise are imgaug-backed
# transforms; confirm the installed albumentations version still provides them.
transform = A.Compose([
    
        # Geometric augmentations.
        A.HorizontalFlip(p=0.5),
        # A.VerticalFlip(p=0.5),
        
        A.ShiftScaleRotate(shift_limit=0.02, scale_limit=0.2, rotate_limit=5, p=0.5),
        # Photometric augmentations.
        A.RandomBrightnessContrast(brightness_limit=0.25, contrast_limit=0.25, p=0.5),
        A.RandomSunFlare(flare_roi=(0, 0, 1, 1), src_radius=50, p=0.1),
        A.IAAPerspective(scale=(0.01, 0.01), p=0.1),
        # At most one noise/blur transform is applied per image.
        A.OneOf([
            A.IAAAdditiveGaussianNoise(),
            A.GaussNoise(),
            A.Blur(blur_limit=3, p=0.3),
            A.MedianBlur(blur_limit=3, p=0.3),
        ], p=0.3),
        A.CLAHE(clip_limit=1.5, p=0.3),
        
        # NOTE(review): the fixed 1020x764 crop presumably assumes every source
        # image is at least that large - TODO confirm against the dataset.
        A.RandomCrop(width=1020, height=764, p=0.5),
    ])

In [None]:
# Visual sanity check of the augmentation pipeline: a 4x4 grid of images drawn
# at random from all_files, each augmented and then resized to 224x224.
fig = plt.figure(figsize=(20, 20))
columns, rows = 4, 4
for cell in range(columns * rows):
    sample_path = random.choice(all_files)
    # OpenCV loads BGR; convert to RGB for albumentations and matplotlib.
    rgb = cv2.cvtColor(cv2.imread(sample_path), cv2.COLOR_BGR2RGB)
    augmented = transform(image=rgb)["image"]
    thumbnail = cv2.resize(augmented, (224, 224), interpolation=cv2.INTER_AREA)
    fig.add_subplot(rows, columns, cell + 1)
    plt.imshow(thumbnail)
plt.show()

## Data Loader

In [None]:
class DataGenerator(Sequence):
    """Keras ``Sequence`` that reads images from disk and yields normalised batches.

    Every batch is a tuple ``(X, y)``: ``X`` has shape
    ``(batch, height, width, n_channels)`` with pixel values scaled to
    ``[-1, 1]``; ``y`` holds one-hot labels in ``class_names`` order. The label
    of an image is the name of its parent directory; a directory name that is
    not in ``class_names`` produces an all-zero label row.

    Args:
        img_paths: Image file paths; the parent directory name is the label.
        class_names: Ordered class (folder) names.
        batch_size: Samples per batch. A trailing partial batch is dropped.
        img_size: Target ``(height, width)`` of every image.
        n_channels: Number of colour channels of the loaded images.
        shuffle: Reshuffle the sample order at the end of every epoch.
        augmentations: Optional callable invoked as ``augmentations(image=img)``
            that returns a mapping with an ``"image"`` entry, or ``None``.

    Raises:
        RuntimeError: While building a batch, if an image cannot be read or
            processed. The message names the offending file and the original
            error is chained as ``__cause__``.
    """

    def __init__(self,
                img_paths,
                class_names,
                batch_size=128,
                img_size=(224,224),
                n_channels=3,
                shuffle=True,
                augmentations=None,
                ):
        # Newer Keras versions expect Sequence subclasses to initialise the base
        # class; on older versions this is a harmless no-op.
        super().__init__()
        self.img_paths = img_paths
        self.class_names = np.array(class_names)
        self.batch_size = batch_size
        self.img_size = img_size
        self.n_channels = n_channels
        self.n_classes = len(class_names)
        self.shuffle = shuffle
        self.augmentations = augmentations
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        # Floor division: samples that do not fill a whole batch are skipped.
        return len(self.img_paths) // self.batch_size

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        batch_img_paths = [self.img_paths[k] for k in indexes]
        return self.__data_generation(batch_img_paths)

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.img_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _load_sample(self, img_path):
        'Load one image and return (RGB image resized to img_size, one-hot label)'
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals a missing/corrupt file with None, not an exception.
            raise ValueError('cv2.imread returned None')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.augmentations is not None:
            img = self.augmentations(image=img)["image"]

        # img_size is (height, width) - it sizes the batch array below - whereas
        # cv2.resize expects (width, height).
        height, width = self.img_size
        img = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)

        label_name = os.path.basename(os.path.dirname(img_path))
        label = (self.class_names == label_name) * 1
        return img, label

    def __data_generation(self, batch_img_paths):
        'Build the (X, y) arrays for the given image paths'
        n_samples = len(batch_img_paths)
        X = np.empty((n_samples, *self.img_size, self.n_channels))
        y = np.empty((n_samples, self.n_classes), dtype=int)

        for i, img_path in enumerate(batch_img_paths):
            # Fail loudly: the arrays start out uninitialised (np.empty), so
            # carrying on after a failed sample would feed garbage to the model.
            try:
                X[i], y[i] = self._load_sample(img_path)
            except Exception as exc:
                raise RuntimeError(f'Failed to load sample {img_path!r}') from exc

        # Normalize batch data from [0, 255] to [-1, 1]
        X /= 127.5
        X -= 1.

        return X, y

In [None]:
# ## If not sure model works fine, should use val set
X_train, X_test, _, _ = train_test_split(all_files, all_files, test_size=0.2, random_state=42)
print(len(X_train))

train_generator = DataGenerator(X_train, class_names, batch_size=32, augmentations=transform)
test_generator = DataGenerator(X_test, class_names, batch_size=32, augmentations=transform)
print(len(train_generator), len(test_generator))

In [None]:
# train_generator = DataGenerator(all_files, class_names, batch_size=32, augmentations=transform)
# val_generator = DataGenerator(X_test, class_names)
# print(train_generator.__len__())

In [None]:
### Check generator
train_generator.on_epoch_end()
X, y = train_generator.__getitem__(1)

In [None]:
# Show the first 16 images of the batch loaded above on a 4x4 grid.
fig = plt.figure(figsize=(20, 20))
columns, rows = 4, 4
for slot in range(1, columns * rows + 1):
    # Undo the generator's [-1, 1] scaling to get displayable 8-bit pixels.
    restored = (X[slot - 1].copy() + 1) * 127.5
    img = restored.astype('uint8')
    fig.add_subplot(rows, columns, slot)
    plt.imshow(img)
plt.show()

In [None]:
%cd /content/

## Prepare Model

In [None]:
# W&B sweep definition. 'method' may be 'grid' or 'random'. Only model_name
# lists more than one value, so the grid compares the five backbones with all
# other hyper-parameters pinned.
sweep_config = dict(
    method='grid',
    metric=dict(name='val_accuracy', goal='maximize'),
    parameters=dict(
        type_data=dict(values=['down']),
        warmup_epochs=dict(values=[5]),
        warmup_learning_rate=dict(values=[0.001]),
        model_name=dict(
            values=['ResNet50', 'MobileNetV2', 'EfficientNetB0', 'VGG16', 'InceptionV3'],
        ),
        finetune_epochs=dict(values=[10]),
        initial_learning_rate=dict(values=[1e-4]),
        decay_steps=dict(values=[70]),
        decay_rate=dict(values=[0.93]),
    ),
)

In [None]:
from wandb.keras import WandbCallback
# Register the sweep with W&B. The project name follows TYPE so that switching
# the push-up phase at the top of the notebook no longer logs "up" runs into
# the "down" project (with TYPE = 'down' the name is unchanged: 'pushups-down').
sweep_id = wandb.sweep(sweep_config, entity="xuannhamng28", project=f"pushups-{TYPE}")

In [None]:
def get_pretrained_model(model_name):
    """Build the ImageNet-pretrained backbone called ``model_name`` without its top.

    Supported names are 'MobileNetV2', 'ResNet50', 'EfficientNetB0', 'VGG16'
    and 'InceptionV3'. The backbone is created with ``input_shape`` set to
    ``(*img_size, 3)``. Any other name yields ``None``.
    """
    # Attribute path of each constructor below tf.keras.applications.
    constructor_paths = {
        'MobileNetV2': ('MobileNetV2',),
        'ResNet50': ('ResNet50',),
        'EfficientNetB0': ('efficientnet', 'EfficientNetB0'),
        'VGG16': ('vgg16', 'VGG16'),
        'InceptionV3': ('inception_v3', 'InceptionV3'),
    }

    attr_path = constructor_paths.get(model_name)
    if attr_path is None:
        return None

    constructor = tf.keras.applications
    for attr in attr_path:
        constructor = getattr(constructor, attr)

    return constructor(input_shape=(*img_size, 3), include_top=False, weights="imagenet")


In [None]:
# Metrics handed to every model.compile() call in this notebook.
# NOTE(review): the metric objects are created once here and the same instances
# are reused by each compile() - confirm that sharing them between the warm-up
# and fine-tune stages (and between sweep runs) is intended.
METRICS = [
    'accuracy',
    keras.metrics.Precision(name="precision"),
    keras.metrics.Recall(name="recall"),
    keras.metrics.AUC(name="auc"),
]

In [None]:
def warmup_model(model_name, epochs=5, learning_rate=0.001):
    """Stage 1: train a new classification head on a frozen pretrained backbone.

    The head is GlobalAveragePooling2D -> Dropout(0.2) -> Dense(num_classes)
    -> softmax. Training uses the notebook-level ``train_generator`` and
    ``test_generator`` and logs to the active W&B run.

    Args:
        model_name: Backbone name understood by ``get_pretrained_model``.
        epochs: Maximum number of warm-up epochs (early stopping may end sooner).
        learning_rate: Adam learning rate for the warm-up stage.

    Returns:
        Tuple ``(model, history, base_model)``: the compiled full model, the
        Keras ``History`` of the warm-up fit, and the backbone.

    Raises:
        ValueError: If ``model_name`` is not a supported backbone. Returning
            ``None`` here made callers that unpack the 3-tuple fail with an
            unrelated ``TypeError``.
    """
    base_model = get_pretrained_model(model_name)
    if base_model is None:
        raise ValueError(f'Model {model_name} not found!')

    # Freeze the backbone so only the new head is trained in this stage.
    # NOTE(review): the generators scale pixels to [-1, 1] for every backbone.
    # The Keras docs describe different expected inputs for some of them (e.g.
    # EfficientNet takes raw [0, 255] pixels, ResNet50/VGG16 have their own
    # preprocess_input) - confirm this shared scaling is intended.
    base_model.trainable = False
    x = base_model.output
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.Dense(num_classes)(x)
    outputs = tf.keras.layers.Activation('softmax')(x)

    model = tf.keras.Model(base_model.input, outputs)

    # NOTE(review): early stopping watches the *training* accuracy while the
    # checkpoint below watches val_accuracy - confirm this is deliberate.
    early_callback = tf.keras.callbacks.EarlyStopping(monitor='accuracy', patience=2)
    checkpoint_filepath = save_path + f'/tmp/warmup/{model_name}/checkpoint'

    # Keep only the weights of the epoch with the best validation accuracy.
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        save_weights_only=True,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True)
    model.compile(
      optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
      loss=keras.losses.CategoricalCrossentropy(),
      metrics=METRICS
    )

    history = model.fit(
        train_generator,
        validation_data=test_generator,
        epochs=epochs,
        callbacks=[early_callback, model_checkpoint_callback, WandbCallback()])

    return model, history, base_model

In [None]:
def finetune():
    """Sweep entry point: warm up the head, then fine-tune the whole network.

    Reads the hyper-parameters from ``wandb.config`` (falling back to the
    defaults below), runs ``warmup_model``, unfreezes the backbone and continues
    training with an exponentially decaying learning rate. The best weights (by
    ``val_accuracy``) are checkpointed under ``save_path`` and the final model
    is saved to the Drive ``pushups-{TYPE}/finetune/<model_name>`` folder.

    Returns:
        None.
    """
    config_defaults = {
        'warmup_epochs': 5,
        'warmup_learning_rate': 0.001,
        'model_name': 'ResNet50',
        'finetune_epochs': 10,
        "initial_learning_rate": 1e-4,
        "decay_steps": 70,
        "decay_rate": 0.93,
    }

    wandb.init(config=config_defaults)
    config = wandb.config
    model_name = config.model_name

    warmup_result = warmup_model(model_name, config.warmup_epochs, config.warmup_learning_rate)
    if warmup_result is None:
        # Unknown backbone: there is nothing to fine-tune.
        return
    model, history, base_model = warmup_result

    # Unfreeze the backbone; the model must be re-compiled for this to apply.
    base_model.trainable = True

    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=config.initial_learning_rate,
        decay_steps=config.decay_steps,
        decay_rate=config.decay_rate,
        staircase=True)

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
        loss=keras.losses.CategoricalCrossentropy(),
        metrics=METRICS
    )
    checkpoint_filepath = save_path + f'/tmp/finetune/{model_name}/checkpoint'

    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        save_weights_only=True,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True)

    # Continue the epoch numbering where warm-up actually stopped (early
    # stopping may have ended it before `warmup_epochs`) and train for exactly
    # `finetune_epochs` more. The previous code multiplied the two counts
    # (5 * 10 = 50 total epochs) and restarted on the last warm-up epoch index.
    initial_epoch = len(history.epoch)
    total_epochs = initial_epoch + config.finetune_epochs

    model.fit(train_generator,
              validation_data=test_generator,
              initial_epoch=initial_epoch,
              epochs=total_epochs,
              callbacks=[model_checkpoint_callback, WandbCallback()])

    # Save next to the phase that was trained (TYPE) instead of a hard-coded
    # 'pushups-down'; with TYPE = 'down' the path is unchanged.
    model.save(f'/content/drive/MyDrive/capstone-project/models/pushups-{TYPE}/finetune/{model_name}')


In [None]:
# Start a W&B agent for the sweep registered above; it invokes finetune() for
# the configurations it is handed by the sweep.
wandb.agent(sweep_id, finetune)

## Test

In [None]:
# Root of the held-out test images; globbed below as <root>/*/<phase>/<class>/<image>.
test_folder = "/content/drive/MyDrive/capstone-project/data/test_dataset/pushups/horizontal"

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix

In [None]:
# Path handed to keras.models.load_model in the next cell.
# NOTE(review): this is a 'pushups-up' export, whereas finetune() in this
# notebook saves under 'pushups-down' - presumably it comes from a separate
# run with TYPE = 'up'; confirm the folder exists before running.
model_path = '/content/drive/MyDrive/capstone-project/models/pushups-up/finetune/MobileNetV2'

In [None]:
# Load the saved model to spot-check; `model` is reassigned again by the
# evaluation loop further down.
model = keras.models.load_model(model_path)

In [None]:
# Test images of the "up" phase.
# `[.jpg|.png]` is a glob character class that matches on the last character
# only, so the extension is checked explicitly instead ('.jpeg' is kept because
# the old pattern happened to include it). Sorted for a stable, readable order.
test_img_list = sorted(
    path
    for path in glob.glob(f'{test_folder}/*/up/*/*')
    if path.lower().endswith(('.jpg', '.png', '.jpeg'))
)

In [None]:
# Spot check: print the predicted class of every test image.
class_names = ['right', 'wrong']
for path in test_img_list:
    # `image`/`batch` replace the old `input` name, which shadowed the builtin
    # input() for the rest of the notebook session.
    image = cv2.imread(path)
    if image is None:
        # cv2.imread returns None for unreadable files; report and move on
        # instead of failing on the slicing below.
        print(f'{path}--unreadable, skipped')
        continue
    image = image[..., ::-1]  # BGR -> RGB
    image = cv2.resize(image, (224, 224), interpolation=cv2.INTER_AREA)
    batch = np.reshape(image, [1, 224, 224, 3]) * 1.0
    # Same [-1, 1] scaling as the training generator.
    # batch = keras.applications.mobilenet_v2.preprocess_input(batch)
    batch /= 127.5
    batch -= 1.
    # verbose=0: one progress bar per image would bury the printed results.
    out = model.predict(batch, verbose=0)[0]
    print(f'{path}--{class_names[np.argmax(out)]}')

In [None]:
class_names = np.array(['right', 'wrong'])

def generate_y_test(test_folder, ex_type, class_names):
  """Return the ground-truth class index of every test image of one phase.

  Images are globbed as ``<test_folder>/*/<ex_type>/<class>/<file>`` and the
  label of an image is the index of its ``<class>`` folder in ``class_names``.

  Args:
    test_folder: Root folder of the test dataset.
    ex_type: Push-up phase sub-folder, e.g. 'up' or 'down'.
    class_names: Ordered class names (list or array).

  Returns:
    int64 array with one class index per globbed file, in glob order. A folder
    name that is not in ``class_names`` maps to index 0 (argmax of all-False).
  """
  # NOTE(review): `[.jpg|.png]` is a glob character class (it matches on the
  # last character only). The pattern is deliberately left unchanged here so
  # the labels stay aligned index-for-index with callers that glob the very
  # same pattern for their image list.
  test_img_list = glob.glob(f'{test_folder}/*/{ex_type}/*/*[.jpg|.png]')

  # A plain list compared to a string yields a single False, which silently
  # labelled every image 0; coerce to an array so the comparison is elementwise.
  class_names = np.asarray(class_names)

  # Build the labels in one pass instead of re-allocating with np.append.
  label_dirs = [path.split('/')[-2] for path in test_img_list]
  return np.array([np.argmax(class_names == name) for name in label_dirs], dtype='int64')

In [None]:
def test(model, img_list, img_size=(224, 224)):
    """Predict the class index of every image in ``img_list``.

    Each image is read with OpenCV, converted to RGB, resized to ``img_size``,
    scaled to ``[-1, 1]`` and passed through ``model.predict`` on its own.

    Args:
        model: Object with a Keras-style ``predict(batch, verbose=...)`` method.
        img_list: Image file paths.
        img_size: Model input size as ``(height, width)``.

    Returns:
        int64 array holding the argmax class index per image, in input order.

    Raises:
        FileNotFoundError: If an image cannot be read. Previously this showed
            up as an opaque ``TypeError`` from slicing ``None``.
    """
    height, width = img_size
    predictions = []

    for path in img_list:
        # `image`/`batch` avoid shadowing the builtin input().
        image = cv2.imread(path)
        if image is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        image = image[..., ::-1]  # BGR -> RGB
        # cv2.resize takes (width, height).
        image = cv2.resize(image, (width, height), interpolation=cv2.INTER_AREA)
        batch = np.reshape(image, [1, height, width, 3]) * 1.0
        # Same [-1, 1] scaling as the training generator.
        batch /= 127.5
        batch -= 1.
        # verbose=0 keeps Keras from printing a progress bar per image.
        out = model.predict(batch, verbose=0)[0]
        predictions.append(np.argmax(out))

    # Collected in a list and converted once (np.append re-allocates each time).
    return np.array(predictions, dtype='int64')

In [None]:
# W&B model artifacts ("<artifact name>:<version>") to evaluate, one list per
# push-up phase.
model_down_list = [
    'model-hardy-sweep-2:v5',
    'model-bumbling-sweep-1:v9',
    'model-efficient-sweep-3:v11',
    'model-lilac-sweep-2:v7',
    'model-soft-sweep-1:v7',
]
model_up_list = [
    'model-copper-sweep-3:v5',
    'model-playful-sweep-2:v11',
    'model-dauntless-sweep-1:v12',
    'model-fluent-sweep-2:v6',
    'model-denim-sweep-1:v6',
]

In [None]:
import wandb

# Evaluate every listed W&B model artifact on the test set of its phase and
# store a confusion-matrix image plus a metrics text file per model.
for ex_type in ['up', 'down']:
  # Glob once and derive the labels from that same list, so predictions and
  # ground truth are guaranteed to line up index-for-index. The extension is
  # checked explicitly because `[.jpg|.png]` is a glob character class that
  # matches on the last character only.
  test_img_list = sorted(
      path
      for path in glob.glob(f'{test_folder}/*/{ex_type}/*/*')
      if path.lower().endswith(('.jpg', '.png', '.jpeg'))
  )
  label_names = np.asarray(class_names)
  y_test = np.array(
      [np.argmax(label_names == path.split('/')[-2]) for path in test_img_list],
      dtype='int64')
  print(len(test_img_list))
  print(len(y_test))

  model_list = model_up_list if ex_type == 'up' else model_down_list

  # savefig()/open() do not create missing folders.
  result_dir = f'/content/evaluate_result/{ex_type}'
  os.makedirs(result_dir, exist_ok=True)

  for model_name in model_list:
    run = wandb.init()
    artifact = run.use_artifact(f'xuannhamng28/pushups-{ex_type}/{model_name}', type='model')
    artifact_dir = artifact.download()
    model = keras.models.load_model(artifact_dir)
    y_preds = test(model=model, img_list=test_img_list)

    conf_matrix = confusion_matrix(y_test, y_preds)
    sns.heatmap(conf_matrix, annot=True)
    plt.savefig(f'{result_dir}/{model_name}_confusion_matrix.jpg')
    plt.clf()

    # Print the artifact name (the model object's repr was printed before).
    print(f'MODEL - {model_name} - EVALUATION RESULT: ')
    print()
    # accuracy: (tp + tn) / (p + n)
    accuracy = accuracy_score(y_test, y_preds)
    print('Accuracy: %f' % accuracy)
    print()
    # precision tp / (tp + fp)
    # The sklearn default pos_label=1 applies to precision/recall/F1, i.e. the
    # positive class is class index 1.
    precision = precision_score(y_test, y_preds)
    print('Precision: %f' % precision)
    print()
    # recall: tp / (tp + fn)
    recall = recall_score(y_test, y_preds)
    print('Recall: %f' % recall)
    print()
    # f1: 2 tp / (2 tp + fp + fn)
    f1 = f1_score(y_test, y_preds)
    print('F1 score: %f' % f1)
    print()
    print('='*100)

    # Context manager so the file is flushed and closed for every model.
    with open(f'{result_dir}/{model_name}_result.txt', "a") as f:
      f.write(f"acc: {accuracy}\n")
      f.write(f"f1: {f1}\n")
      f.write(f"precision: {precision}\n")
      f.write(f"recall: {recall}\n")

    # Close this model's W&B run before the next wandb.init().
    run.finish()

In [None]:
!zip -r /content/file.zip /content/evaluate_result