Split the data into training, test, and validation folders

In [None]:
# Install patchify, which is imported further down to cut images into patches
!pip install patchify

# Clear the cell output so the pip installation log does not stay in the notebook
from IPython.display import clear_output
clear_output()

In [None]:
# %% Purpose:
# 1. Prepare the images and masks folders within the train, val, and test folders
# 2. Copy all images into these folders
# 3. Create patches of the images
#%% Packages
import os
import re
from pathlib import Path
import numpy as np
import math
from patchify import patchify
from PIL import Image
import shutil
import cv2

import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt


#%% Constants
#DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

**UTILITY FUNCTIONS**

In [None]:
def delete_split_folders(dest):
    """
    Removes the train, val, and test folders located directly under dest.
    Folders that do not exist are skipped.

    input parameters:
        dest: path to the destination directory

    returns: None
    """
    split_paths = (os.path.join(dest, name) for name in ('train', 'val', 'test'))
    for split_path in split_paths:
        if not os.path.exists(split_path):
            # nothing to remove for this split
            continue
        # remove the folder together with everything inside it
        shutil.rmtree(split_path)



def create_split_folders(dest):
    """
    Creates the train, val, and test folders (each with an images and a masks
    subfolder) and a models folder under dest.

    Folders that already exist are left untouched. Missing subfolders are
    created even when the split folder itself already exists, so calling this
    on a partially prepared directory completes the layout.

    input parameters:
        dest: path to the destination directory

    returns: None
    """
    for split in ('train', 'val', 'test'):
        for subfolder in ('images', 'masks'):
            # exist_ok keeps the call idempotent and also repairs a split
            # folder that exists but lacks one of its subfolders
            os.makedirs(os.path.join(dest, split, subfolder), exist_ok=True)

    os.makedirs(os.path.join(dest, 'models'), exist_ok=True)



def split_train_test_val_folders(data_dir, train_ratio, val_ratio):
    """
    Randomly assigns the entries of data_dir to a train, val, and test list.

    The directory listing is shuffled with np.random.shuffle and cut into
    three consecutive parts; everything left after the train and val parts
    goes to test. Each entry is reduced to the characters at positions 6-9 of
    its filename (name[6:10]).

    input parameters:
        data_dir: path to the data directory
        train_ratio: ratio of the data to be used for training
        val_ratio: ratio of the data to be used for validation

    returns:
        train_list: list of filename parts to be used for training
        val_list: list of filename parts to be used for validation
        test_list: list of filename parts to be used for testing
    """
    entries = os.listdir(data_dir)
    np.random.shuffle(entries)  # shuffles the list in place

    total = len(entries)
    train_end = int(total * train_ratio)
    val_end = train_end + int(total * val_ratio)

    def file_numbers(names):
        # keep only characters 6-9 of every filename
        return [name[6:10] for name in names]

    train_list = file_numbers(entries[:train_end])
    val_list = file_numbers(entries[train_end:val_end])
    test_list = file_numbers(entries[val_end:])
    return train_list, val_list, test_list


**CREATE PATCHES**

In [None]:
def create_patches(src, dest_path, patch_size=224, step=300):
    """
    Creates patches (smaller square crops) of an image and saves each one as
    a PNG file in dest_path.

    The patches are extracted with patchify using a window of
    (patch_size, patch_size, 3) and written as
    <source file stem>_patch_<n>.png, where n numbers the patches row by row.
    Images without a color channel axis (2-D arrays) are skipped and nothing
    is written for them.

    input parameters:
        src: path to the source image
        dest_path: path to the destination folder
        patch_size: side length in pixels of each square patch (default 224)
        step: step passed to patchify, i.e. the distance in pixels between the
              starts of neighboring patches (default 300)

    returns: None
    """
    # Read the pixels inside a context manager so the file handle is released
    # as soon as the data is in memory
    with Image.open(src) as source_image:
        image = np.asarray(source_image)

    if image.ndim <= 2:  # no color channel -> nothing to patch
        return

    patches = patchify(image, (patch_size, patch_size, 3), step=step)
    file_name_wo_ext = Path(src).stem
    n_rows, n_cols = patches.shape[:2]
    for i in range(n_rows):
        for j in range(n_cols):
            # index 0 selects the single window along the channel axis
            patch = Image.fromarray(patches[i, j, 0])
            num = i * n_cols + j
            patch.save(f"{dest_path}/{file_name_wo_ext}_patch_{num}.png")

def preprocess_data(src_dir, dest_dir, train_list, val_list, test_list):
    """
    Creates patches of every image and mask found under src_dir inside the
    matching split folder of dest_dir.

    Files are handled according to the name of the folder they are in:
    'render' files are images, 'clean' files are masks, and 'ground' files are
    looked up in the lists but not written anywhere. The file number taken
    from the filename decides whether the patches go to val, test, or train.
    Files in any other folder and files whose number is in none of the lists
    are reported with a message and skipped; processing continues with the
    remaining files.

    input parameters:
        src_dir: path to the source directory
        dest_dir: path to the destination directory
        train_list: list of file numbers to be used for training
        val_list: list of file numbers to be used for validation
        test_list: list of file numbers to be used for testing

    returns: None
    """
    # folder name -> (position of the file number in the filename,
    #                 subfolder the patches go to, or None for "write nothing")
    folder_rules = {
        'render': (slice(6, 10), 'images'),
        'ground': (slice(6, 10), None),
        'clean': (slice(5, 9), 'masks'),
    }
    # Sets give constant-time membership tests; the lookup order
    # (val, then test, then train) is kept as it was.
    splits = (
        ('val', set(val_list)),
        ('test', set(test_list)),
        ('train', set(train_list)),
    )

    for path_name, _, file_names in os.walk(src_dir):
        if not file_names:
            continue

        img_type = os.path.basename(path_name)
        rule = folder_rules.get(img_type)
        if rule is None:
            # Skip only this folder: stray files (for example next to the
            # image folders) must not abort the whole preprocessing run.
            print(f"unknown type {img_type}")
            continue
        number_slice, subfolder = rule

        for f in file_names:
            file_num = f[number_slice]
            split = next(
                (name for name, members in splits if file_num in members),
                None,
            )
            if split is None:
                # Skip this file only and keep processing the rest
                print(f"unknown list for {file_num}")
                continue
            if subfolder is None:
                continue

            src = os.path.join(path_name, f)
            dest = os.path.join(dest_dir, split, subfolder)
            create_patches(src=src, dest_path=dest)

**PREPROCESSING**

In [None]:
# Source data location and the directory that receives the split folders
data_dir = Path('drive/MyDrive/MoonImages/')
working_dir = Path('.')  # alternative: Path('drive/MyDrive/MoonWorking/')
render_dir = data_dir / "render/"

# Uncomment to remove existing train/val/test folders before preprocessing.
# NOTE(review): the split is reshuffled on every run, so re-running without
# deleting the split folders may leave patches from an earlier split in
# place - confirm whether this call should be enabled.
#delete_split_folders(working_dir)
create_split_folders(working_dir)

# The split is drawn from the render folder listing; whatever is left after
# the train and val shares goes to test.
train_list, val_list, test_list = split_train_test_val_folders(render_dir, train_ratio=0.7, val_ratio=0.15)
preprocess_data(data_dir, working_dir, train_list, val_list, test_list)
