In [None]:
import pandas as pd
import numpy as np
import os
import matplotlib as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
from tqdm import tqdm

from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torchvision.transforms as transforms

from load_dataset import WildfireDataset

In [None]:
VGG19 = [64,64,'M',128,128,'M',256,256,256,256,'M',512,512,512,
         512,'M',512,512,512,512,'M']
#Then flatten and 4096*4096*1000 Lineal Layers

class VGG_net(nn.Module):
    #in channels=3 because images should be RGB, we put 
    #1000 classes by default 
    def __init__(self, in_channels=3, num_classes=1000):
        super(VGG_net,self).__init__()
        self.in_channels = in_channels
        self.conv_layers = self.create_conv_layers(VGG19) #the function
        #created below, with the parameters from the list VGG16
        
        self.fcs = nn.Sequential(
            nn.Linear(512*7*7, 4096), # the 7 is obtained from dividing
            #224 (the fixed size of the image) by 2**5 where 5 is 
            #the number of max pooling layers applied and contained
            #in the VGG16 list, the 4096 is gotten from the paper
            nn.ReLU(),
            nn.Dropout(p=0.5), 
            nn.Linear(4096,4096),
            nn.ReLU(),
            nn.Dropout(p=0.5), 
            nn.Linear(4096,num_classes)
            
            ) 
        #fully connected layers 
        
    def forward(self,x):
        x = self.conv_layers(x)
        x = x.reshape(x.shape[0],-1) #to flatten it for the linear part 
        x = self.fcs(x) #the flattened part is sent to the fully connected layers 
        return x
    
    def create_conv_layers(self,architecture):
        layers = []
        in_channels = self.in_channels
        
        for x in architecture:
            if type(x) == int: #in the VGG16 list the integers
            #represent a convolutional layer
                out_channels = x
                
                layers += [nn.Conv2d(in_channels=in_channels, 
                                     out_channels=out_channels,
                                     kernel_size=(3,3),
                                     stride=(1,1),
                                     padding=(1,1)),
                                     nn.BatchNorm2d(x), #this was not originally in the paper, but
                                     #it is supposed to improve the performance
                                     nn.ReLU()]
                
                in_channels = x #the new input channels are the out´put channels from the previous layer
            elif x=='M': #if the element in the list is an M, we have a max pooling layer
                layers += [nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))]
                
        return nn.Sequential(*layers) #unpacking all that was stored in the empty list

In [None]:
path_train = "train/"
path_validation = "valid/"
path_test = "test/"

folder1 = "nowildfire"
folder2 = "wildfire"

folders = [folder1, folder2]
path_t = [path_train,path_validation,path_test]

In [None]:
#Get file path+file_names to use in the dataloader

num_files = []
name_files = []

for path in path_t:
    file_name_class = []
    num_files_class = []
    for folder in folders:
        list_files = os.listdir(path+folder)
        list_files = [path+folder+"/" + x for x in list_files]
        num = len(list_files)
        file_name_class = file_name_class + list_files
        num_files_class.append(num)
    name_files.append(file_name_class)
    num_files.append(num_files_class)

label_train = [0]*num_files[0][0] + [1]*num_files[0][1]
label_val = [0]*num_files[1][0] + [1]*num_files[1][1]
label_test = [0]*num_files[2][0] + [1]*num_files[2][1]

temp_transforms = transforms.Compose([
    transforms.ToPILImage(), #we first do this since the transformations are usually applied to this format
    transforms.Resize((224,224)),
    transforms.ToTensor()
    ])

In [None]:
# Get the mean and std for normalization 

# Create a DataLoader for the training data
train_dataset = WildfireDataset(name_files[0], label_train, transform=temp_transforms)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)

# Calculate mean and std
mean = torch.zeros(3)
std = torch.zeros(3)
total_images = 0

print("Calculating mean and standard deviation...")
for images, _ in train_loader:
    # Reshape the images to (batch_size, num_channels, -1)
    images = images.view(images.size(0), images.size(1), -1)
    
    # Calculate mean and std per channel
    mean += images.mean(2).sum(0)
    std += images.std(2).sum(0)
    
    total_images += images.size(0)

# Final calculation
mean /= total_images
std /= total_images

print(f"Calculated Mean: {mean.tolist()}")
print(f"Calculated Standard Deviation: {std.tolist()}")

In [None]:
#Integrate normalization to transforms
my_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean.tolist(), std=std.tolist())
])

In [None]:
train_dataset = WildfireDataset(name_files[0],label_train,transform = my_transforms)
train_loader=DataLoader(train_dataset, batch_size=64, num_workers=1, shuffle=True)

val_dataset = WildfireDataset(name_files[1],label_val,transform = my_transforms)
val_loader=DataLoader(val_dataset, batch_size=64, num_workers=1, shuffle=True)

test_dataset = WildfireDataset(name_files[2],label_test,transform = my_transforms)
test_loader=DataLoader(test_dataset, batch_size=64, num_workers=1, shuffle=True)

print(len(train_loader))

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
network = VGG_net(in_channels=3,num_classes=2).to(device)

optimizer = torch.optim.Adam(network.parameters(),lr=0.01)
criterion  = nn.CrossEntropyLoss()
softmax = nn.Softmax(dim=1)

In [None]:
epochs = 15

min_valid_loss = np.inf

ep_loss_train = []
ep_acc_train = []

ep_loss_val = []
ep_acc_val = []

for k in range(epochs):
    print("Starting...")
    # TRAIN
    train_losses = []
    train_loss = 0.0
    correct_train = 0

    network.train()

    with tqdm(train_loader, unit="batch") as tepoch:
        for img, label in tepoch:
            tepoch.set_description(f"Epoch {k}")

            img = img.to(device)
            label = label.to(device)

            optimizer.zero_grad() # Clear the gradients
            
            target = network(img) # Forward Pass

            loss = criterion(target,label) # Find the Loss
            loss.backward() # Calculate gradients 

            optimizer.step() # Update Weights

            train_loss += loss.item() # Calculate Loss
            train_losses.append(loss.item()) #To keep track of the loss

            pred = target.argmax(1, keepdim=True) # get the index of the max log-probability, argmax already computes softmax
            correct_train += pred.eq(label.view_as(pred)).sum().item()

            #tepoch.set_postfix(loss=loss.item(), accuracy=100. * accuracy)

    # VALIDATION
    valid_losses = []
    valid_loss = 0.0 # Reset to 0 for each epoch
    correct_val = 0

    network.eval()

    with torch.no_grad():
        for batch in val_loader:
            img, label = batch
            img = img.to(device)
            label = label.to(device)

            target = network(img)

            loss = criterion(target,label)

            valid_loss += loss.item() * img.size(0)
            valid_losses.append(loss.item())

            pred = target.argmax(1, keepdim=True) # get the index of the max log-probability, argmax already computes softmax
            correct_val += pred.eq(label.view_as(pred)).sum().item()

    #Calculate the average epoch loss and accuracy
    ep_loss_train.append(train_loss / len(train_loader.dataset))
    ep_acc_train.append(100. * correct_train / len(train_loader.dataset))

    ep_loss_val.append(valid_loss / len(val_loader.dataset))
    ep_acc_val.append(100. * correct_val / len(val_loader.dataset))

    print(f'Epoch {k+1} \t\t Training Loss: {train_loss / len(train_loader.dataset)} \t\t Validation Loss: {valid_loss / len(val_loader.dataset)}')
    if min_valid_loss > valid_loss:
        print(f'Validation Loss Decreased({min_valid_loss:.6f}--->{valid_loss:.6f}) \t Saving The Model')
        min_valid_loss = valid_loss
        # Saving State Dict
        torch.save(network.state_dict(), 'saved_model.pth')

In [None]:
# Plotting Loss
plt.figure(figsize=(10, 5))
plt.title("Training and Validation Loss")
plt.plot(ep_loss_train, label="Training Loss")
plt.plot(ep_loss_val, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# Plotting Accuracy
plt.figure(figsize=(10, 5))
plt.title("Training and Validation Accuracy")
plt.plot(ep_acc_train, label="Training Accuracy")
plt.plot(ep_acc_val, label="Validation Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
# TEST

network.load_state_dict(torch.load('saved_model.pth')) #Load the params of the best saved model

test_loss = 0.0
correct_test = 0
all_preds = []
all_labels = []

network.eval() # Set the model to evaluation mode

with torch.no_grad(): # Don't calculate gradients, only forward pass
    for img, label in test_loader:
        img = img.to(device)
        label = label.to(device)

        target = network(img)
        loss = criterion(target, label)
        test_loss += loss.item() * img.size(0)

        pred = target.argmax(1) # get the index of the max log-probability
        correct_test += pred.eq(label).sum().item()
        
        all_preds.extend(pred.cpu().numpy())
        all_labels.extend(label.cpu().numpy())

# Calculate final test accuracy and loss
test_loss = test_loss / len(test_loader.dataset)
test_accuracy = 100. * correct_test / len(test_loader.dataset)
print(f'Test Loss: {test_loss:.4f}')
print(f'Test Accuracy: {test_accuracy:.2f}%')

In [None]:
# Create the confusion matrix
cm = confusion_matrix(all_labels, all_preds)

# Plot the confusion matrix
class_names = ["No Wildfire", "Wildfire"]

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()