In [1]:
from google.colab import drive
drive.mount('/content/gdrive')  #mounting

KeyboardInterrupt: ignored

In [None]:
%cd '/content/gdrive/MyDrive/DEEPLOBE/segmentation'

/content/gdrive/MyDrive/DEEPLOBE/segmentation


In [None]:
cd 'maskrcnn_pistol_data'

/content/gdrive/My Drive/DEEPLOBE/segmentation/maskrcnn_pistol_data


In [None]:
ls

coco_instances.json  [0m[01;34mimages[0m/  [01;34mmasks[0m/  visualize.py


In [None]:
#!unzip 'maskrcnn_pistol_data.zip'

In [None]:
import os
import random
import torch
import torchvision
from PIL import Image
from pycocotools.coco import COCO
from torchvision import transforms as T
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

from models.detection_pytorch_repo import utils
from models.detection_pytorch_repo import engine

class UserDataset(torch.utils.data.Dataset):
    """
    Custom class inheriting from Pytorch's Dataset utility class
    that allows applying custom transformations on user-datasets
    
    It returns transformed images and masks in an iterator object 
    that can be indexed according to the batch sizes
    in the data loading phase for passing to model
    """

    
    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        # coco file
        coco = self.coco
        # Image ID
        img_id = self.ids[index]
        # List: get annotation id from coco
        ann_ids = coco.getAnnIds(imgIds=img_id)
        # Dictionary: target coco_annotation file for an image
        coco_annotation = coco.loadAnns(ann_ids)
        # path for input image
        path = coco.loadImgs(img_id)[0]['file_name']
        # open the input image
        img = Image.open(os.path.join(self.root, 'images',path))
        
        ## mask ##
        mask_file_path = os.path.splitext(path)[0] + '.jpg'
        mask = Image.open(os.path.join(self.root,'masks',mask_file_path))

        # number of objects in the image
        num_objs = len(coco_annotation)

        # Bounding boxes for objects
        # In coco format, bbox = [xmin, ymin, width, height]
        # In pytorch, the input should be [xmin, ymin, xmax, ymax]
        boxes = []
        for i in range(num_objs):
            xmin = coco_annotation[i]['bbox'][0]
            ymin = coco_annotation[i]['bbox'][1]
            xmax = xmin + coco_annotation[i]['bbox'][2]
            ymax = ymin + coco_annotation[i]['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])
            
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # Tensorise img_id
        img_id = torch.tensor([img_id])
        # Size of bbox (Rectangular)
        areas = []
        iscrowd = []
        labels = []

        for i in range(num_objs):
            areas.append(coco_annotation[i]['area'])
            iscrowd.append(coco_annotation[i]['iscrowd'])
            labels.append(coco_annotation[i]['category_id'])


        areas,iscrowd,labels = map(torch.tensor, [areas,iscrowd,labels])

        if self.transforms is not None:
            img = self.transforms(img)
            mask = self.transforms(mask)
        mask = mask.numpy().reshape(mask.shape,order='F')
        mask = torch.as_tensor(mask, dtype=torch.uint8)


        # Annotation is in dictionary format
        my_annotation = {}
        my_annotation["boxes"] = boxes
        my_annotation["labels"] = labels
        my_annotation["image_id"] = img_id
        my_annotation["area"] = areas
        my_annotation["iscrowd"] = iscrowd
        my_annotation["masks"] = mask

        return img, my_annotation

    def __len__(self):
        return len(self.ids)


class PreProcessing(object):
    """
    For loading imagefiles, preprocessing like train-test split based on a specified 
    validation set size.
    
    Passing data directory and transforms for creating a dataset iterator
    
    Creates a dataset instance from the UserDataset class by passing data directory path.
    The data directory should have the following structure:
                    data_dir
             ------Image
             ---------Image1
             ---------Image2
             ---------
             ---------
                   
             ---------ImageN
             ------Mask
             ---------Mask1
             ---------Mask2
             ---------
             ---------
             ---------MaskN
    This class also calls the CustomDataset class above for creating 
    a dataset iterator and creates dataloaders using torch.utils.Dataloaders class
    
    """
    def __init__(self,valid_pct=0.2,batch_size=4):
        self.data_dir = os.path.join(r'.\Uploads',
                                     "segmentation",
                                     'instance_segmentation')
        self.coco_instances = os.path.join(r'.\Uploads',
                                           "segmentation",
                                           'instance_segmentation',
                                           "annotation",
                                           "coco_instances.json")
        self.batch_size = batch_size
        self.valid_pct = valid_pct
        self.transform = T.Compose([T.ToTensor()])

    def get_class_dict(self):
        #Category dictinoray for output tagging
        instances = COCO(self.coco_instances)
        categories = instances.loadCats(instances.getCatIds())
        class_dict = {d['id']:d['name'] for d in categories}
    
        # Number of categories and a background
        num_classes = 1 + len(instances.getCatIds())
        
        return num_classes,class_dict

        
    def train_test_split(self):
        """
        Creates a dataset instance from the UserDataset class. It requires the 
        annotation JSON file in the coco format: http://cocodataset.org/#format-data
        
        This method carries out train-val split and also creates dataloaders
        required for model building
        
        """
        # create own Dataset
        user_dataset = UserDataset(root=self.data_dir,
                                annotation=self.coco_instances,
                                transforms=self.transform)
        #train-val split
        samples = len(user_dataset)
        test_counts = int(samples*self.valid_pct)
        train_counts = samples-test_counts
        train_set,val_set = torch.utils.data.random_split(user_dataset, [train_counts, test_counts])

        # Train DataLoader
        train_loader = torch.utils.data.DataLoader(train_set,
                                                  batch_size=self.batch_size,
                                                  shuffle=True,
                                                  collate_fn = utils.collate_fn)
        # Validation DataLoader
        val_loader = torch.utils.data.DataLoader(val_set,
                                                  batch_size=self.batch_size,
                                                  shuffle=False,
                                                  collate_fn = utils.collate_fn)
        return train_loader,val_loader


   
class InstanceSegment():
    """
    creates a device object based on GPU availability
    downloads pretrained models from torchvision library
    changes the final layer based on the target class count
    trains the model and saves the trained model weights in the .\weights folder
    arguments: valid_pct - percentage of images to go into the validation set
                batch_size - no. images in a batch for training
    
    """
    def __init__(self,valid_pct=0.2,batch_size=4):
        self.valid_pct = valid_pct
        self.batch_size = batch_size
        self.preprocessing = PreProcessing(self.valid_pct,
                                           self.batch_size)

        self.num_classes,self.class_dict = self.preprocessing.get_class_dict()
        
        # select device (whether GPU or CPU)
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


    def build(self):
        
        #download pretrained models from torchvision library    
        # load an instance segmentation model pre-trained pre-trained on COCO
        self.model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

        # get number of input features for the classifier
        in_features = self.model.roi_heads.box_predictor.cls_score.in_features
        # replace the pre-trained head with a new one
        self.model.roi_heads.box_predictor = FastRCNNPredictor(in_features, self.num_classes)
        # now get the number of input features for the mask classifier
        in_features_mask = self.model.roi_heads.mask_predictor.conv5_mask.in_channels
        hidden_layer = 256
        # and replace the mask predictor with a new one
        self.model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,hidden_layer,self.num_classes)

    def train_model(self,num_epochs=20,demo = False):
        """trains and validates the model on the train images for the given no. of epochs
            arguments: num_epochs - no. of epochs to run
                        demo - False, by default. Set to True only during demo and testing
        """
        train_loader,val_loader = self.preprocessing.train_test_split()
        
        # number of epochs
        num_epochs = num_epochs    
        self.model.to(self.device)
    
        # parameters and optimizer
        params = [p for p in self.model.parameters() if p.requires_grad]
        optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.01)
        
        self.total_batch_count = len(train_loader)

        print('Training the instance segmentation model..')
        for epoch in range(num_epochs):
            print(f'Epoch no: {epoch}')
            self.model.train()
            i = 1
            for imgs, annotations in train_loader:
                imgs = list(img.to(self.device) for img in imgs)
                annotations = [{key: value.to(self.device) for key, value in annotation.items()} for annotation in annotations]
                loss_dict = self.model(imgs, annotations)
                losses = sum(loss for loss in loss_dict.values())
        
                optimizer.zero_grad()
                losses.backward()
                optimizer.step()
                print(f'Epoch no: {epoch}/{num_epochs-1}, Batch no: {i}/{self.total_batch_count}, Loss: {losses}')
                i+=1
        #### to be removed after demo ###
                if demo == True:
                    break
            if demo == True:
                break
        #### to be removed after demo ###

        print('Model training completed')
        #Validation
        print('Validating on the held-out set...')
        #engine.evaluate(self.model, val_loader, device=self.device)
        print('validation completed')

        #saving the model
        torch.save(self.model.state_dict(),'./weights/instance_segment.pt')

    #Inference
    def predict_instance_masks(self,img,demo=False):
        """
        Loads the image and predicts bounding boxes using the model saved in weights folder
        Overlays predicted boxes on the image along with predicted tags
        Saves the tagged image in the Uploads\tagging\test directory
        """
        import numpy as np
        import cv2

        img = Image.open(img)
        #transforms
        test_image = T.ToTensor()(img)

        model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=False)
        # get number of input features for the classifier
        in_features = model.roi_heads.box_predictor.cls_score.in_features
        # replace the pre-trained head with a new one
        model.roi_heads.box_predictor = FastRCNNPredictor(in_features, self.num_classes)
        # now get the number of input features for the mask classifier
        in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
        hidden_layer = 256
        # and replace the mask predictor with a new one
        model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,hidden_layer,self.num_classes)
        
        #fill the model object with the trained weights in state_dict
        model.load_state_dict(torch.load('./weights/instance_segment.pt'))
        model.eval()

        #Passing the test image to model
        test_image = test_image.to(self.device)
        pred = model([test_image])

        threshold = 0.1 if demo == True else 0.9
        
        rect_th=1
        text_size=.3
        text_th=1
        
        #Extracting score, masks, classes and predicted bboxes from output
        
        pred_score = list(pred[0]['scores'].detach().to('cpu').numpy())
        if demo: 
            pred_t = 3
        else: pred_t = [pred_score.index(x) for x in pred_score if x>threshold][-1]
        masks = (pred[0]['masks']>0).squeeze().detach().to('cpu').numpy()
        pred_class = [self.class_dict[i] for i in list(pred[0]['labels'].detach().to('cpu').numpy())]
        
        pred_boxes = [[(i[0], i[1]), (i[2], i[3])] for i in list(pred[0]['boxes'].detach().to('cpu').numpy())]
        masks = masks[:pred_t+1]
        pred_boxes = pred_boxes[:pred_t+1]
        pred_class = pred_class[:pred_t+1]

        img = np.array(img)
        #img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        #Overlaying the masks and predicted bounding boxes on the image
        
        for i in range(len(masks)):
            colours = [[0, 255, 0],[0, 0, 255],[255, 0, 0],[0, 255, 255],[255, 255, 0],[255, 0, 255],[80, 70, 180],[250, 80, 190],[245, 145, 50],[70, 150, 250],[50, 190, 190]]
            r = np.zeros_like(masks[i]).astype(np.uint8)
            g = np.zeros_like(masks[i]).astype(np.uint8)
            b = np.zeros_like(masks[i]).astype(np.uint8)
            r[masks[i] == 1], g[masks[i] == 1], b[masks[i] == 1] = colours[random.randrange(0,10)]
            rgb_mask = np.stack([r, g, b], axis=2)
            img = cv2.addWeighted(img, 1, rgb_mask, 0.5, 0)
            cv2.rectangle(img, pred_boxes[i][0], pred_boxes[i][1],color=(0, 255, 0), thickness=rect_th)
            cv2.putText(img,pred_class[i], pred_boxes[i][0], cv2.FONT_HERSHEY_SIMPLEX, text_size, (0,255,0),thickness=text_th)
        
        inst_mask_image_loc = os.path.join(r'.\Uploads',
                                           "segmentation",
                                           "instance_segmentation",
                                           "test")
        inst_mask_image = os.path.join(inst_mask_image_loc,"inst_masked.jpg")
        
        cv2.imwrite(inst_mask_image,img)
        
        return inst_mask_image


ModuleNotFoundError: ignored