In [None]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import numpy as np
import cv2
from glob import glob
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score

from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation
from tensorflow.keras.layers import MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model

In [None]:
def conv_block(inputs, num_filters):
    """Apply two successive Conv2D(3x3, "same") -> BatchNormalization -> ReLU stages.

    Args:
        inputs: Tensor fed to the first convolution.
        num_filters: Number of filters used by both convolutions.

    Returns:
        The tensor produced by the second ReLU activation.
    """
    features = inputs
    # Both stages are identical; layers are instantiated in the same order as
    # an unrolled version would create them.
    for _ in range(2):
        features = Conv2D(num_filters, 3, padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)
    return features

In [None]:
def encoder_block(inputs, num_filters):
    """Run a conv_block on `inputs` and downsample the result with 2x2 max pooling.

    Args:
        inputs: Tensor entering this encoder stage.
        num_filters: Number of filters passed to conv_block.

    Returns:
        A tuple `(features, pooled)`: the conv_block output and its
        max-pooled version.
    """
    features = conv_block(inputs, num_filters)
    pooled = MaxPool2D(pool_size=(2, 2))(features)
    return features, pooled

In [None]:
def decoder_block(inputs, skip_features, num_filters):
    """Upsample `inputs`, concatenate with `skip_features`, then apply a conv_block.

    Args:
        inputs: Tensor coming from the previous (coarser) stage.
        skip_features: Tensor concatenated with the upsampled input.
        num_filters: Filters for the transposed convolution and the conv_block.

    Returns:
        The conv_block output for this decoder stage.
    """
    # Stride-2 transposed convolution with a 2x2 kernel.
    upsampled = Conv2DTranspose(num_filters, kernel_size=2, strides=2, padding="same")(inputs)
    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)

In [None]:
def unet(input_shape):
    """Build the U-Net model: four encoder stages, a bridge, four decoder stages.

    Args:
        input_shape: Shape passed to `Input` (the notebook uses `(H, W, 3)`).

    Returns:
        A Keras `Model` named "UNET" whose output is a single-channel
        sigmoid map produced by a 1x1 convolution.
    """
    inputs = Input(input_shape)

    # Encoder: keep each stage's pre-pooling features for the decoder.
    skips = []
    x = inputs
    for filters in (64, 128, 256, 512):
        skip, x = encoder_block(x, filters)
        skips.append(skip)

    # Bridge between encoder and decoder.
    x = conv_block(x, 1024)

    # Decoder: consume the stored features from deepest to shallowest.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = decoder_block(x, skip, filters)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(x)

    return Model(inputs, outputs, name="UNET")

# Build the network for 224x224 three-channel inputs and print its layer summary.
model = unet(input_shape=(224, 224, 3))
model.summary()

In [None]:
# Added to numerator and denominator so the ratio stays defined when both
# tensors sum to zero.
smooth = 1e-15


def dice_coef(y_true, y_pred):
    """Dice coefficient computed over every element of the two tensors.

    The sums run over all elements (the whole batch at once), so no flattening
    is required; this also avoids instantiating new Keras layers on each call.

    Args:
        y_true: Ground-truth tensor; cast to the dtype of `y_pred`.
        y_pred: Predicted tensor with the same shape as `y_true`.

    Returns:
        Scalar tensor `(2 * sum(y_true * y_pred) + smooth) /
        (sum(y_true) + sum(y_pred) + smooth)`.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    if not y_pred.dtype.is_floating:
        # Integer predictions would make the division below fail.
        y_pred = tf.cast(y_pred, tf.float32)
    # Align dtypes so e.g. integer masks can be multiplied with the prediction.
    y_true = tf.cast(y_true, y_pred.dtype)

    intersection = tf.reduce_sum(y_true * y_pred)
    total = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred)
    return (2. * intersection + smooth) / (total + smooth)

def dice_loss(y_true, y_pred):
    """Loss defined as one minus the Dice coefficient of `y_true` and `y_pred`."""
    dice = dice_coef(y_true, y_pred)
    return 1.0 - dice

In [None]:
# Target height and width (pixels) used when resizing images and masks.
H = 224
W = 224


def create_dir(path):
    """Create directory `path`, including missing parents, if it does not exist.

    Args:
        path: Directory to create.

    Raises:
        FileExistsError: If `path` exists but is not a directory.
    """
    # exist_ok=True replaces a separate os.path.exists() check, which could
    # race with another process creating the directory in between.
    os.makedirs(path, exist_ok=True)


create_dir("files")

In [None]:
def load_dataset(path, split=0.2):
    """Collect image/mask paths under `path` and split them into train/valid/test.

    Images are every `*.jpg` one directory level below `path`; masks are the
    `*.png` files in the nested `HAM10000_segmentations_lesion_tschandl`
    directory. Both lists are sorted and paired by position, so the two sorted
    listings are assumed to correspond one-to-one (TODO confirm against the
    dataset's file naming).

    Args:
        path: Dataset root directory.
        split: Fraction of all images used for the validation set and, again,
            for the test set.

    Returns:
        `(train_x, train_y), (valid_x, valid_y), (test_x, test_y)` where the
        `x` lists hold image paths and the `y` lists the matching mask paths.

    Raises:
        ValueError: If the number of images and masks differ, because
            positional pairing would then be wrong.
    """
    images = sorted(glob(os.path.join(path, "*", "*.jpg")))
    masks = sorted(glob(os.path.join(path, "HAM10000_segmentations_lesion_tschandl/HAM10000_segmentations_lesion_tschandl", "*.png")))
    print(len(images),len(masks))

    if len(images) != len(masks):
        raise ValueError(
            f"Found {len(images)} images but {len(masks)} masks under {path!r}; "
            "they must match one-to-one."
        )

    split_size = int(len(images) * split)

    # Split images and masks in a single call so each pair always lands in the
    # same subset (same partition as two separate calls with equal lengths and
    # the same random_state).
    train_x, valid_x, train_y, valid_y = train_test_split(
        images, masks, test_size=split_size, random_state=42
    )
    train_x, test_x, train_y, test_y = train_test_split(
        train_x, train_y, test_size=split_size, random_state=42
    )

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

In [None]:
def read_image(path):
    """Read an image from disk, resize it to (W, H) and scale it by 1/255.

    Args:
        path: File path as `bytes` (as delivered by `tf.numpy_function`) or `str`.

    Returns:
        `np.float32` array of shape `(H, W, 3)` holding the pixel values
        divided by 255, in the channel order returned by `cv2.imread`.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    if x is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {path}")
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    """Read a mask as grayscale, resize it to (W, H) and scale it by 1/255.

    Args:
        path: File path as `bytes` (as delivered by `tf.numpy_function`) or `str`.

    Returns:
        `np.float32` array of shape `(H, W, 1)` holding the pixel values
        divided by 255.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)  ## (h, w)
    if x is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read mask: {path}")
    x = cv2.resize(x, (W, H))   ## (h, w)
    x = x / 255.0               ## (h, w)
    x = x.astype(np.float32)    ## (h, w)
    x = np.expand_dims(x, axis=-1)## (h, w, 1)
    return x

In [None]:
def tf_parse(x, y):
    """Load one (image, mask) pair for use inside a `tf.data` pipeline.

    `read_image` and `read_mask` are wrapped with `tf.numpy_function`; the
    static shapes are then set explicitly to `(H, W, 3)` and `(H, W, 1)`.

    Args:
        x: Tensor holding the image path.
        y: Tensor holding the mask path.

    Returns:
        Tuple `(image, mask)` of `tf.float32` tensors.
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])
    image.set_shape([H, W, 3])
    mask.set_shape([H, W, 1])
    return image, mask

def tf_dataset(X, Y, batch=2, shuffle=False):
    """Build a batched, prefetching `tf.data.Dataset` of (image, mask) pairs.

    Args:
        X: Sequence of image paths.
        Y: Sequence of mask paths, positionally paired with `X`.
        batch: Batch size.
        shuffle: When True, the path pairs are reshuffled on every pass over
            the dataset (before loading). Defaults to False, which keeps the
            order of `X`/`Y`.

    Returns:
        A dataset yielding `(images, masks)` batches produced by `tf_parse`.
    """
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    if shuffle:
        # Shuffling the path strings is cheap, so use a buffer spanning all pairs.
        dataset = dataset.shuffle(buffer_size=max(len(X), 1))
    # Decode/resize several files concurrently; element order is preserved.
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset

In [None]:
# Training hyper-parameters.
batch_size = 32
lr = 1e-4
num_epochs = 100

# Output locations inside the "files" directory, and the dataset root.
model_path = os.path.join("files", "model.h5")
csv_path = os.path.join("files", "log.csv")
dataset_path = "/kaggle/input/seg-dataset"

In [None]:
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

# Report (image count, mask count) for each split; the two numbers should match.
print(f"Train: ({len(train_x)},{len(train_y)})")
print(f"Valid: ({len(valid_x)},{len(valid_y)})")
print(f"Test: ({len(test_x)},{len(test_y)})")

In [None]:
# Input pipelines for fitting and validation, both batched with `batch_size`.
train_dataset = tf_dataset(X=train_x, Y=train_y, batch=batch_size)
valid_dataset = tf_dataset(X=valid_x, Y=valid_y, batch=batch_size)

In [None]:
# Fresh U-Net for (H, W, 3) inputs, trained with Dice loss and Adam at rate `lr`;
# the Dice coefficient and accuracy are tracked as metrics.
model = unet(input_shape=(H, W, 3))
model.compile(
    optimizer=Adam(lr),
    loss=dice_loss,
    metrics=[dice_coef, 'accuracy'],
)

In [None]:
callbacks = [
        # Keep only the best model seen so far on disk (file: model_path + '.keras').
        ModelCheckpoint(model_path+'.keras', verbose=1, save_best_only=True),
        # Divide the learning rate by 10 after 5 epochs without val_loss improvement.
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        # Append per-epoch metrics to csv_path.
        CSVLogger(csv_path),
        # Stop after 20 stagnant epochs. restore_best_weights=True because the
        # in-memory `model` is evaluated on the test set afterwards; with False
        # it would keep the weights of the last (not the best) epoch.
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True),
    ]

In [None]:
# Train for at most `num_epochs` epochs, validating after each one.
model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=num_epochs,
    callbacks=callbacks,
    verbose=1,
)

In [None]:
import pandas as pd
# Load the per-epoch log from the same path that was handed to CSVLogger,
# rather than a hard-coded absolute location.
metrics = pd.read_csv(csv_path)
metrics

In [None]:
# Training vs. validation Dice coefficient; one log row per epoch.
ax = metrics[['dice_coef','val_dice_coef']].plot()
ax.set(title="Dice coefficient per epoch", xlabel="Epoch", ylabel="Dice coefficient")
ax.figure.savefig('metrics_dice_plot.png')

In [None]:
# Training vs. validation accuracy; one log row per epoch.
ax = metrics[['accuracy','val_accuracy']].plot()
ax.set(title="Accuracy per epoch", xlabel="Epoch", ylabel="Accuracy")
ax.figure.savefig('metrics_accuracy_plot.png')

In [None]:
# Training vs. validation loss; one log row per epoch.
ax = metrics[['loss','val_loss']].plot()
ax.set(title="Loss per epoch", xlabel="Epoch", ylabel="Loss")
ax.figure.savefig('metrics_loss_plot.png')

In [None]:
# Directory that receives the side-by-side comparison images written during evaluation.
create_dir("results")

In [None]:
from tqdm import tqdm
import cv2
def save_results(image, mask, y_pred, save_image_path):
    """Write image | ground-truth mask | predicted mask side by side to one file.

    Args:
        image: Colour image array of shape (height, width, 3) with 0-255 values.
        mask: 2-D ground-truth mask with 0-255 values.
        y_pred: 2-D binary prediction (0/1); scaled to 0/255 for display.
        save_image_path: Destination path handed to `cv2.imwrite`.

    Raises:
        OSError: If `cv2.imwrite` reports that the file was not written.
    """
    # Replicate the single channel three times so the masks match the image depth.
    mask = np.repeat(np.expand_dims(mask, axis=-1), 3, axis=-1)
    y_pred = np.repeat(np.expand_dims(y_pred, axis=-1), 3, axis=-1) * 255

    # White 10-pixel separator; its height follows the image instead of the
    # global H so the function also works for other image sizes.
    line = np.full((image.shape[0], 10, 3), 255, dtype=np.uint8)

    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)
    # Hand OpenCV an explicit 8-bit array rather than relying on implicit conversion.
    cat_images = np.clip(cat_images, 0, 255).astype(np.uint8)
    if not cv2.imwrite(save_image_path, cat_images):
        raise OSError(f"Could not write result image: {save_image_path}")

In [None]:
# Evaluate the in-memory model on every test pair: save a comparison image
# and collect per-image F1, Jaccard, recall and precision.
SCORE = []
for image_path, mask_path in tqdm(zip(test_x, test_y), total=len(test_y)):
    # Extracting the name
    name = os.path.basename(image_path)

    # Reading the image
    image = cv2.imread(image_path, cv2.IMREAD_COLOR) ## [H, w, 3]
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv2.resize(image, (W, H))       ## [H, w, 3]
    x = image/255.0                         ## [H, w, 3]
    x = np.expand_dims(x, axis=0)           ## [1, H, w, 3]

    # Reading the mask
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        raise FileNotFoundError(f"Could not read mask: {mask_path}")
    mask = cv2.resize(mask, (W, H))

    # Prediction, binarised at 0.5
    y_pred = model.predict(x, verbose=0)[0]
    y_pred = np.squeeze(y_pred, axis=-1)
    y_pred = (y_pred >= 0.5).astype(np.int32)

    # Saving the prediction
    save_image_path = os.path.join("results", name)
    save_results(image, mask, y_pred, save_image_path)

    # Flatten the arrays; the mask is binarised at 0.5 after scaling to [0, 1]
    mask = mask/255.0
    mask = (mask > 0.5).astype(np.int32).flatten()
    y_pred = y_pred.flatten()

    # Calculating the metrics values; zero_division=0 is set on all four so
    # images without positive pixels score 0 without emitting warnings.
    f1_value = f1_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
    jac_value = jaccard_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
    recall_value = recall_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
    precision_value = precision_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
    SCORE.append([name, f1_value, jac_value, recall_value, precision_value])

if not SCORE:
    raise ValueError("No test samples were evaluated; test_x/test_y are empty.")

# Metrics values: mean over all test images (the name column is dropped)
score = [s[1:]for s in SCORE]
score = np.mean(score, axis=0)
print(f"F1: {score[0]:0.5f}")
print(f"Jaccard: {score[1]:0.5f}")
print(f"Recall: {score[2]:0.5f}")
print(f"Precision: {score[3]:0.5f}")

df = pd.DataFrame(SCORE, columns=["Image", "F1", "Jaccard", "Recall", "Precision"])
df.to_csv("files/score.csv", index=None)

In [None]:
# Read the per-image scores back from the same relative location they were
# written to ("files/score.csv"), not a hard-coded absolute path.
scores = pd.read_csv(os.path.join("files", "score.csv"))
scores.head(3)

In [None]:
# Preview the first saved comparison image.
# `dir` shadows the builtin of the same name, but the following cells build
# paths from `dir` and `images`, so both names are kept.
# The relative "results/" directory is the one the evaluation loop writes to.
dir = os.path.join("results", "")
# os.listdir returns entries in arbitrary order; sort for a reproducible selection.
images = sorted(os.listdir(dir))[:5]
img1 = plt.imread(dir+images[0])
plt.imsave('test_result1.png', img1, cmap='gray')
plt.imshow(img1, cmap='gray')


In [None]:
# Preview and export the second listed comparison image.
img1 = plt.imread(f"{dir}{images[1]}")
plt.imsave(fname='test_result2.png', arr=img1, cmap='gray')
plt.imshow(img1, cmap='gray')

In [None]:
# Preview and export the third listed comparison image.
img1 = plt.imread(f"{dir}{images[2]}")
plt.imsave(fname='test_result3.png', arr=img1, cmap='gray')
plt.imshow(img1, cmap='gray')


In [None]:
# model.load_weights('/kaggle/input/final-model/model.h5.keras')

In [None]:
# import os
# import cv2
# import numpy as np
# from tensorflow.keras.models import load_model


# # Function to predict masks for images in a folder and save the masks in another folder
# def predict_masks_and_apply(input_folder, output_folder):
#     # Create the output folder if it doesn't exist
#     if not os.path.exists(output_folder):
#         os.makedirs(output_folder)
    
#     # Get a list of image filenames in the input folder
#     image_filenames = os.listdir(input_folder)
    
#     # Iterate over each image
#     for image_filename in image_filenames:
#         # Read the image
#         image_path = os.path.join(input_folder, image_filename)
#         image = cv2.imread(image_path)
        
#         # Preprocess the image (you may need to adjust this based on your training preprocessing)
#         image = cv2.resize(image, (224, 224))  # Resize the image to match the input size of your model
#         image = image / 255.0  # Normalize pixel values
        
#         # Predict the mask
#         mask = model.predict(np.expand_dims(image, axis=0))[0]
        
#         # Threshold the mask (assuming binary segmentation)
#         mask[mask >= 0.5] = 255
#         mask[mask < 0.5] = 0
#         mask = mask.astype(np.uint8)
        
#         # Apply the mask to the image
#         masked_image = cv2.bitwise_and(image, image, mask=mask)
        
#         # Save the masked image
#         masked_image_filename = os.path.splitext(image_filename)[0] + '_masked.png'
#         masked_image_path = os.path.join(output_folder, masked_image_filename)
#         cv2.imwrite(masked_image_path, (masked_image * 255).astype(np.uint8))

# # Apply masks to images in the "benign" folder and save them in the "benign_masked" folder
# predict_masks_and_apply('/kaggle/input/dataset/dataset/BKM', '/kaggle/working/BKM_seg')

# # Apply masks to images in the "malignant" folder and save them in the "malignant_masked" folder
# predict_masks_and_apply('/kaggle/input/dataset/dataset/MSM', '/kaggle/working/MSM_seg')


In [None]:
# count = 0
# for file in os.listdir('/kaggle/working/MSM_concat'):
#     count+=1
# print(count)

In [None]:
# import shutil

# # Function to create a zip archive of a folder
# def create_zip(folder_path, zip_path):
#     shutil.make_archive(zip_path, 'zip', folder_path)

# # Paths to the folders containing the masked images
# benign_masked_folder = '/kaggle/working/BKM_seg'
# malignant_masked_folder = '/kaggle/working/MSM_seg'

# # Create zip archives for the benign and malignant masked image folders
# create_zip(benign_masked_folder, '/kaggle/working/BKM_seg.zip')
# create_zip(malignant_masked_folder, '/kaggle/working/MSM_seg.zip')


In [None]:
# dir = "/kaggle/working/BKM_seg/"
# images = os.listdir("/kaggle/working/BKM_seg")[:5]
# print(images)
# img1 = plt.imread(dir+images[2])
# plt.imshow(img1, cmap='gray')

In [None]:
# import os
# import cv2
# import numpy as np
# from tensorflow.keras.models import load_model


# # Function to predict masks for images in a folder and save the masks in another folder
# def predict_masks_and_apply(input_folder, output_folder):
#     # Create the output folder if it doesn't exist
#     if not os.path.exists(output_folder):
#         os.makedirs(output_folder)
    
#     # Get a list of image filenames in the input folder
#     image_filenames = os.listdir(input_folder)
    
#     # Iterate over each image
#     for image_filename in image_filenames:
#         # Read the image
#         image_path = os.path.join(input_folder, image_filename)
#         image = cv2.imread(image_path)
        
#         # Preprocess the image (you may need to adjust this based on your training preprocessing)
#         image = cv2.resize(image, (224, 224))  # Resize the image to match the input size of your model
#         image = image / 255.0  # Normalize pixel values
        
#         # Predict the mask
#         mask = model.predict(np.expand_dims(image, axis=0))[0]
        
#         # Threshold the mask (assuming binary segmentation)
#         mask[mask >= 0.5] = 255
#         mask[mask < 0.5] = 0
#         mask = mask.astype(np.uint8)
        
#         # Apply the mask to the image
#         if len(mask.shape) == 2:
#             mask = np.stack([mask]*3, axis=-1)
#         concatenated = np.concatenate([image, mask], axis=-1)
    

# #         # Visualize the original image
# #         plt.figure(figsize=(10, 5))
# #         plt.subplot(1, 2, 1)
# #         plt.imshow(original_image)
# #         plt.title('Original Image')

# #         # Visualize the mask
# #         plt.subplot(1, 2, 2)
# #         plt.imshow(mask)
# #         plt.title('Mask')

# #         plt.show()

# # Concatenate the image and mask along the channel dimension
#         concatenated = np.concatenate([image, mask], axis=-1)
#         concatenated_image_filename = os.path.splitext(image_filename)[0] + '_con.png'
#         concatenated_image_path = os.path.join(output_folder, concatenated_image_filename)
#         cv2.imwrite(concatenated_image_path, (concatenated* 255).astype(np.uint8))

# # Apply masks to images in the "benign" folder and save them in the "benign_masked" folder
# predict_masks_and_apply('/kaggle/input/dataset/dataset/BKM', '/kaggle/working/BKM_concat')

# # Apply masks to images in the "malignant" folder and save them in the "malignant_masked" folder
# predict_masks_and_apply('/kaggle/input/dataset/dataset/MSM', '/kaggle/working/MSM_concat')

In [None]:
# import shutil

# # Function to create a zip archive of a folder
# def create_zip(folder_path, zip_path):
#     shutil.make_archive(zip_path, 'zip', folder_path)

# # Paths to the folders containing the masked images
# benign_masked_folder = '/kaggle/working/BKM_concat'
# malignant_masked_folder = '/kaggle/working/MSM_concat'

# # Create zip archives for the benign and malignant masked image folders
# create_zip(benign_masked_folder, '/kaggle/working/BKM_concat')
# create_zip(malignant_masked_folder, '/kaggle/working/MSM_concat')