In [None]:
# Setup cell: cleans the working directory and installs the notebook's dependencies.
from IPython.display import clear_output
# WARNING: removes every (non-hidden) file and folder in the current working directory.
!rm -rf ./*
# keras is pinned to 2.15; albumentations and keras-tuner are installed unpinned.
!pip install keras==2.15 
!pip install albumentations
!pip install keras-tuner
# Hide the installer logs and leave a single confirmation line in the cell output.
clear_output()
print("-- dependency installs completed --")

# Imports, seeding, configuration, and accelerator setup

In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import cv2
import keras
import keras_tuner
import random
import numpy as np
import tensorflow as tf
import albumentations as A
import matplotlib.pyplot as plt
from IPython.display import clear_output
def seed_everything(seed=911):
    """Seed every random number source this notebook uses.

    Sets ``PYTHONHASHSEED`` in the environment and seeds the ``random``
    module, NumPy's global generator and TensorFlow's global seed with the
    same value.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (random.seed, np.random.seed, tf.random.set_seed):
        seeder(seed)

seed_everything(911)

class D:
    '''class that contain all the parameters'''
    #param for dataset
    image_size = (512,288)
    batch_size = 4
    images_dir = '/kaggle/input/multi-view-dataset-v2/images'
    shuffle_buffer = 3000
    splits = [f'fold-{i}' for i in range(5)]
    views = ['Examined','Aux']
    
    #param for ConvNeXt multi-view model
    model_var = 'convnext_small'
    drop_path_rate = 0.2123456789
    dropout_rate   = 0.5123456789
    pooling ='w_avg'
    fusion_stage = 3
    fusion_index = 1
    fc_layers_depth = 3
    fc_layers_dims = 256
    
    #param for training
    epochs = 60
    loss_fn = 'categorical_crossentropy'
    optimizer = lambda: keras.optimizers.Adam(1e-5, use_ema=True)
    metrics = lambda : [
        keras.metrics.CategoricalAccuracy(name='accuracy'),
        keras.metrics.F1Score(average='macro',name='macro_F1')
    ]
# Pick a distribution strategy: TPU when one can be resolved, MirroredStrategy otherwise.
try :
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(resolver)
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    D.TPU = True
    # Scale the configured batch size by 8 on TPU.
    # NOTE(review): re-running this cell multiplies D.batch_size again (not idempotent).
    D.batch_size *= 8
except ValueError:
    # A ValueError from the TPU setup is treated as "no TPU available".
    D.TPU = False
    strategy = tf.distribute.MirroredStrategy()
# Drop the noisy accelerator initialisation logs, then report the library versions in use.
clear_output()
print("-- versions --")
print("tf :",tf.__version__)
print("keras :",keras.__version__)
print("keras_tuner :",keras_tuner.__version__)
print("albumentations :",A.__version__)
print("Number of accelerators: ", strategy.num_replicas_in_sync)

# loading dataset

In [None]:
from random import choice

TOUT = "float32"

@tf.numpy_function(Tout=TOUT)
def cutout_mask(shape, n_holes, min_max_height, min_max_width):
    """Build a multiplicative cutout mask: ones everywhere, zeros inside random holes.

    Args:
        shape: ``(h, w, c)`` of the mask to create.
        n_holes: number of rectangular holes to cut.
        min_max_height: ``(low, high)`` bounds for a hole's height; ``high`` is
            exclusive because it is passed to ``np.random.randint``.
        min_max_width: ``(low, high)`` bounds for a hole's width, same convention.

    Returns:
        A mask of the requested shape cast to ``TOUT``.
    """
    height, width, _channels = shape
    low_h, high_h = min_max_height
    low_w, high_w = min_max_width
    keep = np.ones(shape, dtype=np.float32)
    for _ in range(n_holes):
        # Draw order is: centre y, centre x, hole height, hole width.
        centre_y = np.random.randint(0, height)
        centre_x = np.random.randint(0, width)
        half_h = np.random.randint(low_h, high_h) // 2
        half_w = np.random.randint(low_w, high_w) // 2
        # Holes are centred on the drawn point and clipped to the image bounds.
        top = np.clip(centre_y - half_h, 0, height)
        bottom = np.clip(centre_y + half_h, 0, height)
        left = np.clip(centre_x - half_w, 0, width)
        right = np.clip(centre_x + half_w, 0, width)
        keep[top: bottom, left: right, :] = 0
    return tf.cast(keep, TOUT)

class CutoutLayer(keras.layers.Layer):
    """Layer that zeroes out random rectangular patches of its input.

    The number of holes is drawn once, in the constructor, from
    ``tf.random.uniform`` over ``min_max_holes``; the hole geometry is drawn by
    ``cutout_mask`` on every call. ``p`` is stored but not used by ``call``.
    """
    def __init__(self, min_max_holes=(None,None), min_max_height=(None, None), min_max_width=(None, None),p=1, **kwargs):
        super().__init__(**kwargs)
        fewest, most = min_max_holes
        self.n_holes = tf.random.uniform((), fewest, most, dtype=tf.int32)
        self.min_max_height = min_max_height
        self.min_max_width = min_max_width
        self.p = p

    def call(self, inputs):
        """Multiply ``inputs`` by a freshly generated cutout mask of matching (h, w, c)."""
        dynamic_shape = tf.shape(inputs)
        mask_shape = (dynamic_shape[-3], dynamic_shape[-2], dynamic_shape[-1])
        mask = cutout_mask(mask_shape, self.n_holes, self.min_max_height, self.min_max_width)
        return inputs * mask

@tf.numpy_function(Tout=TOUT)
def shift_scale_rotate(image, scale_limit, rotate_limit, shift_limit_x, shift_limit_y):
    """Apply albumentations' ShiftScaleRotate with fixed (non-random) parameters.

    Every limit is passed as a degenerate ``(v, v)`` range, so the caller's
    values decide the transform. Pixels exposed by the transform are filled
    with 0. The result is cast to ``TOUT``.
    """
    def pinned(value):
        # Same lower and upper bound: the range contains a single value.
        return (value, value)

    transform = A.ShiftScaleRotate(
        shift_limit=pinned(shift_limit_x),
        scale_limit=pinned(scale_limit),
        rotate_limit=pinned(rotate_limit),
        interpolation=cv2.INTER_LINEAR,
        border_mode=cv2.BORDER_CONSTANT,
        value=0,
        mask_value=0,
        shift_limit_x=pinned(shift_limit_x),
        shift_limit_y=pinned(shift_limit_y),
        rotate_method='largest_box',
        p=1.0,
    )
    transformed = transform(image=image.astype("float32"))["image"]
    return tf.cast(transformed, TOUT)

class ShiftScaleRotate(keras.layers.Layer):
    """Layer wrapper around ``shift_scale_rotate`` with parameters fixed at construction."""
    def __init__(self, scale, rotate, shift_x, shift_y,  **kwargs):
        super().__init__(**kwargs)
        self.scale, self.rotate = scale, rotate
        self.shift_x, self.shift_y = shift_x, shift_y

    def call(self, inputs):
        """Transform ``inputs`` using the stored scale / rotate / shift values."""
        return shift_scale_rotate(inputs, self.scale, self.rotate, self.shift_x, self.shift_y)

class BrightnessContrastLayer(keras.layers.Layer):
    """Layer that adjusts contrast, then brightness, clipping to [0, 255] after each step.

    ``p`` is stored but not used by ``call``.
    NOTE(review): ``tf.image.adjust_brightness`` adds its delta to float images
    as-is; if inputs are on a 0-255 scale (as the clipping suggests), deltas
    such as 0.2 are very small - confirm the intended scale.
    """
    def __init__(self, contrast_factor = None,brightness_factor=None, p=1, **kwargs):
        super().__init__(**kwargs)
        self.p = p
        self.brightness_factor = brightness_factor
        self.contrast_factor = contrast_factor

    def call(self, inputs):
        """Return ``inputs`` with the stored contrast and brightness adjustments applied."""
        def to_pixel_range(tensor):
            return tf.experimental.numpy.clip(tensor, 0, 255)

        contrasted = to_pixel_range(tf.image.adjust_contrast(inputs, self.contrast_factor))
        return to_pixel_range(tf.image.adjust_brightness(contrasted, self.brightness_factor))


def augment_img(img_dict):
    """Randomly augment a two-view sample in place and return it.

    Args:
        img_dict: dict with image tensors under the keys 'Examined' and 'Aux'.

    Returns:
        The same dict. With probability ~0.45 it is returned untouched;
        otherwise each augmentation is applied (with its own probability) to
        both views, the views may be swapped, and 'Aux' may be zeroed out.

    NOTE(review): the Python ``if`` statements test tensors, so this presumably
    relies on being traced (it is used inside ``Dataset.map`` in prepare_ds).
    """
    if tf.random.uniform((),0,1) < 0.45: # skip all augmentation for ~45% of the samples
        return img_dict
    Ex_image = img_dict['Examined']
    Aux_image = img_dict['Aux']

    # Hole sizes are bounded to 5%-15% of the corresponding D.image_size entry.
    cutout = CutoutLayer(min_max_holes=(1,8), 
                         min_max_height=(int(0.05 * D.image_size[0]), int(0.15 * D.image_size[0])), 
                         min_max_width=(int(0.05 * D.image_size[1]), int(0.15 * D.image_size[1])),
                        )
    # The geometric and photometric parameters are drawn once here and the same
    # layer instance is applied to both views, so both views share them.
    rotate_zoom = ShiftScaleRotate(scale= tf.random.uniform((),-0.15,0.2), 
                                   rotate= tf.random.uniform((),-20,20), 
                                   shift_x=tf.random.uniform((),-0.20,0.20), 
                                   shift_y=tf.random.uniform((),-0.1,0.1))
    brightness_contrast = BrightnessContrastLayer(contrast_factor=tf.random.uniform((),0.8, 1.5), 
                                                  brightness_factor=tf.random.uniform((),-0.1, 0.2))
    # Maps each augmentation callable to the probability of applying it.
    augmentations = {
                     cutout : 0.4,
                     tf.image.flip_left_right :0.5,
                     tf.image.flip_up_down :0.5,
                     rotate_zoom : 0.4,
                     brightness_contrast :0.5
                    }

    # One coin flip per augmentation; when it fires, both views are transformed.
    # Cutout is called separately per view, so its mask is generated once per view.
    for aug, p in augmentations.items():
        if tf.random.uniform((), 0, 1) < p:
            Ex_image = aug(Ex_image)
            Aux_image = aug(Aux_image)
        else:
            # Explicit no-op branch (both names keep their current value).
            Ex_image = Ex_image
            Aux_image = Aux_image

    # Re-impose the (D.image_size[0], D.image_size[1], 3) shape on both views,
    # presumably because the numpy_function-based ops lose static shape information.
    Ex_image = tf.reshape(Ex_image, (D.image_size[0],D.image_size[1], 3))
    Aux_image = tf.reshape(Aux_image, (D.image_size[0],D.image_size[1], 3))

    if tf.random.uniform((),0,1) < 0.5: # swap view position
        img_dict['Examined']= Aux_image
        img_dict['Aux'] = Ex_image
    else:
        img_dict['Examined'] = Ex_image
        img_dict['Aux']= Aux_image
        
    if tf.random.uniform((),0,1) < 0.1: # remove auxiliary view
        img_dict['Aux'] = tf.zeros_like(Aux_image)
#     else:
#         img_dict['Aux']= Aux_image
        
    return img_dict

0

In [None]:
def get_label(file_path):
    """Derive a 5-class one-hot label from an image file path.

    The third-from-last ``_``-separated field of the file name is parsed as a
    1-based integer label and converted to a 0-based one-hot vector of depth 5.
    """
    file_name = tf.strings.split(file_path, os.path.sep)[-1]
    label_field = tf.strings.split(file_name, "_")[-3]
    one_based = tf.strings.to_number(label_field,out_type=tf.dtypes.int32)
    return tf.one_hot(one_based-1,5)

def decode_img(img):
    """Decode PNG bytes into a 3-channel image resized to ``D.image_size``."""
    decoded = tf.io.decode_png(img, channels=3)
    resized = tf.image.resize(decoded, D.image_size)
    return resized

def process_image(file_path):
    """Read the file at ``file_path``, decode/resize it and cast the result to ``TOUT``."""
    raw_bytes = tf.io.read_file(file_path)
    return tf.cast(decode_img(raw_bytes), TOUT)

# Per-fold container: [ {view name -> dataset}, label dataset ].
# Keys come from D.splits / D.views so the container always matches the config.
datasets_dict = {
    split: [
        {view: None for view in D.views},
        [] #for label
    ] for split in D.splits
}
# File-name tag used for each model input view.
view_map = {"Examined":"CC", "Aux":"MLO"}
#https://www.tensorflow.org/tutorials/load_data/images?hl=en#using_tfdata_for_finer_control
# shuffle=False keeps a deterministic file order, which the later zip of the two
# views and the labels depends on.
for split in D.splits:
    for view in D.views:
        datasets_dict[split][0][view] = tf.data.Dataset.list_files(f"{D.images_dir}/*_{view_map[view]}_{split}.png",shuffle=False)

# Report how many files each view/fold has, with a couple of example paths.
for view in D.views:
    total = 0
    print(f"{view} dataset".center(40, "-"))
    for split in D.splits:
        ds = datasets_dict[split][0][view]
        fold_cardinality = ds.cardinality().numpy()
        total += fold_cardinality
        print(f'{view}-{split} cardinality :', fold_cardinality)
        print('examples :')
        for f in ds.take(2):
            print(f.numpy())
        print()
    print("total = ",total)

# Labels are parsed from the file names of ONE view. The original code picked it
# through a loop variable leaked from the reporting loop above (i.e. the last
# entry of D.views); that choice is now explicit and no longer depends on
# which loop happened to run last.
label_view = D.views[-1]
for split in D.splits:
    # Must run before the per-view datasets are replaced by decoded images below.
    datasets_dict[split][1] = datasets_dict[split][0][label_view].map(get_label, num_parallel_calls=tf.data.AUTOTUNE)
    for view in D.views:
        datasets_dict[split][0][view] = datasets_dict[split][0][view].map(process_image, num_parallel_calls=tf.data.AUTOTUNE)

# Sanity checks on the first fold: image dtype and the set of distinct one-hot labels.
print("".center(40, "-"))
for image in datasets_dict[D.splits[0]][0][D.views[0]].take(1):
    print("image dtype =", image.dtype) 
print("unique labels :\n",np.unique(list(datasets_dict[D.splits[0]][1].as_numpy_iterator()), axis=0))

# One zipped ({view: image}, label) dataset per fold, in D.splits order.
fold_ds = []
for split in D.splits:
    fold_i = tf.data.Dataset.zip(tuple(datasets_dict[split]))
    fold_ds.append(fold_i)

del datasets_dict

def prepare_ds(ds, training=True):
    """Turn a fold dataset into a cached, batched and prefetched input pipeline.

    Args:
        ds: dataset yielding ``(views_dict, label)`` pairs.
        training: when true, the cached data is additionally shuffled (buffer
            of ``2 * D.shuffle_buffer``) and augmented with ``augment_img``.

    Returns:
        The prepared dataset, batched with ``D.batch_size``.
    """
    pipeline = ds.cache()
    if training:
        def augment_pair(views, label):
            # Only the images are augmented; the label passes through unchanged.
            return (augment_img(views), label)

        pipeline = pipeline.repeat(1)
        pipeline = pipeline.shuffle(buffer_size=D.shuffle_buffer*2)
        pipeline = pipeline.map(augment_pair, num_parallel_calls=tf.data.AUTOTUNE)
    return pipeline.batch(D.batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)

# visualization

In [None]:
from tensorflow.errors import InvalidArgumentError

# Pull one augmented training batch from fold 0 to eyeball the augmentation pipeline.
viz_ds = prepare_ds(fold_ds[0], training=True)
print(repr(viz_ds))
batch = next(iter(viz_ds))
Ex_image_batch = batch[0]['Examined']
Aux_image_batch = batch[0]['Aux']
label_batch = batch[1]

ncol = 4
nrow = 4
# Green vertical separator drawn between the two views of a sample:
# D.image_size[0] rows by bar_thickness columns.
bar_color = [0, 255, 0]
bar_thickness = 5
bar = np.array([[bar_color for i in range(bar_thickness)]
                           for j in range(D.image_size[0])])
# For batches of 21+ samples start the nrow*ncol window at a random offset; otherwise at 0.
random_start = 0 if D.batch_size < 21 else np.random.randint(0,D.batch_size-nrow*ncol)
plt.figure(figsize=(20, 5*nrow))
for i in range(nrow*ncol):
    index = random_start+i
    try:
        EX = Ex_image_batch[index].numpy().astype("uint8")
        AUX = Aux_image_batch[index].numpy().astype("uint8")
    except InvalidArgumentError:
        # Presumably raised when `index` runs past the end of the batch; stop plotting.
        break
    ax = plt.subplot(nrow, ncol, i + 1)
    # Side-by-side panel: Examined | separator | Aux.
    image = np.hstack((EX,bar,AUX))
    plt.imshow(image)
    label = label_batch[index]
    # Labels are one-hot with a 0-based index, hence the +1 for the displayed category.
    plt.title(f"({index})\nBI-RADS {np.argmax(label)+1}\nExamined|Aux")
    plt.axis("off")

# Model Training

In [None]:
# Learning rate schedule for TPU, GPU and CPU.
# Using an LR ramp up because fine-tuning a pre-trained model.
# Starting with a high LR would break the pre-trained weights.
def get_lr_callback(plot_schedule=False, EPOCHS=D.epochs):
    """Create a ``LearningRateScheduler`` with linear ramp-up and exponential decay.

    The rate rises linearly from ``LR_START`` to ``LR_MAX`` (scaled by the
    number of replicas) over the first quarter of ``EPOCHS``, optionally holds,
    then decays exponentially towards ``LR_MIN``.

    Args:
        plot_schedule: when true, also plot the schedule over
            ``max(25, EPOCHS)`` epochs.
        EPOCHS: number of epochs the ramp-up length is derived from.

    Returns:
        The ``keras.callbacks.LearningRateScheduler`` wrapping the schedule.
    """
    LR_START = 0.00001
    LR_MAX   = 0.00005 * strategy.num_replicas_in_sync
    LR_MIN   = 0.00001
    LR_RAMPUP_EPOCHS = round(EPOCHS*0.25)
    LR_SUSTAIN_EPOCHS = 0
    LR_EXP_DECAY = .9

    def lrfn(epoch):
        # Phase 1: linear warm-up.
        if epoch < LR_RAMPUP_EPOCHS:
            return (LR_MAX - LR_START) / LR_RAMPUP_EPOCHS * epoch + LR_START
        # Phase 2: hold at the peak (empty while LR_SUSTAIN_EPOCHS is 0).
        if epoch < LR_RAMPUP_EPOCHS + LR_SUSTAIN_EPOCHS:
            return LR_MAX
        # Phase 3: exponential decay towards LR_MIN.
        return (LR_MAX - LR_MIN) * LR_EXP_DECAY**(epoch - LR_RAMPUP_EPOCHS - LR_SUSTAIN_EPOCHS) + LR_MIN

    if plot_schedule:
        epoch_axis = list(range(max(25, EPOCHS)))
        plt.plot(epoch_axis, [lrfn(epoch) for epoch in epoch_axis])

    return keras.callbacks.LearningRateScheduler(lrfn, verbose=0)
get_lr_callback(plot_schedule=True)

In [None]:
@keras.saving.register_keras_serializable("CustomLayers", name="custom_global_pooling")
class GlobalPooling2D(keras.layers.Layer):
    '''
    Global spatial pooling over axes (1, 2) with optional learned spatial weighting.

    modified from : https://github.com/csvance/keras-global-weighted-pooling/blob/master/gwp.py#L51
    reference : https://arxiv.org/abs/1809.08264

    Args:
        pool_func: one of 'max', 'avg', 'w_max', 'w_avg'. The 'w_' variants
            multiply the input by a trainable (H, W, 1) kernel before reducing
            and add a trainable scalar bias afterwards.
        activation: name of the activation applied to the pooled output.
    '''
    def __init__(self,pool_func:str, activation:str='linear', **kwargs):
        super().__init__(**kwargs)
        assert pool_func in ['max', 'avg','w_max', 'w_avg'], "one of : 'max', 'avg','w_max', 'w_avg'"
        self.act = activation
        self.pool_func = pool_func
        # Resolve the activation once instead of building a new Activation
        # layer on every call.
        self._activation_fn = keras.activations.get(activation)

    def build(self, input_shape):
        # Only the weighted variants own variables; their kernel has one weight
        # per spatial position, shared across channels.
        if "w_" in self.pool_func:
            self.kernel = self.add_weight(name='kernel',
                                          shape=(input_shape[1], input_shape[2], 1),
                                          initializer='ones',
                                          trainable=True)
            self.bias = self.add_weight(name='bias',
                                        shape=(1,),
                                        initializer='zeros',
                                        trainable=True)
        super().build(input_shape)

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[3])

    def call(self, x):
        if "w_avg" == self.pool_func:
            z = tf.reduce_mean(x*self.kernel, axis=(1, 2)) + self.bias
        elif "w_max" == self.pool_func:
            z = tf.reduce_max(x*self.kernel, axis=(1, 2)) + self.bias
        elif "max" == self.pool_func:
            z = tf.reduce_max(x, axis=(1, 2))
        elif "avg" == self.pool_func:
            z = tf.reduce_mean(x, axis=(1, 2))
        return self._activation_fn(z)

    def get_config(self):
        # Required for (de)serialization: `pool_func` has no default, so a config
        # without it cannot rebuild the layer.
        config = super().get_config()
        config.update(
            {
                "pool_func": self.pool_func,
                "activation": self.act,
            }
        )
        return config

@keras.saving.register_keras_serializable("CustomLayers", name="stochastic_depth")
class StochasticDepth(keras.layers.Layer):
    """
    Per-sample stochastic depth (drop path): during training each sample's
    input is either zeroed or rescaled by ``1 / keep_prob``; outside training
    the input is returned unchanged.

    source : https://github.com/keras-team/keras/blob/v3.3.3/keras/src/applications/convnext.py#L140
    """
    def __init__(self, drop_path_rate, **kwargs):
        super().__init__(**kwargs)
        self.drop_path_rate = drop_path_rate

    def call(self, x, training=None):
        if not training:
            return x
        keep_prob = 1 - self.drop_path_rate
        # One Bernoulli draw per sample, broadcast over all remaining axes.
        per_sample_shape = (tf.shape(x)[0],) + (1,) * (len(x.shape) - 1)
        keep_mask = tf.floor(keep_prob + tf.random.uniform(per_sample_shape, 0, 1))
        keep_mask = tf.cast(keep_mask, TOUT)
        return (x / keep_prob) * keep_mask

    def get_config(self):
        return {**super().get_config(), "drop_path_rate": self.drop_path_rate}

@keras.saving.register_keras_serializable("CustomLayers", name="layer_scale")
class LayerScale(keras.layers.Layer):
    """
    Multiplies its input by a trainable vector ``gamma`` of length
    ``projection_dim``, initialised to the constant ``init_values``.

    source : https://github.com/keras-team/keras/blob/v3.3.3/keras/src/applications/convnext.py#L177
    """

    def __init__(self, init_values, projection_dim, **kwargs):
        super().__init__(**kwargs)
        self.projection_dim = projection_dim
        self.init_values = init_values

    def build(self, _):
        gamma_init = keras.initializers.Constant(self.init_values)
        self.gamma = self.add_weight(
            name='gamma',
            shape=(self.projection_dim,),
            initializer=gamma_init,
            trainable=True,
        )

    def call(self, x):
        return x * self.gamma

    def get_config(self):
        return {
            **super().get_config(),
            "init_values": self.init_values,
            "projection_dim": self.projection_dim,
        }

    
keras.saving.get_custom_objects()

In [None]:
from keras.applications import (
    ConvNeXtTiny,
    ConvNeXtSmall,
    ConvNeXtBase,
    ConvNeXtLarge,
    ConvNeXtXLarge,
)

# Variant name -> keras.applications constructor for that ConvNeXt size.
convnext_varians_map = {
    "convnext_tiny" : ConvNeXtTiny,
    "convnext_small": ConvNeXtSmall,
    "convnext_base" : ConvNeXtBase,
    "convnext_large": ConvNeXtLarge,
    "convnext_xlarge": ConvNeXtXLarge,
}

# Variant name -> (channel width per stage, number of blocks per stage), four stages each.
convnext_dims_depth_map = {
    "convnext_tiny" : ([96, 192, 384, 768], [3, 3, 9, 3]),
    "convnext_small": ([96, 192, 384, 768], [3, 3, 27, 3]),
    "convnext_base" : ([128, 256, 512, 1024], [3, 3, 27, 3]),
    "convnext_large": ([192, 384, 768, 1536], [3, 3, 27, 3]),
    "convnext_xlarge":([256, 512, 1024, 2048], [3, 3, 27, 3]),
}

def load_convnext(variant=D.model_var):
    """Instantiate the headless, ImageNet-pretrained ConvNeXt backbone for ``variant``.

    The input shape is ``(D.image_size[0], D.image_size[1], 3)``.
    """
    build_backbone = convnext_varians_map[variant]
    backbone_input_shape = (D.image_size[0], D.image_size[1], 3)
    return build_backbone(
        include_top=False,
        weights="imagenet",
        input_shape=backbone_input_shape,
    )

def get_dims_depth(variant=D.model_var):
    """Return the ``(dims, depths)`` pair registered for ``variant``."""
    dims_and_depths = convnext_dims_depth_map[variant]
    return dims_and_depths
    
def ConvNext_Block(x, dim,
                   stage=0,
                   block=0,
                   pretrained=None,
                   layer_scale_init_value=1e-6,
                   drop_path_rate=None,
                   variant='convnext_small', 
                   name=''
                  ):
    """Build one ConvNeXt residual block on top of ``x`` and return its output.

    The block is: 7x7 depthwise conv -> LayerNorm -> Dense(4*dim) -> GELU ->
    Dense(dim) -> LayerScale -> StochasticDepth (or identity), added to ``x``.

    Args:
        x: input tensor of the block (also used as the residual branch).
        dim: number of channels of the block.
        stage, block: indices used both in the layer names and in the keys
            looked up in ``pretrained``.
        pretrained: optional mapping from weight path
            (``'{variant}_stage_{stage}_block_{block}_<layer>.<weight>'``) to a
            variable exposing ``.numpy()``; when truthy, its values are copied
            into the freshly built layers.
        layer_scale_init_value: initial value of the LayerScale gamma.
        drop_path_rate: stochastic-depth rate; ``None`` inserts an identity
            activation instead of a StochasticDepth layer.
        variant: ConvNeXt variant name used as the prefix of the pretrained keys.
        name: prefix of the created layers' names.

    Returns:
        The output tensor of the residual addition.
    """
    prew = None
    if pretrained:
        key_prefix = f'{variant}_stage_{stage}_block_{block}'

        def fetch(*weight_paths):
            # Always hand NumPy arrays to set_weights (the layer-scale gamma
            # used to be passed as a raw variable, unlike every other weight).
            return [pretrained[f'{key_prefix}_{path}'].numpy() for path in weight_paths]

        prew = {
            'conv': fetch('depthwise_conv.kernel', 'depthwise_conv.bias'),
            'lnorm': fetch('layernorm.gamma', 'layernorm.beta'),
            'pointwise1': fetch('pointwise_conv_1.kernel', 'pointwise_conv_1.bias'),
            'pointwise2': fetch('pointwise_conv_2.kernel', 'pointwise_conv_2.bias'),
            'layer_scale': fetch('layer_scale.gamma'),
        }

    # groups=dim makes the 7x7 convolution operate on each channel separately.
    depthwise_convolution = keras.layers.Conv2D(dim, 
                                                   kernel_size=7, 
                                                   padding="same", 
                                                   groups=dim, 
                                                   name = f'{name}_{stage}-{block}_depthwise_conv'
                                                  )
    layer_normalization = keras.layers.LayerNormalization(epsilon=1e-6, name=f'{name}_{stage}-{block}_layernorm')
    pointwise_convolution_1 = keras.layers.Dense(4 * dim, name=f'{name}_{stage}-{block}_pointwise_conv1')
    GELU = keras.layers.Activation("gelu", name=f'{name}_{stage}-{block}_gelu')
    pointwise_convolution_2 = keras.layers.Dense(dim, name=f'{name}_{stage}-{block}_pointwise_conv2')
    add = keras.layers.Add(name=f'{name}_{stage}-{block}_output')
    layer_scale = LayerScale(layer_scale_init_value, projection_dim=dim, name=f'{name}_{stage}-{block}_layer_scale')

    o = depthwise_convolution(x)
    o = layer_normalization(o)
    o = pointwise_convolution_1(o)
    o = GELU(o)
    o = pointwise_convolution_2(o)
    o = layer_scale(o)
    if drop_path_rate is not None:
        o = StochasticDepth(
            drop_path_rate, name=f'{name}_{stage}-{block}_stochastic_depth'
        )(o)
    else:
        o = keras.layers.Activation("linear", name=f'{name}_{stage}-{block}_identity')(o)
    # Weights can only be assigned once the layers have been called (built) above.
    if prew:
        depthwise_convolution.set_weights(prew['conv'])
        layer_normalization.set_weights(prew['lnorm'])
        pointwise_convolution_1.set_weights(prew['pointwise1'])
        pointwise_convolution_2.set_weights(prew['pointwise2'])
        layer_scale.set_weights(prew['layer_scale'])

    return add([o, x])


def patchify_stem(x, 
                  dim=96, 
                  pretrained=None, 
                  name='single', 
                  variant='convnext_small'
                 ):
    """Build the ConvNeXt stem: a 4x4, stride-4 convolution followed by LayerNorm.

    When ``pretrained`` is truthy, the stem weights stored under the
    ``'{variant}_stem.layer_with_weights-{0,1}.*'`` keys are copied into the
    new layers. Returns the stem's output tensor.
    """
    conv = keras.layers.Conv2D(dim, kernel_size=4, strides=4, name=f'{name}_stem_conv')
    layer_norm = keras.layers.LayerNormalization(epsilon=1e-6,name=f'{name}_stem_layer_norm')
    o = layer_norm(conv(x))
    if not pretrained:
        return o

    stem_key = f'{variant}_stem.layer_with_weights'
    # Look up every weight first, then assign, so a missing key leaves the layers untouched.
    conv_w = [pretrained[f'{stem_key}-0.{part}'].numpy() for part in ('kernel', 'bias')]
    layer_norm_w = [pretrained[f'{stem_key}-1.{part}'].numpy() for part in ('gamma', 'beta')]
    conv.set_weights(conv_w)
    layer_norm.set_weights(layer_norm_w)
    return o

def spatial_downsampling(x, 
                         stage, 
                         dim,
                         pretrained=None, 
                         kernel_size=2,
                         stride=2,
                         name='single', 
                         variant='convnext_small'
                        ):
    """Build a ConvNeXt downsampling block: LayerNorm followed by a strided convolution.

    When ``pretrained`` is truthy, weights are copied from the keys
    ``'{variant}_downsampling_block_{stage-1}.layer_with_weights-{0,1}.*'``
    (index 0 is the LayerNorm, index 1 the convolution). Returns the output tensor.
    """
    layer_norm = keras.layers.LayerNormalization(epsilon=1e-6,name=f'{name}_downsampling_{stage}_layer_norm')
    conv = keras.layers.Conv2D(dim, kernel_size=kernel_size, strides=stride, name=f'{name}_downsampling_{stage}_conv')
    o = conv(layer_norm(x))
    if not pretrained:
        return o

    block_key = f'{variant}_downsampling_block_{stage-1}.layer_with_weights'
    # Look up every weight first, then assign, so a missing key leaves the layers untouched.
    layer_norm_w = [pretrained[f'{block_key}-0.{part}'].numpy() for part in ('gamma', 'beta')]
    conv_w = [pretrained[f'{block_key}-1.{part}'].numpy() for part in ('kernel', 'bias')]
    conv.set_weights(conv_w)
    layer_norm.set_weights(layer_norm_w)
    return o

def ConvNext_Stage(x, 
                   dim, 
                   depth, 
                   stage,  
                   pretrained=None, 
                   layer_scale_init_value=1e-6, 
                   depth_drop_rates=None,
                   name='stage',
                   variant='convnext_small'
                  ):
    """Stack ``depth`` ConvNeXt blocks of width ``dim`` on top of ``x``.

    ``depth_drop_rates`` supplies one stochastic-depth rate per block; when it
    is ``None`` every block gets a rate of 0. Returns the last block's output.
    """
    rates = np.zeros(depth) if depth_drop_rates is None else depth_drop_rates
    shared_options = dict(
        dim=dim,
        pretrained=pretrained,
        stage=stage,
        layer_scale_init_value=layer_scale_init_value,
        variant=variant,
        name=name,
    )
    features = x
    for block_index in range(depth):
        features = ConvNext_Block(
            features,
            block=block_index,
            drop_path_rate=rates[block_index],
            **shared_options,
        )
    return features

def ConvNext_stage_and_downsampling(x,
                                    dim, 
                                    depth, 
                                    i, 
                                    pretrained, 
                                    depth_drop_rates=None,
                                    view='single', 
                                    variant='convnext_small'
                                   ):
    """Build ConvNeXt stage ``i`` for one view, including the layers that precede it.

    Stage 0 is preceded by input normalization (ImageNet mean / variance on a
    0-255 scale) and the patchify stem; every later stage is preceded by a
    2x2, stride-2 downsampling block. Returns the stage's output tensor.
    """
    layer_prefix = f'{variant}_{view}'
    if i != 0:
        x = spatial_downsampling(
            x=x,
            stage=i,
            dim=dim,
            pretrained=pretrained,
            kernel_size=2,
            stride=2,
            name=layer_prefix,
            variant=variant,
        )
    else:
        input_norm = keras.layers.Normalization(
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            variance=[
                (0.229 * 255) ** 2,
                (0.224 * 255) ** 2,
                (0.225 * 255) ** 2,
            ],
            name=f'{variant}_{view}_norm'
        )
        x = patchify_stem(
            x=input_norm(x),
            dim=dim,
            pretrained=pretrained,
            name=layer_prefix,
            variant=variant,
        )

    return ConvNext_Stage(
        x=x,
        dim=dim,
        depth=depth,
        stage=i,
        pretrained=pretrained,
        depth_drop_rates=depth_drop_rates,
        name=f'{variant}_{view}_stage',
        variant=variant,
    )


def multi_view_fusion_stage(pre_fusion, 
                            i, 
                            pretrained_weights=None, 
                            dims=None,
                            depths=None, 
                            depth_drop_rates=None,
                            fusion_block_index=0,
                            model_var=D.model_var
                           ):
    """Build stage ``i`` as the stage in which the per-view branches are fused.

    Each view is first downsampled. Blocks before ``fusion_block_index`` are
    built separately per view; at ``fusion_block_index`` the views are averaged,
    passed through one block and summed with the 'Examined' branch; the
    remaining blocks run on the fused tensor only.

    Args:
        pre_fusion: dict view name -> tensor. It is updated IN PLACE with the
            per-view tensors produced here.
        i: index of the stage (selects ``dims[i]`` / ``depths[i]``).
        pretrained_weights: optional weight-path mapping forwarded to the builders.
        dims, depths: per-stage channel widths and block counts.
        depth_drop_rates: one stochastic-depth rate per block of this stage;
            ``None`` means a rate of 0 for every block.
        fusion_block_index: index of the block at which the views are merged.
        model_var: ConvNeXt variant name (layer-name prefix and pretrained-key prefix).

    Returns:
        The output tensor of the last block of the stage.

    NOTE(review): if ``fusion_block_index >= depths[i]`` the merge branch never
    runs and the value returned is the last view's tensor, i.e. nothing is fused.
    """
    # Downsample every view before entering the stage.
    for view, x in pre_fusion.items():
        x = spatial_downsampling(pretrained=pretrained_weights,
                                  dim=dims[i],
                                  stage=i,
                                  kernel_size=2,
                                  stride=2,
                                  variant=model_var,
                                  name=f"{model_var}_{view}_fusion_downsampling",
                                  x=x
                                )
        pre_fusion[view] = x
    # Shallow copy that tracks the latest per-view tensors for the post-fusion skip connection.
    x_dual_skip = pre_fusion.copy()
    if depth_drop_rates is None:
        depth_drop_rates = np.zeros(depths[i])
    # stage iteration here
    for j in range(depths[i]):
        if j < fusion_block_index:
            # Pre-fusion blocks: one block per view, built from the same pretrained keys.
            for view, x in pre_fusion.items():
                x = ConvNext_Block(x, 
                                  dims[i],stage=i,
                                  block=j,
                                  pretrained=pretrained_weights, 
                                  drop_path_rate=depth_drop_rates[j],
                                  variant=model_var,
                                  name=f'{model_var}_{view}_fusion_stage')
                pre_fusion[view] = x_dual_skip[view] = x
            continue
        elif j == fusion_block_index:
            # Fusion point: element-wise average of all views, one block, then a
            # skip connection from the 'Examined' branch.
            x = keras.layers.Average(name=f'{model_var}_fusion_merge')(list(pre_fusion.values()))
            # NOTE(review): `view` here is whatever the last per-view loop left behind,
            # so the post-fusion layer names carry that view's name.
            x = ConvNext_Block(x, 
                              dims[i],stage=i,
                              block=j,
                              pretrained=pretrained_weights, 
                              drop_path_rate=depth_drop_rates[j],
                              variant=model_var,
                              name=f'{model_var}_{view}_post-fusion_stage')
            x = keras.layers.Add(name="merge_fused_and_examined_skip")([x, x_dual_skip["Examined"]])
            continue
        elif j > fusion_block_index:
            # Post-fusion blocks operate on the single fused tensor.
            x = ConvNext_Block(x, 
                              dims[i],stage=i,
                              block=j,
                              pretrained=pretrained_weights, 
                              drop_path_rate=depth_drop_rates[j],
                              variant=model_var,
                              name=f'{model_var}_{view}_post-fusion_stage')
    return x
0

In [None]:
def get_inputs():
    """Create the two float32 model inputs, keyed and named 'Examined' and 'Aux'.

    Each input has shape ``[*D.image_size, 3]``.
    """
    return {
        view_name: keras.Input(shape=[*D.image_size,3], name=view_name, dtype=tf.float32)
        for view_name in ("Examined", "Aux")
    }

def model_compile(model):
    """Compile ``model`` with the loss, optimizer and metrics configured on ``D``.

    XLA compilation (``jit_compile``) is enabled only when ``D.TPU`` is truthy.
    Returns the same model instance.
    """
    compile_options = dict(
        loss=D.loss_fn,
        optimizer=D.optimizer(),
        metrics=D.metrics(),
        jit_compile=bool(D.TPU),
    )
    model.compile(**compile_options)
    return model

def model_fit(model, fold=1, initial_epoch=0,epoch=D.epochs, verbose = 2, initial_val_F1=0.4):
    """Train ``model`` with fold ``fold`` held out for validation.

    The remaining entries of ``fold_ds`` are concatenated into the training
    set. The best weights (by ``val_macro_F1``, and only above
    ``initial_val_F1``) are written to
    ``checkpoints/{D.model_var}_fold-{fold}_best.weights.h5``.

    Returns:
        ``(history, best)``: the Keras History object and the checkpoint
        callback's best monitored value.
    """
    validation_ds = prepare_ds(fold_ds[fold], training=False)

    # Everything except the held-out fold, concatenated in fold order.
    training_folds = [*fold_ds[:fold], *fold_ds[fold + 1:]]
    train_ds = training_folds[0]
    for extra_fold in training_folds[1:]:
        train_ds = train_ds.concatenate(extra_fold)
    train_ds = prepare_ds(train_ds, training=True)

    checkpoint_path = f'checkpoints/{D.model_var}_fold-{fold}_best.weights.h5'
    ckpt_callback = keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        monitor='val_macro_F1',
        verbose=2,
        mode='max',
        save_best_only=True,
        save_weights_only=True,
        initial_value_threshold=initial_val_F1,
    )
    fit_callbacks = [ckpt_callback, get_lr_callback()]
    history = model.fit(
        train_ds,
        initial_epoch=initial_epoch,
        epochs = epoch,
        verbose = verbose,
        validation_data=validation_ds,
        callbacks=fit_callbacks,
    )
    return history, ckpt_callback.best

def create_model(model_var=D.model_var, 
                 fusion_stage=2, 
                 fusion_block_index=1,
                 fc_layers_depth=1, 
                 fc_layers_dims=512, 
                 drop_path_rate=D.dropout_rate,
                 drop_out_rate=0.5,
                 pooling='avg',
                 pretrained_weights=None,
                ):
    """Build and compile the two-view ConvNeXt classifier.

    Stages before ``fusion_stage`` are built once per view, stage
    ``fusion_stage`` merges the views (see ``multi_view_fusion_stage``), and
    later stages run on the fused features. The head is: global pooling ->
    LayerNorm -> Dropout -> ``fc_layers_depth`` GELU Dense layers -> 5-way softmax.

    Args:
        model_var: ConvNeXt variant name (key of the variant lookup tables).
        fusion_stage: index of the stage in which the views are fused; must be
            a valid stage index. NOTE(review): 0 is accepted but fuses before
            any stem is built - confirm whether that is ever intended.
        fusion_block_index: block inside the fusion stage at which views merge.
        fc_layers_depth: number of hidden Dense layers in the head.
        fc_layers_dims: width of each hidden Dense layer.
        drop_path_rate: maximum stochastic-depth rate; rates grow linearly from
            0 to this value across all blocks.
            NOTE(review): the default is ``D.dropout_rate``, not
            ``D.drop_path_rate`` - confirm this is intended.
        drop_out_rate: dropout rate applied before the head.
        pooling: pooling mode passed to ``GlobalPooling2D``.
        pretrained_weights: optional weight-path mapping used to initialise the
            backbone layers.

    Returns:
        The compiled Keras model.

    Raises:
        ValueError: if ``fusion_stage`` is not a valid stage index (previously
            this surfaced as an unbound-variable error later on).
    """
    inputs = get_inputs()
    pre_fusion = dict(inputs)
    dims , depths = get_dims_depth(model_var)
    if not 0 <= fusion_stage < len(dims):
        raise ValueError(
            f"fusion_stage must be in [0, {len(dims) - 1}] for {model_var!r}, got {fusion_stage!r}"
        )
    # One stochastic-depth rate per block, increasing linearly over the whole network.
    depth_drop_rates = np.linspace(0, drop_path_rate, sum(depths), dtype=float)
    blocks_passed = 0
    for i in range(len(dims)):
        current_stage_depth_drop_rates = depth_drop_rates[blocks_passed:blocks_passed+depths[i]]
        blocks_passed+=depths[i]
        if i < fusion_stage:
            # Pre-fusion stages: an independent branch per view.
            for key in D.views:
                pre_fusion[key] = ConvNext_stage_and_downsampling(pre_fusion[key], 
                                                                   dims[i], 
                                                                   depths[i], i, 
                                                                   pretrained_weights, 
                                                                   variant=model_var, 
                                                                   depth_drop_rates=current_stage_depth_drop_rates,
                                                                   view=key)
            continue
        if i == fusion_stage:
            x = multi_view_fusion_stage(pre_fusion, 
                                        i,
                                        pretrained_weights=pretrained_weights, 
                                        dims=dims,
                                        depths=depths, 
                                        depth_drop_rates=current_stage_depth_drop_rates,
                                        fusion_block_index=fusion_block_index,
                                        model_var=model_var,
                                       )
            continue
        # Post-fusion stages: a single branch on the fused features.
        x = ConvNext_stage_and_downsampling(x, dims[i], depths[i], i, 
                                        pretrained_weights,view="fine", 
                                        depth_drop_rates=current_stage_depth_drop_rates, variant=model_var)
    x = GlobalPooling2D(pooling, name=f'{model_var}_global_pooling')(x)
    LN1 = keras.layers.LayerNormalization(epsilon=1e-6, name=f'{model_var}_pre_FC_ln')
    x = LN1(x)
    if pretrained_weights:
        LN1.set_weights([
            pretrained_weights['layer_normalization.gamma'].numpy(),
            pretrained_weights['layer_normalization.beta'].numpy(),
        ])
    x = keras.layers.Dropout(drop_out_rate)(x)
    for i in range(fc_layers_depth):
        x = keras.layers.Dense(fc_layers_dims, activation='gelu', name=f'{model_var}_cls_{i}')(x)
    output = keras.layers.Dense(5, activation='softmax',dtype='float32' ,name=f'{model_var}_output')(x)
    # Public functional-API entry point instead of the private keras.src path.
    model = model_compile(keras.Model(inputs=inputs, outputs=output, name=f'{model_var}_mammo_multi_view'))
    return model

def plot_history_metrics(history, model_name="convnext"):
    """Plot the loss curves and every other logged metric of one training run.

    Two figures are drawn: training/validation loss on the first one and all
    remaining entries of ``history.history`` on the second one.

    Args:
        history: object exposing a ``history`` dict that maps a log name to
            its list of per-epoch values (presumably what ``model_fit`` /
            Keras ``fit`` returns -- confirm against the caller).
        model_name: label appended to both figure titles.

    Raises:
        KeyError: if ``history.history`` has no ``'loss'`` entry.
    """
    # Read the log dict without mutating it: popping entries would make a
    # second call on the same history fail and would hide 'loss' from any
    # other helper that plots the same object later.
    logs = history.history
    # Entries that are drawn on the loss figure or that are not metrics at all
    # (learning-rate logs; some Keras versions name the key 'learning_rate').
    excluded_from_metrics = ('loss', 'val_loss', 'lr', 'learning_rate')

    loss = logs['loss']
    # The x-axis follows the number of epochs actually recorded, which can be
    # smaller than the configured epoch count when training stops early.
    plt.plot(range(len(loss)), loss, 'r', label='Training Loss')
    val_loss = logs.get('val_loss')
    if val_loss is not None:
        plt.plot(range(len(val_loss)), val_loss, 'b', label='Validation Loss')
    plt.legend()
    plt.title(f'Training and validation loss | {model_name}')

    plt.figure()
    for key, values in logs.items():
        if key in excluded_from_metrics:
            continue
        plt.plot(range(len(values)), values, label=key)
    plt.title(f'Training and validation metrics | {model_name}')
    plt.legend()
    plt.show()
    
def plot_history_metrics_for_multi_model(histories:dict, metric_to_plot="loss"):
    """Overlay one logged metric from several training runs on a single figure.

    Args:
        histories: mapping of a label (used as the legend entry) to an object
            exposing a ``history`` dict of per-epoch value lists.
        metric_to_plot: key to look up in every ``history`` dict.

    Raises:
        KeyError: if a run did not log ``metric_to_plot``; the message lists
            the keys that run does contain.
    """
    for fold, history in histories.items():
        metric_history = history.history.get(metric_to_plot)
        if metric_history is None:
            # Fail with the offending name instead of handing None to
            # matplotlib, which only reports an opaque shape mismatch.
            raise KeyError(
                f"metric {metric_to_plot!r} not found for {fold!r}; "
                f"available: {sorted(history.history)}"
            )
        # Size the x-axis from the recorded values so that runs of different
        # lengths (e.g. stopped early) can still be drawn together.
        plt.plot(range(len(metric_history)), metric_history, label=fold)
    plt.legend()
    plt.title(f'{metric_to_plot} history')
    plt.show()
0

In [None]:
# Load the pretrained backbone for the configured variant once and keep its
# weights as a mapping keyed by weight path; the model-building code looks up
# entries such as 'layer_normalization.gamma' and calls .numpy() on them.
# NOTE(review): load_convnext is defined elsewhere in the notebook -- presumably
# it returns a Keras model; confirm.
pretrained_weights = load_convnext(D.model_var).get_weight_paths()

In [None]:
# model = create_model(model_var=D.model_var, 
#                      fusion_stage=2, 
#                      fusion_block_index=9,
#                      fc_layers_depth=D.fc_layers_depth, 
#                      fc_layers_dims=D.fc_layers_dims, 
#                      drop_path_rate=D.dropout_rate,
#                      pooling=D.pooling,
#                      pretrained_weights=pretrained_weights,
#                     )

# keras.utils.plot_model(model,show_shapes=True, 
#                        rankdir="TB", dpi=128,)
# model.summary()

In [None]:
# model.get_layer(f'{D.model_var}_Examined_stage_{1}-{2}_stochastic_depth').get_config()

In [None]:
# !rm -rf keras-tuner
# hyperparams = {
#     "pooling": ['w_avg'],
#     "fusion_stage" : [1, 2, 3],
#     "fusion_block_index" : [0, 1, 2],
#     "drop_path_rate": [0.20, 0.60]
# }
# num_trials = np.prod([len(hpc) for hpc in hyperparams.values()])
# print("num_trials = ",num_trials)
# def build_model(hp):
#     hp_kwargs = { key:hp.Choice(key, value, ordered=False) for key, value in hyperparams.items()}
#     with strategy.scope():
#         model = create_model(
#                              pretrained_weights=pretrained_weights,
#                              **hp_kwargs
#                             )
#     return model
# tuner = keras_tuner.GridSearch(
#     hypermodel=build_model,
#     objective=keras_tuner.Objective('val_macro_F1', 'max'),
#     max_trials=num_trials,
#     seed=911,
#     overwrite=False,
#     directory="/kaggle/working",
#     project_name='keras-tuner',
# )
# tuner.search_space_summary()

In [None]:
# # D.model_var = "convnext_base"
# # D.fusion_stage = 3
# # D.fc_layers_depth = None
# # D.fc_layers_dims = None
# # D.dropout_rate = None
# # D.pooling = None
# fold = 1
# try:
#     with strategy.scope():
#         model = create_model(model_var=D.model_var, 
#                              fusion_stage=D.fusion_stage,
#                              fusion_block_index=D.fusion_index,
#                              fc_layers_depth=D.fc_layers_depth, 
#                              fc_layers_dims=D.fc_layers_dims, 
#                              drop_path_rate=D.dropout_rate,
#                              drop_out_rate=0.3,
#                              pooling=D.pooling,
#                              pretrained_weights=pretrained_weights,
#                             )
#         history = model_fit(model, 
#                             fold=fold,
#                             initial_epoch=0,
#                             epoch=D.epochs, 
#                             verbose = 2 if D.TPU else 1, 
#                             initial_val_F1=0.5
#                            )
#         model.load_weights(f"checkpoints/{D.model_var}_fold-{fold}_best.weights.h5")
        
# #         tuner.search(train_ds, 
# #                      epochs=D.epochs if D.TPU else 1, 
# #                      validation_data=validation_ds,
# #                      verbose=2,
# #                      callbacks=[get_lr_callback()]
# #                     )
# except IndexError as e:
#     print(repr(e))

In [None]:
histories = {}
# models = []
models_best_f1_score = []
with strategy.scope():
    for fold in range(len(fold_ds)):
        model = create_model(model_var=D.model_var, 
                         fusion_stage=D.fusion_stage,
                         fusion_block_index=D.fusion_index,
                         fc_layers_depth=D.fc_layers_depth, 
                         fc_layers_dims=D.fc_layers_dims, 
                         drop_path_rate=D.drop_path_rate,
                         drop_out_rate=D.dropout_rate,
                         pooling=D.pooling,
                         pretrained_weights=pretrained_weights,
                        )
        print(f"training model fold-{fold}".center(40,"-"),"\nprevious fold result :", models_best_f1_score)
        history, best_f1_score = model_fit(model, 
                                            fold,
                                            initial_epoch=0,
                                            epoch=D.epochs, 
                                            verbose = 2 if D.TPU else 1, 
                                            initial_val_F1=0.5
                                           )
        histories[f"fold-{fold}"] = history
        models_best_f1_score.append(best_f1_score)
        model.load_weights(f"checkpoints/{D.model_var}_fold-{fold}_best.weights.h5")
        model.save(f"{D.model_var}_fold-{fold}_best-{best_f1_score:.3f}.h5", include_optimizer=False)
#         models.append(model)
        clear_output()
print(f"training finished".center(40,"-"))
!rm -rf checkpoints
print("F1-Score result :")
print("\n".join([f"fold-{i} = {score}" for i, score in enumerate(models_best_f1_score)]))

# Results

In [None]:
# tuner.results_summary(num_trials)

In [None]:
# Compare the folds on each tracked metric, one figure per metric
# (training curve first, then its validation counterpart).
for metric_name in ("macro_F1", "val_macro_F1", "accuracy", "val_accuracy"):
    plot_history_metrics_for_multi_model(histories, metric_to_plot=metric_name)

In [None]:
# Per-fold detail: draw the loss figure and the remaining-metrics figure
# for every recorded training run.
for fold_name in histories:
    plot_history_metrics(histories[fold_name], model_name=fold_name)

# Test the saved model

In [None]:
# with strategy.scope():
#     modelll = create_model(model_var=D.model_var, 
#                              fusion_stage=D.fusion_stage,
#                              fusion_block_index=0,
#                              fc_layers_depth=D.fc_layers_depth, 
#                              fc_layers_dims=D.fc_layers_dims, 
#                              drop_path_rate=D.dropout_rate,
#                              pooling=D.pooling,
#                              pretrained_weights=None,
#                             )
#     modelll.load_weights(input("model weights path (.weights.h5) :"))
# #     modelll = keras.models.load_model(input("model path (.h5 or .keras) :"))
#     modelll.evaluate(validation_ds)

In [None]:
# modell = keras.models.load_model("/kaggle/working/convnext_dual_view.h5")
# modell.summary()