# Libraries

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.image  as img
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Model
import keras.backend as K
from tqdm import tqdm
import sys
sys.path.append('../input/swintransformertf')
from swintransformer import SwinTransformer
from sklearn.model_selection import StratifiedKFold

# Data

In [2]:
train_csv=pd.read_csv('../input/petfinder-pawpularity-score/train.csv')
test_csv=pd.read_csv('../input/petfinder-pawpularity-score/test.csv')

In [3]:
train_csv.head(2)

In [4]:
test_csv.head(2)

In [5]:
labels=dict(train_csv[['Id','Pawpularity']].values)

In [6]:
def dropout(image,DIM=[224,224], PROBABILITY = 0.6, CT = 5, SZ = 0.1):
    P = tf.cast( tf.random.uniform([],0,1)<PROBABILITY, tf.int32)
    if (P==0)|(CT==0)|(SZ==0): return image    
    for k in range(CT):        
        x = tf.cast( tf.random.uniform([],0,DIM[1]),tf.int32)
        y = tf.cast( tf.random.uniform([],0,DIM[0]),tf.int32)
        WIDTH = tf.cast( SZ*min(DIM),tf.int32) * P
        ya = tf.math.maximum(0,y-WIDTH//2)
        yb = tf.math.minimum(DIM[0],y+WIDTH//2)
        xa = tf.math.maximum(0,x-WIDTH//2)
        xb = tf.math.minimum(DIM[1],x+WIDTH//2)
        one = image[ya:yb,0:xa,:]
        two = tf.zeros([yb-ya,xb-xa,3], dtype = image.dtype) 
        three = image[ya:yb,xb:DIM[1],:]
        middle = tf.concat([one,two,three],axis=1)
        image = tf.concat([image[0:ya,:,:],middle,image[yb:DIM[0],:,:]],axis=0)
        image = tf.reshape(image,[*DIM,3])
    return image

# Data Generators

In [7]:
class DataGenerator(tf.keras.utils.Sequence):

    def __init__(self, list_IDs, labels, batch_size=1, n_channels=1,part='train', shuffle=False):
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.part=part
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        
        list_IDs_temp      = [self.list_IDs[k] for k in indexes]
        
        x, y = self.__data_generation(list_IDs_temp)
        return x, y

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        x           = np.zeros((self.batch_size,224,224,3))
        y           = np.zeros((self.batch_size,1))
        for i, ID in enumerate(list_IDs_temp):
            x[i]                =Image.open('../input/petfinder-pawpularity-score/'+str(self.part)+'/'+ str(ID)+'.jpg').resize((224,224))
            y[i]                =self.labels[ID]
            
        return x,y

In [8]:
class TestDataGenerator(tf.keras.utils.Sequence):

    def __init__(self, list_IDs, batch_size=1, n_channels=1,part='test', shuffle=False):
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.part=part
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        
        list_IDs_temp      = [self.list_IDs[k] for k in indexes]
        
        x= self.__data_generation(list_IDs_temp)
        return x

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        x           = np.zeros((self.batch_size,224,224,3))
        for i, ID in enumerate(list_IDs_temp):
            x[i]                =Image.open('../input/petfinder-pawpularity-score/'+str(self.part)+'/'+ str(ID)+'.jpg').resize((224,224))

        return x

test_generator   = TestDataGenerator(test_csv['Id'].values)

# CallBack

In [9]:
class CustomCallBacks(tf.keras.callbacks.Callback):
    
    def __init__(self,n):
        self.n=n
    
    def on_train_begin(self, logs={}):
        self.loss     ={'loss':[]}
        self.val_loss ={'loss':[]}
        self.epoch    =None
        self.b        =0
        self.c        =0
        self.min_t    =None 
        
    def on_epoch_begin(self, epoch, logs={}):
        self.epoch=epoch
#         if self.epoch>=1:
#             lr=tf.keras.backend.get_value(self.model.optimizer.lr)
#             tf.keras.backend.set_value(self.model.optimizer.lr,lr*0.10)

    def on_epoch_end(self, epoch, logs={}):
        print('{} ended'.format(epoch))
        self.loss['loss'].append(logs.get('root_mean_squared_error'))
        self.val_loss['loss'].append(logs.get('val_root_mean_squared_error'))
        if self.epoch==0:
            self.model.save_weights('model'+str(self.n)+'.h5')
            self.min_t=self.val_loss['loss'][-1]
        if self.epoch>=1:
            if self.val_loss['loss'][-1]<=self.min_t and self.val_loss['loss'][-1]<=self.loss['loss'][-1]:
                self.min_t=self.val_loss['loss'][-1]
                self.model.save_weights('model'+str(self.n)+'.h5')
                print('*'*5)
                print("weight updated")
                print('*'*5)
            else:
                self.model.stop_training = True

# Model

In [10]:
def root_mean_squared_error(y_true, y_pred):
        return K.sqrt(K.mean(K.square(y_pred - y_true)))

In [None]:
skf = StratifiedKFold(n_splits=10)
q=0
for train_index, test_index in skf.split(train_csv['Id'].values,train_csv['Pawpularity'].values):
    training_generator   = DataGenerator(train_csv['Id'].values[train_index],labels)
    validation_generator = DataGenerator(train_csv['Id'].values[test_index],labels)
    cb=CustomCallBacks(q)
    
    model = tf.keras.Sequential([
      tf.keras.layers.Lambda(lambda data: tf.keras.applications.imagenet_utils.preprocess_input(tf.cast(data, tf.float32), mode="torch"), input_shape=(224,224,3)),
      SwinTransformer('swin_large_224', include_top=False, pretrained=True),
      tf.keras.layers.Dropout(0.6),
      tf.keras.layers.Dense(1, activation='linear')
    ])
    model.layers[-3].trainable=False
    
    opt = tf.keras.optimizers.Adam(learning_rate=0.01)
    model.compile(loss=root_mean_squared_error,optimizer=opt,metrics=[root_mean_squared_error])
    history=model.fit_generator(generator=training_generator,validation_data=validation_generator,use_multiprocessing=True,workers=-1,epochs=2,callbacks=[cb])
    print('*'*100)
    print(q)
    print('*'*100)
    q+=1