In [1]:
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Pick the compute device once: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [2]:
class DataLoader:
    """Thin wrapper that builds a shuffled training loader over a dataset.

    This class keeps the name ``DataLoader`` because the rest of the notebook
    instantiates it under that name. That name shadows the PyTorch loader
    imported with ``from torch.utils.data import DataLoader``, so the real
    loader is always referenced by its fully-qualified name inside this class.
    (Previously ``train_dataloader`` called this wrapper itself and failed with
    a ``TypeError`` on the unexpected ``shuffle`` argument.)

    Args:
        dataset: a map-style dataset (supports ``len()`` and indexing).
        batch_size: number of samples per mini-batch.
        val_split: stored on the instance for reference only; no validation
            subset is carved out, ``train_dataloader`` iterates over the whole
            dataset.
    """

    def __init__(self, dataset, batch_size=32, val_split=0.2):
        self.dataset = dataset
        self.batch_size = batch_size
        self.val_split = val_split

    def train_dataloader(self):
        """Return a new ``torch.utils.data.DataLoader`` over the full dataset.

        The loader shuffles the samples on every pass, so calling this once
        per epoch (as the training loops do) yields a new ordering each time.
        """
        return torch.utils.data.DataLoader(
            self.dataset,
            batch_size=self.batch_size,
            shuffle=True,
        )


# Locations of the training and validation image folders.
image_dir = '/home/taiel/dl2425_challenge_dataset/dl2425_challenge_dataset/train'
test_dir = '/home/taiel/dl2425_challenge_dataset/dl2425_challenge_dataset/val'

# Preprocessing shared by both splits: resize to 224x224, convert to a tensor,
# then normalise each channel (these are the commonly used ImageNet statistics).
_preprocess_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(_preprocess_steps)

# ImageFolder datasets for each split, both using the same preprocessing.
dataset = datasets.ImageFolder(root=image_dir, transform=transform)
testdataset = datasets.ImageFolder(root=test_dir, transform=transform)

# Wrap each dataset in the notebook's own DataLoader helper class.
Data_Loader = DataLoader(dataset)
testloader = DataLoader(testdataset)


In [3]:
class Bottleneck(nn.Module):
    """Three-convolution residual block: 1x1 -> 3x3 -> 1x1 with a skip connection.

    The last 1x1 convolution widens the feature map to
    ``out_channels * expansion`` channels. Only the 3x3 convolution carries
    ``stride``. When ``i_downsample`` is given it is applied to the skip path
    before the addition; otherwise the input is added unchanged.
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super().__init__()
        widened = out_channels * self.expansion

        # 1x1 convolution: in_channels -> out_channels.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)

        # 3x3 convolution: the only layer in the block that uses `stride`.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        # 1x1 convolution: out_channels -> out_channels * expansion.
        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(widened)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Run the block on ``x`` and return the activated residual sum."""
        shortcut = x.clone()

        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.batch_norm2(out)
        out = self.relu(out)

        out = self.batch_norm3(self.conv3(out))

        # Project the skip path when a downsample module was supplied.
        if self.i_downsample is not None:
            shortcut = self.i_downsample(shortcut)

        out += shortcut
        return self.relu(out)


class Block(nn.Module):
    """Basic two-convolution (3x3 -> 3x3) residual block with expansion 1.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        i_downsample: optional module applied to the skip path so that it
            matches the main path's shape (needed when ``stride != 1`` or the
            channel count changes).
        stride: stride of the first convolution.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Block, self).__init__()

        # Only the first convolution carries the stride. Striding the second
        # one as well would downsample twice and make the main path smaller
        # than the (once-strided) skip path, so the residual add would fail.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=1, bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + skip(x))``."""
        identity = x

        # Each convolution is followed by its own batch-norm layer.
        out = self.relu(self.batch_norm1(self.conv1(x)))
        out = self.batch_norm2(self.conv2(out))

        if self.i_downsample is not None:
            identity = self.i_downsample(identity)

        out = out + identity
        return self.relu(out)


class ResNet(nn.Module):
    """Residual network assembled from four stages of ``ResBlock`` modules.

    Args:
        ResBlock: block class; must expose an ``expansion`` attribute and accept
            ``(in_channels, out_channels, i_downsample=None, stride=1)``.
        layer_list: number of blocks in each of the four stages.
        num_classes: size of the final linear layer's output.
        num_channels: channels of the input images.
    """

    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super().__init__()
        # Running channel count; advanced by _make_layer as stages are built.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU, 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first uses stride 2.
        depth1, depth2, depth3, depth4 = layer_list[0], layer_list[1], layer_list[2], layer_list[3]
        self.layer1 = self._make_layer(ResBlock, depth1, planes=64)
        self.layer2 = self._make_layer(ResBlock, depth2, planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, depth3, planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, depth4, planes=512, stride=2)

        # Head: global average pooling followed by a linear classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * ResBlock.expansion, num_classes)

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Build one stage of ``blocks`` residual blocks as an ``nn.Sequential``.

        The first block receives ``stride`` and, when the stride is not 1 or the
        channel count changes, a 1x1-conv + batch-norm projection for its skip
        path. The remaining blocks use the defaults.
        """
        stage_width = planes * ResBlock.expansion

        projection = None
        if not (stride == 1 and self.in_channels == stage_width):
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1, stride=stride),
                nn.BatchNorm2d(stage_width),
            )

        stage = [ResBlock(self.in_channels, planes, i_downsample=projection, stride=stride)]
        self.in_channels = stage_width

        for _ in range(1, blocks):
            stage.append(ResBlock(self.in_channels, planes))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_classes)`` tensor of scores."""
        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)
        out = self.max_pool(out)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)

        out = self.avgpool(out)
        out = out.reshape(out.shape[0], -1)
        return self.fc(out)


def ResNet50(num_classes, channels=3):
    """Build a Bottleneck-based ResNet with stage depths [3, 4, 6, 3].

    ``num_classes`` sets the classifier output size and ``channels`` the
    number of input image channels.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(
        Bottleneck,
        stage_depths,
        num_classes=num_classes,
        num_channels=channels,
    )


def ResNet101(num_classes, channels=3):
    """Build a Bottleneck-based ResNet with stage depths [3, 4, 23, 3].

    ``num_classes`` sets the classifier output size and ``channels`` the
    number of input image channels.
    """
    stage_depths = [3, 4, 23, 3]
    return ResNet(
        Bottleneck,
        stage_depths,
        num_classes=num_classes,
        num_channels=channels,
    )


def ResNet152(num_classes, channels=3):
    """Build a Bottleneck-based ResNet with stage depths [3, 8, 36, 3].

    ``num_classes`` sets the classifier output size and ``channels`` the
    number of input image channels.
    """
    stage_depths = [3, 8, 36, 3]
    return ResNet(
        Bottleneck,
        stage_depths,
        num_classes=num_classes,
        num_channels=channels,
    )

def ResNet304(num_classes, channels=3):
    """Build a Bottleneck-based ResNet with stage depths [3, 18, 48, 3].

    ``num_classes`` sets the classifier output size and ``channels`` the
    number of input image channels.

    NOTE(review): counting three convolutions per bottleneck plus the stem
    convolution and the final linear layer gives 218 layers for these stage
    depths, so "304" appears to be a label rather than the layer count.
    """
    stage_depths = [3, 18, 48, 3]
    return ResNet(
        Bottleneck,
        stage_depths,
        num_classes=num_classes,
        num_channels=channels,
    )

In [4]:
class CNN(nn.Module):
    """Small fully connected network: 6 -> 96 -> 48 -> 16 -> 1 with a sigmoid output.

    Despite its name the module contains no convolutions. The input is
    flattened per sample, so any tensor with six values per sample is accepted.
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(6, 96)   # input layer
        self.fc2 = nn.Linear(96, 48)  # hidden layer
        self.fc3 = nn.Linear(48, 16)  # hidden layer
        self.fc4 = nn.Linear(16, 1)   # single-unit output layer

    def forward(self, x):
        """Return one value in [0, 1] per sample, shaped ``(batch, 1)``."""
        batch = x.size(0)
        hidden = x.view(batch, -1)
        # ReLU after each of the first three layers.
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = F.relu(layer(hidden))
        # Sigmoid squashes the final score into [0, 1].
        return torch.sigmoid(self.fc4(hidden))

In [5]:
class CNNModel(nn.Module):
    """Plain convolutional classifier: three conv+pool stages and six linear layers.

    Each 2x2 max pool halves the spatial size, so a 224x224 input reaches the
    classifier as 256 feature maps of 28x28, which is the size ``fc1`` is
    built for.

    Args:
        num_classes: number of output scores produced per image.
    """

    def __init__(self, num_classes=2):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)

        # forward() pools after every convolution, but the pooling layer was
        # never created, so every forward pass raised AttributeError.
        # 2x2 / stride 2 gives 224 -> 112 -> 56 -> 28, matching fc1 below.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(256 * 28 * 28, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, 64)
        self.fc5 = nn.Linear(64, 32)
        self.fc6 = nn.Linear(32, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return raw class scores of shape ``(batch, num_classes)``.

        ``x`` must be a batch of 3-channel images whose spatial size reduces to
        28x28 after three 2x2 poolings (e.g. 224x224).
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten per sample. Keeping the batch dimension explicit means a
        # wrongly sized input fails loudly in fc1 instead of being silently
        # reinterpreted as a different number of samples.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        # NOTE(review): fc3 and fc4 have no activation in the original design;
        # kept as-is so the architecture is unchanged.
        x = self.fc3(x)
        x = self.dropout(x)
        x = self.fc4(x)
        x = self.dropout(x)
        x = F.relu(self.fc5(x))
        x = self.dropout(x)
        x = self.fc6(x)
        return x




`net`, `rn50`, and `rn304` are each trained separately to classify the images. The final model `fmodel` (an instance of the `CNN` class) is then trained to take the outputs of the three ResNet models as its input and to produce a single number between 0 and 1 (no fire vs. fire).

In [None]:
# Three ResNet classifiers with two output classes each. Uncomment the matching
# line to resume from a saved checkpoint.
net = ResNet152(num_classes=2).to(device)
#net.load_state_dict(torch.load("resnet_152_checkpoint_69.pth",map_location=device))

rn50 = ResNet50(num_classes=2).to(device)
#rn50.load_state_dict(torch.load('resnet_50_checkpoint_119.pth', map_location=device))

rn304 = ResNet304(num_classes=2).to(device)
#rn304.load_state_dict(torch.load('resnet_304_checkpoint_68.pth', map_location=device))

# Fully connected combiner and the stand-alone convolutional model.
fmodel = CNN().to(device)
model = CNNModel(num_classes=2).to(device)

# Cross-entropy loss, plus an AdamW optimizer bound to `model`'s parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(params=model.parameters(), lr=0.001)

In [None]:
def train(EPOCHS, lr=0.001, checkpoint_prefix="resnet_50_checkpoint", log_every=100, checkpoint_every=10):
    """Train the notebook-level ``net`` on ``Data_Loader`` for ``EPOCHS`` epochs.

    Uses the notebook-level ``criterion``, ``device``, ``Data_Loader`` and
    ``vcheck``.

    Args:
        EPOCHS: number of passes over the training data.
        lr: learning rate of the AdamW optimizer created for ``net``.
        checkpoint_prefix: file-name prefix for saved weights; the file is
            written as ``f"{checkpoint_prefix}_{epoch}.pth"``.
        log_every: print the mean loss every this many mini-batches.
        checkpoint_every: validate and save every this many epochs
            (including epoch 0).
    """
    # The notebook-level `optimizer` is built over `model.parameters()`, so
    # stepping it never changes `net`. Optimise the network that is actually
    # producing the loss.
    net_optimizer = optim.AdamW(net.parameters(), lr=lr)

    for epoch in range(EPOCHS):
        # vcheck() is defined outside this cell; re-enter training mode each
        # epoch in case it left the network in eval mode.
        net.train()

        epoch_losses = []
        running_loss = 0.0

        for i, (inputs, labels) in enumerate(Data_Loader.train_dataloader()):
            inputs, labels = inputs.to(device), labels.to(device)

            net_optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            net_optimizer.step()

            batch_loss = loss.item()
            epoch_losses.append(batch_loss)
            running_loss += batch_loss

            # Report after exactly `log_every` batches so the divisor matches
            # the number of losses accumulated in the window.
            if (i + 1) % log_every == 0:
                print(f'Loss [{epoch+1}, {i + 1}](epoch, minibatch): ', running_loss / log_every)
                running_loss = 0.0

        # Guard against an empty loader before averaging.
        if epoch_losses:
            avg_loss = sum(epoch_losses) / len(epoch_losses)
            print(f'Epoch {epoch+1} average loss: ', avg_loss)

        if epoch % checkpoint_every == 0:
            print(vcheck(net))
            torch.save(net.state_dict(), f"{checkpoint_prefix}_{epoch}.pth")

    # Printed once training has actually finished, not when the cell is defined.
    print('Training Done')

In [None]:
def train_fmodel(EPOCHS, lr=0.01, log_every=100):
    """Train the combiner ``fmodel`` on the outputs of the three ResNets.

    For every mini-batch the notebook-level ``net``, ``rn50`` and ``rn304``
    score the images; their outputs are stacked and fed to ``fmodel``, whose
    sigmoid output is fitted to the labels with binary cross-entropy. Only
    ``fmodel`` is updated. Its weights are saved once per epoch as
    ``f"model_2_{epoch}.pth"``.

    Args:
        EPOCHS: number of passes over the training data.
        lr: learning rate of the AdamW optimizer created for ``fmodel``.
        log_every: print the mean loss every this many mini-batches.
    """
    # Built once, not once per epoch, so AdamW keeps its moment estimates.
    # BCELoss matches the sigmoid output of `fmodel`.
    criterion = nn.BCELoss()
    optimizer = optim.AdamW(fmodel.parameters(), lr=lr)

    base_models = (net, rn50, rn304)
    previous_modes = [m.training for m in base_models]

    try:
        # The ResNets only provide inputs here: evaluate them in eval mode so
        # their batch-norm statistics are not changed by this loop.
        for base in base_models:
            base.eval()
        fmodel.train()

        for epoch in range(EPOCHS):
            running_loss = 0.0

            for i, (images, labels) in enumerate(Data_Loader.train_dataloader()):
                images, labels = images.to(device), labels.to(device)

                # No gradients are needed through the frozen ResNets.
                with torch.no_grad():
                    input_tensor = torch.stack((net(images), rn50(images), rn304(images)), dim=2)

                optimizer.zero_grad()
                outputs = fmodel(input_tensor)

                # view(-1, 1) also handles a smaller final batch; the old
                # hard-coded view(32, 1) did not.
                targets = labels.float().view(-1, 1)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

                # Report after exactly `log_every` batches so the divisor
                # matches the number of accumulated losses.
                if (i + 1) % log_every == 0:
                    print(f'Loss [{epoch+1}, {i + 1}](epoch, minibatch): ', running_loss / log_every)
                    running_loss = 0.0

            # One checkpoint per epoch (previously rewritten on every mini-batch).
            torch.save(fmodel.state_dict(), f"model_2_{epoch}.pth")
    finally:
        # Leave the ResNets in whatever mode they were in before this call.
        for base, was_training in zip(base_models, previous_modes):
            base.train(was_training)

    print('Training Done')