<a href="https://colab.research.google.com/github/sruthids28/leetcode/blob/main/resnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
# Mount Google Drive at /content/drive (Colab only); the dataset, model and CSV
# paths configured later in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [68]:
import os
import numpy as np
import tensorflow as tf
from glob import glob
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, TensorBoard, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Model, layers

# --- Training configuration -------------------------------------------------
# Spatial size every image is resized to before entering the network.
H = W = 128

# Optimisation hyper-parameters.
BATCH_SIZE = 8
LEARNING_RATE = 0.0001
NUM_EPOCHS = 5

# Locations on the mounted Google Drive: input images, best checkpoint, CSV log.
DATASET_PATH = "/content/drive/MyDrive/ISIC2018/"
MODEL_PATH = "/content/drive/MyDrive/ISIC2018/files1/resunet_model.keras"
CSV_PATH = "/content/drive/MyDrive/ISIC2018/files1/data.csv"

# Create directory for files if it doesn't exist
def create_dir(path):
    """Create directory `path` (including missing parents) if it is absent.

    Calling this on an existing directory is a no-op. `exist_ok=True` replaces
    the separate existence check, so there is no window between "check" and
    "create" in which another process could create the directory and make
    `os.makedirs` fail.

    Args:
        path: Directory to create.

    Raises:
        FileExistsError: If `path` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

# Load data function
def load_data(path, split=0.2):
    """Collect the JPEG images under `path` and split them into train/valid/test.

    Masks are currently placeholders: one all-zero (H, W, 1) float32 array per
    image. Images and masks are split *together* in a single
    `train_test_split` call so that the i-th mask of every subset still
    belongs to the i-th image of that subset; splitting the two lists with
    separate calls shuffles them independently and breaks that pairing.

    Args:
        path: Directory that directly contains the `*.jpg` images.
        split: Fraction of all samples reserved for validation.

    Returns:
        ((train_x, train_y), (valid_x, valid_y), (test_x, test_y)) where the
        `*_x` entries are lists of image paths and the `*_y` entries are lists
        of mask arrays aligned with them. The test subset is 20% of what
        remains after the validation split (at least one sample).

    Raises:
        ValueError: If no `*.jpg` file is found in `path`.
    """
    images = sorted(glob(os.path.join(path, "*.jpg")))

    if not images:
        raise ValueError("No images found in the provided dataset path.")

    # Create dummy masks with appropriate shape (assuming binary masks)
    masks = [np.zeros((H, W, 1), dtype=np.float32) for _ in images]

    # Hold out the validation subset; passing both lists keeps pairs aligned.
    train_x, valid_x, train_y, valid_y = train_test_split(
        images, masks, test_size=split
    )

    # Carve the test subset out of the remaining training samples.
    test_split_size = max(int(len(train_x) * 0.2), 1)
    train_x, test_x, train_y, test_y = train_test_split(
        train_x, train_y, test_size=test_split_size
    )

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

# Image reading function
def read_image(path):
    """Load a JPEG from `path` as an (H, W, 3) float32 tensor scaled to [0, 1]."""
    raw_bytes = tf.io.read_file(path)
    rgb = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(rgb, [H, W])
    # Normalize to [0, 1]
    return resized / 255.0

# Mask reading function (placeholder)
def read_mask(path):
    """Return an all-zero (H, W, 1) float32 mask.

    `path` is accepted for interface compatibility but is not read; real mask
    loading would go here.
    """
    return tf.zeros(shape=(H, W, 1), dtype=tf.float32)

# TensorFlow Dataset function
def tf_parse(x, y):
    """Turn one (image path, mask) dataset element into model-ready tensors.

    `read_image` and `read_mask` are built purely from TensorFlow ops, so they
    are called directly and become part of the `tf.data` graph. Wrapping them
    in `tf.numpy_function` would route every element through the Python
    interpreter for no benefit.

    Args:
        x: Scalar string tensor holding the image path.
        y: Mask element forwarded to `read_mask`.

    Returns:
        (image, mask) with static shapes (H, W, 3) and (H, W, 1), float32.
    """
    image = read_image(x)
    mask = read_mask(y)

    # Pin the static shapes so downstream batching sees fully defined shapes.
    image.set_shape([H, W, 3])
    mask.set_shape([H, W, 1])
    return image, mask

def tf_dataset(train_x, train_y, batch_size):
    """Build an endlessly repeating, shuffled, batched `tf.data` pipeline.

    Stage order matters here:
      * shuffle runs *before* `map`, so the 1000-element shuffle buffer holds
        the raw (path, mask) pairs instead of decoded images;
      * `prefetch` is the final stage, so whole batches are prepared in the
        background while the previous one is being consumed.

    Args:
        train_x: Sequence of image paths.
        train_y: Sequence of masks aligned with `train_x`.
        batch_size: Number of samples per batch.

    Returns:
        A `tf.data.Dataset` yielding (images, masks) batches forever; callers
        must bound it with `steps_per_epoch` / `validation_steps`.
    """
    dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y))
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    dataset = dataset.repeat()  # Ensure the dataset repeats
    return dataset.prefetch(tf.data.AUTOTUNE)

# Example model architecture (small U-Net-style encoder/decoder)
def build_resunet(input_shape):
    """Build a one-level encoder/decoder segmentation network.

    Layout: two 64-filter convs -> 2x2 max-pool -> two 128-filter convs ->
    transposed-conv upsampling concatenated with the encoder features ->
    two 64-filter convs -> 1x1 sigmoid conv producing a single-channel map.

    Args:
        input_shape: Shape of one input sample, e.g. (H, W, 3).

    Returns:
        An uncompiled Keras `Model`.
    """
    conv3x3 = dict(activation='relu', padding='same')
    inputs = layers.Input(shape=input_shape)

    # Encoder stage
    encoded = layers.Conv2D(64, (3, 3), **conv3x3)(inputs)
    encoded = layers.Conv2D(64, (3, 3), **conv3x3)(encoded)
    pooled = layers.MaxPooling2D((2, 2))(encoded)

    # Bottleneck
    bottleneck = layers.Conv2D(128, (3, 3), **conv3x3)(pooled)
    bottleneck = layers.Conv2D(128, (3, 3), **conv3x3)(bottleneck)

    # Decoder stage with skip connection from the encoder
    upsampled = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(bottleneck)
    merged = layers.concatenate([upsampled, encoded])
    decoded = layers.Conv2D(64, (3, 3), **conv3x3)(merged)
    decoded = layers.Conv2D(64, (3, 3), **conv3x3)(decoded)

    # Per-pixel probability for binary segmentation
    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(decoded)
    return Model(inputs, outputs)

# Custom metrics
def dice_coef(y_true, y_pred):
    """Dice coefficient computed over every element of the batch at once.

    Returns (2 * sum(y_true * y_pred) + eps) / (sum(y_true) + sum(y_pred) + eps)
    with eps = 1e-6 to keep the ratio defined when both sums are zero.
    """
    eps = 1e-6
    overlap = tf.reduce_sum(y_true * y_pred)
    numerator = 2. * overlap + eps
    denominator = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + eps
    return numerator / denominator

def dice_loss(y_true, y_pred):
    """Loss form of the Dice coefficient: 1 minus `dice_coef`."""
    overlap_score = dice_coef(y_true, y_pred)
    return 1 - overlap_score

# Main execution
if __name__ == "__main__":
    # Seed NumPy and TensorFlow so splits and weight initialisation repeat.
    np.random.seed(42)
    tf.random.set_seed(42)

    # Derive the output directories from the configured paths instead of
    # repeating a literal, so changing MODEL_PATH / CSV_PATH cannot leave the
    # checkpoint or CSV logger pointing at a directory that was never created.
    create_dir(os.path.dirname(MODEL_PATH))
    create_dir(os.path.dirname(CSV_PATH))

    # Load the dataset
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(DATASET_PATH)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")
    print(f"Test: {len(test_x)} - {len(test_y)}")

    # Create TensorFlow datasets
    train_dataset = tf_dataset(train_x, train_y, BATCH_SIZE)
    valid_dataset = tf_dataset(valid_x, valid_y, BATCH_SIZE)

    # Steps per epoch: ceiling division, so a trailing partial batch is counted.
    # Both datasets repeat forever, which is why explicit step counts are needed.
    train_steps = (len(train_x) + BATCH_SIZE - 1) // BATCH_SIZE
    valid_steps = (len(valid_x) + BATCH_SIZE - 1) // BATCH_SIZE

    # Build and compile model
    model = build_resunet((H, W, 3))
    model.compile(loss=dice_loss, optimizer=Adam(learning_rate=LEARNING_RATE), metrics=[dice_coef])

    # Callbacks for training.
    # NOTE(review): with NUM_EPOCHS = 5 the patience values below (5 and 20)
    # can never be reached, so LR reduction and early stopping are inert
    # unless the epoch count is raised.
    callbacks = [
        ModelCheckpoint(MODEL_PATH, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(CSV_PATH),
        TensorBoard(),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
    ]

    # Start model training
    model.fit(
        train_dataset,
        epochs=NUM_EPOCHS,
        validation_data=valid_dataset,
        steps_per_epoch=train_steps,
        validation_steps=valid_steps,
        callbacks=callbacks
    )

Train: 640 - 640
Valid: 200 - 200
Test: 160 - 160
Epoch 1/5
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5s/step - dice_coef: 1.4641e-11 - loss: 1.0000
Epoch 1: val_loss improved from inf to 1.00000, saving model to /content/drive/MyDrive/ISIC2018/files1/resunet_model.keras
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m470s[0m 5s/step - dice_coef: 1.4641e-11 - loss: 1.0000 - val_dice_coef: 1.4640e-11 - val_loss: 1.0000 - learning_rate: 1.0000e-04
Epoch 2/5
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5s/step - dice_coef: 1.4642e-11 - loss: 1.0000
Epoch 2: val_loss did not improve from 1.00000
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m441s[0m 5s/step - dice_coef: 1.4642e-11 - loss: 1.0000 - val_dice_coef: 1.4640e-11 - val_loss: 1.0000 - learning_rate: 1.0000e-04
Epoch 3/5
[1m80/80[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5s/step - dice_coef: 1.4642e-11 - loss: 1.0000
Epoch 3: val_loss did not improve from 1.

In [69]:
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, UpSampling2D, Concatenate, Input, ZeroPadding2D
from tensorflow.keras.models import Model

def batchnorm_relu(inputs):
    """Apply batch normalisation followed by a ReLU activation."""
    normalized = BatchNormalization()(inputs)
    return Activation("relu")(normalized)

def residual_block(inputs, num_filters, strides=1):
    """Residual unit: two (BN -> ReLU -> 3x3 conv) stages plus a 1x1 shortcut.

    Args:
        inputs: Input feature map.
        num_filters: Number of output channels for both paths.
        strides: Stride of the first 3x3 conv and of the shortcut conv.

    Returns:
        Element-wise sum of the main path and the projected shortcut.
    """
    # Main path
    main_path = batchnorm_relu(inputs)
    main_path = Conv2D(num_filters, 3, padding="same", strides=strides)(main_path)
    main_path = batchnorm_relu(main_path)
    main_path = Conv2D(num_filters, 3, padding="same", strides=1)(main_path)

    # Shortcut: 1x1 conv so channel count and resolution match the main path
    shortcut = Conv2D(num_filters, 1, padding="same", strides=strides)(inputs)
    return main_path + shortcut

def decoder_block(inputs, skip_features, num_filters):
    """Upsample 2x, concatenate with the encoder skip features, then refine
    the result with a stride-1 residual block of `num_filters` channels."""
    upsampled = UpSampling2D((2, 2))(inputs)
    merged = Concatenate()([upsampled, skip_features])
    return residual_block(merged, num_filters, strides=1)

def build_resunet(input_shape):
    """Assemble the ResUNet: a stem, two strided residual encoders, a bridge,
    three decoder blocks with skip connections, and a 1x1 sigmoid classifier.

    Args:
        input_shape: Shape of one input sample, e.g. (256, 256, 3).

    Returns:
        An uncompiled Keras `Model` producing a single-channel probability map.
    """
    inputs = Input(input_shape)

    # Encoder 1 (stem): conv -> BN/ReLU -> conv, plus a 1x1 projection shortcut
    stem = Conv2D(64, 3, padding="same", strides=1)(inputs)
    stem = batchnorm_relu(stem)
    stem = Conv2D(64, 3, padding="same", strides=1)(stem)
    stem_shortcut = Conv2D(64, 1, padding="same", strides=1)(inputs)
    skip1 = stem + stem_shortcut

    # Encoders 2 and 3, each halving the spatial resolution
    skip2 = residual_block(skip1, 128, strides=2)
    skip3 = residual_block(skip2, 256, strides=2)

    # Bridge
    features = residual_block(skip3, 512, strides=2)

    # Decoders 1-3: walk back up, pairing each stage with its encoder skip
    for skip, width in ((skip3, 256), (skip2, 128), (skip1, 64)):
        features = decoder_block(features, skip, width)

    # Classifier
    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(features)

    return Model(inputs, outputs)

if __name__ == "__main__":
    # Build the network for 256x256 RGB inputs and print its layer table.
    model = build_resunet(input_shape=(256, 256, 3))
    model.summary()

In [71]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K

def iou(y_true, y_pred):
    """Intersection-over-union metric evaluated in NumPy via `tf.numpy_function`.

    Computes (intersection + 1e-15) / (union + 1e-15) over all elements and
    returns it as a float32 tensor.
    """
    def _jaccard(truth, pred):
        overlap = (truth * pred).sum()
        combined = truth.sum() + pred.sum() - overlap
        ratio = (overlap + 1e-15) / (combined + 1e-15)
        return ratio.astype(np.float32)

    return tf.numpy_function(_jaccard, [y_true, y_pred], tf.float32)

# Added to numerator and denominator so the ratio is defined for empty masks.
smooth = 1e-15
def dice_coef(y_true, y_pred):
    """Dice coefficient computed over every element of the batch at once.

    Both inputs are flattened to 1-D with `tf.reshape`, which avoids building
    two new Keras `Flatten` layers on every call. `y_true` is cast to the
    dtype of `y_pred` so that integer masks do not cause a dtype mismatch in
    the element-wise product.

    Args:
        y_true: Ground-truth mask; any shape with the same number of elements
            as `y_pred`.
        y_pred: Predicted mask probabilities.

    Returns:
        Scalar tensor:
        (2 * sum(y_true * y_pred) + smooth) / (sum(y_true) + sum(y_pred) + smooth).
    """
    y_pred = tf.convert_to_tensor(y_pred)
    y_true = tf.cast(y_true, y_pred.dtype)
    y_true = tf.reshape(y_true, [-1])
    y_pred = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    """Loss form of the Dice coefficient: 1.0 minus `dice_coef`."""
    overlap_score = dice_coef(y_true, y_pred)
    return 1.0 - overlap_score



import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import cv2
import pandas as pd
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score
# from metrics import dice_loss, dice_coef, iou
# from train import load_data, create_dir

# Evaluation-time image size (height and width), used by the cv2 readers below.
H = W = 256

def read_image(path):
    """Read an image with OpenCV and prepare it for `model.predict`.

    Args:
        path: Path of the image file.

    Returns:
        (ori_x, x): `ori_x` is the resized image as returned by OpenCV (colour
        order as loaded by `cv2.imread`), used for saving; `x` is the same
        image scaled to [0, 1], float32, with a leading batch axis of size 1.

    Raises:
        FileNotFoundError: If OpenCV cannot read `path` (`cv2.imread` returns
            None instead of raising).
    """
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    if x is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    # cv2.resize takes dsize as (width, height), not (height, width).
    x = cv2.resize(x, (W, H))
    ori_x = x
    x = (x / 255.0).astype(np.float32)
    x = np.expand_dims(x, axis=0)
    return ori_x, x

def read_mask(path):
    """Read a grayscale mask with OpenCV and convert it to integer labels.

    The mask is divided by its maximum value and truncated to int32, so only
    pixels equal to the maximum map to 1 and everything else maps to 0.

    Args:
        path: Path of the mask file.

    Returns:
        (ori_x, x): `ori_x` is the resized grayscale mask as loaded; `x` is
        the int32 label map of the same shape.

    Raises:
        FileNotFoundError: If OpenCV cannot read `path` (`cv2.imread` returns
            None instead of raising).
    """
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if x is None:
        raise FileNotFoundError(f"Could not read mask: {path}")
    # cv2.resize takes dsize as (width, height), not (height, width).
    x = cv2.resize(x, (W, H))
    ori_x = x
    peak = np.max(x)
    if peak == 0:
        # All-black mask: dividing by zero would yield NaNs, so return zeros.
        return ori_x, np.zeros_like(x, dtype=np.int32)
    x = (x / peak).astype(np.int32)
    return ori_x, x

def save_result(ori_x, ori_y, y_pred, save_image_path):
    """Write image | ground truth | prediction side by side to one file.

    The three panels are separated by 10-pixel-wide white bars. The
    single-channel mask and prediction are replicated to three channels, and
    the prediction is multiplied by 255 before writing.
    """
    separator = np.full((H, 10, 3), 255.0)

    # (H, W) -> (H, W, 3) by repeating the single channel
    truth_rgb = np.repeat(ori_y[..., np.newaxis], 3, axis=-1)
    pred_rgb = np.repeat(y_pred[..., np.newaxis], 3, axis=-1) * 255.0

    panels = [ori_x, separator, truth_rgb, separator, pred_rgb]
    cv2.imwrite(save_image_path, np.concatenate(panels, axis=1))

# Evaluation script: load a saved model, predict a mask for every image of one
# split, save a side-by-side visualisation per image, and report per-image and
# mean Accuracy / F1 / Jaccard / Recall / Precision.
if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    # Relative path: created under the current working directory.
    create_dir("results")

    """ Loading the model """
    # NOTE(review): this file name differs from MODEL_PATH used by the training
    # cell ("resunet_model.keras"), and the recorded output of this cell is a
    # FileNotFoundError for it — confirm which checkpoint is intended.
    with CustomObjectScope({"iou": iou, "dice_coef": dice_coef, "dice_loss": dice_loss}):
        model = tf.keras.models.load_model("/content/drive/MyDrive/ISIC2018/files1/resunet_model_ph2.h5")

    """ Loading the dataset. """
    dataset_path = "/content/drive/MyDrive/ISIC2018/"
    # NOTE(review): load_data is not defined in this cell. If it is the one from
    # the training cell, the *_y values are in-memory zero arrays rather than
    # mask file paths, while read_mask below passes y to cv2.imread — verify.
    (train_x, train_y),(valid_x,valid_y),(test_x, test_y) = load_data(dataset_path)

    """ Predict the mask and calculate the metrics values """
    # One row per image: [name, accuracy, f1, jaccard, recall, precision].
    SCORE = []
    # NOTE(review): the loop scores the *training* split; test_x / test_y are
    # unpacked above but never used — confirm this is intentional.
    for x, y in tqdm(zip(train_x, train_y), total=len(train_x)):
        """ Extract the image name """
        name = x.split("/")[-1]

        """ Reading the image and mask. """
        ori_x, x = read_image(x)
        ori_y, y = read_mask(y)

        """ Predict the mask """
        # Threshold the first (only) batch element at 0.5, drop the channel
        # axis, and convert the boolean map to 0/1 integers.
        y_pred = model.predict(x)[0] > 0.5
        y_pred = np.squeeze(y_pred, axis=-1)
        y_pred = y_pred.astype(np.int32)

        """ Save the image """
        save_image_path = f"results/{name}"
        save_result(ori_x, ori_y, y_pred, save_image_path)

        """ Flattening the numpy arrays. """
        # The sklearn metrics below are called on 1-D label vectors.
        y = y.flatten()
        y_pred = y_pred.flatten()

        """ Calculating metrics values """
        acc_value = accuracy_score(y, y_pred)
        f1_value = f1_score(y, y_pred, labels=[0, 1], average="binary")
        jac_value = jaccard_score(y, y_pred, labels=[0, 1], average="binary")
        recall_value = recall_score(y, y_pred, labels=[0, 1], average="binary")
        precision_value = precision_score(y, y_pred, labels=[0, 1], average="binary")
        SCORE.append([name, acc_value, f1_value, jac_value, recall_value, precision_value])

    """ Metrics values """
    # Drop the name column, then average each metric over all images.
    score = [s[1:]for s in SCORE]
    score = np.mean(score, axis=0)
    print(f"Accuracy: {score[0]:0.5f}")
    print(f"F1: {score[1]:0.5f}")
    print(f"Jaccard: {score[2]:0.5f}")
    print(f"Recall: {score[3]:0.5f}")
    print(f"Precision: {score[4]:0.5f}")

    """ Saving all the results """
    df = pd.DataFrame(SCORE, columns=["Image", "Accuracy", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv("/content/drive/MyDrive/ISIC2018/files1/score.csv")

FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = '/content/drive/MyDrive/ISIC2018/files1/resunet_model_ph2.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)