In [1]:
import numpy as np

# Root directory of the raw inputs; the two directories below are derived from it.
DATA_RAW_DIR = 'data/raw'
# Directory of the mask arrays (read with np.load in later cells).
MASKS_DIR = f'{DATA_RAW_DIR}/masks'
# Directory of the subscene arrays (read with np.load in later cells).
SUBSCENE_DIR = f'{DATA_RAW_DIR}/subscenes'

In [2]:
from processor.process.preprocessor import Preprocessor
from processor.utils.verify_tile_coordinates import *

In [3]:
import json 

class TileMetadata:
    """Plain record of the metadata describing one tile, serialisable to JSON.

    NOTE: a later cell of this notebook defines another class with the same
    name; once that cell has run, it replaces this definition.

    Args:
        tile_id: identifier of the tile; also used as the JSON file name.
        image_filename: file name of the image tile.
        mask_filename: file name of the mask tile.
        product_id: identifier of the product the tile was cut from.
        coords: coordinates of the tile in the source image.
        cloud_coverage: cloud coverage value recorded for the tile.
        spatial_location: spatial location information of the tile.
    """

    def __init__(self, tile_id, image_filename, mask_filename, product_id, coords,
                 cloud_coverage, spatial_location):
        self.tile_id = tile_id
        self.image_filename = image_filename
        self.mask_filename = mask_filename
        self.product_id = product_id
        self.coords = coords
        self.cloud_coverage = cloud_coverage
        self.spatial_location = spatial_location

    def to_dict(self):
        """Return the metadata as a plain python dictionary."""
        return {
            "tile_id": self.tile_id,
            "image_filename": self.image_filename,
            "mask_filename": self.mask_filename,
            "product_id": self.product_id,
            "coords": self.coords,
            "cloud_coverage": self.cloud_coverage,
            "spatial_location": self.spatial_location
        }

    def save(self, output_dir):
        """Write the metadata to ``<output_dir>/<tile_id>.json``.

        The output directory is created when it does not exist yet, so the
        first save into a fresh location no longer fails.
        """
        # Local import: this cell only imports json at the top.
        import os

        os.makedirs(output_dir, exist_ok=True)
        filepath = os.path.join(output_dir, f"{self.tile_id}.json")
        with open(filepath, "w") as f:
            json.dump(self.to_dict(), f, indent=4)

def calculate_cloud_coverage(mask_tile):
    """Return the proportion (0-1) of pixels in ``mask_tile`` that are flagged.

    A pixel counts as flagged when its value is greater than zero, so the
    result stays within 0-1 even when the mask stores class codes above 1.

    NOTE: a later cell of this notebook defines another function with the same
    name; once that cell has run, it replaces this definition.

    Args:
        mask_tile: NumPy array holding the mask values of one tile.

    Returns:
        float: flagged pixels divided by all pixels (a proportion, not a
        percentage); 0.0 for an empty array.
    """
    # TODO: check the values for cloud and shadow
    total_pixels = mask_tile.size
    if total_pixels == 0:
        # Avoid a division by zero (NaN plus a RuntimeWarning) on empty tiles.
        return 0.0

    cloud_px = np.count_nonzero(mask_tile > 0)
    return cloud_px / total_pixels

In [None]:
def generate_tile_id(tile_coords: dict):
    """Build a tile identifier from the tile's coordinates in the source image / mask.

    Args:
    tile_coords (dict): mapping that provides the keys
        - row_start
        - row_end
        - col_start
        - col_end

    Returns:
    str: identifier of the form
        TL_RS{row_start}_RE{row_end}_CS{col_start}_CE{col_end}
    """
    # (tag, key) pairs in the order they appear in the identifier.
    layout = (
        ("RS", "row_start"),
        ("RE", "row_end"),
        ("CS", "col_start"),
        ("CE", "col_end"),
    )
    segments = [f"{tag}{tile_coords[key]}" for tag, key in layout]
    return "_".join(["TL", *segments])

In [None]:
from csv import DictReader

def extract_class_tags(class_tags_file) -> list[dict]:
    """Read a CSV file of class tags into a list of row dictionaries.

    Args:
    class_tags_file (str): path to the CSV file; its first row is used as the
        header and supplies the dictionary keys.

    Returns:
    list: one dictionary per data row, in file order.
    """
    # newline="" leaves line-ending handling to the csv module, which is what
    # it requires to read quoted fields containing line breaks unchanged.
    with open(class_tags_file, "r", newline="") as f:
        return list(DictReader(f))

In [141]:
class BaseImage:
    """Base class for array-backed images that can be zero-padded and tiled.

    The array is loaded with ``np.load`` from ``<input_dir>/<filename>`` when
    the object is created.

    Args:
        input_dir: directory that contains the array file.
        filename: name of the array file inside ``input_dir``.
        tile_size: (rows, cols) of each tile. Subclasses may overwrite
            ``self.tile_size`` after calling ``super().__init__``.
    """

    def __init__(self, input_dir: str, filename: str,
                 tile_size: tuple[int, int] = (512, 512)):
        self.input_dir = input_dir
        self.filename = filename
        self.image = np.load(f"{self.input_dir}/{self.filename}")
        # Set here so the padding / tiling helpers also work on a bare
        # BaseImage instead of failing with AttributeError.
        self.tile_size = tile_size
        self.id = self._get_image_id()

    def _get_image_id(self) -> str:
        """Return the part of the filename before its first dot."""
        return self.filename.split(".")[0]

    def _pad_image(self):
        """Zero-pad the image so both spatial sides are multiples of the tile size.

        Padding is appended at the bottom and right only; any axes after the
        first two are left untouched, so 2-D and multi-channel arrays both work.

        Returns:
            tuple: ``(padded_image, (pad_h, pad_w))``.

        Raises:
            ValueError: if the image has fewer than two dimensions.
        """
        if self.image.ndim < 2:
            raise ValueError(
                f"Expected an image with at least 2 dimensions, got shape {self.image.shape}"
            )

        tile_h, tile_w = self.tile_size
        h, w = self.image.shape[:2]
        # Rows / columns missing to reach the next multiple (0 if already aligned).
        pad_h = -h % tile_h
        pad_w = -w % tile_w

        pad_width = ((0, pad_h), (0, pad_w)) + ((0, 0),) * (self.image.ndim - 2)
        padded_image = np.pad(self.image, pad_width,
                              mode='constant', constant_values=0)

        return padded_image, (pad_h, pad_w)

    def _generate_tile_id(self, coords):
        """Build a tile identifier from the image id and the tile coordinates.

        Args:
        coords (dict): dictionary with the following keys:
            - row_start
            - row_end
            - col_start
            - col_end

        Returns:
        str: identifier in the format
            {image_id}_TL_RS{row_start}_RE{row_end}_CS{col_start}_CE{col_end}
        """
        return (
            f"{self.id}"
            + f"_TL_RS{coords['row_start']}"
            + f"_RE{coords['row_end']}"
            + f"_CS{coords['col_start']}"
            + f"_CE{coords['col_end']}"
        )

    def _tile_image(self):
        """Split the padded image into non-overlapping tiles.

        Returns:
            list[dict]: one entry per tile, in row-major order, with keys
            ``id`` (see ``_generate_tile_id``), ``tile`` (the array slice cut
            from the padded image) and ``coords`` (``row_start``, ``row_end``,
            ``col_start``, ``col_end`` clipped to the unpadded image, plus
            ``is_padded``).
        """
        image, (pad_h, pad_w) = self._pad_image()
        tile_h, tile_w = self.tile_size
        orig_h, orig_w = image.shape[0] - pad_h, image.shape[1] - pad_w

        tiles = []
        for i in range(0, image.shape[0], tile_h):
            for j in range(0, image.shape[1], tile_w):
                # Coordinates are expressed in the original (non-padded) image
                # space, so the end indices are clipped to the original size.
                coords = {
                    "row_start": i,
                    "row_end": min(orig_h, i + tile_h),
                    "col_start": j,
                    "col_end": min(orig_w, j + tile_w),
                    "is_padded": (i + tile_h > orig_h) or (j + tile_w > orig_w)
                }

                tiles.append({
                    "id": self._generate_tile_id(coords),
                    # Slicing only the first two axes keeps any trailing axes whole.
                    "tile": image[i:i + tile_h, j:j + tile_w],
                    "coords": coords
                })

        return tiles

    def _generate_tile_ouput_path(self, output_dir, tile_id, extension):
        """Return ``<output_dir>/<tile_id>.<extension>``."""
        return f"{output_dir}/{tile_id}.{extension}"
    
# Quick check of the base class: constructing it loads the subscene array with np.load.
img = BaseImage(SUBSCENE_DIR, "S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy")    

In [151]:
import os 
import numpy as np

class Mask(BaseImage):
    """A mask array that is padded and tiled as soon as it is loaded.

    Every entry of ``self.tiles`` carries a ``cloud_coverage`` value from
    construction on, so the tiles can be used for metadata before (or without)
    calling ``save_mask``.

    Args:
        mask_dir: directory that contains the mask ``.npy`` file.
        mask_filename: name of the mask file inside ``mask_dir``.
        tile_size: (rows, cols) of each tile.
    """

    def __init__(self, mask_dir: str, mask_filename: str, tile_size: tuple[int, int] = (512, 512)):
        super().__init__(mask_dir, mask_filename)
        self.tile_size = tile_size
        self.id = self._get_image_id()
        self.tiles = self._tile_image()
        for tile in self.tiles:
            tile["cloud_coverage"] = self._calculate_cloud_coverage(tile["tile"])

    def save_mask(self, output_dir: str, out_dtype: type = np.uint8):
        """Save every mask tile to ``output_dir`` as ``<tile_id>.npy``.

        Args:
            output_dir: destination directory; created if it does not exist.
            out_dtype: dtype the tile is cast to before saving.
        """
        os.makedirs(output_dir, exist_ok=True)

        for tile in self.tiles:
            tile_mask = tile["tile"]
            # Refreshed here as well so the value always matches what is saved.
            tile["cloud_coverage"] = self._calculate_cloud_coverage(tile_mask)

            output_path = self._generate_tile_ouput_path(output_dir, tile["id"], "npy")
            np.save(output_path, tile_mask.astype(out_dtype))

    def _calculate_cloud_coverage(self, mask) -> float:
        """
        Calculate the proportion of cloud pixels in a one-hot encoded mask.

        Args:
            mask: NumPy array of shape (H,W,3) with one-hot encoding
                [CLEAR, CLOUD, CLOUD_SHADOW]

        Returns:
            float: Proportion of cloud coverage (0-1); 0.0 for an empty mask
        """
        # NOTE(review): tiles are cut from the zero-padded mask, so padded
        # pixels count towards the total here — confirm whether coverage
        # should be computed over the unpadded area only.

        # Get only the CLOUD channel (index 1)
        cloud_mask = mask[:, :, 1]

        total_pixels = cloud_mask.size
        if total_pixels == 0:
            # Avoid a division by zero on empty tiles.
            return 0.0

        cloud_pixels = np.sum(cloud_mask)
        return float(cloud_pixels / total_pixels)
            
        
# Tile one example mask and write its tiles to disk.
mask = Mask(MASKS_DIR, "S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy")
mask.save_mask('data/processed/masks')

# Show which keys a tile dictionary carries after saving.
mask.tiles[0].keys()

dict_keys(['id', 'tile', 'coords', 'cloud_coverage'])

In [None]:
import os
from csv import DictReader
import rasterio
from rasterio.transform import from_origin

class Subscene(BaseImage):
    """A subscene array that is tiled on construction and can be written as GeoTIFF tiles.

    Args:
        subscene_dir: directory that contains the subscene ``.npy`` file.
        subscene_filename: name of the subscene file inside ``subscene_dir``.
        shapefile_dir: directory holding ``<id>/<id>.shp``; only read by
            ``_load_shapefile``.
        classif_tags_filepath: path to the CSV with the classification tags;
            it must have a ``scene`` column containing the subscene id.
        tile_size: (rows, cols) of each tile.

    Raises:
        IndexError: if the CSV has no row for this subscene.
    """

    def __init__(self, subscene_dir, subscene_filename, shapefile_dir, classif_tags_filepath: str, tile_size: tuple[int, int] = (512, 512)):
        super().__init__(subscene_dir, subscene_filename)
        self.shapefile_dir = shapefile_dir
        self.classif_tags_filepath = classif_tags_filepath
        self.tile_size = tile_size
        self.id = self._get_image_id()
        self.tiles = self._tile_image()
        self.classif_data = self._get_classif_data()

    def _extract_class_tags(self) -> list[dict]:
        """Read the classification tags CSV into a list of row dictionaries.

        Returns:
        list: one dictionary per data row of ``self.classif_tags_filepath``
        """
        # newline="" leaves line-ending handling to the csv module, as it requires.
        with open(self.classif_tags_filepath, "r", newline="") as f:
            return list(DictReader(f))

    def _get_classif_data(self):
        """Return the CSV row whose ``scene`` column equals this subscene's id.

        Raises:
            IndexError: if no row matches (same exception type as before, now
                with a message naming the scene and the file).
        """
        subscene_id = self._get_image_id()
        for row in self._extract_class_tags():
            if row["scene"] == subscene_id:
                return row
        raise IndexError(
            f"No classification data for scene '{subscene_id}' in {self.classif_tags_filepath}"
        )

    def _get_product_id(self):
        """Return the ``scene`` value of this subscene's classification row."""
        # Reuse the row loaded at construction instead of re-parsing the CSV
        # on every call (this is called once per tile by the metadata writer).
        classif_data = getattr(self, "classif_data", None)
        if classif_data is None:
            classif_data = self._get_classif_data()
        return classif_data["scene"]

    def _load_shapefile(self):
        """Load the subscene's shapefile and derive a transform for the whole image.

        Returns:
            tuple: ``(transform, crs, bounds)`` where ``bounds`` is
            ``(minx, miny, maxx, maxy)`` of the first geometry.

        Raises:
            FileNotFoundError: if ``<shapefile_dir>/<id>/<id>.shp`` is missing.
        """
        # Imported here because no visible cell of the notebook imports
        # geopandas, so the module-level name `gpd` is not guaranteed to exist.
        import geopandas as gpd

        shapefile_path = os.path.join(self.shapefile_dir, f"{self.id}/{self.id}.shp")
        if not os.path.exists(shapefile_path):
            raise FileNotFoundError(f"Shapefile not found: {shapefile_path}")

        gdf = gpd.read_file(shapefile_path)

        # Extract bounding box (minx, miny, maxx, maxy) and CRS
        bounds = gdf.geometry.iloc[0].bounds
        crs = gdf.crs

        # Pixel resolution from the size of the loaded (unpadded) image rather
        # than a hard-coded 1022 x 1022.
        height, width = self.image.shape[:2]
        x_res = (bounds[2] - bounds[0]) / width
        y_res = (bounds[3] - bounds[1]) / height

        # Upper-left corner is (minx, maxy).
        transform = from_origin(bounds[0], bounds[3], x_res, y_res)

        return transform, crs, bounds

    def save_subscene_tiles(self, output_dir: str, out_dtype: type = np.uint16, pixel_size: tuple[int, int] = (10, 10), crs: str = "EPSG:4326"):
        """Write every tile to ``output_dir`` as an internally tiled GeoTIFF.

        The transform written to each file is relative to the subscene's own
        upper-left corner (taken as 0, 0); it does not use the shapefile.

        Args:
            output_dir: destination directory; created if it does not exist.
            out_dtype: dtype the tile is cast to before writing.
            pixel_size: (y size, x size) of a pixel.
            crs: coordinate reference system string stored in the file.
        """
        os.makedirs(output_dir, exist_ok=True)
        y_size, x_size = pixel_size[0], pixel_size[1]

        for t in self.tiles:
            # NOTE(review): if the subscene holds float values in 0-1, casting
            # to the default uint16 truncates them to 0 — confirm the expected
            # value range / scaling before relying on the default.
            tile = t["tile"].astype(out_dtype)
            tile_coords = t["coords"]

            # from_origin expects the tile's upper-left corner. Rows grow
            # downwards while "north" grows upwards, so the row offset must be
            # negated; otherwise tiles lower in the image are placed above
            # the ones before them.
            transform = from_origin(
                west=tile_coords['col_start'] * x_size,
                north=-tile_coords['row_start'] * y_size,
                xsize=x_size,
                ysize=y_size,
            )

            h, w, c = tile.shape
            output_path = self._generate_tile_ouput_path(output_dir, t["id"], "tif")

            with rasterio.open(
                output_path,
                'w',
                driver='GTiff',
                height=h,
                width=w,
                count=c,
                dtype=tile.dtype,
                crs=crs,
                transform=transform,
                tiled=True,
            ) as dataset:
                # GeoTIFF band indexes start at 1.
                for band_ix in range(c):
                    dataset.write_band(band_ix + 1, tile[:, :, band_ix])
    

# NOTE(review): no cell of this notebook states where the shapefiles live; this
# path is a placeholder assumption. It is only read by Subscene._load_shapefile()
# — confirm it before calling that method.
SHAPEFILE_DIR = f"{DATA_RAW_DIR}/shapefiles"

# Keyword arguments keep each value on the right parameter: with positional
# arguments the CSV path was taken as `shapefile_dir` and the tile size as
# `classif_tags_filepath`.
subs = Subscene(
    subscene_dir=SUBSCENE_DIR,
    subscene_filename="S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy",
    shapefile_dir=SHAPEFILE_DIR,
    classif_tags_filepath="data/raw/classification_tags.csv",
    tile_size=(512, 512),
)

subs.save_subscene_tiles("data/processed/images")

In [156]:
import json 
import os

class TileMetadata:
    """Writes one JSON metadata file per tile for a subscene / mask pair.

    Args:
        subscene: object exposing ``tiles`` (list of dicts with ``id`` and
            ``coords``) and ``_get_product_id()``.
        mask: object exposing ``tiles`` (list of dicts with ``id``, ``coords``
            and ``cloud_coverage``), in the same order as the subscene tiles.
    """

    def __init__(self, subscene, mask):
        self.subscene = subscene
        self.mask = mask
        # Looked up once on first use and then reused for every tile.
        self._product_id = None

    def _to_dict(self, subscene_tile, mask_tile):
        """Build the metadata dictionary of one subscene tile and its mask tile."""
        if self._product_id is None:
            self._product_id = self.subscene._get_product_id()

        cloud_coverage = mask_tile.get("cloud_coverage")
        if cloud_coverage is None:
            # The mask tiles have not been annotated yet; compute the value
            # here instead of failing with a KeyError.
            cloud_coverage = self.mask._calculate_cloud_coverage(mask_tile["tile"])

        return {
            "id": subscene_tile["id"],
            "image_filename": f"{subscene_tile['id']}.tif",
            # Mask tiles are saved as .npy arrays, not GeoTIFFs.
            "mask_filename": f"{mask_tile['id']}.npy",
            "product_id": self._product_id,
            "original_coords": subscene_tile["coords"],
            "cloud_coverage": cloud_coverage,
        }

    def save(self, output_dir):
        """Write ``<output_dir>/<tile id>.json`` for every tile.

        Raises:
            ValueError: if subscene and mask do not have the same number of
                tiles, or if a tile pair does not cover the same coordinates.
        """
        subscene_tiles = self.subscene.tiles
        mask_tiles = self.mask.tiles
        if len(subscene_tiles) != len(mask_tiles):
            raise ValueError(
                f"Tile count mismatch: {len(subscene_tiles)} subscene tiles "
                f"vs {len(mask_tiles)} mask tiles"
            )

        os.makedirs(output_dir, exist_ok=True)

        for subscene_tile, mask_tile in zip(subscene_tiles, mask_tiles):
            # Tiles are paired by position; make sure they describe the same window.
            if subscene_tile["coords"] != mask_tile["coords"]:
                raise ValueError(
                    f"Tile coordinate mismatch between '{subscene_tile['id']}' "
                    f"and '{mask_tile['id']}'"
                )
            metadata = self._to_dict(subscene_tile, mask_tile)
            filepath = os.path.join(output_dir, f"{metadata['id']}.json")
            with open(filepath, "w") as f:
                json.dump(metadata, f, indent=4)

# Pair the example subscene and mask tiles and write one JSON metadata file per tile.
metadata = TileMetadata(subs, mask)
metadata.save("data/processed/metadata/tiles")

In [168]:
import glob

INPUT_SUBSCENE_DIR = "data/raw/subscenes"
INPUT_MASK_DIR = "data/raw/masks"
INPUT_CLASSIF_TAGS = "data/raw/classification_tags.csv"
# NOTE(review): no cell of this notebook states where the shapefiles live; this
# path is a placeholder assumption. It is only read by Subscene._load_shapefile().
INPUT_SHAPEFILE_DIR = "data/raw/shapefiles"
TILE_SIZE = (512, 512)
PIXEL_SIZE = (10, 10)
CRS = "EPSG:4326"
FIRST_N = 10  # number of subscenes to process

OUTPUT_SUBSCENE_DIR = "data/processed/images"
OUTPUT_MASKS_DIR = "data/processed/masks"
OUTPUT_METADATA_DIR = "data/processed/metadata/tiles"

# glob returns paths in arbitrary order, so sort for a reproducible selection.
subscenes = sorted(glob.glob(f"{INPUT_SUBSCENE_DIR}/*.npy"))[:FIRST_N]
# A mask has the same file name as its subscene (as in the single-scene cells
# above), so derive each mask path from the subscene instead of pairing two
# independent directory listings by position.
masks = [os.path.join(INPUT_MASK_DIR, os.path.basename(path)) for path in subscenes]


for subscene_path, mask_path in zip(subscenes, masks):
    subscene_f = os.path.basename(subscene_path)
    mask_f = os.path.basename(mask_path)

    if not os.path.exists(mask_path):
        raise FileNotFoundError(f"No mask found for subscene {subscene_f}: {mask_path}")

    # Keyword arguments keep each value on the right parameter: with positional
    # arguments the CSV path was taken as `shapefile_dir` and the tile size as
    # `classif_tags_filepath`.
    subscene = Subscene(
        INPUT_SUBSCENE_DIR,
        subscene_f,
        shapefile_dir=INPUT_SHAPEFILE_DIR,
        classif_tags_filepath=INPUT_CLASSIF_TAGS,
        tile_size=TILE_SIZE,
    )
    mask = Mask(INPUT_MASK_DIR, mask_f, TILE_SIZE)

    subscene.save_subscene_tiles(OUTPUT_SUBSCENE_DIR, pixel_size=PIXEL_SIZE, crs=CRS)
    mask.save_mask(OUTPUT_MASKS_DIR)

    metadata = TileMetadata(subscene, mask)
    metadata.save(OUTPUT_METADATA_DIR)


In [None]:
class Tile: 
    def __init__(
        self,
        subscene_filename,
        mask_filename,
        tile_size,
        tile_coords,
    ):
        self.subscene_filename = subscene_filename
        self.mask_filename = mask_filename
        self.tile_size = tile_size
        self.tile_coords = tile_coords

    def load_image(self, filename: str | int, input_dir: str = None) -> NDArray:
        """ Load image from disk """
        if not input_dir:
            input_dir = self.input_dir

        return np.load(f'{input_dir}/{filename}')
    
    def pad(self):
        """ Pad image to make it divisible by the tile size """
        # if not tile_size:
        #     tile_size = self.tile_size

        h, w, _ = image.shape
        pad_h = (tile_size[0] - (h % tile_size[0])) % tile_size[0]
        pad_w = (tile_size[1] - (w % tile_size[1])) % tile_size[1]

        # Zero-padding
        padded_image = np.pad(image,
                              ((0, pad_h), (0, pad_w), (0, 0)),
                              mode='constant', constant_values=0)

        return padded_image, (pad_h, pad_w)




In [None]:
def calculate_cloud_coverage(mask: np.ndarray) -> float:
    """
    Calculate the proportion of cloud pixels in a one-hot encoded mask.

    Args:
        mask: NumPy array of shape (H,W,3) with one-hot encoding
             [CLEAR, CLOUD, CLOUD_SHADOW]

    Returns:
        float: Proportion of cloud coverage (0-1, not a percentage);
            0.0 for an empty mask
    """
    # Get only the CLOUD channel (index 1)
    cloud_mask = mask[:, :, 1]

    total_pixels = cloud_mask.size
    if total_pixels == 0:
        # Avoid a division by zero (NaN plus a RuntimeWarning) on empty masks.
        return 0.0

    cloud_pixels = np.sum(cloud_mask)
    return float(cloud_pixels / total_pixels)

In [None]:
def generate_metadata(img_filename, mask_filename, tile_obj):
    """Generate the metadata dictionary for a tile.

    Args:
        img_filename: file name of the image tile.
        mask_filename: file name of the mask tile.
        tile_obj: tile dictionary; its ``id`` entry is used when present.
            May be None or empty.

    Returns:
        dict: metadata with the keys ``id``, ``image_filename`` and
        ``mask_filename``.
    """
    tile_info = tile_obj or {}
    return {
        # "abcde" is the placeholder id the draft of this function used.
        "id": tile_info.get("id", "abcde"),
        "image_filename": img_filename,
        "mask_filename": mask_filename,
    }

In [7]:
# Run the packaged Preprocessor on one subscene: load, pad, tile, then save the tiles.
preproc = Preprocessor(in_filepath=SUBSCENE_DIR, out_filepath='data/processed/images')
image = preproc.load_image('S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy')
padded_image, padding = preproc.pad_image(image)
tiles, tile_coords = preproc.tile_image(padded_image, padding)
# NOTE(review): the recorded output of this cell prints `None` for tile_md, so
# save_tiles apparently returns nothing — confirm whether metadata was expected.
tile_md = preproc.save_tiles(tiles, tile_coords=tile_coords, out_filepath='data/processed/images')

# Test verification
is_correct = verify_tile_coordinates(image, tiles, tile_coords)
print(f"Tile coordinates are correct: {is_correct}")

print(tile_md)
    

Tile coordinates are correct: True
None


In [20]:
# Same check for the mask of that scene: load, pad and tile with the packaged Preprocessor.
preproc = Preprocessor(in_filepath=MASKS_DIR, out_filepath='data/processed/masks')
# NOTE: this rebinds `mask`, which earlier cells bind to a Mask object, to a plain array.
mask = preproc.load_image('S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy')
padded_mask, padding = preproc.pad_image(mask)
mask_tiles, mask_tile_coords = preproc.tile_image(padded_mask, padding)

# Test verification
is_correct = verify_tile_coordinates(mask, mask_tiles, mask_tile_coords)
print(f"Mask tile coordinates are correct: {is_correct}")

Mask tile coordinates are correct: True


In [31]:
# Inspect all channel values of the mask at pixel (0, 0).
mask[0, 0, :]

array([False,  True, False])

In [None]:
# Reload the raw subscene and mask arrays of one scene.
img = np.load(
    f"{SUBSCENE_DIR}/S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy")
mask= np.load(
    f"{MASKS_DIR}/S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy")

# NOTE: ndarray.astype returns a new array; neither result below is assigned,
# so `img` and `mask` keep their original dtypes after this cell.
img.astype('int16')
mask.astype('int8')

In [68]:
def read_tif(filepath: str, indexes: int | list[int] | None = None) -> np.ndarray:
    """Read a GeoTIFF file and return its data as a numpy array.

    Args:
        filepath: path of the GeoTIFF file.
        indexes: band index or list of band indexes passed to
            ``dataset.read``; ``None`` reads all bands.

    Returns:
        The data in (H,W,C) layout when the read result is 3-dimensional;
        otherwise the array is returned exactly as read.
    """
    with rasterio.open(filepath) as dataset:
        array = dataset.read(indexes=indexes)  # shape: (C,H,W) for multi-band reads

    # Move the band axis last: (C,H,W) -> (H,W,C)
    if array.ndim == 3:
        array = array.transpose(1, 2, 0)
    return array

# Test reading the file we just saved
# NOTE(review): `tile` is not assigned at top level in any visible cell of this
# notebook, and no visible cell writes test.tif — this check depends on kernel
# state from cells that are no longer present; confirm before re-running.
test_filepath = "./data/transformed/images/test.tif"
loaded_array = read_tif(test_filepath,)

# Verify shape matches original
print(f"Original shape: {tile[0].shape}")
print(f"Loaded shape: {loaded_array.shape}")

# Verify data content matches
print(f"Arrays equal: {np.array_equal(tile[0], loaded_array)}")

(13, 512, 512)
(512, 512, 13)
Original shape: (512, 512, 13)
Loaded shape: (512, 512, 13)
Arrays equal: True


In [41]:
# NOTE(review): displays `tile`, which no visible cell assigns at top level — confirm its origin.
tile

[array([[[0.41959998, 0.4095    , 0.3825    , ..., 0.0067    ,
          0.3739    , 0.3177    ],
         [0.4201    , 0.4028    , 0.3752    , ..., 0.0067    ,
          0.37309998, 0.3187    ],
         [0.4206    , 0.4059    , 0.38169998, ..., 0.0067    ,
          0.3651    , 0.3136    ],
         ...,
         [0.357     , 0.3164    , 0.2863    , ..., 0.0058    ,
          0.3014    , 0.2789    ],
         [0.352     , 0.3209    , 0.28919998, ..., 0.0057    ,
          0.3006    , 0.2738    ],
         [0.3471    , 0.3216    , 0.2882    , ..., 0.0057    ,
          0.29549998, 0.26839998]],
 
        [[0.4195    , 0.4062    , 0.3768    , ..., 0.0067    ,
          0.37399998, 0.3182    ],
         [0.4202    , 0.40539998, 0.3809    , ..., 0.0067    ,
          0.37129998, 0.3162    ],
         [0.4209    , 0.4026    , 0.3706    , ..., 0.0067    ,
          0.36479998, 0.31129998],
         ...,
         [0.35979998, 0.32099998, 0.2929    , ..., 0.0058    ,
          0.3108    , 0.

In [8]:
# NOTE(review): `tile_image` is not defined in any visible cell of this notebook;
# it may come from the star import or a removed cell — confirm before re-running.
tile_image(img, (128, 128)).shape

(64, 128, 128, 13)

In [59]:
# NOTE(review): `pad_image` is not defined in any visible cell of this notebook;
# it may come from the star import or a removed cell — confirm before re-running.
mask.shape, pad_image(img).shape, 

((1022, 1022, 3), (1024, 1024, 13))

In [14]:
# Reload the raw mask array and print its padded version.
# NOTE(review): `pad_image` is not defined in any visible cell of this notebook — confirm its origin.
mask = np.load(f"{MASKS_DIR}/S2A_MSIL1C_20180101T010721_N0206_R045_T53HLD_20180101T041600.npy")

print(pad_image(mask))

pad_x
2
pad_y
2
[[[False  True False]
  [False  True False]
  [False  True False]
  ...
  [False  True False]
  [False False False]
  [False False False]]

 [[False  True False]
  [False  True False]
  [False  True False]
  ...
  [False  True False]
  [False False False]
  [False False False]]

 [[False  True False]
  [False  True False]
  [False  True False]
  ...
  [False  True False]
  [False False False]
  [False False False]]

 ...

 [[False  True False]
  [False  True False]
  [False  True False]
  ...
  [False  True False]
  [False False False]
  [False False False]]

 [[False False False]
  [False False False]
  [False False False]
  ...
  [False False False]
  [False False False]
  [False False False]]

 [[False False False]
  [False False False]
  [False False False]
  ...
  [False False False]
  [False False False]
  [False False False]]]
