# In [None]:
# ================================================================
# LUMVORAX DEPENDENCY 360 VALIDATION (KAGGLE SINGLE CELL - V3 COMPLETE)
# ================================================================
# - Full forensic timeline with nanosecond timestamps.
# - Strict fail policy by default (no compression fallback).
# - Optional relaxed mode for diagnostics (set LUMVORAX_STRICT_NO_FALLBACK=0 and/or LUMVORAX_REQUIRE_NATIVE=0).
# - Single-file copy/paste for Kaggle.

from __future__ import annotations

import importlib
import json
import os
import platform
import re
import shutil
import struct
import subprocess
import sys
import tempfile
import time
from hashlib import sha256, sha512
from pathlib import Path
from typing import Any, Dict, List

# 8-byte magic written as the first header field by encode_lum_v1 and
# verified by decode_lum_v1.
LUM_MAGIC = b"LUMV1\x00\x00\x00"
# Matches a C prototype that sits entirely on one line and ends with ';':
# group 1 = return type, group 2 = function name, group 3 = argument list.
FUNC_RE = re.compile(r"^\s*([a-zA-Z_][\w\s\*]+?)\s+([a-zA-Z_]\w*)\s*\(([^;]*?)\)\s*;\s*$")

# Dataset mount points probed, in order, for offline wheels and for the
# competitor teacher assets.
KAGGLE_DEP_PATHS = [
    Path('/kaggle/input/datasets/ndarray2000/nx47-dependencies'),
    Path('/kaggle/input/nx47-dependencies'),
    Path('/kaggle/input/lum-vorax-dependencies'),
    Path('/kaggle/input/lumvorax-dependencies'),
]
# Directories whose *.h files are inventoried by scan_headers_and_functions.
# Both absolute Kaggle paths and cwd-relative paths are listed.
SOURCE_SCAN_DIRS = [
    Path('/kaggle/working/src/lum'), Path('/kaggle/working/src/file_formats'), Path('/kaggle/working/src/vorax'),
    Path('/kaggle/working/src/logger'), Path('/kaggle/working/src/debug'),
    Path('src/lum'), Path('src/file_formats'), Path('src/vorax'), Path('src/logger'), Path('src/debug'),
]

# Policy switches read once at import time; any value other than "1" disables them.
# STRICT_NO_FALLBACK: the TIFF write test only attempts LZW compression.
STRICT_NO_FALLBACK = os.environ.get("LUMVORAX_STRICT_NO_FALLBACK", "1") == "1"
# REQUIRE_NATIVE: missing gcc / C sources / headers, or a failed syntax check, raises.
REQUIRE_NATIVE = os.environ.get("LUMVORAX_REQUIRE_NATIVE", "1") == "1"


def now_ns() -> int:
    """Return the current wall-clock time in integer nanoseconds since the epoch."""
    timestamp = time.time_ns()
    return timestamp


def log_event(report: Dict[str, Any], step: str, **payload: Any) -> None:
    """Append one timestamped event to ``report["events"]``.

    The event dict holds ``ts_ns`` (from ``now_ns()``), ``step``, and every
    keyword argument passed in ``payload``. The ``events`` list is created on
    first use if the report does not have one yet.
    """
    entry: Dict[str, Any] = {"ts_ns": now_ns(), "step": step}
    entry.update(payload)
    events = report.setdefault("events", [])
    events.append(entry)


def pkg_available(name: str) -> bool:
    """Report whether module ``name`` can actually be imported.

    Any exception raised during the import (not only ``ImportError``) counts
    as "not available", so a present-but-broken package yields ``False``.
    """
    try:
        importlib.import_module(name)
    except Exception:
        return False
    return True


def install_offline_if_missing(pkg: str, report: Dict[str, Any]) -> Dict[str, Any]:
    """Ensure ``pkg`` is importable, installing it offline from a Kaggle dataset if needed.

    Each existing directory in ``KAGGLE_DEP_PATHS`` is tried in order. When the
    pinned wheel for ``pkg`` is present there it is installed directly;
    otherwise pip resolves ``pkg`` with ``--no-index --find-links=<dir>``.
    Every attempt and outcome is recorded through ``log_event``.

    Args:
        pkg: Importable package name (also used as the pip requirement).
        report: Report dict that receives forensic events.

    Returns:
        A dict with ``package`` and ``status`` (``already_installed``,
        ``installed`` or ``missing``); ``installed`` adds ``root`` and
        ``missing`` adds ``error`` with the last failure message.
    """
    if pkg_available(pkg):
        log_event(report, "dependency_already_installed", package=pkg)
        return {"package": pkg, "status": "already_installed"}

    py = sys.executable
    exact = {
        'numpy': 'numpy-2.4.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl',
        'tifffile': 'tifffile-2026.1.28-py3-none-any.whl',
        'imagecodecs': 'imagecodecs-2026.1.14-cp311-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl',
    }
    # None for packages without a pinned wheel. Joining the root with an empty
    # name would yield the root directory itself, which always "exists" and
    # would be handed to pip as if it were a wheel.
    wheel_name = exact.get(pkg)
    base_cmd = [py, '-m', 'pip', 'install', '--disable-pip-version-check', '--no-index']
    last = 'not found'
    for root in KAGGLE_DEP_PATHS:
        if not root.exists():
            log_event(report, "dependency_path_absent", package=pkg, root=str(root))
            continue
        try:
            wheel = (root / wheel_name) if wheel_name else None
            if wheel is not None and wheel.is_file():
                cmd = base_cmd + [str(wheel)]
            else:
                cmd = base_cmd + [f'--find-links={root}', pkg]
            log_event(report, "dependency_install_attempt", package=pkg, root=str(root), cmd=cmd)
            subprocess.check_call(cmd)
        except Exception as exc:
            # Best effort: remember the failure and move on to the next root.
            last = str(exc)
            log_event(report, "dependency_install_fail", package=pkg, root=str(root), error=last)
            continue

        # The import system caches directory listings; refresh them so a
        # package installed a moment ago becomes visible to this process.
        importlib.invalidate_caches()
        if pkg_available(pkg):
            log_event(report, "dependency_install_ok", package=pkg, root=str(root))
            return {"package": pkg, "status": "installed", "root": str(root)}

        # pip reported success but the import still fails: record it instead
        # of leaving the misleading default 'not found'.
        last = 'installed_but_not_importable'
        log_event(report, "dependency_install_fail", package=pkg, root=str(root), error=last)

    return {"package": pkg, "status": "missing", "error": last}


def scan_headers_and_functions(report: Dict[str, Any]) -> Dict[str, Any]:
    """Inventory the C headers under ``SOURCE_SCAN_DIRS`` and their prototypes.

    Each ``*.h`` file is read line by line and every line matching ``FUNC_RE``
    (a single-line prototype ending in ``;``) is recorded. Directories that
    resolve to an already-scanned location are skipped, so the absolute
    ``/kaggle/working/src/...`` entries and the relative ``src/...`` entries
    are not counted twice when the working directory is ``/kaggle/working``.

    Args:
        report: Report dict that receives forensic events.

    Returns:
        ``{'summary': {'headers': int, 'functions': int}, 'modules': [...]}``
        where each module entry carries the header path, its total function
        count, and at most the first 30 parsed prototypes.
    """
    modules: List[Dict[str, Any]] = []
    funcs_total = 0
    seen_dirs = set()
    for d in SOURCE_SCAN_DIRS:
        if not d.is_dir():
            log_event(report, "source_dir_missing", directory=str(d))
            continue
        resolved = d.resolve()
        if resolved in seen_dirs:
            # Same physical directory reached through another spelling.
            log_event(report, "source_dir_duplicate", directory=str(d), resolved=str(resolved))
            continue
        seen_dirs.add(resolved)
        for h in sorted(d.glob('*.h')):
            funcs = []
            for ln in h.read_text(encoding='utf-8', errors='replace').splitlines():
                m = FUNC_RE.match(ln)
                if m:
                    funcs.append({'return': m.group(1).strip(), 'name': m.group(2), 'args': m.group(3).strip()})
            # Only the first 30 prototypes are kept to bound the report size;
            # function_count still reflects the full total.
            modules.append({'header': str(h), 'function_count': len(funcs), 'functions': funcs[:30]})
            funcs_total += len(funcs)
    headers = len(modules)
    log_event(report, "header_scan_done", headers=headers, functions=funcs_total)
    return {'summary': {'headers': headers, 'functions': funcs_total}, 'modules': modules}


def smoke_compile_c_modules(report: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run ``gcc -fsyntax-only`` over the known C sources and report per-file results.

    Candidate paths that resolve to the same file (absolute Kaggle path vs.
    cwd-relative path) are checked only once. The include path contains
    ``<cwd>/src`` and, when different, the ``src`` directory two levels above
    the file being compiled.

    Args:
        report: Report dict that receives forensic events.

    Returns:
        One dict per checked file with ``file``, ``status`` (``ok``/``fail``),
        ``returncode`` and the first 2000 characters of stderr; or a single
        ``{'status': 'skipped', 'reason': ...}`` entry in relaxed mode when gcc
        or the sources are unavailable.

    Raises:
        RuntimeError: In ``REQUIRE_NATIVE`` mode when gcc is missing, no source
            file exists, or at least one file fails the syntax check.
    """
    gcc = shutil.which('gcc')
    if not gcc:
        if REQUIRE_NATIVE:
            raise RuntimeError('gcc_not_found in strict native mode')
        log_event(report, "smoke_skipped", reason="gcc_not_found")
        return [{'status': 'skipped', 'reason': 'gcc_not_found'}]

    candidates = [
        Path('/kaggle/working/src/lum/lum_core.c'), Path('/kaggle/working/src/file_formats/lum_native_universal_format.c'),
        Path('/kaggle/working/src/file_formats/lum_native_file_handler.c'), Path('/kaggle/working/src/file_formats/lum_secure_serialization.c'),
        Path('/kaggle/working/src/vorax/vorax_3d_volume.c'), Path('/kaggle/working/src/vorax/vorax_operations.c'),
        Path('src/lum/lum_core.c'), Path('src/file_formats/lum_native_universal_format.c'),
        Path('src/file_formats/lum_native_file_handler.c'), Path('src/file_formats/lum_secure_serialization.c'),
        Path('src/vorax/vorax_3d_volume.c'), Path('src/vorax/vorax_operations.c'),
    ]

    # Keep the first spelling of each physical file so nothing is compiled
    # (and no failure counted) twice.
    existing = []
    seen_files = set()
    for c in candidates:
        if not c.exists():
            continue
        resolved = c.resolve()
        if resolved in seen_files:
            continue
        seen_files.add(resolved)
        existing.append(c)

    if not existing:
        if REQUIRE_NATIVE:
            raise RuntimeError('no_c_sources_found in strict native mode')
        log_event(report, "smoke_skipped", reason="no_c_sources_found")
        return [{'status': 'skipped', 'reason': 'no_c_sources_found'}]

    cwd_src = Path('src').resolve()
    out = []
    for c in existing:
        cmd = [gcc, '-fsyntax-only', str(c), '-I', str(cwd_src)]
        # Every candidate lives at <root>/src/<module>/<file>.c, so the
        # grandparent is that file's own src root.
        own_src = c.resolve().parent.parent
        if own_src != cwd_src:
            cmd += ['-I', str(own_src)]
        t0 = now_ns()
        r = subprocess.run(cmd, capture_output=True, text=True)
        out.append({'file': str(c), 'status': 'ok' if r.returncode == 0 else 'fail', 'returncode': r.returncode, 'stderr_head': (r.stderr or '')[:2000]})
        log_event(report, "c_smoke_result", file=str(c), returncode=r.returncode, elapsed_ns=now_ns() - t0)

    failures = [x for x in out if x['status'] != 'ok']
    if failures and REQUIRE_NATIVE:
        raise RuntimeError(f'c_syntax_smoke_failed:{len(failures)}')

    return out


def normalize_volume(arr):
    """Return ``arr`` as a C-contiguous float32 array with three dimensions.

    A 2D input gains a leading axis of length 1; a 3D input keeps its shape.
    Any other rank raises ``ValueError``.
    """
    import numpy as np

    volume = arr[np.newaxis, :, :] if arr.ndim == 2 else arr
    if volume.ndim != 3:
        raise ValueError(f'Expected 2D/3D volume, got {volume.shape}')
    as_float32 = volume.astype(np.float32, copy=False)
    return np.ascontiguousarray(as_float32)


def encode_lum_v1(arr3d):
    """Serialize a 3D volume into a LUM v1 blob.

    Layout (little-endian header): 8-byte magic, three uint32 dimensions
    (z, h, w), the first 16 bytes of the SHA-512 of the payload, then the
    payload itself in C order.

    The header carries no dtype field and ``decode_lum_v1`` always reads the
    payload as float32, so the input is coerced to float32 here. Without that,
    a 4-byte integer volume would pass the decoder's size and checksum checks
    and be silently reinterpreted as floats.

    Args:
        arr3d: Array-like with exactly three dimensions.

    Returns:
        ``bytes`` containing header followed by payload.

    Raises:
        ValueError: If the input is not three-dimensional.
    """
    import numpy as np

    data = np.ascontiguousarray(arr3d, dtype=np.float32)
    if data.ndim != 3:
        raise ValueError(f'Expected 3D volume, got {data.shape}')
    z, h, w = data.shape
    payload = data.tobytes()
    digest16 = sha512(payload).digest()[:16]
    header = struct.pack('<8sIII16s', LUM_MAGIC, z, h, w, digest16)
    return header + payload


def decode_lum_v1(blob: bytes):
    """Parse a LUM v1 blob and return its volume as a float32 array.

    The header is ``<8sIII16s``: magic, z, h, w, and a 16-byte truncated
    SHA-512 of the payload. The payload must be exactly ``z*h*w*4`` bytes.

    Args:
        blob: Full contents of a LUM v1 file.

    Returns:
        A float32 array of shape ``(z, h, w)`` viewing the payload bytes
        (``np.frombuffer`` does not copy).

    Raises:
        ValueError: If the blob is shorter than the header, the magic is
            wrong, the payload length disagrees with the dimensions, or the
            checksum does not match.
    """
    import numpy as np

    header_fmt = '<8sIII16s'
    hs = struct.calcsize(header_fmt)
    # Report truncation the same way as every other validation failure
    # instead of leaking a struct.error from the unpack call.
    if len(blob) < hs:
        raise ValueError('truncated LUM header')
    magic, z, h, w, digest16 = struct.unpack_from(header_fmt, blob)
    if magic != LUM_MAGIC:
        raise ValueError('invalid LUM magic')
    payload = blob[hs:]
    expected = int(z) * int(h) * int(w) * 4  # 4 bytes per float32 element
    if len(payload) != expected:
        raise ValueError('payload size mismatch')
    if sha512(payload).digest()[:16] != digest16:
        raise ValueError('payload checksum mismatch')
    return np.frombuffer(payload, dtype=np.float32).reshape((z, h, w))




def competitor_teacher_asset_test(report: Dict[str, Any]) -> Dict[str, Any]:
    """Check that the competitor teacher ``.tif`` and ``.lum`` assets are present.

    Every directory in ``KAGGLE_DEP_PATHS`` is probed for both files. Each file
    found is fingerprinted (size, SHA-256, SHA-512) and logged. The result is
    ``ok`` only when at least one ``.tif`` and at least one ``.lum`` were found.
    """
    asset_names = ('competitor_teacher_1407735.tif', 'competitor_teacher_1407735.lum')
    probe_paths = [root / name for root in KAGGLE_DEP_PATHS for name in asset_names]

    assets = []
    for path in probe_paths:
        if not path.exists():
            continue
        data = path.read_bytes()
        fingerprint = {
            'path': str(path),
            'size': len(data),
            'sha256': sha256(data).hexdigest(),
            'sha512': sha512(data).hexdigest(),
        }
        assets.append(fingerprint)
        log_event(report, 'competitor_asset_found', **fingerprint)

    if not assets:
        log_event(report, 'competitor_assets_missing')
        return {'status': 'fail', 'reason': 'competitor_teacher_assets_not_found'}

    def _any_with_suffix(suffix: str) -> bool:
        # Case-insensitive suffix test over the recorded path strings.
        return any(entry['path'].lower().endswith(suffix) for entry in assets)

    has_tif = _any_with_suffix('.tif')
    has_lum = _any_with_suffix('.lum')
    status = 'ok' if (has_tif and has_lum) else 'fail'
    return {'status': status, 'has_tif': has_tif, 'has_lum': has_lum, 'assets': assets}

def tiff_lum_roundtrip_test(report: Dict[str, Any]) -> Dict[str, Any]:
    """Write a synthetic volume to TIFF, convert it to LUM v1, and verify the roundtrip.

    Steps: generate a seeded random binary volume, write it with ``tifffile``
    (LZW only in strict mode; ADOBE_DEFLATE and uncompressed fallbacks
    otherwise), read it back, normalize to float32, encode to LUM, write the
    LUM file, then decode the bytes read back from disk and compare
    bit-for-bit with the normalized volume.

    Args:
        report: Report dict that receives forensic events.

    Returns:
        A result dict with ``status`` (``ok`` only when the roundtrip is
        exact), shape/dtype, global and per-slice statistics, ``roundtrip_ok``,
        file hashes, and the compression actually used.

    Raises:
        RuntimeError: If every permitted TIFF compression attempt fails.
    """
    import numpy as np
    import tifffile

    # Reload so the module is re-evaluated in this process; presumably this is
    # meant to pick up packages installed earlier in the same run.
    tifffile = importlib.reload(tifffile)

    with tempfile.TemporaryDirectory() as td:
        workdir = Path(td)
        tif_path = workdir / 'synthetic_teacher.tif'
        lum_path = workdir / 'synthetic_teacher.lum'

        rng = np.random.default_rng(42)
        vol = (rng.random((8, 32, 32)) > 0.87).astype(np.uint8)

        write_errors = []
        used = None
        compressions = [('LZW', 'LZW')]
        if not STRICT_NO_FALLBACK:
            compressions.extend([('ADOBE_DEFLATE', 'ADOBE_DEFLATE'), ('NONE', None)])

        for tag, comp in compressions:
            try:
                tifffile.imwrite(tif_path, vol, compression=comp)
                used = tag
                log_event(report, "tiff_write_ok", compression=tag)
                break
            except Exception as exc:
                write_errors.append({'attempt': tag, 'error': str(exc)})
                log_event(report, "tiff_write_fail", compression=tag, error=str(exc))

        if used is None:
            raise RuntimeError(f"tiff_write_failed: {write_errors}")

        arr3d = normalize_volume(np.asarray(tifffile.imread(tif_path)))
        lum_path.write_bytes(encode_lum_v1(arr3d))

        # Read both files once; the same bytes feed the decoder and the hashes.
        tif_bytes = tif_path.read_bytes()
        lum_bytes = lum_path.read_bytes()
        # Decode what actually landed on disk so the file write is covered too.
        restored = decode_lum_v1(lum_bytes)

        slice_logs = []
        for z in range(restored.shape[0]):
            s = restored[z]
            slice_logs.append({'z': z, 'min': float(s.min()), 'max': float(s.max()), 'mean': float(s.mean()), 'std': float(s.std()), 'nonzero_pct': float((s > 0).mean() * 100.0)})

        # A lossless format must reproduce the data exactly; allclose would
        # still apply its default relative tolerance even with atol=0.
        roundtrip_ok = bool(np.array_equal(arr3d, restored))
        log_event(report, "lum_roundtrip_checked", roundtrip_ok=roundtrip_ok)

        return {
            'status': 'ok' if roundtrip_ok else 'fail',
            'strict_no_fallback': STRICT_NO_FALLBACK,
            'shape': [int(x) for x in restored.shape],
            'dtype': str(restored.dtype),
            'global_min': float(restored.min()), 'global_max': float(restored.max()), 'global_mean': float(restored.mean()),
            'global_nonzero_pct': float((restored > 0).mean() * 100.0),
            'slice_logs': slice_logs,
            'roundtrip_ok': roundtrip_ok,
            'tiff_sha512': sha512(tif_bytes).hexdigest(),
            'lum_sha512': sha512(lum_bytes).hexdigest(),
            'tiff_sha256': sha256(tif_bytes).hexdigest(),
            'lum_sha256': sha256(lum_bytes).hexdigest(),
            'write_compression_used': used,
            'forensic_write': {'write_errors': write_errors, 'write_compression_used': used},
        }


def main() -> None:
    """Run the full validation pipeline, write the JSON report, and print a summary.

    Pipeline: offline dependency install, import check, dataset path listing,
    header inventory, C syntax smoke test, competitor asset check, and the
    TIFF/LUM roundtrip. Any exception marks the report as ``fail`` and is
    recorded as a ``fatal_error`` event. The report is written to
    ``/kaggle/working`` when that directory exists, otherwise to the current
    directory.

    Raises:
        SystemExit: With code 2 when the final report status is not ``ok``.
    """
    t0 = now_ns()
    report: Dict[str, Any] = {
        'report_name': 'lumvorax_dependency_360_kaggle_single_cell_v3_complete',
        'timestamp_ns': now_ns(),
        'runtime': {
            'python': sys.version,
            'platform': platform.platform(),
            'cwd': str(Path.cwd()),
            'is_kaggle': Path('/kaggle').exists(),
        },
        'policy': {
            'strict_no_fallback': STRICT_NO_FALLBACK,
            'require_native': REQUIRE_NATIVE,
        },
        'events': [],
    }

    try:
        for pkg in ('numpy', 'tifffile', 'imagecodecs'):
            report.setdefault('install_report', []).append(install_offline_if_missing(pkg, report))

        report['imports'] = {p: pkg_available(p) for p in ('numpy', 'tifffile', 'imagecodecs', 'pyarrow')}
        log_event(report, "imports_checked", imports=report['imports'])

        report['dependency_dataset_paths'] = [
            {'path': str(p), 'exists': p.exists(), 'files_count': (sum(1 for _ in p.iterdir()) if p.exists() and p.is_dir() else 0)}
            for p in KAGGLE_DEP_PATHS
        ]
        log_event(report, "dataset_paths_checked", paths=report['dependency_dataset_paths'])

        report['module_inventory'] = scan_headers_and_functions(report)
        if REQUIRE_NATIVE and report['module_inventory']['summary']['headers'] <= 0:
            raise RuntimeError('no_native_headers_found')

        report['c_syntax_smoke'] = smoke_compile_c_modules(report)
        report['competitor_teacher_assets_test'] = competitor_teacher_asset_test(report)
        if report['competitor_teacher_assets_test'].get('status') != 'ok':
            raise RuntimeError('competitor_teacher_assets_missing_or_incomplete')
        report['tiff_lum_roundtrip_test'] = tiff_lum_roundtrip_test(report)
        # A run must not be reported as ok when the decoded volume differs
        # from what was encoded.
        if not report['tiff_lum_roundtrip_test'].get('roundtrip_ok'):
            raise RuntimeError('tiff_lum_roundtrip_mismatch')

        report['status'] = 'ok'
    except Exception as exc:
        # Top-level boundary: record the failure, still emit the report below.
        report['status'] = 'fail'
        report['error_type'] = type(exc).__name__
        report['error'] = str(exc)
        log_event(report, "fatal_error", error_type=type(exc).__name__, error=str(exc))

    report['elapsed_ns'] = now_ns() - t0
    report['elapsed_s'] = report['elapsed_ns'] / 1_000_000_000

    out = Path('/kaggle/working/lumvorax_360_validation_report_v3_complete.json')
    if not out.parent.exists():
        out = Path('lumvorax_360_validation_report_v3_complete.json')
    out.write_text(json.dumps(report, indent=2), encoding='utf-8')

    # Both sections are absent when the pipeline failed before reaching them.
    inventory_summary = report.get('module_inventory', {}).get('summary', {})
    roundtrip = report.get('tiff_lum_roundtrip_test')
    if not isinstance(roundtrip, dict):
        roundtrip = {}

    print(json.dumps({
        'status': report.get('status'),
        'error_type': report.get('error_type'),
        'error': report.get('error'),
        'report': str(out),
        'module_headers': inventory_summary.get('headers'),
        'public_functions': inventory_summary.get('functions'),
        'roundtrip_status': roundtrip.get('status'),
        'write_compression_used': roundtrip.get('write_compression_used'),
        'events_count': len(report.get('events', [])),
        'elapsed_ns': report.get('elapsed_ns'),
    }, indent=2))

    if report.get('status') != 'ok':
        raise SystemExit(2)


if __name__ == '__main__':
    main()
