## **Dependencies**

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.utils import save_image

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import os
import glob

from torch.utils import data as D
from torch.utils.data.sampler import SubsetRandomSampler
import random
import torchsummary
from torchsummary import summary

print(torch.__version__)
device = torch.device('cuda')
print(device)

## **Hyper parameters**

In [None]:
batch_size = 60
random_seed = 10
initial_lr = 0.1
num_epoch = 250
num_images = 12620
split = round(num_images/5)

## **Dataset split & Class speration**

In [None]:
transform_train = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
transform_validation = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = torchvision.datasets.ImageFolder(
root='./Dataset/Dataset_train/', transform=transform_train)
validset = torchvision.datasets.ImageFolder(
root='./Dataset/Dataset_train/', transform=transform_validation)
testset = torchvision.datasets.ImageFolder(
root='./Dataset/Dataset_test/', transform=transform_test)

num_train = len(trainset)
indices = list(range(num_train))
np.random.seed(random_seed)
np.random.shuffle(indices)

#split = 2524
#train_idx_1 = indices[:1346]
#train_idx_2 = indices[2693:]
#train_idx = train_idx_1 + train_idx_2

train_idx = indices[split+1:]
valid_idx = indices[:split]

num_test = len(testset)
indices_test = list(range(num_test))
test_idx = indices[:]

train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)

train_loader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size, sampler=train_sampler, num_workers=4
)

valid_loader = torch.utils.data.DataLoader(
    validset, batch_size=batch_size, sampler=valid_sampler, num_workers=4
)

test_loader = torch.utils.data.DataLoader(
    testset, batch_size=28, shuffle=False, num_workers=4
)

classes = ('wildfire','nonfire')

## **DenseNet architecture**

In [None]:
class bn_relu_conv(nn.Module):
    def __init__(self, nin, nout, kernel_size, stride, padding, bias=False):
        super(bn_relu_conv, self).__init__()
        self.batch_norm = nn.BatchNorm2d(nin)
        self.relu = nn.ReLU(True)
        self.conv = nn.Conv2d(nin, nout, kernel_size=kernel_size, stride=stride, padding=padding, bias=bias)

    def forward(self, x):
        out = self.batch_norm(x)
        out = self.relu(out)
        out = self.conv(out)

        return out

class bottleneck_layer(nn.Sequential):
    def __init__(self, nin, growth_rate, drop_rate=0.2):    
        super(bottleneck_layer, self).__init__()
      
        self.add_module('conv_1x1', bn_relu_conv(nin=nin, nout=growth_rate*4, kernel_size=1, stride=1, padding=0, bias=False))
        self.add_module('conv_3x3', bn_relu_conv(nin=growth_rate*4, nout=growth_rate, kernel_size=3, stride=1, padding=1, bias=False))
      
        self.drop_rate = drop_rate
      
    def forward(self, x):
        bottleneck_output = super(bottleneck_layer, self).forward(x)
        if self.drop_rate > 0:
            bottleneck_output = F.dropout(bottleneck_output, p=self.drop_rate, training=self.training)
          
        bottleneck_output = torch.cat((x, bottleneck_output), 1)
      
        return bottleneck_output

class Transition_layer(nn.Sequential):
    def __init__(self, nin, theta=0.5):    
        super(Transition_layer, self).__init__()
      
        self.add_module('conv_1x1', bn_relu_conv(nin=nin, nout=int(nin*theta), kernel_size=1, stride=1, padding=0, bias=False))
        self.add_module('avg_pool_2x2', nn.AvgPool2d(kernel_size=2, stride=2, padding=0))

class DenseBlock(nn.Sequential):
    def __init__(self, nin, num_bottleneck_layers, growth_rate, drop_rate=0.2):
        super(DenseBlock, self).__init__()
                        
        for i in range(num_bottleneck_layers):
            nin_bottleneck_layer = nin + growth_rate * i
            self.add_module('bottleneck_layer_%d' % i, bottleneck_layer(nin=nin_bottleneck_layer, growth_rate=growth_rate, drop_rate=drop_rate))

class DenseNet(nn.Module):
    def __init__(self, growth_rate=12, num_layers=10, theta=0.5, drop_rate=0.2, num_classes=10):
        super(DenseNet, self).__init__()

        assert (num_layers - 4) % 6 == 0

        num_bottleneck_layers = (num_layers - 4) // 6

        self.dense_init = nn.Conv2d(3, growth_rate*2, kernel_size=7, stride=2, padding=3, bias=True)
        self.dense_init_2 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.dense_block_1 = DenseBlock(nin=growth_rate*2, num_bottleneck_layers=num_bottleneck_layers, growth_rate=growth_rate, drop_rate=drop_rate)

        nin_transition_layer_1 = (growth_rate*2) + (growth_rate * num_bottleneck_layers) 
        self.transition_layer_1 = Transition_layer(nin=nin_transition_layer_1, theta=theta)

        self.dense_block_2 = DenseBlock(nin=int(nin_transition_layer_1*theta), num_bottleneck_layers=num_bottleneck_layers, growth_rate=growth_rate, drop_rate=drop_rate)

        nin_transition_layer_2 = int(nin_transition_layer_1*theta) + (growth_rate * num_bottleneck_layers) 
        self.transition_layer_2 = Transition_layer(nin=nin_transition_layer_2, theta=theta)

        self.dense_block_3 = DenseBlock(nin=int(nin_transition_layer_2*theta), num_bottleneck_layers=num_bottleneck_layers, growth_rate=growth_rate, drop_rate=drop_rate)

        nin_fc_layer = int(nin_transition_layer_2*theta) + (growth_rate * num_bottleneck_layers) 

        self.fc_layer = nn.Linear(nin_fc_layer, num_classes)

    def forward(self, x):
        dense_init_output = self.dense_init(x)
        dense_init_output_2 = self.dense_init_2(dense_init_output)

        dense_block_1_output = self.dense_block_1(dense_init_output_2)
        transition_layer_1_output = self.transition_layer_1(dense_block_1_output)

        dense_block_2_output = self.dense_block_2(transition_layer_1_output)
        transition_layer_2_output = self.transition_layer_2(dense_block_2_output)

        dense_block_3_output = self.dense_block_3(transition_layer_2_output)

        global_avg_pool_output = F.adaptive_avg_pool2d(dense_block_3_output, (1, 1))                
        global_avg_pool_output_flat = global_avg_pool_output.view(global_avg_pool_output.size(0), -1)

        output = self.fc_layer(global_avg_pool_output_flat)

        return output
    
def Optimized_DenseNet():
    return DenseNet(growth_rate=24, num_layers=100, theta=0.5, drop_rate=0.2, num_classes=2)

net = Optimized_DenseNet()
densenet = net.to(device)
densenet

## **Model summary**

In [None]:
torchsummary.summary(net, (224,224,3))

## **Training & Save the trained model**

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=initial_lr, momentum=0.9)
lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer=optimizer, milestones=[int(num_epoch * 0.5), int(num_epoch * 0.75)], gamma=0.1, last_epoch=-1)

for epoch in range(num_epoch):  
    lr_scheduler.step()
    
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        show_period = 180
        if i % show_period == show_period-1:    # print every "show_period" mini-batches
           
            running_loss = 0.0
        
        
    # validation part
    correct = 0
    total = 0
    for i, data in enumerate(valid_loader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = net(inputs)
        
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    


print('Finished Training')


path = "./savemodel/model.pth"
torch.save(net.state_dict(),path)


## **Load model & test**

In [None]:
path = "./savemodel/model.pth"
net.load_state_dict(torch.load(path))
net.eval()

class_correct = list(0. for i in range(2))
class_total = list(0. for i in range(2))

correct = 0
total = 0

with torch.no_grad():
    for data in test_loader:
        
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        
        _, predicted = torch.max(outputs,1)
        c = (predicted == labels).squeeze()
        print(torch.nn.functional.softmax(outputs, dim=1))
        print(predicted)     
        for i in range(labels.shape[0]):
            
            label = labels[i]
            class_correct[label] += c[i].item()
            class_total[label] += 1
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
  
print('Accuracy of the network on the 1031 test images: %5s %%' % (
    100 * correct / total))            
            
for i in range(2):
    print('Accuracy of %5s : %5s %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))
    print(class_correct[i])
    print(class_total[i])

## **Class activation map**

In [None]:
class CAM(nn.Module):
    def __init__(self, model_to_convert, get_fc_layer=lambda m: m.fc_layer,score_fn=F.softmax, resize=True):
        super().__init__()
        self.backbone = nn.Sequential(*list(model_to_convert.children())[:-1])
        self.fc = get_fc_layer(model_to_convert)
        self.conv  =  nn.Conv2d(self.fc.in_features, self.fc.out_features, kernel_size=1)
        self.conv.weight = nn.Parameter(self.fc.weight.data.unsqueeze(-1).unsqueeze(-1))
        self.conv.bias = self.fc.bias
        self.score_fn = score_fn
        self.resize = resize
        self.eval()
        
    def forward(self, x, out_size=None):
        batch_size, c, *size = x.size()
        feat = self.backbone(x)
        cmap = self.score_fn(self.conv(feat))
        if self.resize:
            if out_size is None:
                out_size = size
            cmap = F.upsample(cmap, size=out_size, mode='bicubic')
        pooled = F.adaptive_avg_pool2d(feat,output_size=1)
        flatten = pooled.view(batch_size, -1)
        cls_score = self.score_fn(self.fc(flatten))
        weighted_cmap =  (cmap*cls_score.unsqueeze(-1).unsqueeze(-1)).sum(dim=1)
        return cmap, cls_score, weighted_cmap
    
path = "./savemodel/model.pth"
net.load_state_dict(torch.load(path))
cam = CAM(net)
#assert not cam.training

if torch.cuda.is_available():
    print("use gpu")
    cam = cam.cuda()
    def to_var(x, requires_grad=False, volatile=False):
        return Variable(x.cuda(), requires_grad=requires_grad, volatile=volatile)
else:
    def to_var(x, requires_grad=False, volatile=False):
        return Variable(x, requires_grad=requirs_grad, volatile=volatile)


target_size = (224,224)

normalize = transforms.Normalize([0.5, 0.5, 0.5],
                                 [0.5, 0.5, 0.5])
transform = transforms.Compose([transforms.Scale(target_size),transforms.CenterCrop(target_size),
                                transforms.ToTensor()])
from torch.autograd import Variable

img_path = "./Dataset/Dataset_test/wildfire/00014.jpg"
img = Image.open(img_path)
img_v = to_var(transform(img).unsqueeze(0),volatile=True)

cmap, score, weighted_cmap = cam(img_v)
print(cmap.size())
print(score.size())
print(weighted_cmap.size())

import matplotlib.pyplot as plt
background = np.array(img.resize(target_size))
color_map = weighted_cmap.data.cpu().numpy()[0]
color_map = cmap.data.cpu().numpy()[0,1]
#print(color_map)
ax = plt.gca()
ax.axes.xaxis.set_visible(False)
ax.axes.yaxis.set_visible(False)
plt.imshow(background)
plt.imshow(color_map,cmap ='jet',alpha=0.5)
plt.show()