In [1]:
import torch
import numpy as np
import os
from PIL import Image
import torchvision
from torchvision import transforms as T 
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

In [9]:
import tarfile

with tarfile.open('dl_challenge.tar.xz') as f:
    f.extractall('.')

In [45]:
def Extractor(name="./dl_challenge/"):
    folders = os.listdir(name)
    
    images=[]
    masks=[]
    bboxes=[]
    pcs=[]

    for folder in folders:
        for dirpath, _ , filenames in os.walk(os.path.join("./dl_challenge",folder)):
            for filename in filenames:
                if filename.endswith('.jpg'):
                    image_path=os.path.join(dirpath,filename)
                    image= Image.open(image_path)
                    images.append(image)
                if filename.startswith('mask'):
                    mask_path=os.path.join(dirpath,filename)
                    mask= np.load(mask_path)
                    masks.append(mask)
                if filename.startswith('bbox3d'):
                    bbox_path=os.path.join(dirpath,filename)
                    bbox= np.load(bbox_path)
                    bboxes.append(bbox)
                if filename.startswith('pc'):
                    pc_path=os.path.join(dirpath,filename)
                    pc= np.load(pc_path)
                    pcs.append(pc)

    return images,masks,bboxes,pcs

In [46]:
images,masks,bboxes,pcs=Extractor()
labels=[]
num=0
for mask in masks:
    labels.append(np.arange(num,num+mask.shape[0]))
    num=num+mask.shape[0]

In [47]:
def convert_3d_bbox_to_2d(bboxes_3d):
    # Get the min and max x, y coordinates for each bounding box
    xmin = torch.min(bboxes_3d[:, :, 0], dim=1)[0]  # Min x
    ymin = torch.min(bboxes_3d[:, :, 1], dim=1)[0]  # Min y
    xmax = torch.max(bboxes_3d[:, :, 0], dim=1)[0]  # Max x
    ymax = torch.max(bboxes_3d[:, :, 1], dim=1)[0]  # Max y

    # Stack to form [N, 4] bounding boxes
    bboxes_2d = torch.stack([xmin, ymin, xmax, ymax], dim=1)

    return bboxes_2d


In [48]:
import torch.utils
import torch.utils.data
import torch.utils.data.dataset


class DataLoad(torch.utils.data.dataset.Dataset):
    def __init__(self,images,masks,bboxes,labels):
        self.images=images
        self.masks=masks
        self.bboxes=bboxes
        self.labels=labels
    
    def __getitem__(self,idx):
        data = {}
        
        transform= T.ToTensor()
        image = transform(self.images[idx])
        
        data["masks"] = torch.as_tensor(self.masks[idx],dtype=torch.uint8)
        data["boxes"] = torch.as_tensor(self.bboxes[idx], dtype = torch.float32)
        data["boxes"] = convert_3d_bbox_to_2d(data["boxes"])
        data["labels"] = torch.as_tensor(self.labels[idx],dtype=torch.int64)
        
        return image, data
    
    def __len__(self):
        return len(self.images)
    

In [49]:
labels[-1][-1]

1916

In [50]:
model = torchvision.models.detection.maskrcnn_resnet50_fpn()
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features , labels[-1][-1]+1)
in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
hidden_layer = 256
model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask , hidden_layer , labels[-1][-1]+1)


In [51]:
num = int(0.9 * len(images))
num = num if num % 2 == 0 else num + 1
train_imgs_inds = np.random.choice(range(len(images)) , num , replace = False)
test_imgs_inds = np.setdiff1d(range(len(images)) , train_imgs_inds)
train_imgs = np.array(images,dtype="object")[train_imgs_inds]
test_imgs = np.array(images,dtype="object")[test_imgs_inds]
train_masks = np.array(masks,dtype="object")[train_imgs_inds]
test_masks = np.array(masks,dtype="object")[test_imgs_inds]
train_bboxes = np.array(bboxes,dtype="object")[train_imgs_inds]
test_bboxes = np.array(bboxes,dtype="object")[test_imgs_inds]
train_labels = np.array(labels,dtype="object")[train_imgs_inds]
test_labels = np.array(labels,dtype="object")[test_imgs_inds]

In [52]:
def custom_collate(data):
  return data

In [53]:
train_dl = torch.utils.data.DataLoader(DataLoad(train_imgs , train_masks,train_bboxes,train_labels) , 
                                 batch_size = 2 , 
                                 shuffle = True , 
                                 collate_fn = custom_collate , 
                                 num_workers = 0 , 
                                 pin_memory = True if torch.cuda.is_available() else False)
val_dl = torch.utils.data.DataLoader(DataLoad(test_imgs , test_masks,train_bboxes,train_labels) , 
                                 batch_size = 2 , 
                                 shuffle = True , 
                                 collate_fn = custom_collate , 
                                 num_workers = 0 , 
                                 pin_memory = True if torch.cuda.is_available() else False)

In [54]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [55]:
model.to(device)

MaskRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=1e-05)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=1e-05)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=1e-05)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=1e-05)
          (relu):

In [56]:
params = [p for p in model.parameters() if p.requires_grad]

In [57]:
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

In [58]:
print(np.__version__)

1.26.4


In [59]:
all_train_losses = []
all_val_losses = []
flag = False
for epoch in range(30):
    train_epoch_loss = 0
    val_epoch_loss = 0
    model.train()
    for i , dt in enumerate(train_dl):
        imgs = [dt[0][0].to(device) , dt[1][0].to(device)]
        targ = [dt[0][1] , dt[1][1]]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targ]
        loss = model(imgs , targets)
        if not flag:
            print(loss)
            flag = True
        losses = sum([l for l in loss.values()])
        train_epoch_loss += losses.cpu().detach().numpy()
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
    all_train_losses.append(train_epoch_loss)
    with torch.no_grad():
        for j , dt in enumerate(val_dl):
            imgs = [dt[0][0].to(device) , dt[1][0].to(device)]
            targ = [dt[0][1] , dt[1][1]]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targ]
            loss = model(imgs , targets)
            losses = sum([l for l in loss.values()])
            val_epoch_loss += losses.cpu().detach().numpy()
        all_val_losses.append(val_epoch_loss)
    print(epoch , "  " , train_epoch_loss , "  " , val_epoch_loss)

{'loss_classifier': tensor(7.3957, grad_fn=<NllLossBackward0>), 'loss_box_reg': tensor(0.0006, grad_fn=<DivBackward0>), 'loss_mask': tensor(0.6881, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>), 'loss_objectness': tensor(0.6945, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>), 'loss_rpn_box_reg': tensor(1.3228, grad_fn=<DivBackward0>)}
