In [2]:
import sys
sys.path.insert(1, "/home/odyssey/mmk_smoke_detection")

from dataset_preparator.preparator import plot_detections
from dataset_preparator.classJson import JsonData
from typing import Dict, Optional, List, Tuple
from PIL import Image

import glob
import os
import shutil
import pandas as pd
import random
import numpy as np
import tqdm
import cv2

In [3]:
# Entries seen in the validation data directory (presumably one folder per
# camera plus the annotations CSV — confirm against the actual directory):
#   ct131401  ct131402  ct131403  ct131404  ct131405  ct131406  ct1314.csv

# Class names; each one is also used as a sub-directory name of a dataset.
BACKGROUND_LABEL = "background"
EMISSION_LABEL = "emission"
FIRE_LABEL = "fire"
MACHINE_LABEL = "machine"

# Order matters: a label's position in this list is its label index.
LABELS = [BACKGROUND_LABEL, EMISSION_LABEL, FIRE_LABEL, MACHINE_LABEL]

PATH_TO_CONF = "/home/odyssey/mmk_smoke_detection/validation/new_conf"
PATH_TO_VALIDATION_DATA = "/home/odyssey/mmk_smoke_detection/validation/05.10.21"

# Names of the sub-directories found directly under PATH_TO_VALIDATION_DATA.
# The list is empty when that directory does not exist on this machine.
CAMERA_KEYS = [
    os.path.basename(entry)
    for entry in glob.glob(os.path.join(PATH_TO_VALIDATION_DATA, '*'))
    if os.path.isdir(entry)
]

SQUARE_DIM = 120

# Target window height and width (pixels) used when boxes are expanded.
H = 60
W = 120

In [4]:
class DatasetDirectoryController:
    """Creates and tracks one sub-directory per label inside a dataset root.

    The per-label directories are ``<dataset root>/<label>`` for every entry
    of the module-level ``LABELS`` list, in the same order, so a label index
    can be used to look its directory up.
    """

    # Dataset root used when the caller does not provide one.
    MAIN_DATASET_DIR = "dataset"

    _path_for_labels: List[str]
    _dataset_main_dir: str

    def __init__(self,
                 dataset_main_dir: Optional[str] = None):
        """
        :param dataset_main_dir: dataset root; ``None`` or an empty string
            falls back to ``MAIN_DATASET_DIR``.
        """
        # Filled by prepare_directories(); empty until it has been called.
        self._path_for_labels = []
        self._dataset_main_dir = dataset_main_dir or self.MAIN_DATASET_DIR

    @property
    def dataset_dir(self) -> str:
        """Path of the dataset root directory."""
        return self._dataset_main_dir

    def get_directory_for_label_idx(self, label_idx: int) -> str:
        """Return the directory of the label at position ``label_idx`` in ``LABELS``.

        :raises IndexError: if ``prepare_directories`` has not been called yet
            or the index is out of range.
        """
        return self._path_for_labels[label_idx]

    def prepare_directories(self):
        """Create the dataset root and one sub-directory per label.

        Existing directories are kept as they are. The method is idempotent:
        the list of label paths is rebuilt on every call instead of being
        appended to, so calling it twice does not leave duplicate entries.
        """
        # makedirs (rather than mkdir) also creates missing parent directories,
        # so a nested dataset root such as "out/run1/dataset" works.
        os.makedirs(self._dataset_main_dir, exist_ok=True)
        label_paths = []
        for label in LABELS:
            label_path = os.path.join(self._dataset_main_dir, label)
            os.makedirs(label_path, exist_ok=True)
            label_paths.append(label_path)
        self._path_for_labels = label_paths

    @property
    def stats(self) -> Dict[str, int]:
        """Number of entries currently present in each label directory.

        :return: mapping ``label name -> entry count``.
        :raises IndexError: if ``prepare_directories`` has not been called yet.
        """
        # Iterate over LABELS itself instead of a hard-coded range(4) so the
        # statistics stay correct if the label list changes.
        return {
            label: len(
                glob.glob(
                    os.path.join(self.get_directory_for_label_idx(label_idx), '*')
                )
            )
            for label_idx, label in enumerate(LABELS)
        }

In [5]:
class ParsedFilePath:
    """Splits a relative image path of the form ``<cfg_key>/<label>/<name><ext>``.

    The first two '/'-separated components of the directory part are taken as
    the config key and the label name; a path with fewer than two directory
    components raises ``IndexError``.
    """

    cfg_key: str          # first directory component
    dir_path: str         # directory part of the path
    file_name: str        # base name without extension
    file_extension: str   # extension including the leading dot ('' if none)
    full_name: str        # the path exactly as it was passed in
    label_name: str       # second directory component

    def __init__(self, file_path: str):
        stem_path, extension = os.path.splitext(file_path)
        directory, stem = os.path.split(stem_path)

        self.full_name = file_path
        self.file_extension = extension
        self.file_name = stem
        self.dir_path = directory

        dir_parts = directory.split('/')
        self.cfg_key = dir_parts[0]
        self.label_name = dir_parts[1]

    def gen_new_path(self, iter_num: int) -> str:
        """Return ``<label>/<file_name>_<iter_num><ext>`` for a derived file."""
        derived_name = "{}_{}{}".format(
            self.file_name, iter_num, self.file_extension
        )
        return os.path.join(self.label_name, derived_name)
        

In [6]:
class ConfigController:
    """Loads every config file of a directory and exposes them by key.

    The key of a config is derived from its file name, see
    ``extract_key_from_conf_filename``.
    """

    configs: Dict[str, JsonData]

    def __init__(
        self,
        path_to_configs: str,
    ):
        """
        :param path_to_configs: directory whose entries are all loaded
            as ``JsonData`` configs.
        """
        found_files = glob.glob(os.path.join(path_to_configs, '*'))
        print(found_files)

        loaded = {}
        for conf_path in found_files:
            key = self.extract_key_from_conf_filename(conf_path)
            loaded[key] = JsonData(conf_path)
        self.configs = loaded

    def __getitem__(
        self,
        key: str
    ) -> JsonData:
        """Return the config stored under ``key`` (``KeyError`` if unknown)."""
        return self.configs[key]

    @staticmethod
    def extract_key_from_conf_filename(
        conf_filename: str
    ) -> str:
        """Return characters 5..12 of the file's base name.

        For a name such as ``conf_ct131401.json`` this yields ``ct131401``.
        """
        base_name = os.path.basename(conf_filename)
        return base_name[5:13]


In [157]:
!ls

05.10.21			   __init__.py	     trdt.py
05.10.21_annotated_ct1314.7z	   new_conf	     validate_detector.py
box_test_draw.jpg		   not_expanded_val  val_visual.ipynb
expanded_val			   test_draw.jpg     vanilla_val_res.ipynb
generate_validation_dataset.ipynb  test_pers.jpg


In [7]:
class ValidationDatasetGenerator:
    """Cuts annotated regions out of perspective-corrected frames.

    For every image path listed in the annotation CSV the generator:

    1. warps the frame with the perspective matrix of the image's camera config,
    2. maps the annotated boxes into the warped frame,
    3. optionally grows every box to a window of ``H`` x ``W`` pixels,
    4. saves each crop to ``<dst_dataset>/<label>/<file_name>_<box index><ext>``.

    The CSV must provide the columns ``paths``, ``x_min``, ``y_min``,
    ``x_max`` and ``y_max``.

    Project types in annotations are quoted so that they are treated as forward
    references and are not evaluated when the class is defined.
    """

    # Name of the CSV column holding the image path relative to ``src_dataset``.
    ID = 'paths'

    _src_dataset: str
    _dst_dataset: str
    _table: "pd.DataFrame"
    _need_expand: bool
    _configs: "ConfigController"

    _dst_dir_con: "DatasetDirectoryController"

    _skipped_count: int

    def __init__(
        self,
        config_controller: "ConfigController",
        src_dataset: str,
        dst_dataset: str,
        path_to_csv: str,
        need_expand: bool = True
    ):
        """
        WARNING: an existing ``dst_dataset`` directory is deleted and recreated.

        :param config_controller: lookup of camera configs by config key.
        :param src_dataset: directory the paths in the CSV are relative to.
        :param dst_dataset: output directory (wiped if it already exists).
        :param path_to_csv: annotation table, read with ``pandas.read_csv``.
        :param need_expand: grow boxes to ``H`` x ``W`` before cropping when
            True; crop the transformed boxes as they are when False.
        """
        self._configs = config_controller
        self._src_dataset = src_dataset
        self._dst_dataset = dst_dataset
        # Read the table before touching the file system, so a wrong CSV path
        # fails without deleting a previously generated dataset.
        self._table = pd.read_csv(path_to_csv)
        self._need_expand = need_expand
        self._skipped_count = 0
        self._dst_dir_con = DatasetDirectoryController(dst_dataset)

        if os.path.exists(dst_dataset):
            shutil.rmtree(dst_dataset)
        self._dst_dir_con.prepare_directories()

    @staticmethod
    def _increase_square_params(
            dim: int,
            c_less: int,
            c_grt: int,
            max_value: int
    ) -> Tuple[int, int]:
        """Resize the 1-D interval ``[c_less, c_grt]`` to length ``dim``.

        The interval is padded equally on both sides (the padding is negative,
        i.e. the interval shrinks, when it is already longer than ``dim``).
        If the padded interval crosses 0 or ``max_value`` it is shifted back
        inside instead of being cut.

        :param dim: requested interval length.
        :param c_less: lower coordinate of the interval.
        :param c_grt: upper coordinate of the interval.
        :param max_value: upper limit of the coordinate axis.
        :return: ``(new lower, new upper)``; the lower value can end up at -1
            in a corner case, callers are expected to clamp it.
        """
        pad = (dim - (c_grt - c_less)) // 2
        low = c_less - pad
        high = c_grt + pad

        if low < 0:
            # The part that sticks out below 0 is added to the upper bound.
            high -= low
            low = 0
        elif high > max_value:
            # The part that sticks out above max_value is added to the lower bound.
            low -= high - max_value
            high = max_value

        # Integer division loses one pixel when the padding is odd; give it back.
        # The span is compared with the requested ``dim`` (not with the global
        # SQUARE_DIM): otherwise every window requested with a smaller ``dim``
        # would be enlarged by one pixel too many.
        if high - low < dim:
            if high + 1 >= max_value:
                low -= 1
            else:
                high += 1

        return low, high

    def _expand_box(self,
                    max_height: int,
                    max_width: int,
                    box: np.ndarray) -> List[int]:
        """Return the crop window for ``box`` clamped to the frame.

        :param max_height: frame height.
        :param max_width: frame width.
        :param box: ``[x_min, y_min, x_max, y_max]``.
        :return: ``[x_min, y_min, x_max, y_max]`` as plain ints; grown to
            ``H`` x ``W`` first when the generator was created with
            ``need_expand=True``.
        """
        if self._need_expand:
            new_min_y, new_max_y = ValidationDatasetGenerator._increase_square_params(
                H,
                box[1],
                box[3],
                max_value=max_height
            )
            new_min_x, new_max_x = ValidationDatasetGenerator._increase_square_params(
                W,
                box[0],
                box[2],
                max_value=max_width
            )
        else:
            new_min_x, new_min_y = box[0], box[1]
            new_max_x, new_max_y = box[2], box[3]

        # Keep the window inside the frame.
        return [
            max(int(new_min_x), 0),
            max(int(new_min_y), 0),
            min(int(new_max_x), max_width),
            min(int(new_max_y), max_height),
        ]

    def _crop_by_place_and_save(
        self,
        img: np.ndarray,
        box: List[int],
        num_iter: int,
        parsed_file: "ParsedFilePath"
    ):
        """Cut ``box`` out of ``img`` and save it into the destination dataset.

        :param img: frame to crop from, indexed as ``img[y, x]``.
        :param box: ``[x_min, y_min, x_max, y_max]``.
        :param num_iter: index of the box within its source file; becomes part
            of the output file name.
        :param parsed_file: parsed path of the source file, provides the
            label directory and the base file name.
        """
        cropped_image = img[box[1]: box[3], box[0]: box[2]]
        cropped_img_name = parsed_file.gen_new_path(num_iter)
        path_to_save = os.path.join(
            self._dst_dataset,
            cropped_img_name
        )
        try:
            Image.fromarray(cropped_image).save(path_to_save)
        except Exception as error:
            # Best effort: one crop that cannot be saved must not stop the run,
            # but the reason is reported instead of being swallowed.
            print(img.shape, box, path_to_save, repr(error))

    def _apply_perspective_to_img(
        self,
        file_path: str,
        pers_matrix: Dict
    ) -> np.ndarray:
        """Load an image and warp it with the given perspective description.

        :param file_path: image path relative to the source dataset.
        :param pers_matrix: dict with the keys ``'matrix'``, ``'maxWidth'``
            and ``'maxHeight'``.
        :return: the warped frame of size ``maxWidth`` x ``maxHeight``.
        """
        # The context manager closes the underlying file once the pixels have
        # been copied into the array.
        with Image.open(os.path.join(self._src_dataset, file_path)) as source_img:
            source_frame = np.array(source_img)
        return cv2.warpPerspective(
            source_frame,
            np.array(pers_matrix['matrix']),
            (pers_matrix['maxWidth'], pers_matrix['maxHeight']),
            flags=cv2.INTER_LINEAR
        )

    def _apply_perspective_to_box(
        self,
        boxes: np.ndarray,
        pers_matrix: Dict
    ) -> np.ndarray:
        """Map boxes into the warped frame.

        Each box is handled as two corner points, ``(x_min, y_min)`` and
        ``(x_max, y_max)``, which are transformed independently.

        :param boxes: array of ``N`` boxes ``[x_min, y_min, x_max, y_max]``.
        :param pers_matrix: dict holding the transform under ``'matrix'``.
        :return: integer array of shape ``(N, 4)``.
        """
        corner_points = np.asarray(boxes, dtype=np.float32).reshape(-1, 1, 2)
        if corner_points.size == 0:
            return np.empty((0, 4), dtype=int)
        mat = np.array(pers_matrix['matrix'])
        # reshape(-1, 4) keeps one row per box; a fixed reshape(4) would raise
        # for every file that has more than one annotated box.
        return cv2.perspectiveTransform(corner_points, mat).reshape(-1, 4).astype(int)

    def _handle_image(
        self,
        file: str
    ):
        """Generate and save the crops for all boxes annotated on one file.

        :param file: image path as it appears in the ``paths`` column.
        """
        parsed_file = ParsedFilePath(file)
        file_rows = self._table[self._table[self.ID] == file]
        file_boxes = file_rows[['x_min', 'y_min', 'x_max', 'y_max']].to_numpy()

        pers_matrix = self._configs[parsed_file.cfg_key].return_matrix()
        pers_img = self._apply_perspective_to_img(file_path=file, pers_matrix=pers_matrix)
        pers_boxes = self._apply_perspective_to_box(file_boxes, pers_matrix)

        max_height, max_width = pers_img.shape[0], pers_img.shape[1]
        for num_iter, box in enumerate(pers_boxes):
            self._crop_by_place_and_save(
                img=pers_img,
                box=self._expand_box(
                    max_height=max_height,
                    max_width=max_width,
                    box=box
                ),
                num_iter=num_iter,
                parsed_file=parsed_file
            )

    def handle_images(
        self
    ):
        """Process every distinct file of the table.

        Files that do not exist in the source dataset are counted and skipped;
        the number of skipped files is printed at the end.
        """
        files = self._table[self.ID].unique()
        for file in tqdm.tqdm(files):
            if not os.path.exists(os.path.join(self._src_dataset, file)):
                self._skipped_count += 1
                continue
            self._handle_image(file)
        print("SKIPPED FILES COUNT:", self._skipped_count)
        

In [8]:
# Load every camera config found in ./new_conf (relative to the working directory).
cfg_controller = ConfigController(path_to_configs='new_conf')

# Generator for the dataset with boxes grown to the fixed window size.
# NOTE: creating the generator deletes an existing 'expanded_val' directory
# and recreates it empty.
val_dataset_gen = ValidationDatasetGenerator(
    config_controller=cfg_controller,
    src_dataset='05.10.21',
    dst_dataset='expanded_val',
    path_to_csv='05.10.21/ct1314.csv',
    need_expand=True,
)

['new_conf/conf_ct131406.json', 'new_conf/conf_ct131405.json', 'new_conf/conf_ct131402.json', 'new_conf/conf_ct131401.json', 'new_conf/conf_ct131403.json', 'new_conf/conf_ct131404.json']


In [9]:
# Generate the crops for every file of the table; files missing from the
# source dataset are skipped and their count is printed at the end.
val_dataset_gen.handle_images()

100%|██████████| 661/661 [00:17<00:00, 38.72it/s]

SKIPPED FILES COUNT: 6





In [161]:
# Load every camera config found in ./new_conf (relative to the working directory).
cfg_controller = ConfigController(path_to_configs='new_conf')

# Same source data, but boxes are cropped as annotated (no expansion).
# NOTE: creating the generator deletes an existing 'not_expanded_val'
# directory and recreates it empty.
vanilla_val_dataset_gen = ValidationDatasetGenerator(
    config_controller=cfg_controller,
    src_dataset='05.10.21',
    dst_dataset='not_expanded_val',
    path_to_csv='05.10.21/ct1314.csv',
    need_expand=False,
)

vanilla_val_dataset_gen.handle_images()

['new_conf/conf_ct131406.json', 'new_conf/conf_ct131405.json', 'new_conf/conf_ct131402.json', 'new_conf/conf_ct131401.json', 'new_conf/conf_ct131403.json', 'new_conf/conf_ct131404.json']


100%|██████████| 661/661 [00:18<00:00, 36.47it/s]

SKIPPED FILES COUNT: 6





In [4]:
# Rebuild 'line_fire_binary_val' as a copy of 'line_three_val' without the
# 'emission' class directory, then list what is left.
# NOTE(review): 'line_three_val' is not created anywhere in the visible cells —
# confirm where it comes from before re-running this cell.
! rm -r line_fire_binary_val
! cp -r line_three_val line_fire_binary_val
! rm -r line_fire_binary_val/emission
! ls line_fire_binary_val

background  fire
