In [4]:

import geopandas as gpd
import pandas as pd

import mercantile
from tqdm import tqdm
import json

from datetime import datetime, timezone
import requests

from vt2geojson.tools import vt_bytes_to_geojson

import os

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading


import gzip



In [5]:
def load_tiles_from_json(bundesland_id, input_folder="output/tile_cache"):
    path = os.path.join(input_folder, f"{bundesland_id}_tiles.json")
    with open(path, "r") as f:
        tile_list = json.load(f)
    return [mercantile.Tile(**t) for t in tile_list]

In [6]:


def export_geodata(gdfs, output_folder="output", base_name="mapillary_traffic-signs", region="ger", save_parquet=True, save_geojson_gz=True):
    """
    Export one or more GeoDataFrames into output files (.parquet and/or .geojson.gz).
    
    Args:
        gdfs (list or GeoDataFrame): List of GeoDataFrames or a single GeoDataFrame.
        output_folder (str): Output directory (created if it doesn't exist).
        base_name (str): Base filename prefix.
        region (str): Region tag for filename.
        save_parquet (bool): Save .parquet file.
        save_geojson_gz (bool): Save .geojson.gz file.
    """
    if gdfs is None or (isinstance(gdfs, (gpd.GeoDataFrame, pd.DataFrame)) and gdfs.empty):
        print("No data to export.")
        return

    # Concatenate if needed
    if isinstance(gdfs, list):
        gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
    else:
        gdf = gdfs

    os.makedirs(output_folder, exist_ok=True)
    current_date = datetime.now().strftime('%Y-%m-%d')

    if save_parquet:
        parquet_path = os.path.join(output_folder, f"{base_name}_{region}_{current_date}.parquet")
        gdf.to_parquet(parquet_path, index=False)
        print(f"✔ Parquet saved to: {parquet_path}")

    if save_geojson_gz:
        geojson_path = os.path.join(output_folder, f"{base_name}_{region}_{current_date}.geojson")
        gz_path = geojson_path + ".gz"

        gdf.to_file(geojson_path, driver="GeoJSON")

        with open(geojson_path, 'rb') as f_in, gzip.open(gz_path, 'wb') as f_out:
            f_out.writelines(f_in)

        os.remove(geojson_path)
        print(f"✔ Gzipped GeoJSON saved to: {gz_path}")


In [10]:
def process_bundesland(bundesland_id, region_name=None, input_folder="output/tile_cache", output_folder="output", max_workers=6, limit_tiles=None):
    print(f"▶️ Starte Verarbeitung für {bundesland_id}...")

    tiles = load_tiles_from_json(bundesland_id, input_folder=input_folder)
    if limit_tiles:
        tiles = tiles[:limit_tiles]

    def process_tile(tile):

        # Load your access token
        with open("config.json") as f:
            ACCESS_TOKEN = json.load(f)["ACCESS_TOKEN"]

        # Use existing variables
        tile_layer = 'traffic_sign'  # already defined
        tile_coverage = "mly_map_feature_traffic_sign"
        
        url = f"https://tiles.mapillary.com/maps/vtp/{tile_coverage}/2/{tile.z}/{tile.x}/{tile.y}?access_token={ACCESS_TOKEN}"
        response = requests.get(url)
        if response.status_code != 200:
            return None
        try:
            geojson = vt_bytes_to_geojson(response.content, tile.x, tile.y, tile.z, layer=tile_layer)
            features = geojson.get("features", [])
            if not features:
                return None
            gdf_tile = gpd.GeoDataFrame.from_features(features, crs="EPSG:4326")
            gdf_tile['first_seen_at'] = gdf_tile['first_seen_at'].apply(lambda x: datetime.fromtimestamp(x / 1000, tz=timezone.utc)).dt.strftime('%Y-%m-%d')
            gdf_tile['last_seen_at'] = gdf_tile['last_seen_at'].apply(lambda x: datetime.fromtimestamp(x / 1000, tz=timezone.utc)).dt.strftime('%Y-%m-%d')
            gdf_tile['tile_x'] = tile.x
            gdf_tile['tile_y'] = tile.y
            return gdf_tile
        except Exception as e:
            print(f"❌ Fehler bei Tile {tile.x}/{tile.y}: {e}")
            return None

    gdf_all = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_tile, tile): tile for tile in tiles}
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"🧩 {bundesland_id}"): #, leave=False
            result = future.result()
            if result is not None:
                gdf_all.append(result)

    if gdf_all:
        gdf_all = gpd.GeoDataFrame(pd.concat(gdf_all, ignore_index=True))
        export_geodata(
            gdfs=gdf_all,
            output_folder=output_folder,
            region=bundesland_id,
            save_parquet=True,
            save_geojson_gz=True
        )
    else:
        print(f"⚠️ Keine Daten für {bundesland_id}.")


In [None]:


# Alle Bundesländer im tile_cache verarbeiten

bland = gpd.read_file("https://raw.githubusercontent.com/isellsoap/deutschlandGeoJSON/main/2_bundeslaender/1_sehr_hoch.geo.json")

#for _, row in bland[:1].iterrows():
for _, row in bland.iterrows():

    b_id = row["id"]
    name = row["name"]

    # Nur verarbeiten, wenn eine JSON-Datei existiert
    tile_json_path = os.path.join("output/tile_cache", f"{b_id}_tiles.json")
    if not os.path.exists(tile_json_path):
        print(f"⏩ Überspringe {b_id}, keine Tiles gefunden.")
        continue

    process_bundesland(b_id, region_name=name)

▶️ Starte Verarbeitung für DE-BW...


🧩 DE-BW: 100%|██████████| 13618/13618 [15:14<00:00, 14.89it/s] 


✔ Parquet saved to: output/mapillary_traffic-signs_DE-BW_2025-08-04.parquet
