# Segmenting remote sensing imagery with point prompts

[![image](https://studiolab.sagemaker.aws/studiolab.svg)](https://studiolab.sagemaker.aws/import/github/opengeos/segment-geospatial/blob/main/docs/examples/sam2_point_prompts.ipynb)
[![image](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/opengeos/segment-geospatial/blob/main/docs/examples/sam2_point_prompts.ipynb)

This notebook shows how to generate object masks from point prompts with the Segment Anything Model 2 (SAM 2).

Make sure you use GPU runtime for this notebook. For Google Colab, go to `Runtime` -> `Change runtime type` and select `GPU` as the hardware accelerator.

## Install dependencies

Uncomment and run the following cell to install the required dependencies.

In [1]:
%pip install rasterio owslib leafmap
%pip install -U segment-geospatial
import leafmap
from samgeo import SamGeo2, regularize

sam = SamGeo2(
    model_id="sam2-hiera-large",
    automatic=False,
)

from google.colab import drive
drive.mount('/content/drive')

Collecting rasterio
  Downloading rasterio-1.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.1 kB)
Collecting owslib
  Downloading OWSLib-0.32.0-py2.py3-none-any.whl.metadata (6.6 kB)
Collecting leafmap
  Downloading leafmap-0.38.16-py2.py3-none-any.whl.metadata (16 kB)
Collecting affine (from rasterio)
  Downloading affine-2.4.0-py3-none-any.whl.metadata (4.0 kB)
Collecting cligj>=0.5 (from rasterio)
  Downloading cligj-0.7.2-py3-none-any.whl.metadata (5.0 kB)
Collecting click-plugins (from rasterio)
  Downloading click_plugins-1.1.1-py2.py3-none-any.whl.metadata (6.4 kB)
Collecting anywidget (from leafmap)
  Downloading anywidget-0.9.13-py3-none-any.whl.metadata (7.2 kB)
Collecting geojson (from leafmap)
  Downloading geojson-3.1.0-py3-none-any.whl.metadata (16 kB)
Collecting ipyvuetify (from leafmap)
  Downloading ipyvuetify-1.10.0-py2.py3-none-any.whl.metadata (7.5 kB)
Collecting pystac-client (from leafmap)
  Downloading pystac_client-0.8.5-py3-none-any.

sam2_hiera_large.pt:   0%|          | 0.00/898M [00:00<?, ?B/s]

In [6]:
import geopandas as gpd
import rasterio
from rasterio.mask import mask
from owslib.wms import WebMapService
from shapely.geometry import box, mapping
import numpy as np
from PIL import Image
import os
from itertools import product
from shapely.geometry import Point
from shapely.ops import triangulate
import zipfile

# ============= PARAMETER HIER ANPASSEN =============
# Pfade und URLs
GEOPACKAGE_PATH = "/content/drive/MyDrive/Deep_Learning/AV_Bodenbedeckung.gpkg"  # Pfad zur Geopackage-Datei
WMS_URL = "https://wms.geo.admin.ch/"    # URL des WMS-Dienstes

# Zentrumspunkt (Beispiel: Koordinaten in LV95)
CENTER_X = 2614000  # X-Koordinate des Hauptzentrums
CENTER_Y = 1178000  # Y-Koordinate des Hauptzentrums

# Grid-Einstellungen
GRID_SIZE = 1  # Anzahl der Kacheln in jede Richtung (3x3 Grid = 9 Kacheln)
TILE_SIZE = 100  # Größe einer einzelnen Kachel in Metern (halbe Breite/Höhe)
OVERLAP = 20  # Überlappung zwischen Kacheln in Metern

# Dataset Name
DATASET_NAME = "thun_test_3"  # Name des Ausgabeordners

# Ausgabe-Einstellungen
RESOLUTION = (1024, 1024)  # Bildauflösung in Pixeln (Breite, Höhe)
# ================================================

class GeoDataPreparator:
    def __init__(self, geopackage_path, wms_url, dataset_name):
        """
        Initialisiert den GeoDataPreparator mit Pfad zum Geopackage und WMS-URL
        """
        print(f"Lade Geopackage von: {geopackage_path}")
        self.geopackage = gpd.read_file(geopackage_path, layer='lcsf')
        print(f"Verbinde mit WMS: {wms_url}")
        self.wms = WebMapService(wms_url)
        self.selected_bounds = None
        self.dataset_name = dataset_name

        # Definition der Farbzuordnung für spezifische Attribute
        self.color_mapping = {
            # 'Strasse_Weg': [110,193,228],      # blau
            # 'uebrige_befestigte': [110,193,228],      # blau
            # 'Trottoir': [110,193,228],         # blau
            # 'Verkehrsinsel': [110,193,228],    # blau
            'Gartenanlage': [254,221,58],   # gelb
            'uebrige_humusierte': [254,221,58],  # gelb
            'Acker_Wiese_Weide': [254,221,58],   # gelb
            'Gebaeude': [60,16,152],   # blau
        }
        self.default_color = [254,221,58]  # grau

    def create_grid_coordinates(self, center_x, center_y, grid_size, tile_size, overlap):
        """
        Erstellt ein Grid von Koordinaten um einen Zentrumspunkt
        """
        offset = tile_size * 2  # Gesamtgröße einer Kachel
        start_x = center_x - (offset * (grid_size // 2))
        start_y = center_y - (offset * (grid_size // 2))

        coordinates = []
        for i, j in product(range(grid_size), range(grid_size)):
            x = start_x + (offset - overlap) * i
            y = start_y + (offset - overlap) * j
            coordinates.append((x, y))

        return coordinates

    def create_directory_structure(self, x, y):
        """
        Erstellt die Verzeichnisstruktur für eine Kachel
        """
        base_dir = os.path.join(self.dataset_name, f"Tile_{int(x)}_{int(y)}")
        masks_dir = os.path.join(base_dir, "masks")
        images_dir = os.path.join(base_dir, "images")

        os.makedirs(masks_dir, exist_ok=True)
        os.makedirs(images_dir, exist_ok=True)

        return masks_dir, images_dir

    def process_tile(self, center_x, center_y, tile_size):
        """
        Verarbeitet eine einzelne Kachel
        """
        # Erstelle Verzeichnisse
        masks_dir, images_dir = self.create_directory_structure(center_x, center_y)

        # Setze Bounding Box für diese Kachel
        self.set_bounds_from_center(center_x, center_y, tile_size)

        # Definiere Ausgabepfade
        filename = f"{int(center_x)}_{int(center_y)}"
        mask_path = os.path.join(masks_dir, f"{filename}.png")
        ortho_path = os.path.join(images_dir, f"{filename}.jpg")

        # Erstelle Maske und Orthofoto
        self.extract_mask(mask_path, RESOLUTION)
        self.get_orthophoto(ortho_path, RESOLUTION)

        return mask_path, ortho_path

    def set_bounds_from_center(self, center_x, center_y, dimension):
        """Setzt die Begrenzungsbox basierend auf einem Zentrumspunkt und Dimension"""
        min_x = center_x - dimension
        max_x = center_x + dimension
        min_y = center_y - dimension
        max_y = center_y + dimension

        self.selected_bounds = [min_x, min_y, max_x, max_y]
        print(f"Gewählter Bereich: {self.selected_bounds}")

    def extract_mask(self, output_path, resolution=(256, 256)):
        """
        Extrahiert und speichert die Maske aus dem GeoPackage als PNG
        und berechnet die Centroide der Gebäude
        """
        if not self.selected_bounds:
            raise ValueError("Keine Grenzen ausgewählt!")

        print(f"Erstelle Maske mit Auflösung {resolution}...")

        # Erstellt eine Box aus den Grenzen
        bbox = box(*self.selected_bounds)

        # Clippt GeoPackage auf ausgewählten Bereich
        mask_data = self.geopackage.clip(bbox)

        centroids = []
        min_distance = 2.0  # Mindestabstand zum Rand in Metern

        # Berechne garantiert innenliegende Punkte für Gebäude im Ausschnitt
        for idx, row in mask_data.iterrows():
            if row['Art'] == 'Gebaeude':
                geom = row.geometry

                try:
                    # Erstelle einen inneren Puffer
                    inner_geom = geom.buffer(-min_distance)

                    # Prüfe ob der innere Puffer gültig ist (nicht leer)
                    if inner_geom.is_empty:
                        # Falls der Puffer zu groß war, reduziere ihn
                        reduced_distance = min_distance / 2
                        inner_geom = geom.buffer(-reduced_distance)

                        # Falls immer noch leer, verwende original Geometrie
                        if inner_geom.is_empty:
                            inner_geom = geom

                    # Versuche representative_point auf der gepufferten Geometrie
                    point = inner_geom.representative_point()

                    # Validiere, dass der Punkt wirklich innerhalb liegt
                    if not point.within(inner_geom):
                        # Fallback: Triangulation
                        from shapely.ops import triangulate
                        triangles = triangulate(inner_geom)
                        if triangles:  # Prüfe ob Triangulation erfolgreich war
                            largest_triangle = max(triangles, key=lambda t: t.area)
                            point = largest_triangle.centroid
                        else:
                            # Wenn Triangulation fehlschlägt, nutze Zentrum der Bounding Box
                            minx, miny, maxx, maxy = inner_geom.bounds
                            point = Point([(minx + maxx)/2, (miny + maxy)/2])

                    # Finale Validierung
                    if point.within(geom):
                        centroids.append([point.x, point.y])
                    else:
                        print(f"Warnung: Konnte keinen gültigen Punkt für Gebäude {idx} finden")

                except Exception as e:
                    print(f"Fehler bei Gebäude {idx}: {str(e)}")
                    continue

        print(f"Gefundene Centroide: {len(centroids)}")
        print("Centroid-Koordinaten:")
        for coord in centroids:
            print(f"[{coord[0]}, {coord[1]}]")

        # Speichere die Centroide als Klassenvariable
        self.point_coords_batch = centroids

        # Sicherstellen, dass der Ausgabepfad auf .png endet
        output_path = os.path.splitext(output_path)[0] + '.png'

        # return output_path

    def get_orthophoto(self, output_path, resolution=(256, 256)):
        """
        Lädt und speichert Orthofoto vom WMS und führt SAM-Segmentierung durch
        Mit zusätzlicher Analyse der Grünwerte
        """
        if not self.selected_bounds:
            raise ValueError("Keine Grenzen ausgewählt!")

        print(f"Lade Orthofoto...")

        import requests
        import rasterio
        from rasterio.transform import from_bounds
        import numpy as np
        from PIL import Image

        sam = SamGeo2(
          model_id="sam2-hiera-large",
          automatic=False,)

        # Pfad für JPG
        temp_jpg_path = os.path.splitext(output_path)[0] + '.jpg'
        # Pfad für das georeferenzierte TIFF
        geotiff_path = os.path.splitext(output_path)[0] + '.tif'

        # WMS Request URL und Parameter bleiben gleich
        wms_url = 'https://wms.geo.admin.ch/'
        params = {
            'SERVICE': 'WMS',
            'VERSION': '1.3.0',
            'REQUEST': 'GetMap',
            'FORMAT': 'image/jpeg',
            'TRANSPARENT': 'false',
            'LAYERS': 'ch.swisstopo.swissimage',
            'CRS': 'EPSG:2056',
            'STYLES': '',
            'WIDTH': resolution[0],
            'HEIGHT': resolution[1],
            'BBOX': f"{self.selected_bounds[0]},{self.selected_bounds[1]},{self.selected_bounds[2]},{self.selected_bounds[3]}"
        }

        try:
            # Lade Orthofoto
            print("Lade WMS Orthofoto...")
            response = requests.get(wms_url, params=params)
            response.raise_for_status()

            # Speichere temporäres JPG
            with open(temp_jpg_path, 'wb') as out:
                out.write(response.content)

            # Lade das Orthofoto als NumPy-Array
            with Image.open(temp_jpg_path) as img:
                ortho_array = np.array(img.convert('RGB'))

            # Berechne die Transformation
            transform = from_bounds(
                self.selected_bounds[0],
                self.selected_bounds[1],
                self.selected_bounds[2],
                self.selected_bounds[3],
                resolution[0],
                resolution[1]
            )

            # Speichere als GeoTIFF
            with rasterio.open(
                geotiff_path,
                'w',
                driver='GTiff',
                height=ortho_array.shape[0],
                width=ortho_array.shape[1],
                count=3,
                dtype=ortho_array.dtype,
                crs='EPSG:2056',
                transform=transform
            ) as dst:
                for i in range(3):
                    dst.write(ortho_array[:, :, i], i + 1)

            # Lösche temporäres JPG
            #os.remove(temp_jpg_path)

            print(f"Georeferenziertes Orthofoto gespeichert als: {geotiff_path}")

            if not hasattr(self, 'point_coords_batch') or not self.point_coords_batch:
                print("Keine Centroide gefunden für SAM-Segmentierung!")
                return

            print(f"Starte SAM-Segmentierung mit {len(self.point_coords_batch)} Punkten...")

            # Setze das georeferenzierte Bild für SAM
            sam.set_image(geotiff_path)

            # Führe Segmentierung durch
            mask_path = os.path.splitext(output_path)[0] + '_sam_mask.tif'
            sam.predict_by_points(
                point_coords_batch=self.point_coords_batch,
                point_crs="EPSG:2056",
                output=mask_path,
                dtype="uint8"
            )

            # Analysiere die Grünwerte und modifiziere die Maske
            def analyze_green_values(ortho_array, mask_array, green_threshold=0.4):
                """
                Analysiert die Grünwerte im Orthofoto für jeden maskierten Bereich
                """
                # Normalisiere RGB-Werte
                rgb_sum = ortho_array.astype(float).sum(axis=2)
                rgb_sum[rgb_sum == 0] = 1  # Verhindere Division durch 0

                # Berechne den relativen Grünanteil
                green_ratio = ortho_array[:,:,1].astype(float) / rgb_sum

                # Erstelle neue Maske basierend auf Grünwerten
                modified_mask = np.zeros_like(mask_array)

                # Für jede eindeutige Region in der Maske
                for region_id in np.unique(mask_array):
                    if region_id == 0:  # Überspringe Hintergrund
                        continue

                    # Erstelle Maske für aktuelle Region
                    region_mask = mask_array == region_id

                    # Berechne durchschnittlichen Grünanteil in der Region
                    region_green_ratio = green_ratio[region_mask].mean()

                    # Setze Wert basierend auf Grünanteil
                    modified_mask[region_mask] = 0 if region_green_ratio > green_threshold else 1

                return modified_mask

              def convert_mask_to_colored_png(mask_path):
                  """
                  Konvertiert eine Binärmaske in ein farbiges PNG-Bild.
                  - Werte von 0 werden zu [254, 221, 58] (gelb)
                  - Werte > 0 werden zu [60, 16, 152] (violett)
                  Speichert das Ergebnis im 'masks' Verzeichnis anstelle von 'images'
                  """
                  import numpy as np
                  from PIL import Image
                  import os
                  import rasterio

                  # Lade die Maske
                  with rasterio.open(mask_path) as src:
                      mask = src.read(1)

                  # Erstelle ein RGB Array mit den gleichen Dimensionen wie die Maske
                  colored_mask = np.zeros((*mask.shape, 3), dtype=np.uint8)

                  # Setze die Farben
                  # Gelb [254, 221, 58] für Werte = 0
                  colored_mask[mask == 0] = [254, 221, 58]
                  # Violett [60, 16, 152] für Werte > 0
                  colored_mask[mask > 0] = [60, 16, 152]

                  # Erstelle PIL Image aus dem Array
                  img = Image.fromarray(colored_mask)

                  # Generiere den neuen Dateipfad
                  # Extrahiere den ursprünglichen Dateinamen
                  original_dir = os.path.dirname(mask_path)
                  base_name = os.path.basename(mask_path)

                  # Entferne '_sam_mask' aus dem Dateinamen
                  base_name = base_name.replace('_sam_mask', '')

                  # Ersetze 'images' durch 'masks' im Pfad
                  new_dir = original_dir.replace('images', 'masks')

                  # Stelle sicher, dass das Zielverzeichnis existiert
                  os.makedirs(new_dir, exist_ok=True)

                  # Erstelle den neuen Dateinamen (ersetze die Endung durch .png)
                  new_name = os.path.splitext(base_name)[0] + '.png'
                  new_path = os.path.join(new_dir, new_name)

                  # Speichere das PNG
                  img.save(new_path)

                  print(f"Farbige Maske gespeichert als: {new_path}")
                  return new_path

            # Lade die ursprüngliche Maske
            with rasterio.open(mask_path) as src:
                original_mask = src.read(1)
                mask_meta = src.meta.copy()

            # Analysiere und modifiziere die Maske
            modified_mask = analyze_green_values(ortho_array, original_mask)

            # Speichere die modifizierte Maske
            mask_meta.update(count=1, dtype='uint8')
            with rasterio.open(mask_path, 'w', **mask_meta) as dst:
                dst.write(modified_mask.astype('uint8'), 1)

            out_vector = "building_vector.geojson"
            array, gdf = sam.region_groups(mask_path, min_size=200, out_vector=out_vector, out_image=mask_path)

            print(f"Modifizierte SAM-Segmentierungsmaske gespeichert als: {mask_path}")

            convert_mask_to_colored_png(mask_path)

            #os.remove(geotiff_path)
            print(f"Georeferenziertes TIFF gelöscht: {geotiff_path}")

            return mask_path

        except Exception as e:
            print(f"Fehler beim Verarbeiten: {str(e)}")
            if 'response' in locals():
                print("WMS URL:", response.url)
            print("Bounds:", self.selected_bounds)
            print("Beispiel-Koordinaten:", self.point_coords_batch[0] if self.point_coords_batch else None)
            return None




    def _colorize_features(self, raster, features, color, resolution):
        """
        Hilfsfunktion zum Einfärben der Features
        """
        # Konvertiert Geometrien zu Pixel-Koordinaten
        transform = rasterio.transform.from_bounds(
            *self.selected_bounds,
            resolution[0],
            resolution[1]
        )

        # Rasternisiert Features
        shapes = [(geom, 1) for geom in features.geometry]
        feature_mask = rasterio.features.rasterize(
            shapes,
            out_shape=resolution,
            transform=transform
        )

        # Färbt die maskierten Bereiche ein
        for i in range(3):  # Für jeden RGB-Kanal
            raster[feature_mask == 1, i] = color[i]

        return raster

def main():
    try:
        # Erstelle Hauptverzeichnis für das Dataset
        os.makedirs(DATASET_NAME, exist_ok=True)

        # Initialisiere GeoDataPreparator
        preparator = GeoDataPreparator(GEOPACKAGE_PATH, WMS_URL, DATASET_NAME)

        # Generiere Grid-Koordinaten
        coordinates = preparator.create_grid_coordinates(
            CENTER_X, CENTER_Y, GRID_SIZE, TILE_SIZE, OVERLAP
        )

        # Verarbeite jede Kachel
        total_tiles = len(coordinates)
        for idx, (x, y) in enumerate(coordinates, 1):
            print(f"\nVerarbeite Kachel {idx}/{total_tiles}")
            print(f"Koordinaten: X={x}, Y={y}")

            try:
                mask_path, ortho_path = preparator.process_tile(x, y, TILE_SIZE)
                print(f"Erfolgreich erstellt:\n  Maske: {mask_path}\n  Orthofoto: {ortho_path}")
            except Exception as e:
                print(f"Fehler bei Kachel {idx}: {str(e)}")
                continue

        print("\nVerarbeitung erfolgreich abgeschlossen!")

        def zipdir(path, ziph):
            # ziph is zipfile handle
            for root, dirs, files in os.walk(path):
                for file in files:
                    ziph.write(os.path.join(root, file))

        zipf = zipfile.ZipFile(f'{DATASET_NAME}.zip', 'w', zipfile.ZIP_DEFLATED)
        zipdir(DATASET_NAME, zipf)
        zipf.close()

        print(f"Dataset in {DATASET_NAME}.zip gesichert!")

    except Exception as e:
        print(f"Ein Fehler ist aufgetreten: {str(e)}")

if __name__ == "__main__":
    main()

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 331)