In [59]:
import os
from distutils import dir_util
from typing import List, Dict, Union, Tuple
import copy

import numpy as np
import cv2
import matplotlib.pyplot as plt

## アンダーサンプリング

In [32]:
age_base_data_path = 'age/age/'
age_data_dist_path = 'age/under_sampled_age/'

def create_under_sampled_data(base_data_path: str, dist_data_path: str) -> None:
    '''データのアンダーサンプリングを行う関数
    Parameters
    -----------
    base_data_path: 訓練データ用ディレクトリ、バリデーションデータ用ディレクトリおよびテストデータ用ディレクトリが存在するディレクトリのパス
    dist_data_path: アンダーサンプリング結果保存用パス
    '''
    # 入力パスが同じ場合は例外を出力
    if os.path.abspath(base_data_path) == os.path.abspath(dist_data_path):
        raise ValueError('ベースディレクトリのパスと結果保存先のパスは異なるものにしてください。')
        
    # 入力パスの直下にtrain, test, validationという名前のフォルダが存在しなければ例外を出力
    base_child_path_list = os.listdir(base_data_path)
    for base_child_path in base_child_path_list:
        __check_child_dir_exists(base_data_path, base_child_path)
    
    # base_data_pathをdist_data_pathにコピー
    print(f'[{base_data_path}]ディレクトリを[{dist_data_path}]コピー')
    dir_util.copy_tree(base_data_path, dist_data_path)
    
    # 1. dist_data_path/trainディレクトリ内の全クラスについて画像数を数える
    # 2. 一番ファイル数が少ないクラスのデータ数以外のクラスにおける画像を一番ファイル数が少ないクラスのデータ数と同じ数になるようにランダムに削除する。
    train_data_dir = os.path.join(dist_data_path, 'train')
    # 各クラスの画像ディレクトリパスリスト
    train_data_dir4undersampling_path_list = []
    # 各クラスの画像枚数リスト
    train_data_num_list = []
    for data_dir_name in os.listdir(train_data_dir):
        data_dir_path = os.path.join(train_data_dir, data_dir_name)
        train_data_dir4undersampling_path_list.append(data_dir_path)
        train_data_num_list.append(len(os.listdir(data_dir_path)))
    
    # 各クラスのデータのアンダーサンプリング
    minimum_data_num = min(train_data_num_list)
    print(f'全データを{minimum_data_num}枚にします。')
    for index, (undersampling_path, train_data_num) in enumerate(zip(train_data_dir4undersampling_path_list, train_data_num_list)):
        if train_data_num == minimum_data_num: continue
        print(undersampling_path)
        target_file_path_list = [os.path.join(undersampling_path, path) for path in os.listdir(undersampling_path)]
        remove_data_num = len(target_file_path_list) - minimum_data_num
        remove_data_path_list = np.random.choice(a=target_file_path_list, size=remove_data_num, replace = False)
        for remove_data_path in remove_data_path_list:
            os.remove(remove_data_path)
    
def __check_child_dir_exists(base_dir_name, child_dir_name: str) -> None:
    if child_dir_name != 'train' or child_dir_name != 'test' or child_dir_name != 'validation': return None
    if 'train' not in base_child_path:
        raise FileNotFoundError(f'[{base_data_path}]内に{child_dir_name}ディレクトリがありません。')
        


[age/age/]ディレクトリを[age/under_sampled_age/]コピー
全データを979枚にします。
age/under_sampled_age/train/50_60
age/under_sampled_age/train/40_50
age/under_sampled_age/train/20_30
age/under_sampled_age/train/30_40


## アップサンプリング

In [105]:
class GaussianNoize():
    def __init__(self, mean=0, sigma=5):
        self.mean = mean
        self.sigma = sigma
        
    def apply(self, src: np.array) -> np.array:
        '''画像にガウスノイズを付与する関数
        Parameters
        -----------
        mean: ガウスノイズの平均値
        sigma: ガウスノイズの分散

        Returns
        -----------
        ノイズ付与後の画像
        '''
        src = copy.deepcopy(src)
        if src.dtype == 'uint8':
            src = src.astype('float64')
        row, col, ch = src.shape
        gauss_noize = np.random.normal(self.mean, self.sigma, (row, col, ch)).reshape(row, col, ch)
        src += gauss_noize
        return src.astype('uint8')

class GammaTransfer():
    def __init__(self, gamma: float=0.5):
        self.gamma = gamma
    def apply(self, src: np.array) -> np.array:
        gamma_table = np.zeros((256, 1), dtype='uint8')
        for i in range(len(gamma_table)):
            gamma_table[i][0] = 255 * (float(i) / 255) ** (1.0 / self.gamma)
        return cv2.LUT(src, gamma_table)

class ImageFlipper():
    def __init__(self, flip_code: int=1):
        if flip_code == 0 or flip_code == 0:
            raise ValueError('flip_codeは、0か1を入力')
        self.flip_code = flip_code
    def apply(self, src: np.array) -> np.array:
        src = copy.deepcopy(src)
        return cv2.flip(src, self.flip_code)

class ImageBlur():
    def __init__(self, square: Tuple[int, int]=(3, 3)):
        self.square = square
    def apply(self, src: np.array) -> np.array:
        src = copy.deepcopy(src)
        return cv2.blur(src, self.square)

class Rotate():
    def __init__(self, angle: float=10.0, scale: float=1.0):
        self.angle = angle
        self.scale = scale
    def apply(self, src: np.array) -> np.array:
        src = copy.deepcopy(src)
        height, width = src.shape[0], src.shape[1]
        center = (width // 2, height // 2)
        trans = cv2.getRotationMatrix2D(center, self.angle, self.scale)
        return cv2.warpAffine(src, trans, (width, height))
    
img_path = './race/upsampled_race/train/asian/10_0_2_20170110224230094.jpg.chip.jpg'
img = cv2.imread(img_path, 1)
cv2.imwrite('../../origin.jpg', img)
h, w, _ = img.shape
gamma_transfer = ImageBlur((5, 5))
img = gamma_transfer.apply(img)

cv2.imwrite('../../test.jpg', img)

True

In [66]:
os.path.dirname(img_path)


'./race/upsampled_race/train/asian'

In [108]:
def upsample_data(target_dir: str, algorithm_dict: Dict[str, object], choice_num=5) -> None:
    '''指定ディレクトリ内の画像ファイルを読み込み、全ての画像に指定されたアルゴリズムを実行し、保存することで画像をアップサンプリングする関数
    Parameters
    -----------
    target_dir: 対象ディレクトリ
    algorithm_list: 画像に適用するアルゴリズムリスト
    '''
    # 画像リストパスの取得
    assert os.path.exists(target_dir)
    img_path_list = [os.path.join(target_dir, path) for path in os.listdir(target_dir)]
    for img_path in img_path_list:
        base_img = cv2.imread(img_path)
        choiced_algorithm = np.random.choice(list(algorithm_dict.keys()), choice_num, replace=False).tolist()
        for algorithm_name in choiced_algorithm:
            result_img = algorithm_dict[algorithm_name].apply(base_img)
            save_name = __get_base_name_of_save(img_path, algorithm_name)
            save_dir = os.path.dirname(img_path)
            save_path = os.path.join(save_dir, save_name)
            cv2.imwrite(save_path, result_img)

def __get_base_name_of_save(img_path: str, append_name: str):
    base_name = os.path.basename(img_path)
    return os.path.splitext(base_name)[0] + f'_{append_name}.jpg'

target_dir = './race/upsampled_race/train/indian/'
test_dir = '../../test_img/'
algorithm_dict = {
    'flipped': ImageFlipper(),
    'blurred': ImageBlur(),
    'rotate_angle_10': Rotate(angle=10.0),
    'rotate_angle_minus_10': Rotate(angle=-10.0)
}
upsample_data(target_dir, algorithm_dict, 1)

In [79]:
np.random.choice(list(algorithm_dict.keys()), 3, replace = False).tolist()

['rotate_angle_10', 'blurred', 'gaussian_noize']