<a href="https://colab.research.google.com/github/osv982/Get_the_diff_between_images/blob/main/Precision_ac_03_04_2023.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# U-Net Image Segmentation


## Setup


In [36]:
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
from sklearn.model_selection import train_test_split
from keras.layers import *
from keras.models import Model
from tqdm import tqdm
from keras.utils import CustomObjectScope
from datetime import datetime
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger, TensorBoard

## Data

In [89]:
def load_data(path, split=0.2):
    ## 60-20-20
    images = sorted(glob(os.path.join(path, "images/*")))
    masks = sorted(glob(os.path.join(path, "masks/*")))

    total_size = len(images)
    # valid_size = int(split * total_size)
    # test_size = int(split * total_size)

    valid_size = int(0.0002* total_size)
    test_size = int(0.9998* total_size)

    print(f'total_size {total_size} valid_size {valid_size} test_size {test_size}')
    # y - маска x - изображение
    train_x, valid_x = train_test_split(images, test_size=valid_size, random_state=42)
    train_y, valid_y = train_test_split(masks, test_size=valid_size, random_state=42)

    train_x, test_x = train_test_split(train_x, test_size=test_size, random_state=42)
    train_y, test_y = train_test_split(train_y, test_size=test_size, random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # x = cv2.resize(x, (256, 256))
    x = x/255.0
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = x/255.0
    return x

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y
    x, y = tf.numpy_function(_parse, [x, y], [tf.float64, tf.float64])
    x.set_shape([128, 128, 3])
    y.set_shape([128, 128, 3])
    return x, y

def tf_dataset(x, y, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.repeat()
    return dataset


In [90]:
    path = 'drive/MyDrive/dataset_image/CVC-612'
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(path)
    print(len(train_x), len(valid_x), len(test_x))

    ds = tf_dataset(test_x,test_y)
    for x,y in ds:
        print(x.shape, y.shape)
        break

total_size 6084 valid_size 1 test_size 6082
1 1 6082
(8, 128, 128, 3) (8, 128, 128, 3)


## Model

In [91]:
def conv_block(x, num_filters):
    x = Conv2D(num_filters, (3, 3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, (3, 3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

def build_model():
    size = 128
    num_filters = [8, 16, 32, 48]
    inputs = Input((size, size, 3))

    skip_x = []
    x = inputs
    ## Encoder
    for f in num_filters:
        x = conv_block(x, f)
        skip_x.append(x)
        x = MaxPool2D((2, 2))(x)

    ## Bridge
    x = conv_block(x, num_filters[-1])

    num_filters.reverse()
    skip_x.reverse()
    ## Decoder
    for i, f in enumerate(num_filters):
        x = UpSampling2D((2, 2))(x)
        xs = skip_x[i]
        x = Concatenate()([x, xs])
        x = conv_block(x, f)

    ## Output
    x = Conv2D(3, (1, 1), padding="same")(x)
    x = Activation("softmax")(x)
    # было изменено с sigmoid на softmax

    return Model(inputs, x)

In [None]:
model = build_model()
model.summary()
# for i in dir(model):
#   print(i,'\n')
# model.losses


## Train

In [93]:
def iou(y_true, y_pred):
    def f(y_true, y_pred):
        intersection = (y_true * y_pred).sum()
        union = y_true.sum() + y_pred.sum() - intersection
        x = (intersection + 1e-15) / (union + 1e-15)
        x = x.astype(np.float32)
        return x
    return tf.numpy_function(f, [y_true, y_pred], tf.float32)

In [None]:
path = 'drive/MyDrive/dataset_image/CVC-612'

(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(path)
print(len(train_x), len(valid_x), len(test_x))

## Hyperparameters
batch = 8
lr = 1e-4
epochs = 20

train_dataset = tf_dataset(train_x, train_y, batch=batch)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch)

print(valid_dataset)

model = build_model()

opt = tf.keras.optimizers.Adam(lr)
# metrics = ["acc", tf.keras.metrics.Recall(), tf.keras.metrics.Precision(), iou]
# model.compile(loss="sparse_categorical_crossentropy", optimizer=opt, metrics=metrics)

loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics = ['accuracy']
model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=metrics)

# log_dir = "out_log"

callbacks = [
    ModelCheckpoint("files/model.h5"),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4),
    CSVLogger("files/data.csv"),
    # TensorBoard(log_dir=log_dir, histogram_freq=1),
    # TensorBoard(log_dir=log_dir, histogram_freq=1),
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=False)
]

train_steps = len(train_x)//batch
valid_steps = len(valid_x)//batch

if len(train_x) % batch != 0:
    train_steps += 1
if len(valid_x) % batch != 0:
    valid_steps += 1

model.fit(train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=callbacks,
    shuffle = False)

## Predict

In [94]:
def read_image(path):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = x/255.0
    return x

def read_mask(path):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = x / 255.0
    return x

def mask_parse(mask):
    # print(f'mask 1_1 shape{mask.shape}\n{mask}')
    mask = np.squeeze(mask)
    # print(f'mask 1_2 shape{mask.shape}\n{mask}')
    # mask = [mask, mask, mask]
    # print(f'mask 1_3 {mask}')
    # mask = np.transpose(mask, (1, 2, 0))
    # print(f'mask 1_4 {mask}')
    return mask

In [None]:
## Dataset
path = 'drive/MyDrive/dataset_image/CVC-612'
path2model = 'drive/MyDrive/dataset_image/model.h5'
batch_size = 8
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(path)

test_dataset = tf_dataset(test_x, test_y, batch=batch_size)

test_steps = (len(test_x)//batch_size)
if len(test_x) % batch_size != 0:
    test_steps += 1

with CustomObjectScope({'iou': iou}):
    model = tf.keras.models.load_model(path2model)

# print(test_dataset)
# print('\n\n model losses\n\n',model.losses)
# model.evaluate(test_dataset, steps=test_steps)

for i, (x, y) in tqdm(enumerate(zip(test_x, test_y)), total=len(test_x)):
    name, exp = os.path.splitext(str(x))
    im_sz = name.split('_')[2:]
    im_sz = im_sz[0]+'_'+im_sz[1]

    # print(im_sz)
    # print(x)
    # print(y)
    x = read_image(x)
    y = read_mask(y)
    y_pred = model.predict(np.expand_dims(x, axis=0))
    h, w, _ = x.shape

    white_line = np.ones((h, 10, 3)) * 255.0

    # all_images = [
    #     x * 255.0, white_line,
    #     y * 255.0, white_line,
    #     mask_parse(y_pred) * 255.0
    # ]
    # image = np.concatenate(all_images, axis=1)
    image = mask_parse(y_pred) * 255.0

    res = cv2.imwrite(f"drive/MyDrive/dataset_image/CVC-612/result/{im_sz}.png", image)
 


In [99]:
import cv2

ext = '.png'

start_img = ''
main_img =''

w, h, c = (10000, 10000, 3)
d = 128

for i in range(0, h - h % d, d):
    for j in range(0, w - w % d, d):
        fname = os.path.join('drive/MyDrive/dataset_image/CVC-612/result', f'{i}_{j}{ext}')
        if os.path.isfile(fname):
            if j == 0:
                start_img = cv2.imread(fname)
            else:
                temp_img = cv2.imread(fname)
                start_img = cv2.hconcat([start_img, temp_img])
        else:
            print(f'File {fname} is not exist')
            empty_img = np.zeros([128, 128, 3], dtype=np.uint8)
            empty_img.fill(255)
            start_img = cv2.hconcat([start_img, empty_img])

    if i == 0:
        main_img = start_img
    else:
        main_img = cv2.vconcat([main_img, start_img])

filename = os.path.join('drive/MyDrive/dataset_image/CVC-612', f'sum_img_1{ext}')
cv2.imwrite(filename, main_img)

File drive/MyDrive/dataset_image/CVC-612/result/2432_4608.png is not exist
File drive/MyDrive/dataset_image/CVC-612/result/3840_2560.png is not exist


True