In [None]:
%matplotlib inline

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import numpy as np
import torch 
import torchvision
from PIL import Image
import json
import datetime
from torchvision.transforms import transforms 
from torchvision.utils import make_grid
import torch.nn as nn
import time
import os
import copy
import torchvision.models as models
from torchvision.utils import save_image
import torch.nn.functional as F

from random import randint

from IPython.display import Image
from IPython.core.display import Image, display

  from IPython.core.display import Image, display


In [None]:
# local data_dir 
# data_dir = "/Users/Lisa/Desktop/Master Thesis/RoofNet"
data_dir = "/Users/Lisa/Desktop/Master Thesis/RoofNetXAI/"
model_dir = "/Users/Lisa/Desktop/Master Thesis/RoofNetXAI/roofnet/saved_models_2.0"
# train/val/test dir
data_file_path = data_dir + "preprocessed_data/train_64_noreroofs.npy"
test_data_file_path = data_dir + "preprocessed_data/test_64_noreroofs.npy"
val_data_file_path = data_dir + "preprocessed_data/val_64_noreroofs.npy"

In [None]:
import sys
sys.path.append("/Users/Lisa/Desktop/Master Thesis/RoofNetXAI/roofnet/utils")
from data import ImageDataset

In [None]:
# import Roofnet
# from Roofnet.utils.data import ImageDataset
from torchvision.transforms import transforms 


transform_chain = transforms.Compose([
                        transforms.ToPILImage(mode='RGB'),
                        transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0),
                        transforms.ToTensor(),
                        transforms.Normalize([0.,0.,0,], [1.,1.,1.]),   
                        
                            ])
data = ImageDataset(data_file_path,
                    transform=transform_chain)
dataloader = torch.utils.data.DataLoader(data, batch_size=32, shuffle=True)
len(data), len(dataloader)

Loading data
Done loading data
Length 1050
Num Roofs 150


(1050, 33)

In [None]:
# Fixed input for debugging
fixed_x = next(iter(dataloader))
fixed_x = fixed_x[0]

In [None]:
# can be skipped
# Check to see if data is loaded 
save_image(fixed_x, 'real_image.png')

Image('real_image.png')

In [None]:
# Set default parameters
image_channels = fixed_x.size(1)
img_dim = fixed_x.size(-1)

In [None]:
from torch import device

def loss_fn(recon_x, x, mu, logvar,beta=1.0):
    x = x.to(device)
    BCE = F.binary_cross_entropy(recon_x, x, size_average=False)
    # see Appendix B from VAE paper:
    # Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
    # 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
    KLD*=beta
    return BCE + KLD, BCE, KLD

In [8]:
sys.path.append("/Users/Lisa/Desktop/Master Thesis/RoofNetXAI/roofnet/models")
from vae import VAE

In [9]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = VAE(img_dim=img_dim,image_channels=image_channels,z_dim=128,device=device).to(device)

In [10]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) 

# Train VAE

In [11]:
sys.path.append("/Users/Lisa/Desktop/Master Thesis/RoofNetXAI/xai")
from gradcam_vae import GradCAMVAE 

In [12]:
# TRAIN VAE

filename = model_dir+'\\roofnet_VAE_test_gradcam.pth'
epochs = 50
bs=32
train = True

# the path for saving GradCAM maps
gradcam_maps_path = "/Users/Lisa/Desktop/Master Thesis/RoofNetXAI/gradcam_maps"


if train:

    start_time = time.time()  # record the start time

    epoch = 0
    while epoch < epochs:
        
        for idx in range(100):
            images = next(iter(dataloader))
            images = images[0]
            recon_images, z, mu, logvar, _ = model(images.to(device))
            loss, bce, kld = loss_fn(recon_images, images, mu, logvar, beta=5.0)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            to_print = "Epoch[{}/{}] Loss: {:.3f} {:.3f} {:.3f}".format(epoch+1, 
                                    epochs, loss.item()/bs, bce.item()/bs, kld.item()/bs)
        epoch += 1
        print(to_print)
        
        if epoch in  [2, 30, 50]:
            model.eval()
            gradcam_vae = GradCAMVAE(model)
            gradcam_maps = gradcam_vae.generate_gradcam(images, target_layer = "encoder")  # Adjust target_layer if needed

            epoch_folder = os.path.join(gradcam_maps_path, f"epoch_{epoch}")
            os.makedirs(epoch_folder, exist_ok=True)
  
            # Overlay GradCAM maps onto the original images and save them
            for i in range(images.size(0)):
                overlay_image = images[i].clone()
                overlay_image[gradcam_maps[i] > 0] = 1.0
                save_path = os.path.join(epoch_folder, f"gradcam_overlay_epoch{epoch}_iter{idx + 1}_image{i + 1}.png")
                save_image(overlay_image, save_path)




    torch.save(model.state_dict(),  filename)
    end_time = time.time()  # record the end time
    training_time = end_time - start_time  # calculate the training time in seconds
    print("Training time: {:.2f} seconds".format(training_time))



Epoch[1/50] Loss: 7748.506 7748.399 0.107
Epoch[2/50] Loss: 7324.646 7324.209 0.436


ValueError: pic should be 2/3 dimensional. Got 1 dimensions.

# Compare real image and reconstruction

Note: VAEs are known to generate blurry. The reason is that the latent code is trying to compress as much info as possible and only focus on the meaningful features.

In [None]:
#64x64
import roofnet


model = roofnet.models.vae.VAE(img_dim=64,image_channels=image_channels,z_dim=128,device=device).to(device)
model.load_state_dict(torch.load(model_dir+'\\roofnet_VAE_Beta1.pth'))
val_data_file_path = data_dir + "\\val_64_noreroofs.npy"

In [None]:
#255x255
model = roofnet.models.vae.VAE(img_dim=255,image_channels=image_channels,z_dim=128,device=device).to(device)
model.load_state_dict(torch.load(model_dir+'\\roofnet_hard.pth'))
val_data_file_path = data_dir + "\\test_easy.npy"

In [None]:
def compare(x):
    x=x.to(device)
    recon_x,_, _, _ = model(x)
    return torch.cat([x, recon_x])

In [None]:
fixed_x = next(iter(dataloader))[0][:1]
compare_x = compare(fixed_x)

save_image(compare_x.data.cpu(), 'sample_image.png')
display(Image('sample_image.png', width=700, unconfined=True))

# Use validation data to make predicitons

In [None]:
transform_chain = transforms.Compose([
                        transforms.ToTensor(),
                        transforms.Normalize([0.,0.,0,], [1.,1.,1.]),   
                        
                            ])
val_data = ImageDataset(val_data_file_path,
                    transform=transform_chain)

In [None]:
val_dataloader = torch.utils.data.DataLoader(val_data, batch_size=7, shuffle=False)
val_dataloader = iter(val_dataloader)

In [None]:
fixed_x = next(iter(val_dataloader))[0][:1]
compare_x = compare(fixed_x)

save_image(compare_x.data.cpu(), 'sample_image.png')
display(Image('sample_image.png', width=700, unconfined=True))

In [None]:
out = []
latents = []
meta = []
for i in val_dataloader:
    images = i[0]
    recon_images,z, _, _ = model(images.to(device))
    z = z.detach().cpu().numpy()
    latents.append(z)
    meta.append([i[2]['address'][0],int(i[2]['transition_year'][0].cpu().numpy())])
    d = int(np.argmax([np.linalg.norm(zi-zj) for zi,zj in zip(z[1:],z[:-1])]))+2013
    out.append([i[2]['address'][0],d,int(i[2]['transition_year'][0].cpu().numpy())])
out = np.array(out)
latents = np.array(latents)

In [None]:
hold = 0
l = 0
for i in out:
    if i[2] != 0:
        l += 1
        if i[1]==i[2]:
            hold += 1
print(hold/l)

Looking at just the argmax of distance between latents does not produce a great accuracy. There are clearly many images where the largest distance between latents is a terrible metric for reroof. The verticle bar represents the reroof date, if it is missing there is no reroof.

In [None]:
for j in range(len(latents)):
    diff = []
    for i in range(6):
        diff.append(np.linalg.norm(latents[j][i]-latents[j][i+1]))
    plt.plot(diff)
    plt.title(meta[j][0])
    if meta[j][1] != 0:
        plt.axvline(int(meta[j][1])-2013)
    plt.show()

# Build binary classifier on latents

In [None]:
from torch.autograd import Variable

## Helper functions:

In [None]:
# Inputs: latent image 1, latent image 2, classification model, and prob threshold
# Returns binary classification, 1 for reroof, 0 for none.
def classify_image_pair(latent_1,latent_2,model,prob_threshold=0.5):
    v_1 = np.hstack((latent_1,latent_2))
    v_2 = np.hstack((latent_2,latent_1))
    model.eval()
    p_1 = net(torch.tensor(v_1).to(device))
    p_2 = net(torch.tensor(v_2).to(device))
    prob = max([p_1.item(),p_2.item()])
    return prob >= prob_threshold

In [None]:
# Input: Latents for a building, meta for building, model, and threshold
# Output: [Address, predicted_transition, actual_transition]
# Returns transition year based on first transition detected
def gen_predictions(latents,meta,model,prob_threshold=0.5):
    out = []
    for i in range(len(latents)):
        hold = []
        hold.append(str(meta[i][0]))
        trans_year = 0
        for j in range(len(latents[i])-1):
            if classify_image_pair(latents[i][j],latents[i][j+1],model,prob_threshold=prob_threshold):
                trans_year = j + 2013
                break
        hold.append(trans_year)
        hold.append(int(meta[i][1]))
        out.append(hold)
    return np.array(out)

In [None]:
# Input: Latents for a building, meta for building, model, and threshold
# Output: [Address, predicted_transition, actual_transition]
# Returns transition year based on highest probability
def get_max_prob(latents,meta,model,prob_threshold=0.5):
    out = []
    for i in range(len(latents)):
        hold = []
        hold.append(str(meta[i][0]))
        trans_year = 0
        temp = []
        for j in range(len(latents[i])-1):
            latent_1 = latents[i][j]
            latent_2 = latents[i][j+1]
            v_1 = np.hstack((latent_1,latent_2))
            v_2 = np.hstack((latent_2,latent_1))
            model.eval()
            p_1 = net(torch.tensor(v_1).to(device))
            p_2 = net(torch.tensor(v_2).to(device))
            prob = max([p_1.item(),p_2.item()])
            temp.append(prob)
        index = np.argmax(temp)
        if temp[index]>=prob_threshold:
            trans_year = 2013 + index
        hold.append(trans_year)
        hold.append(int(meta[i][1]))
        out.append(hold)
    return np.array(out)

In [None]:
# Generates accuracy metrics for detecting reroof and predicting reroof date
def gen_metrics(acc_metric, latents, meta, model, threshold=0.5):
    no_reroof_pred = []
    reroof_pred = []
    hold = acc_metric(latents,meta,net,threshold)
    print("Overall accuracy: {:0.3f}".format(np.mean(hold[:,1]==hold[:,2])))
    for i in hold:
        if int(i[2])==0:
            no_reroof_pred.append(int(int(i[1]) == 0))
        else:
            reroof_pred.append(int(i[1]==i[2]))
    print("No reroof prediction accuracy: {:0.3f}".format(np.mean(no_reroof_pred)))
    print("Reroof prediction accuracy: {:0.3f}".format(np.mean(reroof_pred)))
    return hold

In [None]:
# Inputs: file path for data
# Outputs: Dataloader for training

def gen_dataloader(data_file_path):
    transform_chain = transforms.Compose([
                        transforms.ToTensor(),
                        transforms.Normalize([0.,0.,0,], [1.,1.,1.]),   
                        
                            ])
    data = ImageDataset(data_file_path,
                        transform=transform_chain)

    dataloader = torch.utils.data.DataLoader(data, batch_size=7, shuffle=False)
    dataloader = iter(dataloader)
    return dataloader

In [None]:
# Input: dataloader, latent generating model
# Outputs: Latents generate by model, meta data for buildings
def gen_latents(dataloader, model):
    latents = []
    meta = []
    for i in dataloader:
        images = i[0]
        _, z, _, _ = model(images.to(device))
        z = z.detach().cpu().numpy()
        latents.append(z)
        meta.append([i[2]['address'][0],int(i[2]['transition_year'][0].cpu().numpy())])
    latents = np.array(latents)
    return latents, meta

In [None]:
# Inputs: Latents of data, meta for building
# Output: All possible pairs of latent images and their label 1:reroof, 0:no reroof
def gen_binary_data(latents, meta):
    data_hold = []
    label_hold = []
    for i in range(len(latents)):
        for j in range(len(latents[i])):
            for k in range(len(latents[i])):
                data_hold.append(np.hstack((latents[i][j],latents[i][k])))
                year_j = 2012+j < meta[i][1]
                year_k = 2012+k < meta[i][1]
                label_hold.append(float(year_j != year_k))
    data_hold = np.array(data_hold)
    label_hold = np.array(label_hold)
    
    return data_hold, label_hold

In [None]:
def binary_train_epoch(model, opt, criterion, data, labels, data_loader, val_data, val_labels, val_data_loader, best_acc, logit=False):
    
    best_model_wts = copy.deepcopy(model.state_dict())
    
    model.train()
    loss_hold = []
    
    for i in iter(data_loader):
        batch_size = len(i)
        x_batch = data[i]
        y_batch = labels[i]
        
        x_batch = Variable(torch.from_numpy(x_batch))
        y_batch = torch.tensor(y_batch, dtype=torch.float, device=device)
        y_batch = y_batch.view(batch_size,-1)
        
        opt.zero_grad()
        y_hat = model(x_batch.to(model.device))
        loss = criterion(y_hat,y_batch)
        loss.backward()
        opt.step()
        
        loss_hold.append(loss.item()/batch_size)
    print("Epoch training loss:{:.3f}".format(np.mean(loss_hold)))
    
    #validate
    model.eval()
    pred_acc = []
    for i in iter(val_data_loader):
        batch_size = len(i)
        x_batch = val_data[i]
        y_batch = val_labels[i]
        
        x_batch = Variable(torch.from_numpy(x_batch))
        
        if logit:
            y_logits = model(x_batch.to(model.device))
            s = nn.Sigmoid()
            y_hat = s(y_logits)>0.5
        else:
            y_hat = model(x_batch.to(model.device))>0.5
        
        pred_acc.append(np.mean(y_hat.cpu().numpy() == y_batch.reshape(batch_size,-1)))
        
    pred_acc = np.mean(pred_acc)
    print("Epoch validation accuracy: {:.3f}%".format(pred_acc))
    
    if pred_acc >= best_acc:
        best_model_wts = copy.deepcopy(model.state_dict())
        best_acc = pred_acc
    
    net.load_state_dict(best_model_wts)
    
    return net, best_acc
        
        

## Load the data

In [None]:
model = roofnet.models.vae.VAE(img_dim=64,image_channels=image_channels,z_dim=128,device=device).to(device)
model.load_state_dict(torch.load(model_dir+'\\roofnet_VAE_Beta5.pth'))

data_file_path = data_dir + "\\train_64_noreroofs.npy"

binary_dataloader = gen_dataloader(data_file_path)

In [None]:
val_data_file_path = data_dir + "\\val_64_noreroofs.npy"

binary_val_dataloader = gen_dataloader(val_data_file_path)

In [None]:
test_data_file_path = data_dir + "\\test_64_noreroofs.npy"

binary_test_dataloader = gen_dataloader(test_data_file_path)

In [None]:
latents, meta = gen_latents(binary_dataloader, model)

In [None]:
val_latents, val_meta = gen_latents(binary_val_dataloader, model)

In [None]:
test_latents, test_meta = gen_latents(binary_test_dataloader, model)

In [None]:
data_hold, label_hold = gen_binary_data(latents, meta)

In [None]:
val_hold, val_label = gen_binary_data(val_latents, val_meta)

In [None]:
test_hold, test_label = gen_binary_data(test_latents, test_meta)

## Define the model

In [None]:
class Net(nn.Module):
    
    def __init__(self, zdim = 128):
        super().__init__()
        
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        
        self.network = nn.Sequential(
            nn.Linear(2*zdim, 2*zdim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(2*zdim, zdim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(zdim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 1),
            nn.Sigmoid()
        ).to(self.device)
              
    
    def forward(self, x):
        y = self.network(x)
        return y
    
net = Net()
opt = torch.optim.Adam(net.parameters(), lr=0.001, betas=(0.9, 0.999))
criterion = nn.BCELoss()

In [None]:
index_loader = torch.utils.data.DataLoader(np.arange(len(data_hold)),batch_size=32,shuffle=True)
val_index_loader = torch.utils.data.DataLoader(np.arange(len(val_hold)),batch_size=32,shuffle=True)

## Train the model:

In [None]:
num_epochs = 10
best_acc = 0
for e in range(num_epochs):
    print('Running epoch {}/{}'.format(e,num_epochs))
    net, best_acc = binary_train_epoch(net, opt, criterion, data_hold, label_hold, index_loader, val_hold, val_label, val_index_loader, best_acc)

In [None]:
# Earliest transition method prediction:
hold = gen_metrics(gen_predictions, val_latents, val_meta, net)

In [None]:
net.load_state_dict(torch.load(model_dir+'\\roofnet_binary_class_hard_easyval.pth'))

In [None]:
torch.save(net.state_dict(), model_dir+'\\roofnet_binary_class_b5.pth')

In [None]:
hold = gen_metrics(get_max_prob, val_latents, val_meta, net)

In [None]:
hold = gen_metrics(get_max_prob, test_latents, test_meta, net)

In [None]:
out = {'site_address': hold[:,0], 'transition_true': hold[:,2], 'transition_predicted': hold[:,1]}

In [None]:
import pandas as pd

In [None]:
pd.DataFrame(out).to_csv('Argmax(Prob) Beta5.csv')

Hyperparameter tuning on prediction threshold, best to leave at 0.5:

In [None]:
#out = [Address, predicted_transition, actual_transition]
for i in range(21):
    pt = 0.4+0.01*i
    print(pt)
    no_reroof_pred = []
    reroof_pred = []
    hold = get_max_prob(val_latents,val_meta,net,prob_threshold=pt)
    print("Overall accuracy: {:0.3f}".format(np.mean(hold[:,1]==hold[:,2])))
    for i in hold:
        if int(i[2])==0:
            no_reroof_pred.append(int(int(i[1]) == 0))
        else:
            reroof_pred.append(int(i[1]==i[2]))
    print("No reroof prediction accuracy: {:0.3f}".format(np.mean(no_reroof_pred)))
    print("Reroof prediction accuracy: {:0.3f}".format(np.mean(reroof_pred)))
    print('-'*20)

In [None]:
#out = [Address, predicted_transition, actual_transition]
for i in range(11):
    pt = 0.3+0.01*i
    print(pt)
    no_reroof_pred = []
    reroof_pred = []
    hold = get_max_prob(latents,meta,net,prob_threshold=pt)
    print("Overall accuracy: {:0.3f}".format(np.mean(hold[:,1]==hold[:,2])))
    for i in hold:
        if int(i[2])==0:
            no_reroof_pred.append(int(int(i[1]) == 0))
        else:
            reroof_pred.append(int(i[1]==i[2]))
    print("No reroof prediction accuracy: {:0.3f}".format(np.mean(no_reroof_pred)))
    print("Reroof prediction accuracy: {:0.3f}".format(np.mean(reroof_pred)))
    print('-'*20)

In [None]:
#out = [Address, predicted_transition, actual_transition]
for i in range(21):
    pt = 0.1+0.01*i
    print(pt)
    no_reroof_pred = []
    reroof_pred = []
    hold = get_max_prob(latents,meta,net,prob_threshold=pt)
    print("Overall accuracy: {:0.3f}".format(np.mean(hold[:,1]==hold[:,2])))
    for i in hold:
        if int(i[2])==0:
            no_reroof_pred.append(int(int(i[1]) == 0))
        else:
            reroof_pred.append(int(i[1]==i[2]))
    print("No reroof prediction accuracy: {:0.3f}".format(np.mean(no_reroof_pred)))
    print("Reroof prediction accuracy: {:0.3f}".format(np.mean(reroof_pred)))
    print('-'*20)

In [None]:
#out = [Address, predicted_transition, actual_transition]
for i in range(21):
    pt = 0.4+0.01*i
    print(pt)
    no_reroof_pred = []
    reroof_pred = []
    hold = get_max_prob(latents,meta,net,prob_threshold=pt)
    print("Overall accuracy: {:0.3f}".format(np.mean(hold[:,1]==hold[:,2])))
    for i in hold:
        if int(i[2])==0:
            no_reroof_pred.append(int(int(i[1]) == 0))
        else:
            reroof_pred.append(int(i[1]==i[2]))
    print("No reroof prediction accuracy: {:0.3f}".format(np.mean(no_reroof_pred)))
    print("Reroof prediction accuracy: {:0.3f}".format(np.mean(reroof_pred)))
    print('-'*20)