In [1]:
import gzip
import json
import math
import struct
import sys


In [2]:
# --- TileID 計算関数 ---
# https://github.com/protomaps/PMTiles/blob/main/spec/v3/spec.md#41-directory-entries
# TileIDからz, x, yを取得（検証やデバッグ用）
def tileid_to_zxy(tile_id):
    z_val = 0
    if tile_id > 0:
        z_val = int(math.log2(tile_id + 1) // 2)
    n = (1 << (2 * z_val)) - 1
    if tile_id > n:
        tile_id = n

    x_val = 0
    y_val = 0
    acc = 0
    for z_i in range(z_val):
        quadrant = (tile_id - acc) // (1 << (2 * (z_val - z_i - 1)))
        if quadrant == 0:  # top left
            pass
        elif quadrant == 1:  # top right
            x_val += 1 << z_i
        elif quadrant == 2:  # bottom left
            y_val += 1 << z_i
        elif quadrant == 3:  # bottom right
            x_val += 1 << z_i
            y_val += 1 << z_i
        acc += (1 << (2 * (z_val - z_i - 1))) * quadrant
    return z_val, x_val, y_val


In [3]:
# --- TileID 計算関数 (Hilbert Curve) ---
# (_rot, zxy_to_tileid - 変更なし)
def _rot(n, x, y, rx, ry):
    """Helper function for Hilbert curve coordinate rotation."""
    if ry == 0:
        if rx == 1:
            x = n - 1 - x
            y = n - 1 - y
        x, y = y, x
    return x, y


def zxy_to_tileid(z, x, y):
    """
    Converts Z/X/Y tile coordinates to a PMTiles TileID based on the Hilbert curve.
    """
    if z > 26:
        raise ValueError("Zoom level exceeds maximum (26)")
    if x >= (1 << z) or y >= (1 << z):
        raise ValueError(f"X ({x}) or Y ({y}) exceeds tile range for zoom level {z}")

    if z == 0:
        return 0

    tile_id = 0
    n = 1 << z
    cur_x = x
    cur_y = y

    for i in range(z - 1, -1, -1):
        level_n = 1 << i
        rx = (cur_x >> i) & 1
        ry = (cur_y >> i) & 1
        quadrant = (rx * 3) ^ ry
        tile_id += quadrant * (level_n * level_n)
        cur_x, cur_y = _rot(level_n, cur_x % level_n, cur_y % level_n, rx, ry)

    offset = (n * n - 1) // 3
    tile_id += offset
    return tile_id


In [4]:
# --- 座標計算ヘルパー ---
# https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames#Python
def deg2num(lat_deg, lon_deg, zoom):
    lat_rad = math.radians(lat_deg)
    n = 1 << zoom
    xtile = int((lon_deg + 180.0) / 360.0 * n)
    ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
    return xtile, ytile


def num2deg(xtile, ytile, zoom):
    n = 1 << zoom
    lon_deg = xtile / n * 360.0 - 180.0
    lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
    lat_deg = math.degrees(lat_rad)
    return lat_deg, lon_deg


In [5]:
# タイルの境界ボックスを計算
def tile_bounds(z, x, y):
    lat_max, lon_min = num2deg(x, y, z)
    lat_min, lon_max = num2deg(x + 1, y + 1, z)
    return lon_min, lat_min, lon_max, lat_max


In [6]:
# タイルの中心座標を計算
def tile_center(z, x, y):
    lon_min, lat_min, lon_max, lat_max = tile_bounds(z, x, y)
    center_lon = (lon_min + lon_max) / 2
    center_lat = (lat_min + lat_max) / 2
    # PMTiles仕様では中心ズームも定義されているが、
    # 単一タイルの場合はタイルのズームレベルと同じで良い
    return center_lon, center_lat, z


In [7]:
# --- Varintエンコード ---
# https://github.com/protomaps/PMTiles/blob/main/spec/v3/spec.md#a1-encode-a-directory
def encode_varint(value):
    result = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            byte |= 0x80
        result.append(byte)
        if not value:
            break
    return bytes(result)


In [8]:
# --- メイン処理: 単一MVTタイルからPMTiles v3を作成 ---
def create_single_tile_pmtiles(mvt_input_path, pmtiles_output_path, z, x, y):
    """
    単一のMVTタイルデータからPMTiles v3ファイルを生成する。
    空のメタデータセクションを追加。ヘッダー、圧縮ルートディレクトリ、
    圧縮済み空メタデータ、圧縮タイルデータ。全てGzip圧縮。

    Args:
        mvt_input_path (str): 入力MVTファイルのパス。
        pmtiles_output_path (str): 出力PMTilesファイルのパス。
        z (int): タイルのズームレベル。
        x (int): タイルのX座標。
        y (int): タイルのY座標。
    """
    try:
        # --- 1. タイルデータの読み込み ---
        with open(mvt_input_path, "rb") as f:
            tile_data = f.read()

        # --- 2. 圧縮設定とデータ準備 (全てGzip) ---
        internal_compression = 1  # Gzip
        tile_compression = 1  # Gzip

        # Tile Data (Gzip圧縮)
        compressed_tile_data = gzip.compress(tile_data)
        compressed_tile_length = len(compressed_tile_data)

        # --- 修正点: 空のメタデータの準備と圧縮 ---
        metadata_json = {}  # Empty JSON object
        metadata_bytes = json.dumps(metadata_json, separators=(",", ":")).encode(
            "utf-8"
        )  # Should be b'{}'
        compressed_metadata = gzip.compress(metadata_bytes)  # Gzip圧縮
        metadata_length = len(compressed_metadata)  # 圧縮後の長さ (非ゼロになるはず)

        # --- 3. TileID (Hilbert)と座標の計算 ---
        tile_id = zxy_to_tileid(z, x, y)  # Hilbert ID
        lon_min, lat_min, lon_max, lat_max = tile_bounds(z, x, y)
        center_lon, center_lat, center_z = tile_center(z, x, y)
        min_lon_e7 = int(lon_min * 10_000_000)
        min_lat_e7 = int(lat_min * 10_000_000)
        max_lon_e7 = int(lon_max * 10_000_000)
        max_lat_e7 = int(lat_max * 10_000_000)
        center_lon_e7 = int(center_lon * 10_000_000)
        center_lat_e7 = int(center_lat * 10_000_000)

        # --- 4. ルートディレクトリの構築 (Gzip圧縮、単一エントリ) ---
        root_dir_entry = (
            encode_varint(tile_id)
            + encode_varint(1)
            + encode_varint(0)
            + encode_varint(compressed_tile_length)
        )
        compressed_root_dir = gzip.compress(root_dir_entry)
        root_dir_length = len(compressed_root_dir)

        # --- 5. オフセットの計算 (空メタデータを含む) ---
        header_length = 127
        root_dir_offset = header_length
        metadata_offset = root_dir_offset + root_dir_length  # Root Dirの直後 (非ゼロ)
        tile_data_offset = metadata_offset + metadata_length  # Metaの直後
        leaf_dir_offset = (
            tile_data_offset + compressed_tile_length
        )  # Tile Dataの直後 (非ゼロ)
        leaf_dir_length = 0

        # --- 6. ヘッダーの構築 (メタデータオフセット/長さを設定) ---
        header_format = "<7sBQQQQQQQQQQQBBBBBBiiiiBii"
        num_addressed_tiles = 1
        num_tile_entries = 1
        num_tile_contents = 1

        if not (0 <= z <= 255):
            raise ValueError(f"タイルズーム {z} invalid")
        if not (0 <= center_z <= 255):
            raise ValueError(f"中心ズーム {center_z} invalid")

        header = struct.pack(
            header_format,
            b"PMTiles",  # 1: Magic
            3,  # 2: Version
            root_dir_offset,  # 3: Root Offset
            root_dir_length,  # 4: Root Length
            metadata_offset,  # 5: Meta Offset (Updated, non-zero)
            metadata_length,  # 6: Meta Length (Updated, non-zero)
            leaf_dir_offset,  # 7: Leaf Offset
            leaf_dir_length,  # 8: Leaf Length (0)
            tile_data_offset,  # 9: Tile Data Offset
            compressed_tile_length,  # 10: Tile Data Length
            num_addressed_tiles,  # 11: Addressed Tiles = 1
            num_tile_entries,  # 12: Tile Entries = 1
            num_tile_contents,  # 13: Tile Contents = 1
            0,  # 14: Clustered
            internal_compression,  # 15: Internal Comp (1=Gzip)
            tile_compression,  # 16: Tile Comp (1=Gzip)
            1,  # 17: Tile Type (MVT)
            z,
            z,  # 18, 19: Min/Max Zoom
            min_lon_e7,
            min_lat_e7,
            max_lon_e7,
            max_lat_e7,  # 20-23: Bounds
            center_z,
            center_lon_e7,
            center_lat_e7,  # 24-26: Center
        )

        if len(header) != header_length:
            raise ValueError(f"ヘッダー長が不正: {len(header)}")

        # --- 7. ファイルへの書き込み (空メタデータを含む) ---
        with open(pmtiles_output_path, "wb") as f_out:
            f_out.write(header)
            f_out.write(compressed_root_dir)
            f_out.write(compressed_metadata)  # 圧縮済み空メタデータを書き込む
            f_out.write(compressed_tile_data)

        print(
            f"PMTilesファイルが正常に作成されました (空メタデータ付き): {pmtiles_output_path}"
        )
        print(f"  TileID (Hilbert): {tile_id} (z={z}, x={x}, y={y})")
        print(
            f"  Compression: Internal={internal_compression}, Tile={tile_compression} (1=Gzip)"
        )
        print(
            f"  ルートディレクトリ (Gzip圧縮): Offset={root_dir_offset}, Length={root_dir_length}"
        )
        print(
            f"    -> Entry: TileID={tile_id}, RL=1, Offset=0, Length={compressed_tile_length}"
        )
        print(
            f"  メタデータ (空, Gzip圧縮): Offset={metadata_offset}, Length={metadata_length}"
        )  # 更新
        print(
            f"  タイルデータ (Gzip圧縮): Offset={tile_data_offset}, Length={compressed_tile_length}"
        )
        print(
            f"  リーフディレクトリ: Offset={leaf_dir_offset}, Length={leaf_dir_length}"
        )

    except FileNotFoundError:
        print(
            f"エラー: 入力MVTファイルが見つかりません: {mvt_input_path}",
            file=sys.stderr,
        )
    except ValueError as e:
        print(f"エラー: {e}", file=sys.stderr)
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
        import traceback

        traceback.print_exc()

In [9]:
# --- パラメータ設定 ---
# 実際のMVTファイルパス、出力パス、タイル座標に置き換えてください
input_mvt = "./14/8907/5509.mvt"
output_pmtiles = "output.pmtiles"
tile_z = 14
tile_x = 8907
tile_y = 5509

create_single_tile_pmtiles(input_mvt, output_pmtiles, tile_z, tile_x, tile_y)


PMTilesファイルが正常に作成されました (実験: Dir Offset = Absolute): output.pmtiles
  TileID (Hilbert): 317919281 (z=14, x=8907, y=5509)
  Compression: Internal=1, Tile=1 (1=Gzip)
  ルートディレクトリ (Gzip圧縮): Offset=127, Length=29
    -> Entry: TileID=317919281, RL=1, Offset=178, Length=123
  メタデータ (空, Gzip圧縮): Offset=156, Length=22
  タイルデータ (Gzip圧縮): Offset=178, Length=123
  リーフディレクトリ: Offset=301, Length=0
