In [1]:
from deep.models import build_model
from deep.reid_model_factory import (
    get_model_name,
    get_model_url,
    load_pretrained_weights,
    show_downloadeable_models,
)

import torch
import torch.nn as nn

from collections import OrderedDict, namedtuple
from os.path import exists as file_exists
from pathlib import Path
import gdown
import torchvision.transforms as T
import pandas as pd

In [None]:
from abc import ABC, abstractmethod


class Metric_Interface(object):
    @abstractmethod
    def reset(self):
        raise NotImplementedError

    @abstractmethod
    def update(self, output):
        pass

    @abstractmethod
    def compute(self):
        return

In [23]:
import os
import pyparsing
from pytorch_lightning import LightningModule, Trainer
from torchmetrics.functional import accuracy
# from metrics.metric import Metric_Interface
from torch import nn
from torch.utils.data import DataLoader
import torch
from tqdm import tqdm
import time

class Person_Reid(LightningModule):
    def __init__(
        self,
        model,
        loss_fn: nn.Module,
        optimizer,
        scheduler,
        evaluator: None,
        # cfg: None,
    ):
        super(Person_Reid, self).__init__()
        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.evaluator = evaluator
        # self.cfg = cfg
        self.predict_ouputs = dict
        self.start_time = time.time()
        return

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        img, pid = batch
        score, feat = self.model(img)
        loss = self.loss_fn(score, feat, pid)
        
        if isinstance(score, list):
            acc = (score[0].max(1)[1] == pid).float().mean()
        else:
            acc = (score.max(1)[1] == pid).float().mean()
        
        self.log(f"Train loss", loss, prog_bar=True)
        self.log(f"Train accuracy", acc, prog_bar=True)

        return {"loss": loss, "acc": acc}

    def evaluate(self, batch, stage):
        img, pid, camid, img_path = batch
        feat = self.model(img)
        self.evaluator.update([feat.cpu(), pid, camid])
        return

    def validation_step(self, batch, batch_idx):
        return self.evaluate(batch, "val")

    # def predict_step(self, batch: pyparsing.Any, batch_idx: int, dataloader_idx: int = 0) -> pyparsing.Any:
    #     return dict(
    #         zip(batch[1], super().predict_step(batch[0], batch_idx, dataloader_idx))
    #     )

    def on_train_epoch_start(self) -> None:
        print("Start training on epoch:", self.current_epoch)

    def training_epoch_end(self, outputs) -> None:
        loss = torch.mean(torch.stack([o["loss"] for o in outputs], dim=0))
        acc = torch.mean(torch.stack([o["acc"] for o in outputs], dim=0)) * 100
        print(
            "Epoch: {:.1f},Train accuracy: {:.5f}, Train loss: {:.5f}".format(
                self.current_epoch, acc, loss
            )
        )
        print("Learning rate:", self.scheduler.get_lr()[0])

    def validation_epoch_end(self, outputs):
        print("Calulating the acc, cmc, mAP")

        cmc, mAP = self.evaluator.compute()

        self.log("Val_CMC@rank1", cmc[0])
        self.log("Val_CMC@rank5", cmc[4])
        self.log("Val_mAP", mAP)

        print("Validation result:")
        print(
            "Validation epoch: {:.1f}, CMC@rank1: {:.5f}%, CMC@rank5: {:.5f}%, mAP: {:.5f}%".format(
                self.current_epoch, cmc[0] * 100, cmc[4] * 100, mAP * 100
            )
        )
        self.evaluator.reset()
        # if time.time() - self.start_time > 36000:
        #     print("OUT of training time, stop!!")
        #     exit(0)
        return

    def test_step(self, batch, batch_idx):
        self.evaluate(batch, "test")

    def configure_optimizers(self):
        return {"optimizer": self.optimizer, "lr_scheduler": self.scheduler}

    def on_validation_epoch_start(self):
        print("--------")
        print("Start validation")

    # def on_train_start(self):
    #     print("Saving directory: ", self.logger.log_dir)
    #     import ruamel.yaml as yaml

    #     if not os.path.exists(self.logger.log_dir):
    #         os.makedirs(self.logger.log_dir)

    #     with open(self.logger.log_dir + "/settings.yaml", "w+") as yml:
    #         yaml.dump(self.cfg, yml, allow_unicode=True, Dumper=yaml.RoundTripDumper)

    # def lr_scheduler_step(self, scheduler, optimizer_idx, metric):
    #     if self.cfg["SOLVER"]["WARMUP_METHOD"] == "cosine":
    #         scheduler.step(
    #             epoch=self.current_epoch
    #         )  # timm's scheduler need the epoch value
    #     else:
    #         scheduler.step()

In [2]:
# kadirnar: I added export_formats to the function
def export_formats():
    # YOLOv5 export formats
    x = [
        ["PyTorch", "-", ".pt", True, True],
        ["PyTorch", "-", ".ckpt", True, True],
        ["TorchScript", "torchscript", ".torchscript", True, True],
        ["ONNX", "onnx", ".onnx", True, True],
        ["OpenVINO", "openvino", "_openvino_model", True, False],
        ["TensorRT", "engine", ".engine", False, True],
        ["CoreML", "coreml", ".mlmodel", True, False],
        ["TensorFlow SavedModel", "saved_model", "_saved_model", True, True],
        ["TensorFlow GraphDef", "pb", ".pb", True, True],
        ["TensorFlow Lite", "tflite", ".tflite", True, False],
        ["TensorFlow Edge TPU", "edgetpu", "_edgetpu.tflite", False, False],
        ["TensorFlow.js", "tfjs", "_web_model", False, False],
        ["PaddlePaddle", "paddle", "_paddle_model", True, True],
    ]
    return pd.DataFrame(x, columns=["Format", "Argument", "Suffix", "CPU", "GPU"])

def check_suffix(file="yolov8x.pt", suffix=(".pt",), msg=""):
    # Check file(s) for acceptable suffix
    if file and suffix:
        if isinstance(suffix, str):
            suffix = [suffix]
        for f in file if isinstance(file, (list, tuple)) else [file]:
            s = Path(f).suffix.lower()  # file suffix
            if len(s):
                assert s in suffix, f"{msg}{f} acceptable suffix is {suffix}"




In [30]:
class ReIDDetectMultiBackend(nn.Module):
    # ReID models MultiBackend class for python inference on various backends
    def __init__(self, weights="vit_base_patch16_224_TransReID.ckpt", device=torch.device("cpu"), fp16=False):
        super().__init__()

        w = weights[0] if isinstance(weights, list) else weights
        (
            self.pt,
            self.ckpt,
            self.jit,
            self.onnx,
            self.xml,
            self.engine,
            self.coreml,
            self.saved_model,
            self.pb,
            self.tflite,
            self.edgetpu,
            self.tfjs,
        ) = self.model_type(
            w
        )  # get backend
        self.fp16 = fp16
        self.fp16 &= self.pt or self.jit or self.engine  # FP16

        # Build transform functions
        self.device = device
        self.image_size = (256, 128)
        self.pixel_mean = [0.485, 0.456, 0.406]
        self.pixel_std = [0.229, 0.224, 0.225]
        self.transforms = []
        self.transforms += [T.Resize(self.image_size)]
        self.transforms += [T.ToTensor()]
        self.transforms += [T.Normalize(mean=self.pixel_mean, std=self.pixel_std)]
        self.preprocess = T.Compose(self.transforms)
        self.to_pil = T.ToPILImage()
        model_name = get_model_name(w)
        print(model_name)

        if w.suffix == ".pt" or w.suffix == '.ckpt':
            model_url = get_model_url(w)
            if not file_exists(w) and model_url is not None:
                gdown.download(model_url, str(w), quiet=False)
            elif file_exists(w):
                pass
            else:
                print(f"No URL associated to the chosen StrongSORT weights ({w}). Choose between:")
                show_downloadeable_models()
                exit()

        # Build model
        self.model = build_model(model_name, num_classes=1, pretrained=not (w and w.is_file()), use_gpu=device)
        
        if self.ckpt:   # PyTorch
            if w and w.is_file() and w.suffix == '.ckpt':
                load_pretrained_weights(self.model, w)

            self.model.to(device).eval()
            self.model.half() if self.fp16 else self.model.float()
        else:
            print("This model framework is not supported yet!")
            exit()
    
    def _preprocess(self, im_batch):

        images = []
        for element in im_batch:
            image = self.to_pil(element)
            image = self.preprocess(image)
            images.append(image)

        images = torch.stack(images, dim=0)
        images = images.to(self.device)

        return images
    
    def forward(self, im_batch):

        # preprocess batch
        im_batch = self._preprocess(im_batch)

        # batch to half
        if self.fp16 and im_batch.dtype != torch.float16:
            im_batch = im_batch.half()

        # batch processing
        features = []
        features = self.model(im_batch)

        if isinstance(features, (list, tuple)):
            return self.from_numpy(features[0]) if len(features) == 1 else [self.from_numpy(x) for x in features]
        else:
            return self.from_numpy(features)

    def from_numpy(self, x):
        return torch.from_numpy(x).to(self.device) if isinstance(x, np.ndarray) else x


    @staticmethod
    def model_type(p="path/to/model.pt"):
        # Return model type from model path, i.e. path='path/to/model.onnx' -> type=onnx
        suffixes = list(export_formats().Suffix) + [".xml"]  # export suffixes
        check_suffix(p, suffixes)  # checks
        p = Path(p).name  # eliminate trailing separators
        pt, ckpt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, _, xml2 = (s in p for s in suffixes)
        xml |= xml2  # *_openvino_model or *.xml
        tflite &= not edgetpu  # *.tflite
        return pt, ckpt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs


In [32]:

x  = ReIDDetectMultiBackend(weights=Path('/home/son/Desktop/VIN/CV/project/weights/vit_base_patch16_224_TransReID.ckpt'))

vit_base_patch16_224_TransReID
using stride: 16, and patch number is num_y16 * num_x8
using drop_out rate is : 0.0
using attn_drop_out rate is : 0.0
using drop_path rate is : 0.1




In [33]:
x.forward('/home/son/Desktop/VIN/CV/project/data/MOT20/train/MOT20-01/img1/000002.jpg')

TypeError: pic should be Tensor or ndarray. Got <class 'str'>.

In [29]:
load_pretrained_weights(x, '/home/son/Desktop/VIN/CV/project/weights/vit_base_patch16_224_TransReID.ckpt')

