<a href="https://colab.research.google.com/github/natal1e14/AI4GoodTeam2/blob/main/Densenet_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### This notebook is optionally accelerated with a GPU runtime.
### If you would like to use this acceleration, please select the menu option "Runtime" -> "Change runtime type", select "Hardware Accelerator" -> "GPU" and click "SAVE"

----------------------------------------------------------------------

# Densenet

*Author: PyTorch Team*

**A Dense Convolutional Network (DenseNet) connects each layer to every other layer in a feed-forward fashion.**

_ | _
- | -
![alt](https://pytorch.org/assets/images/densenet1.png) | ![alt](https://pytorch.org/assets/images/densenet2.png)

In [None]:

import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import os
from PIL import Image
import torchvision.transforms as transforms

class SentimentDataset(Dataset):
    """Image/label dataset backed by a CSV file and folders of JPEG images.

    The CSV must provide an ``image_name`` column (file name without the
    ``.jpg`` extension) and a ``label`` column. Each image is looked up in
    ``<root_dir>/images`` first and then in ``<root_dir>/photo``. If it cannot
    be loaded from either folder, a fixed placeholder image is returned
    instead (paired with the row's original label) and ``self.bad`` is
    incremented.

    Args:
        root_dir: Directory holding the CSV file and the image sub-folders.
        captions_file: CSV file name, relative to ``root_dir``.
        transform: Callable applied to every loaded PIL image. ``None`` (the
            default) selects the resize / centre-crop / normalise pipeline
            built in ``__init__``.
        freq_threshold: Unused; kept so existing call sites keep working.
    """

    # Placeholder (inside ``<root_dir>/photo``) served for unreadable images.
    FALLBACK_IMAGE = "694551465.jpg"

    def __init__(self, root_dir, captions_file, transform=None,
                 freq_threshold=5):
        self.root_dir = root_dir
        self.df = pd.read_csv(os.path.join(root_dir, captions_file))

        if transform is None:
            # NOTE(review): mean/std look like the usual ImageNet statistics
            # expected by pretrained torchvision models -- confirm.
            transform = transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225]),
            ])
        self.transform = transform

        self.imgs = self.df["image_name"]
        self.captions = self.df["label"]
        # Number of samples for which the placeholder image was served.
        self.bad = 0

    def __len__(self):
        return len(self.df)

    def _candidate_paths(self, img_name):
        """Return the file paths to try for ``img_name``, in lookup order."""
        # str() so numeric ids (a CSV column parsed as integers) also work.
        file_name = str(img_name) + ".jpg"
        return [
            os.path.join(self.root_dir + "/images", file_name),
            os.path.join(self.root_dir + "/photo", file_name),
        ]

    def _load_image(self, img_name):
        """Load ``img_name`` as an RGB image, falling back to the placeholder."""
        for path in self._candidate_paths(img_name):
            try:
                return Image.open(path).convert("RGB")
            except (OSError, ValueError):
                # Missing, unreadable or corrupt file: try the next location.
                continue

        fallback = os.path.join(self.root_dir + "/photo", self.FALLBACK_IMAGE)
        img = Image.open(fallback).convert("RGB")
        self.bad += 1
        return img

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        # Positional access: an out-of-range idx raises IndexError, as the
        # sequence protocol expects, instead of a label-lookup KeyError.
        caption = self.captions.iloc[idx]
        img = self._load_image(self.imgs.iloc[idx])

        if self.transform is not None:
            img = self.transform(img)

        return img, caption


In [None]:
from torchvision.models import densenet121, DenseNet
import torch 
from typing import Tuple

class SentimentDenseNet(DenseNet):
    """Pretrained DenseNet-121 feature extractor with a binary sentiment head.

    The convolutional features of ``densenet121`` are globally average-pooled,
    passed through dropout and mapped by one linear layer to a single
    probability, which is what the ``BCELoss`` training loop in this notebook
    consumes.

    ``forward`` returns a float tensor of shape ``(batch, 1)`` with values in
    ``[0, 1]``: the predicted probability that the label is 1 (the
    pre-processing cell encodes "negative" as 1).

    NOTE: the previous head reduced its output to a ``(batch, 1)`` tensor and
    applied softmax over that single element, so every prediction was exactly
    1.0 and no gradient reached the network. State dicts saved with that head
    do not match this layout.
    """

    def __init__(self):
        # Deliberately bypass DenseNet.__init__ (it would build a second,
        # randomly initialised network) and only run nn.Module's setup; the
        # pretrained network lives in ``self.model``.
        super(DenseNet, self).__init__()
        self.model = densenet121(pretrained=True, progress=True)

        # Freeze dense blocks 1 and 2 (features[4] and features[6]).
        # requires_grad_() updates every parameter in the block; assigning
        # ``module.requires_grad = False`` only creates an unused attribute.
        for frozen_idx in (4, 6):
            self.model.features[frozen_idx].requires_grad_(False)

        # Dense blocks 3 and 4 (features[8] and features[10]) stay trainable.
        for trainable_idx in (8, 10):
            self.model.features[trainable_idx].requires_grad_(True)

        self.globalAvg = torch.nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = torch.nn.Dropout(0.5)
        # One logit per image; input width is the backbone's feature count
        # (the size of the stock classifier's input).
        self.dense = torch.nn.Linear(self.model.classifier.in_features, 1)

    def forward(self, x):
        """Map a batch of images ``(batch, 3, H, W)`` to ``(batch, 1)`` probabilities."""
        out = self.model.features(x)   # e.g. (8, 1024, 7, 7) for 224x224 input
        out = self.globalAvg(out)      # (batch, channels, 1, 1)
        out = self.dropout(out)
        out = torch.flatten(out, 1)    # (batch, channels)
        return torch.sigmoid(self.dense(out))

   
    





In [None]:
from torch import nn
from torch import Tensor


class _DenseLayer(torch.nn.Module):
    def __init__(self, num_input_features, growth_rate, bn_size, drop_rate, memory_efficient=False):
        super(_DenseLayer, self).__init__()
        self.add_module('norm1', nn.BatchNorm2d(num_input_features)),
        self.add_module('relu1', nn.ReLU(inplace=True)),
        self.add_module('conv1', nn.Conv2d(num_input_features, bn_size *
                                           growth_rate, kernel_size=1, stride=1,
                                           bias=False)),
        self.add_module('norm2', nn.BatchNorm2d(bn_size * growth_rate)),
        self.add_module('relu2', nn.ReLU(inplace=True)),
        self.add_module('conv2', nn.Conv2d(bn_size * growth_rate, growth_rate,
                                           kernel_size=3, stride=1, padding=1,
                                           bias=False)),
        self.drop_rate = float(drop_rate)
        self.memory_efficient = memory_efficient

    def bn_function(self, inputs):
        # type: (List[Tensor]) -> Tensor
        concated_features = torch.cat(inputs, 1)
        out = self.norm1(concated_features)
        out = self.relu1(out)
        bottleneck_output = self.conv1(out)  # noqa: T484
        return bottleneck_output

    # todo: rewrite when torchscript supports any
    def any_requires_grad(self, input):
        # type: (List[Tensor]) -> bool
        for tensor in input:
            if tensor.requires_grad:
                return True
        return False

    @torch.jit.unused  # noqa: T484
    def call_checkpoint_bottleneck(self, input):
        # type: (List[Tensor]) -> Tensor
        def closure(*inputs):
            return self.bn_function(inputs)

        return cp.checkpoint(closure, *input)

    # @torch.jit._overload_method  # noqa: F811
    # def forward(self, input):
    #     # type: (List[Tensor]) -> (Tensor)
    #     pass

    # @torch.jit._overload_method  # noqa: F811
    # def forward(self, input):
    #     # type: (Tensor) -> (Tensor)
    #     pass

    # torchscript does not yet support *args, so we overload method
    # allowing it to take either a List[Tensor] or single Tensor
    def forward(self, input):  # noqa: F811
        if isinstance(input, Tensor):
            prev_features = [input]
        else:
            prev_features = input

        if self.memory_efficient and self.any_requires_grad(prev_features):
            if torch.jit.is_scripting():
                raise Exception("Memory Efficient not supported in JIT")

            bottleneck_output = self.call_checkpoint_bottleneck(prev_features)
        else:
            bottleneck_output = self.bn_function(prev_features)

        new_features = self.conv2(self.relu2(self.norm2(bottleneck_output)))
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate,
                                     training=self.training)
        return new_features

In [None]:
# Train SentimentDenseNet on train.csv and report loss / accuracy on valid.csv
# after every epoch. Histories are kept in the module-level lists below.
import numpy as np
from functools import partial
from tqdm import tqdm

# Keep a single progress bar that stays visible when the loop ends.
tqdm = partial(tqdm, position=0, leave=True)

model = SentimentDenseNet()
# To resume from a saved checkpoint:
#model.load_state_dict(torch.load('/content/drive/MyDrive/ai4good/trained'))

train_data = SentimentDataset('/content/drive/MyDrive/ai4good', 'train.csv')
train_dataloader = DataLoader(train_data, batch_size=8, shuffle=True)

valid_data = SentimentDataset('/content/drive/MyDrive/ai4good', 'valid.csv')
valid_dataloader = DataLoader(valid_data, batch_size=8, shuffle=True)

# NOTE(review): lr=0.05 is unusually large for Adam on a pretrained network;
# left unchanged here, but worth confirming.
lr = 0.05
weight_decay = 0.01

# Only hand trainable parameters to the optimiser.
optimizer = torch.optim.Adam(
    (p for p in model.parameters() if p.requires_grad),
    lr=lr, weight_decay=weight_decay)

# BCELoss expects probabilities in [0, 1] and float targets in {0, 1}.
criterion = torch.nn.BCELoss()

train_losses = []
test_losses = []
train_error_rates = []
test_error_rates = []
val_acc = []
train_acc = []

# The GPU is optional for this notebook: fall back to CPU when none is present.
use_gpu = torch.cuda.is_available()
if use_gpu:
    model.cuda()

num_epochs = 15

for epoch in range(num_epochs):
    # ---- training pass --------------------------------------------------
    train_loss = 0
    n_iter = 0
    total = 0
    correct = 0
    model.train()
    for images, labels in tqdm(train_dataloader):
        optimizer.zero_grad()

        if use_gpu:
            images = images.cuda()
            labels = labels.cuda()

        # reshape(-1), not squeeze(): squeeze() would also drop the batch
        # axis when the final batch holds a single sample.
        outputs = model(images).reshape(-1)

        # Threshold the probabilities to get hard 0/1 predictions.
        predictions = (outputs >= 0.5).long()
        correct += torch.sum(labels == predictions).item()
        total += labels.shape[0]

        loss_bs = criterion(outputs, labels.float())
        loss_bs.backward()
        optimizer.step()

        train_loss += loss_bs.item()
        n_iter += 1

    train_error_rate = 1 - correct/total
    train_acc.append(correct/total)

    # ---- validation pass ------------------------------------------------
    model.eval()
    test_loss = 0
    test_accuracy = 0   # running count of correct validation predictions
    total_test = 0
    n_test_iter = 0

    with torch.no_grad():
        for images, labels in tqdm(valid_dataloader):
            if use_gpu:
                images = images.cuda()
                labels = labels.cuda()

            outputs = model(images).reshape(-1)

            predictions = (outputs >= 0.5).long()
            test_accuracy += torch.sum(labels == predictions).item()
            total_test += labels.shape[0]
            # .item() keeps the history as plain floats instead of tensors.
            test_loss += criterion(outputs, labels.float()).item()
            n_test_iter += 1

    # Release cached GPU memory once per epoch rather than after every batch.
    torch.cuda.empty_cache()

    train_error_rates.append(train_error_rate)
    test_error_rates.append(1 - test_accuracy/total_test)
    train_losses.append(train_loss/n_iter)
    # Mean loss per batch, on the same scale as train_losses.
    test_losses.append(test_loss/n_test_iter)
    val_acc.append(test_accuracy/total_test)

    print('Epoch: {}/{}, Loss: {:.4f}, Val Accuracy: {:.4f}, train Acc: {:.4f}'.format(epoch+1, num_epochs, train_loss/n_iter, test_accuracy/total_test, correct/total))

In [None]:
 # Re-score the model on the validation split and print its accuracy.
 # The model emits one probability per image, so predictions come from a 0.5
 # threshold; argmax over the single output column would always yield 0.
 model.eval()
 correct = 0
 total_test = 0
 test_loss = 0
 with torch.no_grad():
     for images, labels in tqdm(valid_dataloader):
         if use_gpu:
             images = images.cuda()
             labels = labels.cuda()

         # Flatten (batch, 1) -> (batch,) so it lines up with the labels.
         outputs = model(images).reshape(-1)

         predictions = (outputs >= 0.5).long()
         correct += torch.sum(labels == predictions).item()
         total_test += labels.shape[0]
         # BCELoss needs float targets with the same shape as the outputs.
         test_loss += criterion(outputs, labels.float()).item()

 print(correct/total_test)
    

In [None]:
# Persist only the learned weights (the state dict) to Google Drive; reload
# them with model.load_state_dict(torch.load(<same path>)).
torch.save(model.state_dict(), '/content/drive/MyDrive/ai4good/trained')

In [None]:
import gc
# Run Python's garbage collector first, then ask PyTorch to release the CUDA
# memory it is caching.
gc.collect()
torch.cuda.empty_cache()

# Data Pre-Processing

In [None]:
import pandas as pd
# Load the merged annotations, drop rows whose image id is listed in `bad`,
# and encode the sentiment strings as integers.
# NOTE: `bad` is built by the cell that reads unsuccessful_photos_ids.txt;
# run that cell before this one.
data = pd.read_csv('/content/drive/MyDrive/ai4good/merged.csv')
# `bad` holds strings, so compare against the ids as strings even when the
# CSV column was parsed as integers.
data = data[~data['image_name'].astype(str).isin(bad)]

# Both capitalisations occur in the source data; anything else is left as is.
label_codes = {
    'Positive': 0, 'positive': 0,
    'Negative': 1, 'negative': 1,
    'Neutral': 2, 'neutral': 2,
}
data['label'] = data['label'].replace(label_codes)

print(data.head())
print(data['label'].dtypes)
# Rows per class. (The original `data['label'==0]` evaluated to `data[False]`
# and raised KeyError.)
for code in (0, 1, 2):
    print((data['label'] == code).sum())




In [None]:


from sklearn.model_selection import train_test_split


# Hold out 20% of the rows for validation. shuffle=True with no random_state
# means the split is different on every run.
X_train, X_val, y_train, y_val = train_test_split(data["image_name"], data["label"], test_size=0.2, shuffle=True)



In [None]:
# Re-join each split's image names with its labels on the shared row index,
# then write the two tables next to the images on Google Drive.
train_rows = pd.merge(left=X_train, right=y_train,
                      left_index=True, right_index=True)
valid_rows = pd.merge(left=X_val, right=y_val,
                      left_index=True, right_index=True)

for split_rows, split_path in (
    (train_rows, '/content/drive/MyDrive/ai4good/train.csv'),
    (valid_rows, '/content/drive/MyDrive/ai4good/valid.csv'),
):
    split_rows.to_csv(split_path)

In [None]:
# Read the ids listed in unsuccessful_photos_ids.txt, one per line, into
# `bad`. The `with` block closes the file even if reading fails.
with open('unsuccessful_photos_ids.txt') as f:
    bad = [line.rstrip("\n") for line in f]



In [None]:
# Spot-check one entry of `bad` (raises IndexError if it has fewer than 801 items).
print(bad[800])

In [None]:
# Show how many rows carry each label value.
data['label'].value_counts()

In [None]:
# Rebalance the data: keep every row labelled 1 and only the first 5000 rows
# labelled 0. Rows with any other label (e.g. 2) are dropped.
pos = data[data['label'] == 0].head(5000)
rest = data[data['label'] == 1]

# DataFrame.append was removed in pandas 2.0; pd.concat is the replacement.
data = pd.concat([rest, pos])
data['label'].value_counts()

In [None]:
# Mount Google Drive at /content/drive. Run this before any cell that reads
# from or writes to a /content/drive/... path.
from google.colab import drive
drive.mount('/content/drive')