### Creating new layer for customized torch function

In [1]:
import torch.nn.functional as F
import torch

In [33]:
from model import MyModel, create_model, MotionBlur
import torch
from dataset import train_dataset
import matplotlib.pyplot as plt
from config import DEVICE, NUM_CLASSES, NUM_EPOCHS, OUT_DIR
from config import VISUALIZE_TRANSFORMED_IMAGES
from config import SAVE_PLOTS_EPOCH, SAVE_MODEL_EPOCH
from utils import Averager, show_tranformed_image
from tqdm.auto import tqdm
from dataset import train_loader, valid_loader
import torch
import matplotlib.pyplot as plt
import time

  from .autonotebook import tqdm as notebook_tqdm


In [204]:
class MotionBlur(torch.nn.Module):
    """Custom layer that picks one out of 8 motion blurs based
    on gumble softmax probability distribution"""

    def __init__(self, kernel_size, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # The greater the size, the more the motion.
        self.kernel_size = kernel_size
        # Create the vertical kernel.
        self.kernel_north = (
            torch.zeros((1, 1,self.kernel_size, self.kernel_size))
            .type(torch.FloatTensor)
        ).to(DEVICE)
        self.kernel_south = (
            torch.zeros((1, 1,self.kernel_size, self.kernel_size))
            .type(torch.FloatTensor)
        ).to(DEVICE)
        self.kernel_east = (
            torch.zeros((1, 1,self.kernel_size, self.kernel_size))
            .type(torch.FloatTensor)
        ).to(DEVICE)
        self.kernel_west = (
            torch.zeros((1, 1,self.kernel_size, self.kernel_size))
            .type(torch.FloatTensor)
        ).to(DEVICE)
        # Fill the middle row with ones.
        self.kernel_north[0, 0,
            : int((self.kernel_size) / 2), int((self.kernel_size - 1) / 2)
        ] = torch.ones(int(self.kernel_size / 2))
        self.kernel_south[0, 0,
            int((self.kernel_size) / 2) :, int((self.kernel_size - 1) / 2)
        ] = torch.ones(int(self.kernel_size / 2))
        self.kernel_east[0, 0,
            int((self.kernel_size - 1) / 2), int((self.kernel_size) / 2) :
        ] = torch.ones(int(self.kernel_size / 2))
        self.kernel_west[0, 0,
            int((self.kernel_size - 1) / 2), : int((self.kernel_size) / 2)
        ] = torch.ones(int(self.kernel_size / 2))

        # Initialize the logits
        self.logits = torch.nn.Parameter(torch.randn(4, device = DEVICE), requires_grad=True)
        self.probs = F.gumbel_softmax(self.logits, tau=1, hard=False).to(DEVICE)

        # Initialize motion convlutions and final image
        self.final_image = torch.nn.Parameter(torch.zeros(1, 512, 512)).to(DEVICE)
        self.motion_east = torch.zeros(1, 512, 512).to(DEVICE)
        self.motion_west = torch.zeros(1, 512, 512).to(DEVICE)
        self.motion_south = torch.zeros(1, 512, 512).to(DEVICE)
        self.motion_north = torch.zeros(1, 512, 512).to(DEVICE)
        self.stack = torch.zeros(4,1,512,512).to(DEVICE)


    def forward(self, x):
        # Compute convolutions
        self.motion_east = F.conv2d(
            x, self.kernel_east, stride=(1), padding="same"
        )
        self.motion_west = F.conv2d(
            x, self.kernel_west, stride=(1), padding="same"
        )
        self.motion_south = F.conv2d(
            x, self.kernel_south, stride=(1), padding="same"
        )
        self.motion_north = F.conv2d(
            x, self.kernel_north, stride=(1), padding="same"
        )
        self.stack = torch.stack(
            (self.motion_north, self.motion_south, self.motion_east, self.motion_west), dim=0
        )
        # Compute final image from gumble softmax probs
        for idx, prob in enumerate(self.probs):
            for blur in self.stack:
                self.final_image = torch.add(self.final_image, torch.mul(prob, blur))
        return self.final_image

In [187]:
from dataset import train_loader
images, target = next(iter(train_loader))
images = images[0].to(DEVICE)

In [188]:
motionblur = MotionBlur(30)
motionblur(images)

tensor([[[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00],
         ...,
         [-7.0931e-17, -1.1256e-16, -1.0023e-16,  ...,  4.1633e-17,
           2.9298e-17,  6.0137e-17],
         [-7.0931e-17, -1.1256e-16, -1.0023e-16,  ...,  4.1633e-17,
           2.9298e-17,  6.0137e-17],
         [-7.0931e-17, -1.1256e-16, -1.0023e-16,  ...,  4.1633e-17,
           2.9298e-17,  6.0137e-17]]], device='cuda:0', grad_fn=<AddBackward0>)

In [205]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights

def create_model(num_classes):
    # load Faster RCNN pre-trained model
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    )
    # get the number of input features
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # define a new head for the detector with required number of classes
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model


class MyModel(torch.nn.Module):
    """Define new Faster-RCNN module"""
    def __init__(self, pretrained) -> None:
        super(MyModel, self).__init__()
        self.motionblur = MotionBlur(30)
        self.pretrained = pretrained

    def forward(self, x, targets):
        new_x = []
        for item in x:
            new_x.append(self.motionblur(item))
        new_x = tuple(new_x)
        return self.pretrained(new_x, targets)

In [206]:
# initialize the model and move to the computation device
model = MyModel(pretrained = create_model(NUM_CLASSES))
model = model.cuda()
# get the model parameters
params = [p for p in model.parameters() if p.requires_grad]
# define the optimizer
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)

# initialize the weight tracker class
logit_weights = []

# initialize the Averager class
train_loss_hist = Averager()
val_loss_hist = Averager()
train_itr = 1
val_itr = 1

# train and validation loss lists to store loss values of all...
# ... iterations till ena and plot graphs for all iterations
train_loss_list = []
val_loss_list = []

In [208]:
print(next(model.named_parameters()))

('motionblur.logits', Parameter containing:
tensor([ 0.4563, -0.4191,  0.0025, -1.2973], device='cuda:0',
       requires_grad=True))


In [207]:
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name)

motionblur.logits
pretrained.backbone.body.layer2.0.conv1.weight
pretrained.backbone.body.layer2.0.conv2.weight
pretrained.backbone.body.layer2.0.conv3.weight
pretrained.backbone.body.layer2.0.downsample.0.weight
pretrained.backbone.body.layer2.1.conv1.weight
pretrained.backbone.body.layer2.1.conv2.weight
pretrained.backbone.body.layer2.1.conv3.weight
pretrained.backbone.body.layer2.2.conv1.weight
pretrained.backbone.body.layer2.2.conv2.weight
pretrained.backbone.body.layer2.2.conv3.weight
pretrained.backbone.body.layer2.3.conv1.weight
pretrained.backbone.body.layer2.3.conv2.weight
pretrained.backbone.body.layer2.3.conv3.weight
pretrained.backbone.body.layer3.0.conv1.weight
pretrained.backbone.body.layer3.0.conv2.weight
pretrained.backbone.body.layer3.0.conv3.weight
pretrained.backbone.body.layer3.0.downsample.0.weight
pretrained.backbone.body.layer3.1.conv1.weight
pretrained.backbone.body.layer3.1.conv2.weight
pretrained.backbone.body.layer3.1.conv3.weight
pretrained.backbone.body.lay

In [200]:
next(model.parameters()).detach().cpu().numpy()

array([ 1.2310423 ,  0.90460676,  0.25764972, -0.8289195 ], dtype=float32)

In [192]:
prog_bar = tqdm(train_loader, total=len(train_loader))

  0%|          | 0/29 [00:00<?, ?it/s]

In [203]:
for i in range(3):
    for data in prog_bar:
        optimizer.zero_grad()
        images, targets = data
        images = [image.to(DEVICE) for image in images]
        targets = [
            {k: v.to(DEVICE, dtype=torch.int64) for k, v in t.items()} for t in targets
        ]

        loss_dict = model(images, targets)
        
        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        train_loss_list.append(loss_value)
        train_loss_hist.send(loss_value)
        
        losses.backward(retain_graph=True)
        optimizer.step()

        train_itr += 1

        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")

In [194]:
params[0]

Parameter containing:
tensor([nan, nan, nan, nan], device='cuda:0', requires_grad=True)