# Semantic Segmentation of satellite imagery using U-Net
## Step 1: Data preparation

Based on the [tutorial by Dr. Sreenivas Bhattiprolu](https://www.youtube.com/watch?v=0W6MKZqSke8) ([Code on Github](https://github.com/bnsreenu/python_for_microscopists/tree/master/230_landcover_dataset_segmentation))

Dataset: https://www.kaggle.com/datasets/adrianboguszewski/landcoverai?resource=download

Tasks:

1. Read large images and corresponding masks, divide them into smaller patches.
   And write the patches as images to the local drive.

2. Save only images and masks where masks have some decent amount of labels other than 0.
   Using blank images with label=0 is a waste of time and may bias the model towards
   unlabeled pixels.

3. Divide the sorted dataset from above into train and validation datasets.

4. You have to manually move some folders and rename appropriately if you want to use
   ImageDataGenerator from keras.


In [None]:
# Necessary because of a bug https://github.com/qubvel/segmentation_models/issues/374
%env SM_FRAMEWORK=tf.keras

In [None]:
import shutil
from pathlib import Path

import cv2
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import splitfolders
from matplotlib.colors import BoundaryNorm, ListedColormap
from patchify import patchify


In [None]:
# Define directory paths
dir_root = Path("../data/landcover_ai/")
dir_img = Path(dir_root, "images")
dir_mask = Path(dir_root, "masks")
dir_patch = Path(dir_root, "patches")
dir_patch_img = Path(dir_patch, "images")
dir_patch_mask = Path(dir_patch, "masks")
dir_patch_useful_img = Path(dir_patch, "useful/images")
dir_patch_useful_mask = Path(dir_patch, "useful/masks")

# Define file paths
inventory_path = Path(dir_root, "inventory.csv")

# Create directories if they don't exist
for dir in [
    dir_root,
    dir_img,
    dir_mask,
    dir_patch,
    dir_patch_img,
    dir_patch_mask,
    dir_patch_useful_img,
    dir_patch_useful_mask,
]:
    Path(dir).mkdir(parents=True, exist_ok=True)

In [None]:
classes = {
    0: "Not classified",
    1: "Building",
    2: "Woodland",
    3: "Water",
    4: "Roads",
}

In [None]:
# Get paths of all images
img_files = [file for file in dir_img.iterdir()]


#### Create custom colormap for mask (to get consistency)


In [None]:
# Define your dictionary mapping values to colors
color_dict = {
    0: "white",  # Background / Not classified
    1: "#E5B6BC",  # Building
    2: "#C1DDC1",  # Woodland
    3: "#ACD3E4",  # Water
    4: "yellow",  # Roads
}

# Colors: https://www.schemecolor.com/bleached-rainbow.php

values = np.array(list(color_dict.keys()))
colors = list(color_dict.values())

# Create colormap and norm objects
mask_cmap = ListedColormap(colors)
mask_norm = BoundaryNorm(values, len(values))


#### Visualize the three RGB bands of a random image


In [None]:
def plot_image_channels(image: Path):
    """
    Function to plot an image alongside its mask.
    """
    # Read image
    img = cv2.imread(image.as_posix())  # 3 channels / spectral bands

    _, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 10))

    ax1.imshow(img[:, :, 0], cmap="Reds")
    ax1.set_title("Channel 0: Red")

    ax2.imshow(img[:, :, 1], cmap="Greens")
    ax2.set_title("Channel 1: Green")

    ax3.imshow(img[:, :, 2], cmap="Blues")
    ax3.set_title("Channel 2: Blue")

    plt.show()


In [None]:
# Get random image from all images
img_path = np.random.choice(img_files)

plot_image_channels(img_path)


#### Visualize the mask for the same image


In [None]:
def plot_image_and_mask(image: Path):
    """
    Function to plot an image alongside its mask.
    """
    img = cv2.imread(image.as_posix())
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    mask_path = Path(image.as_posix().replace("/images/", "/masks/"))
    mask = cv2.imread(mask_path.as_posix())

    _, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 10))

    ax1.imshow(img)
    ax1.set_title("RGB image")

    norm = mpl.colors.Normalize(vmin=0, vmax=4)
    cmap = plt.get_cmap("viridis")

    ax2.imshow(mask[:, :, 1], cmap=cmap, norm=norm)
    ax2.set_title("Mask")

    plt.show()


In [None]:
# Image with buildings
# img_path = Path("data/landcover_ai/images/N-33-130-A-d-4-4.tif")

# Image with water
# img_path = Path("data/landcover_ai/images/M-33-20-D-d-3-3.tif")

# Image with roads and buildings
# img_path = Path("data/landcover_ai/images/M-34-6-A-d-2-2.tif")

plot_image_and_mask(img_path)


#### Trying to define colors


In [None]:
mask_path = Path(img_path.as_posix().replace("/images/", "/masks/"))
mask = cv2.imread(mask_path.as_posix())

In [None]:
# Seems to work, but very slow
# https://stackoverflow.com/a/66821752

data = mask[:, :, :1].squeeze().copy()

# define color map
color_map = {
    0: np.array([255, 255, 0]),
    1: np.array([255, 0, 0]),
    2: np.array([0, 255, 0]),
    3: np.array([0, 0, 255]),
    4: np.array([255, 0, 255]),
}

# make a 3d numpy array that has a color channel dimension
data_3d = np.ndarray(shape=(data.shape[0], data.shape[1], 3), dtype=int)
for i in range(0, data.shape[0]):
    for j in range(0, data.shape[1]):
        data_3d[i][j] = color_map[data[i][j]]

# display the plot
fig, ax = plt.subplots(1, 1)
ax.imshow(data_3d)

In [None]:
# Can be checked for each channel. All channels are identical
labels, count = np.unique(mask[:, :, 0], return_counts=True)
print(
    f"Labels are:\n\n"
    f"{labels[0]}. Background: {count[0]:,} ({count[0]/np.sum(count):.2%})\n"
    f"{labels[1]}. Buildings: {count[1]:,} ({count[1]/np.sum(count):.2%})\n"
    f"{labels[2]}. Woodland: {count[2]:,} ({count[2]/np.sum(count):.2%})\n"
    f"{labels[3]}. Water: {count[3]:,} ({count[3]/np.sum(count):.2%})\n"
    f"{labels[4]}. Roads: {count[4]:,} ({count[4]/np.sum(count):.2%})"
)

### Crop each large image into patches of 256x256


In [None]:
def create_patches(
    image_dir: Path, patch_dir: Path, filetypes: list = None, patch_size: int = 256
) -> None:
    """
    Function to create patches from large images.
    """
    # Set default file types
    if filetypes is None:
        filetypes = [".tif", ".tiff"]

    # Make sure patch_dir exists
    Path(patch_dir).mkdir(parents=True, exist_ok=True)

    # Loop through image directory
    for file in image_dir.rglob("*"):
        if file.is_file() and file.suffix in filetypes:
            # Read each image as BGR
            image = cv2.imread(str(file), 1)

            # Calculate nearest size divisible by patch size
            size_x = (image.shape[1] // patch_size) * patch_size
            size_y = (image.shape[0] // patch_size) * patch_size

            # Crop image (y comes first!)
            image = image[0:size_y, 0:size_x]

            # Extract patches from each image (Step=256 for 256 patches means no overlap)
            print("Patchifying image:", file)
            patches_img = patchify(image, (256, 256, 3), step=256)

            counter_create, counter_skip = 0, 0

            for i in range(patches_img.shape[0]):
                for j in range(patches_img.shape[1]):
                    single_patch_img = patches_img[i, j, :, :]

                    # Drop extra unecessary dimension that patchify adds.
                    single_patch_img = single_patch_img[0]

                    patch_path = Path(
                        patch_dir, f"{file.stem}_patch_{str(i)}_{str(j)}{file.suffix}"
                    )

                    if not patch_path.is_file():
                        cv2.imwrite(str(patch_path), single_patch_img)
                        counter_create += 1
                    else:
                        counter_skip += 1

            print(f"Created {counter_create} patches in {patch_dir}.")
            print(f"Skipped {counter_skip} already existing patches.")


In [None]:
# Run the function to create patches for images
create_patches(dir_img, dir_patch_img)


In [None]:
# Run the function to create patches for masks
create_patches(dir_mask, dir_patch_mask)


### Plot random patch alongside its mask


In [None]:
# Get paths of all patches
patch_files = [file for file in dir_patch_img.iterdir()]

# Randomly select one of them
patch_path = np.random.choice(img_files)

plot_image_and_mask(img_path)


### Select patches containing relevant information

Copy patches and masks with real information (=decent amount of labels other than 0) to a new folder.


In [None]:
def select_useful_patches(
    patch_dir: Path,
    path_useful_img: Path,
    path_useful_mask: Path,
    threshold: float = 0.05,
) -> None:
    """
    Function to select useful patches and move them to a folder.
    Based on the percentage of classified pixels in the mask.
    """

    # Create directories if they don't exist
    for dir in [path_useful_img, path_useful_mask]:
        Path(dir).mkdir(parents=True, exist_ok=True)

    count_useful, count_useless = 0, 0

    # Loop through image directory
    for file in patch_dir.rglob("*"):
        if file.is_file() and file.suffix in [".tif", ".tiff"]:
            mask_path = Path(file.as_posix().replace("/images/", "/masks/"))
            mask = cv2.imread(mask_path.as_posix(), 1)

            _, counts = np.unique(mask, return_counts=True)

            # Minimun percentage of useful area with labels that are not 0
            if (counts[0] / counts.sum()) < 1 - threshold:
                new_file_img = Path(dir_patch_useful_img, file.name)
                file.rename(new_file_img)

                new_file_mask = Path(dir_patch_useful_mask, mask_path.name)
                mask_path.rename(new_file_mask)

                count_useful += 1

            else:
                count_useless += 1

    print(f"Found {count_useful} useful images and {count_useless} useless images.")

In [None]:
select_useful_patches(dir_patch_img, dir_patch_useful_img, dir_patch_useful_mask)

### Create patch inventory


In [None]:
def create_patch_inventory(
    patch_dir: Path, csv_path: Path, classes: dict
) -> pd.DataFrame:
    """
    Function to create an inventory of classes in the patches.
    Returns a DataFrame with infomation how often each class appears in mask patches.
    """

    # Create empty dataframe
    cols = list(classes.keys()) + ["total"] + [f"{str(k)}_pct" for k in classes.keys()]
    inventory = pd.DataFrame(columns=cols)

    for file in patch_dir.iterdir():
        mask = cv2.imread(file.as_posix(), 1)

        # Count how often each class exists as pixel value
        val, counts = np.unique(mask, return_counts=True)

        # Create a dict to be added as row to the DataFrame
        row = dict(zip(val, counts))
        row["total"] = sum(row.values())

        # Calculate percentages
        for v in val:
            row[f"{str(v)}_pct"] = row[v] / row["total"]

        df_tmp = pd.DataFrame(row, index=[0])
        inventory = pd.concat([inventory, df_tmp], ignore_index=True).fillna(0)

    inventory.to_csv(csv_path)
    print(f"Saved patch inventory to {inventory_path}")

    return inventory

In [None]:
inventory = create_patch_inventory(dir_patch_useful_mask, inventory_path, classes)

### Split the data into training, validation and testing


In [None]:
input_folder = Path(dir_patch, "useful")
output_folder = Path(dir_root, "tmp")

for dir in [input_folder, output_folder]:
    Path(dir).mkdir(parents=True, exist_ok=True)

# Split with a ratio
# To only split into training and validation set, set a tuple to ratio, i.e, `(.8, .2)`.
splitfolders.ratio(
    input_folder, output=output_folder, seed=42, ratio=(0.75, 0.25), group_prefix=None
)


### Move files to final destination


In [None]:
source = Path(output_folder, "train/images/")
destination = Path(dir_root, "train_images/train/")

for dir in [source, destination]:
    Path(dir).mkdir(parents=True, exist_ok=True)

In [None]:
move_files = {
    "train/images/": "train_images/train/",
    "train/masks/": "train_masks/train/",
    "val/images/": "val_images/val/",
    "val/masks/": "val_masks/val/",
}

for source, dest in move_files.items():
    source = Path(output_folder, source)
    dest = Path(dir_root, dest)

    # Make sure destination exists
    Path(dest).mkdir(parents=True, exist_ok=True)

    counter = 0

    for file in source.iterdir():
        if file.is_file():
            file.rename(Path(dest, file.name))
            counter += 1

    print(f"Moved {counter} files from {source} to {dest}.")


### Remove temporary directory


In [None]:
files_left = 0

# Count number of files left
for file in output_folder.rglob("*"):
    if file.is_file():
        files_left += 1

if files_left == 0:
    shutil.rmtree(output_folder)
    print(f"Removed empty directory {output_folder}.")
else:
    print(f"{output_folder} not empty, did not remove.")
