In [None]:
import os
import cv2
import time
from tqdm import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from keras.layers import Input
from tensorflow.keras.utils import img_to_array, load_img
from tensorflow.keras.applications import ResNet50
from sklearn.metrics import precision_score, f1_score, confusion_matrix
from tensorflow.keras.layers import Conv2D, GlobalAveragePooling2D, Reshape, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.models import Model
from keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import Adam

# Define image dimensions and channels
IMG_HEIGHT = 512
IMG_WIDTH = 512
IMG_CHANNELS = 3
input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)
num_classes = 1  # Single class segmentation

train_image_dir = '/home/syam.varnatt/THESIS/images/train/images'
train_mask_dir = '/home/syam.varnatt/THESIS/images/train/masks'
val_image_dir = '/home/syam.varnatt/THESIS/images/val/images'
val_mask_dir = '/home/syam.varnatt/THESIS/images/val/masks'
SAVE_DIRECTORY = '/home/syam.varnatt/THESIS/comparison-model/unet/no-aug/results'

def load_dataset(image_dir, mask_dir, img_size):
    images = []
    masks = []
    
    for image_name in tqdm(os.listdir(image_dir)):
        if image_name.lower().endswith(('.jpeg', '.jpg')):
            base_name = os.path.splitext(image_name)[0]
            mask_name = f"{base_name}_mask.jpg"
            
            img_path = os.path.join(image_dir, image_name)
            mask_path = os.path.join(mask_dir, mask_name)
            
            if os.path.exists(img_path) and os.path.exists(mask_path):
                try:
                    image = load_img(img_path, target_size=img_size)
                    mask = load_img(mask_path, target_size=img_size, color_mode="grayscale")
                    
                    image = img_to_array(image)
                    mask = img_to_array(mask)
                    
                    images.append(image)
                    masks.append(mask)
                except Exception as e:
                    print(f"Error processing {image_name}: {e}")
            else:
                print(f"Mask not found for image: {image_name}")
    
    images = np.array(images) / 255.0
    masks = np.array(masks) / 255.0
    masks = (masks > 0.5).astype(np.uint8)
    
    return images, masks

# Load datasets
train_images, train_masks = load_dataset(train_image_dir, train_mask_dir, (IMG_HEIGHT, IMG_WIDTH))
print("Training dataset loaded completely")
val_images, val_masks = load_dataset(val_image_dir, val_mask_dir, (IMG_HEIGHT, IMG_WIDTH))
print("Validation dataset loaded completely")

In [None]:
def unet(input_size=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)):
    inputs = Input(input_size)

    # Contraction path
    c1 = Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    c1 = Conv2D(64, (3, 3), activation='relu', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)

    c2 = Conv2D(128, (3, 3), activation='relu', padding='same')(p1)
    c2 = Conv2D(128, (3, 3), activation='relu', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)

    c3 = Conv2D(256, (3, 3), activation='relu', padding='same')(p2)
    c3 = Conv2D(256, (3, 3), activation='relu', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)

    c4 = Conv2D(512, (3, 3), activation='relu', padding='same')(p3)
    c4 = Conv2D(512, (3, 3), activation='relu', padding='same')(c4)
    p4 = MaxPooling2D((2, 2))(c4)

    # Bottleneck
    c5 = Conv2D(1024, (3, 3), activation='relu', padding='same')(p4)
    c5 = Conv2D(1024, (3, 3), activation='relu', padding='same')(c5)

    # Expansive path
    u6 = UpSampling2D((2, 2))(c5)
    u6 = concatenate([u6, c4])
    c6 = Conv2D(512, (3, 3), activation='relu', padding='same')(u6)
    c6 = Conv2D(512, (3, 3), activation='relu', padding='same')(c6)

    u7 = UpSampling2D((2, 2))(c6)
    u7 = concatenate([u7, c3])
    c7 = Conv2D(256, (3, 3), activation='relu', padding='same')(u7)
    c7 = Conv2D(256, (3, 3), activation='relu', padding='same')(c7)

    u8 = UpSampling2D((2, 2))(c7)
    u8 = concatenate([u8, c2])
    c8 = Conv2D(128, (3, 3), activation='relu', padding='same')(u8)
    c8 = Conv2D(128, (3, 3), activation='relu', padding='same')(c8)

    u9 = UpSampling2D((2, 2))(c8)
    u9 = concatenate([u9, c1], axis=3)
    c9 = Conv2D(64, (3, 3), activation='relu', padding='same')(u9)
    c9 = Conv2D(64, (3, 3), activation='relu', padding='same')(c9)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(c9)

    model = Model(inputs=[inputs], outputs=[outputs])
    model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Initiate the U-Net model
model = unet()
model.summary()

In [None]:
class VisualizationCallback(tf.keras.callbacks.Callback):
    def __init__(self, val_images, val_masks, save_dir, num_samples=3):
        super(VisualizationCallback, self).__init__()
        self.val_images = val_images
        self.val_masks = val_masks
        self.save_dir = save_dir
        self.num_samples = num_samples  # Number of samples to visualize per epoch
        os.makedirs(save_dir, exist_ok=True)  # Ensure the save directory exists

    def on_epoch_end(self, epoch, logs=None):
        # Randomly select a few samples to visualize predictions
        idxs = np.random.choice(len(self.val_images), self.num_samples, replace=False)
        images_to_show = self.val_images[idxs]
        masks_to_show = self.val_masks[idxs]
        
        # Use the model attribute directly, which is set automatically by Keras
        predictions = self.model.predict(images_to_show)

        # Plot and save the images, masks, and predictions
        plt.figure(figsize=(15, 10))
        for i in range(self.num_samples):
            plt.subplot(self.num_samples, 3, i * 3 + 1)
            plt.imshow(images_to_show[i])
            plt.title(f'Input Image {i+1}')
            plt.axis('off')

            plt.subplot(self.num_samples, 3, i * 3 + 2)
            plt.imshow(masks_to_show[i].squeeze(), cmap='gray')
            plt.title(f'True Mask {i+1}')
            plt.axis('off')

            plt.subplot(self.num_samples, 3, i * 3 + 3)
            plt.imshow(predictions[i].squeeze(), cmap='gray')
            plt.title(f'Predicted Mask {i+1}')
            plt.axis('off')

        plt.tight_layout()
        # Save the plot to file
        plt.savefig(os.path.join(self.save_dir, f'epoch_{epoch+1:02d}_predictions.png'))
        plt.close()
        print(f"Saved visualization for epoch {epoch+1}.")

# Instantiate the visualization callback
visualization_callback = VisualizationCallback(val_images, val_masks, SAVE_DIRECTORY)


In [None]:
class TrainingTimeCallback(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs=None):
        self.start_time = time.time()

    def on_train_end(self, logs=None):
        end_time = time.time()
        training_time = end_time - self.start_time
        print(f"Training Time: {training_time / 60:.2f} minutes")

# Create an instance of the callback
training_time_callback = TrainingTimeCallback()

# Define callbacks
checkpoint = ModelCheckpoint('unet_model.keras', monitor='val_loss', verbose=1, save_best_only=True, mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min')

# Train the model
history = model.fit(
    train_images, train_masks,
    validation_data=(val_images, val_masks),
    batch_size=8,
    epochs=100,
    callbacks=[checkpoint, early_stopping, training_time_callback, visualization_callback]
)

# Save the training history to an Excel file
history_df = pd.DataFrame(history.history)
history_df.to_excel(os.path.join('training_history.xlsx'), index=False)

In [None]:
# Load the best model saved so far
from tensorflow.keras.models import load_model

model = load_model('/home/syam.varnatt/THESIS/comparison-model/unet/no-aug/unet_model.keras')

# Continue training from the previous state
# Adjust the number of epochs based on how many more epochs are desired
new_epochs = 50  # Set the additional number of epochs

# Define the same callbacks
checkpoint = ModelCheckpoint('unet_model.keras', monitor='val_loss', verbose=1, save_best_only=True, mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min')

# Continue training
history = model.fit(
    train_images, train_masks,
    validation_data=(val_images, val_masks),
    batch_size=8,
    epochs=100 + new_epochs,  # Total number of epochs after resuming
    initial_epoch=100,  # Start from the last epoch trained (100 in your case)
    callbacks=[checkpoint, early_stopping, training_time_callback, visualization_callback]
)

# Optionally, save the updated training history to a new Excel file
history_df = pd.DataFrame(history.history)
history_df.to_excel(os.path.join('continued_training_history.xlsx'), index=False)


In [None]:

# Predict on validation set
val_preds = model.predict(val_images)

# Convert predictions to binary
val_preds_bin = (val_preds > 0.3).astype(int)

# Calculate additional metrics
precision = precision_score(val_masks.flatten(), val_preds_bin.flatten())
f1 = f1_score(val_masks.flatten(), val_preds_bin.flatten())

# Calculate Mean IoU
mean_iou_metric = tf.keras.metrics.MeanIoU(num_classes=2) 
mean_iou_metric.update_state(val_masks, val_preds_bin)
mean_iou = mean_iou_metric.result().numpy()

# Save metrics to Excel
metrics_df = pd.DataFrame({
    'Precision': [precision],
    'F1 Score': [f1],
    'Mean IoU': [mean_iou]
})
print(f"Precision: ", precision, "F1 score: ", f1, "Mean Iou : ", mean_iou)

# Plot training history
history_df = pd.read_excel('training_history.xlsx')

plt.figure()
plt.plot(history_df['loss'], label='Training Loss')
plt.plot(history_df['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss_plot.png')
plt.show()

plt.figure()
plt.plot(history_df['accuracy'], label='Training Accuracy')
plt.plot(history_df['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig('accuracy_plot.png')
plt.show()

# Save confusion matrix as heatmap
cm = confusion_matrix(val_masks.flatten(), val_preds_bin.flatten())
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.savefig('confusion_matrix.png')
plt.show()

# Visualize some predictions on validation set
for i in range(3):
    fig, axs = plt.subplots(1, 3, figsize=(15, 5))
    axs[0].imshow(val_images[i])
    axs[0].set_title('Input Image')
    axs[1].imshow(val_masks[i].squeeze(), cmap='gray')
    axs[1].set_title('Ground Truth')
    axs[2].imshow(val_preds_bin[i].squeeze(), cmap='gray')
    axs[2].set_title('Predicted Mask')
    plt.savefig(f'prediction_{i}.png')
    plt.show()

In [None]:
#calculate inference time

# Use a small batch from validation data
sample_images = val_images[:5]

# Measure inference time
start_time = time.time()
predictions = model.predict(sample_images)
end_time = time.time()

inference_time = (end_time - start_time) / len(sample_images)
print(f"Average Inference Time per Image: {inference_time:.4f} seconds")

In [None]:

import os
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import load_img, img_to_array, array_to_img
from tensorflow.keras.models import load_model

# Define input size
input_size = (512, 512, 3)

# Function to preprocess images
def preprocess_image(image_path, target_size):
    # Load image
    image = load_img(image_path, target_size=target_size[:2])
    # Convert to numpy array
    image_array = img_to_array(image)
    # Normalize the image
    image_array = image_array / 255.0
    return image_array

# Function to save images
def save_image(image_array, save_path):
    # Convert array to image
    image = array_to_img(image_array)
    # Save the image
    image.save(save_path)

# Function to display images
def display_images(images, titles=None, cmap='gray'):
    plt.figure(figsize=(20, 10))
    for i, image in enumerate(images):
        plt.subplot(3, 4, i + 1)  # Adjust depending on the number of images
        plt.imshow(image, cmap=cmap)
        if titles:
            plt.title(titles[i])
        plt.axis('off')
    plt.show()

# Load and preprocess images
image_folder = '/home/syam.varnatt/THESIS/Plot_Images(x10,x20)'
image_paths = [os.path.join(image_folder, fname) for fname in os.listdir(image_folder) if fname.endswith('.png')]

# Sort the image paths by filenames
image_paths = sorted(image_paths)

# Select a subset of images
image_paths = image_paths[0:]

# Predict segmentation masks for each image
# Extract image names from paths
image_names = [os.path.basename(path) for path in image_paths]

# Define the folder path to save images and masks
save_folder = '/home/syam.varnatt/THESIS/comparison-model/unet/no-aug/predict_height'
os.makedirs(save_folder, exist_ok=True)

# Predict segmentation masks and save images and masks
predicted_masks = []
for i, image_path in enumerate(image_paths):
    # Preprocess image
    image = preprocess_image(image_path, input_size)
    image_batch = np.expand_dims(image, axis=0)
    # Predict mask
    predicted_mask = model.predict(image_batch)[0]
    predicted_mask = (predicted_mask > 0.5).astype(np.uint8)
    
    # Save the original image
    original_image = img_to_array(load_img(image_path, target_size=input_size[:2])) / 255.0
    save_image(original_image, os.path.join(save_folder, f'original_{image_names[i]}'))
    
    # Save the predicted mask
    save_image(predicted_mask, os.path.join(save_folder, f'mask_{image_names[i]}'))
    
    predicted_masks.append(predicted_mask)


In [None]:
#calculate inference time for predicting heights

# Use a small batch from validation data
sample_images = image_batch[:5]

# Measure inference time
start_time = time.time()
predictions = model.predict(sample_images)
end_time = time.time()

inference_time = (end_time - start_time) / len(sample_images)
print(f"Average Inference Time per flight height predicted Image: {inference_time:.4f} seconds")