In [None]:
# Install the notebook's dependencies through shell pip.
# Only torch/torchvision are pinned (CUDA 11.3 wheels from the PyTorch wheel index);
# the other packages are installed without a version constraint.
# NOTE(review): `!pip` runs whichever pip is first on PATH, which may not be the
# kernel's environment -- confirm, or consider `%pip`.
!pip install matplotlib
!pip3 install torch==1.10.0+cu113 torchvision==0.11.1+cu113  -f https://download.pytorch.org/whl/cu113/torch_stable.html
!pip install albumentations
!pip install pandas 
!pip install tqdm 
!pip install opencv-python 

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import albumentations as A
from albumentations.pytorch import ToTensorV2 
import torchvision
from torchvision import datasets,transforms
from tqdm import tqdm
import cv2
from torch.utils.data import Dataset,DataLoader
import torch.optim as optim
from PIL import Image
import os, sys
import torch.nn.functional as F
import ast
from pathlib import Path
import xml.etree.ElementTree as ET 

In [None]:
# ---------------------------------------------------------------------------
# Notebook configuration: every tunable value lives in this cell.
# ---------------------------------------------------------------------------

# Paths and environment
DATAPATH = Path.cwd()  # the "dataset" folder is looked up under this directory
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:100"

# Data
SPLIT = 0.2  # fraction of the samples held out for validation
BATCH_SIZE = 4

# Optimisation
LR = 1e-4
EPOCHS = 1

# Compute device. Currently pinned to the CPU; to pick a GPU when present use:
#   DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DEVICE = "cpu"
print(DEVICE)

In [None]:
# Collect information about the dataset
def png2jpg(path):
    """Convert the image at ``path`` to an RGB JPEG beside it and delete the source.

    The output keeps the directory and base name of ``path`` and gets a
    ``.jpg`` extension.

    Args:
        path: Path of the image file to convert.
    """
    path = os.fspath(path)
    print(path)
    # splitext strips only the final extension, so dots elsewhere in the path
    # (e.g. "./dataset/img.v2.png") no longer truncate the output name.
    jpg_path = os.path.splitext(path)[0] + ".jpg"
    # The context manager releases the file handle before the source is removed.
    with Image.open(path) as im:
        im.convert('RGB').save(jpg_path)
    # Never delete the file that was just written (source already named *.jpg).
    if jpg_path != path:
        os.remove(path)

def create_df_from_dataset(directory, box_frac=0.04):
    """Build a DataFrame of images and point-derived boxes from ``annotations.xml``.

    The annotation file is read from ``<cwd>/<directory>/annotations.xml``.
    Every ``<image>`` element yields one row, except images whose name ends in
    ``.png``, which are skipped. Each coordinate listed in the ``points``
    attribute of an image's ``<points>`` children becomes one box centred on
    that point and clipped to the image bounds.

    Args:
        directory: Dataset folder, joined onto the current working directory.
        box_frac: Side of each generated box as a fraction of the image
            width/height. Defaults to 0.04.

    Returns:
        pandas.DataFrame with columns ``image_id`` (file name), ``width`` and
        ``height`` (floats) and ``bboxes`` (list of
        ``[xmin, ymin, width, height]``; empty when the image has no points).
    """
    annot_file = Path.cwd() / Path(directory) / 'annotations.xml'
    root = ET.parse(annot_file).getroot()
    half = box_frac / 2.0

    records = []
    for image in root.findall('image'):
        img_name = image.attrib.get('name', "")
        # PNG entries are skipped. splitext inspects the real extension, so
        # names without a dot no longer raise IndexError.
        if os.path.splitext(img_name)[1] == ".png":
            continue

        i_width = float(image.attrib.get('width', 0))
        i_height = float(image.attrib.get('height', 0))

        boxes = []
        for point in image.findall('points'):
            for coord in point.attrib.get('points', "").split(";"):
                if coord == '':
                    continue
                parts = coord.split(",")
                x = float(parts[0])
                y = float(parts[1])
                # Centre the box on the annotated point and clip it to the image.
                # (The previous 0.98 offset pinned almost every box to the
                # top-left corner, and the y-clamp compared xmin instead of ymin.)
                xmin = max(x - half * i_width, 0.0)
                ymin = max(y - half * i_height, 0.0)
                xmax = min(x + half * i_width, i_width)
                ymax = min(y + half * i_height, i_height)
                if xmax <= xmin or ymax <= ymin:
                    # Point lies outside the image: no valid box can be formed.
                    continue
                # Stored as extents; the dataset class adds xmin/ymin back to
                # obtain corner coordinates.
                boxes.append([xmin, ymin, xmax - xmin, ymax - ymin])

        records.append({
            "image_id": img_name,
            "width": i_width,
            "height": i_height,
            "bboxes": boxes,
        })

    # Explicit columns keep the schema stable even when no image was kept.
    return pd.DataFrame(records, columns=["image_id", "width", "height", "bboxes"])


# Parse the annotations of the "dataset" folder (under the working directory)
# into one row per image, then display the resulting frame.
df = create_df_from_dataset("dataset")
df


In [None]:
# Partition the images by whether any box was derived for them.
df_mask = df.loc[df["bboxes"].map(lambda boxes: boxes != [])]    # at least one box
df_unmask = df.loc[df["bboxes"].map(lambda boxes: boxes == [])]  # no boxes at all

In [None]:
def train_test_split(dataFrame,split):
    """Split ``dataFrame`` by position into a leading train part and a trailing validation part.

    Rows keep their original order; nothing is shuffled.

    Args:
        dataFrame: pandas DataFrame to split.
        split: Fraction of the rows assigned to the validation part
            (the count is truncated to an integer).

    Returns:
        Tuple ``(train_data, val_data)``.
    """
    n_val = int(split * len(dataFrame))
    cut = len(dataFrame) - n_val
    train_part = dataFrame.iloc[:cut]
    val_part = dataFrame.iloc[cut:]
    return train_part, val_part

In [None]:
# Hold out the trailing SPLIT fraction of the images that have boxes for validation.
train_data_df,val_data_df = train_test_split(df_mask,SPLIT)

In [None]:
# Display the training split for a visual check.
train_data_df

In [None]:
class CeedDataset(Dataset):
    """Detection dataset yielding ``(image, target)`` pairs.

    Args:
        data: DataFrame with an ``image_id`` column (file names) and a
            ``bboxes`` column (per-image list of ``[xmin, ymin, width, height]``).
        root_dir: Directory that contains the image files.
        transform: Optional callable invoked as
            ``transform(image=..., bboxes=..., labels=...)`` and returning a
            mapping with at least ``image`` and ``bboxes``.
        train: Stored as ``isTrain``; the dataset itself does not read it.
    """
    def __init__(self,data,root_dir,transform=None,train=True):
        self.data = data
        self.root_dir = root_dir
        self.image_names = self.data.image_id.values
        self.bboxes = self.data.bboxes.values
        self.transform = transform
        self.isTrain = train

    def __len__(self):
        """Number of rows (images) in the backing DataFrame."""
        return len(self.data)

    def __getitem__(self,index):
        """Load one image and build its detection target.

        Returns:
            ``(image, target)``. ``image`` is the RGB image scaled to [0, 1]
            as float32 (or whatever the transform returns for it). ``target``
            is a dict with ``boxes`` (float32, N x 4, corner format
            ``[xmin, ymin, xmax, ymax]``), ``labels`` (int64 ones),
            ``image_id``, ``area`` (float32) and ``iscrowd`` (int64 zeros).

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_path = str(Path(self.root_dir) / self.image_names[index])
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        raw_boxes = self.bboxes[index]
        # float32 on every path (it used to be float64 without a transform and
        # float32 with one). The explicit row count keeps an empty box list
        # as a well-formed (0, 4) tensor.
        boxes = torch.tensor(raw_boxes, dtype=torch.float32).reshape(len(raw_boxes), 4)
        # [xmin, ymin, width, height] -> [xmin, ymin, xmax, ymax]
        boxes[:, 2] += boxes[:, 0]
        boxes[:, 3] += boxes[:, 1]
        labels = torch.ones((len(boxes),), dtype=torch.int64)

        if self.transform is not None:
            # Plain Python lists are handed to the transform rather than tensors.
            sample = self.transform(
                image=image,
                bboxes=boxes.tolist(),
                labels=labels.tolist(),
            )
            image = sample['image']
            kept = [list(box)[:4] for box in sample['bboxes']]
            boxes = torch.tensor(kept, dtype=torch.float32).reshape(len(kept), 4)
            # Single-class problem: every surviving box carries label 1.
            labels = torch.ones((len(boxes),), dtype=torch.int64)

        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor([index]),
            'area': (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            'iscrowd': torch.zeros((len(boxes),), dtype=torch.int64),
        }
        return image,target

In [None]:
# Augmentation pipelines. Boxes go in as pixel [xmin, ymin, xmax, ymax]
# ("pascal_voc"); the class ids travel in the separate "labels" field.
train_transform = A.Compose([
    # The probability must be passed by keyword: in albumentations releases
    # where the first positional parameter is `always_apply`, `A.Flip(0.5)`
    # set that flag to a truthy value and flipped every sample.
    A.Flip(p=0.5),
    ToTensorV2(p=1.0)
],bbox_params = {'format':"pascal_voc",'label_fields': ['labels']})
# Validation applies no augmentation, only the tensor conversion.
val_transform = A.Compose([
      ToTensorV2(p=1.0)
],bbox_params = {'format':"pascal_voc","label_fields":['labels']})

In [None]:
def collate_fn(batch):
    """Regroup a batch of per-sample tuples into per-field tuples.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``.
    The elements themselves are passed through untouched (nothing is stacked).
    """
    transposed = zip(*batch)
    return tuple(transposed)

In [None]:
# Both splits read their images from the same "dataset" folder under DATAPATH;
# only the annotation frame and the transform differ.
train_data = CeedDataset(
    data=train_data_df,
    root_dir=str(DATAPATH / "dataset"),
    transform=train_transform,
)
valid_data = CeedDataset(
    data=val_data_df,
    root_dir=str(DATAPATH / "dataset"),
    transform=val_transform,
)

In [None]:
# Smoke test: load the sample at index 1 and report the shape of its image.
image, target = train_data[1]
print(image.shape)

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Number of outputs of the new box head.
# NOTE(review): presumably background plus the single object class that the
# dataset labels as 1 -- confirm.
num_classes = 2

# Faster R-CNN (ResNet-50 FPN backbone) initialised from pretrained weights.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# Swap the box predictor for a fresh head sized for `num_classes`, reusing the
# input width of the existing classification layer.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(
    in_channels=in_features,
    num_classes=num_classes,
)

In [None]:
class Averager:
    """Accumulates values and exposes their running mean."""

    def __init__(self):
        # Start from the same empty state that reset() establishes.
        self.reset()

    def send(self,value):
        """Add ``value`` to the running total and count one more observation."""
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        """Mean of the values sent since the last reset; ``0`` when there were none."""
        if self.iterations != 0:
            return 1.0*self.current_total/self.iterations
        return 0

    def reset(self):
        """Discard everything accumulated so far."""
        self.current_total = 0.0
        self.iterations = 0.0

In [None]:
# Training batches are drawn in shuffled order; validation keeps dataset order.
# collate_fn keeps images and targets as tuples instead of stacking them.
train_dataloader = DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)
val_dataloader = DataLoader(
    valid_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# Optimisation setup.
model = model.to(DEVICE)

# Only parameters that require gradients are handed to Adam.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = optim.Adam(params, lr=LR)
lr_scheduler = None  # no schedule; the training loop skips the step while this is None

# Bookkeeping for the training loop.
train_loss = []
loss_hist = Averager()
itr = 1

In [None]:
# Training loop: one optimiser step per batch, mean loss reported per epoch.
loss_hist = Averager()
itr = 1

# torchvision detection models return the loss dict only in training mode.
# Setting it explicitly keeps this cell working when it is re-run after the
# model has been switched to eval mode.
model.train()

for epoch in range(EPOCHS):
    loss_hist.reset()

    for images, targets in train_dataloader:
        # Move every image and every target tensor to the configured device.
        images = [image.to(DEVICE) for image in images]
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)

        # Total loss is the sum of the individual loss terms.
        losses = sum(loss_dict.values())
        loss_value = losses.item()

        loss_hist.send(loss_value)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if itr % 50 == 0:
            print(f"Iteration #{itr} loss: {loss_value}")

        itr += 1

    # update the learning rate
    if lr_scheduler is not None:
        lr_scheduler.step()

    print(f"Epoch #{epoch} loss: {loss_hist.value}")

In [None]:
# Persist only the model's state_dict (not the model object) to a relative path.
torch.save(model.state_dict(), 'fasterrcnn_resnet50_fpn.pth')