In [1]:
import json
import math
from pathlib import Path
from collections import defaultdict

import numpy as np
import h5py
from tqdm import tqdm

In [2]:
dataset_path = "/Volumes/SSD/mark/Documents/Works/MT_Dataset/mt_tracks_20250714.h5"

if "ds" in vars():
    ds.close()   # type: ignore

ds = h5py.File(dataset_path, "r")
for attr in ds.attrs:
    print(f"{attr}: {ds.attrs[attr]}")

author: Mark Vodyanitskiy (mvodya@icloud.com)
created_at: 2025-07-13T14:26:08.378871
filter_rules: MIN_TOTAL_POINTS=50, MIN_MOVING_POINTS=5, MIN_MAX_SPEED=20, SPEED_MOVING_MIN=10, SPEED_SANITY_MAX=800
filtered_at: 2026-01-08T06:02:47.931391
sources_count: 27555
sources_size: 439.3Gb
tracks_built_at: 2026-01-09T17:19:14.225437
tracks_count: 11731643
tracks_rules: SPEED_MOVING_MIN=10, STOP_RADIUS_M=250.0, STOP_DWELL_SEC=1800, GAP_HARD_SEC=18000, GAP_VERY_HARD_SEC=36000, DIST_AFTER_GAP_M=150000.0, JUMP_HARD_M=250000.0, DEST_GAP_SEC=14400, DEST_DIST_M=50000.0
version: 1.0


In [3]:
OUT_PATH = "/Volumes/SSD/mark/Documents/Works/MT_Dataset/mt_tracks_20250714_tsorted.h5"
POI_JSON_PATH = "/Volumes/SSD/mark/Documents/Works/MT_Dataset/mt_tracks_20250714_poi.json"

# Фильтрация по POI
USE_POI_FILTER = True

# Layout: /positions/tracks/AAAA-BBBB/CCCC-DDDD
TRACKS_PER_GROUP = 100_000      # ширина диапазона track_id для группы AAAA-BBBB
DATASETS_PER_GROUP = 100        # внутри группы нарезаем на 100 датасетов CCCC-DDDD

# IO/memory
READ_CHUNK_ROWS = 2_000_000       # сколько строк читать за раз из /positions/YYYY/MM/DD
FLUSH_THRESHOLD_ROWS = 2_000_000  # сколько строк накапливаем на (group,sub) перед flush

# Копировать ли исходный /positions в новый файл
COPY_ORIGINAL_POSITIONS = True

# Какие поля пишем в tsorted-структуру
TSORT_FIELDS = [
    "track_id", "timestamp", "lat", "lon",
    "speed", "course", "heading", "rot", "elapsed",
    "ship_id", "file_id", "destination", "tile_z"
]

In [4]:
def ensure_parent_dir(path: str):
    Path(path).parent.mkdir(parents=True, exist_ok=True)

def range_label(a: int, b: int) -> str:
    # 0-padded to keep lexicographic ordering nice
    return f"{a:08d}-{b:08d}"

def compute_group_bounds(track_id: int, tracks_per_group: int):
    g0 = (track_id // tracks_per_group) * tracks_per_group
    g1 = g0 + tracks_per_group - 1
    return g0, g1

def compute_subrange_bounds(track_id: int, g0: int, subrange: int):
    s_idx = (track_id - g0) // subrange
    s0 = g0 + s_idx * subrange
    s1 = min(s0 + subrange - 1, g0 + TRACKS_PER_GROUP - 1)
    return int(s_idx), s0, s1

def stable_sort_by_track_then_time(arr: np.ndarray) -> np.ndarray:
    # stable: first sort by timestamp, then stable sort by track_id
    # mergesort is stable
    idx_time = np.argsort(arr["timestamp"], kind="mergesort")
    arr2 = arr[idx_time]
    idx_track = np.argsort(arr2["track_id"], kind="mergesort")
    return arr2[idx_track]

In [5]:
poi_track_ids = None

if USE_POI_FILTER:
    print("Loading POI json:", POI_JSON_PATH)
    with open(POI_JSON_PATH, "r", encoding="utf-8") as f:
        poi = json.load(f)

    # pairs: "A->B": { ..., "track_ids": [...] }
    # собираем все track_ids в set
    poi_track_ids = set()
    pairs = poi.get("pairs", {})
    for k, v in tqdm(pairs.items(), desc="Collect POI track_ids"):
        tids = v.get("track_ids", [])
        poi_track_ids.update(tids)

    # safety: track_ids могут быть int/str - нормализуем в int
    poi_track_ids = {int(x) for x in poi_track_ids}

    print("POI tracks:", len(poi_track_ids))
else:
    print("POI filter disabled.")


Loading POI json: /Volumes/SSD/mark/Documents/Works/MT_Dataset/mt_tracks_20250714_poi.json


Collect POI track_ids: 100%|██████████| 6003/6003 [00:00<00:00, 336735.77it/s]

POI tracks: 380072





In [6]:
ensure_parent_dir(OUT_PATH)

# Подготовим dtype для tsorted-позиций как subset исходного dtype
# Найдем любой positions dataset чтобы взять dtype (первый попавшийся день)
def find_any_positions_dataset(ds) -> h5py.Dataset:
    pos_root = ds["positions"]
    for y in pos_root:
        for m in pos_root[y]:
            for d in pos_root[y][m]:
                return pos_root[y][m][d]
    raise RuntimeError("No positions datasets found in ds['positions'].")

src_any_pos = find_any_positions_dataset(ds)
src_dtype = src_any_pos.dtype
ts_dtype = np.dtype([(name, src_dtype.fields[name][0]) for name in TSORT_FIELDS])

# Открываем output
out = h5py.File(OUT_PATH, "w")

# attrs
for k, v in ds.attrs.items():
    out.attrs[k] = v
out.attrs["tsorted_built_at"] = np.string_(str(np.datetime64("now")))

# копируем верхнеуровневые таблицы (без positions)
for name in ["files", "ships", "tracks", "zones"]:
    if name in ds:
        print("Copy:", name)
        ds.copy(name, out, name=name)

# positions: копируем опционально
if COPY_ORIGINAL_POSITIONS:
    print("Copy: positions (this can double file size!)")
    ds.copy("positions", out, name="positions")
else:
    # создаем пустой positions root (чтобы структура была совместима)
    out.create_group("positions")

# создаем /positions/tracks для новой структуры
tracks_root = out["positions"].require_group("tracks")

print("Output ready:", OUT_PATH)


Copy: files
Copy: ships
Copy: tracks
Copy: zones
Copy: positions (this can double file size!)
Output ready: /Volumes/SSD/mark/Documents/Works/MT_Dataset/mt_tracks_20250714_tsorted.h5


In [7]:
TQDM_KW = dict(
    ascii=True,          # гарантированный "текстовый" прогрессбар
    dynamic_ncols=True,  # подстраивается под ширину
    mininterval=0.5,
)

pos_root = ds["positions"]

# считаем количество дней заранее (total для прогресса)
years = sorted(pos_root.keys())

total_days = 0
for y in years:
    for m in pos_root[y].keys():
        total_days += len(pos_root[y][m].keys())

print("total_days:", total_days)

total_days: 247


In [8]:
SUBRANGE = math.ceil(TRACKS_PER_GROUP / DATASETS_PER_GROUP)

ds_cache = {}

def get_or_create_tsorted_dataset(out_tracks_root: h5py.Group, g0: int, g1: int, s0: int, s1: int) -> h5py.Dataset:
    g_label = range_label(g0, g1)
    s_label = range_label(s0, s1)

    g = out_tracks_root.require_group(g_label)
    path_key = f"{g_label}/{s_label}"

    dset = ds_cache.get(path_key)
    if dset is not None:
        return dset

    if s_label in g:
        dset = g[s_label]
    else:
        dset = g.create_dataset(
            s_label,
            shape=(0,),
            maxshape=(None,),
            dtype=ts_dtype,
            chunks=True,
            compression="gzip",
            compression_opts=4
        )

    ds_cache[path_key] = dset
    return dset

def append_rows(dset: h5py.Dataset, rows: np.ndarray):
    n0 = dset.shape[0]
    n_add = rows.shape[0]
    dset.resize((n0 + n_add,))
    dset[n0:n0 + n_add] = rows

buffers = defaultdict(list)
buffer_sizes = defaultdict(int)

# главный прогрессбар по дням
p_days = tqdm(total=total_days, desc="Repack days", **TQDM_KW)

for y in years:
    months = sorted(pos_root[y].keys())
    for m in months:
        days = sorted(pos_root[y][m].keys())
        for d in days:
            day_ds = pos_root[y][m][d]
            n = day_ds.shape[0]

            # обновляем postfix чтобы было видно где мы
            p_days.set_postfix_str(f"{y}/{m}/{d} rows={n}")

            if n == 0:
                p_days.update(1)
                continue

            for start in range(0, n, READ_CHUNK_ROWS):
                end = min(start + READ_CHUNK_ROWS, n)
                chunk = day_ds[start:end]

                # subset полей -> ts_dtype
                chunk2 = np.empty((chunk.shape[0],), dtype=ts_dtype)
                for name in TSORT_FIELDS:
                    chunk2[name] = chunk[name]

                # POI фильтр
                if USE_POI_FILTER:
                    tids = chunk2["track_id"].astype(np.int64, copy=False)
                    # быстрее чем генератор по одному элементу:
                    # np.isin на больших массивах может быть тяжелым по памяти,
                    # поэтому делаем через vectorized-проверку на python set:
                    mask = np.fromiter((int(t) in poi_track_ids for t in tids),
                                       dtype=np.bool_, count=tids.shape[0])
                    if not mask.any():
                        continue
                    chunk2 = chunk2[mask]
                    if chunk2.shape[0] == 0:
                        continue

                # распределяем по (group, subrange) без построчного цикла
                tids = chunk2["track_id"].astype(np.int64, copy=False)
                g0s = (tids // TRACKS_PER_GROUP) * TRACKS_PER_GROUP
                s_idxs = ((tids - g0s) // SUBRANGE).astype(np.int64, copy=False)

                order = np.lexsort((s_idxs, g0s))
                chunk2 = chunk2[order]
                g0s = g0s[order]
                s_idxs = s_idxs[order]

                change = np.empty(chunk2.shape[0], dtype=np.bool_)
                change[0] = True
                change[1:] = (g0s[1:] != g0s[:-1]) | (s_idxs[1:] != s_idxs[:-1])

                cuts = np.nonzero(change)[0]
                cuts = np.append(cuts, chunk2.shape[0])

                for i in range(len(cuts) - 1):
                    a, b = cuts[i], cuts[i + 1]
                    g0 = int(g0s[a])
                    g1 = g0 + TRACKS_PER_GROUP - 1
                    s_idx = int(s_idxs[a])
                    s0 = g0 + s_idx * SUBRANGE
                    s1 = min(s0 + SUBRANGE - 1, g1)

                    part = chunk2[a:b]
                    key = (g0, g1, s0, s1)
                    buffers[key].append(part)
                    buffer_sizes[key] += part.shape[0]

                    if buffer_sizes[key] >= FLUSH_THRESHOLD_ROWS:
                        merged = np.concatenate(buffers[key], axis=0)
                        merged = stable_sort_by_track_then_time(merged)

                        dset = get_or_create_tsorted_dataset(tracks_root, g0, g1, s0, s1)
                        append_rows(dset, merged)

                        buffers[key].clear()
                        buffer_sizes[key] = 0

            p_days.update(1)

p_days.close()

# финальный flush
print("Final flush...")
for key, parts in tqdm(list(buffers.items()), desc="Flush leftovers", **TQDM_KW):
    if not parts:
        continue
    g0, g1, s0, s1 = key
    merged = np.concatenate(parts, axis=0)
    merged = stable_sort_by_track_then_time(merged)
    dset = get_or_create_tsorted_dataset(tracks_root, g0, g1, s0, s1)
    append_rows(dset, merged)

out.flush()
out.close()

Repack days: 100%|##########| 247/247 [05:11<00:00,  1.26s/it, 2025/07/10 rows=1401756]


Final flush...


Flush leftovers: 100%|##########| 13781/13781 [00:11<00:00, 1200.65it/s]


In [9]:
if "ds_out" in vars():
    ds_out.close()   # type: ignore

ds_out = h5py.File(OUT_PATH, "r")
for attr in ds_out.attrs:
    print(f"{attr}: {ds_out.attrs[attr]}")

author: Mark Vodyanitskiy (mvodya@icloud.com)
created_at: 2025-07-13T14:26:08.378871
filter_rules: MIN_TOTAL_POINTS=50, MIN_MOVING_POINTS=5, MIN_MAX_SPEED=20, SPEED_MOVING_MIN=10, SPEED_SANITY_MAX=800
filtered_at: 2026-01-08T06:02:47.931391
sources_count: 27555
sources_size: 439.3Gb
tracks_built_at: 2026-01-09T17:19:14.225437
tracks_count: 11731643
tracks_rules: SPEED_MOVING_MIN=10, STOP_RADIUS_M=250.0, STOP_DWELL_SEC=1800, GAP_HARD_SEC=18000, GAP_VERY_HARD_SEC=36000, DIST_AFTER_GAP_M=150000.0, JUMP_HARD_M=250000.0, DEST_GAP_SEC=14400, DEST_DIST_M=50000.0
tsorted_built_at: b'2026-01-12T10:46:51'
version: 1.0


In [10]:
print("HDF5 Dataset Structure:\n\n")

for name, obj in ds_out.items():
    print(f"{name}:")
    if isinstance(obj, h5py.Dataset) and obj.dtype.fields:
        for field_name, (field_dtype, offset, *_) in obj.dtype.fields.items():
            print(f"  - {field_name}: {field_dtype}")
        if len(obj) >= 1:
            print(f"  Example: {obj[1]}")
    if isinstance(obj, h5py.Group):
        if name == "positions" and "tracks" in obj:
            print(f"  /YYYY\n    /MM\n      /DD:")
            for k1, item in obj.items():
                if k1 == "tracks":
                    continue
                for _, item in item.items():
                    for _, item in item.items():
                        for field_name, (field_dtype, offset, *_) in item.dtype.fields.items():
                            print(f"        - {field_name}: {field_dtype}")
                        print(f"        Example: {item[0]}")
                        break
                    break
                break

            print(f"  /tracks\n    /AAAA-BBBB\n      /CCCC-DDDD:")
            for _, item in obj["tracks"].items():
                for _, item in item.items():
                    for field_name, (field_dtype, offset, *_) in item.dtype.fields.items():
                        print(f"        - {field_name}: {field_dtype}")
                    print(f"        Example: {item[0]}")
                    break
                break
        else:
            print(f"  /YYYY\n    /MM\n      /DD:")
            for _, item in obj.items():
                for _, item in item.items():
                    for _, item in item.items():
                        for field_name, (field_dtype, offset, *_) in item.dtype.fields.items():
                            print(f"        - {field_name}: {field_dtype}")
                        print(f"        Example: {item[0]}")
                        break
                    break
                break
    print("\n")


HDF5 Dataset Structure:


files:
  - file_id: int32
  - name: |S256
  - positions_count: int32
  - timestamp: int32
  Example: (1, b'29.10.2024_11_35.json', 43344, 1752416768)


positions:
  /YYYY
    /MM
      /DD:
        - ship_id: int64
        - timestamp: int32
        - lat: float32
        - lon: float32
        - speed: int32
        - course: int32
        - heading: int32
        - rot: int32
        - elapsed: int32
        - destination: |S64
        - tile_z: int32
        - file_id: int32
        - track_id: int64
        Example: (1, 1730200428, 43.0834, 132.3136, 0, 18, 166, 0, 338, b'NOVIIMIR', 10, 0, 1)
  /tracks
    /AAAA-BBBB
      /CCCC-DDDD:
        - track_id: int64
        - timestamp: int32
        - lat: float32
        - lon: float32
        - speed: int32
        - course: int32
        - heading: int32
        - rot: int32
        - elapsed: int32
        - ship_id: int64
        - file_id: int32
        - destination: |S64
        - tile_z: int32
        