In [None]:
import pathlib
import os

import cv2
import pandas as pd 
import numpy as np
import albumentations
import matplotlib.pyplot as plt

PROJECT_ROOT = pathlib.Path('./../..').resolve()

DATA_DIR = PROJECT_ROOT / 'SignDetectorAndClassifier' / 'data'
DATASET_DIR = DATA_DIR / 'YOLO_DATASET'

### Winter photo example for `classifier`

![](./../data/winter_traffic_signs_example/2_cut.png)
![](./../data/winter_traffic_signs_example/1_cut.png)

In [None]:
img = cv2.imread(
    str(DATA_DIR / 'STOCK_SIGNS' / '1.8.png')
)
img = cv2.cvtColor(img, cv2.COLOR_BGRA2RGBA)

plt.imshow(img)

```python
class Winterize(albumentations.ImageOnlyTransform):
    def __init__(self, contrast_limit=0.8, tiles_count=5, tiles_size=5, tiles_relative=True, always_apply=False, p=0.5):
        super(Winterize, self).__init__(always_apply, p)
        self.contrast_limit = contrast_limit
        self.tiles_count = tiles_count
        self.tiles_size = tiles_size,
        self.tiles_relative = tiles_relative
        
    # TODO: implement white noise + gaussian on alpha channel of input image
    # TODO: implement while-gray random cuts from image
    def _transform(self, img):
        self.noise = np.random.normal(1, 1, img.shape)
        img = albumentations.brightness_contrast_adjust(img)
        return self.noise 

    def apply(self, img, clip_limit=2, **params):
        if albumentations.get_num_channels(img) != 4:
            raise TypeError("Winterize transformation expects RGBA image")

        return self._transform(img)

    def get_params(self):
        return {'contrast_limit': self.contrast_limit, 'tiles_count': self.tiles_count, 'tiles_size': self.tiles_size, 'tiles_relation': self.tiles_relative}

    def get_transform_init_args_names(self):
        return ("contrast_limit", "tiles_count", "tiles_size", "tiles_relative")

a = Winterize()

img_t = a.apply(img)
plt.imshow(img_t)
```

### Winter photo example for `detector`
![](./../data/winter_traffic_signs_example/1_full.png)

In [None]:
import pathlib
import os

import cv2
import pandas as pd 
import numpy as np
import albumentations


PROJECT_ROOT = pathlib.Path('./../../').resolve()

DATA_DIR = PROJECT_ROOT / 'SignDetectorAndClassifier' / 'data'
DATASET_DIR = DATA_DIR / 'YOLO_DATASET'
DATASET_DIR

In [None]:
USER_FULL_FRAMES = DATASET_DIR / 'USER_FULL_FRAMES' 
SAMPLE_IMG = USER_FULL_FRAMES / 'autosave01_02_2012_09_13_36.jpg'

from enum import Enum

class Season(Enum): 
    Winter = 1
    Fall = 2
    Summer = 3
    Autumn = 4
    
def extract_season_from_name(filepath: pathlib.Path) -> Season:
    name = filepath.stem 
    month = int(
        name.replace('autosave', '').replace('_', ' ').split(' ')[1]
    )
    if month in [12, 1, 2]:
        return Season.Winter
    if month in [3, 4, 5]:
        return Season.Autumn
    if month in [6, 7, 8]:
        return Season.Summer
    if month in [9, 10, 11]:
        return Season.Fall
    
    raise ValueError(f'Invalid month {month}')

files = USER_FULL_FRAMES.iterdir()
seasons = list(map(extract_season_from_name, files))
print(*[f'\t{x} count {seasons.count(x)}\n' for x in Season])

In [None]:
import ast

def read_yolo_dataset(csv_path: pathlib.Path, filepath_prefix: str):
    data = pd.read_csv(csv_path)
    data['filepath'] = data['filepath'].apply(lambda x: pathlib.Path(filepath_prefix) / x)
    data['size'] = data['size'].apply(lambda x: ast.literal_eval(x))
    data['coords'] = data['coords'].apply(lambda x: ast.literal_eval(x))
    return data

yolo_dataset = read_yolo_dataset(DATASET_DIR / 'USER_FULL_FRAMES.csv', DATASET_DIR)
yolo_dataset['season'] = yolo_dataset['filepath'].apply(lambda x: extract_season_from_name(x))
yolo_dataset

Do I realy need season in dataset?

## Per season split

In [None]:
from collections import Counter
import seaborn as sns

hist_data = []
set_ = list(set(yolo_dataset['set']))
for idx, x in enumerate(set_):
    count_data = dict(
        Counter(map(extract_season_from_name, yolo_dataset[yolo_dataset['set'] == x]['filepath'].to_list())
        )
    )
    count_data = {str(x).split('.')[-1]: y for x, y in count_data.items()}
    hist_data.append(sorted([key for key, val in count_data.items() for _ in range(val)])) #, bins=set(count_data.keys()))

    # sns.countplot(data=df, x='Season')
plt.hist(hist_data, bins=len(set_), align='mid')
plt.legend(set_)
plt.show()

# TODO: fix x axis labels

In [None]:
from maddrive_adas.utils.transforms import ConvertCenterXYWH2CV2Rectangle, UnmakeRel
from maddrive_adas.utils.general import xywh2xyxy
from maddrive_adas.utils.fs import imread_rgb

from albumentations import CoarseDropout, Compose

import random 

import torch

def put_outer_circle_mask(img: np.ndarray, abs_xywh):
    raise NotImplementedError
    rectangle_coords = ConvertCenterXYWH2CV2Rectangle(abs_xywh)
    center = (abs_xywh[0], abs_xywh[1]) # x, y
    radius = abs_xywh[2] // 2   # width // 2
    print(radius)
    print(center)
    outer_mask = np.ones_like(img)
    # outer_mask = cv2.rectangle(outer_mask, (rectangle_coords[0], rectangle_coords[1]), (rectangle_coords[2], rectangle_coords[3]), (0, 0, 0), -1)
    # inner_mask = cv2.circle(outer_mask, center, radius, color=(255, 255, 255), thickness=-1)
    # total_mask = cv2.bitwise_and(outer_mask, inner_mask)
    return outer_mask
    img = cv2.bitwise_and(img.copy(), total_mask)
    return img

def _put_crops(img: np.ndarray, rectangle_coords: list[int], color: list[int], p=0.5):
    sub_img = img[rectangle_coords[1]:rectangle_coords[3], rectangle_coords[0]:rectangle_coords[2], :]
    h, w, d = sub_img.shape
    # print(h, w, d)
    aug_compose = Compose(
        CoarseDropout(
            max_height=h // 5,
            max_width=w // 5,
            min_holes=1,
            max_holes=10,
            fill_value=color,
            p=p),
    )
    coarsed_sub_img = aug_compose(image=sub_img)['image']
    img[rectangle_coords[1]:rectangle_coords[3], rectangle_coords[0]:rectangle_coords[2], :] = coarsed_sub_img
    return img

def put_shieeet_on_img_like_winter(img: np.ndarray, sign_coordinates_xywh_rel: list[list[int]]):
    h, w, d = img.shape
    for coords in sign_coordinates_xywh_rel:
        abs_coords = UnmakeRel(coords, w, h)
        rectangle_coords =  ConvertCenterXYWH2CV2Rectangle(abs_coords)
        # print(rectangle_coords)
        color_const = random.randrange(80, 256)
        color = [color_const] * 3
        img = _put_crops(img, rectangle_coords, color=color, p=1)

    return img

def put_rectangle(img, p1, p2, color):
    """Usage example:
    ```for i in data['coords']:
        abs_coords = UnmakeRel(i, w, h)
        rectangle_coords =  ConvertCenterXYWH2CV2Rectangle(abs_coords)
        p1, p2 = (rectangle_coords[0], rectangle_coords[1]), (rectangle_coords[2], rectangle_coords[3]), 
        img = put_rectangle(img, p1, p2, (0, 0, 0))
    ```
    """
    return cv2.rectangle(
        img, 
        p1, p2,
        color, 
        thickness=5
    )

def get_all_rectangle_points(two_rectangle_points: list[int]) -> list[tuple[int]]:
    """Return TL, TR, BR, BL points."""
    p1 = (two_rectangle_points[0], two_rectangle_points[1])
    p2 = (two_rectangle_points[0], two_rectangle_points[3])
    p3 = (two_rectangle_points[2], two_rectangle_points[3])
    p4 = (two_rectangle_points[2], two_rectangle_points[1])
    return [p1, p2, p3, p4]

def resize_triangle(triangle_pts: list[int], k_limit=0.5, randomize_k=True) -> list[int]:
    """Resize trianble.

    Args:
        triangle_pts (list[int]): Array of triangle points.
        k_limit (float, optional): Triangle border scale limit. If randomize k, apply scale [0; k), else apply k. Defaults to 0.5.
        randomize_k (bool, optional): Apply random k scale from range [0; k]. Defaults to True.

    Returns:
        list[int]: _description_
    """
    xs, ys = [x[0] for x in triangle_pts], [x[1] for x in triangle_pts]

    base_x, base_y = None, None
    offset_x, offset_y = None, None

    for x in xs: 
        if xs.count(x) == 2:
            base_x = x
        else: 
            offset_x = x
    offset_x -= base_x

    for x in ys: 
        if ys.count(x) == 2:
            base_y = x
        else:
            offset_y = x
    offset_y -= base_y
    
    if base_x is None and base_y is None:
        raise ValueError('Unable to get base triangle point. Is it right triangle?')
        
    x_scale = k_limit * random.random() if randomize_k else k_limit
    y_scale = k_limit * random.random() if randomize_k else k_limit
    return [(base_x, base_y), (int(base_x + offset_x * x_scale), base_y), (base_x, int(base_y + offset_y * y_scale))]


def hide_corner(img: np.ndarray, sign_coordinates_xywh_rel: list[list[int]], p=0.9, k_limit=1., randomize_k=True):
    """_summary_ TODO:

    Args:
        img (np.ndarray): _description_
        sign_coordinates_xywh_rel (list[list[int]]): _description_
        p (float, optional): _description_. Defaults to 0.9.
        k_limit (_type_, optional): _description_. Defaults to 1..
        randomize_k (bool, optional): _description_. Defaults to True.

    Returns:
        _type_: _description_
    """
    h, w, _ = img.shape
    for coords in sign_coordinates_xywh_rel: 
        if random.random() > p: 
            continue
        abs_coords = UnmakeRel(coords, w, h)
        rectangle_coords =  ConvertCenterXYWH2CV2Rectangle(abs_coords)
        triangle_coordinates: list[int] = random.sample(get_all_rectangle_points(rectangle_coords), 3)
        triangle_coordinates = resize_triangle(triangle_coordinates,  k_limit=k_limit, randomize_k=randomize_k)
        # print(triangle_coordinates)
        pts = np.array(triangle_coordinates, np.int32)
        # color_const = random.randrange(80, 256)
        color = [random.randrange(80, 256), random.randrange(80, 256), random.randrange(80, 256)] # [color_const] * 3
        cv2.drawContours(img, [pts], 0, color, -1)
    return img

def get_item(df: pd.DataFrame, idx: int):
    data = df.iloc[idx]
    img = imread_rgb(data['filepath'])
    img = put_shieeet_on_img_like_winter(img.copy(), data['coords'])
    img = hide_corner(img.copy(), data['coords'])
    return img

img = get_item(yolo_dataset, 5140)
plt.figure(figsize=(20,6))
plt.imshow(img);

In [None]:
import torch

from maddrive_adas.utils.augmentations import (
    Albumentations,
    augment_hsv,
    letterbox,
    random_perspective,
)

from maddrive_adas.utils.general import (
    xywh2xyxy,
    xywhn2xyxy,
    xyxy2xywhn,
)

class WinterizedYoloDataset(torch.utils.data.Dataset):
    def __init__(
        self,
        df: pd.DataFrame,
        set_label: str,
        hyp_arg: dict,
        img_size=640,
        augment=False,
        hide_corner_chance=0.5
    ):
        self.img_size = img_size
        self.augment = augment
        self.hyp = hyp_arg
        self.df = df[df["set"] == set_label]
        self._hide_corner_chance = hide_corner_chance
        self.albumentations = Albumentations() if augment else None

    def load_image(self, instance):
        path, (w0, h0) = instance["filepath"], instance["size"]
        img = cv2.imread(str(path))
        assert img is not None, f"Image Not Found {path}"
        img = hide_corner(img, instance['coords'], p=self._hide_corner_chance, k_limit=1)
        img = put_shieeet_on_img_like_winter(img, instance['coords'])

        r = self.img_size / max(h0, w0)  # ratio

        if r != 1:  # if sizes are not equal
            img = cv2.resize(
                img,
                (int(w0 * r), int(h0 * r)),
                interpolation=cv2.INTER_AREA
                if r < 1 and not self.augment
                else cv2.INTER_LINEAR,
            )
        return img, (h0, w0), img.shape[:2]

    def __getitem__(self, index):
        # locate img info from DataFrame
        instance = self.df.iloc[index]

        # get Img, src height, width and resized height, width
        try:
            img, (h0, w0), (h, w) = self.load_image(instance)
        except ValueError as e:
            raise ValueError(f'VE for {e}: index is {index}. {instance}')
            
        shape = self.img_size

        # make img square
        img, ratio, pad = letterbox(img, shape, auto=False, scaleup=self.augment)

        # store core shape info
        shapes = (h0, w0), ((h / h0, w / w0), pad)  # for COCO mAP rescaling

        # add class to labels. We have 1 class, so just add zeros into first column
        labels = np.array(instance["coords"])
        labels = np.c_[np.zeros(labels.shape[0]), labels]

        # fix labels location caused by letterbox
        labels[:, 1:] = xywhn2xyxy(
            labels[:, 1:], ratio[0] * w, ratio[1] * h, padw=pad[0], padh=pad[1]
        )

        if self.augment:
            img, labels = random_perspective(
                img,
                labels,
                degrees=self.hyp["degrees"],
                translate=self.hyp["translate"],
                scale=self.hyp["scale"],
                shear=self.hyp["shear"],
                perspective=self.hyp["perspective"],
            )

        labels[:, 1:5] = xyxy2xywhn(
            labels[:, 1:5], w=img.shape[1], h=img.shape[0], clip=False, eps=1e-3
        )

        # YOLO augmentation technique (!copy-paste!)
        if self.augment:
            # Albumentations
            img, labels = self.albumentations(img, labels)
            nl = len(labels)  # update after albumentations

            # HSV color-space
            augment_hsv(
                img,
                hgain=self.hyp["hsv_h"],
                sgain=self.hyp["hsv_s"],
                vgain=self.hyp["hsv_v"],
            )

            # Remove random flip
            # # Flip up-down
            # if random.random() < self.hyp["flipud"]:
            #     img = np.flipud(img)
            #     if nl:
            #         labels[:, 2] = 1 - labels[:, 2]

            # # Flip left-right
            # if random.random() < self.hyp["fliplr"]:
            #     img = np.fliplr(img)
            #     if nl:
            #         labels[:, 1] = 1 - labels[:, 1]

        nl = len(labels)

        # why out size (?, 6)??
        labels_out = torch.zeros((nl, 6))
        if nl:
            labels_out[:, 1:] = torch.from_numpy(labels)

        img = img.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        img = np.ascontiguousarray(img)

        return torch.from_numpy(img), labels_out, instance["filepath"], shapes

    def __len__(self):
        return len(self.df.index)

    @staticmethod
    def collate_fn(batch):
        img, label, path, shapes = zip(*batch)  # transposed
        for i, l in enumerate(label):
            l[:, 0] = i  # add target image index for build_targets()
        return torch.stack(img, 0), torch.cat(label, 0), path, shapes

import yaml
hyps_file = DATA_DIR / "hyp.scratch.yaml"
with open(hyps_file, errors='ignore') as f:
    hyp = yaml.safe_load(f)
    
a = WinterizedYoloDataset(yolo_dataset, set_label='train', hyp_arg=hyp, augment=True, hide_corner_chance=1.)
res = a[2]
print(res[0].numpy().shape)
plt.figure(figsize = (20,10))
plt.imshow(res[0].permute(1, 2, 0));

In [None]:
train_dataset = WinterizedYoloDataset(yolo_dataset, set_label='train', hyp_arg=hyp, augment=True, hide_corner_chance=1.)
valid_dataset = WinterizedYoloDataset(yolo_dataset, set_label='valid', hyp_arg=hyp, augment=False, hide_corner_chance=0.)
test_dataset = WinterizedYoloDataset(yolo_dataset, set_label='test', hyp_arg=hyp, augment=False, hide_corner_chance=0.)

from torch.utils.data import DataLoader

batch_size = 128
nw = 0
train_loader = DataLoader(
        train_dataset,  # InfiniteDataLoader ?
        batch_size=batch_size,
        shuffle=True,
        num_workers=nw,  # doesnt work in Windows
        sampler=None,
        pin_memory=True,
        collate_fn=WinterizedYoloDataset.collate_fn,
    )

test_loader = DataLoader(
        test_dataset,  # InfiniteDataLoader ?
        batch_size=batch_size,
        shuffle=False,
        num_workers=nw,  # doesnt work in Windows
        sampler=None,
        pin_memory=True,
        collate_fn=WinterizedYoloDataset.collate_fn,
    )

valid_loader = DataLoader(
        valid_dataset,  # InfiniteDataLoader ?
        batch_size=batch_size,
        shuffle=False,
        num_workers=nw,  # doesnt work in Windows
        sampler=None,
        pin_memory=True,
        collate_fn=WinterizedYoloDataset.collate_fn,
    )


In [None]:
from torch.optim import SGD, lr_scheduler
from torch.cuda import amp
from maddrive_adas.utils.general import one_cycle, LOGGER
from maddrive_adas.utils.loss import ComputeLoss
from maddrive_adas.utils.torch_utils import ModelEMA, de_parallel
from tqdm.notebook import tqdm
from datetime import datetime
import yaml
from maddrive_adas.models.yolo import Model

def train(epochs, model: Model, train_loader: DataLoader, valid_loader: DataLoader, device: torch.device, opt=None, imgsz=640):       
    print('Train called')
    start_epoch = 0
    nc = 1
    model.float()
    cuda = device.type == 'cuda'
    nb = len(train_loader)
    nw = max(round(hyp['warmup_epochs'] * nb), 1000)
    nbs = 64  # nominal batch size
    batch_size = train_loader.batch_size
    last_opt_step = -1
        
    g0, g1, g2 = [], [], []  # optimizer parameter groups
    for v in model.modules():
        if hasattr(v, 'bias') and isinstance(v.bias, torch.nn.Parameter):  # bias
            g2.append(v.bias)
        if isinstance(v, torch.nn.BatchNorm2d):  # weight (no decay)
            g0.append(v.weight)
        elif hasattr(v, 'weight') and isinstance(v.weight, torch.nn.Parameter):  # weight (with decay)
            g1.append(v.weight)
    
    optimizer = SGD(g0, lr=hyp['lr0'], momentum=hyp['momentum'], nesterov=True)
    
    optimizer.add_param_group({'params': g1, 'weight_decay': hyp['weight_decay']})  # add g1 with weight_decay
    optimizer.add_param_group({'params': g2})  # add g2 (biases)
    del g0, g1, g2
    
    lf = one_cycle(1, hyp['lrf'], epochs)  # cosine 1->hyp['lrf']
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
    
    ema = ModelEMA(model)
    
    nl = de_parallel(model).model[-1].nl  # number of detection layers (to scale hyps)
    hyp['box'] *= 3 / nl  # scale to layers
    hyp['cls'] *= nc / 80 * 3 / nl  # scale to classes and layers
    hyp['obj'] *= (imgsz / 640) ** 2 * 3 / nl  # scale to image size and layers
    hyp['label_smoothing'] = opt.label_smoothing if opt else 0.
    
    model.nc = nc  # attach number of classes to model
    model.hyp = hyp  # attach hyperparameters to model
    model.names = ['sign']
    
    scaler = amp.GradScaler(enabled=cuda)
    compute_loss = ComputeLoss(model)
    print('Pre epoch print')
    for epoch in range(start_epoch, epochs):
        model.train()
        mloss = torch.zeros(3, device=device)
        print('Epoch started')
        pbar = enumerate(train_loader)
        LOGGER.info(('\n' + '%10s' * 7) % ('Epoch', 'gpu_mem', 'box', 'obj', 'cls', 'labels', 'img_size'))
        pbar = tqdm(pbar, total=nb, bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}')  # progress bar
        
        optimizer.zero_grad()
        for i, (imgs, targets, paths, _) in pbar:
            ni = i + nb * epoch  # number integrated batches (since train start)
            imgs = imgs.to(device, non_blocking=True).float() / 255  # uint8 to float32, 0-255 to 0.0-1.0
            
            # Warmup
            if ni <= nw:
                xi = [0, nw]  # x interp
                # compute_loss.gr = np.interp(ni, xi, [0.0, 1.0])  # iou loss ratio (obj_loss = 1.0 or iou)
                accumulate = max(1, np.interp(ni, xi, [1, nbs / batch_size]).round())
                for j, x in enumerate(optimizer.param_groups):
                    # bias lr falls from 0.1 to lr0, all other lrs rise from 0.0 to lr0
                    x['lr'] = np.interp(ni, xi, [hyp['warmup_bias_lr'] if j == 2 else 0.0, x['initial_lr'] * lf(epoch)])
                    if 'momentum' in x:
                        x['momentum'] = np.interp(ni, xi, [hyp['warmup_momentum'], hyp['momentum']])

            # Forward
            with amp.autocast(enabled=cuda):
                pred = model(imgs.half())  # forward
                loss, loss_items = compute_loss(pred, targets.float().to(device))  # loss scaled by batch_size
                
            # Backward
            scaler.scale(loss).backward()

            # Optimize
            if ni - last_opt_step >= accumulate:
                scaler.step(optimizer)  # optimizer.step
                scaler.update()
                optimizer.zero_grad()
                if ema:
                    ema.update(model)
                last_opt_step = ni
            
            mloss = (mloss * i + loss_items) / (i + 1)  # update mean losses
            mem = f'{torch.cuda.memory_reserved() / 1E9 if torch.cuda.is_available() else 0:.3g}G'  # (GB)
            pbar.set_description(('%10s' * 2 + '%10.4g' * 5) % (
                f'{epoch}/{epochs - 1}', mem, *mloss, targets.shape[0], imgs.shape[-1]))
        
        ###
        # every 5 epochs check mAP
        if False and (epoch + 1) % 5 == 0:
            ema.update_attr(model, include=['yaml', 'nc', 'hyp', 'names', 'stride', 'class_weights'])
            map = valid_epoch()
        ###

        # Scheduler
        lr = [x['lr'] for x in optimizer.param_groups]  # for loggers
        scheduler.step()
                
        now = datetime.now()
        model_save_name = DATA_DIR / f'YoloV5_{now.strftime("%d.%m_%H.%M")}_lbox{mloss[0]}_lobj{mloss[1]}.pt'
        
        torch.save(model.state_dict(), model_save_name)

In [None]:
model_cfg_file = DATA_DIR / 'yolov5l_custom_anchors.yaml'
model = Model(cfg=model_cfg_file, ch=3, nc=1)

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
restore= DATA_DIR / 'WinterizedYoloV5.pt'

try:
    model.load_state_dict(torch.load(restore, map_location=device))
    model.eval()
    print('Model successfully loaded!')
except FileNotFoundError as exc_obj:
    print(f'[!] File [{restore}] not found')

In [None]:
for i in train_loader:
    sample_data = i
    
sample_data

In [None]:
train_dataset[5140]

In [None]:
IMG_SIZE = 640
train(50, model, train_loader, valid_loader, device, imgsz=IMG_SIZE)