# Platform API Testing Notebook

**Last Updated**: 14 JAN 2026

Complete workflow examples for publishing, accessing, and unpublishing geospatial data.

## Table of Contents

### Part 1: Diagnostics
- 1.1 Liveness Probe (`/livez`)
- 1.2 Readiness Probe (`/readyz`)
- 1.3 Full Health Check (`/health`)
- 1.4 Database Health
- 1.5 Container Check

### Part 2: Vector Workflow
- 2.1 Publish Vector
- 2.2 Access Vector (OGC Features API)
- 2.3 Unpublish Vector

### Part 3: Single Raster Workflow
- 3.1 Publish Single Raster
- 3.2 Access Raster (STAC API)
- 3.3 Unpublish Raster

### Part 4: Raster Collection Workflow
- 4.1 Publish Raster Collection
- 4.2 Access Collection (STAC API)
- 4.3 Unpublish Collection

### Part 5: Large Raster Workflow
- 5.1 Publish Large Raster (>800 MB)

### Part 6: Validation & Error Handling
- 6.1 Size/Count Limit Rejections

### Part 7: TiTiler Tile Server
- 7.1 COG Metadata & Info
- 7.2 Interactive Viewer
- 7.3 TileJSON & XYZ Tiles
- 7.4 Rescale & Colormap
- 7.5 Multi-Band Selection (bidx)
- 7.6 Statistics & Point Queries

### Appendices
- Appendix A: Manual Job Status Check
- Appendix B: Platform Request Status Check
- Appendix C: Job Resubmit (Recovery)

---

## Size Routing Summary

| Data Type | Size/Count | Job Type | Notes |
|-----------|------------|----------|-------|
| Single Raster | ≤800 MB | `process_raster_v2` | Standard COG conversion |
| Single Raster | 100 MB - 30 GB | `process_large_raster_v2` | Tiled COG workflow |
| Raster Collection | ≤20 files, each ≤800 MB | `process_raster_collection_v2` | MosaicJSON |
| Vector | Any size | `process_vector` | PostGIS + STAC |

---
## Setup & Configuration

**All environment and component names are defined here.** Update this cell to point to different environments.

In [None]:
import requests
import json
import time
import urllib.parse
from IPython.display import Markdown, display

# =============================================================================
# ENVIRONMENT CONFIGURATION
# =============================================================================
# Update these values to point to different environments (DEV/QA/UAT/PROD)

ENVIRONMENT = "DEV"  # DEV, QA, UAT, PROD

# Function App Base URLs
BASE_URL = "https://rmhazuregeoapi-a3dma3ctfdgngwf6.eastus-01.azurewebsites.net"
OGC_STAC_URL = "https://rmhogcstac-b4f5ccetf0a7hwe9.eastus-01.azurewebsites.net"

# TiTiler Tile Server (Vanilla TiTiler with /vsiaz/ direct access)
TITILER_URL = "https://rmhtitiler-ghcyd7g0bxdvc2hc.eastus-01.azurewebsites.net"

# =============================================================================
# STORAGE CONFIGURATION
# =============================================================================

# Bronze tier: Raw input data
BRONZE_CONTAINER = "rmhazuregeobronze"

# Silver tier: Processed outputs
SILVER_COGS_CONTAINER = "silver-cogs"
SILVER_FATHOM_CONTAINER = "silver-fathom"

# Azure Storage Account (for /vsiaz/ paths)
STORAGE_ACCOUNT = "rmhazuregeo"

# =============================================================================
# DATABASE CONFIGURATION
# =============================================================================

# PostGIS schemas
GEO_SCHEMA = "geo"        # Vector data
APP_SCHEMA = "app"        # Jobs, tasks, metadata
PGSTAC_SCHEMA = "pgstac"  # STAC catalog
H3_SCHEMA = "h3"          # H3 analytics

# =============================================================================
# STAC CONFIGURATION
# =============================================================================

# Default collections
RASTER_COLLECTION = "system-rasters"   # Single rasters
VECTOR_COLLECTION = "system-vectors"   # Vectors

# =============================================================================
# TEST DATA CONFIGURATION
# =============================================================================

# Test vector file
TEST_VECTOR_FILE = "8.geojson"
TEST_VECTOR_DATASET = "test-vectors"
TEST_VECTOR_RESOURCE = "geojson-8"
TEST_VECTOR_VERSION = "v1"

# Test raster file (small, <800 MB)
TEST_RASTER_FILE = "dctest.tif"
TEST_RASTER_DATASET = "test-raster-notebook"
TEST_RASTER_RESOURCE = "dctest"
TEST_RASTER_VERSION = "v1"

# Test large raster (>800 MB)
TEST_LARGE_RASTER_FILE = "antigua.tif"
TEST_LARGE_RASTER_DATASET = "test-large-raster"
TEST_LARGE_RASTER_RESOURCE = "antigua"
TEST_LARGE_RASTER_VERSION = "v1"

# Test raster collection
TEST_COLLECTION_FILES = [
    "namangan/namangan14aug2019_R1C1cog.tif",
    "namangan/namangan14aug2019_R1C2cog.tif",
    "namangan/namangan14aug2019_R2C1cog.tif",
    "namangan/namangan14aug2019_R2C2cog.tif"
]
TEST_COLLECTION_DATASET = "namangan-imagery"
TEST_COLLECTION_RESOURCE = "aug2019"
TEST_COLLECTION_VERSION = "v1"

# Test COG for TiTiler (already processed, in silver-cogs)
TEST_COG_FILE = "05APR13082706_cog_analysis.tif"

# =============================================================================
# HELPER FUNCTIONS
# =============================================================================

def api_call(method, endpoint, data=None, params=None, timeout=30, base_url=None):
    """Make API call and return formatted response."""
    url = f"{base_url or BASE_URL}{endpoint}"
    headers = {"Content-Type": "application/json"}
    
    print(f"\n{'='*60}")
    print(f"{method} {endpoint}")
    print(f"{'='*60}")
    
    if data:
        print(f"\nRequest Body:")
        print(json.dumps(data, indent=2))
    
    try:
        if method == "GET":
            response = requests.get(url, params=params, timeout=timeout)
        elif method == "POST":
            response = requests.post(url, json=data, headers=headers, timeout=timeout)
        elif method == "DELETE":
            response = requests.delete(url, params=params, timeout=timeout)
        else:
            raise ValueError(f"Unsupported method: {method}")
        
        print(f"\nStatus: {response.status_code}")
        
        try:
            result = response.json()
            print(f"\nResponse:")
            print(json.dumps(result, indent=2, default=str))
            return result
        except:
            print(f"\nResponse (text): {response.text[:500]}")
            return response.text
            
    except requests.exceptions.Timeout:
        print(f"\nRequest timed out (timeout={timeout}s)")
        return None
    except Exception as e:
        print(f"\nError: {e}")
        return None


def check_job_status(job_id, max_polls=20, poll_interval=5):
    """Poll job status until completion or timeout."""
    print(f"\n{'='*60}")
    print(f"Polling job: {job_id}")
    print(f"{'='*60}")
    
    for i in range(max_polls):
        result = requests.get(f"{BASE_URL}/api/jobs/status/{job_id}", timeout=30).json()
        status = result.get("status", "unknown")
        stage = result.get("stage", "?")
        total = result.get("totalStages", "?")
        
        print(f"  [{i+1}/{max_polls}] Status: {status}, Stage: {stage}/{total}")
        
        if status in ["completed", "failed"]:
            print(f"\nFinal Result:")
            print(json.dumps(result, indent=2, default=str))
            return result
        
        time.sleep(poll_interval)
    
    print(f"\nPolling timeout after {max_polls * poll_interval}s")
    return result


def derive_table_name(dataset_id, resource_id, version_id):
    """Derive PostGIS table name from DDH identifiers."""
    return f"{dataset_id}_{resource_id}_{version_id}".replace("-", "_")


def derive_stac_item_id(dataset_id, resource_id, version_id):
    """Derive STAC item ID from DDH identifiers."""
    return f"{dataset_id}-{resource_id}-{version_id}".replace("_", "-")


def show_links(links_dict):
    """Display dynamic links as clickable markdown.
    
    Usage: show_links({"Submit Vector": "/api/interface/submit-vector", ...})
    """
    md = "**Quick Links:**\n"
    for name, path in links_dict.items():
        if path.startswith("http"):
            md += f"- [{name}]({path})\n"
        else:
            md += f"- [{name}]({BASE_URL}{path})\n"
    display(Markdown(md))


def vsiaz_path(container, blob_path):
    """Build /vsiaz/ path for GDAL access to Azure Blob Storage."""
    return f"/vsiaz/{container}/{blob_path}"


def titiler_url(endpoint, container, blob_path, **params):
    """Build TiTiler URL with properly encoded /vsiaz/ path.
    
    Args:
        endpoint: TiTiler endpoint (e.g., '/cog/info', '/cog/tiles/{z}/{x}/{y}')
        container: Azure container name
        blob_path: Path to blob within container
        **params: Additional query parameters (rescale, colormap_name, bidx, etc.)
    
    Returns:
        Full TiTiler URL with encoded parameters
    """
    vsiaz = vsiaz_path(container, blob_path)
    encoded_url = urllib.parse.quote(vsiaz, safe='')
    
    url = f"{TITILER_URL}{endpoint}?url={encoded_url}"
    
    for key, value in params.items():
        if value is not None:
            url += f"&{key}={value}"
    
    return url


# =============================================================================
# DISPLAY CONFIGURATION
# =============================================================================

print("=" * 60)
print(f"PLATFORM API TESTING - {ENVIRONMENT} ENVIRONMENT")
print("=" * 60)
print(f"\nEndpoints:")
print(f"  Platform API:    {BASE_URL}")
print(f"  OGC/STAC API:    {OGC_STAC_URL}")
print(f"  TiTiler:         {TITILER_URL}")
print(f"\nStorage:")
print(f"  Bronze:          {BRONZE_CONTAINER}")
print(f"  Silver COGs:     {SILVER_COGS_CONTAINER}")
print(f"  Account:         {STORAGE_ACCOUNT}")
print(f"\nSchemas:")
print(f"  Geo:             {GEO_SCHEMA}")
print(f"  STAC:            {PGSTAC_SCHEMA}")
print(f"\nTest Data:")
print(f"  Vector:          {TEST_VECTOR_FILE}")
print(f"  Raster:          {TEST_RASTER_FILE}")
print(f"  Large Raster:    {TEST_LARGE_RASTER_FILE}")
print(f"  Collection:      {len(TEST_COLLECTION_FILES)} files")
print(f"  Test COG:        {TEST_COG_FILE}")
print("=" * 60)

---
# Part 1: Diagnostics

## Probe Comparison

| Probe | Endpoint | Speed | Checks | Use Case |
|-------|----------|-------|--------|----------|
| **Liveness** | `/api/livez` | ~1ms | Process alive | Platform restart decisions |
| **Readiness** | `/api/readyz` | ~100-500ms | DB + Service Bus | Load balancer routing |
| **Health** | `/api/health` | ~30-60s | Everything | Manual debugging |

System health checks and diagnostic endpoints.

In [None]:
# Execute cell for links to UI Dashboard
show_links({
    "System Health": "/api/interface/health",
    "Pipelines": "/api/interface/pipeline",
    "Service Bus Queues": "/api/interface/queues",
    "Databases": "/api/interface/database",
    "Storage Browser": "/api/interface/storage",
    "OGC Feature Collections Gallery": "/api/interface/vector",
    "STAC Collections Gallery": "/api/interface/stac"
})

## 1.1 Liveness Probe (`/livez`)

**Purpose:** Is the application process alive and running?

This is the **fastest** probe (~1ms). Azure and Kubernetes use this to determine if the container needs to be **restarted**. It only checks that the Python process is responding - no database, no storage, no external dependencies.

**When it fails:** The container/instance should be killed and replaced.

**Use case:** Continuous monitoring by the platform (every 10-30 seconds).

In [None]:
# Liveness Probe - Is the process alive? (~1ms)
result = api_call("GET", "/api/livez", timeout=5)

if result and isinstance(result, dict):
    status = result.get('status', 'unknown')
    message = result.get('message', '')
    print(f"\nLiveness: {'ALIVE' if status == 'alive' else 'DEAD'}")
    print(f"Message: {message}")

## 1.2 Readiness Probe (`/readyz`)

**Purpose:** Is the application ready to handle requests?

This is a **fast** probe (~100-500ms). It verifies that critical dependencies are available:
- Database connection (can we reach PostgreSQL?)
- Service Bus connection (can we queue jobs?)

**When it fails:** The instance should be **removed from the load balancer** but NOT restarted. Traffic is routed to other healthy instances while this one recovers.

**Use case:** Load balancer health checks, deployment validation.

In [None]:
# Readiness Probe - Are dependencies available? (~100-500ms)
result = api_call("GET", "/api/readyz", timeout=10)

if result and isinstance(result, dict):
    status = result.get('status', 'unknown')
    message = result.get('message', '')
    summary = result.get('summary', {})
    
    print(f"\nReadiness: {'READY' if status == 'ready' else 'NOT READY'}")
    print(f"Message: {message}")
    
    if summary:
        passed = summary.get('checks_passed', 0)
        failed = summary.get('checks_failed', 0)
        print(f"Checks: {passed} passed, {failed} failed")
        
        if summary.get('failed_check_names'):
            print(f"Failed: {', '.join(summary['failed_check_names'])}")

## 1.3 Full Health Check (`/health`)

**Purpose:** Comprehensive system diagnostics for operators and debugging.

This is a **slow** probe (~30-60s). It performs exhaustive checks of ALL system components:
- Database connectivity AND query execution
- Service Bus queue status (all 4 queues)
- Storage account access
- PgSTAC catalog health
- Schema inspection
- PostgreSQL function tests

**When to use:** Manual debugging, deployment verification, incident investigation. NOT for automated health checks (too slow, too much load).

**Use case:** "Why isn't my job processing?" - run this to see what's broken.

In [None]:
# Health Check (takes ~60s for full check)
result = api_call("GET", "/api/health", timeout=90)

if result and isinstance(result, dict):
    print(f"\n{'='*60}")
    print("HEALTH SUMMARY")
    print(f"{'='*60}")
    print(f"Overall Status: {result.get('status', 'unknown').upper()}")
    print(f"Version: {result.get('version', 'unknown')}")
    
    components = result.get('components', {})
    print(f"\nComponents ({len(components)}):")
    for name, component in components.items():
        # Each component is a dict with 'status' field
        if isinstance(component, dict):
            status = component.get('status', 'unknown')
            icon = "OK" if status == "healthy" else ("WARN" if status == "disabled" else "FAIL")
        else:
            icon = "?"
        print(f"  {name}: {icon}")

## 1.4 Database Health

Database connection pool, size, and maintenance status.

In [None]:
# Database Health
result = api_call("GET", "/api/dbadmin/health")

if result and isinstance(result, dict):
    print(f"\n{'='*60}")
    print("DATABASE HEALTH")
    print(f"{'='*60}")
    print(f"Status: {result.get('status', 'unknown').upper()}")
    
    # Connection pool
    pool = result.get('connection_pool', {})
    if pool:
        print(f"\nConnection Pool:")
        print(f"  Active: {pool.get('active', 0)}/{pool.get('total', 0)}")
        print(f"  Utilization: {pool.get('utilization_percent', 0)}%")
    
    # Database size
    size = result.get('database_size', {})
    if size:
        print(f"\nDatabase Size:")
        print(f"  Total: {size.get('total', 'unknown')}")
        print(f"  App Schema: {size.get('app_schema', 'unknown')}")
        print(f"  Geo Schema: {size.get('geo_schema', 'unknown')}")
    
    # Health checks
    checks = result.get('checks', [])
    if checks:
        print(f"\nHealth Checks:")
        for check in checks:
            status = check.get('status', 'unknown')
            icon = "OK" if status == "healthy" else ("WARN" if status == "warning" else "FAIL")
            print(f"  {check.get('name', '?')}: {icon} - {check.get('message', '')}")

### 1.4b Query Jobs

Query jobs from the database with optional filters.

**Endpoint:** `GET /api/dbadmin/jobs`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `status` | query | No | Filter by status: `queued`, `processing`, `completed`, `failed` |
| `job_type` | query | No | Filter by job type (e.g., `process_vector`, `process_raster_v2`) |
| `limit` | query | No | Max jobs to return (default: 50) |
| `hours` | query | No | Only jobs from last N hours |

In [None]:
# Query recent failed jobs
result = api_call("GET", "/api/dbadmin/jobs", params={"status": "failed", "limit": 5})

if result and isinstance(result, dict):
    jobs = result.get('jobs', [])
    print(f"\nFound {len(jobs)} failed jobs")
    for job in jobs:
        job_id = job.get('job_id', 'unknown')[:16]
        job_type = job.get('job_type', 'unknown')
        error = job.get('error_details', 'No error details')
        if error and len(error) > 60:
            error = error[:60] + "..."
        print(f"  - {job_id}... ({job_type})")
        print(f"    Error: {error}")

## 1.5 Container Check

Synchronous endpoint to list blobs in a container. Returns immediately (no job queue).

**Endpoint:** `GET /api/containers/{container_name}/blobs`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `container_name` | path | Yes | Storage container name (e.g., `rmhazuregeobronze`) |
| `suffix` | query | No | Filter by file extension (e.g., `.tif`, `.geojson`) |
| `prefix` | query | No | Filter by blob path prefix (e.g., `namangan/`) |
| `metadata` | query | No | `true` (default) returns full blob info, `false` returns just names |
| `limit` | query | No | Max blobs to return (default: 500, max: 10000) |

In [None]:
# List TIF files in bronze container
result = api_call("GET", f"/api/containers/{BRONZE_CONTAINER}/blobs", 
                  params={"suffix": ".tif", "limit": 10, "metadata": "true"})

if result and isinstance(result, dict):
    count = result.get("count", 0)
    blobs = result.get("blobs", [])
    print(f"\nFound {count} TIF files")
    if blobs:
        total_mb = sum(b.get("size_mb", 0) for b in blobs)
        print(f"Total size (first {len(blobs)}): {total_mb:.2f} MB")

In [None]:
# List GeoJSON files in bronze container
result = api_call("GET", f"/api/containers/{BRONZE_CONTAINER}/blobs", 
                  params={"suffix": ".geojson", "limit": 10, "metadata": "true"})

if result and isinstance(result, dict):
    count = result.get("count", 0)
    print(f"\nFound {count} GeoJSON files")

---
# Part 2: Vector Workflow

Complete publish → access → unpublish cycle for vector data.

In [None]:
# Execute cell for links to UI Dashboard
show_links({
    "Submit Vector (Web UI)": "/api/interface/submit-vector",
    "Unpublish Vector (Web UI)": "/api/interface/unpublish-vector",
    "OGC Features Collections": f"{OGC_STAC_URL}/api/features/collections",
    "Swagger UI": "/api/docs"
})

## 2.1 Publish Vector

Submit a vector file (GeoJSON, Shapefile, GeoPackage) for ingestion into PostGIS.

**Endpoint:** `POST /api/platform/submit`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Top-level identifier (e.g., `world-borders`) |
| `resource_id` | body | Yes | Resource identifier (e.g., `admin-boundaries`) |
| `version_id` | body | Yes | Version identifier (e.g., `v1`, `2024-01`) |
| `container_name` | body | Yes | Source container with raw file |
| `file_name` | body | Yes | Blob path to vector file (`.geojson`, `.shp.zip`, `.gpkg`) |
| `service_name` | body | No | Human-readable name for the dataset |
| `description` | body | No | Dataset description |
| `access_level` | body | No | Access classification (default: `OUO`) |

**Output Naming:** Table name derived as `{dataset_id}_{resource_id}_{version_id}` (hyphens → underscores)

**Workflow:**
1. Stage 1: Parse and chunk the file
2. Stage 2: Upload chunks to PostGIS (parallel)
3. Stage 3: Create STAC item

In [None]:
# Submit Vector via Platform API
vector_request = {
    "dataset_id": TEST_VECTOR_DATASET,
    "resource_id": TEST_VECTOR_RESOURCE,
    "version_id": TEST_VECTOR_VERSION,
    "container_name": BRONZE_CONTAINER,
    "file_name": TEST_VECTOR_FILE,
    "service_name": "Test Vector Data"
}

result = api_call("POST", "/api/platform/submit", vector_request)
vector_job_id = result.get("job_id") if result else None
vector_request_id = result.get("request_id") if result else None

print(f"\nRequest ID: {vector_request_id}")
print(f"Job ID: {vector_job_id}")

In [None]:
# Poll Vector Job Status
if vector_job_id:
    vector_result = check_job_status(vector_job_id)
else:
    print("No job_id - run previous cell first")

## 2.2 Access Vector (OGC Features API)

Query the published vector data using OGC API - Features.

**Base URL:** `OGC_STAC_URL` (dedicated OGC/STAC app)

### List Collections
**Endpoint:** `GET /api/features/collections`

### Query Features
**Endpoint:** `GET /api/features/collections/{collection}/items`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `collection` | path | Yes | Table name (e.g., `test_vectors_geojson_8_v1`) |
| `limit` | query | No | Max features to return (default: 10, max: 10000) |
| `offset` | query | No | Skip first N features for pagination |
| `bbox` | query | No | Bounding box filter: `minx,miny,maxx,maxy` (WGS84) |
| `datetime` | query | No | Temporal filter (ISO 8601) |
| `properties` | query | No | Filter by property values |

**Example bbox:** `-71.0,-56.0,-70.0,-55.0`

In [None]:
# List OGC Feature Collections (via dedicated OGC/STAC app)
result = api_call("GET", "/api/features/collections", base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    collections = result.get("collections", [])
    print(f"\nFound {len(collections)} vector collections")
    for c in collections[:10]:
        print(f"  - {c.get('id', 'unknown')}")

In [None]:
# Query features from published vector
vector_table = derive_table_name(TEST_VECTOR_DATASET, TEST_VECTOR_RESOURCE, TEST_VECTOR_VERSION)
print(f"Querying table: {vector_table}")

result = api_call("GET", f"/api/features/collections/{vector_table}/items",
                  params={"limit": 5}, base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    features = result.get("features", [])
    print(f"\nReturned {len(features)} features")
    print(f"Total matched: {result.get('numberMatched', 'unknown')}")

In [None]:
# Spatial query with bounding box
# Format: minx,miny,maxx,maxy (WGS84)
bbox = "-71.0,-56.0,-70.0,-55.0"

result = api_call("GET", f"/api/features/collections/{vector_table}/items",
                  params={"bbox": bbox, "limit": 10}, base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    features = result.get("features", [])
    print(f"\nFeatures in bbox: {len(features)}")

## 2.3 Unpublish Vector

Remove vector data from the platform (PostGIS table + STAC item).

**Endpoint:** `POST /api/platform/unpublish/vector`

### Option 1: By DDH Identifiers (Preferred)
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Original dataset_id used during publish |
| `resource_id` | body | Yes | Original resource_id |
| `version_id` | body | Yes | Original version_id |
| `dry_run` | body | No | `true` (default) = preview only, `false` = actually delete |

### Option 2: By Request ID
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `request_id` | body | Yes | Original request_id from publish response |
| `dry_run` | body | No | `true` (default) = preview only |

### Option 3: Cleanup Mode (Direct table_name)
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `table_name` | body | Yes | Direct PostGIS table name |
| `schema_name` | body | No | Schema name (default: `geo`) |
| `dry_run` | body | No | `true` (default) = preview only |

**Important:** Always defaults to `dry_run=true` for safety.

In [None]:
# Unpublish Vector - DRY RUN (shows what would be deleted)
unpublish_request = {
    "dataset_id": TEST_VECTOR_DATASET,
    "resource_id": TEST_VECTOR_RESOURCE,
    "version_id": TEST_VECTOR_VERSION,
    "dry_run": True  # SAFETY: Set to False to actually delete
}

result = api_call("POST", "/api/platform/unpublish/vector", unpublish_request)

if result:
    print(f"\nDry Run: {result.get('dry_run', 'N/A')}")
    print(f"Table: {result.get('table_name', 'N/A')}")
    print(f"Would delete: {result.get('would_delete', 'N/A')}")

In [None]:
# Unpublish Vector - EXECUTE (uncomment to actually delete)
# unpublish_request = {
#     "dataset_id": TEST_VECTOR_DATASET,
#     "resource_id": TEST_VECTOR_RESOURCE,
#     "version_id": TEST_VECTOR_VERSION,
#     "dry_run": False  # DANGER: Will actually delete!
# }
# result = api_call("POST", "/api/platform/unpublish/vector", unpublish_request)
print("Uncomment and run to actually delete the vector data")

---
# Part 3: Single Raster Workflow

Complete publish → access → unpublish cycle for single raster files (≤800 MB).

In [None]:
# Execute cell for links to UI Dashboard
show_links({
    "Submit Single Raster (Web UI)": "/api/interface/submit-raster",
    "Unpublish Raster (Web UI)": "/api/interface/unpublish-raster",
    "STAC Collections": f"{OGC_STAC_URL}/api/collections",
    "STAC Search": f"{OGC_STAC_URL}/api/search",
    "Swagger UI": "/api/docs"
})

## 3.1 Publish Single Raster

Submit a single raster file for COG conversion and STAC cataloging.

**Endpoint:** `POST /api/platform/raster`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Top-level identifier (e.g., `satellite-imagery`) |
| `resource_id` | body | Yes | Resource identifier (e.g., `nairobi-2024`) |
| `version_id` | body | Yes | Version identifier (e.g., `v1`) |
| `container_name` | body | Yes | Source container with raw raster |
| `file_name` | body | Yes | Blob path to raster file (`.tif`, `.img`, etc.) |
| `service_name` | body | No | Human-readable name |
| `description` | body | No | Dataset description |
| `access_level` | body | No | Access classification (default: `OUO`) |

**Output Naming:** STAC item ID = `{dataset_id}-{resource_id}-{version_id}`

**Size Routing:**
- ≤800 MB → Standard COG conversion (`process_raster_v2`)
- >800 MB → Auto-routes to tiled workflow (`process_large_raster_v2`)

**Workflow:**
1. Stage 1: Download and validate
2. Stage 2: Convert to COG
3. Stage 3: Create STAC item

In [None]:
# Submit Single Raster via Platform API
raster_request = {
    "dataset_id": TEST_RASTER_DATASET,
    "resource_id": TEST_RASTER_RESOURCE,
    "version_id": TEST_RASTER_VERSION,
    "container_name": BRONZE_CONTAINER,
    "file_name": TEST_RASTER_FILE,
    "service_name": "Test Raster Data",
    "access_level": "OUO"
}

result = api_call("POST", "/api/platform/raster", raster_request)
raster_job_id = result.get("job_id") if result else None
raster_request_id = result.get("request_id") if result else None

print(f"\nRequest ID: {raster_request_id}")
print(f"Job ID: {raster_job_id}")

In [None]:
# Poll Raster Job Status
if raster_job_id:
    raster_result = check_job_status(raster_job_id)
else:
    print("No job_id - run previous cell first")

## 3.2 Access Raster (STAC API)

Query the published raster data using STAC API.

**Base URL:** `OGC_STAC_URL` (dedicated OGC/STAC app)

### List Collections
**Endpoint:** `GET /api/collections`

### Get Specific Item
**Endpoint:** `GET /api/collections/{collection_id}/items/{item_id}`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `collection_id` | path | Yes | STAC collection (e.g., `system-rasters`) |
| `item_id` | path | Yes | STAC item ID (e.g., `test-raster-notebook-dctest-v1`) |

### Search Items
**Endpoint:** `POST /api/search`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `collections` | body | No | Array of collection IDs to search |
| `bbox` | body | No | Bounding box: `[minx, miny, maxx, maxy]` |
| `datetime` | body | No | Temporal filter (ISO 8601 interval) |
| `limit` | body | No | Max items to return (default: 10) |
| `ids` | body | No | Array of specific item IDs |
| `intersects` | body | No | GeoJSON geometry for spatial filter |

**Example search:**
```json
{"collections": ["system-rasters"], "bbox": [-180, -90, 180, 90], "limit": 10}
```

In [None]:
# List STAC Collections
result = api_call("GET", "/api/collections", base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    collections = result.get("collections", [])
    print(f"\nFound {len(collections)} STAC collections")
    for c in collections[:10]:
        print(f"  - {c.get('id', 'unknown')}")

In [None]:
# Get specific STAC item
stac_item_id = derive_stac_item_id(TEST_RASTER_DATASET, TEST_RASTER_RESOURCE, TEST_RASTER_VERSION)
print(f"Looking for STAC item: {stac_item_id}")

result = api_call("GET", f"/api/collections/{RASTER_COLLECTION}/items/{stac_item_id}",
                  base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    print(f"\nItem found: {result.get('id', 'unknown')}")
    assets = result.get('assets', {})
    print(f"Assets: {list(assets.keys())}")
    if 'data' in assets:
        print(f"COG URL: {assets['data'].get('href', 'N/A')[:80]}...")

In [None]:
# STAC Search
search_params = {
    "collections": [RASTER_COLLECTION],
    "limit": 10
}

result = api_call("POST", "/api/search", search_params, base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    features = result.get("features", [])
    print(f"\nFound {len(features)} items")
    for f in features[:5]:
        print(f"  - {f.get('id', 'unknown')}")

## 3.3 Unpublish Raster

Remove raster data from the platform (COG blobs + STAC item).

**Endpoint:** `POST /api/platform/unpublish/raster`

### Option 1: By DDH Identifiers (Preferred)
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Original dataset_id used during publish |
| `resource_id` | body | Yes | Original resource_id |
| `version_id` | body | Yes | Original version_id |
| `dry_run` | body | No | `true` (default) = preview only, `false` = actually delete |

### Option 2: By Request ID
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `request_id` | body | Yes | Original request_id from publish response |
| `dry_run` | body | No | `true` (default) = preview only |

### Option 3: Cleanup Mode (Direct STAC identifiers)
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `stac_item_id` | body | Yes | STAC item ID to delete |
| `collection_id` | body | Yes | STAC collection containing the item |
| `dry_run` | body | No | `true` (default) = preview only |

**Important:** Always defaults to `dry_run=true` for safety.

In [None]:
# Unpublish Raster - DRY RUN
unpublish_request = {
    "dataset_id": TEST_RASTER_DATASET,
    "resource_id": TEST_RASTER_RESOURCE,
    "version_id": TEST_RASTER_VERSION,
    "dry_run": True  # SAFETY: Set to False to actually delete
}

result = api_call("POST", "/api/platform/unpublish/raster", unpublish_request)

if result:
    print(f"\nDry Run: {result.get('dry_run', 'N/A')}")
    print(f"STAC Item: {result.get('stac_item_id', 'N/A')}")
    print(f"Collection: {result.get('collection_id', 'N/A')}")

In [None]:
# Unpublish Raster - EXECUTE (uncomment to actually delete)
# unpublish_request = {
#     "dataset_id": TEST_RASTER_DATASET,
#     "resource_id": TEST_RASTER_RESOURCE,
#     "version_id": TEST_RASTER_VERSION,
#     "dry_run": False  # DANGER: Will actually delete!
# }
# result = api_call("POST", "/api/platform/unpublish/raster", unpublish_request)
print("Uncomment and run to actually delete the raster data")

---
# Part 4: Raster Collection Workflow

Complete publish → access → unpublish cycle for multi-file raster collections.

In [None]:
# Execute cell for links to UI Dashboard
show_links({
    "Submit Raster Collection (Web UI)": "/api/interface/submit-raster-collection",
    "Unpublish Raster (Web UI)": "/api/interface/unpublish-raster",
    "STAC Collections": f"{OGC_STAC_URL}/api/collections",
    "STAC Search": f"{OGC_STAC_URL}/api/search",
    "Swagger UI": "/api/docs"
})

## 4.1 Publish Raster Collection

Submit multiple raster files as a unified collection with MosaicJSON.

**Endpoint:** `POST /api/platform/raster-collection`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Top-level identifier (e.g., `namangan-imagery`) |
| `resource_id` | body | Yes | Resource identifier (e.g., `aug2019`) |
| `version_id` | body | Yes | Version identifier (e.g., `v1`) |
| `container_name` | body | Yes | Source container with raw rasters |
| `file_name` | body | Yes | **Array** of blob paths (2-20 files) |
| `service_name` | body | No | Human-readable name |
| `description` | body | No | Dataset description |
| `access_level` | body | No | Access classification (default: `OUO`) |

**Limits:**
- Min 2 files, Max 20 files per collection
- Each file must be ≤800 MB

**Output:** Creates a STAC collection with multiple items + MosaicJSON for unified tile access.

**Workflow:**
1. Stage 1: Validate all files (size, existence)
2. Stage 2: Convert each to COG (parallel)
3. Stage 3: Create MosaicJSON
4. Stage 4: Create STAC collection + items

In [None]:
# Submit Raster Collection via Platform API
collection_request = {
    "dataset_id": TEST_COLLECTION_DATASET,
    "resource_id": TEST_COLLECTION_RESOURCE,
    "version_id": TEST_COLLECTION_VERSION,
    "container_name": BRONZE_CONTAINER,
    "file_name": TEST_COLLECTION_FILES,
    "service_name": "Test Raster Collection",
    "access_level": "OUO"
}

result = api_call("POST", "/api/platform/raster-collection", collection_request)
collection_job_id = result.get("job_id") if result else None
collection_request_id = result.get("request_id") if result else None

print(f"\nRequest ID: {collection_request_id}")
print(f"Job ID: {collection_job_id}")
print(f"File Count: {result.get('file_count', 'N/A')}" if result else "")

In [None]:
# Poll Collection Job Status (longer timeout for multi-file)
if collection_job_id:
    collection_result = check_job_status(collection_job_id, max_polls=30, poll_interval=10)
else:
    print("No job_id - run previous cell first")

## 4.2 Access Collection (STAC API)

Query the published collection using STAC API.

**Base URL:** `OGC_STAC_URL` (dedicated OGC/STAC app)

### List Items in Collection
**Endpoint:** `GET /api/collections/{collection_id}/items`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `collection_id` | path | Yes | Collection ID (e.g., `namangan-imagery-aug2019-v1`) |
| `limit` | query | No | Max items to return (default: 10) |
| `bbox` | query | No | Bounding box filter: `minx,miny,maxx,maxy` |

### Search Within Collection
**Endpoint:** `POST /api/search`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `collections` | body | Yes | Array with collection ID |
| `bbox` | body | No | Bounding box: `[minx, miny, maxx, maxy]` |
| `limit` | body | No | Max items to return |

**Note:** Collections have MosaicJSON for unified tile serving via TiTiler.

In [None]:
# List items in the collection
collection_id = f"{TEST_COLLECTION_DATASET}-{TEST_COLLECTION_RESOURCE}-{TEST_COLLECTION_VERSION}"
print(f"Looking for collection: {collection_id}")

result = api_call("GET", f"/api/collections/{collection_id}/items",
                  params={"limit": 10}, base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    features = result.get("features", [])
    print(f"\nItems in collection: {len(features)}")
    for f in features:
        print(f"  - {f.get('id', 'unknown')}")

In [None]:
# Search within collection by bbox
search_params = {
    "collections": [collection_id],
    "bbox": [70.0, 40.0, 72.0, 42.0],  # Namangan area
    "limit": 10
}

result = api_call("POST", "/api/search", search_params, base_url=OGC_STAC_URL)

if result and isinstance(result, dict):
    features = result.get("features", [])
    print(f"\nItems in bbox: {len(features)}")

## 4.3 Unpublish Collection

Remove entire collection (all COGs + MosaicJSON + STAC collection + items).

**Endpoint:** `POST /api/platform/unpublish/raster`

Uses same endpoint as single raster unpublish. Parameters:

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Original dataset_id |
| `resource_id` | body | Yes | Original resource_id |
| `version_id` | body | Yes | Original version_id |
| `dry_run` | body | No | `true` (default) = preview only |

**Note:** Unpublishing a collection removes ALL items and the MosaicJSON.

In [None]:
# Unpublish Collection - DRY RUN
unpublish_request = {
    "dataset_id": TEST_COLLECTION_DATASET,
    "resource_id": TEST_COLLECTION_RESOURCE,
    "version_id": TEST_COLLECTION_VERSION,
    "dry_run": True
}

result = api_call("POST", "/api/platform/unpublish/raster", unpublish_request)

if result:
    print(f"\nDry Run: {result.get('dry_run', 'N/A')}")
    print(f"Collection: {result.get('collection_id', 'N/A')}")

---
# Part 5: Large Raster Workflow

Processing rasters larger than 800 MB (up to 30 GB).

In [None]:
# Execute cell for links to UI Dashboard
show_links({
    "Submit Raster (Web UI)": "/api/interface/submit-raster",
    "Unpublish Raster (Web UI)": "/api/interface/unpublish-raster",
    "STAC Collections": f"{OGC_STAC_URL}/api/collections",
    "Container Contents": f"/api/containers/{BRONZE_CONTAINER}/blobs?suffix=.tif&limit=20",
    "Swagger UI": "/api/docs"
})

## 5.1 Publish Large Raster (>800 MB)

Large rasters use a tiled processing workflow for files 100 MB - 30 GB.

**Endpoint:** `POST /api/platform/raster` (same as single raster - auto-routes based on size)

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `dataset_id` | body | Yes | Top-level identifier |
| `resource_id` | body | Yes | Resource identifier |
| `version_id` | body | Yes | Version identifier |
| `container_name` | body | Yes | Source container with raw raster |
| `file_name` | body | Yes | Blob path to large raster file |
| `service_name` | body | No | Human-readable name |
| `access_level` | body | No | Access classification (default: `OUO`) |

**Size Limits:**
- Min: 100 MB (smaller files use standard `process_raster_v2`)
- Max: 30 GB

**Processing Time:** Large rasters take 30+ minutes to process.

**Workflow:**
1. Stage 1: Generate tiling scheme
2. Stage 2: Extract tiles (sequential)
3. Stage 3: Create tile COGs (parallel)
4. Stage 4: Create MosaicJSON
5. Stage 5: Create STAC collection

**Output:** Creates a STAC collection with tile items + MosaicJSON (same as raster collection).

In [None]:
# Submit Large Raster via Platform API
# Platform auto-routes to process_large_raster_v2 based on file size

large_raster_request = {
    "dataset_id": TEST_LARGE_RASTER_DATASET,
    "resource_id": TEST_LARGE_RASTER_RESOURCE,
    "version_id": TEST_LARGE_RASTER_VERSION,
    "container_name": BRONZE_CONTAINER,
    "file_name": TEST_LARGE_RASTER_FILE,
    "service_name": "Test Large Raster",
    "access_level": "OUO"
}

result = api_call("POST", "/api/platform/raster", large_raster_request)
large_raster_job_id = result.get("job_id") if result else None

print(f"\nJob ID: {large_raster_job_id}")
print(f"Job Type: {result.get('job_type', 'N/A')}" if result else "")

In [None]:
# Poll Large Raster Job Status (long timeout)
if large_raster_job_id:
    large_raster_result = check_job_status(large_raster_job_id, max_polls=60, poll_interval=30)
else:
    print("No job_id - run previous cell first")

---
# Part 6: Validation & Error Handling

Examples of pre-flight validation rejecting invalid requests.

## 6.1 Size/Count Limit Rejections

The platform validates requests before processing:

- Single raster >800 MB → Rejected (use large raster endpoint)
- Collection >20 files → Rejected
- Collection with file >800 MB → Rejected
- Missing files → Rejected with list of missing

In [None]:
# Test: Single raster too large for standard processing
print("="*60)
print("TEST: Single raster >800 MB via standard endpoint")
print("="*60)

result = api_call("POST", "/api/jobs/submit/process_raster_v2", {
    "blob_name": TEST_LARGE_RASTER_FILE,
    "container_name": BRONZE_CONTAINER
})

if result and "error" in str(result).lower():
    print("\nCorrectly rejected - use process_large_raster_v2 instead")

In [None]:
# Test: Collection with too many files
print("="*60)
print("TEST: Collection exceeding 20 file limit")
print("="*60)

result = api_call("POST", "/api/jobs/submit/process_raster_collection_v2", {
    "container_name": BRONZE_CONTAINER,
    "blob_list": [f"file{i}.tif" for i in range(21)],
    "collection_id": "test-too-many"
})

if result and "error" in str(result).lower():
    print("\nCorrectly rejected - max 20 files per collection")

In [None]:
# Test: Collection with non-existent file
print("="*60)
print("TEST: Collection with missing file")
print("="*60)

result = api_call("POST", "/api/jobs/submit/process_raster_collection_v2", {
    "container_name": BRONZE_CONTAINER,
    "blob_list": [
        TEST_COLLECTION_FILES[0],  # exists
        "nonexistent_file_xyz123.tif"  # does not exist
    ],
    "collection_id": "test-missing"
})

if result and "error" in str(result).lower():
    print("\nCorrectly rejected - lists missing files")

---
## Appendix A: Manual Job Status Check

Check status of any CoreMachine job by job_id.

**Endpoint:** `GET /api/jobs/status/{job_id}`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `job_id` | path | Yes | 64-character SHA256 job hash |

**Response includes:**
- `status`: `queued`, `processing`, `completed`, `failed`
- `stage` / `totalStages`: Current progress
- `taskSummary`: Count of tasks by status
- `resultData`: Output data (when completed)
- `error_details`: Error message (when failed)

In [None]:
# Manual Job Status Check
# Replace with your job_id
manual_job_id = "YOUR_JOB_ID_HERE"

if manual_job_id != "YOUR_JOB_ID_HERE":
    check_job_status(manual_job_id)
else:
    print("Replace 'YOUR_JOB_ID_HERE' with an actual job_id")

---
## Appendix B: Platform Request Status Check

Check status of any Platform request by request_id (DDH-friendly).

**Endpoint:** `GET /api/platform/status/{request_id}`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `request_id` | path | Yes | 32-character request hash from Platform submission |

**Response includes:**
- `job_status`: Current job status
- `job_stage`: Current stage number
- `data_type`: `vector` or `raster`
- `task_summary`: Task completion counts
- `result_data`: Output URLs and metadata (when completed)

In [None]:
# Platform Status Check
# Replace with your request_id
manual_request_id = "YOUR_REQUEST_ID_HERE"

if manual_request_id != "YOUR_REQUEST_ID_HERE":
    result = api_call("GET", f"/api/platform/status/{manual_request_id}")
    if result and result.get("success"):
        print(f"\nStatus: {result.get('job_status', 'N/A')}")
        print(f"Stage: {result.get('job_stage', 'N/A')}")
else:
    print("Replace 'YOUR_REQUEST_ID_HERE' with an actual request_id")

---
## Appendix C: Job Resubmit (Recovery)

Clean reset and resubmit a job with the same parameters. Useful for:
- Failed jobs that need retry
- Orphaned jobs stuck in processing
- Jobs that completed but need to be re-run

**Endpoint:** `POST /api/jobs/{job_id}/resubmit`

### Path Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `job_id` | path | Yes | 64-character SHA256 job hash to resubmit |

### Request Body (Optional)
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `dry_run` | boolean | `false` | Preview cleanup plan without executing |
| `delete_blobs` | boolean | `false` | Also delete COG files from storage |
| `force` | boolean | `false` | Resubmit even if job is currently processing |

### Cleanup Actions Performed
| Job Type | Actions |
|----------|---------|
| **Vector** | Delete tasks → Drop PostGIS table → Delete STAC item → Delete job |
| **Raster** | Delete tasks → Delete STAC item → (optionally) Delete COGs → Delete job |

### Response
```json
{
    "success": true,
    "original_job_id": "abc123...",
    "new_job_id": "abc123...",
    "job_type": "process_raster_v2",
    "cleanup_summary": {
        "tasks_deleted": 5,
        "job_deleted": true,
        "tables_dropped": [],
        "stac_items_deleted": ["item-123"],
        "blobs_deleted": []
    },
    "message": "Job resubmitted successfully"
}
```

**Note:** Job IDs are deterministic (SHA256 of job_type + params), so resubmitting with same parameters generates the same job_id.

In [None]:
# Job Resubmit - DRY RUN (preview cleanup plan)
# Replace with your job_id
resubmit_job_id = "YOUR_JOB_ID_HERE"

if resubmit_job_id != "YOUR_JOB_ID_HERE":
    result = api_call("POST", f"/api/jobs/{resubmit_job_id}/resubmit", 
                      {"dry_run": True})
    
    if result and result.get("success"):
        print(f"\n{'='*60}")
        print("DRY RUN - Cleanup Plan")
        print(f"{'='*60}")
        print(f"Job Type: {result.get('job_type', 'N/A')}")
        print(f"Job Status: {result.get('job_status', 'N/A')}")
        
        plan = result.get("cleanup_plan", {})
        print(f"\nWould cleanup:")
        print(f"  Tasks to delete: {plan.get('tasks_to_delete', 0)}")
        print(f"  Tables to drop: {plan.get('tables_to_drop', [])}")
        print(f"  STAC items to delete: {plan.get('stac_items_to_delete', [])}")
        print(f"  Blobs to delete: {plan.get('blobs_to_delete', [])}")
else:
    print("Replace 'YOUR_JOB_ID_HERE' with an actual job_id")

In [None]:
# Job Resubmit - EXECUTE (uncomment to actually resubmit)
# This will:
#   1. Delete all tasks for the job
#   2. Drop PostGIS tables (vector jobs)
#   3. Delete STAC items
#   4. Delete job record
#   5. Resubmit with same parameters

# resubmit_job_id = "YOUR_JOB_ID_HERE"
# result = api_call("POST", f"/api/jobs/{resubmit_job_id}/resubmit", 
#                   {"dry_run": False})
# 
# if result and result.get("success"):
#     print(f"\nJob resubmitted!")
#     print(f"Original Job ID: {result.get('original_job_id', 'N/A')[:16]}...")
#     print(f"New Job ID: {result.get('new_job_id', 'N/A')[:16]}...")
#     print(f"\nCleanup Summary:")
#     summary = result.get("cleanup_summary", {})
#     print(f"  Tasks deleted: {summary.get('tasks_deleted', 0)}")
#     print(f"  Tables dropped: {summary.get('tables_dropped', [])}")
#     print(f"  STAC items deleted: {summary.get('stac_items_deleted', [])}")

print("Uncomment and run to actually resubmit the job")

In [None]:
# Job Resubmit - FORCE (for stuck/processing jobs)
# Use force=True to resubmit a job that is currently in 'processing' status
# WARNING: This may cause duplicate work if the original job is still running

# resubmit_job_id = "YOUR_JOB_ID_HERE"
# result = api_call("POST", f"/api/jobs/{resubmit_job_id}/resubmit", 
#                   {"dry_run": False, "force": True})

print("Uncomment and run to force resubmit a stuck job")

---
# Part 7: TiTiler Tile Server

Dynamic tile serving for Cloud-Optimized GeoTIFFs (COGs) stored in Azure Blob Storage.

## Architecture

TiTiler uses GDAL's `/vsiaz/` virtual filesystem for direct Azure Blob access:

```
Client Request → TiTiler → GDAL /vsiaz/ → Azure Blob Storage → COG Data → PNG/JPEG Tile
```

**No database required** - TiTiler can serve ANY COG in blob storage (cataloged or not).

## URL Encoding

The `/vsiaz/{container}/{blob}` path must be URL-encoded:

```python
# Raw path
/vsiaz/silver-cogs/myfile.tif

# URL-encoded
%2Fvsiaz%2Fsilver-cogs%2Fmyfile.tif
```

Use the `titiler_url()` helper function defined in the setup cell to build URLs automatically.

In [None]:
# Execute cell for links to UI Dashboard
show_links({
    "TiTiler API Docs": f"{TITILER_URL}/docs",
    "TiTiler Landing Page": f"{TITILER_URL}/",
    "COG Viewer (Test File)": titiler_url("/cog/viewer", SILVER_COGS_CONTAINER, TEST_COG_FILE),
    "STAC Collections Gallery": "/api/interface/stac"
})

## 7.1 COG Metadata & Info

Get metadata about a COG including bounds, CRS, band information, and data types.

**Endpoint:** `GET /cog/info`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `url` | query | Yes | URL-encoded `/vsiaz/{container}/{blob}` path |

**Response includes:**
- `bounds`: Geographic extent [minx, miny, maxx, maxy]
- `crs`: Coordinate reference system (e.g., "EPSG:4326")
- `band_metadata`: List of band info (dtype, nodata, colorinterp)
- `width` / `height`: Raster dimensions in pixels
- `overviews`: Available overview levels

In [None]:
# Get COG Info - metadata about the raster
info_url = titiler_url("/cog/info", SILVER_COGS_CONTAINER, TEST_COG_FILE)
print(f"Request URL:\n{info_url}\n")

response = requests.get(info_url, timeout=30)
print(f"Status: {response.status_code}")

if response.status_code == 200:
    info = response.json()
    print(f"\n{'='*60}")
    print("COG INFO")
    print(f"{'='*60}")
    print(f"Bounds: {info.get('bounds', 'N/A')}")
    print(f"CRS: {info.get('crs', 'N/A')}")
    print(f"Size: {info.get('width', '?')} x {info.get('height', '?')} pixels")
    print(f"Bands: {info.get('count', '?')}")
    print(f"Data Type: {info.get('dtype', 'N/A')}")
    print(f"Overviews: {info.get('overviews', [])}")
    
    # Band details
    band_meta = info.get('band_metadata', [])
    if band_meta:
        print(f"\nBand Metadata:")
        for i, band in enumerate(band_meta, 1):
            print(f"  Band {i}: {band}")
else:
    print(f"Error: {response.text[:500]}")

## 7.2 Interactive Viewer

Open a COG in TiTiler's built-in interactive map viewer.

**Endpoint:** `GET /cog/viewer`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `url` | query | Yes | URL-encoded `/vsiaz/{container}/{blob}` path |

The viewer automatically loads tiles and displays a Leaflet map with the COG data.

In [None]:
# Generate Interactive Viewer URL
viewer_url = titiler_url("/cog/viewer", SILVER_COGS_CONTAINER, TEST_COG_FILE)
print("Interactive Viewer URL:")
print(viewer_url)
print("\nOpen this URL in your browser to view the COG on an interactive map.")

# Display as clickable link
display(Markdown(f"**[Open Viewer]({viewer_url})**"))

## 7.3 TileJSON & XYZ Tiles

Get TileJSON specification or individual map tiles.

### TileJSON
**Endpoint:** `GET /cog/WebMercatorQuad/tilejson.json`

Returns tile URL template, zoom levels, and bounds for use in mapping libraries (Leaflet, MapLibre, OpenLayers).

### XYZ Tiles
**Endpoint:** `GET /cog/tiles/WebMercatorQuad/{z}/{x}/{y}`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `url` | query | Yes | URL-encoded `/vsiaz/` path |
| `z` | path | Yes | Zoom level (0-22) |
| `x` | path | Yes | Tile column |
| `y` | path | Yes | Tile row |
| `scale` | query | No | Tile scale (`1` = 256px, `2` = 512px) |
| `format` | query | No | Output format: `png`, `jpeg`, `webp` |

In [None]:
# Get TileJSON specification
tilejson_url = titiler_url("/cog/WebMercatorQuad/tilejson.json", SILVER_COGS_CONTAINER, TEST_COG_FILE)
print(f"TileJSON URL:\n{tilejson_url}\n")

response = requests.get(tilejson_url, timeout=30)
print(f"Status: {response.status_code}")

if response.status_code == 200:
    tilejson = response.json()
    print(f"\n{'='*60}")
    print("TILEJSON")
    print(f"{'='*60}")
    print(f"Name: {tilejson.get('name', 'N/A')}")
    print(f"Min Zoom: {tilejson.get('minzoom', '?')}")
    print(f"Max Zoom: {tilejson.get('maxzoom', '?')}")
    print(f"Bounds: {tilejson.get('bounds', 'N/A')}")
    print(f"Center: {tilejson.get('center', 'N/A')}")
    print(f"\nTile URL Template:")
    tiles = tilejson.get('tiles', [])
    if tiles:
        print(f"  {tiles[0][:100]}...")
else:
    print(f"Error: {response.text[:500]}")

## 7.4 Rescale & Colormap

Control how raster values are mapped to colors.

### Rescale Parameter
**Format:** `rescale=min,max`

Maps raster values to 0-255 range for display:
- Values ≤ min → 0 (black)
- Values ≥ max → 255 (white/color max)
- Values between → linearly interpolated

### Colormap Parameter
**Format:** `colormap_name={name}`

Available colormaps: `viridis`, `plasma`, `inferno`, `magma`, `terrain`, `RdYlBu`, `Blues`, `Greens`, `Reds`, etc.

### Common Use Cases

| Data Type | Recommended Settings |
|-----------|---------------------|
| RGB Imagery | No rescale needed (uint8) |
| 16-bit Satellite | `rescale=0,2000` |
| DEM/Elevation | `rescale=0,3000&colormap_name=terrain` |
| NDVI | `rescale=-1,1&colormap_name=RdYlGn` |
| Temperature | `rescale=250,320&colormap_name=RdYlBu` |

In [None]:
# Example: Viewer with rescale and colormap
# Useful for single-band data like DEMs, temperature, or indices

# Default viewer (auto-detect)
default_viewer = titiler_url("/cog/viewer", SILVER_COGS_CONTAINER, TEST_COG_FILE)

# With explicit rescale (for 16-bit data)
rescaled_viewer = titiler_url("/cog/viewer", SILVER_COGS_CONTAINER, TEST_COG_FILE,
                               rescale="0,2000")

# With colormap (for single-band visualization)
colormap_viewer = titiler_url("/cog/viewer", SILVER_COGS_CONTAINER, TEST_COG_FILE,
                               rescale="0,255",
                               colormap_name="viridis")

print("Viewer URL Examples:")
print(f"\n1. Default (auto):")
print(f"   {default_viewer}")
print(f"\n2. With rescale (16-bit data):")
print(f"   {rescaled_viewer}")
print(f"\n3. With colormap (single-band):")
print(f"   {colormap_viewer}")

## 7.5 Multi-Band Selection (bidx)

Select which bands to display from multi-band rasters.

### Band Index Parameter
**Format:** `bidx=N` (repeat for multiple bands)

Band indices are **1-based** (first band = 1).

### Common Patterns

| Sensor | Bands | Natural Color | False Color |
|--------|-------|---------------|-------------|
| 3-band RGB | 3 | `bidx=1&bidx=2&bidx=3` | N/A |
| 4-band RGBA | 4 | `bidx=1&bidx=2&bidx=3` | (skip alpha) |
| WorldView-2/3 | 8 | `bidx=5&bidx=3&bidx=2` | `bidx=7&bidx=5&bidx=3` |
| Sentinel-2 | 10+ | `bidx=4&bidx=3&bidx=2` | `bidx=8&bidx=4&bidx=3` |
| Landsat 8 | 11 | `bidx=4&bidx=3&bidx=2` | `bidx=5&bidx=4&bidx=3` |

**Note:** Our ETL pipeline automatically selects optimal bands and stores them in STAC item properties.

In [None]:
# Example: Band selection for multi-band imagery
# Note: bidx parameters need to be added manually since titiler_url() 
# doesn't handle repeated parameters

vsiaz = vsiaz_path(SILVER_COGS_CONTAINER, TEST_COG_FILE)
encoded = urllib.parse.quote(vsiaz, safe='')

# Standard RGB (bands 1, 2, 3)
rgb_url = f"{TITILER_URL}/cog/viewer?url={encoded}&bidx=1&bidx=2&bidx=3"

# Example for 8-band WorldView: Natural color (R=5, G=3, B=2)
wv_natural = f"{TITILER_URL}/cog/viewer?url={encoded}&bidx=5&bidx=3&bidx=2&rescale=0,2000"

# Example for 8-band WorldView: False color NIR (NIR1=7, R=5, G=3)
wv_false = f"{TITILER_URL}/cog/viewer?url={encoded}&bidx=7&bidx=5&bidx=3&rescale=0,2000"

print("Band Selection Examples:")
print(f"\n1. Standard RGB (bands 1,2,3):")
print(f"   {rgb_url}")
print(f"\n2. WorldView Natural Color (bands 5,3,2 + rescale):")
print(f"   {wv_natural}")
print(f"\n3. WorldView False Color NIR (bands 7,5,3 + rescale):")
print(f"   {wv_false}")

## 7.6 Statistics & Point Queries

Get band statistics or extract values at specific coordinates.

### Statistics
**Endpoint:** `GET /cog/statistics`

Returns min, max, mean, stddev, histogram for each band.

### Point Query
**Endpoint:** `GET /cog/point/{lon},{lat}`

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `url` | query | Yes | URL-encoded `/vsiaz/` path |
| `lon` | path | Yes | Longitude (WGS84) |
| `lat` | path | Yes | Latitude (WGS84) |

Returns pixel values at the specified coordinate for all bands.

In [None]:
# Get COG Statistics
stats_url = titiler_url("/cog/statistics", SILVER_COGS_CONTAINER, TEST_COG_FILE)
print(f"Statistics URL:\n{stats_url}\n")

response = requests.get(stats_url, timeout=60)
print(f"Status: {response.status_code}")

if response.status_code == 200:
    stats = response.json()
    print(f"\n{'='*60}")
    print("BAND STATISTICS")
    print(f"{'='*60}")
    
    for band_name, band_stats in stats.items():
        print(f"\n{band_name}:")
        print(f"  Min: {band_stats.get('min', 'N/A')}")
        print(f"  Max: {band_stats.get('max', 'N/A')}")
        print(f"  Mean: {band_stats.get('mean', 'N/A'):.2f}" if band_stats.get('mean') else "  Mean: N/A")
        print(f"  Std Dev: {band_stats.get('std', 'N/A'):.2f}" if band_stats.get('std') else "  Std Dev: N/A")
else:
    print(f"Error: {response.text[:500]}")

In [None]:
# Point Query - Extract pixel value at specific coordinate
# First get bounds from info to find a valid point
info_url = titiler_url("/cog/info", SILVER_COGS_CONTAINER, TEST_COG_FILE)
info_response = requests.get(info_url, timeout=30)

if info_response.status_code == 200:
    info = info_response.json()
    bounds = info.get('bounds', [-180, -90, 180, 90])
    
    # Get center point of the raster
    center_lon = (bounds[0] + bounds[2]) / 2
    center_lat = (bounds[1] + bounds[3]) / 2
    
    # Build point query URL
    vsiaz = vsiaz_path(SILVER_COGS_CONTAINER, TEST_COG_FILE)
    encoded = urllib.parse.quote(vsiaz, safe='')
    point_url = f"{TITILER_URL}/cog/point/{center_lon},{center_lat}?url={encoded}"
    
    print(f"Point Query: ({center_lon:.4f}, {center_lat:.4f})")
    print(f"URL: {point_url}\n")
    
    response = requests.get(point_url, timeout=30)
    print(f"Status: {response.status_code}")
    
    if response.status_code == 200:
        point_data = response.json()
        print(f"\n{'='*60}")
        print("POINT VALUES")
        print(f"{'='*60}")
        print(f"Coordinates: {point_data.get('coordinates', 'N/A')}")
        print(f"Values: {point_data.get('values', 'N/A')}")
        
        band_names = point_data.get('band_names', [])
        values = point_data.get('values', [])
        if band_names and values:
            print(f"\nBy Band:")
            for name, val in zip(band_names, values):
                print(f"  {name}: {val}")
    else:
        print(f"Error: {response.text[:500]}")
else:
    print("Could not get COG info to determine valid point")