In [1]:
import os
import shutil
import warnings

import numpy as np
import pandas as pd
import collections
import copy
import cv2

from typing import List, Tuple
from sklearn.model_selection import train_test_split
from torchvision.transforms import v2
from tensorflow.keras.preprocessing.image import ImageDataGenerator

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.simplefilter("ignore")

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)




In [2]:
# Определяем параметры и путь к корневой папке
SHOULD_DROP = True  # Или False в зависимости от необходимости
DROP_LESS_THAN = 50  # Порог минимального количества фотографий в классе (классы, где меньше не будут обработаны)

SHOULD_CUT = True  # Или False в зависимости от необходимости
KEEP_MAX_TRAIN = 100  # Максимальное количество изображений для train (остальные не будут обработаны)
KEEP_MAX_VAL = 60  # Максимальное количество изображений для val (остальные не будут обработаны)

TRAIN_AUG_TO = 200  # Порог максимального количества фотографий в классе в обучающей выборке(с учетом аугметированных изображений)
VAL_AUG_TO = 80  # # Порог максимального количества фотографий в классе в валидационнрй выборке(с учетом аугметированных изображений)
AUG_SIZE = 300  # Размер создаваемого изображения (N x N)

DATA_ROOT = r"D:\Projects\Coding\ML\SamsungML\data\Coins\from_parsing\coins"  # Абсолютный путь к папке с исходными данными

SPLIT_DIR = fr"{DATA_ROOT}_split"
CUT_DIR = fr"{SPLIT_DIR}_{DROP_LESS_THAN}"
AUG_DIR = fr"{CUT_DIR}_aug"
CAT_DIR = fr"{AUG_DIR}_cat"

In [3]:
def df_from_path(root_dir: str) -> pd.DataFrame:
    """
    Создаёт DataFrame из изображений в указанной директории.

    :param root_dir: Корневая директория, содержащая изображения.
    :return: DataFrame с информацией о классах, путях к файлам и количестве фотографий в каждом классе.
    """
    data_dict: Dict[str, List] = {
        "labels": [],
        "filepaths": [],
        "photos_in_class": []
    }

    coin_types = os.listdir(root_dir)
    for coin_type in coin_types:
        coin_dir = os.path.join(root_dir, coin_type)
        images = os.listdir(coin_dir)
        image_paths = [os.path.join(coin_dir, image) for image in images]
        image_labels = [" ".join(image.split("_")[:3]) for image in images]

        data_dict["labels"].extend([coin_type] * len(image_paths))
        data_dict["filepaths"].extend(image_paths)
        data_dict["photos_in_class"].extend([len(image_paths)] * len(image_paths))

    df = pd.DataFrame(data_dict)
    df['id'] = df['filepaths'].apply(lambda x: os.path.basename(x)[:-6])

    return df

In [4]:
df = df_from_path(DATA_ROOT)
df

Unnamed: 0,labels,filepaths,photos_in_class,id
0,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9282
1,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9282
2,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9283
3,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9283
4,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9284
5,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9284
6,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9285
7,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9285
8,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9286
9,1 алтын 1704,D:\Projects\Coding\ML\SamsungML\data\Coins\fro...,12,1_алтын_1704_1704_id9286


In [5]:
def df_info(df: pd.DataFrame) -> None:
    """
    Выводит информацию о DataFrame, включающую количество классов, фотографий и статистику по распределению фотографий в классах.

    :param df: DataFrame с данными о фотографиях и их классах.
    """
    num_classes = df["labels"].nunique()
    total_photos = df.drop_duplicates("labels")["photos_in_class"].sum()
    avg_photos_per_class = total_photos // num_classes
    odd_photo_classes = df[df["photos_in_class"] % 2 == 1]["labels"].nunique()
    
    print(
        f"Всего классов: {num_classes}\n"
        f"Всего фотографий: {total_photos}\n"
        f"Фотографий в среднем на класс: {avg_photos_per_class}\n"
        f"Классов с нечетным количеством фотографий: {odd_photo_classes}\n"
    )

    class_ranges = [
        (0, 10, "Классов, где до 10 фотографий (5 монет): "),
        (10, 50, "Классов, где от 10 до 50 фотографий (5-25 монет): "),
        (50, 100, "Классов, где от 50 до 100 фотографий (25-50 монет): "),
        (100, 500, "Классов, где от 100 до 500 фотографий (50-250 монет): "),
        (500, 1000, "Классов, где от 500 до 1000 фотографий (250-500 монет): "),
        (1000, float('inf'), "Классов, где от 1000 фотографий (500 монет): ")
    ]

    for min_photos, max_photos, message in class_ranges:
        count = df[(min_photos <= df["photos_in_class"]) & (df["photos_in_class"] < max_photos)].drop_duplicates("labels").shape[0]
        print(f"{message}{count}")

    classes_above_threshold = df[df["photos_in_class"] >= DROP_LESS_THAN].drop_duplicates("labels").shape[0]
    print(f"Классов, где от {DROP_LESS_THAN} фотографий: {classes_above_threshold}")

In [6]:
df_info(df)

Всего классов: 253
Всего фотографий: 18744
Фотографий в среднем на класс: 74
Классов с нечетным количеством фотографий: 0

Классов, где до 10 фотографий (5 монет): 94
Классов, где от 10 до 50 фотографий (5-25 монет): 79
Классов, где от 50 до 100 фотографий (25-50 монет): 31
Классов, где от 100 до 500 фотографий (50-250 монет): 40
Классов, где от 500 до 1000 фотографий (250-500 монет): 7
Классов, где от 1000 фотографий (500 монет): 2
Классов, где от 50 фотографий: 80


In [7]:
df_to_split = df.drop(df[df["photos_in_class"] < DROP_LESS_THAN].index).drop_duplicates('id') if SHOULD_DROP else df.drop_duplicates('id')
X_train, X_test, y_train, y_test = train_test_split(df_to_split['id'], df_to_split['labels'], test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)

In [8]:
df_train = pd.DataFrame(data={"X": X_train, "y": y_train})
df_val = pd.DataFrame(data={"X": X_val, "y": y_val})
df_test = pd.DataFrame(data={"X": X_test, "y": y_test})
print(f"Train: {len(df_train)}, Val: {len(df_val)}, Test: {len(df_test)}")

Train: 4972, Val: 1658, Test: 1658


In [9]:
df_train["y"].value_counts()

y
2 копейки 1867-1917           296
1 копейка 1867-1917           284
3 копейки 1867-1917           257
5 копеек 1763-1796            240
20 копеек 1867-1917           238
15 копеек 1867-1917           229
10 копеек 1867-1917           216
2 копейки 1810-1830           188
1 деньга 1730-1754            171
5 копеек 1867-1915            149
половина копейки 1894-1916    106
1 копейка 1810-1830           105
5 копеек 1867-1917            104
1 рубль 1895-1915              85
50 копеек 1895-1914            77
1 деньга 1764-1796             75
1 полушка 1730-1754            74
1 копейка 1839-1847            71
5 копеек 1831-1839             65
2 копейки 1839-1848            61
2 копейки 1763-1796            57
половина копейки 1839-1848     54
25 копеек 1832-1858            53
1 копейка 1831-1839            52
2 копейки 1850-1860            49
2 копейки 1797-1801            48
1 рубль 1832-1858              47
3 копейки 1839-1848            46
1 копейка 1797-1801            46
20 копеек 18

In [10]:
def get_destination_dir(image_id: str, class_name: str) -> str:
    """
    Определяет путь назначения для изображения на основе его идентификатора и класса.

    :param image_id: Идентификатор изображения.
    :param class_name: Имя класса.
    :return: Путь назначения для изображения.
    """
    if df_train["X"].isin([image_id]).any():
        return os.path.join(CUT_DIR if SHOULD_CUT else SPLIT_DIR, "train", class_name)
    elif df_val["X"].isin([image_id]).any():
        return os.path.join(CUT_DIR if SHOULD_CUT else SPLIT_DIR, "val", class_name)
    elif df_test["X"].isin([image_id]).any():
        return os.path.join(CUT_DIR if SHOULD_CUT else SPLIT_DIR, "test", class_name)
    return ""

def process_image(image_path: str, dest_dir: str) -> None:
    """
    Копирует изображение в указанную директорию.

    :param image_path: Путь к исходному изображению.
    :param dest_dir: Директория назначения.
    """
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    if SHOULD_CUT:
        images_in_dest = os.listdir(dest_dir)
        if ("train" in dest_dir and len(images_in_dest) >= KEEP_MAX_TRAIN) or \
           ("val" in dest_dir and len(images_in_dest) >= KEEP_MAX_VAL):
            return

    shutil.copy(image_path, os.path.join(dest_dir, os.path.basename(image_path)))

def distribute_images(data_root: str) -> None:
    """
    Распределяет изображения по соответствующим директориям на основе DataFrame.

    :param data_root: Корневая директория с изображениями.
    """
    classes = os.listdir(data_root)
    for class_name in classes:
        class_dir = os.path.join(data_root, class_name)
        images_in_class = os.listdir(class_dir)
        for image in images_in_class:
            image_id = image[:-6]
            if df_to_split["id"].isin([image_id]).any():
                current_image_path = os.path.join(class_dir, image)
                dest_dir = get_destination_dir(image_id, class_name)
                if dest_dir:
                    process_image(current_image_path, dest_dir)

In [11]:
distribute_images(DATA_ROOT)

In [12]:
cut_df_train = df_from_path(CUT_DIR + "\\train")
df_info(cut_df_train)

Всего классов: 80
Всего фотографий: 5730
Фотографий в среднем на класс: 71
Классов с нечетным количеством фотографий: 0

Классов, где до 10 фотографий (5 монет): 0
Классов, где от 10 до 50 фотографий (5-25 монет): 19
Классов, где от 50 до 100 фотографий (25-50 монет): 37
Классов, где от 100 до 500 фотографий (50-250 монет): 24
Классов, где от 500 до 1000 фотографий (250-500 монет): 0
Классов, где от 1000 фотографий (500 монет): 0
Классов, где от 50 фотографий: 61


In [13]:
def create_directory(path: str) -> None:
    """
    Создает директорию по указанному пути, если она не существует.

    :param path: Путь к директории.
    """
    try:
        os.makedirs(path, exist_ok=True)
    except OSError as e:
        print(f"Ошибка при создании директории {path}: {e}")

def augment_images(df: pd.DataFrame, n: int, aug_dir: str, img_size: Tuple[int, int]) -> pd.DataFrame:
    """
    Создает аугментированные изображения для классов, где количество изображений меньше заданного значения n.

    :param df: DataFrame с исходными данными.
    :param n: Минимальное количество изображений в каждом классе.
    :param aug_dir: Директория для сохранения аугментированных изображений.
    :param img_size: Размер изображений (ширина, высота).
    :return: DataFrame с добавленными путями к аугментированным изображениям.
    """
    create_directory(aug_dir)

    image_generator = ImageDataGenerator(
        horizontal_flip=False,
        vertical_flip=False,
        rotation_range=40,
        width_shift_range=0.1,
        height_shift_range=0.1,
        brightness_range=(0.5, 2.0),
        zoom_range=0.2,
        shear_range=0.2,
        fill_mode='nearest'
    )

    grouped_df = df.groupby('labels')
    total_augmented_images = 0

    for label in df['labels'].unique():
        label_dir = os.path.join(aug_dir, label)
        create_directory(label_dir)

        label_group = grouped_df.get_group(label).dropna()
        sample_count = len(label_group)

        if sample_count < n:
            aug_img_count = 0
            needed_images = n - sample_count

            #print(f'Creating {needed_images} augmented images for class {label}', end='\r')
            msg = '{0:40s} for class {1:^30s} creating {2:^5s} augmented images'.format(' ', label, str(needed_images))
            print(msg, '\r', end='')

            aug_gen = image_generator.flow_from_dataframe(
                label_group, x_col='filepaths', y_col=None, target_size=img_size,
                class_mode=None, batch_size=1, shuffle=False,
                save_to_dir=label_dir, save_prefix='aug-', color_mode='rgb',
                save_format='jpg'
            )
            while aug_img_count < needed_images:
                next(aug_gen)
                aug_img_count += 1

            total_augmented_images += aug_img_count

    print(f'Total augmented images created: {total_augmented_images}')

    augmented_filepaths = []
    augmented_labels = []

    for class_name in os.listdir(aug_dir):
        class_path = os.path.join(aug_dir, class_name)
        for file_name in os.listdir(class_path):
            file_path = os.path.join(class_path, file_name)
            augmented_filepaths.append(file_path)
            augmented_labels.append(class_name)

    augmented_df = pd.DataFrame({'filepaths': augmented_filepaths, 'labels': augmented_labels})
    return pd.concat([df, augmented_df], axis=0).reset_index(drop=True)

def balance_dataset(df: pd.DataFrame, aug_dir: str, n: int, img_size: Tuple[int, int]) -> pd.DataFrame:
    """
    Балансирует классы в DataFrame, добавляя аугментированные изображения для классов, где их количество меньше n.

    :param df: DataFrame с исходными данными.
    :param aug_dir: Директория для сохранения аугментированных изображений.
    :param n: Минимальное количество изображений в каждом классе.
    :param img_size: Размер изображений (ширина, высота).
    :return: DataFrame с добавленными путями к аугментированным изображениям.
    """
    if os.path.isdir(aug_dir):
        shutil.rmtree(aug_dir)
    return augment_images(df.copy(), n, aug_dir, img_size)

In [14]:
# Создание и сохранение аугментированных изобраджений
for name in ('train', 'val'):
    print(name)
    curr_dir = fr"{CUT_DIR if SHOULD_CUT else SPLIT_DIR}\{name}"
    dest_dir = fr"{AUG_DIR}\{name}"

    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    n = TRAIN_AUG_TO if name == 'train' else VAL_AUG_TO
    df_temp = df_from_path(curr_dir)

    df_gen = balance_dataset(df=df_temp, aug_dir=dest_dir, n=n, img_size=(AUG_SIZE, AUG_SIZE))

train
Found 34 validated image filenames.      for class     1 гривенник 1764-1776      creating  166  augmented images 
Found 70 validated image filenames.      for class      1 денежка 1850-1855       creating  130  augmented images 
Found 100 validated image filenames.     for class       1 деньга 1730-1754       creating  100  augmented images 
Found 34 validated image filenames.      for class       1 деньга 1757-1760       creating  166  augmented images 
Found 100 validated image filenames.     for class       1 деньга 1764-1796       creating  100  augmented images 
Found 58 validated image filenames.      for class       1 деньга 1766-1779       creating  142  augmented images 
Found 40 validated image filenames.      for class       1 деньга 1797-1801       creating  160  augmented images 
Found 60 validated image filenames.      for class       1 деньга 1810-1828       creating  140  augmented images 
Found 34 validated image filenames.      for class      1 копейка 1705-171

In [15]:
def hconcat_resize(img_list: List[np.ndarray], interpolation: int = cv2.INTER_CUBIC) -> np.ndarray:
    """
    Конкатенация изображений по горизонтали с изменением размера.

    :param img_list: Список изображений для конкатенации.
    :param interpolation: Метод интерполяции для изменения размера.
    :return: Конкатенированное изображение.
    """
    h_min = min(img.shape[0] for img in img_list)
    im_list_resize = [
        cv2.resize(img, (int(img.shape[1] * h_min / img.shape[0]), h_min), interpolation=interpolation)
        for img in img_list
    ]
    return cv2.hconcat(im_list_resize)

def process_image_pair(img1_path: str, img2_path: str, dest_dir: str, base_name: str) -> None:
    """
    Обрабатывает пару изображений, конкатенирует их и сохраняет в заданную директорию.

    :param img1_path: Путь к первому изображению.
    :param img2_path: Путь ко второму изображению.
    :param dest_dir: Директория для сохранения результирующих изображений.
    :param base_name: Базовое имя для сохранения изображений.
    """
    try:
        img1 = cv2.imdecode(np.fromfile(img1_path, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
        img2 = cv2.imdecode(np.fromfile(img2_path, dtype=np.uint8), cv2.IMREAD_UNCHANGED)

        img_h_resize1 = hconcat_resize([img1, img2])
        img_h_resize2 = hconcat_resize([img2, img1])

        save_image(img_h_resize1, os.path.join(dest_dir, f"{base_name}_1.jpg"))
        save_image(img_h_resize2, os.path.join(dest_dir, f"{base_name}_2.jpg"))

    except AttributeError as e:
        print(f"Ошибка при обработке изображений {img1_path} и {img2_path}: {e}")

def save_image(image: np.ndarray, save_path: str) -> None:
    """
    Сохраняет изображение по указанному пути.

    :param image: Изображение для сохранения.
    :param save_path: Путь для сохранения изображения.
    """
    is_success, im_buf_arr = cv2.imencode(".jpg", image)
    if is_success:
        im_buf_arr.tofile(save_path)
    else:
        print(f"Ошибка при сохранении изображения: {save_path}")

def process_directory(root_dir: str, dest_dir: str, aug: bool = False) -> None:
    """
    Обрабатывает директорию с изображениями, конкатенирует пары и сохраняет результаты.

    :param root_dir: Корневая директория с изображениями.
    :param dest_dir: Директория для сохранения результирующих изображений.
    :param aug: Флаг, указывающий, использовать ли аугментированные изображения.
    """
    class_names = os.listdir(root_dir)
    for class_name in class_names:
        class_dir = os.path.join(root_dir, class_name)
        class_dest_dir = os.path.join(dest_dir, class_name)
        os.makedirs(class_dest_dir, exist_ok=True)
        images = os.listdir(class_dir)
        
        if aug:
            process_augmented_images(class_dir, class_dest_dir, images)
        else:
            process_regular_images(class_dir, class_dest_dir, images)

def process_regular_images(class_dir: str, class_dest_dir: str, images: List[str]) -> None:
    """
    Обрабатывает обычные изображения (без аугментации).

    :param class_dir: Директория класса с изображениями.
    :param class_dest_dir: Директория для сохранения результирующих изображений.
    :param images: Список изображений.
    """
    for i in range(0, len(images), 2):
        if i + 1 < len(images):
            img1_path = os.path.join(class_dir, images[i])
            img2_path = os.path.join(class_dir, images[i + 1])
            base_name = images[i][:-6]
            process_image_pair(img1_path, img2_path, class_dest_dir, base_name)

def process_augmented_images(class_dir: str, class_dest_dir: str, images: List[str]) -> None:
    """
    Обрабатывает аугментированные изображения.

    :param class_dir: Директория класса с изображениями.
    :param class_dest_dir: Директория для сохранения результирующих изображений.
    :param images: Список изображений.
    """
    if images:
        indices = [int(img.split("_")[1]) for img in images]
        max_index = max(indices)
        for i in range(0, max_index, 2):
            pairs = [img for img in images if int(img.split("_")[1]) in (i, i + 1)]
            step = len(pairs) // 2
            for j in range(len(pairs) - step):
                img1_path = os.path.join(class_dir, pairs[j])
                img2_path = os.path.join(class_dir, pairs[j + step])
                base_name = pairs[j][:-6]
                process_image_pair(img1_path, img2_path, class_dest_dir, base_name)

In [16]:
# Основной цикл для обработки директорий 'train', 'val' и 'test'
for dataset_type in ('train', 'val', 'test'):
    cut_root = os.path.join(CUT_DIR, dataset_type)
    aug_root = os.path.join(AUG_DIR, dataset_type)
    cat_dir = os.path.join(CAT_DIR, dataset_type)

    process_directory(cut_root, cat_dir)

    if dataset_type != 'test':
        process_directory(aug_root, cat_dir, aug=True)

In [17]:
# Удаление лишних папок
for dir in ([CUT_DIR, AUG_DIR] if SHOULD_CUT else [SPLIT_DIR, AUG_DIR]):
    shutil.rmtree(dir, ignore_errors=True)