In [1]:
import sys

sys.path.append("../src")  # for src imports

In [2]:
import math
import os

import cv2
import folium
import geopandas as gpd
import numpy as np
from IPython.display import display
from owslib.wms import WebMapService
from shapely.geometry import Point

from wms.helpers import get_image_size_px

In [3]:
gdf = gpd.read_file(
    r"C:\code\cassda-zertifikatsarbeit\data\de_wildlife_crossing.geojson"
)

In [None]:
# Create a centroids GeoDataFrame from polygon geometries in `gdf`
# Only keep polygon-like geometries; if already points, they will be passed through as-is
centroids = gdf.copy()


# If the geometry is polygon or multipolygon, replace with centroid; otherwise keep geometry
def to_centroid(geom):
    if geom is None or geom.is_empty:
        return geom
    if geom.geom_type in ("Polygon", "MultiPolygon"):
        return geom.centroid
    return geom


centroids["geometry"] = centroids.geometry.apply(to_centroid)
# Ensure result is a GeoDataFrame with proper CRS
centroids = gpd.GeoDataFrame(centroids, geometry="geometry", crs=gdf.crs)
# Optionally drop features where geometry is still None or not a Point
centroids = centroids[centroids.geometry.notnull()]

In [9]:
bounds = gdf.total_bounds  # (minx, miny, maxx, maxy)
minx, miny, maxx, maxy = bounds

center = [(miny + maxy) / 2, (minx + maxx) / 2]
m = folium.Map(location=center, zoom_start=6)
fg = folium.FeatureGroup(name="Wildlife Crossings")
for idx, row in centroids.iterrows():
    geom = row.geometry
    if geom is None or geom.is_empty:
        continue
    # handle MultiPoint by iterating
    if geom.geom_type == "MultiPoint":
        for pt in geom.geoms:
            folium.CircleMarker(
                location=[pt.y, pt.x], radius=4, color="red", fill=True
            ).add_to(fg)
    elif geom.geom_type == "Point":
        folium.CircleMarker(
            location=[geom.y, geom.x], radius=4, color="blue", fill=True
        ).add_to(fg)
fg.add_to(m)
folium.LayerControl().add_to(m)
# Display map in notebook
display(m)

In [11]:
wms = WebMapService(
    "https://image.discomap.eea.europa.eu/arcgis/services/GioLand/VHR_2021_LAEA/ImageServer/WMSServer/?request=GetCapabilities&service=WMS",
    version="1.3.0",
)

In [12]:
layer_name = list(wms.contents)[0]

In [13]:
# create a bbox around the points with some padding
padding = 0.1  # degrees
minx, miny, maxx, maxy = (
    minx - padding,
    miny - padding,
    maxx + padding,
    maxy + padding,
)

In [18]:
# remove exact duplicate geometries
centroids["wkt_tmp"] = centroids.geometry.apply(
    lambda g: g.wkt if g is not None else ""
)
centroids = (
    centroids.drop_duplicates(subset="wkt_tmp")
    .drop(columns="wkt_tmp")
    .reset_index(drop=True)
)

In [None]:
# Output directory for crops
out_dir = os.path.join("..", "data", "de_wms_export")
os.makedirs(out_dir, exist_ok=True)

In [None]:
# for every point create a 200x200m polygon around it and request a WMS crop centered on the point
size_pixels = (1024, 1024)  # width, height in px
img_format = "image/png"
half_size_m = 100  # half of 200 m box

for row in centroids.iterrows():
    print(f"Processing crossing {row.Index}...")
    if row.Index > 3:
        break  # limit to first 4 for testing
    geom = row.geometry
    if geom is None or geom.is_empty or geom.geom_type != "Point":
        continue
    point = geom
    # Calculate bbox in lat/lon (EPSG:4326) around the point with half_size_m buffer
    # Approximate conversion from meters to degrees (valid for small distances)
    lat = point.y
    lon = point.x
    delta_deg = half_size_m / 111320  # rough approximation: 1 deg latitude ~ 111.32 km
    bbox = (lon - delta_deg, lat - delta_deg, lon + delta_deg, lat + delta_deg)

    # Get image size in pixels for the bbox at the desired resolution
    img_size = get_image_size_px(10, bbox)

    # Request WMS image
    img = wms.getmap(
        layers=[layer_name],
        srs="EPSG:4326",
        bbox=bbox,
        size=img_size,
        format=img_format,
        transparent=True,
    )

    # Save image to file
    img_data = img.read()
    img_filename = os.path.join(out_dir, f"crossing_{idx}.png")
    with open(img_filename, "wb") as f:
        f.write(img_data)

    print(f"Saved WMS crop for crossing {idx} to {img_filename}")

Processing crossing 0...
Saved WMS crop for crossing 0 to ..\data\de_wms_export\crossing_0.png
Processing crossing 1...
Saved WMS crop for crossing 0 to ..\data\de_wms_export\crossing_0.png
Processing crossing 1...
Saved WMS crop for crossing 1 to ..\data\de_wms_export\crossing_1.png
Processing crossing 2...
Saved WMS crop for crossing 1 to ..\data\de_wms_export\crossing_1.png
Processing crossing 2...
Saved WMS crop for crossing 2 to ..\data\de_wms_export\crossing_2.png
Processing crossing 3...
Saved WMS crop for crossing 2 to ..\data\de_wms_export\crossing_2.png
Processing crossing 3...
Saved WMS crop for crossing 3 to ..\data\de_wms_export\crossing_3.png
Processing crossing 4...
Saved WMS crop for crossing 3 to ..\data\de_wms_export\crossing_3.png
Processing crossing 4...


In [5]:
# url list contains the whole swissimage 10cm url list

with open(
    r"C:\code\cassda-zertifikatsarbeit\data\swissimage_10cm_urllist.txt", "r"
) as f:
    url_list = [line.strip() for line in f if line.strip()]

In [14]:
# find the matching url for each of the points in the url list from swiss image and download the image
# and save it to the out_dir with the name swissimage_10cm_<featureindex>[_ptindex].png

out_dir = os.path.join("..", "data", "swissimage_dwnl_10cm")

for idx, row in point_gdfs["N2023_Version_Wildtierpassagen"].iterrows():
    geom = row.geometry
    if geom is None or geom.is_empty:
        continue
    pts = []
    if geom.geom_type == "Point":
        pts = [geom]
    else:
        # skip non-point geometries
        continue

    for p_i, pt in enumerate(pts):
        print(f"Processing feature {idx} pt{p_i} at {pt.x}, {pt.y}")
        x, y = math.floor(pt.x / 1000), math.floor(pt.y / 1000)
        print(f" - looking for tile {x}-{y}")
        try:
            img_url = [url for url in url_list if f"{x}-{y}" in url][0]
            img_data = urllib.request.urlopen(img_url).read()
            cv2_img = cv2.imdecode(
                np.frombuffer(img_data, np.uint8), cv2.IMREAD_UNCHANGED
            )
            cv2.imwrite(
                os.path.join(
                    out_dir,
                    f"swissimage_10cm_{idx}" + (f"_pt{p_i}" if p_i else "") + ".png",
                ),
                cv2_img,
            )

        except Exception as e:
            print(f"Failed to fetch image for {layer} index {idx} pt{p_i}:", e)

Processing feature 0 pt0 at 2547594.6644, 1190833.4037999995
 - looking for tile 2547-1190
Processing feature 1 pt0 at 2553722.6777000017, 1186092.2452999987
 - looking for tile 2553-1186
Processing feature 2 pt0 at 2554020.681400001, 1186183.2412
 - looking for tile 2554-1186
Processing feature 3 pt0 at 2533684.664099999, 1167595.5916999988
 - looking for tile 2533-1167
Processing feature 4 pt0 at 2590074.2628000006, 1223640.5067999996
 - looking for tile 2590-1223
Processing feature 5 pt0 at 2609089.2487999983, 1212372.2215000018
 - looking for tile 2609-1212
Processing feature 6 pt0 at 2650860.4673999995, 1177664.7912999988
 - looking for tile 2650-1177
Processing feature 7 pt0 at 2603700.0989000015, 1205235.1022000015
 - looking for tile 2603-1205
Processing feature 8 pt0 at 2610850.316399999, 1219290.2857000008
 - looking for tile 2610-1219
Processing feature 9 pt0 at 2567845.375, 1115833.330600001
 - looking for tile 2567-1115
Processing feature 10 pt0 at 2662588.909499999, 12564