# Libraries

In [None]:
from typing import List, Union, Tuple, Callable, Dict

from os import environ

from random import seed

from numpy.random import seed as np_seed
from numpy import ndarray, zeros

from pandas import DataFrame, read_csv

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import random
from tensorflow import keras
from tensorflow.keras import applications
from tensorflow import data

import plotly.graph_objects as go

import wandb
from wandb.keras import WandbCallback

from kaggle_secrets import UserSecretsClient

# WandB

In [None]:
# Authenticate against Weights & Biases with the key stored in Kaggle secrets.
user_secrets = UserSecretsClient()
api_key = user_secrets.get_secret("WANDB")
wandb.login(key=api_key)

# Start the tracked run; the hyper-parameters below are read back later
# through `wandb.config`.
run = wandb.init(
    project="pet_finder",
    entity="leopoldavezac",
    config=dict(
        learning_rate=0.001,
        epochs=20,
        batch_size=24,
        loss_func='mse',
        img_width=224,
        img_length=224,
        efficient_net_symbol='B0',
        efficient_net_trainable=False,
        dense_layers_post_efficient_net=[18, 9],
        dropout=0.2,
        data_augmentation_contrast=0.1,
    ),
)

# Constants

In [None]:
# Root directory of the competition data (CSV files and image folders).
DATA_PATH = "../input/petfinder-pawpularity-score"

# Names of the id and target columns in the competition CSV files.
ID_VAR_NM, TARGET_VAR_NM = "Id", "Pawpularity"

# Lets tf.data pick parallelism / prefetch buffer sizes dynamically.
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Hyper-parameters of the current W&B run.
CONFIG = wandb.config

# Load & Preprocess Data

In [None]:

def get_datasets() -> List[tf.data.Dataset]:
    """Build the train, validation and test datasets.

    Loads both CSV files, rescales the training target by 1/100, derives the
    image paths, holds out a validation subset of the training rows and wraps
    each subset in a batched dataset of preprocessed images.

    Returns:
        ``[ds_train, ds_val, ds_test]``. Train and validation are built with
        targets, test without. Augmentation is requested for train only.
    """

    df_train, df_test = (load_df(set_nm=set_nm) for set_nm in ('train', 'test'))

    # Targets are divided by 100 here; predictions are multiplied by 100
    # again before the submission is written.
    df_train[TARGET_VAR_NM] /= 100

    df_train = create_img_path_var(df_train, 'train')
    df_test = create_img_path_var(df_test, 'test')

    df_train, df_val = split(df_train)

    def targets_of(df: DataFrame) -> ndarray:
        # split() rebuilds the frames from a raw value array, hence the
        # explicit cast back to float.
        return df[TARGET_VAR_NM].values.astype('float')

    ds_train = create_dataset_with_preprocessed_imgs(
        df_train.img_path.values, targets_of(df_train), augment=True
    )
    ds_val = create_dataset_with_preprocessed_imgs(
        df_val.img_path.values, targets_of(df_val)
    )
    ds_test = create_dataset_with_preprocessed_imgs(df_test.img_path.values)

    return [ds_train, ds_val, ds_test]


def load_df(set_nm: str) -> DataFrame:
    """Read the CSV of one data set, keeping only the columns used later.

    Args:
        set_nm: Base name of the CSV file under ``DATA_PATH``. ``'train'``
            additionally loads the target column.

    Returns:
        A frame with the id column, plus the target column for ``'train'``.
    """

    if set_nm == 'train':
        var_nms = [ID_VAR_NM, TARGET_VAR_NM]
    else:
        var_nms = [ID_VAR_NM]

    csv_path = f'{DATA_PATH}/{set_nm}.csv'

    return read_csv(csv_path, usecols=var_nms)


def create_img_path_var(df: DataFrame, set_nm: str) -> DataFrame:
    """Replace the id column of ``df`` with an ``img_path`` column.

    Each path has the form ``<DATA_PATH>/<set_nm>/<id>.jpg``.

    Args:
        df: Frame holding the id column; it is modified in place.
        set_nm: Name of the image sub-directory under ``DATA_PATH``.

    Returns:
        The same frame object, with ``img_path`` added and the id removed.
    """

    img_dir = f'{DATA_PATH}/{set_nm}/'
    df['img_path'] = img_dir + df[ID_VAR_NM] + '.jpg'

    # The id column is dropped in place, so the caller's frame changes too.
    df.drop(columns=ID_VAR_NM, inplace=True)

    return df


def split(df: DataFrame) -> List[DataFrame]:
    """Randomly split ``df`` into a training and a validation frame.

    20% of the rows go to the validation frame. The split operates on the raw
    value array, so both returned frames get a fresh index and lose the
    original column dtypes.

    Returns:
        ``[df_train, df_val]`` with the same columns as ``df``.
    """

    parts = train_test_split(df.values, test_size=0.2)

    return [DataFrame(part, columns=df.columns) for part in parts]

def create_dataset_with_preprocessed_imgs(X_paths: ndarray, y: Union[None, ndarray] = None, augment:bool=False) -> data.Dataset:
    """Build a batched, prefetched dataset of preprocessed images.

    Args:
        X_paths: Paths of the JPEG files to load.
        y: Optional targets aligned with ``X_paths``. When given, the dataset
            yields ``(image, target)`` pairs, otherwise images only.
        augment: Whether to pass every image through the augmentation model.

    Returns:
        The dataset, batched by ``CONFIG.batch_size``.
    """

    with_target = y is not None

    tensors = (X_paths, y) if with_target else (X_paths,)
    ds = data.Dataset.from_tensor_slices(tensors)
    ds = ds.map(build_img_processor(with_target), num_parallel_calls=AUTOTUNE)

    if augment:
        augmentation_model = get_augmentation_model()

        # The mapped function has to match the element structure: a
        # two-argument function would fail on datasets built without targets.
        if with_target:
            def augment_element(img, target):
                return augmentation_model(img, training=True), target
        else:
            def augment_element(img):
                return augmentation_model(img, training=True)

        ds = ds.map(augment_element, num_parallel_calls=AUTOTUNE)

    ds = ds.batch(CONFIG.batch_size)

    return ds.prefetch(buffer_size=AUTOTUNE)

def build_img_processor(with_target: bool) -> Callable:
    """Return the function used to turn dataset elements into images.

    Args:
        with_target: ``True`` when elements are ``(path, target)`` pairs,
            ``False`` when they consist of a path only.

    Returns:
        A callable taking ``path`` (and ``y`` when ``with_target``) that
        loads, resizes and EfficientNet-preprocesses the image; the target,
        if any, is passed through unchanged.
    """

    def get_preprocessed_img(path: str) -> tf.Tensor:
        return eff_net_preprocess(resize(load_img(path)))

    if not with_target:
        return get_preprocessed_img

    def get_preprocessed_img_with_target(path: str, y: float) -> Tuple[tf.Tensor, float]:
        return get_preprocessed_img(path), y

    return get_preprocessed_img_with_target

def load_img(path: str) -> tf.Tensor:
    """Read a JPEG file and decode it into an RGB image tensor.

    Args:
        path: Location of the JPEG file.

    Returns:
        The decoded image as a ``(height, width, 3)`` uint8 tensor.
    """

    raw = tf.io.read_file(path)

    # Force three channels so a grayscale file still matches the model's
    # 3-channel input; RGB files decode exactly as before.
    return tf.io.decode_jpeg(raw, channels=3)

def resize(img: tf.Tensor) -> tf.Tensor:
    """Resize an image to the configured size, padding instead of distorting.

    The target height is ``CONFIG.img_length`` and the target width is
    ``CONFIG.img_width``.

    Returns:
        The resized image cast to ``int32``.
    """

    padded = tf.image.resize_with_pad(img, CONFIG.img_length, CONFIG.img_width)

    return tf.cast(padded, dtype=tf.int32)

def eff_net_preprocess(img: tf.Tensor) -> tf.Tensor:
    """Apply the Keras EfficientNet input preprocessing to an image."""

    preprocess_input = keras.applications.efficientnet.preprocess_input

    return preprocess_input(img)

def normalize(img: tf.Tensor) -> tf.Tensor:
    """Divide pixel values by 255, mapping the 0-255 range onto 0-1."""

    max_pixel_value = 255.0

    return img / max_pixel_value

def get_augmentation_model() -> tf.keras.Model:
    """Build the random augmentation pipeline applied to training images.

    Returns:
        A ``Sequential`` model that randomly flips images horizontally and
        randomly rotates them.
    """

    # No module-level name `layers` is imported, so the Keras layers are
    # reached through the `tf` namespace instead.
    keras_layers = tf.keras.layers

    # NOTE(review): the rotation factor is read from the config entry named
    # `data_augmentation_contrast` - confirm that key name is intended.
    return tf.keras.Sequential([
        keras_layers.RandomFlip("horizontal"),
        keras_layers.RandomRotation(CONFIG.data_augmentation_contrast),
    ])

    

In [None]:
# Build the batched train / validation / test datasets used by the cells below.
ds_train, ds_val, ds_test = get_datasets()

# Img Dims Visualization

In [None]:
# Collect the (height, width) of every training image.
img_paths = (f'{DATA_PATH}/train/' + load_df('train')[ID_VAR_NM] + '.jpg').values
img_dims = zeros((len(img_paths), 2))

for i, img_path in enumerate(img_paths):

    # Decoded images are (height, width, channels); drop the channel axis.
    img_dims[i, :] = load_img(img_path).shape[:-1]


# Column 0 holds the heights and column 1 the widths, so each histogram is
# labelled from the matching column.
fig = go.Figure()
fig.add_trace(go.Histogram(x=img_dims[:, 1], histnorm='probability', name='width'))
fig.add_trace(go.Histogram(x=img_dims[:, 0], histnorm='probability', name='height'))
fig.update_layout(title_text='Distribution of Img Width and Height')
fig.show()

# Model

In [None]:
def get_model(efficient_net_model_nm:str, dense_layers_post_eff_net:List[int], dropout: float) -> tf.keras.Model:
    """Assemble the regression model: EfficientNet backbone plus a dense head.

    Args:
        efficient_net_model_nm: Base name (without ``.h5``) of the saved
            EfficientNet model under ``../input/keras-applications-models``.
        dense_layers_post_eff_net: Number of units of each dense layer
            stacked after the backbone.
        dropout: Dropout rate applied after the backbone's batch norm.

    Returns:
        The uncompiled model, ending in a single sigmoid unit.
    """

    # The path has to be an f-string; without the prefix the placeholder is
    # looked up literally on disk.
    # NOTE(review): the caller passes CONFIG.efficient_net_symbol ('B0') -
    # confirm the saved file is really named '<symbol>.h5'.
    efficient_net = tf.keras.models.load_model(
        f'../input/keras-applications-models/{efficient_net_model_nm}.h5'
    )

    if CONFIG.efficient_net_trainable:
        unfreez_layers(efficient_net)
    else:
        # load_model restores whatever trainable flags were saved, so the
        # backbone is frozen explicitly to honour the config flag.
        efficient_net.trainable = False

    model_layers = [
        tf.keras.layers.Input(shape=(CONFIG.img_length, CONFIG.img_width, 3)),
        efficient_net,
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(dropout),
    ]

    # NOTE(review): these hidden layers have no activation, i.e. they are
    # linear - confirm this is intended.
    model_layers += [tf.keras.layers.Dense(nb_units) for nb_units in dense_layers_post_eff_net]

    model_layers.append(tf.keras.layers.Dense(1, activation='sigmoid'))

    model = keras.models.Sequential(model_layers)

    # summary() prints by itself and returns None, so it is not wrapped in
    # print().
    model.summary()

    return model

def unfreez_layers(model: tf.keras.Model) -> None:
    """Make every layer of ``model`` trainable except batch normalization.

    Batch-normalization layers are explicitly frozen; every other layer is
    set trainable. The model is modified in place.
    """

    for layer in model.layers:
        layer.trainable = not isinstance(layer, tf.keras.layers.BatchNormalization)


# Correctly spelled alias, so that either spelling resolves to the same
# function.
unfreeze_layers = unfreez_layers
    

def compile_model(model: keras.Model, learning_rate: float, loss_func:str) -> None:
    """Compile ``model`` in place with Adam, the given loss and an RMSE metric.

    Args:
        model: Model to compile.
        learning_rate: Learning rate handed to the Adam optimizer.
        loss_func: Loss identifier passed to ``model.compile``.
    """

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=loss_func,
        metrics=[keras.metrics.RootMeanSquaredError()],
    )


def fit(model: keras.Model, ds_train: data.Dataset, ds_val: data.Dataset, epochs: int) -> None:
    """Train ``model`` on ``ds_train`` while validating on ``ds_val``.

    Training is logged through the W&B callback and stops early once the
    validation loss has not improved for 3 epochs.

    Args:
        model: Compiled model to train in place.
        ds_train: Training dataset.
        ds_val: Validation dataset.
        epochs: Maximum number of epochs.
    """

    stop_on_plateau = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)
    callbacks = [WandbCallback(), stop_on_plateau]

    model.fit(ds_train, validation_data=ds_val, epochs=epochs, callbacks=callbacks)

In [None]:

# Build, compile and train the model with the hyper-parameters of the W&B run.
model = get_model(CONFIG.efficient_net_symbol, CONFIG.dense_layers_post_efficient_net, CONFIG.dropout)
compile_model(model, CONFIG.learning_rate, CONFIG.loss_func)
fit(model, ds_train, ds_val, CONFIG.epochs)

# Close the W&B run once training is over.
run.finish()

# Submissions

In [None]:

def save_test_pred(pred: ndarray) -> None:
    """Write the test predictions to ``submission.csv``.

    Args:
        pred: Predictions, assigned positionally to the rows of the test CSV.
    """

    submission = load_df_test()
    submission[TARGET_VAR_NM] = pred

    submission.to_csv(
        'submission.csv',
        columns=[ID_VAR_NM, TARGET_VAR_NM],
        index=False,
    )
    
def load_df_test() -> DataFrame:
    """Load the id column of the test CSV located under ``DATA_PATH``."""

    test_csv_path = f'{DATA_PATH}/test.csv'

    return read_csv(test_csv_path, usecols=[ID_VAR_NM])

In [None]:
# Predict on the test set, undo the /100 scaling applied to the training
# target, and write the submission file.
test_pred = model.predict(ds_test)
test_pred *= 100
save_test_pred(test_pred)