# Data Augmentation over Kunisch Patterns. 
## Seminario de Tesis I, Primavera 2022 
### MDS Program. University of Chile.
#### Supervisor: Prof. Benjamín Bustos, Prof. Iván Sipirán
#### Author: Matías Vergara

Performs data augmentation on patterns by applying image transformations (reflections, rotations, crops, blur, emboss, rain, elastic deformations and RandAugment).

## Imports

In [38]:
import cv2
import pandas as pd
from IPython.display import display
import os
import math
import random
import shutil
import imgaug.augmenters as aug

## Mounting Google Drive

In [39]:
# Mount Google Drive when running on Colab; otherwise fall back to the
# parent directory of this notebook.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    folder_path = 'drive/MyDrive/TesisMV/'
except Exception:
    # Not running on Colab (import fails) or the mount failed: use the local
    # layout. ``Exception`` rather than a bare ``except`` so that Ctrl-C and
    # interpreter shutdown are not swallowed.
    folder_path = '../'

## Define flags

It is enough to select the dataset flags and the number of crops (which only has an effect when the corresponding flag is selected) and then run the rest of the cells. Two folders will be created: one for labels and one for patterns. Both are named after the selected flags, separated by "_".

In [40]:
# When True, subchapter names (instead of chapter names) are used as class
# folders and the output folder is nested under "subchapters/".
SUBCHAPTERS = True

# Augmentation groups to enable. Available flags and what each one applies:
#   'ref'     -> [invertX, invertY]
#   'rot'     -> [rotate90, rotate180, rotate270]
#   'crop'    -> [crop] * CROP_TIMES
#   'blur'    -> [blur]
#   'emboss'  -> [emboss]
#   'randaug' -> [randaug]
#   'rain'    -> [rain]
#   'elastic' -> [elastic]
DS_FLAGS = []

# Repetition counts for the transformations that can be sampled several times.
CROP_TIMES = 1
RANDOM_TIMES = 1
ELASTIC_TIMES = 1

# Flag name -> repetition count.
MAP_TIMES = dict(
    crop=CROP_TIMES,
    randaug=RANDOM_TIMES,
    elastic=ELASTIC_TIMES,
)

## Transformations

In [41]:
# Build ``data_flags``, the name shared by the output patterns/labels folders.
DS_FLAGS = sorted(DS_FLAGS)
MULTIPLE_TRANSF = ['crop', 'randaug', 'elastic']
COPY_FLAGS = DS_FLAGS.copy()

# Transformations that can be repeated carry their repetition count in the
# folder name (e.g. 'crop' -> 'crop3'); they are moved to the end of the list.
for t in MULTIPLE_TRANSF:
    if t in DS_FLAGS:
        COPY_FLAGS.remove(t)
        COPY_FLAGS.append(t + str(MAP_TIMES[t]))

data_flags = '_'.join(COPY_FLAGS) if COPY_FLAGS else 'base'
# The prefix is applied last so it is not lost when repeated transformations
# rewrite the flag names above.
if SUBCHAPTERS:
    data_flags = 'subchapters/' + data_flags


In [42]:
def rotate90(path):
    """Load the image at ``path`` and rotate it 90 degrees clockwise.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "rot90".
    """
    image = cv2.imread(path)
    # Rotation flags live on the top-level ``cv2`` module; the nested
    # ``cv2.cv2`` alias is not available in recent opencv-python releases.
    rotated = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
    return rotated, "rot90"


def rotate180(path):
    """Load the image at ``path`` and rotate it 180 degrees.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "rot180".
    """
    image = cv2.imread(path)
    # Rotation flags live on the top-level ``cv2`` module; the nested
    # ``cv2.cv2`` alias is not available in recent opencv-python releases.
    rotated = cv2.rotate(image, cv2.ROTATE_180)
    return rotated, "rot180"


def rotate270(path):
    """Load the image at ``path`` and rotate it 270 degrees clockwise.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "rot270".
    """
    image = cv2.imread(path)
    # 270 degrees clockwise equals 90 degrees counter-clockwise, so a single
    # rotation replaces the former 180 + 90 chain. Flags are read from the
    # top-level ``cv2`` module because the nested ``cv2.cv2`` alias is not
    # available in recent opencv-python releases.
    rotated = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
    return rotated, "rot270"


def invertX(path):
    """Load the image at ``path`` and flip it with ``cv2.flip`` (flip code 1).

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "invX".
    """
    source = cv2.imread(path)
    mirrored = cv2.flip(source, 1)
    return mirrored, "invX"


def invertY(path):
    """Load the image at ``path`` and flip it with ``cv2.flip`` (flip code 0).

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "invY".
    """
    source = cv2.imread(path)
    mirrored = cv2.flip(source, 0)
    return mirrored, "invY"


def crop(path, min_width=1/2, min_height=1/2, max_width=1/1.1,
         max_height=1/1.1):
    """Load the image at ``path`` and return a randomly placed crop of it.

    Args:
        path: Path of the image to read.
        min_width: Minimum crop width, as a fraction of the image width.
        min_height: Minimum crop height, as a fraction of the image height.
        max_width: Maximum crop width, as a fraction of the image width.
        max_height: Maximum crop height, as a fraction of the image height.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "crop".
    """
    image = cv2.imread(path)
    # Only the first two axes are used: images may be RGB or grayscale.
    height, width = image.shape[:2]
    min_w = math.ceil(width * min_width)
    min_h = math.ceil(height * min_height)
    max_w = math.ceil(width * max_width)
    max_h = math.ceil(height * max_height)

    # The origin is drawn so that a crop of the minimum size always fits.
    x1 = random.randint(0, width - min_w)
    # The upper bound honours the maximum size (previously computed but
    # ignored) and never drops below the minimum, keeping the range valid.
    w = random.randint(min_w, max(min_w, min(max_w, width - x1)))
    y1 = random.randint(0, height - min_h)
    h = random.randint(min_h, max(min_h, min(max_h, height - y1)))

    cropped = image[y1:y1 + h, x1:x1 + w]
    return cropped, "crop"

def blur(path):
    """Load the image at ``path`` and apply imgaug's ``AverageBlur`` (k=(4, 11)).

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "blur".
    """
    source = cv2.imread(path)
    augmenter = aug.AverageBlur(k=(4, 11))
    return augmenter(image=source), "blur"

def emboss(path):
    """Load the image at ``path`` and apply imgaug's ``Emboss`` augmenter.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "embs".
    """
    source = cv2.imread(path)
    augmenter = aug.Emboss(alpha=(0.0, 1.0), strength=(0.5, 1.5))
    return augmenter(image=source), "embs"

def elastic(path):
    """Load the image at ``path`` and apply imgaug's ``PiecewiseAffine``.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "elastic".
    """
    source = cv2.imread(path)
    augmenter = aug.PiecewiseAffine(scale=(0.03, 0.075))
    return augmenter(image=source), "elastic"

def randaug(path):
    """Load the image at ``path`` and apply imgaug's ``RandAugment`` (m=(2, 9)).

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "randaug".
    """
    source = cv2.imread(path)
    augmenter = aug.RandAugment(m=(2, 9))
    return augmenter(image=source), "randaug"

def snow(path):
    """Load the image at ``path`` and apply imgaug's ``Snowflakes`` augmenter.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "snow".
    """
    source = cv2.imread(path)
    augmenter = aug.Snowflakes(flake_size=(0.6, 0.5), speed=(0.2, 0.5))
    return augmenter(image=source), "snow"


def rain(path):
    """Load the image at ``path`` and apply imgaug's ``Rain`` augmenter.

    Returns:
        A ``(image, tag)`` tuple where ``tag`` is the string "rain".
    """
    source = cv2.imread(path)
    augmenter = aug.Rain(speed=(0.1, 0.5))
    return augmenter(image=source), "rain"


def apply_transformations(pin, pout, transformations):
    """Apply each transformation to the image at ``pin`` and save the results.

    Args:
        pin: Path of the source image, e.g.
            "../patterns/originals/84e/84e.png".
        pout: Output folder; it must end with a path separator because file
            names are appended to it directly.
        transformations: Callables taking an image path and returning an
            ``(image, tag)`` tuple.

    Returns:
        The list of generated object names ("<pattern>_<tag>"), one per
        transformation, without the ".png" extension.
    """
    # The pattern name is the folder that contains the image. Taking it from
    # the end of the path (instead of a fixed split index) keeps it correct
    # whatever the depth of the base folder is.
    pattern_name = os.path.basename(os.path.dirname(pin))
    if not pattern_name:
        # No parent folder in the path: fall back to the file name stem.
        pattern_name = os.path.splitext(os.path.basename(pin))[0]

    os.makedirs(pout, exist_ok=True)

    new_names = []
    repeat_idx = 0
    for transformation in transformations:
        result, transf_name = transformation(pin)
        if transf_name in MULTIPLE_TRANSF:
            # Repeated transformations get a running index so that their
            # output files do not overwrite each other.
            transf_name += str(repeat_idx)
            repeat_idx += 1
        obj_name = pattern_name + "_" + transf_name
        cv2.imwrite(pout + obj_name + ".png", result)
        new_names.append(obj_name)
    return new_names

# Select data augmentation functions based on data flags

# Flag name -> transformation functions enabled by that flag.
MAP_FLAGS = {'ref': [invertX, invertY],
             'rot': [rotate90, rotate180, rotate270],
             'crop': [crop],
             'blur': [blur],
             'emboss': [emboss],
             'randaug': [randaug],
             'rain': [rain],
             'elastic': [elastic]
             # snow is not working properly
             }

ALLOWED_TRANSFORMATIONS = []
for f in DS_FLAGS:
    ALLOWED_TRANSFORMATIONS += MAP_FLAGS[f]

# Candidate transformations for patterns flagged only 'horizontal', only
# 'vertical', and for the remaining patterns, respectively.
HOR_TRANSFORMATIONS = [invertX, rotate180, blur, rain, emboss]
VER_TRANSFORMATIONS = [invertY, rotate180, blur, rain, emboss]
COMMON_TRANSFORMATIONS = [invertX, invertY, rotate90, rotate180, rotate270,
                          blur, rain, emboss]

for t in MULTIPLE_TRANSF:
    if t in DS_FLAGS:
        # Each repeatable transformation uses its own repetition count
        # (previously RANDOM_TIMES was applied to all of them).
        repeated = MAP_FLAGS[t] * MAP_TIMES[t]
        HOR_TRANSFORMATIONS += repeated
        VER_TRANSFORMATIONS += repeated
        COMMON_TRANSFORMATIONS += repeated


def mergeTransformations(flags, map_flags, map_times, trans_list):
    """Strip from ``trans_list`` the transformations of every disabled flag.

    Args:
        flags: Enabled flag names.
        map_flags: Mapping from flag name to its list of transformations.
        map_times: Unused; kept for interface compatibility.
        trans_list: List of transformations, filtered in place.

    Returns:
        The same ``trans_list`` object, after printing it.
    """
    disabled = [
        transformation
        for flag, group in map_flags.items()
        if flag not in flags
        for transformation in group
    ]
    # Slice assignment keeps the caller's list object while dropping entries.
    trans_list[:] = [item for item in trans_list if item not in disabled]
    print(trans_list)
    return trans_list

# Drop from each candidate list the transformations whose flag is not in
# DS_FLAGS. The lists are filtered in place; each call also prints the result.
mergeTransformations(DS_FLAGS, MAP_FLAGS, MAP_TIMES, HOR_TRANSFORMATIONS)
mergeTransformations(DS_FLAGS, MAP_FLAGS, MAP_TIMES, VER_TRANSFORMATIONS)
mergeTransformations(DS_FLAGS, MAP_FLAGS, MAP_TIMES, COMMON_TRANSFORMATIONS)

[]
[]
[]


[]

In [43]:
# Load the label matrix (one row per pattern) and the class of each pattern.
df = pd.read_json(folder_path + "labels/normalized_df.json", orient='index', encoding='latin-1')
classes = pd.read_csv(folder_path + "labels/class_labels.csv", index_col=0)

display(classes)

# Work on a copy of the index labels. ``df.index.values`` exposes the array
# backing the index, so shuffling it in place could misalign labels and rows
# (or fail outright when pandas hands out a read-only array).
index = list(df.index)
colnames = df.columns

# 10% validation, 20% test (both rounded up), the rest for training.
valNumber = math.ceil(0.1 * len(index))
testNumber = math.ceil(0.2 * len(index))
trainNumber = len(index) - valNumber - testNumber

print("Patterns for training: {}".format(trainNumber))
print("Patterns for validation: {}".format(valNumber))
print("Patterns for testing: {}".format(testNumber))


random.shuffle(index)

elem_train = index[:trainNumber]
elem_val = index[trainNumber:trainNumber+valNumber]
elem_test = index[trainNumber+valNumber:]

assert (valNumber + testNumber + trainNumber) == len(index)

Unnamed: 0,chapter,subchapter
1a,strokes and lines,strokes
1b,strokes and lines,strokes
1c,strokes and lines,strokes
1d,strokes and lines,strokes
1e,strokes and lines,strokes
...,...,...
96e,pictographics,trees and animals
96f,pictographics,trees and animals
96g,pictographics,trees and animals
96h,pictographics,trees and animals


Patterns for training: 542
Patterns for validation: 78
Patterns for testing: 156


## Augmentation
(Only over training set)



In [44]:
def _class_folder(pattern):
    """Return the class folder of ``pattern``: subchapter or chapter name."""
    column = 'subchapter' if SUBCHAPTERS else 'chapter'
    return classes.loc[[pattern]][column].values[0]


def _split_paths(pattern, split):
    """Return ``(path_in, path_out)`` for ``pattern`` in the given split."""
    path_in = folder_path + "patterns/originals/" + pattern + "/" + pattern + ".png"
    path_out = (folder_path + 'patterns/' + data_flags + '/' + split + '/'
                + _class_folder(pattern) + "/")
    return path_in, path_out


def _copy_original(path_in, path_out):
    """Copy the original image into ``path_out``, creating it if needed."""
    os.makedirs(path_out, exist_ok=True)
    shutil.copy(path_in, path_out)


new_entries = {}

for pattern in elem_train:  # only the training set is augmented
    path_in, path_out = _split_paths(pattern, 'train')
    row = df.loc[[pattern]]
    is_hor = row['horizontal'].values[0]
    is_ver = row['vertical'].values[0]
    if is_hor and not is_ver:
        transformations = HOR_TRANSFORMATIONS
    elif is_ver and not is_hor:
        transformations = VER_TRANSFORMATIONS
    else:
        # Patterns with both flags set, or with neither, get the full set.
        transformations = COMMON_TRANSFORMATIONS
    new_names = apply_transformations(path_in, path_out, transformations)
    # Every augmented image inherits the labels of its source pattern.
    labels = row.values[0]
    for name in new_names:
        new_entries[name] = labels
    # add the base pattern to the folder
    _copy_original(path_in, path_out)

# Validation and test patterns are copied as they are, without augmentation.
for split, patterns in (('val', elem_val), ('test', elem_test)):
    for pattern in patterns:
        path_in, path_out = _split_paths(pattern, split)
        _copy_original(path_in, path_out)

# Add every entry of elem_train to new_entries and create the dataset
# "augmented_train_df.json".
for p in elem_train:
    new_entries[p] = df.loc[p].values

labels_output = folder_path + "labels/" + data_flags + "/"

os.makedirs(labels_output, exist_ok=True)

df_train = pd.DataFrame.from_dict(new_entries, columns=colnames, orient='index')
df_train.to_json(labels_output + "augmented_train_df.json", orient='index')

# Add every entry of elem_val to val_entries and create the dataset
# "val_df.json".
val_entries = {p: df.loc[p].values for p in elem_val}

df_val = pd.DataFrame.from_dict(val_entries, columns=colnames, orient='index')
df_val.to_json(labels_output + "val_df.json", orient='index')

# Add every entry of elem_test to test_entries and create the dataset
# "test_df.json".
test_entries = {p: df.loc[p].values for p in elem_test}

df_test = pd.DataFrame.from_dict(test_entries, columns=colnames, orient='index')
df_test.to_json(labels_output + "test_df.json", orient='index')