- SemSeg Pipeline (data.py): https://github.com/HasnainRaz/SemSegPipeline
- Semantic Segmentation and the Dataset (prepare dataset): https://d2l.ai/chapter_computer-vision/semantic-segmentation-and-dataset.html
- PCL segmentation (custom callback) : https://github.com/ika-rwth-aachen/PCLSegmentation/blob/main/pcl_segmentation/utils/callbacks.py
- Human Image Segmentation (data and application ) : https://github.com/nikhilroxtomar/Human-Image-Segmentation-with-DeepLabV3Plus-in-TensorFlow
- Deep ResUnet and ResUnet++ (pytorch) : https://github.com/rishikksh20/ResUnet
- Deep ResUnet and ResUnet++ (original repo and tf) : https://github.com/DebeshJha/ResUNetPlusPlus
- Loss functions for image segmentation : https://github.com/JunMa11/SegLoss
- Albumentations with tf.data : https://colab.research.google.com/drive/1uUH-asz3CFxlvld8uGx-m3crr4Iepp3u#scrollTo=LWO057_PshWr
- DeepLabV3+ (reference_repo) : https://github.com/lattice-ai/DeepLabV3-Plus
- tf-semantic-segmentation (reference_repo) : https://github.com/baudcode/tf-semantic-segmentation/tree/feature/line-detection/tf_semantic_segmentation

In [None]:
%load_ext autoreload
%autoreload 2

import albumentations as A

from tensorflow import keras
from tensorflow.keras import initializers
from tensorflow.keras.layers import Conv2D,Conv1D
import numpy as np
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Layer
from tensorflow.keras.losses import Loss
from tensorflow.keras import Model
from tensorflow.keras import initializers
import tensorflow as tf
import datetime

#from tf_seg.utils import TensorLike, Tensor, FloatTensorLike,AcceptableDTypes
from tf_seg.logger import Logger

### Trainer

In [None]:
batch_size = 4

data_config = dict(
    name="road_segmentation",
    function_name="custom",  # it is used camvid dataset to generate binary data
    path="data/dummy_data",#"ignis/data/segmentation/data_v2",
    classes=["Area"],
    normalizing=False,
    palette=[(255, 255, 255)],
    one_hot_encoding=True,  # target output shape
    background_adding=False,  # add target background class
    image_size=(512, 512),
    batch_size=batch_size, #
    output_type=("tf.float32", "tf.float32"),  # this is for camvid data types after data processing
    channels=(3, 3),  # it is optional
)
aug_config = dict(aug_type="albumentations")


model_config = dict(
    # n_filters=[16, 32, 64, 128, 256],
    # n_filters=[4, 8, 12, 16,24],
    n_filters=[64, 128,256,512],  #
    input_shape=[data_config["image_size"][0], data_config["image_size"][1], 3],
    final_activation="sigmoid",
    activation="relu",
    backbone= "EfficientNetB0",# "ResNet50", None
    pretrained="imagenet",
    output_size=1,
)

trainer_config = dict(
    epochs=600,
    #batch_size=batch_size, # remove batch size
    optimizer={"name": "adam", "params": {"learning_rate": 0.001}},
    losses=["dice_loss"],
    metrics=["dice_score"],
    save_model=True,
    save_path="test_efficientnetb0_ignis_area",
    verbose=1,
    deploy_onnx=True,
)

callbacks_config = dict(measure_total_time={"class_name": "MeasureTotalTime", "params": {}},
                        update_best_weights={"class_name": "UpdateBestWeights", "params": {"metric_name": "val_dice_score", "mode": "max"}})

config = dict(data=data_config, model=model_config, aug=aug_config, trainer=trainer_config, callbacks=callbacks_config)

In [None]:
from tf_seg.train import Trainer
from tf_seg.save import ModelSaver
from tf_seg.transformers import Transformer

In [None]:
IM_SIZE = data_config["image_size"][0]
p=0.3

train_transforms = A.Compose(
    [
        A.Resize(IM_SIZE, IM_SIZE),
        #A.RandomBrightness(limit=0.2),
    
        # A.Rotate(limit=90,p=p),
        # A.RandomSizedCrop(min_max_height=(400, 512), height=IM_SIZE, width=IM_SIZE, p=p),
        # A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        #A.RandomBrightness(limit=1)
       
        #   A.OneOf(
        #       [
        #           A.RandomSizedCrop(min_max_height=(256, 256), height=IM_SIZE, width=IM_SIZE, p=p),
        #           A.CenterCrop(height=IM_SIZE, width=IM_SIZE, p=p),
        #           #A.PadIfNeeded(min_height=IM_SIZE, min_width=IM_SIZE, p=p),
        #       ],p=1),
        # A.OneOf([A.Rotate(limit=45), A.Transpose(p=p)]), # A.VerticalFlip(p=p), 
        #A.OneOf([A.ElasticTransform(alpha=120, sigma=120 * 0.05, alpha_affine=120 * 0.03, p=p), A.GridDistortion(p=p), A.OpticalDistortion(distort_limit=2, shift_limit=p, p=1)], p=0.8),
    ])


train_transformer= Transformer(aug_config, 'train', train_transforms)

In [None]:
# train_transformer.save(path="test_efficientnetb0_ignis_area.yml")
# train_transformer.load(path="test_efficientnetb0_ignis_area.yml")

In [None]:
# create a tf.data.Dataset of random images and masks
batch_size=2
train_ds = tf.data.Dataset.from_tensor_slices(tensors=(np.random.rand(10, 512, 512, 3), np.random.rand(10, 512, 512, 1))).batch(batch_size)

In [None]:
inputs = tf.keras.Input(shape=(512, 512, 3))
x = Conv2D(3, 3, activation='relu',padding='same')(inputs)
outputs = Conv2D(1, 1, activation='sigmoid')(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
trainer = Trainer(config, model, train_ds)

In [None]:
train_data_loader, val_data_loader = get_data_loader(data_config,train_data=True, val_data=True, test_data=False)

### Logger

In [None]:
logger = Logger('deneme')

In [None]:
logger.info(f'hello_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}')

In [None]:
logger.warning(message='hello world')

In [None]:
logger.info(message='hello world')

In [None]:
logger.warning(message='hello world')

In [None]:
logger.logger.info(msg='hello world')

In [None]:
logger.logger.handlers

In [None]:
from tf_seg.models import resunet_pp

### Model

In [None]:
resunet_pp.ResUnetPlusPlus().build_model()

In [None]:
model = resunet_pp.ResUnetPlusPlus().build_model()


In [None]:
model.summary()


In [None]:
model._attention_block(np.ones((1, 64, 64, 3)), np.ones((1, 32, 32, 15)))


In [None]:
model2 = resunet_pp.ResUnetPlusPlus()

In [None]:
model2.build_model()

### Layer

In [None]:
class ResidualLayer(Layer):
    """Pre-activation residual block with a projected skip connection.

    Main path: batch-norm -> activation -> convolution, applied twice.
    Skip path: 1x1 convolution -> batch-norm.
    The two paths are summed element-wise.

    Parameters
    ----------
    n_filter : int
        Number of filters used by every convolution in the block.
    activation : str, optional
        Activation name handed to ``keras.layers.Activation``.
    kernel_size : int, optional
        Kernel size of the two main-path convolutions.
    name : str, optional
        Layer name.
    **kwargs
        Additional keyword arguments forwarded to ``Layer.__init__``.
    """

    def __init__(self, n_filter, activation='relu', kernel_size=3, name="residual", **kwargs):
        # Forward **kwargs to the base class; they used to be accepted but
        # silently dropped (e.g. ``dtype`` or ``trainable`` had no effect).
        super().__init__(name=name, **kwargs)
        assert n_filter is not None, 'n_filter must be specified'

        self.activation_name = activation
        self.n_filter = n_filter
        self.kernel_size = kernel_size

    def build(self, input_shape):
        """Create the sub-layers used by ``call``."""
        self.activation = keras.layers.Activation(self.activation_name)

        # NOTE(review): every convolution is initialised with all-ones kernels;
        # presumably this is for reproducible experiments - confirm before
        # using the layer for real training.
        self.conv1 = Conv2D(filters=self.n_filter, kernel_size=self.kernel_size, padding="same", kernel_initializer=initializers.Ones())
        self.batch_norm1 = keras.layers.BatchNormalization()

        self.conv2 = Conv2D(filters=self.n_filter, kernel_size=self.kernel_size, padding="same", kernel_initializer=initializers.Ones())
        self.batch_norm2 = keras.layers.BatchNormalization()

        # 1x1 projection so the skip path has ``n_filter`` channels as well
        self.conv_skip = Conv2D(filters=self.n_filter, kernel_size=(1, 1), padding="same", kernel_initializer=initializers.Ones())
        self.batch_norm_skip = keras.layers.BatchNormalization()

        self.add = keras.layers.Add()

    def call(self, inputs):
        """Return the sum of the main path and the projected skip path."""
        x = self.batch_norm1(inputs)
        x = self.activation(x)
        x = self.conv1(x)

        x = self.batch_norm2(x)
        x = self.activation(x)
        x = self.conv2(x)

        x_i = self.conv_skip(inputs)
        x_i = self.batch_norm_skip(x_i)
        x = self.add([x, x_i])

        return x

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "n_filter": self.n_filter,
                "activation": self.activation_name,
                "kernel_size": self.kernel_size,
            }
        )
        return config
        
        

In [None]:
residual_layer = ResidualLayer(n_filter=1, kernel_size=1)


In [None]:
i = keras.Input((1, 64, 64, 3))


In [None]:
o = residual_layer(i)
o


In [None]:
model = Model(inputs=i, outputs=o)


In [None]:
model.summary()


In [None]:
ResidualLayer(np.ones((1, 2, 2, 3)))


### Losses

**Dice Loss**

In [None]:
from tf_seg.losses import dice_coef, dice_loss, DiceLoss


In [None]:
t = np.array([[1, 0, 1, 0, 0]], dtype=float)
p = np.array([[1, 0, 1, 0, 1], [1, 0, 1, 0, 0]], dtype=float)
i = 100000
p = np.random.choice(2, size=(i)).astype(float)
t = np.random.choice(2, size=(i)).astype(float)


In [None]:
dice_loss = DiceLoss()


In [None]:
dice_loss(t, p)


### Metrics

In [None]:
%load_ext autoreload
%autoreload 2
from tensorflow.keras.metrics import BinaryAccuracy,binary_accuracy
from tensorflow.keras.metrics import MeanMetricWrapper
from tensorflow.keras.metrics import MeanIoU,Precision

import numpy as np

In [None]:
from tf_seg.metrics import DiceScore
from tf_seg.metrics import MeanMetricWrapper


In [None]:
dice_score = DiceScore(name="dice_score_spece")


In [None]:
t = np.array([[1, 0, 1, 0, 0]], dtype=float)
p = np.array([[1, 0, 1, 0, 1], [1, 0, 1, 0, 0]], dtype=float)
i = 100000
p = np.random.choice(2, size=(i)).astype(float)
t = np.random.choice(2, size=(i)).astype(float)


In [None]:
iou_score = MeanIoU(num_classes=2, name="mean_iou")


In [None]:
iou_score(t, p)


In [None]:
dice_score(t, p)


In [None]:
dice_score.result()


In [None]:
dice_score.get_weights()


### Data 

In [None]:
%load_ext autoreload
%autoreload 2
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import image_dataset_from_directory
import tensorflow as tf
#from tensorflow_addons.image import rotate
from glob import glob
import matplotlib.pyplot as plt
import albumentations as A


from tf_seg.data import DataLoader
from tf_seg.data import get_camvid_data_loader

import time

In [None]:
# train_image_paths = glob("./dataset_v1.0/train_image/*.jpg")
# train_annot_paths = glob("./dataset_v1.0/train_annot/*.jpg")

# camvid
train_image_paths = sorted(glob("./dataset/camvid/train/*.png"))
train_annot_paths = sorted(glob("./dataset/camvid/train_labels/*.png"))

assert len(train_image_paths) > 0 and len(train_annot_paths) > 0, f"No training images found  train image length {len(train_image_paths)} train annot length {len(train_annot_paths)}"


In [None]:
train_annot_paths[:3], train_image_paths[:3]


In [None]:
import os
import urllib
import zipfile

In [None]:
url = "https://storage.googleapis.com/kaggle-data-sets/635428/1132317/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20221006%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20221006T152140Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=2c7e207916903eafc1e58493d3b54304b271a1e6699796219be1de5e7b4f919da977d244432b3b1b0c4539e3c732de920f572028bbb923cae8b12a87f03acc6d4dbf0d6778dd62acaf4a54e39112f1c573316666128591073b34756c441cbc95244ec86df3c69f5ab0eee10ea744776a95f66a2e4e0e6ed01050529c3baa48886fbdbd1495e6b9f392adb9ea1f5d02ed5b58bacf73d8055f4795c914766a8ee85615a575e73fd628baddab50b7b205e8eba31ec5ab6a91322b818ed408bb09b55ad384098c58f988c4c385106793889ebb1bfc1fca234c2caecf35f5b7861f650c2d14ec90ebd7f49c9a2d300b40fa95ba6ce552fcf2eb874b164934ec0c15c4"

In [None]:
# urllib.request.urlretrieve(url, filename="deneme.zip")

In [None]:
def download_data(data_name: str):
    """Download the dataset identified by ``data_name``.

    The work is delegated to the ``download_<data_name>_dataset`` function
    looked up on the ``tf_seg.data.dataset`` module.

    Parameters
    ----------
    data_name : str
        Dataset identifier, e.g. ``"camvid"``.

    Raises
    ------
    AttributeError
        If ``tf_seg.data.dataset`` has no ``download_<data_name>_dataset``.
    """
    # Resolve the module explicitly: no module object named ``dataset`` is
    # bound in this file, so the bare ``getattr(dataset, ...)`` lookup failed
    # (or picked up an unrelated variable of the same name).
    import importlib

    dataset_module = importlib.import_module("tf_seg.data.dataset")
    downloader = getattr(dataset_module, f"download_{data_name}_dataset")
    downloader()
    # os.remove("deneme.zip")

In [None]:
from tf_seg.data.dataset import download_data

In [None]:
download_data("camvid")

In [None]:
# Road,128, 64, 128
# RoadShoulder,128, 128, 192
# Sidewalk,0, 0, 192
# SignSymbol,192, 128, 128
# Sky,128, 128, 128

CLASSES = ["Road", "Sky"]
palette = [(128, 64, 128), (128, 128, 128)]


In [None]:
#data_loader = DataLoader(train_image_paths, train_annot_paths, image_size=(512, 512), output_type=(tf.float32, tf.float32),one_hot_encoding=True,channels = (3,3),palette=palette,background_adding=True)

#dataset = data_loader.load_data(batch_size=12,shuffle=True)

In [None]:
# for (i,m)  in dataset.take(3):
#     #plt.imshow(i.numpy().astype(int))
#     plt.subplot(1,2,1)
#     plt.imshow(tf.cast(i,tf.uint8))
#     plt.subplot(1,2,2)
#     plt.imshow(m)
#     plt.show()


#### Tensorflow Dataset

In [None]:
import tensorflow as tf

In [None]:
import tensorflow_datasets as tfds

In [None]:
tfds.load("oxford_iiit_pet:3.*.*")

#### albumentations

In [None]:
IM_SIZE = 512
BATCH_SIZE = 12

transforms = A.Compose(
    [
        A.Resize(IM_SIZE, IM_SIZE),
        A.OneOf(
            [
                A.HorizontalFlip(),
                A.VerticalFlip(),
            ],
            p=0.3,
        ),
        A.RandomRotate90(),
        # A.RandomGridShuffle(grid=(3, 3), always_apply=False, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, always_apply=False, p=0.5),
        # A.Cutout(num_holes=8, max_h_size=8, max_w_size=8, fill_value=0, always_apply=False, p=0.5),
        A.Sharpen(alpha=(0.2, 0.5), lightness=(0.5, 1.0), always_apply=False, p=0.5),
    ]
)


In [None]:
import albumentations as A


In [None]:
A.RandomBrightnessContrast??

In [None]:
def aug_albument(image, mask):
    """Run the module-level ``transforms`` pipeline on one image/mask pair.

    Returns the ``"image"`` and ``"mask"`` entries of the pipeline result,
    in that order.
    """
    augmented = transforms(image=image, mask=mask)
    return augmented["image"], augmented["mask"]


In [None]:
@tf.function
def process_data(image, mask):
    """Apply ``aug_albument`` to an image/mask pair via ``tf.numpy_function``.

    Both outputs are declared as ``tf.uint8`` tensors.
    """
    augmented = tf.numpy_function(func=aug_albument, inp=[image, mask], Tout=[tf.uint8, tf.uint8])
    aug_img = augmented[0]
    aug_mask = augmented[1]
    return aug_img, aug_mask


In [None]:
DataLoader?

In [None]:
data_loader = DataLoader(
    train_image_paths, train_annot_paths, image_size=(512, 512), batch_size=1,output_type=(tf.uint8, tf.float32), one_hot_encoding=True, channels=(3, 3), palette=palette, background_adding=True
)


In [None]:
train_dataset = data_loader.load_data(batch_size=20, shuffle=True, transform_func=process_data)


In [None]:
def t():
    """Iterate over 100 elements of ``train_dataset`` and discard them (timing helper)."""
    for _image, _mask in train_dataset.take(100):
        continue


In [None]:
ti = time.time()
t()
print(time.time() - ti)


In [None]:
ti = time.time()
t()
print(time.time() - ti)


In [None]:
%%timeit
train_dataset.take(1)

In [None]:
# ``take(1)`` returns a dataset holding a single (image, mask) element;
# unpacking the dataset itself raises "not enough values to unpack".
# Pull the element out first, then unpack it.
i, m = next(iter(train_dataset.take(1)))

#### Dataset

In [None]:
import requests

**Camvid**

In [None]:
import os
import sys
import tarfile
import time


source = "http://web4.cs.ucl.ac.uk/staff/g.brostow/MotionSegRecData/files/701_StillsRaw_full.zip"
target = '701_StillsRaw_full.zip'


In [None]:
def reporthook(count, block_size, total_size):
    """Progress callback for ``urllib.request.urlretrieve``.

    Writes a single, carriage-return-refreshed status line to stdout with the
    percentage done, megabytes transferred, throughput and elapsed seconds.

    Parameters
    ----------
    count : int
        Number of blocks transferred so far; ``0`` marks the start and only
        records the start time in the global ``start_time``.
    block_size : int
        Size of one block in bytes.
    total_size : int
        Total size in bytes; a value ``<= 0`` means the size is unknown and
        the percentage is reported as 0.
    """
    global start_time
    if count == 0:
        start_time = time.time()
        return

    duration = time.time() - start_time
    progress_size = int(count * block_size)
    progress_mb = progress_size / (1024.**2)

    # The first block can arrive within the clock resolution -> duration == 0.
    speed = progress_mb / duration if duration > 0 else 0.

    if total_size > 0:
        # count * block_size overshoots on the last (partial) block; cap at 100.
        percent = min(100., progress_size * 100. / total_size)
    else:
        # Size not reported by the server (urlretrieve passes -1 in that case).
        percent = 0.

    sys.stdout.write("\r%d%% | %d MB | %.2f MB/s | %d sec elapsed" %
                    (percent, progress_mb, speed, duration))
    sys.stdout.flush()


# Fetch the archive only when neither the extracted folder nor the zip file
# is already present in the working directory.
if not os.path.isdir('701_StillsRaw') and not os.path.isfile(target):
    # This notebook relies on Python 3 syntax (f-strings), so the former
    # Python 2 ``urllib.urlretrieve`` fallback could never run.
    import urllib.request
    urllib.request.urlretrieve(source, target, reporthook)

In [None]:
urllib.request.urlretrieve(source, target,lambda c,b,t: print(t))

In [None]:
from tf_seg.data.dataset import download_camvid_dataset

In [None]:
download_camvid_dataset()

### Aug

In [None]:
%load_ext autoreload
%autoreload 2


import tensorflow as tf

import numpy as np
import matplotlib.pyplot as plt

#from tensorflow.keras.utils import image_dataset_from_directory
#from tensorflow.keras.preprocessing.image import load_img, img_to_array
#from tensorflow_addons.image import rotate
#from glob import glob
#from tf_seg.data import DataLoader

from tf_seg.data.aug import jax_random_crop

In [None]:
image_path = "dataset/camvid/train/0001TP_009210.png"
mask_path = "dataset/camvid/train_labels/0001TP_009210_L.png"


In [None]:
image_content = tf.io.read_file(image_path)
mask_content = tf.io.read_file(mask_path)

image = tf.image.decode_png(image_content, channels=3)
mask = tf.image.decode_png(mask_content, channels=3)


In [None]:
i, m = image.numpy(), mask.numpy()


In [None]:
%%timeit
jax_random_crop(i,m)

In [None]:
plt.figure(figsize=(10, 10))
plt.subplot(1, 2, 1)
plt.imshow(image)
plt.subplot(1, 2, 2)
plt.imshow(mask)


### Config

In [None]:
%load_ext autoreload
%autoreload 2


import tensorflow as tf

import numpy as np
import matplotlib.pyplot as plt

from pathlib import Path
from typing import List, Optional, Union
from warnings import warn

import os 
from omegaconf import DictConfig, ListConfig, OmegaConf

from tf_seg.config import get_config
from tf_seg.models import get_model_builder

from tf_seg.data import get_camvid_data_loader
from tf_seg.data import get_data_loader    

#from tf_seg.config import CONFIG_LOAD_STYLE_LIB,CONFIG_FILE_EXTENSION,CONFIG_STORE_PATH
#from tf_seq.config.

In [None]:
# load config function

# parameter
config_name = "test"
config_path = None

# constant parameters get from constant.py
# CONFIG_STORE_PATH = "./config"
# CONFIG_FILE_EXTENSION = ".yaml"


In [None]:
config = get_config(config_name, config_path)

#### get_model function


In [None]:
model_builder = get_model_builder(config)

In [None]:
from tf_seg.models import model_lib

In [None]:
# necessary imports
from tensorflow.keras.models import Model
from omegaconf import DictConfig, ListConfig


def pascal_case_to_snake_case(s: str) -> str:
    """Turn a PascalCase name into snake_case.

    Every upper-case character becomes ``_`` plus its lower-case form, and
    all leading underscores of the result are stripped.
    """
    pieces = []
    for char in s:
        if char.isupper():
            pieces.append("_" + char.lower())
        else:
            pieces.append(char)
    return "".join(pieces).lstrip("_")

In [None]:
def get_model_builder(config: Union[DictConfig, ListConfig]) -> Model:
    """Instantiate the ``model_lib`` entry selected by ``config.model.class_name``.

    The class name is converted to snake_case to find the entry in
    ``model_lib``; every other key of ``config.model`` is passed to that entry
    as a keyword argument.

    NOTE(review): the return annotation says ``Model``, but the function name
    suggests a builder object is returned - confirm against ``model_lib``.
    """
    model_section = config.model
    builder_cls = model_lib[pascal_case_to_snake_case(model_section.class_name)]

    # Work on a copy so removing "class_name" does not touch ``config``.
    builder_kwargs = config.model.copy()
    builder_kwargs.pop("class_name")

    return builder_cls(**builder_kwargs)

#### get_data function

In [None]:
loaders =get_data_loader(config,train_data=True,val_data=True,test_data=False)

In [None]:
dataset = loaders[0].load_data(batch_size=1)

In [None]:
for i in dataset.take(1):
    plt.imshow(tf.squeeze(i[1]))
    break

#### get_aug function

In [None]:
from tf_seg.transformers import get_transformer
from tf_seg.utils import AlbumentatiosWrapper
import importlib

In [None]:
config = get_config(config_name, config_path)
aug_config = config.augmentation
aug_config

In [None]:
lib = get_transformer(config)

In [None]:
aug_config = config.augmentation

In [None]:
lib = get_transformer(config)
lib

#### get_config function

In [None]:
def get_config(
    config_filename: Optional[str] = None, config_path: Optional[Union[Path, str]] = None, config_file_extension: Optional[str] = CONFIG_FILE_EXTENSION, config_store_path: str = CONFIG_STORE_PATH
) -> Union[DictConfig, ListConfig]:

    """
    Get configurable parameters from config file.

    Parameters
    ----------
    config_filename : str, optional
        Name of the config file (without extension). Only used when
        ``config_path`` is None. The default is None.
    config_path : Union[Path, str], optional
        Path of the config file. When given it is loaded directly.
        The default is None.
    config_file_extension : str, optional
        File extension appended to ``config_filename``.
        The default is ``CONFIG_FILE_EXTENSION``.
    config_store_path : str
        Directory searched for ``config_filename``.
        The default is ``CONFIG_STORE_PATH``.

    Returns
    -------
    config : Union[DictConfig, ListConfig]
        Configurable parameters.

    Raises
    ------
    ValueError
        If both ``config_filename`` and ``config_path`` are None.

    """
    assert os.path.isdir(config_store_path), f"{config_store_path} is not a directory"

    if config_path is None:
        if config_filename is None:
            raise ValueError("either config_filename or config_path must be given")
        # Build the path from the *argument*; this used to read an unrelated
        # global called ``config_name`` and ignored ``config_filename``.
        config_path = Path(f"{config_store_path}/{config_filename}{config_file_extension}")

    config = OmegaConf.load(config_path)

    return config


In [None]:
config = get_config(config_name)
config


In [None]:
def check_base_config_exist(config: Union[DictConfig, ListConfig]) -> bool:
    """Return True when ``config`` has a "base" key whose value is not None."""
    return "base" in config.keys() and config["base"] is not None


def load_yaml_style_config(yaml_path: str) -> Union[DictConfig, ListConfig]:
    """Read the YAML file at ``yaml_path`` with ``OmegaConf.load``.

    An assertion fails when ``yaml_path`` does not point to an existing file.
    """
    path_is_file = os.path.isfile(yaml_path)
    assert path_is_file, f"{yaml_path} is not a file"
    return OmegaConf.load(yaml_path)


def load_module_style_config(module_path: str) -> Union[DictConfig, ListConfig]:
    """Load a config from a Python module (placeholder, not implemented yet).

    Raises
    ------
    NotImplementedError
        Always.
    """
    # The message used to misspell the function name ("load_mdule_..."),
    # which made it hard to find when searching the code base.
    raise NotImplementedError("load_module_style_config is not implemented")


def load_base_config(config):
    """Load the parent config referenced by ``config["base"]``.

    The loader is chosen from the file extension of ``config["base"]``:
    ``.yaml``/``.yml`` files go through ``load_yaml_style_config``, ``.py``
    raises ``NotImplementedError`` and anything else raises ``ValueError``.
    """
    _, load_style = os.path.splitext(config["base"])

    print("load style :", load_style)

    if load_style in (".yaml", ".yml"):
        return load_yaml_style_config(config["base"])

    if load_style == ".py":  # module style
        raise NotImplementedError("load_style is not implemented")

    raise ValueError("load_style is not valid")


In [None]:
def extact_config(config: Union[DictConfig, ListConfig]) -> Union[DictConfig, ListConfig]:
    """Resolve the "base" reference of every top-level section.

    For each top-level key, a section with a non-None "base" entry is replaced
    by its parent config (loaded through ``load_base_config``) merged with the
    section's own values; the "base" key itself is always removed.

    NOTE(review): "base" is popped from the section objects obtained from
    ``config.copy()``; whether that also alters the caller's ``config``
    depends on how deep that copy is - confirm.
    """
    resolved = config.copy()

    for section_name in list(config.keys()):
        section = resolved[section_name]

        if check_base_config_exist(section):
            parent = load_base_config(section)
            section.pop("base")
            # Values of the section take precedence over the parent's values.
            parent.merge_with(section)
            section = parent
        elif "base" in section.keys():
            section.pop("base")

        resolved[section_name] = section

    return resolved


In [None]:
f_config["augmentation"]

In [None]:
sub_config

In [None]:
buffer_config

In [None]:
sub_config.pop("base")

In [None]:
for b_k in b.keys():
    


In [None]:
b.merge_with(sub_config)

### Backbone

In [None]:
%load_ext autoreload
%autoreload 2


import sys
import numpy as np

import tensorflow as tf
from tensorflow import keras


# from tensorflow.keras import initializers
# from tensorflow.keras.layers import Conv2D,Conv1D
# from tensorflow.keras.layers import BatchNormalization
# from tensorflow.keras.layers import Layer
# from tensorflow.keras.losses import Loss
# from tensorflow.keras import Model
# from tensorflow.keras import initializers

from tf_seg.models import Unet,ResUnet
from tf_seg.backbones import get_backbone 
from tensorflow.keras.layers import Input
from tensorflow.python.keras.engine import keras_tensor
from tensorflow.keras.layers import Conv2D,Conv2DTranspose,Concatenate
from tensorflow.keras.models import Model


**Done Work**

#### Unet

In [None]:
backbone = None #"EfficientNetB1"#"ResNet50"

n_filters = [8, 16, 32, 64, 128,256]#, 512, 1024]
model=Unet(backbone=backbone,n_filters=n_filters,input_shape=(256, 256,3)).build_model()

out = model(np.ones((1,256,256,3)))
keras.utils.plot_model(model,show_shapes=True)

#### ResUnet

##### Workspace

In [None]:

sys.path.append("appendix/keras-unet-collection")

In [None]:
from keras_unet_collection import models

In [None]:
model = models.unet_2d((1024, 1024, 3), [16, 32,312], n_labels=2,backbone="EfficientNetB1")
#model = models.unet_2d((1024, 1024, 3), [64, 128, 256, 512, 1024], n_labels=2)

In [None]:
keras.utils.plot_model(model,show_shapes=True)

In [None]:
input_tensor = Input((1024,1024,3))
#backbone = get_backbone("ResNet50","imagenet",input_tensor,depth=4,freeze_backbone=True,freeze_batch_norm=False)
backbone = get_backbone("ResNet50V2","imagenet",input_tensor,depth=5,freeze_backbone=True,freeze_batch_norm=False)
backbone(input_tensor) 

In [None]:
#model= Unet(backbone="ResNet50",n_filters=[16,32,64]).build_model()
model= Unet(backbone=None,n_filters=[32,64,128,256,512,1024]).build_model()


connection_list = model[0]
decode_n_filters= model[1]
inputs = model[2]
bridge = model[3]

final_activation = "relu"
n_classes = 3

In [None]:
backbone = "ResNet50"
n_filters = [32,64,128,256,512]
#encoder_output= Unet(backbone=None,n_filters=[32,64,128,256,512]).build_model()
encoder_output= Unet(backbone=backbone,n_filters=n_filters).build_model()

inputs = encoder_output[0]
bridge = encoder_output[-1]
connection_list = encoder_output[1:-1][::-1]
inputs,bridge,connection_list

In [None]:
# if backbone is None:
#     decoder_n_filters = n_filters[:-1][::-1]
decoder_n_filters = n_filters[:-1][::-1]
decoder_n_filters

In [None]:
d = bridge

for n,c in enumerate(connection_list):

    print("connection :",c.shape,n)
    d=Conv2DTranspose(decoder_n_filters[n], (3, 3), padding="same", strides=(2, 2))(d)
    print(d.shape)
    d = Concatenate()([d,c])
    print("after concat :",d.shape)
    d = Unet()._conv_block(d,decoder_n_filters[n],pool=False)

print("last shape :",d.shape)

In [None]:
depth = len(connection_list)

if len(decoder_n_filters)>depth:
    n_remain_block = len(decoder_n_filters) - depth
    print(decoder_n_filters[-1*n_remain_block:])

    remain_decoder_n_filter=decoder_n_filters[-1*n_remain_block:]
    for fltr in remain_decoder_n_filter:
        print(fltr)
        d=Conv2DTranspose(fltr, (3, 3), padding="same", strides=(2, 2))(d)
        d = Unet()._conv_block(d,fltr,pool=False)
    
print("last shape :", d.shape)


In [None]:
# if not backbone is None:
# not bigger n_filters 6
#  not smaller n_filters 3

### Trainer

### Callbacks

```python
class CustomCallback(keras.callbacks.Callback):
    """Demo callback that prints the available log keys at every Keras hook."""

    @staticmethod
    def _log_keys(logs):
        """Return the keys of ``logs`` as a list; ``None`` is treated as empty."""
        # Every hook declares ``logs=None``, so guard before calling ``.keys()``.
        return list((logs or {}).keys())

    def on_train_begin(self, logs=None):
        keys = self._log_keys(logs)
        print("Starting training; got log keys: {}".format(keys))

    def on_train_end(self, logs=None):
        keys = self._log_keys(logs)
        print("Stop training; got log keys: {}".format(keys))

    def on_epoch_begin(self, epoch, logs=None):
        keys = self._log_keys(logs)
        print("Start epoch {} of training; got log keys: {}".format(epoch, keys))

    def on_epoch_end(self, epoch, logs=None):
        keys = self._log_keys(logs)
        print("End epoch {} of training; got log keys: {}".format(epoch, keys))

    def on_test_begin(self, logs=None):
        keys = self._log_keys(logs)
        print("Start testing; got log keys: {}".format(keys))

    def on_test_end(self, logs=None):
        keys = self._log_keys(logs)
        print("Stop testing; got log keys: {}".format(keys))

    def on_predict_begin(self, logs=None):
        keys = self._log_keys(logs)
        print("Start predicting; got log keys: {}".format(keys))

    def on_predict_end(self, logs=None):
        keys = self._log_keys(logs)
        print("Stop predicting; got log keys: {}".format(keys))

    def on_train_batch_begin(self, batch, logs=None):
        keys = self._log_keys(logs)
        print("...Training: start of batch {}; got log keys: {}".format(batch, keys))

    def on_train_batch_end(self, batch, logs=None):
        keys = self._log_keys(logs)
        print("...Training: end of batch {}; got log keys: {}".format(batch, keys))

    def on_test_batch_begin(self, batch, logs=None):
        keys = self._log_keys(logs)
        print("...Evaluating: start of batch {}; got log keys: {}".format(batch, keys))

    def on_test_batch_end(self, batch, logs=None):
        keys = self._log_keys(logs)
        print("...Evaluating: end of batch {}; got log keys: {}".format(batch, keys))

    def on_predict_batch_begin(self, batch, logs=None):
        keys = self._log_keys(logs)
        print("...Predicting: start of batch {}; got log keys: {}".format(batch, keys))

    def on_predict_batch_end(self, batch, logs=None):
        keys = self._log_keys(logs)
        print("...Predicting: end of batch {}; got log keys: {}".format(batch, keys))


```

In [None]:
%load_ext autoreload 
%autoreload 2
import time

import tensorflow as tf
from tensorflow import keras

import numpy as np

from tensorflow.keras.callbacks import LearningRateScheduler, ReduceLROnPlateau,Callback
from tensorflow.keras.optimizers.schedules import PolynomialDecay,PiecewiseConstantDecay
from tf_seg.utils import  snake_case_to_pascal_case
from tf_seg.callbacks import get_callbacks



In [None]:
tf.random.set_seed(42)
np.random.seed(42)

In [None]:
# parameter
config_name = "test"
config_path = None
config = get_config(config_name, config_path)

In [None]:
config.callbacks

In [None]:
model.set_weights??

In [None]:
get_callbacks(config.callbacks)

In [None]:
config.callbacks

In [None]:
a = {"a":12}
b = {"b":12}

In [None]:
np.less(1,2)

In [None]:
c_config  = config.callbacks

In [None]:
for i in c_config.keys():
    print(snake_case_to_pascal_case(i))

In [None]:
size = 100
x1 = np.random.randint(1,10,(size,10,1))
y1 = x1.max(axis=1) #+ np.random.rand(size)
x1 = np.concatenate((x1,np.ones((size,1,1))),axis=1)


x2 = np.random.randint(1,10,(size,10,1))
y2 = x2.min(axis=1) #+ np.random.rand(size)
x2 = np.concatenate((x2,np.zeros((size,1,1))),axis=1)

x = np.concatenate((x1,x2))
y = np.concatenate((y1,y2))

In [None]:
def get_model(input_dim):
    """Build and compile a one-layer model: a single bias-free ``Dense`` unit.

    The model is compiled with Nadam (learning rate 0.001), the MAE loss and
    the "mae" metric.
    """
    linear_layer = keras.layers.Dense(1, input_dim=input_dim, use_bias=False)
    optimizer = keras.optimizers.Nadam(learning_rate=0.001)

    network = keras.Sequential()
    network.add(linear_layer)
    network.compile(optimizer=optimizer, loss=tf.keras.losses.mae, metrics=["mae"])
    return network


model = get_model(input_dim=11)

In [None]:
# class CustomCallbacks(keras.callbacks.Callback):
#     def __init__(self):
#         super().__init__()
#         self.epoch_level = 10
#     def on_train_begin(self,logs=None):
#         print(self.model)
#         #print(logs.keys())
            
#     def on_epoch_end(self,epoch,logs=None):
#         #print("training epoch end:",logs.keys())
#         print(self.dataset)
#         if epoch% self.epoch_level==0:
#             print(f"Epoch : {epoch} , loss : {logs}")
#     # def on_batch_end(self,batch,logs=None):
#     #     print("     => training batch end:",logs.keys())
class MeasureTotalTime(tf.keras.callbacks.Callback):
    """Callback that reports how long a training run took."""

    def on_train_begin(self, logs=None):
        """Print the attached model and remember the start timestamp."""
        print(self.model)
        self.start_time = time.time()

    def on_train_end(self, logs=None):
        """Store the elapsed seconds in ``total_time`` and print them."""
        finished_at = time.time()
        self.total_time = finished_at - self.start_time
        print("Total training time: %s" % self.total_time)


class UpdateBestWeights(Callback):
    """Keep the weights of the best epoch and restore them when training ends."""

    def __init__(self, metric_name: str, mode: str):
        """
        Update best weights on end of training

        Parameters
        ----------
        metric_name : str
            Name of the metric to use for determining the best weights
        mode : str
            One of {min, max}

        Raises
        ------
        ValueError
            If ``mode`` is neither "min" nor "max".
        """

        super().__init__()

        self.metric_name = metric_name
        self.best_weights = None

        # ``np.inf`` instead of ``np.Inf``: the capitalised alias was removed
        # in NumPy 2.0.
        if mode == 'max':
            self.monitor_op = np.greater
            self.best_metric = -np.inf
        elif mode == 'min':
            self.monitor_op = np.less
            self.best_metric = np.inf
        else:
            raise ValueError('Mode {} not understood'.format(mode))

    def on_epoch_end(self, epoch, logs=None):
        """Snapshot the model weights when the monitored metric improved."""
        import warnings

        metric = (logs or {}).get(self.metric_name)
        if metric is None:
            # Without the metric there is nothing to compare; warn and skip
            # the epoch instead of aborting the whole training run.
            warnings.warn(
                "UpdateBestWeights: metric '{}' is not available in logs; "
                "skipping epoch {}".format(self.metric_name, epoch)
            )
            return

        if self.monitor_op(metric, self.best_metric):
            self.best_metric = metric
            self.best_weights = self.model.get_weights()

    def on_train_end(self, logs=None):
        """Load the best recorded weights back into the model, if any."""
        # No epoch improved the metric (e.g. zero epochs or NaN metrics):
        # keep the current weights rather than calling set_weights(None).
        if self.best_weights is not None:
            self.model.set_weights(self.best_weights)
     


update_model = UpdateBestWeights(metric_name='val_loss',mode='min')
tl_tm = MeasureTotalTime() 
lr = LearningRateScheduler(PolynomialDecay(0.9,100,power=1),verbose=1)

In [None]:
model.fit(x, y,validation_data=(x,y), epochs=10,batch_size=120,callbacks=[tl_tm,update_model],verbose=1)

In [None]:
tf.keras.losses.mae(y.reshape(-1), model.predict(x).reshape(-1))

In [None]:
model.evaluate(x,y,batch_size=32)

### Deploy