In [None]:
# ------------------------------------------
# Was tut dieses Skript?
# Dieses Skript lädt die Mapillary-Coverage je Bundesland und Tile herunter,
# filtert sie nach Aufnahmedatum,
# und exportiert die Ergebnisse als Parquet-Dateien.
# ------------------------------------------

In [1]:

# Standard library
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
import json
import os
import time

# Third-party
import geopandas as gpd
import mercantile
import pandas as pd
import requests
from requests.exceptions import SSLError
from tqdm import tqdm
from vt2geojson.tools import vt_bytes_to_geojson


from config import TILES_CONFIG, PROCESSING_CONFIG, MAPILLARY_CONFIG



In [9]:
### Helper function for loading tile lists

def load_tiles_from_json(bundesland_id, input_folder=TILES_CONFIG["cache_folder"]):
    """
    Read the cached tile list of one federal state.

    Args:
        bundesland_id: Identifier used as the filename prefix; the file read
            is ``<bundesland_id>_tiles.json``.
        input_folder: Directory that contains the tile JSON files.

    Returns:
        list: One ``mercantile.Tile`` per entry of the JSON file, in file order.
    """
    tiles_file = os.path.join(input_folder, f"{bundesland_id}_tiles.json")
    tiles = []
    with open(tiles_file, "r") as handle:
        for entry in json.load(handle):
            # Each JSON entry is expanded into keyword arguments of mercantile.Tile.
            tiles.append(mercantile.Tile(**entry))
    return tiles

In [10]:

def export_geodata(gdfs, output_folder = PROCESSING_CONFIG["output_folder"], base_name="mapillary_coverage", region="ger", save_parquet=True, save_geojson_gz=True):
    """
    Export one or more GeoDataFrames into a date-stamped Parquet file.

    The file is written to
    ``<output_folder>/<base_name>_<region>_<YYYY-MM-DD>.parquet``, where the
    date is the local date at the time of the call.

    Args:
        gdfs (list, tuple or GeoDataFrame): A single frame or a sequence of
            frames. A sequence is concatenated with a fresh index before
            export; ``None`` entries in it are ignored.
        output_folder (str): Output directory (created if it doesn't exist).
        base_name (str): Base filename prefix.
        region (str): Region tag for filename.
        save_parquet (bool): Save .parquet file.
        save_geojson_gz (bool): Accepted for backward compatibility. This
            function currently writes no GeoJSON output, so the flag has no
            effect.

    Returns:
        None. When there is nothing to export (``None``, an empty sequence, or
        an empty frame) it prints "No data to export." and writes nothing.
    """
    if isinstance(gdfs, (list, tuple)):
        # pd.concat raises ValueError on an empty sequence, so guard before
        # concatenating instead of letting the export crash.
        frames = [frame for frame in gdfs if frame is not None]
        gdf = gpd.GeoDataFrame(pd.concat(frames, ignore_index=True)) if frames else None
    else:
        gdf = gdfs

    # Same "nothing to do" outcome for every kind of empty input, including a
    # sequence whose frames are all empty.
    if gdf is None or gdf.empty:
        print("No data to export.")
        return

    os.makedirs(output_folder, exist_ok=True)
    current_date = datetime.now().strftime('%Y-%m-%d')

    if save_parquet:
        parquet_path = os.path.join(output_folder, f"{base_name}_{region}_{current_date}.parquet")
        gdf.to_parquet(parquet_path, index=False)
        print(f"✔ Parquet saved to: {parquet_path}")


In [None]:
def process_bundesland(bundesland_id, region_name=None, input_folder = TILES_CONFIG["cache_folder"], output_folder = PROCESSING_CONFIG["ml_output_folder"] , max_workers=3, limit_tiles=None, request_timeout=30):
    """
    Download the Mapillary coverage for all cached tiles of one federal state,
    keep the features captured on or after the configured minimum date, and
    export the result via ``export_geodata``.

    Args:
        bundesland_id: Identifier of the federal state; selects the tile list
            file and is used as the region tag of the exported file.
        region_name: Currently unused; accepted so callers can pass the
            human-readable name.
        input_folder: Directory containing the ``<bundesland_id>_tiles.json`` file.
        output_folder: Directory the export is written to.
        max_workers (int): Number of download threads.
        limit_tiles (int or None): If truthy, only the first ``limit_tiles``
            tiles are processed.
        request_timeout: Timeout in seconds passed to ``requests.get`` for
            every tile download.

    Returns:
        None. Progress, warnings and the export path are printed.

    Raises:
        KeyError, ValueError: If ``PROCESSING_CONFIG["min_capture_date"]`` is
            missing or not in ``YYYY-MM-DD`` format. This is checked once,
            before any tile is downloaded.
    """
    print(f"▶️ Starte Verarbeitung für {bundesland_id}...")

    tiles = load_tiles_from_json(bundesland_id, input_folder=input_folder)
    if limit_tiles:
        tiles = tiles[:limit_tiles]

    access_token = MAPILLARY_CONFIG["access_token"]

    tile_layer = "sequence"  # or 'image' for points
    tile_coverage = "mly1_computed_public"

    # Parse the date filter once up front: it is the same for every tile, and a
    # broken config should fail immediately instead of once per tile.
    min_dt = datetime.strptime(PROCESSING_CONFIG["min_capture_date"], "%Y-%m-%d").replace(tzinfo=timezone.utc)

    # Pause after an SSL error, in seconds (5 minutes).
    ssl_pause_seconds = 300

    def process_tile(tile):
        """Fetch one tile; return ``(http_status, GeoDataFrame or None)``."""
        url = f"https://tiles.mapillary.com/maps/vtp/{tile_coverage}/2/{tile.z}/{tile.x}/{tile.y}?access_token={access_token}"
        # Without a timeout a stalled connection would block this worker forever.
        response = requests.get(url, timeout=request_timeout)
        if response.status_code != 200:
            return response.status_code, None
        try:
            geojson = vt_bytes_to_geojson(response.content, tile.x, tile.y, tile.z, layer=tile_layer)
            features = geojson.get("features", [])
            if not features:
                return response.status_code, None

            gdf_tile = gpd.GeoDataFrame.from_features(features, crs="EPSG:4326")
            # 'captured_at' is treated as epoch milliseconds (as in the
            # original per-row conversion); convert the whole column at once.
            gdf_tile['captured_at'] = pd.to_datetime(gdf_tile['captured_at'], unit="ms", utc=True)
            # .copy() so the assignments below modify an independent frame and
            # not a view of the unfiltered one (avoids SettingWithCopyWarning).
            gdf_tile = gdf_tile[gdf_tile['captured_at'] >= min_dt].copy()
            if gdf_tile.empty:
                # Nothing recent enough in this tile; do not pass empty frames on.
                return response.status_code, None
            gdf_tile['captured_at'] = gdf_tile['captured_at'].dt.strftime('%Y-%m-%d')

            gdf_tile['tile_x'] = tile.x
            gdf_tile['tile_y'] = tile.y
            return response.status_code, gdf_tile
        except Exception as e:
            print(f"❌ Fehler bei Tile {tile.x}/{tile.y}: {e}")
            return response.status_code, None

    tile_frames = []
    failed_tiles = []   # tiles whose download raised an exception
    http_failures = {}  # HTTP status code -> number of tiles answered with it
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_tile, tile): tile for tile in tiles}
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"🧩 {bundesland_id}"): #, leave=False
            tile = futures[future]
            try:
                status_code, tile_gdf = future.result()
            except SSLError as e:
                # NOTE(review): exception messages from requests may contain the
                # request URL including the access token; consider redacting
                # before sharing notebook output.
                # The pause blocks only this result loop; downloads already
                # submitted keep running in the worker threads, and the failed
                # tile is not retried. It is recorded so it can be re-run.
                failed_tiles.append(tile)
                print(f"⚠️ SSLError: {e}. Pausiere für 5 Minuten...")
                time.sleep(ssl_pause_seconds)
            except Exception as e:
                failed_tiles.append(tile)
                print(f"⚠️ Unbekannter Fehler: {e}")
            else:
                if status_code != 200:
                    http_failures[status_code] = http_failures.get(status_code, 0) + 1
                elif tile_gdf is not None:
                    tile_frames.append(tile_gdf)

    # Make missing data visible instead of dropping tiles silently.
    if http_failures:
        status_summary = ", ".join(f"{count}x HTTP {code}" for code, count in sorted(http_failures.items()))
        print(f"⚠️ {bundesland_id}: Tiles ohne Daten wegen HTTP-Status: {status_summary}")
    if failed_tiles:
        examples = ", ".join(f"{t.z}/{t.x}/{t.y}" for t in failed_tiles[:10])
        print(f"⚠️ {bundesland_id}: {len(failed_tiles)} Tiles nach Fehlern nicht geladen (z/x/y, max. 10): {examples}")

    if tile_frames:
        coverage = gpd.GeoDataFrame(pd.concat(tile_frames, ignore_index=True))
        export_geodata(
            gdfs=coverage,
            output_folder=output_folder,
            region=bundesland_id,
            save_parquet=True,
            save_geojson_gz=True
        )
    else:
        print(f"⚠️ Keine Daten für {bundesland_id}.")


In [None]:
# Load the federal-state boundaries and display them as a table.
# `bland` is also the frame the processing loop further down iterates over.
bundeslaender_url = "https://raw.githubusercontent.com/isellsoap/deutschlandGeoJSON/main/2_bundeslaender/1_sehr_hoch.geo.json"
bland = gpd.read_file(bundeslaender_url)
bland

Unnamed: 0,id,name,type,geometry
0,DE-BW,Baden-Württemberg,State,"MULTIPOLYGON (((8.70837 47.71556, 8.70918 47.7..."
1,DE-BY,Bayern,State,"POLYGON ((10.13386 50.55, 10.1398 50.54252, 10..."
2,DE-BE,Berlin,State,"POLYGON ((13.16181 52.59442, 13.174 52.59425, ..."
3,DE-BB,Brandenburg,State,"POLYGON ((13.87951 53.50107, 13.87927 53.49908..."
4,DE-HB,Bremen,State,"POLYGON ((8.98545 53.12822, 8.97316 53.12799, ..."
5,DE-HH,Hamburg,State,"POLYGON ((10.07162 53.71823, 10.0715 53.72192,..."
6,DE-HE,Hessen,State,"POLYGON ((9.49877 51.63152, 9.50474 51.62795, ..."
7,DE-MV,Mecklenburg-Vorpommern,State,"MULTIPOLYGON (((14.26472 53.71069, 14.26472 53..."
8,DE-NI,Niedersachsen,State,"MULTIPOLYGON (((6.86528 53.59597, 6.86528 53.5..."
9,DE-NW,Nordrhein-Westfalen,State,"POLYGON ((8.66628 52.52528, 8.67277 52.51795, ..."


In [17]:
# Process the federal states whose tile list exists in the tile cache.
# The selection is currently restricted to Bremen ("DE-HB"); use `bland` itself
# (or a slice such as `bland[:1]` / `bland[13:]`) to process other states.
# Requires `bland` from the cell above.
selected_states = bland[bland["id"] == "DE-HB"]

for b_id, name in zip(selected_states["id"], selected_states["name"]):
    tile_json_path = os.path.join(TILES_CONFIG["cache_folder"], f"{b_id}_tiles.json")

    if os.path.exists(tile_json_path):
        process_bundesland(b_id, region_name=name)
    else:
        # Only states with a cached tile JSON can be processed.
        print(f"⏩ Überspringe {b_id}, keine Tiles gefunden.")

▶️ Starte Verarbeitung für DE-HB...


🧩 DE-HB:   0%|          | 0/142 [00:00<?, ?it/s]

🧩 DE-HB: 100%|██████████| 142/142 [00:41<00:00,  3.44it/s]


✔ Parquet saved to: ml_output/mapillary_coverage_DE-HB_2025-10-18.parquet
