In [None]:
!pip install segmentation-models-pytorch

In [None]:
from google.colab import drive
drive.mount('/content/drive')
import os

# Load Data



In [None]:
# Load directory for image and mask folders
image_dir = '/content/drive/MyDrive/Imaging Science MS/Computer Vision/Final Project/Datasets/DESOBA_Dataset/ShadowImage'
shadow_mask_dir = '/content/drive/MyDrive/Imaging Science MS/Computer Vision/Final Project/Datasets/DESOBA_Dataset/ShadowMask'
instance_mask_dir = '/content/drive/MyDrive/Imaging Science MS/Computer Vision/Final Project/Datasets/DESOBA_Dataset/InstanceMask'

# Make a list of all the filenames in each folder
image_filenames = os.listdir(image_dir)
shadow_mask_filenames = os.listdir(shadow_mask_dir)
instance_mask_filenames = os.listdir(instance_mask_dir)

# Sort
image_filenames.sort()
shadow_mask_filenames.sort()
instance_mask_filenames.sort()

print(len(image_filenames))
print(len(shadow_mask_filenames))
print(len(instance_mask_filenames))

# Combine the directory with each filename
image_paths = [os.path.join(image_dir, filename) for filename in image_filenames]
shadow_mask_paths = [os.path.join(shadow_mask_dir, filename) for filename in shadow_mask_filenames]
instance_mask_paths = [os.path.join(instance_mask_dir, filename) for filename in instance_mask_filenames]

# Check if the lists are exactly the same
if image_filenames == shadow_mask_filenames == instance_mask_filenames:
    print("The lists are exactly the same")
else:
    print("The lists are different")

# Data Preparation

In [None]:
import segmentation_models_pytorch as smp
from segmentation_models_pytorch.encoders import get_preprocessing_fn
from torch.utils.data import Dataset, DataLoader, Subset
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split
import torch
from torchvision import transforms
from torchvision.transforms import InterpolationMode
from torchvision.transforms import functional as F

# Convert images to tensors and normalize the mean and std to standard ImageNet stats
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224 (required for ResNet)
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


# Create custom dataset class
class ShadowDataset(Dataset):
    def __init__(self, image_paths, shadow_mask_paths, instance_mask_paths, transform):
      self.image_paths = image_paths
      self.shadow_mask_paths = shadow_mask_paths
      self.instance_mask_paths = instance_mask_paths
      self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
      # Read and binarize
      image = Image.open(self.image_paths[idx]) # read the image
      smask = (np.array(Image.open(self.shadow_mask_paths[idx]).convert('L')) > 0).astype(np.uint8) # read shadow mask and convert to greyscale then binary
      imask = (np.array(Image.open(self.instance_mask_paths[idx]).convert('L')) > 0).astype(np.uint8) # read instance mask and convert to greyscale then binary

      # Combine into single mask
      combined_mask = np.zeros_like(smask, dtype=np.uint8)
      combined_mask[smask == 1] = 1  # Shadow
      combined_mask[imask == 1] = 2  # Object

      # Apply the image transformation
      image = self.transform(image)

      # Convert mask to tensor and resize (no normalization)
      combined_mask = torch.from_numpy(combined_mask).unsqueeze(0).float()  # add channel, float for resizing
      combined_mask = F.resize(combined_mask, (224, 224), interpolation=InterpolationMode.NEAREST) # resize, nearest to avoid class mixing
      combined_mask = combined_mask.squeeze(0).long()  # Back to (H, W) with integer class labels

      return image, combined_mask


# Instantiate main dataset
dataset = ShadowDataset(image_paths, shadow_mask_paths, instance_mask_paths,transform=transform)

# Get index for each partition
train_indices, val_indices = train_test_split(list(range(len(dataset))), test_size=0.2, random_state=42)
val_indices, test_indices = train_test_split(val_indices, test_size=0.5, random_state=42)

# Partition into subsets
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)
test_dataset = Subset(dataset, test_indices)


# Crate dataloaders for each division
batch_size = 32
num_workers = 2
prefetch_factor = 3
train_loader = DataLoader(train_dataset, batch_size=batch_size,shuffle=True,num_workers=num_workers,pin_memory=True,prefetch_factor=prefetch_factor)
val_loader = DataLoader(val_dataset, batch_size=batch_size,shuffle=False,num_workers=num_workers,pin_memory=True,prefetch_factor=prefetch_factor)
test_loader = DataLoader(test_dataset, batch_size=batch_size,shuffle=False,num_workers=num_workers,pin_memory=True,prefetch_factor=prefetch_factor)

# Train the Model

Perform transfer learning on a Unet model using a ResNet18 encoder.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import segmentation_models_pytorch as smp
from sklearn.metrics import accuracy_score
import numpy as np

# Define the U-Net model
model = smp.Unet(
    encoder_name="resnet18",        # use ResNet18 as the encoder
    encoder_weights="imagenet",     # use the pre-trained ImageNet weights
    in_channels=3,                  # RGB input
    classes=3,                      # 0 = background | 1 = shadow | 2 = object
)

# Define loss function
criterion = nn.CrossEntropyLoss() # for multi-class

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

# Move model to the GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print("Using: ",device)

#-------------------------- Training loop -------------------------------------

# Initialize variables for early stopping
best_val_loss = float("inf")
best_epoch = -1

# keeping track of loss
val_loss_history = []
train_loss_history = []

# For early stopping
no_improvement = 0

epochs = 30
for epoch in range(epochs):
    model.train() # set model to training mode
    train_losses = [] # reset training losses

    for X_batch, y_batch in train_loader:
        # send input features and reference target to device
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        # zeroing out previous step gradients
        optimizer.zero_grad()

        # make predictions using the model
        outputs = model(X_batch)

        # calculate the loss
        loss = criterion(outputs, y_batch)

        # calculate the gradients by calling backward on the loss
        loss.backward()

        # take a step by calling step on the optimizer
        optimizer.step()

        # keep score of training loss
        train_losses.append(loss.item())


    train_loss_mean = np.mean(train_losses) # average train loss for current epoch
    train_loss_history.append(train_loss_mean) # add to loss per epoch list


    #------------------------ Validation Loop ----------------------------------
    model.eval() # set model to evaluation mode
    val_losses = [] # reset val losses
    with torch.no_grad(): # don't update gradients
        for X_batch, y_batch in val_loader:
            # Same as training but we are not updating weights
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)

            # validation loss
            val_loss = criterion(outputs, y_batch)
            val_losses.append(val_loss.item())


    val_loss_mean = np.mean(val_losses) # average val loss for current epoch
    val_loss_history.append(val_loss_mean) # add to loss per epoch list

    # Implementing early stopping
    if val_loss_mean < best_val_loss:
        best_val_loss = val_loss_mean # update the new best val loss
        best_epoch = epoch # update the best epoch
        torch.save(model.state_dict(), "best_model.pt") # save the best model params
        no_improvement = 0 # reset time since improvement
    else:
      no_improvement = no_improvement + 1
      if no_improvement >= 15: # early stop if no improvement for 5 epochs
        print(f"\nEarly stopping triggered after {epoch+1} epochs")
        break


    # take a step in the scheduler to update the learning rate
    scheduler.step()

    # Print summary
    print(f"Epoch {epoch+1}/{epochs} | Train Loss: {np.mean(train_losses):.4f} | Val Loss: {val_loss_mean:.4f} | LR: {scheduler.get_last_lr()[0]:.6f}")

print(f"\nBest model saved from epoch {best_epoch+1} with Val Loss: {best_val_loss:.4f}")



# Plot Training and Validation Loss

In [None]:
import matplotlib.pyplot as plt

# Plot training and validation loss
plt.plot(train_loss_history, label='Training Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Evaluate Model Performance on Test Set

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Load the best model
model.load_state_dict(torch.load("/content/drive/My Drive/Imaging Science MS/Computer Vision/Final Project/Datasets/Model Data/best_model.pt"))
model.eval() # set model to evaluation mode

# Initialize predicted masks and target masks
all_preds = []
all_targets = []

with torch.no_grad(): # don't update weights
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device) # send to GPU
        outputs = model(X_batch) # use model
        preds = torch.argmax(outputs, dim=1) # convert to predicted class maps

        # Push to CPU and append the current minibatch
        all_preds.append(preds.cpu())
        all_targets.append(y_batch.cpu())

# Concatenate all batches and flatten all images into a single column
all_preds = torch.cat(all_preds).numpy().flatten()
all_targets = torch.cat(all_targets).numpy().flatten()

# Compute classification metrics
test_accuracy = accuracy_score(all_targets, all_preds)
test_precision = precision_score(all_targets, all_preds, average='macro')
test_recall = recall_score(all_targets, all_preds, average='macro')
test_f1 = f1_score(all_targets, all_preds, average='macro')

print(f"Test Accuracy: {test_accuracy:.4f}") # how many predictions were true
print(f"Test Precision: {test_precision:.4f}") # how many positive predictions were true - about being correct when predicting positive
print(f"Test Recall: {test_recall:.4f}") # how many actual positives were correctly labeled - about catching all the positives
print(f"Test F1 Score: {test_f1:.4f}") # precision and recall combined

# ------------------ Plot Confusion Matrix for Test Data -----------------------

# Calculate the confusion matrix
cm = confusion_matrix(all_targets, all_preds)
class_names = ["Background", "Shadow", "Object"]

# Plot the normalized confusion matrix
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] # normalize so each row adds up to 1.0
cm_normalized = np.round(cm_normalized, 1)  # Round the normalized values to 1 decimal place
fig, ax = plt.subplots(figsize=(10, 8))
disp = ConfusionMatrixDisplay(confusion_matrix=cm_normalized,display_labels=class_names)
disp.plot(ax=ax,cmap="Blues")
plt.title("Confusion Matrix (Normalized)")
plt.show()



# Visualize Performance on a Single Test Image

In [None]:
import matplotlib.pyplot as plt
import torch
import numpy as np
import matplotlib

# Define the U-Net model
model = smp.Unet(
    encoder_name="resnet18",        # use ResNet18 as the encoder
    encoder_weights="imagenet",     # use the pre-trained ImageNet weights
    in_channels=3,                  # RGB input
    classes=3,                      # 0 = background | 1 = shadow | 2 = object
)

# Move model to the GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print("Using: ",device)

model.load_state_dict(torch.load("/content/drive/My Drive/Imaging Science MS/Computer Vision/Final Project/Datasets/Model Data/best_model.pt"))

model.eval()
model.to(device)

# Get one batch from test loader
X_test_batch, y_test_batch = next(iter(test_loader))

# pick an image from the batch
i = 9

# Choose image and mask
image = X_test_batch[i].unsqueeze(0).to(device)
true_mask = y_test_batch[i].cpu().squeeze().numpy()

with torch.no_grad():
    pred_mask = model(image) # use model
    pred_mask_class = torch.argmax(pred_mask, axis=1).cpu().squeeze().numpy() # get class map


# Undo normalization before displaying the image
std=[0.229, 0.224, 0.225]
mean=[0.485, 0.456, 0.406]
image = X_test_batch[i].permute(1, 2, 0).cpu().clone()  # [C,H,W] to [H,W,C], clone to not modify the image in place
image[:,:,0] = (image[:,:,0] * std[0]) + mean[0]
image[:,:,1] = (image[:,:,1] * std[1]) + mean[1]
image[:,:,2] = (image[:,:,2] * std[2]) + mean[2]
image = torch.clamp(image * 255, 0, 255).byte().numpy()  # Scale to [0, 255] and convert to uint8


# Create a colormap for the 3 classes (background, shadow, object)
cmap = matplotlib.colormaps["tab20"]

# Create a figure with 3 subplots
fig, axs = plt.subplots(1, 3, figsize=(12, 4))

# Show input image
axs[0].imshow(image)
axs[0].set_title("Input Image")
axs[0].axis("off")

# Show ground truth mask with colormap
axs[1].imshow(true_mask, cmap=cmap, vmin=0, vmax=2)  # Show ground truth with color mapping for 3 classes
axs[1].set_title("Ground Truth Mask")
axs[1].axis("off")

# Show predicted mask with colormap
axs[2].imshow(pred_mask_class, cmap=cmap, vmin=0, vmax=2)  # Show predicted mask with color mapping for 3 classes
axs[2].set_title("Predicted Mask")
axs[2].axis("off")

plt.tight_layout()
plt.show()

# Histogram of image values
plt.hist(image.flatten(), bins=256)
plt.title("Histogram of Input Image Pixels")
plt.xlabel("Pixel Value")
plt.ylabel("Frequency")
plt.show()


# Predict Solar Zenith and Azimuth Angles

In [None]:
# PREDICT SOLAR ANGLE USING OBJECT AND IMAGE MASKS

# Function: cartesian to polar coords
def cart2pol(x,y):
    r = np.sqrt(x**2 + y**2)
    theta = np.degrees(np.arctan2(y,x))
    return r,theta

# Function: polar to cartesian coords
def pol2cart(r,theta):
    x = r * np.cos(theta)
    y = r * np.sin(theta)
    return x,y

# Split the predicted mask into object and shadow based on class
object_mask = pred_mask_class == 2
shadow_mask = pred_mask_class == 1
object_mask = object_mask.astype(np.uint8)
shadow_mask = shadow_mask.astype(np.uint8)

# Display the masks
fig, ax = plt.subplots(1,2,figsize=(10,5))
ax[0].imshow(object_mask, cmap='grey', vmin=0, vmax=2)
ax[0].set_title("Object Mask")
ax[0].axis("off")
ax[1].imshow(shadow_mask, cmap='grey', vmin=0, vmax=2)
ax[1].set_title("Shadow Mask")
ax[1].axis("off")
plt.show()

print(np.unique(object_mask))
print(np.unique(shadow_mask))

# get locations of mask pixels
rows, x = np.where(shadow_mask == 1)
y = shadow_mask.shape[0] - rows

# fit a line to the shadow points
slope, intercept = np.polyfit(x, y, 1) # linear fit
x_fit = np.linspace(0,x.max()) # range of x values
y_fit = slope * x_fit + intercept # apply slope and intercept to create the line

# bias the data by the intercept
y_bias = y - intercept # bias the data
y_fit_bias = y_fit - intercept # bias the fit line (set y intercept to 0)


# show fit line and shadow as scatterplot
fig, ax = plt.subplots(1,2,figsize=(10,5))

ax[0].scatter(x,y,s=0.01,label='shadow')
ax[0].plot(x_fit,y_fit, label='fit',color='r')
ax[0].legend()
ax[0].set_ylim([0,shadow_mask.shape[0]])
ax[0].set_xlim([0,shadow_mask.shape[1]])
ax[0].set_title('Shadow With Fit Line')

ax[1].scatter(x,y_bias,s=0.01,label='shadow')
ax[1].plot(x_fit,y_fit_bias, label='fit',color='r')
ax[1].legend()
ax[1].set_ylim([0,shadow_mask.shape[0]])
ax[1].set_xlim([0,shadow_mask.shape[1]])
ax[1].set_title('Shadow With Biased Fit Line')
plt.show()

# convert fit line to polar coords (degrees)
r,theta = cart2pol(x_fit,y_fit_bias)

# make angles relative to the vertical (azimuth angle)
theta_shadow = np.mean(90 - theta[1:-1]) # average of all points on the line excluding ends

# find the length of the shadow
rxmin = r[x_fit>x.min()].min() # radial length to first point in shadow (bias)
print("r at xmin: ",rxmin)
rxmax = r[x_fit>x.min()].max() # radial length to last point in shadow
print("r at xmax: ",rxmax)
r_shadow = rxmax - rxmin # radial length of shadow
print("r_shadow: ",r_shadow)


# find height of the object
object_height = np.sum(object_mask,axis=1) # add along the columns
object_height = object_height[object_height>0] # all nonzero values
object_height = object_height.shape[0] # number of vertical obect pixels
print("object height: ",object_height)

# find zenith angle (relative to normal)
a = object_height
c = r_shadow
B = theta_shadow

# law of cosines to find 3rd side
b2 = a**2 + c**2 - (2 * a * c * np.cos(np.radians(B)))
b = np.sqrt(b2)

# law of sines to find zenith angle
C = np.degrees(np.arcsin( (c * np.sin(np.radians(B))) / b ))

# if shadow is to the left of the object, flip the azimuth angle
rows_obj, x_obj = np.where(object_mask == 1)
x_shadow = x
print("Shadow average x position: ",np.mean(x_shadow))
print("Object average x position: ",np.mean(x_obj))

# Set quadrant conditions
if slope > 0 and np.mean(x_shadow) > np.mean(x_obj): # shadow is in quadrant I
  azimuth = theta_shadow
elif slope < 0 and np.mean(x_shadow) < np.mean(x_obj): # shadow is in quadrant II
  azimuth = theta_shadow + 90
elif slope > 0 and np.mean(x_shadow) < np.mean(x_obj): # shadow is in quadrant III
    azimuth = theta_shadow + 180
elif  slope < 0 and np.mean(x_shadow) > np.mean(x_obj): # shadow is in quadrant IV
  azimuth = theta_shadow - 90

# predicted solar zenith and azimuth angles
zenith = C

print("")
print("zenith angle: ",zenith)
print("azimuth angle: ",azimuth)

# Show input image
plt.imshow(image)
plt.title(f"Predicted Solar Azimuth Angle: {azimuth:.2f}°")
plt.axis("off")
plt.show()


# Create a figure with 3 subplots
fig, axs = plt.subplots(1, 3, figsize=(12, 4))

# Show input image
axs[0].imshow(image)
axs[0].set_title(f"Predicted Solar Azimuth Angle: {azimuth:.2f}°")
axs[0].axis("off")

# Show ground truth mask with colormap
axs[1].imshow(true_mask, cmap=cmap, vmin=0, vmax=2)  # Show ground truth with color mapping for 3 classes
axs[1].set_title("Ground Truth Mask")
axs[1].axis("off")

# Show predicted mask with colormap
axs[2].imshow(pred_mask_class, cmap=cmap, vmin=0, vmax=2)  # Show predicted mask with color mapping for 3 classes
axs[2].set_title("Predicted Mask")
axs[2].axis("off")


plt.tight_layout()
plt.show()

