In [None]:
import pandas as pd
import numpy as np
import torch
import os
import gc


In [None]:
import pickle

In [None]:
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

In [None]:
import cv2

In [None]:

import torchvision.models.detection as tmd
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor as FRP
import inspect
import torchvision
#converting the customised rcnn model
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor


In [None]:
# Dataset locations, written as Windows paths with trailing separators so that
# they can be concatenated directly (root_dir + <split dir>).
root_dir = 'D:\\Drone-Object-Detection\\VisDroneDataset\\'
test_dir = 'test\\'
train_dir = 'train\\'
val_dir = 'validation\\'

In [None]:
def split_image_with_asp_ratio(image):
    """Print the ``shape`` attribute of ``image``.

    Placeholder: despite the name, no splitting is performed; the function
    only reports the shape and returns ``None``.
    """
    shape = image.shape
    print(shape)

In [None]:
class VisDroneDataset(Dataset):
    """Dataset yielding ``(image, target)`` pairs from a VisDrone-style folder.

    The folder ``path`` must contain an ``images`` and an ``annotations``
    sub-directory. The i-th image (in sorted order) is paired with the i-th
    annotation file (in sorted order).

    Args:
        path: directory of one dataset split.
        img_shape: ``(height, width)`` every image is resized to, or
            ``(-1, -1)`` to keep each image at its native resolution.
    """

    def __init__(self, path, img_shape):
        self.root = path
        self.width = img_shape[1]
        self.height = img_shape[0]
        # os.listdir() returns entries in arbitrary order; sort both listings so
        # that index i always pairs an image with its own annotation file.
        self.images = sorted(os.listdir(os.path.join(path, 'images')))
        self.annotations = sorted(os.listdir(os.path.join(path, 'annotations')))
        self.length = len(self.images)

    def __len__(self):
        """Return the number of images in the split."""
        return self.length

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            image: numpy array, channels first (C, H, W), RGB, scaled to [0, 1].
            target: dict with ``boxes`` (N, 4) float32 in x1, y1, x2, y2 order,
                ``labels`` (N,) int64 and ``image_id`` (scalar tensor).

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
        """
        image_path = os.path.join(self.root, 'images', self.images[idx])
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"cv2 could not read image: {image_path}")
        img_height, img_width = image.shape[:2]

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if (self.width, self.height) != (-1, -1):
            out_width, out_height = self.width, self.height
            image = cv2.resize(image, (out_width, out_height), interpolation=cv2.INTER_AREA)
        else:
            # Keep the native size. The output size is kept in locals on purpose:
            # storing it on self would make every later sample get resized to the
            # size of the first image that was loaded.
            out_width, out_height = img_width, img_height

        image = image.transpose(2, 0, 1)
        image = (image / 255.0)

        annotation_path = os.path.join(self.root, 'annotations', self.annotations[idx])
        target = self._read_target(annotation_path, img_width, img_height,
                                   out_width, out_height, idx)
        return image, target

    def _read_target(self, annotation_path, img_width, img_height, out_width, out_height, idx):
        """Parse one annotation file into a detection target dict.

        Each non-empty line holds comma separated values whose first six fields
        are: left, top, width, height, <ignored>, category. Boxes are converted
        to x1, y1, x2, y2 and rescaled from the original image size to the
        output image size.
        """
        boxes = []
        labels = []
        with open(annotation_path) as f:
            for line in f:
                fields = line.strip().split(',')
                if len(fields) < 6:
                    # Blank or truncated line: nothing to parse.
                    continue
                box_left, box_top, box_width, box_height, _ignored, category = map(float, fields[:6])

                # Degenerate boxes are widened to one pixel so x2 > x1 and y2 > y1.
                box_width = 1 if box_width < 1 else box_width
                box_height = 1 if box_height < 1 else box_height

                box_right = box_left + box_width
                box_bottom = box_top + box_height
                # Rescale from the original resolution to the output resolution.
                box_left = (box_left / img_width) * out_width
                box_right = (box_right / img_width) * out_width
                box_top = (box_top / img_height) * out_height
                box_bottom = (box_bottom / img_height) * out_height
                boxes.append((box_left, box_top, box_right, box_bottom))
                labels.append(int(category))

        target = {}
        # reshape keeps the (N, 4) layout even when the file has no objects.
        target['boxes'] = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        target['labels'] = torch.as_tensor(labels, dtype=torch.int64)
        target['image_id'] = torch.tensor((idx))
        return target

In [None]:
# Training samples are requested at 480x480; the validation set passes the
# (-1,-1) sentinel that VisDroneDataset checks before resizing.
train_dataset = VisDroneDataset(root_dir+train_dir,(480,480))
val_dataset = VisDroneDataset(root_dir+val_dir,(-1,-1))

In [None]:
def collate_fn(batch):
    """Split a batch of ``(image, target)`` pairs into two parallel lists.

    Images are returned as a plain list (not stacked), so samples in one batch
    may have different sizes.

    Returns:
        ``(images, targets)`` where both items are lists in batch order.
    """
    pairs = [(img, target) for img, target in batch]
    images = [img for img, _ in pairs]
    targets = [target for _, target in pairs]
    return (images, targets)


In [None]:
# Both loaders shuffle and use collate_fn, which keeps images in a list instead
# of stacking them into one tensor.
train_loader = DataLoader(train_dataset, batch_size=3, shuffle=True,collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=3, shuffle=True,collate_fn=collate_fn)

In [None]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Point the torch.hub directory at a fixed local folder.
torch.hub.set_dir('D:\\torch_models')

In [None]:
# Placeholder; the actual model is assigned later from choose_model(...).
model = None

In [None]:
def choose_model(which):
    """Load a pickled model from the current working directory.

    Args:
        which: ``'cropped'`` loads ``model_trained_on_cropped.pkl``; any other
            value loads ``model.pkl``.

    Returns:
        The unpickled object.
    """
    # NOTE(review): pickle.load can execute arbitrary code stored in the file;
    # only load files that come from a trusted source.
    filename = 'model_trained_on_cropped.pkl' if which == 'cropped' else 'model.pkl'
    with open(filename, 'rb') as f:
        return pickle.load(f)

In [None]:

import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Minimum score a prediction needs to be drawn or counted.
CONFIDENCE_THRESHOLD = 0.3
# Display name per class id (list index == class id); used for box captions in
# draw_predictions and for the tick labels in draw_heatmap.
labels_map = ['Background', 'Pedestrian', 'People', 'Bycicle', 'Car', 'Van', 'Truck',
                'Tricycle', 'Awning-tricycle', 'Bus', 'Motor', 'Others']

In [None]:
from torchvision.ops import nms

def non_max_suppression_git(preds, iou_threshold=0.9, score_threshold=0.3):
    """Drop low-score predictions, then apply NMS to the remainder.

    Args:
        preds: dict with tensor entries ``'boxes'``, ``'scores'``, ``'labels'``.
        iou_threshold: IoU threshold forwarded to ``nms``.
        score_threshold: predictions whose score is not strictly greater than
            this value are removed before NMS.

    Returns:
        A new dict with the same three keys holding only the kept predictions.
        NMS is run over all boxes together, regardless of label.
    """
    confident = preds['scores'] > score_threshold
    boxes, scores, labels = (preds[key][confident] for key in ('boxes', 'scores', 'labels'))

    # Indices (into the score-filtered tensors) of the boxes that survive NMS.
    survivors = nms(boxes, scores, iou_threshold)

    return {
        'boxes': boxes[survivors],
        'scores': scores[survivors],
        'labels': labels[survivors],
    }


In [None]:
# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime" error
# raised when several libraries each bundle their own OpenMP — confirm it is still needed.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

In [None]:
def draw_predictions(imgs, preds, mode, i):#preds - list of dictionaries of tensors
    """Plot each image with its bounding boxes, one subplot per image.

    Args:
        imgs: sequence of image tensors in channels-first layout.
        preds: list of dicts with ``'boxes'`` and ``'labels'`` (and ``'scores'``
            when ``mode == 'pred'``), one dict per image.
        mode: ``'pred'`` uses the scores from ``preds``; any other value assigns
            every box a score of 1.
        i: index used in the output file name; ``-1`` disables saving.
    """
    def draw_bounding_boxes(img, boxes,labels,scores, ax):   # image - tensor, others - numpy
        """Draw ``img`` on ``ax`` and overlay every box scoring at least CONFIDENCE_THRESHOLD."""
    
        # Convert (C, H, W) to (H, W, C) for imshow.
        ax.imshow(img.detach().cpu().numpy().transpose(1,2,0))
        ax.set_title('title')
        threshold = CONFIDENCE_THRESHOLD
        # Labels are cast to an integer dtype so they can index labels_map.
        for box,label,score in zip(boxes,labels.astype(np.uint8),scores):
            #print(label)
            if(score < threshold):
                continue
            x1, y1, x2, y2 = box
            width = x2 - x1
            height = y2 - y1
            rect = patches.Rectangle((x1, y1), width, height, linewidth=1, edgecolor='r', facecolor='none')
            ax.add_patch(rect)
            # Caption: class name and score as a percentage, anchored at the box's top-left corner.
            ax.text(
                    x1,
                    y1,
                    f"{labels_map[label]}: {(np.round(score*100,1))}%",
                    verticalalignment='top',
                    color='white',
                    fontsize=8,
                    bbox={'facecolor': 'red', 'alpha': 0.7, 'pad': 1}
                )
        #plt.show()
        return ax

    fig, ax = plt.subplots(1,len(imgs), figsize=(40,40))
    for idx,(img,ann) in enumerate(zip(imgs, preds)):
        #print(ann)
        # Build one array per image with columns x1, y1, x2, y2, score, label.
        annotations_new = []
        annotations_new.append(torch.cat((ann['boxes'] ,\
                                           (ann['scores']).unsqueeze(dim=1) if mode=='pred' else torch.ones(len(ann['boxes'])).\
                                          unsqueeze(dim=1),\
                                           ann['labels'].unsqueeze(dim=1)),dim=1).detach().cpu().numpy())
        ann = np.array(annotations_new[0])
        #print(ann)
        # Keep only rows whose score exceeds the confidence threshold.
        ann = ann[ann[:,4] > CONFIDENCE_THRESHOLD,:]
        #print(ann)
        
        # plt.subplots returns a single Axes (not an array) when there is one image.
        draw_bounding_boxes(img,ann[:,0:4],ann[:,5],ann[:,4],ax[idx] if len(imgs) != 1 else ax)
        
    if(i!=-1):
        fig.savefig(f'D:\\Drone-Object-Detection\\Results\\Images\\img{i}.png')
        
    plt.show()

In [None]:
def replace_cuda_with_cpu(data):
    """Recursively move every tensor inside ``data`` to the CPU.

    Dicts and lists are rebuilt with converted contents; tensors are replaced by
    ``tensor.cpu()``; every other object (including tuples) is returned as is.
    """
    if isinstance(data, dict):
        return {name: replace_cuda_with_cpu(entry) for name, entry in data.items()}
    if isinstance(data, list):
        return [replace_cuda_with_cpu(entry) for entry in data]
    if isinstance(data, torch.Tensor):
        return data.cpu()
    return data

In [None]:
def dict_to_cpu(preds):
    """Return a new list in which each element of ``preds`` has been passed
    through ``replace_cuda_with_cpu``."""
    return [replace_cuda_with_cpu(entry) for entry in preds]


In [None]:
def from_dict_to_arr(dictionary, data = 'target'):
    """Flatten a list of detection dicts into a list of per-box row tensors.

    Each row is ``[x1, y1, x2, y2, score, label]``. With ``data == 'target'``
    the score column is filled with ones; otherwise ``ann['scores']`` is used.
    Rows from all dicts are concatenated into one flat list.
    """
    rows = []
    for ann in dict_to_cpu(dictionary):
        boxes = ann['boxes']
        if data == 'target':
            score_col = torch.ones(len(ann['boxes'])).unsqueeze(dim=1)
        else:
            score_col = ann['scores'].unsqueeze(dim=1)
        label_col = ann['labels'].unsqueeze(dim=1)
        table = torch.cat((boxes, score_col, label_col), dim=1)
        # Iterating a 2-D tensor yields its rows, so this appends one tensor per box.
        rows.extend(table)
    return rows

In [None]:
from torchmetrics.detection import MeanAveragePrecision
def calc_mAP(preds,target):
    """Compute bounding-box mean-average-precision metrics for one set of
    predictions and targets using a fresh ``MeanAveragePrecision`` instance.

    Returns:
        Whatever ``MeanAveragePrecision.compute()`` returns for this single update.
    """
    evaluator = MeanAveragePrecision(iou_type="bbox")
    evaluator.update(preds, target)
    result = evaluator.compute()
    return result


In [None]:
def make_all_numpy(obj):
    """Recursively convert every tensor inside ``obj`` to a numpy array.

    Lists and tuples both come back as lists; dicts keep their keys; objects of
    any other type are returned unchanged.
    """
    if isinstance(obj, dict):
        return {name: make_all_numpy(entry) for name, entry in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [make_all_numpy(entry) for entry in obj]
    if isinstance(obj, torch.Tensor):
        return obj.numpy()
    return obj

In [None]:
#detections (Array[N, 6]), x1, y1, x2, y2, conf, class
# labels (Array[M, 5]), class, x1, y1, x2, y2
def prepare_for_conf_matrx(pred,annot):
    """Convert predictions and targets into the arrays ConfusionMatrix expects.

    Args:
        pred: list of prediction dicts (``boxes``, ``scores``, ``labels``).
        annot: list of target dicts (``boxes``, ``labels``).

    Returns:
        ``(detections, labels)`` where ``detections`` has shape (N, 6) with
        columns x1, y1, x2, y2, conf, class, and ``labels`` has shape (M, 5)
        with columns class, x1, y1, x2, y2 (int16). Empty inputs give arrays of
        shape (0, 6) and (0, 5) instead of raising or returning a 1-D array.
    """
    # from_dict_to_arr already moves every tensor to the CPU.
    pred_rows = make_all_numpy(from_dict_to_arr(pred, 'my_pred'))
    annot_rows = make_all_numpy(from_dict_to_arr(annot, 'target'))

    # reshape keeps the arrays 2-D when there are no rows at all.
    detections = np.array(pred_rows).reshape(-1, 6)
    # Truncate box coordinates and class ids to whole numbers; the confidence
    # column (4) is left untouched. Values stay in the array's float dtype.
    whole_number_cols = [0, 1, 2, 3, 5]
    detections[:, whole_number_cols] = detections[:, whole_number_cols].astype(np.int16)

    # Target rows are x1, y1, x2, y2, 1, class: move the class to the front and
    # drop the constant score column.
    labels = np.array(annot_rows, dtype=np.int16).reshape(-1, 6)[:, [5, 0, 1, 2, 3]]

    return detections, labels

In [None]:
import seaborn as sns
%matplotlib inline

from torchvision import ops

In [None]:


class ConfusionMatrix:
    """Confusion matrix for object detection.

    ``matrix[predicted, true]`` counts matched detections. The extra last
    row/column (index ``num_classes``) counts ground-truth boxes without a
    matching detection (last row) and detections without a matching
    ground-truth box (last column).
    """

    def __init__(self, num_classes: int, CONF_THRESHOLD=0.3, IOU_THRESHOLD=0.5):
        self.matrix = np.zeros((num_classes + 1, num_classes + 1))
        self.num_classes = num_classes
        self.CONF_THRESHOLD = CONF_THRESHOLD
        self.IOU_THRESHOLD = IOU_THRESHOLD

    def process_batch(self, detections, labels: np.ndarray):
        """Accumulate one batch into the matrix.

        Args:
            detections: array (N, 6) with columns x1, y1, x2, y2, conf, class.
                ``None``, an empty array or any array that is not 2-D is
                treated as "no detections".
            labels: array (M, 5) with columns class, x1, y1, x2, y2.
        """
        gt_classes = labels[:, 0].astype(np.uint8)

        if detections is not None:
            detections = np.asarray(detections)
        if detections is None or detections.ndim != 2 or detections.shape[0] == 0:
            kept = None
        else:
            kept = detections[detections[:, 4] > self.CONF_THRESHOLD]

        if kept is None or kept.shape[0] == 0:
            # No usable detections: every ground-truth box counts as missed.
            for gt_class in gt_classes:
                self.matrix[self.num_classes, gt_class] += 1
            return

        detection_classes = kept[:, 5].astype(np.uint8)
        all_ious = self.__calculate_iou_matrix__(labels[:, 1:], kept[:, :4])

        # Candidate matches: rows of (gt index, detection index, IoU).
        gt_idx, det_idx = np.where(all_ious > self.IOU_THRESHOLD)
        all_matches = np.stack((gt_idx, det_idx, all_ious[gt_idx, det_idx]), axis=1)

        if all_matches.shape[0] > 0:
            # Sort by IoU (descending) and keep the first row per detection,
            # then repeat per ground-truth box, so each box is used at most once.
            all_matches = all_matches[all_matches[:, 2].argsort()[::-1]]
            all_matches = all_matches[np.unique(all_matches[:, 1], return_index=True)[1]]
            all_matches = all_matches[all_matches[:, 2].argsort()[::-1]]
            all_matches = all_matches[np.unique(all_matches[:, 0], return_index=True)[1]]

        # After de-duplication each gt index and each detection index occurs at most once.
        detection_for_gt = {int(gt): int(det) for gt, det in all_matches[:, :2]}
        matched_detections = set(detection_for_gt.values())

        for i, gt_class in enumerate(gt_classes):
            if i in detection_for_gt:
                detection_class = detection_classes[detection_for_gt[i]]
                self.matrix[detection_class, gt_class] += 1
            else:
                self.matrix[self.num_classes, gt_class] += 1

        for i, detection_class in enumerate(detection_classes):
            if i not in matched_detections:
                self.matrix[detection_class, self.num_classes] += 1

    def return_matrix(self):
        """Return the accumulated matrix (the internal array, not a copy)."""
        return self.matrix

    def print_matrix(self):
        """Print the matrix one row per line, values separated by spaces."""
        for i in range(self.num_classes + 1):
            print(' '.join(map(str, self.matrix[i])))

    def __calculate_iou_matrix__(self, boxesA, boxesB):
        """Return the pairwise IoU matrix (len(boxesA), len(boxesB)) as a numpy array."""
        boxesA_torch = torch.tensor(boxesA, dtype=torch.float32)
        boxesB_torch = torch.tensor(boxesB, dtype=torch.float32)

        iou_matrix_torch = torchvision.ops.box_iou(boxesA_torch, boxesB_torch)

        return iou_matrix_torch.numpy()
            
            
# NOTE: something seems wrong with all_matches (the matching logic still needs to be verified)

In [None]:
def draw_heatmap(conf_matr, ax, cen=None):
    """Draw a confusion matrix as an annotated seaborn heatmap on ``ax``.

    Args:
        conf_matr: object exposing ``return_matrix()``; rows are predicted
            classes, columns are true classes.
        ax: matplotlib Axes to draw on.
        cen: forwarded to ``sns.heatmap`` as ``center``.

    Returns:
        The Axes returned by ``sns.heatmap``.
    """
    hm = sns.heatmap(np.array(conf_matr.return_matrix()), center=cen, cmap="crest", annot=True, ax=ax, fmt='g')
    hm.set_ylabel('Predicted')
    hm.set_xlabel('True')

    # One label per class plus 'None' for the extra "unmatched" row/column.
    tick_labels = labels_map + ['None']
    # Heatmap cell k spans [k, k + 1], so labels belong at k + 0.5; integer
    # positions would put every label on a cell border.
    tick_positions = np.arange(len(tick_labels)) + 0.5
    hm.set_xticks(ticks=tick_positions, labels=tick_labels, rotation='vertical')
    hm.set_yticks(ticks=tick_positions, labels=tick_labels, rotation='horizontal')
    return hm
    
    


In [None]:
from sahi import AutoDetectionModel
from sahi.predict import get_sliced_prediction, predict, get_prediction
from sahi.utils.file import download_from_url
from sahi.utils.cv import read_image
from PIL import Image

In [None]:
class SAHI_predictor:
    """Callable wrapper running SAHI sliced inference with a torchvision model.

    Calling the instance with a list of images returns a list of dicts with
    ``scores``, ``labels`` and ``boxes`` (x1, y1, x2, y2) tensors.

    Args:
        model: torchvision detection model handed to SAHI.
        slice_height, slice_width: slice size in pixels.
        overlap_height_ratio, overlap_width_ratio: overlap between slices.
        confidence_threshold: minimum score SAHI keeps.
        device: device string for SAHI; defaults to ``"cuda:0"`` when CUDA is
            available and ``"cpu"`` otherwise.
    """

    def __init__(self, model, slice_height=320, slice_width=480,
                 overlap_height_ratio=0, overlap_width_ratio=0,
                 confidence_threshold=0.5, device=None):
        self.slice_height = slice_height
        self.slice_width = slice_width
        self.overlap_height_ratio = overlap_height_ratio
        self.overlap_width_ratio = overlap_width_ratio
        self.confidence_threshold = confidence_threshold
        if device is None:
            # A hard-coded "cuda:0" fails on CPU-only machines.
            device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.device = device
        self.detection_model = self.__create_detection_model__(model)

    def from_x1y1wh_to_x1y1x2y2(self, box: list)->list:
        """Convert ``[x1, y1, w, h]`` to ``[x1, y1, x2, y2]`` without modifying ``box``."""
        x1, y1, width, height = box[0], box[1], box[2], box[3]
        return [x1, y1, x1 + width, y1 + height]

    def to_common_dict(self, list_of_dict):
        """Merge COCO-style annotation dicts into one prediction dict.

        Each input dict must provide ``bbox`` ([x1, y1, w, h]), ``category_id``
        and ``score``. An empty input yields correctly shaped empty tensors:
        boxes (0, 4), labels (0,) int64, scores (0,) float32.
        """
        boxes = []
        labels = []
        scores = []
        for dic in list_of_dict:
            boxes.append(self.from_x1y1wh_to_x1y1x2y2(dic['bbox']))
            # NOTE(review): assumes category_id uses the same ids as the model's labels — confirm.
            labels.append(dic['category_id'])
            scores.append(dic['score'])

        return {'scores': torch.tensor(scores, dtype=torch.float32),
                'labels': torch.tensor(labels, dtype=torch.int64),
                'boxes': torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)}

    def predict_with_SAHI(self, batch_imgs):
        """Run sliced prediction on every image of ``batch_imgs``.

        Images are expected channels-first with values in [0, 1]; each one is
        converted to an 8-bit channels-last PIL image before it is given to SAHI.
        """
        batch_imgs = make_all_numpy(replace_cuda_with_cpu(batch_imgs))
        results = []
        for img in batch_imgs:
            # moveaxis sends (C, H, W) to (H, W, C).
            pil_image = Image.fromarray(np.moveaxis((img * 255).astype(np.uint8), [0,1,2],[2,0,1]))
            result = get_sliced_prediction(
                pil_image,
                self.detection_model,
                slice_height = self.slice_height,
                slice_width = self.slice_width,
                overlap_height_ratio = self.overlap_height_ratio,
                overlap_width_ratio = self.overlap_width_ratio
            ) # COCO annotations carry boxes as x1, y1, w, h
            results.append(self.to_common_dict(result.to_coco_annotations()))

        return results

    def __create_detection_model__(self, model):
        """Wrap ``model`` in a SAHI ``AutoDetectionModel`` of type ``'torchvision'``."""
        detection_model = AutoDetectionModel.from_pretrained(
            model_type='torchvision',
            model=model,
            confidence_threshold=self.confidence_threshold,
            image_size=None,       # set when images must be resized before inference
            device=self.device,
            load_at_init=True,
        )
        return detection_model

    def __call__(self, batch_imgs):
        """Alias for ``predict_with_SAHI``."""
        return self.predict_with_SAHI(batch_imgs)

In [None]:
def concat_predictios(pred1, pred2):
    """Merge two prediction lists image by image.

    Args:
        pred1: list of prediction dicts (``boxes``, ``labels``, ``scores``).
        pred2: list of prediction dicts for the same images, in the same order.

    Returns:
        A list with one dict per image pair; each tensor is the entry from
        ``pred1`` followed by the entry from ``pred2``. Tensors from ``pred2``
        are moved to the device and dtype of their ``pred1`` counterpart, so no
        global device is needed and empty results concatenate cleanly.
    """
    merged = []
    for first, second in zip(pred1, pred2):
        combined = {}
        for key in ('boxes', 'labels', 'scores'):
            base = first[key]
            extra = second[key].to(device=base.device, dtype=base.dtype)
            if key == 'boxes':
                # An empty prediction may arrive as shape (0,); force (N, 4).
                base = base.reshape(-1, 4)
                extra = extra.reshape(-1, 4)
            combined[key] = torch.cat((base, extra), dim=0)
        merged.append(combined)
    return merged

In [None]:

def count_avg_map_and_conf_matr(model, sahi_predictor, loader,  value_to_break = 20,flag=None):
    """Evaluate a predictor on ``loader``: averaged mAP report plus confusion matrix.

    Args:
        model: detection model called as ``model(list_of_tensors)``; required
            for the flags ``'Common model'`` and ``'Full inference'``.
        sahi_predictor: callable SAHI wrapper; required for the flags ``'SAHI'``
            and ``'Full inference'``.
        loader: iterable of ``(batch_imgs, annotations)``; images are numpy arrays.
        value_to_break: index of the last batch to process.
        flag: ``'Common model'``, ``'SAHI'`` or ``'Full inference'`` (model and
            SAHI predictions concatenated).

    Returns:
        ``(maps, conf_matr)``: a multi-line report string and the filled
        ``ConfusionMatrix``.

    Raises:
        Exception: if ``flag`` is unknown or a predictor it needs is ``None``.
    """
    needs_model = flag in ('Common model', 'Full inference')
    needs_sahi = flag in ('SAHI', 'Full inference')
    if (not (needs_model or needs_sahi)
            or (needs_model and model is None)
            or (needs_sahi and sahi_predictor is None)):
        raise Exception("you must choose correct flag")

    conf_matr = ConfusionMatrix(12,CONF_THRESHOLD = CONFIDENCE_THRESHOLD, IOU_THRESHOLD = 0.5)

    # (key in the mAP result, caption used in the report), in report order.
    report_rows = (
        ('map', 'avgerage map 50-95: '),
        ('map_50', 'avgerage map 50: '),
        ('map_75', 'avgerage map 75: '),
        ('map_small', 'avgerage map small: '),
        ('map_medium', 'avgerage map medium: '),
        ('map_large', 'avgerage map large: '),
    )
    totals = {key: 0 for key, _ in report_rows}
    counts = {key: 0 for key, _ in report_rows}

    for i, (batch_imgs, annotations) in enumerate(loader):
        batch_imgs = [torch.from_numpy(img).float().to(device) for img in batch_imgs]

        preds = None
        if needs_model:
            with torch.no_grad():
                preds = model(batch_imgs)
        if needs_sahi:
            sahi_preds = sahi_predictor(batch_imgs)
            preds = sahi_preds if preds is None else concat_predictios(preds, sahi_preds)

        preds = replace_cuda_with_cpu(preds)
        nms_preds = [non_max_suppression_git(pred) for pred in preds]
        map_dict = calc_mAP(nms_preds, annotations)

        # Only strictly positive per-batch values enter an average. This skips
        # batches where a metric is unavailable, but it also skips genuine zeros.
        for key in totals:
            value = map_dict[key].item()
            if value > 0:
                totals[key] += value
                counts[key] += 1

        detections, labels = prepare_for_conf_matrx(nms_preds, annotations)
        conf_matr.process_batch(detections, labels)

        if(i%10 == 0):
            print(i)
        if(i == value_to_break):
            break

    # -1 marks a metric that never had a positive value.
    maps = ''.join(
        caption + str(totals[key] / counts[key] if counts[key] != 0 else -1) + '\n'
        for key, caption in report_rows
    )

    return maps, conf_matr

# There was a type error here (np.uint8 was the wrong dtype)

In [None]:
# Load the pickled 'cropped' model, move it to the selected device and switch it
# to evaluation mode; the same model instance is then wrapped for SAHI inference.
model = choose_model('cropped').to(device)
model.eval()
sahi_predictor = SAHI_predictor(model)

In [None]:
import random

def cmp_sahi_and_common(dataset, sahi_predictor, common_model, num_of_examples):
    """Evaluate three inference modes on the same random subset of ``dataset``.

    ``num_of_examples`` samples are drawn at random (with replacement) once and
    reused for 'Full inference', 'SAHI' and 'Common model', so the three
    results are directly comparable.

    Returns:
        ``(maps_FI, conf_m_FI, maps_sahi, conf_m_sahi, maps_not_sahi, conf_m_not_sahi)``.
    """

    def create_dataset_for_comparing(dataset, size):
        """Draw ``size`` random samples (with replacement) from ``dataset``."""
        return [dataset[random.randrange(len(dataset))] for _ in range(size)]

    # Sample from the dataset that was passed in, not from a notebook global.
    new_dataset = create_dataset_for_comparing(dataset, num_of_examples)
    val_loader_for_map = DataLoader(new_dataset, batch_size=1, shuffle=False,collate_fn=collate_fn)
    last_batch = num_of_examples - 1

    maps_FI, conf_m_FI = count_avg_map_and_conf_matr(
        common_model, sahi_predictor, loader=val_loader_for_map,
        value_to_break=last_batch, flag='Full inference')

    maps_sahi, conf_m_sahi = count_avg_map_and_conf_matr(
        None, sahi_predictor, loader=val_loader_for_map,
        value_to_break=last_batch, flag='SAHI')

    maps_not_sahi, conf_m_not_sahi = count_avg_map_and_conf_matr(
        common_model, None, loader=val_loader_for_map,
        value_to_break=last_batch, flag='Common model')

    return maps_FI, conf_m_FI, maps_sahi, conf_m_sahi, maps_not_sahi, conf_m_not_sahi

In [None]:

# Make sure the model is on the selected device (it was already moved there when loaded).
model = model.to(device)

In [None]:
# Attributes attached to the function object; nothing in the visible code reads them.
count_avg_map_and_conf_matr.model_preds=None
count_avg_map_and_conf_matr.SAHI_preds=None

# Compare the three inference modes on 540 randomly drawn validation samples.
maps_FI, conf_m_FI, maps_sahi, conf_m_sahi, maps_not_sahi, conf_m_not_sahi =  cmp_sahi_and_common(val_dataset,\
                                                                                                  sahi_predictor,model,540)


# One heatmap per mode, stacked vertically: full inference, SAHI only, model only.
fig, ax = plt.subplots(nrows=3,ncols=1, figsize=(10,15))
print('FI: \n',maps_FI)
print('SAHI: \n',maps_sahi)
print('NOT SAHI: \n',maps_not_sahi)

hm = draw_heatmap(conf_m_FI,ax[0])
hm = draw_heatmap(conf_m_sahi,ax[1])
hm = draw_heatmap(conf_m_not_sahi,ax[2])
# NOTE(review): the report path is relative to the working directory while the
# figure path below is absolute — confirm both end up in the same Results folder.
with open('Results\\mAPs\\file.txt','w') as f:
    f.write('FI:\n'+maps_FI)
    f.write('SAHI:\n'+maps_sahi)
    f.write('NOT SAHI:\n'+maps_not_sahi)
fig.savefig(f'D:\\Drone-Object-Detection\\Results\\conf_matrix.png')

In [None]:
def draw_random_preds(model, dataset, size, imgs_num):
    """Run ``model`` on every image of ``dataset`` and draw the predictions.

    Args:
        model: callable taking a list of image tensors and returning a list of
            prediction dicts (a detection model or a SAHI_predictor).
        dataset: iterable of ``(image, target)`` pairs; images are numpy arrays
            and may differ in size.
        size: unused; kept for backward compatibility.
        imgs_num: index forwarded to ``draw_predictions`` for the saved file
            name (``-1`` disables saving).
    """
    # Images are converted one by one instead of being stacked into one array,
    # so samples with different resolutions are supported.
    batch_imgs = [
        torch.from_numpy(np.ascontiguousarray(sample[0])).float().to(device)
        for sample in dataset
    ]
    with torch.no_grad():
        preds = model(batch_imgs)
    preds = replace_cuda_with_cpu(preds)

    nms_preds = [non_max_suppression_git(pred) for pred in preds]

    draw_predictions(batch_imgs,nms_preds,'pred',imgs_num)#list of dictionaries of tensors


In [None]:
#draw_random_preds(sahi_predictor, val_dataset,2)

In [None]:
def compare_draw_sahi_and_withot(model, sahi_predictor, dataset, size):
    """Draw predictions of the plain model and of the SAHI predictor for the
    same ``size`` randomly chosen samples of ``dataset``.

    The plain-model figure is drawn with index 0 and the SAHI figure with index 1.
    """

    def create_dataset_for_comparing(dataset, size):
        """Pick ``size`` random samples (with replacement) from ``dataset``."""
        return [dataset[int(random.random()*len(dataset))] for _ in range(size)]

    samples = create_dataset_for_comparing(dataset, size)
    for figure_index, predictor in enumerate((model, sahi_predictor)):
        draw_random_preds(predictor, samples, size, figure_index)
    

In [None]:
# Visual check on one random validation image: plain model vs. SAHI predictor.
compare_draw_sahi_and_withot(model, sahi_predictor, val_dataset, 1)


In [None]:
#slice_coco?????