In [5]:
import torchvision
from torchvision import transforms
from torchvision.utils import draw_bounding_boxes
import matplotlib.pyplot as plt

import requests
import matplotlib.pyplot as plt
from PIL import Image
import torch
from torchvision import datasets, transforms
from lightning.pytorch.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS
from torch.utils.data import DataLoader

from typing import Callable, Any

from lightning.pytorch.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import lightning as L
import torch
import torchmetrics
from torch import Tensor, nn
from torch.nn import functional as F
from torch.utils.data import Dataset
from pathlib import Path
import xmltodict

import os

from typing import Any, Callable, Optional, Union
from lightning.pytorch.core.optimizer import LightningOptimizer

from lightning.pytorch.utilities.types import STEP_OUTPUT
from torch.optim.optimizer import Optimizer

from torchvision.models.detection import ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights
from torchmetrics.detection import MeanAveragePrecision

from lightning.pytorch.loggers import TensorBoardLogger

In [6]:
def get_pictures_w_people(path):
    peoples = []
    with open(path) as f:
        for line in f.readlines():
            path, flag = line.split()
            if flag == "1":
                peoples.append(path)
    return peoples


In [7]:
def parse_xml(box_path):
    boxes = []
    if os.path.isfile(box_path):
        x = xmltodict.parse(open(box_path).read())['annotation']
        if 'object' in x:
            objects = x['object']
            
            if isinstance(objects, dict):
                boxes.append(list(map(float, objects["bndbox"].values())))
            elif isinstance(objects, list):
                for box in objects:
                    boxes.append(list(map(float, box["bndbox"].values())))
            else:
                raise Exception(f"{type(objects)}")
    boxes = torch.tensor(boxes)
    labels = torch.ones(len(boxes), dtype=int)
    return boxes, labels

In [8]:
class ImageDataset(Dataset):
    def __init__(self, subset_dir: Path) -> None:
        super().__init__()
        self.subset_dir = subset_dir
        pic_w_ppl = get_pictures_w_people(subset_dir)
        self.path = Path("./PeopleArt-master")
        self.images = [self.path / "JPEGImages" /pic_name for pic_name in pic_w_ppl]
        # self.images = list((subset_dir / "JPEGImages").glob("**/*.jpg"))
        # self.images = [image for image in self.images if os.path.isfile(self.subset_dir / "Annotations" / image.parts[-2] / Path(str(image.parts[-1]) + ".xml"))]


    def __getitem__(self, index: int) -> tuple[Tensor, dict[str, Tensor]]:
        # нужно вернуть пару
        # тензор изображения C x W x H
        # словарь с ключами boxes, masks, labels
        image_path = self.images[index]
        img = Image.open(image_path)
        convert_tensor = transforms.ToTensor()
        image = convert_tensor(img)
        #image = torch.load(image_path).unsqueeze(0)  # (W, H) -> (1, W, H)
        box_path = self.path / "Annotations" / image_path.parts[-2] / Path(str(image_path.parts[-1]) + ".xml")
        boxes, labels = parse_xml(box_path)
        return image, dict(
                boxes=boxes,
                labels=labels,
            )

    def __len__(self) -> int:
        return len(self.images)

In [9]:
_collate_fn_t = Callable[[list[tuple[Tensor, Any]]], Any]

class Datamodule(L.LightningDataModule):
    def __init__(
        self,
        testdir: Path,
        traindir: Path,
        valdir: Path,
        batch_size: int,
        transform: Callable[[Image.Image], Tensor] = transforms.ToTensor(),
        num_workers: int = 0,
    ):
        super().__init__()
        self.testdir = testdir
        self.traindir = traindir
        self.valdir = valdir
        self.batch_size = batch_size
        self.transform = transform
        self.num_workers = num_workers

    def prepare_data(self) -> None:
        # в этом методе можно сделать предварительную работу, например
        # скачать данные, сделать тяжёлый препроцессинг
        pass

    @property
    def collate_fn(self) -> _collate_fn_t | None:
        return lambda batch: tuple(zip(*batch))
        
    def setup(self, stage: str) -> None:
        # аргумент `stage` будет приходить из модуля обучения Trainer
        # на стадии обучения (fit) нам нужны оба датасета
        if stage == "fit":
            self.train_dataset = ImageDataset(
                self.traindir,
            )
            self.val_dataset = ImageDataset(
                self.valdir,
            )
        # на стадии валидации (validate) - только тестовый
        elif stage == "validate":
            self.val_dataset = ImageDataset(
                 self.valdir,
            )
        elif stage == "test":
            self.test_dataset = ImageDataset(
                 self.testdir,
            )
        else:
            raise NotImplementedError
        # есть ещё стадии `test` и `predict`, но они нам не понадобятся

    def train_dataloader(self) -> TRAIN_DATALOADERS:
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.num_workers,
            collate_fn=self.collate_fn
        )

    def val_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
            collate_fn=self.collate_fn,
        )
    def test_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
            collate_fn=self.collate_fn,
        )

In [10]:
class Lit(L.LightningModule):
    def __init__(self, learning_rate: float) -> None:
        super().__init__()
        self.save_hyperparameters()
        self.weights = SSDLite320_MobileNet_V3_Large_Weights.COCO_V1
        self.model = ssdlite320_mobilenet_v3_large(weights=self.weights)
        self.learning_rate = learning_rate
        self.image_transform = self.weights.transforms()
        # self.train_metrics = create_classification_metrics(
        #     num_classes=10, prefix="train_"
        # )
        # self.val_metrics = create_classification_metrics(num_classes=10, prefix="val_")

    def training_step(
        self, batch: tuple[list[Tensor], list[dict[str, Tensor]]], batch_idx: int
    ):
        self.model.train()
        # image_tensors = [self.image_transform(x) for x, _ in batch]
        # labels = [y for _, y in batch]
        predicts = self.model(batch[0], batch[1])
        loss = 0.5 * predicts['bbox_regression'] + 0.5 * predicts['classification']

        self.log("train_loss", loss, on_epoch=True, on_step=False)
        
        # self.log_dict(self.train_metrics, on_step=False, on_epoch=True)

        return loss

    def validation_step(
        self, batch: tuple[Tensor, Tensor], batch_idx: int
    ):
        # image_tensors = [self.image_transform(x) for x, _ in batch]
        # labels = [y for _, y in batch]
        images, targets = batch
        predicts = self.model(images)
        metric = MeanAveragePrecision(iou_type="bbox")
        metric.update(predicts, targets)
        map = metric.compute()['map']
        self.log('val_map', map)
        

        return {
            "map": map,
        }
    
    def test_step(
        self, batch: tuple[Tensor, Tensor], batch_idx: int
    ):
        
        # image_tensors = [self.image_transform(x) for x, _ in batch]
        # labels = [y for _, y in batch]
    
        images, targets = batch
        predicts = self.model(images)
        metric = MeanAveragePrecision(iou_type="bbox")
        metric.update(predicts, targets)
        map = metric.compute()['map']
        self.log('test_map', map)
        

        return {
            "predicts": predicts,
            "map": map,
        }

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
        # давайте кроме оптимизатора создадим ещё расписание для шага оптимизации
        return {
            "optimizer": optimizer,
            "lr_scheduler": torch.optim.lr_scheduler.MultiStepLR(
                optimizer, milestones=[5, 10, 15]
            )
        }

In [11]:
# from aim.pytorch_lightning import AimLogger

# logger = AimLogger(repo="logs", experiment="mnist")

In [12]:
logger = TensorBoardLogger(".")

In [13]:
lit_module = Lit(
    learning_rate=0.0005,
)

In [14]:
trainer = L.Trainer(
    accelerator="gpu",
    max_epochs=10,
    limit_train_batches=100,
    limit_val_batches=100,

    logger=logger,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [15]:
datamodule = Datamodule(
    testdir=Path("./PeopleArt-master/Annotations/person_test.txt"),
    traindir=Path("./PeopleArt-master/Annotations/person_train.txt"),
    valdir=Path("./PeopleArt-master/Annotations/person_val.txt"),
    batch_size=64
)

In [16]:

trainer.test(
    model=lit_module,
    datamodule=datamodule,
)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\python\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Testing DataLoader 0:   0%|          | 0/9 [00:00<?, ?it/s]



Testing DataLoader 0:  11%|█         | 1/9 [00:18<02:28,  0.05it/s]

c:\python\Lib\site-packages\lightning\pytorch\utilities\data.py:77: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 3. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.


Testing DataLoader 0: 100%|██████████| 9/9 [02:12<00:00,  0.07it/s]


[{'test_map': 0.2883700728416443}]

In [12]:

trainer.fit(
    model=lit_module,
    datamodule=datamodule,
)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name            | Type            | Params
----------------------------------------------------
0 | model           | SSD             | 3.4 M 
1 | image_transform | ObjectDetection | 0     
----------------------------------------------------
3.4 M     Trainable params
0         Non-trainable params
3.4 M     Total params
13.760    Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\python\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


                                                                           

c:\python\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.
c:\python\Lib\site-packages\lightning\pytorch\loops\fit_loop.py:293: The number of training batches (9) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Epoch 7:  22%|██▏       | 2/9 [00:07<00:25,  0.28it/s, v_num=8]

In [None]:

trainer.test(
    model=lit_module,
    datamodule=datamodule,
)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing DataLoader 0: 100%|██████████| 9/9 [00:28<00:00,  0.31it/s]


[{'test_map': 0.3749520480632782}]