<a href="https://colab.research.google.com/github/ma850419/Fast_UNet/blob/main/Fast_UNet_wheat_winter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import concatenate
#from test_utils import summary, comparator
#from test_utils import  summary

In [None]:
import os
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import imageio

import matplotlib.pyplot as plt
%matplotlib inline
# Optional root folder that is joined in front of the two sample directories below.
path = ''
image_path = os.path.join(path, 'C:/samples/source_png/')
mask_path = os.path.join(path, 'C:/samples/label_png/')

# os.listdir() returns entries in arbitrary order, so both listings are sorted.
# This keeps image_list[i] and mask_list[i] referring to the same sample,
# assuming an image and its mask share a file name -- TODO confirm for this dataset.
image_list = [os.path.join(image_path, name) for name in sorted(os.listdir(image_path))]
mask_list = [os.path.join(mask_path, name) for name in sorted(os.listdir(mask_path))]

# Every image needs exactly one mask; fail early instead of training on misaligned pairs.
assert len(image_list) == len(mask_list), (
    f'{len(image_list)} images but {len(mask_list)} masks found'
)

In [None]:
# Index of the image/mask pair to preview.
N = 130
# Read the image first, then its mask, from the two parallel path lists.
img, mask = (imageio.v2.imread(file_list[N]) for file_list in (image_list, mask_list))
print(image_list[N], mask_list[N])
#mask = np.array([max(mask[i, j]) for i in range(mask.shape[0]) for j in range(mask.shape[1])]).reshape(img.shape[0], img.shape[1])

# Side-by-side preview: source image on the left, label mask on the right.
fig, arr = plt.subplots(1, 2, figsize=(14, 10))
image_ax, mask_ax = arr
image_ax.imshow(img)
image_ax.set_title('Image')
mask_ax.imshow(mask[:, :])
mask_ax.set_title('Segmentation')

In [None]:
import os
# Make c:/ the working directory for the rest of the notebook; relative paths used
# later (e.g. the 'unet_model.png' file written by plot_model) resolve against it.
# NOTE(review): this is machine-specific -- confirm it is still required.
os.chdir('c:/')

In [None]:
# Build path datasets from the two file lists. list_files() treats every entry as a
# glob pattern, and shuffle=False keeps the resulting order deterministic.
image_list_ds = tf.data.Dataset.list_files(image_list, shuffle=False)
mask_list_ds = tf.data.Dataset.list_files(mask_list, shuffle=False)

# Peek at the first three (image, mask) path pairs. The loop variable is deliberately
# not called `path`, so the `path` root-folder setting defined earlier is not overwritten.
for path_pair in zip(image_list_ds.take(3), mask_list_ds.take(3)):
    print(path_pair)

In [None]:
# Pair every image path with its mask path in one dataset of (image, mask) string tensors.
image_filenames, masks_filenames = tf.constant(image_list), tf.constant(mask_list)
dataset = tf.data.Dataset.from_tensor_slices((image_filenames, masks_filenames))

# Show the first pair as a sanity check.
for image, mask in dataset.take(1):
    print(image, mask, sep='\n')

In [None]:
import tensorflow as tf
def normalize_image(img):
    """
    Min-max scale an image tensor to the range [0, 1].

    Arguments:
        img -- Image tensor of any dtype accepted by tf.image.convert_image_dtype
    Returns:
        img_normalized -- float32 tensor with the same shape as img; all zeros
                          when every pixel of img has the same value
    """
    # Convert to float32 (integer inputs are rescaled to [0, 1] by convert_image_dtype)
    img = tf.image.convert_image_dtype(img, tf.float32)

    lowest = tf.reduce_min(img)
    value_range = tf.reduce_max(img) - lowest

    # Min-Max Scaling: divide_no_nan returns 0 where the denominator is 0, so a
    # constant image (max == min) yields zeros instead of NaNs reaching the loss.
    img_normalized = tf.math.divide_no_nan(img - lowest, value_range)

    return img_normalized

def process_path(image_path, mask_path):
    """
    Load one image/mask pair from disk.

    Arguments:
        image_path -- Path of the PNG image file
        mask_path -- Path of the PNG mask file
    Returns:
        img_normalized, mask -- 3-channel image after normalize_image, and the mask
                                reduced to a single channel (per-pixel channel maximum)
    """
    decoded_image = tf.image.decode_png(tf.io.read_file(image_path), channels=3)
    img_normalized = normalize_image(decoded_image)

    decoded_mask = tf.image.decode_png(tf.io.read_file(mask_path), channels=3)
    # Collapse the three decoded channels into one, keeping the channel axis.
    # NOTE(review): the mask keeps its decoded integer values -- confirm they are
    # 0/1 as the BinaryCrossentropy loss used for training expects.
    single_channel_mask = tf.math.reduce_max(decoded_mask, axis=-1, keepdims=True)
    return img_normalized, single_channel_mask

def preprocess(image, mask):
    """
    Resize an image/mask pair to 256x256 using nearest-neighbour sampling.

    Arguments:
        image -- Image tensor
        mask -- Mask tensor
    Returns:
        input_image, input_mask -- The resized image and mask
    """
    target_size = (256, 256)
    input_image, input_mask = (
        tf.image.resize(tensor, target_size, method='nearest') for tensor in (image, mask)
    )
    return input_image, input_mask

# Stage 1: decode each (image path, mask path) pair into tensors.
image_ds = dataset.map(map_func=process_path)
# Stage 2: resize every decoded pair to the network input size.
processed_image_ds = image_ds.map(map_func=preprocess)


In [None]:
# Pull the whole dataset into memory: one tuple of image tensors, one of mask tensors.
image_tensors, label_tensors = zip(*processed_image_ds)
images = np.array(image_tensors)
labels = np.array(label_tensors)

# Report the stacked array shapes (images first, then labels).
for stacked in (images, labels):
    print(stacked.shape)

In [None]:
# UNQ_C1
# GRADED FUNCTION: conv_block
def conv_block(inputs=None, n_filters=32, dropout_prob=0, max_pooling=True):
    """
    Contracting-path block: two 3x3 convolutions, optional dropout, optional max-pooling

    Arguments:
        inputs -- Input tensor
        n_filters -- Number of filters used by both convolutional layers
        dropout_prob -- Dropout probability; no Dropout layer is added unless it is > 0
        max_pooling -- When True, a MaxPooling2D (pool 2, stride 2) follows the convolutions
    Returns:
        next_layer, skip_connection -- Output passed to the next block, and the
                                       pre-pooling features used as the skip connection
    """
    features = inputs

    # Two identical 'same'-padded ReLU convolutions applied back to back
    for _ in range(2):
        features = Conv2D(n_filters,
                          3,
                          activation='relu',
                          padding='same',
                          kernel_initializer='he_normal')(features)

    # Dropout is only inserted when a positive probability is requested
    if dropout_prob > 0:
        features = Dropout(dropout_prob)(features)

    # The skip connection is always taken before pooling
    pooled = MaxPooling2D(2, strides=2)(features) if max_pooling else features

    return pooled, features

In [None]:
# UNQ_C2
# GRADED FUNCTION: upsampling_block
def upsampling_block(expansive_input, contractive_input, n_filters=32):
    """
    Expanding-path block: transposed convolution, skip concatenation, two 3x3 convolutions

    Arguments:
        expansive_input -- Input tensor from the previous layer
        contractive_input -- Skip-connection tensor from the matching contracting block
        n_filters -- Number of filters used by every layer in the block
    Returns:
        conv -- Tensor output
    """
    # Stride-2 transposed convolution applied to the incoming tensor
    upsampled = Conv2DTranspose(n_filters, 3, strides=2, padding='same')(expansive_input)

    # Join the upsampled tensor with the skip connection along axis 3
    features = concatenate([upsampled, contractive_input], axis=3)

    # Two identical 'same'-padded ReLU convolutions on the merged tensor
    for _ in range(2):
        features = Conv2D(n_filters,
                          3,
                          activation='relu',
                          padding='same',
                          kernel_initializer='he_normal')(features)

    return features

In [None]:
# UNQ_C3
# GRADED FUNCTION: unet_model
def unet_model(input_size=(256, 256, 3), n_filters=32, n_classes=1):
    """
    Unet model assembled from conv_block and upsampling_block

    Arguments:
        input_size -- Input shape
        n_filters -- Number of filters of the first block; deeper blocks use multiples of it
        n_classes -- Number of output classes (filters of the final 1x1 convolution)
    Returns:
        model -- tf.keras.Model
    """
    inputs = Input(input_size)

    # Contracting Path (encoding)
    # (filter multiplier, dropout probability) of the four pooled blocks
    encoder_plan = ((1, 0), (2, 0), (4, 0), (8, 0.3))
    skips = []
    x = inputs
    for multiplier, drop in encoder_plan:
        x, skip = conv_block(inputs=x, n_filters=n_filters * multiplier, dropout_prob=drop)
        skips.append(skip)

    # Bottleneck: dropout 0.3 and no max-pooling; its skip output is not used
    x, _ = conv_block(inputs=x, n_filters=n_filters * 16, dropout_prob=0.3, max_pooling=False)

    # Expanding Path (decoding)
    # Walk the skip connections deepest-first, halving the filter count at each step
    for multiplier, skip in zip((8, 4, 2, 1), reversed(skips)):
        x = upsampling_block(x, skip, n_filters * multiplier)

    x = Conv2D(n_filters,
               3,
               activation='relu',
               padding='same',
               kernel_initializer='he_normal')(x)

    # Output head: 1x1 convolution with n_classes filters and a sigmoid activation
    outputs = Conv2D(n_classes, 1, padding='same', activation='sigmoid')(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
#import outputs
from tensorflow.keras.utils import plot_model
# Network input geometry: height, width and channel count of the input images.
img_height, img_width, num_channels = 256, 256, 3

# Instantiate the U-Net for that input shape.
unet = unet_model(input_size=(img_height, img_width, num_channels))
#comparator(summary(unet), outputs.unet_model_output)
#plot_model(unet, to_file='unet_mymodel.png', show_shapes=True, show_layer_names=True)

In [None]:

# Both optimizers are imported from tensorflow.keras, the same package the model and
# the loss come from, instead of mixing in the standalone `keras` package.
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.losses import BinaryCrossentropy

opt = SGD(learning_rate=0.001)    # alternative optimizer; not passed to compile() below
opt1 = Adam(learning_rate=0.0001)
#unet.summary()
# Binary cross-entropy pairs with the single sigmoid output channel of the model.
unet.compile(optimizer=opt1, loss=BinaryCrossentropy(), metrics=['accuracy'])


In [None]:
def display(display_list):
    """
    Show up to three arrays side by side in one row.

    Arguments:
        display_list -- Sequence of image-like arrays; the panels are titled, in order,
                        'Input Image', 'True Mask' and 'Predicted Mask'
    """
    plt.figure(figsize=(15, 15))

    title = ['Input Image', 'True Mask', 'Predicted Mask']
    n_panels = len(display_list)

    # Subplot positions are 1-based, titles are looked up 0-based
    for position, item in enumerate(display_list, start=1):
        plt.subplot(1, n_panels, position)
        plt.title(title[position - 1])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(item))
        plt.axis('off')
    plt.show()

In [None]:
from sklearn.model_selection import train_test_split
# First split: 80 % of the samples for training, 20 % held out.
X_train, X_temp, y_train, y_temp = train_test_split(
    images, labels, random_state=42, test_size=0.2
)
# Second split of the held-out part: 90 % validation, 10 % test.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, random_state=42, test_size=0.1
)

In [None]:
# Shapes of every split, in the order train, validation, test (inputs before labels).
for split in (X_train, y_train, X_val, y_val, X_test, y_test):
    print(split.shape)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once val_loss has not improved for 4 epochs and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=4,
    restore_best_weights=True,
)
# Train the U-Net for at most 20 epochs, validating on the held-out split each epoch.
model_history = unet.fit(
    x=X_train,
    y=y_train,
    batch_size=8,
    epochs=20,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping],
)

In [None]:
import matplotlib.pyplot as plt

# Assuming model_history is the history object returned by model.fit()
# Example: model_history = model.fit(...)

# Extracting data
history = model_history.history
accuracy, loss = history['accuracy'], history['loss']
# Validation curves are optional: .get() yields None when they were not recorded
val_accuracy, val_loss = history.get('val_accuracy'), history.get('val_loss')
epochs = range(1, len(accuracy) + 1)

# One entry per subplot: accuracy on the left, loss on the right
panel_specs = [
    {'position': 1, 'train': accuracy, 'val': val_accuracy,
     'train_color': 'b', 'val_color': 'g',
     'train_label': 'Training Accuracy', 'val_label': 'Validation Accuracy',
     'ylabel': 'Accuracy', 'title': 'Accuracy vs Epoch'},
    {'position': 2, 'train': loss, 'val': val_loss,
     'train_color': 'r', 'val_color': 'orange',
     'train_label': 'Training Loss', 'val_label': 'Validation Loss',
     'ylabel': 'Loss', 'title': 'Loss vs Epoch'},
]

plt.figure(figsize=(12, 6))

axes = []
for spec in panel_specs:
    ax = plt.subplot(1, 2, spec['position'])
    ax.plot(epochs, spec['train'], marker='o', linestyle='-',
            color=spec['train_color'], label=spec['train_label'])
    if spec['val']:
        ax.plot(epochs, spec['val'], marker='o', linestyle='--',
                color=spec['val_color'], label=spec['val_label'])
    ax.set_xlabel('Epoch', fontsize=14)
    ax.set_ylabel(spec['ylabel'], fontsize=14)
    ax.set_title(spec['title'], fontsize=16)
    ax.set_xticks(range(1, len(epochs) + 1, 2))  # Set x-axis interval to 2
    ax.tick_params(axis='y', labelsize=12)  # Set y-ticks font size to 12
    ax.legend(fontsize=14)
    # Show y tick labels with two decimals
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda val, pos: f'{val:.2f}'))
    axes.append(ax)

# Keep the two axes available under their individual names
ax1, ax2 = axes

plt.tight_layout()
plt.show()


In [None]:
# Predict on the training images (note: this previews the fit on seen data,
# not generalisation to the validation or test split).
y_pred = unet.predict(X_train)

# Visualize a few examples: at most 250, but never more than the training set holds,
# so a small dataset no longer ends the loop with an IndexError.
n_examples = min(250, len(X_train))  # Adjust the cap as needed
for i in range(n_examples):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.imshow(X_train[i])
    plt.title("Original Image")

    plt.subplot(1, 3, 2)
    plt.imshow(y_pred[i], cmap='Greens')  # Predicted segmentation mask
    plt.title("Predicted Segmentation")

    plt.subplot(1, 3, 3)
    plt.imshow(y_train[i], cmap='Greens')  # Ground truth segmentation mask
    plt.title("Ground Truth")

    plt.show()

In [None]:
# U-Net semantic segmentation

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model
from io import StringIO
import sys
from PIL import Image, ImageDraw, ImageFont
def unet_model(input_size=(128, 128, 3)):
    """
    Build a U-Net with four encoder stages, a bottleneck and four decoder stages.

    Arguments:
        input_size -- Input shape
    Returns:
        model -- Keras Model with a single-channel sigmoid output
    """
    def double_conv(tensor, filters):
        # Two consecutive 3x3 'same'-padded ReLU convolutions
        for _ in range(2):
            tensor = Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        return tensor

    inputs = Input(input_size)

    # Encoder: keep each stage's pre-pooling output as a skip connection
    skips = []
    x = inputs
    for filters in (64, 128, 256, 512):
        x = double_conv(x, filters)
        skips.append(x)
        x = MaxPooling2D(pool_size=(2, 2))(x)

    # Bottleneck
    x = double_conv(x, 1024)

    # Decoder: upsample, prepend the matching skip tensor, convolve
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        upsampled = UpSampling2D(size=(2, 2))(x)
        x = double_conv(concatenate([skip, upsampled], axis=3), filters)

    # 1x1 convolution down to one sigmoid channel
    outputs = Conv2D(1, 1, activation='sigmoid')(x)

    return Model(inputs, outputs)

# Create the model.
# The training arrays are resized to 256x256 by preprocess(), so the input size is
# passed explicitly; unet_model()'s 128x128 default would make fit() reject the data.
model = unet_model(input_size=(256, 256, 3))

# Print the layer-by-layer summary.
model.summary()

# Save a diagram of the architecture next to the notebook's working directory.
plot_model(model, to_file='unet_model.png', show_shapes=True, show_layer_names=True)

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.losses import BinaryCrossentropy
# Plain SGD with a small learning rate; binary cross-entropy on the sigmoid output.
model.compile(
    optimizer=SGD(learning_rate=0.0001),
    loss=BinaryCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
from sklearn.model_selection import train_test_split
# First split: 80 % of the samples for training, 20 % held out.
X_train, X_temp, y_train, y_temp = train_test_split(
    images, labels, random_state=42, test_size=0.2
)
# Second split: the held-out part is divided evenly into validation and test.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, random_state=42, test_size=0.5
)

In [None]:
# Shapes of every split, in the order train, validation, test (inputs before labels).
for split in (X_train, y_train, X_val, y_val, X_test, y_test):
    print(split.shape)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once val_loss has not improved for 4 epochs and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=4,
    restore_best_weights=True,
)
# Train for at most 20 epochs, validating on the held-out split each epoch.
model_history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=8,
    epochs=20,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping],
)

In [None]:
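# DeepLabV3+ comparison (note: the cell below builds a DenseNet121 encoder followed by a single upsampling step, not a full DeepLabV3+ architecture)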

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
# The input size must match the 256x256 arrays produced by preprocess(); a 128x128
# input layer would make fit() reject the training data.
input_shape = (256, 256, 3)

# DenseNet121 feature extractor without its classification head; weights stay trainable.
base_model = tf.keras.applications.DenseNet121(input_shape=input_shape, include_top=False)
base_model.trainable = True

inputs = layers.Input(shape=input_shape)
# training=False runs the backbone in inference mode even while its weights are trainable.
x = base_model(inputs, training=False)
x = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x)
# The backbone output is 32x smaller than the input in each spatial dimension;
# upsample by the same factor so the prediction has the resolution of the masks.
x = layers.UpSampling2D((32, 32))(x)
# Single sigmoid channel: per-pixel foreground probability.
outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(x)

model = models.Model(inputs, outputs)

In [None]:
from sklearn.model_selection import train_test_split
# First split: 80 % train, 20 % held out. The held-out part is named X_temp/y_temp
# because the second split consumes exactly those names; assigning it to X_val/y_val
# made this cell depend on X_temp/y_temp left over from an earlier cell.
X_train, X_temp, y_train, y_temp = train_test_split(images, labels, test_size=0.2, random_state=42)
# Second split of the held-out part: 90 % validation, 10 % test.
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.1, random_state=42)

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.callbacks import EarlyStopping

# Plain SGD with a small learning rate; binary cross-entropy on the sigmoid output.
model.compile(
    optimizer=SGD(learning_rate=1e-4),
    loss=BinaryCrossentropy(),
    metrics=['accuracy'],
)

# Stop once val_loss has not improved for 5 epochs and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)
# Train for at most 20 epochs, validating on the held-out split each epoch.
model_history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=8,
    epochs=20,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping],
)