In [1]:
!pip install imutils 
! pip install gdown 
! gdown https://drive.google.com/uc?id=1pGDcwitiFXy-CY9vO3wH3XprwjWgsQ8R

Collecting imutils
  Downloading imutils-0.5.4.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: imutils
  Building wheel for imutils (setup.py) ... [?25ldone
[?25h  Created wheel for imutils: filename=imutils-0.5.4-py3-none-any.whl size=25859 sha256=078b49ba544eade09bd4f1c033bd2322c764e5a53cb881227a464541874538bf
  Stored in directory: /root/.cache/pip/wheels/85/cf/3a/e265e975a1e7c7e54eb3692d6aa4e2e7d6a3945d29da46f2d7
Successfully built imutils
Installing collected packages: imutils
Successfully installed imutils-0.5.4
[0mCollecting gdown
  Downloading gdown-4.7.1-py3-none-any.whl (15 kB)
Installing collected packages: gdown
Successfully installed gdown-4.7.1
[0mDownloading...
From (uriginal): https://drive.google.com/uc?id=1pGDcwitiFXy-CY9vO3wH3XprwjWgsQ8R
From (redirected): https://drive.google.com/uc?id=1pGDcwitiFXy-CY9vO3wH3XprwjWgsQ8R&confirm=t&uuid=22df4aad-7338-419d-a078-e35f46a7b822
To: /kaggle/working/semi_super_au

In [1]:
from tensorflow import keras 
import tensorflow as tf 
from tensorflow.keras import models 
import os 
#from imutils import paths 
import matplotlib.pyplot as plt 
import numpy as np 

import shutil
import cv2
import random
from dataclasses import dataclass 
from tqdm import tqdm
import tempfile
from semi_super_augPipe import preprocess_image
from tensorflow.keras.layers import Dense, Input, Conv2D, MaxPooling2D, GlobalMaxPooling2D, \
                                                GlobalAveragePooling2D, BatchNormalization, Flatten, ReLU


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [13]:
L2_PENALTY = 1.5*10e-6

In [2]:
class DataLoader: 
    """
        Class, will be useful for creating the BYOL dataset or dataset for the DownStream task 
            like classification or segmentation.
        Methods:
            __download_data(scope: private)
            __normalize(scope: private)
            __preprocess_img(scope: private)
             __get_valdata(scope: private)
            get_byol_dataset(scope: public)
            get_downstream_data(scope: public)
        
        Property:
            dname(dtype: str)        : dataset name(supports cifar10, cifar100).
            byol_augmentor(type      : ByolAugmentor): byol augmentor instance/object.
            nval(type: int)          : Number of validation data needed, this will be created by splitting the testing
                                       data.
            resize_shape(dtype: int) : Resize shape, bcoz pretrained models, might have a different required shape.
            normalize(dtype: bool)   : bool value, whether to normalize the data or not. 
    """
    
    def __init__(self, dname="cifar10", byol_augmentor=None, nval=5000,
                                             resize_shape=96, normalize=True, downstream_data=False): 
        assert (byol_augmentor != None or downstream_data), 'Need a BYOL Augment object'
        assert dname in ["cifar10", 'cifar100'], "dname should be either cifar10 or cifar100"
        assert nval <= 10_000, "ValueError: nval value should be <= 10_000"
        
        __train_data, __test_data = self.__download_data(dname)
        self.__train_X, self.__train_y = __train_data
        self.__train_X, self.__train_y = self.__train_X[: 20000], self.__train_y[: 20000]
      #  self.__train_X, self.__train_y = self.__train_X[: 100], self.__train_y[: 100]
        self.__dtest_X, self.__dtest_y = __test_data 
        self.class_name = ['airplane', 'automobile', 'bird', 'cat', 'deer', 
                                           'dog', 'frog', 'horse', 'sheep', 'truck']
        self.byol_augmentor = byol_augmentor
        self.__get_valdata(nval)
        self.resize_shape = resize_shape
        
        self.__normalize() if normalize else None
        self.min_obj_cov_value = 0.7
        self.color_jitter_value = 0.1
        
    def __len__(self): 
        return self.__train_X.shape[0] + self.__dtest_X.shape[0]
    
    def __repr__(self): 
        return f"Training Samples: {self.__train_X.shape[0]}, Testing Samples: {self.__dtest_X.shape[0]}"
    
    def __download_data(self, dname):
        """
            Downloads the data from the tensorflow website using the tensorflw.keras.load_data() method.
            Params:
                dname(type: Str): dataset name, it just supports two dataset cifar10 or cifar100
            Return(type(np.ndarray, np.ndarray))
                returns the training data and testing data
        """
        if dname == "cifar10": 
            train_data, test_data = tf.keras.datasets.cifar10.load_data()
        if dname == "cifar100": 
            train_data, test_data = tf.keras.datasets.cifar100.load_data()
            
        return train_data, test_data
    
    def __normalize(self): 
        """
            this method, will used to normalize the inputs.
        """
        self.__train_X = self.__train_X / 255.0
        self.__dtest_X = self.__dtest_X / 255.0
    
    def __preprocess_img(self, image): 
        """
            this method, will be used by the get_byol_dataset methos, which does a convertion of 
            numpy data to tensorflow data.
            Params:
                image(type: np.ndarray): image data.
            Returns(type; (np.ndarray, np.ndarray))
                returns the two different augmented views of same image.
        """
        try: 
            image = tf.image.convert_image_dtype(image, tf.float32)
            image = tf.image.resize(image, (self.resize_shape, self.resize_shape))
           # view1 = self.byol_augmentor.augment(image, self.resize_shape)
            #view2 = self.byol_augmentor.augment(image, self.resize_shape)
            
            view1 = preprocess_image(image = image, 
                                   height = 96, 
                                   width  = 96, 
                                   cjs = self.color_jitter_value,
                                   m_obj_cov = self.min_obj_cov_value,
                                   a_range = (self.min_obj_cov_value, 1.0))
            
            view2 = preprocess_image(image = image, 
                                   height = 96, 
                                   width  = 96, 
                                   cjs = self.color_jitter_value,
                                   m_obj_cov = self.min_obj_cov_value,
                                   a_range = (self.min_obj_cov_value, 1.0))
            
            return (view1, view2)
        
        except Exception as err:
            return err
    
    def get_byol_dataset(self, batch_size, dataset_type="train"):
        """
            this method, will gives the byol dataset, which is nothing but a tf.data.Dataset object.
            Params:
                batch_size(dtype: int)    : Batch Size.
                dataset_type(dtype: str)  : which type of dataset needed, (train, test or val)
                
            return(type: tf.data.Dataset)
                returns the tf.data.Dataset for intended dataset_type, by preprocessing and converting 
                the np data.
        """
        try:
            if dataset_type == "train":
                tensorflow_data = tf.data.Dataset.from_tensor_slices((self.__train_X))
                tensorflow_data = (
                tensorflow_data
                    .map(self.__preprocess_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
                    .shuffle(1024)
                    .batch(batch_size, drop_remainder=True)
                    .prefetch(tf.data.experimental.AUTOTUNE)
                )
                return tensorflow_data  
            
            if dataset_type == "test":
                tensorflow_data = tf.data.Dataset.from_tensor_slices((self.__test_X))
                tensorflow_data = (
                tensorflow_data
                    .map(self.__preprocess_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
                    .shuffle(1024)
                    .batch(batch_size, drop_remainder=True)
                    .prefetch(tf.data.experimental.AUTOTUNE)
                )
                return tensorflow_data  
            
            if dataset_type == "val":
                tensorflow_data = tf.data.Dataset.from_tensor_slices((self.__val_X))
                tensorflow_data = (
                tensorflow_data
                    .map(self.__preprocess_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
                    .shuffle(1024)
                    .batch(batch_size, drop_remainder=True)
                    .prefetch(tf.data.experimental.AUTOTUNE)
                )
                return tensorflow_data  
        
        except Exception as err:
            return err
    
    def get_downstream_data(self): 
        """
            this method returns the dataset for the downstream task.
        """
        return (self.__train_X, self.__train_y)#, (self.__val_X, self.__val_y), (self.__test_X, self.__test_y)
    
    def __get_valdata(self, nval):
        """
            this method is used to create a validation data by randomly sampling from the testing data.
            Params:
                nval(dtype: Int); Number of validation data needed, rest of test_X.shape[0] - nval, will be 
                                  testing data size.
            returns(type; np.ndarray, np.ndarray):
                returns the testing and validation dataset.
        """
        try: 
            ind_arr = np.arange(10_000)
            val_inds = np.random.choice(ind_arr, nval)
            test_inds = [i for i in ind_arr if not i in val_inds]

            self.__test_X, self.__test_y = self.__dtest_X[test_inds], self.__dtest_y[test_inds]
            self.__val_X, self.__val_y = self.__dtest_X[val_inds], self.__dtest_y[val_inds]
            
        except Exception as err:
            raise err    
        
    def get_downstream_tf_dataset(self, batch_size, dataset_type="train"): 
        """
             this method, will gives the downstream dataset, which is of type tf.data.Dataset object.
            Params:
                batch_size(dtype: int)    : Batch Size.
                dataset_type(dtype: str)  : which type of dataset needed, (train, test or val)
                
            return(type: tf.data.Dataset)
                returns the tf.data.Dataset for intended dataset_type, by preprocessing and converting 
                the np data.
        """
        assert dataset_type in ["train", "test", "val"], "Given dataset type is not valid"
        try:
            if dataset_type == "train":
                tensorflow_data = tf.data.Dataset.from_tensor_slices((self.__train_X, self.__train_y))
                tensorflow_data = (
                tensorflow_data
                    .shuffle(1024)
                    .batch(batch_size, drop_remainder=True)
                    .prefetch(tf.data.experimental.AUTOTUNE)
                )
                return tensorflow_data  
            
            if dataset_type == "test":
                tensorflow_data = tf.data.Dataset.from_tensor_slices((self.__test_X, self.__test_X))
                tensorflow_data = (
                tensorflow_data
                    .shuffle(1024)
                    .batch(batch_size, drop_remainder=True)
                    .prefetch(tf.data.experimental.AUTOTUNE)
                )
                return tensorflow_data  
            
            if dataset_type == "val":
                tensorflow_data = tf.data.Dataset.from_tensor_slices((self.__val_X, self.__val_y))
                tensorflow_data = (
                tensorflow_data
                    .shuffle(1024)
                    .batch(batch_size, drop_remainder=True)
                    .prefetch(tf.data.experimental.AUTOTUNE)
                )
                return tensorflow_data  
        
        except Exception as err:
            return err

In [3]:
class ByolAugmentor: 
    """
        This class is used for the data augmentation for the byol model.
        Methods: 
            __random_crop_flip_resize(scope: private)
            __random_color_distortion(scope: private)
            augment(scope: public)
    """
    def __init__(self): 
        pass 
    
    @tf.function
    def __random_crop_flip_resize(self, image, resize_shape):
        """
            this method does a random crop with height and width of the crop are sampled randomly. it does the 
            crop with the height and width, then it does a resizing again to the original shape. And also it does
            a flip of the image
            Params:
                image(type: tf.Tensor)   : image data of type tensor.
                resize_shape(type: int)  : Size of the image.
            Return(type: tf.Tensor)
                returns the crop and resized image.
        """
        try: 
            h_crop = tf.cast(tf.random.uniform(shape=[], minval=13, maxval=33, dtype=tf.int32), tf.float32)
            w_crop = h_crop * tf.random.uniform(shape=[], minval=0.67, maxval=1.0)
            h_crop, w_crop = tf.cast(h_crop, tf.int32), tf.cast(w_crop, tf.int32)
            opposite_aspectratio = tf.random.uniform(shape=[])
            if opposite_aspectratio < 0.5:
                h_crop, w_crop = w_crop, h_crop
            image = tf.image.random_crop(image, size=[h_crop, w_crop, 3])

            horizontal_flip = tf.random.uniform(shape=[])
            if horizontal_flip < 0.5:
                image = tf.image.random_flip_left_right(image)

            image = tf.image.resize(image, size=[resize_shape, resize_shape])
            return image
        
        except Exception as err:
            return err
    
    @tf.function
    def __random_color_distortion(self, image):
        """
            this method, will do the color disortion augmentation for the given image.
            Params:
                image(type: tf.Tensor)   : image data of type tensor.
            Return(type: tf.Tensor)
                returns the crop and resized image.
        """
        try: 
            color_jitter = tf.random.uniform(shape=[])
            if color_jitter < 0.8:
                image = tf.image.random_brightness(image, max_delta=0.4)
                image = tf.image.random_contrast(image, lower=0.6, upper=1.4)
                image = tf.image.random_saturation(image, lower=0.6, upper=1.4)
                image = tf.image.random_hue(image, max_delta=0.1)
                image = tf.clip_by_value(image, 0, 1)

            color_drop = tf.random.uniform(shape=[])
            if color_drop < 0.2:
                image = tf.image.rgb_to_grayscale(image)
                image = tf.tile(image, [1, 1, 3])

            return image
        
        except Exception as error:
            return error
    
    @tf.function
    def augment(self, image, resize_shape): 
        """
            this method will include all the augmentation as a pipeline(random crop, random flip, resize, and 
            color disortion), this augment method will be used by DataLoader class.
            Params:
                image(type: tf.Tensor)   : image data of type tensor.
                resize_shape(type: int)  : Size of the image.
            Return(type: tf.Tensor)
                returns the crop and resized image.
                
        """
        try: 
            image = self.__random_crop_flip_resize(image, resize_shape)
          #  image = self.__random_color_distortion(image)

            return image
        
        except Exception as error:
            print(error, error)
            return error

In [43]:
downstream_dataloader = DataLoader("cifar10", downstream_data=True)

In [44]:
train_ds = downstream_dataloader.get_downstream_tf_dataset(128, "train")

In [45]:
val_ds = downstream_dataloader.get_downstream_tf_dataset(128, "val")

In [55]:
trained_model.trainable = True
model = keras.models.Sequential(
    [
        Input(shape=(32, 32, 3), name="Encoder Input"),
        trained_model.layers[1],
        trained_model.layers[2],
        trained_model.layers[3],
        Dense(10, activation='softmax')
    ]
)

In [56]:
def get_linear_model(): 
    inputs = Input(shape=(32, 32, 3), name="Encoder Input")
    encoder = trained_model(inputs)
    
    out = Dense(10, activation="softmax")(encoder)
    
    return keras.models.Model(inputs, outputs)

In [57]:
model.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50 (Functional)       (None, 1, 1, 2048)        23587712  
                                                                 
 gmp1 (GlobalMaxPooling2D)   (None, 2048)              0         
                                                                 
 Projection (Functional)     (None, 256)               1182464   
                                                                 
 dense_3 (Dense)             (None, 10)                2570      
                                                                 
Total params: 24,772,746
Trainable params: 24,718,602
Non-trainable params: 54,144
_________________________________________________________________


In [58]:
model.compile(optimizer="adam", loss='sparse_categorical_crossentropy', metrics="acc")

In [None]:
model.fit(train_ds, validation_data=val_ds, epochs=10)