In [None]:
!pip install rasterio

Collecting rasterio
  Downloading rasterio-1.3.9-cp310-cp310-manylinux2014_x86_64.whl (20.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.6/20.6 MB[0m [31m63.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting affine (from rasterio)
  Downloading affine-2.4.0-py3-none-any.whl (15 kB)
Collecting snuggs>=1.4.1 (from rasterio)
  Downloading snuggs-1.4.7-py3-none-any.whl (5.4 kB)
Installing collected packages: snuggs, affine, rasterio
Successfully installed affine-2.4.0 rasterio-1.3.9 snuggs-1.4.7


In [None]:
import rasterio
from rasterio import windows
from rasterio.windows import Window
import numpy as np
import random
import copy
import os

In [None]:
# Mount Google Drive at /content/drive; the input and output folders entered
# in the later cells of this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
def get_file_input_path():
  '''
  Keep prompting until the user enters a path that exists on disk

  Returns:
    The first entered path for which os.path.exists() is true
  '''
  candidate_path = input('Enter the path to the Sentinel input file: ')

  # Re-prompt for as long as the entered path cannot be found
  while not os.path.exists(candidate_path):
    print('The path to the input file is invalid. Please try again.')
    print()
    candidate_path = input('Enter the path to the Sentinel input file: ')

  print('The path to the Sentinel input file is valid.')
  return candidate_path

In [None]:
def get_files_names_inside_folder(folder_path):
  '''
  This function returns the names of the .tif files inside a folder

  Only entries directly inside the folder are looked at (no recursion).
  The extension check ignores case, so 'x.tif' and 'x.TIF' both match.
  The names are sorted because os.listdir() returns entries in arbitrary
  order, which would otherwise make the processing order differ between runs.

  Parameters:
  - folder_path (str): The path to the folder

  Returns:
  - list: The sorted names of the .tif files inside the folder
  '''
  return sorted(
      file_name
      for file_name in os.listdir(folder_path)
      if file_name.lower().endswith('.tif')
  )

In [None]:
def get_output_path_name():
  '''
  This function asks for the output folder until a usable one is given

  If the entered path already exists it is returned as is. Otherwise the
  user is asked whether it should be created: answering 'y' creates the
  folder (together with any missing parent folders) and returns it, any
  other answer asks for the path again.

  Returns
  - str: output path name
  '''
  while True:
    out_put_file_path = input('Please enter the name of the output folder - ')

    # An empty answer can neither exist nor be created, so ask again
    if not out_put_file_path.strip():
      print('Please enter a valid path.')
      print()
      continue

    if os.path.exists(out_put_file_path):
      print('The path exists')
      return out_put_file_path

    print('This path does not exist.')
    make_new_path = input('Do you want to make a new path? (y/n) - ')

    if make_new_path.strip().lower() == 'y':
      # makedirs instead of mkdir: a nested path whose parent folders are
      # missing would make mkdir raise FileNotFoundError
      os.makedirs(out_put_file_path, exist_ok=True)
      print('The path has been created.')
      return out_put_file_path

    # Every answer other than 'y' is treated as "no" and is acknowledged,
    # rather than looping back to the prompt without any message
    print('Please enter a valid path.')
    print()

In [None]:
def get_files_full_path(folder_path, file_names_list):
  '''
  Build the full path of every listed file and keep the ones that exist

  Parameters:
  - folder_path (str): The folder the file names are joined onto
  - file_names_list (list): The file names to look for inside the folder

  Returns:
  - list: The joined paths that exist on disk, in the order of the input names
  '''
  # First join every name onto the folder ...
  joined_paths = [os.path.join(folder_path, name) for name in file_names_list]

  # ... then drop the candidates that are not present on disk
  return [candidate for candidate in joined_paths if os.path.exists(candidate)]

In [None]:
def create_train_and_test_directories(output_dir):
  '''
  Create the output directory and its four train/test sub-directories

  The sub-directories are 'train', 'train_target', 'test' and 'test_target'.
  Directories that already exist are left as they are.

  Parameters:
  - output_dir (str): The path to the output directory
  '''
  os.makedirs(output_dir, exist_ok=True)
  for sub_dir_name in ('train', 'train_target', 'test', 'test_target'):
    os.makedirs(os.path.join(output_dir, sub_dir_name), exist_ok=True)

In [None]:
def split_and_save_tiles(feature_path, label_path, output_dir, tile_size=256,
                         train_ratio=0.9):
  '''
  This function splits a raster into square tiles and saves them in the train
  and test directories

  All bands are read from feature_path. Every band except the last one is
  written as the feature tile and the last band is written as the label tile.
  label_path is opened only for its profile, its width/height (which define
  the tile grid) and the transform of the label tiles.

  Only whole tiles are produced, so pixels to the right of / below the last
  full tile are ignored. Each tile is randomly assigned to 'train' (with
  probability train_ratio) or 'test' and written to
  <output_dir>/<split>/tile_<x>_<y>.tif, with its label written to
  <output_dir>/<split>_target/tile_<x>_<y>.tif. This function does not
  create those directories.

  A tile is skipped and counted as removed when it does not lie completely
  inside the array read from feature_path, when there is no feature band, or
  when its label or its first feature band contains NaN.

  Parameters:
  - feature_path (str): The path to the feature image
  - label_path (str): The path to the label image
  - output_dir (str): The path to the output directory
  - tile_size (int): The size of the tiles
  - train_ratio (float): The ratio of the train data

  Returns:
  - added (int): The number of tiles added
  - removed (int): The number of tiles removed
  '''
  added = 0
  removed = 0

  # The context managers close both datasets even if a write fails part-way
  with rasterio.open(label_path) as label_src:
    with rasterio.open(feature_path) as feature_src:
      # Get the profile of the images
      feature_profile = feature_src.profile
      label_profile = label_src.profile

      # rasterio returns the data as (bands, rows, columns)
      img_read = feature_src.read()
      feature_band_count = img_read.shape[0] - 1

      # Update the profiles
      label_profile.update(count=1)
      feature_profile.update(count=feature_band_count)

      # Get the number of tiles
      num_tiles_x = label_src.width // tile_size
      num_tiles_y = label_src.height // tile_size

      expected_tile_shape = (tile_size, tile_size)

      # Split the tiles
      for tile_x in range(num_tiles_x):
        for tile_y in range(num_tiles_y):
          # Get the edges of the small tile
          left = tile_x * tile_size
          top = tile_y * tile_size
          right = left + tile_size
          bottom = top + tile_size

          # Get the window of the small tile (col_off, row_off, width, height)
          window = Window(left, top, tile_size, tile_size)

          # Rows are the y axis and columns the x axis, so the array is sliced
          # [top:bottom, left:right]; this keeps the pixels in agreement with
          # the window used for the transform below. The copy gives the
          # writer a contiguous array.
          feature_tile = np.ascontiguousarray(
              img_read[:-1, top:bottom, left:right]
          )
          label_tile = np.ascontiguousarray(
              img_read[-1:, top:bottom, left:right]
          )

          # The grid comes from the label raster; if the feature raster is
          # smaller the slice is cut short and the tile cannot be used
          if (feature_band_count < 1
              or label_tile.shape[1:] != expected_tile_shape):
            removed += 1
            continue

          # Randomly adding tiles to train or test batch.
          luck = 'train' if random.random() < train_ratio else 'test'

          output_filename = f"tile_{tile_x}_{tile_y}.tif"
          output_path_feature = os.path.join(output_dir, luck, output_filename)
          output_path_label = os.path.join(
              output_dir, f"{luck}_target", output_filename
          )

          # Removing tiles with NaN in the label or in the first feature band
          # (only the first feature band is inspected)
          if np.isnan(label_tile).any() or np.isnan(feature_tile[0]).any():
            removed += 1
            continue

          # Updating the profiles
          profile_feature = copy.deepcopy(feature_profile)
          profile_label = copy.deepcopy(label_profile)

          profile_feature.update(
              width=tile_size,
              height=tile_size,
              transform=feature_src.window_transform(window)
          )

          profile_label.update(
              width=tile_size,
              height=tile_size,
              transform=label_src.window_transform(window)
          )

          with rasterio.open(output_path_feature, "w", **profile_feature) as dst:
            dst.write(feature_tile)

          with rasterio.open(output_path_label, "w", **profile_label) as dst:
            dst.write(label_tile)

          added += 1

  return added, removed



Get the path to the Sentinel Training Images

In [None]:
# Prompts until an existing path is entered. The value is used below as the
# folder that is scanned for .tif files.
tif_file_path = get_file_input_path()

Enter the path to the Sentinel input file: /content/drive/MyDrive/Raj_RGB_Data/Raj_Rwanda_RGB_Data/Raj_Rwanda_SEN2_2022_Training_Data_2
The path to the Sentinel input file is valid.


In [None]:
# Collect the names of the .tif files found directly inside the input folder
# and report how many there are (the escape codes print the line in bold).
tiff_file_names = get_files_names_inside_folder(tif_file_path)
print(f'\033[1mThere are in total {len(tiff_file_names)} files to splitted.\033[0m')

[1mThere are in total 58 files to splitted.[0m


In [None]:
# Join the input folder with each file name; joined paths that do not exist
# on disk are left out of the resulting list.
tiff_file_full_path = get_files_full_path(tif_file_path, tiff_file_names)

Get the output path

In [None]:
# Ask for the output folder (creating it on request), then create the
# train / train_target / test / test_target sub-folders inside it.
output_path = get_output_path_name()
create_train_and_test_directories(output_path)

Please enter the name of the output folder - /content/drive/MyDrive/Raj_RGB_Data/Raj_Rwanda_RGB_Data/Split_Tif_Inputs_Example
This path does not exist.
Do you want to make a new path? (y/n) - y
The path has been created.


Split the files

In [None]:
# Running totals over all input files
total_added = 0
total_removed = 0

# The tile size is hardcoded, please change accordingly
tile_size = 256

# The train ratio is hardcoded, please change accordingly
train_ratio = 0.9

# Split and save files
for feature_path in tiff_file_full_path:
  # The same file is passed as feature and label image: split_and_save_tiles
  # takes the label from the last band of the feature file
  label_path = feature_path

  rasterio_name = os.path.basename(feature_path)
  print(f"\033[1mRasterio Started - {rasterio_name}\033[0m")
  added, removed = split_and_save_tiles(feature_path=feature_path,
                                        label_path=label_path,
                                        output_dir=output_path,
                                        tile_size=tile_size,
                                        train_ratio=train_ratio)
  total_added += added
  total_removed += removed
  print(f"\033[1mRastrio Completed - {rasterio_name}\033[0m")
  print()
print()
print(f"\033[1mTotal Added: {total_added}, Total Removed: {total_removed}\033[0m")

[1mRasterio Started - 0_RGB_Band.tif[0m
[1mRastrio Completed - 0_RGB_Band.tif[0m

[1mRasterio Started - 2_RGB_Band.tif[0m
[1mRastrio Completed - 2_RGB_Band.tif[0m

[1mRasterio Started - 3_RGB_Band.tif[0m
[1mRastrio Completed - 3_RGB_Band.tif[0m

[1mRasterio Started - 5_RGB_Band.tif[0m
[1mRastrio Completed - 5_RGB_Band.tif[0m

[1mRasterio Started - 1_RGB_Band.tif[0m
[1mRastrio Completed - 1_RGB_Band.tif[0m

[1mRasterio Started - 6_RGB_Band.tif[0m
[1mRastrio Completed - 6_RGB_Band.tif[0m

[1mRasterio Started - 7_RGB_Band.tif[0m
[1mRastrio Completed - 7_RGB_Band.tif[0m

[1mRasterio Started - 8_RGB_Band.tif[0m
[1mRastrio Completed - 8_RGB_Band.tif[0m

[1mRasterio Started - 9_RGB_Band.tif[0m
[1mRastrio Completed - 9_RGB_Band.tif[0m

[1mRasterio Started - 4_RGB_Band.tif[0m
[1mRastrio Completed - 4_RGB_Band.tif[0m

[1mRasterio Started - 11_RGB_Band.tif[0m
[1mRastrio Completed - 11_RGB_Band.tif[0m

[1mRasterio Started - 13_RGB_Band.tif[0m
[1mRastr