#Install and Import

In [None]:
!pip install keras-unet-collection

Collecting keras-unet-collection
  Downloading keras_unet_collection-0.1.13-py3-none-any.whl (67 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/67.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.9/67.9 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: keras-unet-collection
Successfully installed keras-unet-collection-0.1.13


In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import os
import random
from IPython.display import Image, display

import cv2
from PIL import Image
from PIL import ImageOps

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing import image
from tensorflow.keras.layers import Input, Conv2D, Dropout, Activation, UpSampling2D, GlobalMaxPooling2D, multiply
from tensorflow.keras.backend import max
from tensorflow.python.client import device_lib
from keras_unet_collection import models, base, utils

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#Data Preprocessing

##Function

##Label 및 Index Dic

In [None]:
Index2Label = {0: 'Background',
               1: 'Head',
               2: 'Torso',
               3: 'Upper_Arms',
               4: 'Lower_Arms',
               5: 'Upper_Legs',
               6: 'Lower_Legs'}

Label2Index = {'Background': 0,
               'Head': 1,
               'Torso': 2,
               'Upper_Arms': 3,
               'Lower_Arms': 4,
               'Upper_Legs': 5,
               'Lower_Legs': 6}

# Label2Index = {v: k for k, v in Index2Label.items()}                                                                  m        m mi m3

 이미지 경로 생성
 -Input : 이미지 입력폴더
 -Output : 이미지 출력폴더

In [None]:
def create_images_path(folder_path):
    folder_path = folder_path #1
    imges = os.listdir(folder_path)
    images_path = [os.path.join(folder_path, img) for img in imges]
    return images_path

##시각화

In [None]:
def visualize_images(images_path, visualize=1, rotate=0):
    """
    이미지를 시각화하는 함수

    Args:
    images_path = Input
    #visualize = 1 = True , 2 = False
    #rotate = 회전각

    Returns:
    images_names = Output
    images_array = Output<c
    """
    # Output
    images_names = []
    images_array = []

    for img in tqdm(images_path, desc="Processing images"):
        image_path = img
        image_name = os.path.basename(image_path)

        image = Image.open(image_path)

        # 이미지 회전 메타데이터를 자동으로 해석하여 회전시킴
        image = image.rotate(rotate, expand=True)  # 0은 회전 각도, expand=True는 이미지 크기 조정 (0 / 270)

        images_names.append(image_name)
        image_array = np.array(image)
        images_array.append(image_array)

        # Visualize Images -------------------> Option(On/Off)
        if visualize:

            print("Size of Image:", image.size)
            print("Mode of Image:", image.mode) # (RGB/L/P)

            # print("Shape of Image (Numpy ):", )


            plt.imshow(image)
            plt.title(f'{image_name}')
            plt.axis('off')  # 축 제거
            plt.show()
    else:
        pass

    return images_names, images_array

In [None]:
def visualize_one_hot_encoded_image(one_hot_encoded_image, Index2Label=Index2Label):
    """
    원핫인코딩된 이미지를 시각화하고 클래스별로 따로 시각화하여 보여주는 함수

    Parameters:
    one_hot_encoded_image (np.ndarray): 원핫인코딩된 이미지

    Returns:
    None
    """
    # 이미지의 형태 출력
    print("Transformed Image Shape:", one_hot_encoded_image.shape)

    # 전체 이미지 시각화
    plt.figure(figsize=(8, 8))
    plt.imshow(one_hot_encoded_image.argmax(axis=2), cmap='viridis', interpolation='nearest')
    plt.title('Original Palette Image')
    plt.axis('off')
    plt.show()

    # 클래스별로 따로 시각화
    num_classes = one_hot_encoded_image.shape[2]
    for class_index in range(num_classes):
        class_image = one_hot_encoded_image[:, :, class_index]
        plt.figure(figsize=(8, 8))
        plt.imshow(class_image, cmap='gray')
        plt.title(f'Class: {class_index} {Index2Label[class_index]}')
        plt.axis('off')
        plt.show()

##Processing

In [None]:
def palette_to_one_hot(palette_image_path, num_classes=7, imge_info=True, visualize=True):
    """
    팔레트 이미지를 원핫인코딩하여 (2940, 1960, 7) 형태의 이미지로 변환하는 함수

    Parameters:
    palette_image_path (str): 세그멘테이션된 P 모드의 PIL 팔레트 이미지 경로

    Returns:
    np.ndarray: 원핫인코딩된 이미지
    """
    # 클래스 수 정의 (배경 포함 1개)
    num_classes = num_classes

    # PIL 2 Numpy
    palette_image = Image.open(palette_image_path)
    palette_image_array = np.array(palette_image)

    # 원핫인코딩된 이미지를 저장할 배열 생성
    one_hot_encoded_image = np.zeros((*palette_image_array.shape, num_classes), dtype=np.uint8)

    # 각 픽셀을 원핫인코딩된 형태로 변환
    for i in range(palette_image_array.shape[0]): # 2940
        for j in range(palette_image_array.shape[1]): # 1960
            pixel_value = palette_image_array[i, j]  # 현재 픽셀의 값
            one_hot_encoded_image[i, j, pixel_value] = 1  # 해당 클래스에 대응하는 채널에 1 할당

    if imge_info:
        # 이미지 이름 출력
        image_name = os.path.basename(palette_image_path)
        print("Image Name:", image_name)
        print("Original Image Shape:", palette_image_array.shape)
        print("Transformed Image Shape:", one_hot_encoded_image.shape)
        print()

    if visualize:
        visualize_one_hot_encoded_image(one_hot_encoded_image,  Index2Label=Index2Label)

    return one_hot_encoded_image

In [None]:
def save_images_pkl_with_one_hot_encoding(images_paths, output_folder_path, num_classes=7, imge_info=True, visualize=False):
    """
    이미지를 원핫인코딩하여 새로운 폴더에 저장하는 함수

    Parameters:
    images_paths (list): 이미지 파일 경로의 리스트
    output_folder_path (str): 이미지를 저장할 폴더 이름(현재 경로에 저장)
    num_classes (int): segmentation 세그멘테이션 클래스 개수(배경 포함)
    imge_info (bool): 이미지 정보 출력 토글(default: True)
    visualize (bool): 이미지 시각화 토글(default: False)

    Returns:
    None
    """
    # 새로운 폴더 생성
    os.makedirs(output_folder_path, exist_ok=True)

    # one_hot_encoded_images
    one_hot_encoded_images = []

    # 원핫 인코딩 수행
    for img_path in tqdm(images_paths, desc='Encoding images'):

        # PIL Pallete 2 Numpy
        one_hot_encoded_image = palette_to_one_hot(img_path, num_classes=num_classes, imge_info=imge_info, visualize=visualize)
        one_hot_encoded_images.append(one_hot_encoded_image)

    # 저장할 경로 설정
    save_path = os.path.join(output_folder_path, f"one_hot_encoded_images_array.pkl")

    # 이미지 데이터를 피클로 저장
    with open(save_path, 'wb') as f:
        pickle.dump(one_hot_encoded_images, f)

#Unet

##Unet(Origin), input shape(572x572)

In [None]:
def conv_block(inputs, filters, kernel_size=3):
    x = layers.Conv2D(filters, kernel_size, activation='relu')(inputs)
    x = layers.Conv2D(filters, kernel_size, activation='relu')(x)
    return x

def upconv_block(inputs, filters, kernel_size=2):
    x = layers.UpSampling2D(size=(2, 2))(inputs)
    x = layers.Conv2D(filters, kernel_size, activation='relu', padding='same')(x)
    return x

def unet(img_size, num_classes):
    inputs = keras.Input(shape=img_size + (3,))

    # Contracting Path
    c1 = conv_block(inputs, 64, kernel_size=3)
    p1 = layers.MaxPooling2D(pool_size=(2, 2))(c1)

    c2 = conv_block(p1, 128, kernel_size=3)
    p2 = layers.MaxPooling2D(pool_size=(2, 2))(c2)

    c3 = conv_block(p2, 256, kernel_size=3)
    p3 = layers.MaxPooling2D(pool_size=(2, 2))(c3)

    c4 = conv_block(p3, 512, kernel_size=3)
    p4 = layers.MaxPooling2D(pool_size=(2, 2))(c4)

    # Bottom
    b = conv_block(p4, 1024, kernel_size=3)

    # Expanding Path
    u1 = upconv_block(b, 512, kernel_size=2)
    c4_crop = layers.Cropping2D(cropping=((4, 4), (4, 4)))(c4)
    u1_concat = layers.Concatenate()([u1, c4_crop])
    c5 = conv_block(u1_concat, 512, kernel_size=3)

    u2 = upconv_block(c5, 256, kernel_size=2)
    c3_crop = layers.Cropping2D(cropping=((16, 16), (16, 16)))(c3)
    u2_concat = layers.Concatenate()([u2, c3_crop])
    c6 = conv_block(u2_concat, 256, kernel_size=3)

    u3 = upconv_block(c6, 128, kernel_size=2)
    c2_crop = layers.Cropping2D(cropping=((40, 40), (40, 40)))(c2)
    u3_concat = layers.Concatenate()([u3, c2_crop])
    c7 = conv_block(u3_concat, 128, kernel_size=3)

    u4 = upconv_block(c7, 64, kernel_size=2)
    c1_crop = layers.Cropping2D(cropping=((88, 88), (88, 88)))(c1)
    u4_concat = layers.Concatenate()([u4, c1_crop])
    c8 = conv_block(u4_concat, 64, kernel_size=3)

    outputs = layers.Conv2D(num_classes, 1, activation='softmax')(c8)

    return tf.keras.Model(inputs, outputs)

keras.backend.clear_session()

model = unet(img_size, num_classes)
model.summary()

##Unet(keras_unet_collection)

In [None]:
keras.backend.clear_session()

model = models.unet_2d((None, None, 3), [64, 128, 256, 512], n_labels=3,
                      stack_num_down=2, stack_num_up=2,
                      activation='ReLU', output_activation='Softmax',
                      batch_norm=True, pool='max', unpool='nearest', name='unet')
model.summary()

##Adaptive Unet

In [None]:
import tensorflow as tf
from tensorflow.keras import layers

def conv_block(inputs, filters, kernel_size=3):
    x = layers.Conv2D(filters, kernel_size, activation='relu', padding='same')(inputs)
    x = layers.Conv2D(filters, kernel_size, activation='relu', padding='same')(x)
    return x

def upconv_block(inputs, filters, kernel_size=2):
    x = layers.UpSampling2D(size=(2, 2))(inputs)
    x = layers.Conv2D(filters, kernel_size, activation='relu', padding='same')(x)
    return x

def unet(img_size, num_classes):
    inputs = tf.keras.Input(shape=img_size + (3,))

    # Contracting Path
    c1 = conv_block(inputs, 64, kernel_size=3)
    p1 = layers.MaxPooling2D(pool_size=(2, 2))(c1)

    c2 = conv_block(p1, 128, kernel_size=3)
    p2 = layers.MaxPooling2D(pool_size=(2, 2))(c2)

    c3 = conv_block(p2, 256, kernel_size=3)
    p3 = layers.MaxPooling2D(pool_size=(2, 2))(c3)

    c4 = conv_block(p3, 512, kernel_size=3)
    p4 = layers.MaxPooling2D(pool_size=(2, 2))(c4)

    # Bottom
    b = conv_block(p4, 1024, kernel_size=3)

    # Expanding Path
    u1 = upconv_block(b, 512, kernel_size=2)
    u1_concat = layers.Concatenate()([u1, c4])
    c5 = conv_block(u1_concat, 512, kernel_size=3)

    u2 = upconv_block(c5, 256, kernel_size=2)
    u2_concat = layers.Concatenate()([u2, c3])
    c6 = conv_block(u2_concat, 256, kernel_size=3)

    u3 = upconv_block(c6, 128, kernel_size=2)
    u3_concat = layers.Concatenate()([u3, c2])
    c7 = conv_block(u3_concat, 128, kernel_size=3)

    u4 = upconv_block(c7, 64, kernel_size=2)
    u4_concat = layers.Concatenate()([u4, c1])
    c8 = conv_block(u4_concat, 64, kernel_size=3)

    outputs = layers.Conv2D(num_classes, 1, activation='softmax')(c8)

    return tf.keras.Model(inputs, outputs)


In [None]:
# Defining input image size and number of classes
img_size = (512, 512)
num_classes = 7

##Build and Training

In [None]:
# Clearing previous sessions
tf.keras.backend.clear_session()

In [None]:
#Arguments
img_size = (512, 512)
num_classes = 7

In [None]:

model = unet(img_size, num_classes)

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

In [None]:
tf.keras.backend.clear_session()

model = unet(img_size, num_classes)
model.compile(optimizer="adam", loss="categorical_crossentropy")

with tf.device("/device:GPU:0"):
    history = model.fit(train_gen, epochs=10, batch_size=batch_size, callbacks=[early_stopping_callback, metrics_callback], validation_data=val_gen, verbose=1)

In [None]:
hist = model.fit(X, y)

#Deeplab

##DeeplabV3+ (BB : MobilenetV2)

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model

def deeplab_model(input_shape=(512, 512, 3), num_classes=21):
    # Input layer
    input_tensor = tf.keras.Input(shape=input_shape, name='input_image')

    # Feature backbone (MobileNetV2 or another backbone)
    backbone = tf.keras.applications.MobileNetV2(input_shape=input_shape, include_top=False, weights='imagenet', input_tensor=input_tensor)
    backbone.trainable = False  # Freeze the backbone weights

    # Atrous Spatial Pyramid Pooling (ASPP)
    x = backbone.output
    x = tf.keras.layers.Conv2D(256, (1, 1), activation='relu', padding='same', name='aspp_conv')(x)
    x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same', dilation_rate=(6, 6), name='aspp_dilated_conv1')(x)
    x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same', dilation_rate=(12, 12), name='aspp_dilated_conv2')(x)
    x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same', dilation_rate=(18, 18), name='aspp_dilated_conv3')(x)
    x = tf.keras.layers.Conv2D(256, (1, 1), activation='relu', padding='same', name='aspp_pooling')(x)
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Reshape((1, 1, 256))(x)
    x = tf.keras.layers.UpSampling2D(size=(4, 4))(x)
    x = tf.keras.layers.Conv2D(256, (1, 1), activation='relu', padding='same', name='aspp_upsample')(x)

    # Skip connection from the backbone
    skip_connection = backbone.get_layer('block_6_expand_relu').output
    x = tf.keras.layers.Concatenate()([x, skip_connection])

    # Final convolutional layers
    x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same', name='final_conv1')(x)
    x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same', name='final_conv2')(x)

    # Output layer (1x1 convolution for segmentation)
    output_tensor = tf.keras.layers.Conv2D(num_classes, (1, 1), activation='softmax', name='output')(x)

    # Create the model
    model = Model(inputs=input_tensor, outputs=output_tensor)

    return model

# Instantiate the model
model = deeplab_model()

# Display the model summary
model.summary()

##Arguments

##Training

In [None]:
hist = model.fit(X, y)

#Callback

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

early_stopping_callback = EarlyStopping(patience=3, monitor='val_loss')

In [None]:
from keras.callbacks import Callback

class MetricsCallback(Callback):
    def __init__(self, val_gen):
        self.val_gen = val_gen

    def on_epoch_end(self, epoch, logs=None):
        pixel_accuracy, mean_accuracy, mean_iou = evaluate(model, criterion, self.val_gen, device, num_classes=7)
        print(f'Epoch [{epoch + 1}], Validation Pixel Accuracy: {pixel_accuracy:.4f}')
        print(f'Epoch [{epoch + 1}], Validation Mean Accuracy: {mean_accuracy:.4f}')
        print(f'Epoch [{epoch + 1}], Validation mIoU: {mean_iou:.4f}')

metrics_callback = MetricsCallback(val_gen)

#Save!!!!!!

In [None]:
model.save('Unet.h5')

#Metric

##1.iou

In [None]:
def iou(y_true, y_pred, dtype=tf.float32):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)

    area_true = tf.reduce_sum(y_true_f)
    area_pred = tf.reduce_sum(y_pred_f)
    union = area_true + area_pred - intersection

    iou = tf.where(tf.equal(union, 0), 0.0, tf.math.divide(intersection, union))
    return iou.numpy()

def dice_coef(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    return dice.numpy()

##2.Dice coef

In [None]:
def dice_coef(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    return dice.numpy()

#Loss

##1. iou loss

In [None]:
#loss
def iou_loss(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)

    area_true = tf.reduce_sum(y_true_f)
    area_pred = tf.reduce_sum(y_pred_f)
    union = area_true + area_pred - intersection

    iou = tf.where(tf.equal(union, 0), 0.0, tf.math.divide(intersection, union))

    return 1 - iou.numpy()

##2. dice loss

$$diceloss = 1 - dicecoef$$

In [None]:
def dice_loss(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    return 1 - dice

##3. dice BCE loss

$$ dice BCE loss = BCE + diceloss$$

In [None]:
def dice_BCE_loss(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    dice_loss = 1 - (2*intersection + smooth) / (K.sum(y_true) + K.sum(y_pred) + smooth)

    BCE =  losses.binary_crossentropy(y_true_f, y_pred_f)

    Dice_BCE_loss = BCE + dice_loss

    return Dice_BCE_loss.numpy()


##4. focal loss
- 2017년, 극도로 불균형한 데이터 세트를 해결하기 위한 수단으로 도입
$$ Focal loss = BCE*(1-pred)^r$$

In [None]:
alpha = 0.8
gamma = 2

def focal_loss(targets, inputs, alpha=alpha, gamma=gamma):
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)

    bce = K.binary_crossentropy(targets, inputs)
    bce_exp = K.exp(-bce)
    focal_loss = K.mean(alpha * K.pow((1-bce_exp), gamma) * bce)

    return focal_loss


##5. Tversky loss
- 서로 다른 유형의 오류가 얼마나 심하게 처벌되는지를 조정할 수 있는 상수를 활용하여 불균형 데이터셋에 대해 최적화하도록 설계
$$ 1 - TP/(TP + a*FP + b*FN) $$


In [None]:
alpha = 0.5
beta = 0.5

def tversky_loss(targets, inputs, alpha=alpha, beta=beta, smooth=1e-6):
        inputs = K.flatten(inputs)
        targets = K.flatten(targets)

        tp = K.sum((inputs * targets))
        fp = K.sum(((1-targets) * inputs))
        fn = K.sum((targets * (1-inputs)))

        tversky = (tp + smooth)/(tp + alpha*fp + beta*fn + smooth)

        return 1 - tversky

###출처:  
https://github.com/yingkaisha/keras-unet-collection/tree/d30f14a259656d2f26ea11ed978255d6a7d0ce37

https://www.kaggle.com/code/bigironsphere/loss-function-library-keras-pytorch/notebook
  
https://github.com/zhengyang-wang/3D-Unet--Tensorflow/blob/c5d603a69243a69dd6d89edefdf1ba249640450b/utils/HausdorffDistance.py#L8

#GPU(CUDA)

In [None]:
for dev in device_lib.list_local_devices(): # 사용가능한 device(CPU, GPU 등) 목록
    print(dev.device_type, dev.memory_limit)


NameError: name 'device_lib' is not defined

In [None]:
!pip install numba

from numba import cuda

dev = cuda.get_current_device(); dev.reset()