In [11]:
# Note: this notebook should be run in the same folder as the landcover.ai dataset.
# Data (Version 1) sourced from https://landcover.ai.linuxpolska.com/
# The produced zip file is meant to be uploaded.


import os
import zipfile
import numpy as np
import random
import torch
import matplotlib.pyplot as plt

from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

In [12]:
!pip install opencv-python



In [13]:
# Important parameters
# Number of label values counted per mask; it sizes the per-class count vector below.
# NOTE(review): the header says dataset Version 1 -- confirm that its masks really
# contain 5 distinct label values.
num_classes = 5

In [14]:
# Expected to run in the same directory as the landcover files
destination_dir = './'  # Change this to your desired directory


# The existence of "output/" is used as the marker that split.py has already been run
# (presumably split.py is what creates it -- confirm).
output_dir = os.path.join(destination_dir, "output/")
if not os.path.exists(output_dir):
    # Change the current working directory to the destination directory
    # NOTE(review): after this chdir, a relative destination_dir other than './' would be
    # resolved a second time by the os.listdir calls below -- confirm before changing it.
    os.chdir(destination_dir)

    # Run split.py
    !python split.py

    # List the contents of the destination directory
    # (the return value is discarded here because it is not the last expression of the cell)
    os.listdir(destination_dir)
else:
    print("Output directory already exists. Skipping split.py.")


# Last expression of the cell: the directory listing is what gets displayed
os.listdir(destination_dir)

Processed M-33-20-D-c-4-2 1/41
Processed M-33-20-D-d-3-3 2/41
Processed M-33-32-B-b-4-4 3/41
Processed M-33-48-A-c-4-4 4/41
Processed M-33-7-A-d-2-3 5/41
Processed M-33-7-A-d-3-2 6/41
Processed M-34-32-B-a-4-3 7/41
Processed M-34-32-B-b-1-3 8/41
Processed M-34-5-D-d-4-2 9/41
Processed M-34-51-C-b-2-1 10/41
Processed M-34-51-C-d-4-1 11/41
Processed M-34-55-B-b-4-1 12/41
Processed M-34-56-A-b-1-4 13/41
Processed M-34-6-A-d-2-2 14/41
Processed M-34-65-D-a-4-4 15/41
Processed M-34-65-D-c-4-2 16/41
Processed M-34-65-D-d-4-1 17/41
Processed M-34-68-B-a-1-3 18/41
Processed M-34-77-B-c-2-3 19/41
Processed N-33-104-A-c-1-1 20/41
Processed N-33-119-C-c-3-3 21/41
Processed N-33-130-A-d-3-3 22/41
Processed N-33-130-A-d-4-4 23/41
Processed N-33-139-C-d-2-2 24/41
Processed N-33-139-C-d-2-4 25/41
Processed N-33-139-D-c-1-3 26/41
Processed N-33-60-D-c-4-2 27/41
Processed N-33-60-D-d-1-2 28/41
Processed N-33-96-D-d-1-1 29/41
Processed N-34-106-A-b-3-4 30/41
Processed N-34-106-A-c-1-3 31/41
Processed N-

['.ipynb_checkpoints',
 'DataPreprocess.ipynb',
 'DataPreprocess_Revised.ipynb',
 'images',
 'landcover_prep_grey_class.zip',
 'masks',
 'output',
 'split.py',
 'test.txt',
 'train.txt',
 'train_class_distribution.txt',
 'train_class_weights.txt',
 'val.txt']

In [15]:
# num_cpu_cores = os.cpu_count()
# print(f"Number of CPU cores: {num_cpu_cores}")
# num_workers = num_cpu_cores - 1

In [16]:
def augmentation(sample):
    """Convert an ``(image, mask)`` pair into tensors without altering the labels.

    Despite its name, this function applies no random augmentation; it only
    converts types.

    Args:
        sample: 2-tuple ``(input_image, input_mask)``. Both are presumably PIL
            images as produced by ``CustomDataset.__getitem__`` -- confirm if
            called from elsewhere.

    Returns:
        Tuple ``(image_tensor, mask_tensor)``:
        * ``image_tensor`` is the result of ``transforms.ToTensor()`` on the image.
        * ``mask_tensor`` holds the raw mask values as ``torch.int8`` with an
          extra leading dimension added by ``unsqueeze(0)``.
    """
    input_image, input_mask = sample

    image_tensor = transforms.ToTensor()(input_image)

    # The mask deliberately bypasses ToTensor so that the label values are kept
    # as they are instead of being rescaled.
    mask_array = np.array(input_mask, dtype=np.int64)
    # NOTE: int8 can only represent labels up to 127; larger values would wrap.
    mask_tensor = torch.as_tensor(mask_array, dtype=torch.int8).unsqueeze(0)

    return image_tensor, mask_tensor

In [17]:
class CustomDataset(Dataset):
    """Dataset of (image, mask) tile pairs listed by prefix in a split file.

    Every non-blank line of ``txt_file`` is treated as a tile prefix. For a
    prefix ``p`` the image is read from ``<data_dir>/output/p.jpg`` and the
    mask from ``<data_dir>/output/p_m.png``.

    Args:
        data_dir: Directory containing the split file and the ``output`` folder.
        txt_file: Name of the split file inside ``data_dir`` (one prefix per line).
        transform: Optional callable that receives the ``(image, mask)`` tuple
            and returns a 2-tuple; applied in ``__getitem__``.
    """

    def __init__(self, data_dir, txt_file, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        txt_file_path = os.path.join(data_dir, txt_file)
        with open(txt_file_path, 'r') as file:
            # Skip blank lines (e.g. a trailing newline at the end of the file);
            # otherwise they become empty prefixes that resolve to "output/.jpg".
            stripped_lines = (line.strip() for line in file)
            self.prefixes = [prefix for prefix in stripped_lines if prefix]

    def __len__(self):
        """Return the number of tile prefixes read from the split file."""
        return len(self.prefixes)

    def __getitem__(self, idx):
        """Load the image/mask pair for ``self.prefixes[idx]``.

        Returns:
            ``(image, mask)``: the opened image and the mask converted to
            single-channel mode ``'L'``, or whatever ``transform`` returns for
            that pair when a transform is set.
        """
        prefix = self.prefixes[idx]
        image_path = os.path.join(self.data_dir, 'output', f'{prefix}.jpg')
        # Masks use the "_m" suffix (an earlier revision used "grey").
        mask_path = os.path.join(self.data_dir, 'output', f'{prefix}_m.png')
        image = Image.open(image_path)
        mask = Image.open(mask_path).convert('L')

        if self.transform:
            sample = (image, mask)
            sample = self.transform(sample)
            image, mask = sample

        return image, mask

# Per-sample preprocessing pipeline. Despite the variable name, nothing here moves
# data to a GPU; the pipeline only wraps `augmentation`.
gpu_transform = transforms.Compose([augmentation])

# One dataset per split file, all reading from the same directory with the same transform.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(data_dir=destination_dir, txt_file=split_file, transform=gpu_transform)
    for split_file in ('train.txt', 'val.txt', 'test.txt')
)

In [19]:
# Calculate metrics
def calculate_class_weights_and_distribution(dataset, num_classes, save_to_files=False):
    """Count label pixels over a dataset and derive inverse-frequency class weights.

    Args:
        dataset: Indexable collection supporting ``len()`` whose items are
            ``(image, mask)`` pairs; ``mask`` must be an integer tensor of labels.
        num_classes: Number of classes; valid labels are ``0 .. num_classes - 1``.
        save_to_files: When true, write the pixel counts to
            ``./class_distribution.txt`` and the weights to ``./class_weights.txt``
            (one value per line, in class order).

    Returns:
        ``(class_weights_list, class_distribution)`` -- two lists of floats of
        length ``num_classes``. ``class_distribution[c]`` is the number of pixels
        labelled ``c``; ``class_weights_list[c]`` is ``total_pixels / pixels_of_c``.
        A class that never occurs gets weight ``0.0`` instead of ``inf``.

    Raises:
        IndexError: If a mask contains a label outside ``[0, num_classes)``.
    """
    # Accumulate in int64: float32 cannot represent every integer above 2**24,
    # which whole-dataset pixel totals easily exceed.
    class_counts = torch.zeros(num_classes, dtype=torch.int64)
    total_count = len(dataset)

    # Iterate through the dataset to count class occurrences
    for i in range(total_count):
        _, mask = dataset[i]
        labels = mask.reshape(-1).to(torch.int64)
        if labels.numel() == 0:
            continue

        # Reject out-of-range labels explicitly; a negative label would otherwise
        # be counted silently against a class at the end of the vector.
        lowest, highest = int(labels.min()), int(labels.max())
        if lowest < 0 or highest >= num_classes:
            raise IndexError(
                f"mask {i} contains labels in [{lowest}, {highest}], "
                f"expected values in [0, {num_classes - 1}]"
            )

        class_counts += torch.bincount(labels, minlength=num_classes)

    # Weights are inversely proportional to the pixel counts
    counts = class_counts.to(torch.float64)
    total_pixels = counts.sum()
    present = counts > 0
    class_weights = torch.zeros_like(counts)
    # Only divide for classes that occur, so absent classes keep weight 0.0.
    class_weights[present] = total_pixels / counts[present]

    # Plain Python lists for the return value and the text files
    class_weights_list = class_weights.tolist()
    class_distribution = counts.tolist()

    if save_to_files:
        distribution_file_path = "./class_distribution.txt"
        with open(distribution_file_path, "w") as distribution_file:
            for count in class_distribution:
                distribution_file.write(f"{count}\n")

        print(f"Class distribution written to {distribution_file_path}")

        # Save class weights to a text file if requested
        weights_file_path = "./class_weights.txt"
        with open(weights_file_path, "w") as weights_file:
            for weight in class_weights_list:
                weights_file.write(f"{weight}\n")

        print(f"Class weights written to {weights_file_path}")

    return class_weights_list, class_distribution

# Compute the training-split statistics and write them to the text files
class_weights, class_distribution = calculate_class_weights_and_distribution(
    train_dataset,
    num_classes,
    save_to_files=True,
)

Class distribution written to ./class_distribution.txt
Class weights written to ./class_weights.txt


In [20]:
# Archive that bundles the split lists, the statistics files and the tiles
zip_filename = "landcover_prep_prepped_class.zip"

with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as archive:
    # Pass 1: every .txt file found anywhere below the current directory
    # (os.walk recurses, so this is not limited to the top level)
    for folder, _subdirs, names in os.walk("."):
        for name in names:
            if not name.endswith(".txt"):
                continue
            source_path = os.path.join(folder, name)
            archive.write(source_path, os.path.relpath(source_path, "."))

    # Pass 2: everything below ./output, stored under an "output/..." archive path
    for folder, _subdirs, names in os.walk("./output"):
        for name in names:
            source_path = os.path.join(folder, name)
            archive.write(source_path, os.path.relpath(source_path, "."))

print(f"Zipped files to {zip_filename}")

Zipped files to landcover_prep_prepped_class.zip
