# ADX File-Transfer Analytics Runbook

Interactive notebook for setting up, ingesting data, and verifying the ADX file-transfer analytics pipeline.  
Uses the same schema, mappings, and update policy as the production Event Grid pipeline (FR-034).

**Workflow:**
1. **Configure** — Set cluster URI, database, and auth method
2. **Setup** — Create all ADX objects (tables, mappings, policies, materialized view)
3. **Ingest** — Load CSV/JSON data from local files or Azure Blob Storage
4. **Verify** — Query the target table to confirm data landed correctly

> **Tip:** The CLI script `adx_runbook.py` is also available for scripted/CI usage.

## 1. Prerequisites & Imports

Ensure you have run `uv pip install -r requirements.txt` in this directory's venv.

In [None]:
import os
import time
from pathlib import Path

# ---------------------------------------------------------------------------
# SSL fix: uv-managed Python may ship without CA certs. If SSL_CERT_FILE is
# not set, point it at certifi's bundle so TLS connections work out of the box.
# ---------------------------------------------------------------------------
# Respect an SSL_CERT_FILE the user already exported; only fill it in when unset/empty.
if not os.environ.get("SSL_CERT_FILE"):
    try:
        import certifi
    except ImportError:
        # Without certifi we cannot supply a bundle; warn and carry on.
        print("WARNING: certifi not installed — TLS may fail if system CA certs are missing")
    else:
        ca_bundle = certifi.where()
        os.environ["SSL_CERT_FILE"] = ca_bundle
        print(f"SSL_CERT_FILE set to {ca_bundle}")

from azure.identity import DefaultAzureCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.ingest import (
    QueuedIngestClient,
    IngestionProperties,
    FileDescriptor,
    BlobDescriptor,
)

print("All imports OK ✓")

## 2. Configuration

Set your ADX cluster URI and database name. Change the auth method if needed.

In [None]:
# ── Cluster & Database ───────────────────────────────────────────────────────
# CLUSTER_URI is the query/management endpoint; INGEST_URI is the endpoint
# handed to the queued ingest client.
CLUSTER_URI = "https://adx-ft-dev.eastus2.kusto.windows.net"
INGEST_URI = "https://ingest-adx-ft-dev.eastus2.kusto.windows.net"
DATABASE = "ftevents_dev"

# ── Authentication ────────────────────────────────────────────────────────────
# Options: "az-cli" | "interactive" | "managed-identity" | "service-principal"
AUTH_METHOD = "az-cli"

# Only needed for service-principal auth; each falls back to "" when the
# corresponding environment variable is not set.
SP_CLIENT_ID, SP_CLIENT_SECRET, SP_TENANT_ID = (
    os.environ.get(env_var, "")
    for env_var in ("AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET", "AZURE_TENANT_ID")
)

# Echo the effective settings so the notebook output records what was used.
print(
    f"Cluster:  {CLUSTER_URI}",
    f"Ingest:   {INGEST_URI}",
    f"Database: {DATABASE}",
    f"Auth:     {AUTH_METHOD}",
    sep="\n",
)

## 3. Authentication Helper

Builds a `KustoConnectionStringBuilder` for the chosen auth method.

In [None]:
def build_kcsb(cluster_uri: str) -> KustoConnectionStringBuilder:
    """Build a KustoConnectionStringBuilder for the globally selected AUTH_METHOD.

    Args:
        cluster_uri: Endpoint the connection string should target (the query
            endpoint or the ingest endpoint, depending on the caller).

    Returns:
        A connection string builder configured for ``AUTH_METHOD``.

    Raises:
        ValueError: If ``AUTH_METHOD`` is not one of the supported values, or
            if service-principal auth is selected while any of SP_CLIENT_ID,
            SP_CLIENT_SECRET or SP_TENANT_ID is empty.
    """
    if AUTH_METHOD == "az-cli":
        # DefaultAzureCredential with the interactive-browser and shared
        # token-cache credentials excluded from its chain.
        credential = DefaultAzureCredential(
            exclude_interactive_browser_credential=True,
            exclude_shared_token_cache_credential=True,
        )
        return KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credential)

    if AUTH_METHOD == "interactive":
        return KustoConnectionStringBuilder.with_interactive_login(cluster_uri)

    if AUTH_METHOD == "managed-identity":
        return KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(cluster_uri)

    if AUTH_METHOD == "service-principal":
        # Raise explicitly instead of using `assert`, which is removed when
        # Python runs with -O and would let empty credentials through.
        if not all([SP_CLIENT_ID, SP_CLIENT_SECRET, SP_TENANT_ID]):
            raise ValueError(
                "Set SP_CLIENT_ID, SP_CLIENT_SECRET, SP_TENANT_ID for service-principal auth"
            )
        return KustoConnectionStringBuilder.with_aad_application_key_authentication(
            cluster_uri, SP_CLIENT_ID, SP_CLIENT_SECRET, SP_TENANT_ID
        )

    raise ValueError(f"Unknown auth method: {AUTH_METHOD}")


# ---------------------------------------------------------------------------
# Retry helper for transient network errors
# ---------------------------------------------------------------------------
MAX_RETRIES = 3  # default for execute_with_retry's `retries` (total attempts)
RETRY_DELAY = 5  # seconds slept between attempts (passed to time.sleep)


def execute_with_retry(
    client: KustoClient, database: str, command: str, *, retries: int = MAX_RETRIES
) -> object:
    """Execute a management command, retrying on transient network errors.

    A ``KustoServiceError`` is treated as transient when its message contains
    "failed to process network request" or "auth/metadata" (case-insensitive).
    Any other ``KustoServiceError`` is re-raised immediately.

    Args:
        client: Kusto client whose ``execute_mgmt`` is invoked.
        database: Database the command runs against.
        command: Management command text.
        retries: Total number of attempts (keyword-only). Must be >= 1.

    Returns:
        Whatever ``client.execute_mgmt`` returns on the first successful attempt.

    Raises:
        ValueError: If ``retries`` is less than 1 (previously this fell through
            to ``raise None`` and surfaced as a confusing ``TypeError``).
        KustoServiceError: On a non-transient failure, or when the last
            attempt also fails with a transient error.
    """
    if retries < 1:
        raise ValueError(f"retries must be at least 1, got {retries}")

    for attempt in range(1, retries + 1):
        try:
            return client.execute_mgmt(database, command)
        except KustoServiceError as e:
            error_msg = str(e).lower()
            is_transient = (
                "failed to process network request" in error_msg
                or "auth/metadata" in error_msg
            )
            # Give up on permanent errors, or once the attempt budget is spent.
            if not is_transient or attempt == retries:
                raise
            print(f"  RETRY ({attempt}/{retries}, waiting {RETRY_DELAY}s)...", flush=True)
            time.sleep(RETRY_DELAY)

    # The loop runs at least once and every iteration returns or raises.
    raise AssertionError("unreachable: retry loop exited without a result")


print("Helpers defined ✓")

## 4. Schema Commands

The full DDL chain — identical to the `kql/schema/*.kql` files. All commands are idempotent.

In [None]:
# Ordered list of (description, command) pairs. Each command is a Kusto
# management command string; the description is a human-readable label.
#
# Order matters: the transformation function (step 4) reads the staging table
# (step 2), the update policy (step 5) references that function, and the
# retention command in step 13 targets the view created in step 12.
#
# Every command uses a create-merge / create-or-alter / alter /
# create-ifnotexists form. Multi-line commands open with `"""\` so the string
# does not begin with a newline.
SCHEMA_COMMANDS = [
    # Step 1: Target table — same columns as the staging table plus Timestamp
    (
        "Create target table (FileTransferEvents)",
        """\
.create-merge table FileTransferEvents (
    Filename: string,
    SourcePresent: bool,
    TargetPresent: bool,
    SourceLastModifiedUtc: datetime,
    TargetLastModifiedUtc: datetime,
    AgeMinutes: real,
    Status: string,
    Notes: string,
    Timestamp: datetime
)""",
    ),
    # Step 2: Staging table — the table both ingestion mappings below target
    (
        "Create staging table (FileTransferEvents_Raw)",
        """\
.create-merge table FileTransferEvents_Raw (
    Filename: string,
    SourcePresent: bool,
    TargetPresent: bool,
    SourceLastModifiedUtc: datetime,
    TargetLastModifiedUtc: datetime,
    AgeMinutes: real,
    Status: string,
    Notes: string
)""",
    ),
    # Step 3: Dead-letter table
    (
        "Create dead-letter table (FileTransferEvents_Errors)",
        """\
.create-merge table FileTransferEvents_Errors (
    RawData: string,
    Database: string,
    ['Table']: string,
    FailedOn: datetime,
    Error: string,
    OperationId: guid
)""",
    ),
    # Step 4: Transformation function — derives Timestamp from
    # SourceLastModifiedUtc, falling back to ingestion_time() when it is null
    (
        "Create transformation function",
        """\
.create-or-alter function FileTransferEvents_Transform() {
    FileTransferEvents_Raw
    | extend Timestamp = coalesce(SourceLastModifiedUtc, ingestion_time())
    | project Filename, SourcePresent, TargetPresent,
              SourceLastModifiedUtc, TargetLastModifiedUtc,
              AgeMinutes, Status, Notes, Timestamp
}""",
    ),
    # Step 5: Update policy — staging table is the source, the function above
    # is the query, and the target table receives the result
    (
        "Attach update policy",
        ".alter table FileTransferEvents policy update "
        "@'[{\"IsEnabled\": true, \"Source\": \"FileTransferEvents_Raw\", "
        "\"Query\": \"FileTransferEvents_Transform()\", "
        "\"IsTransactional\": true, \"PropagateIngestionProperties\": true}]'",
    ),
    # Step 6: CSV mapping (single-line body — execute_mgmt requires it).
    # Columns are mapped by ordinal position 0-7.
    (
        "Create CSV ingestion mapping",
        ".create-or-alter table FileTransferEvents_Raw ingestion csv mapping 'FileTransferEvents_CsvMapping' "
        "'["
        '{"Name":"Filename","DataType":"string","Ordinal":0},'
        '{"Name":"SourcePresent","DataType":"bool","Ordinal":1},'
        '{"Name":"TargetPresent","DataType":"bool","Ordinal":2},'
        '{"Name":"SourceLastModifiedUtc","DataType":"datetime","Ordinal":3},'
        '{"Name":"TargetLastModifiedUtc","DataType":"datetime","Ordinal":4},'
        '{"Name":"AgeMinutes","DataType":"real","Ordinal":5},'
        '{"Name":"Status","DataType":"string","Ordinal":6},'
        '{"Name":"Notes","DataType":"string","Ordinal":7}'
        "]'",
    ),
    # Step 7: JSON mapping (single-line body — execute_mgmt requires it).
    # Columns are mapped by JSON path ($.<ColumnName>).
    (
        "Create JSON ingestion mapping",
        ".create-or-alter table FileTransferEvents_Raw ingestion json mapping 'FileTransferEvents_JsonMapping' "
        "'["
        '{"column":"Filename","path":"$.Filename","datatype":"string"},'
        '{"column":"SourcePresent","path":"$.SourcePresent","datatype":"bool"},'
        '{"column":"TargetPresent","path":"$.TargetPresent","datatype":"bool"},'
        '{"column":"SourceLastModifiedUtc","path":"$.SourceLastModifiedUtc","datatype":"datetime"},'
        '{"column":"TargetLastModifiedUtc","path":"$.TargetLastModifiedUtc","datatype":"datetime"},'
        '{"column":"AgeMinutes","path":"$.AgeMinutes","datatype":"real"},'
        '{"column":"Status","path":"$.Status","datatype":"string"},'
        '{"column":"Notes","path":"$.Notes","datatype":"string"}'
        "]'",
    ),
    # Step 8: Target table retention (90 days)
    (
        "Set target table retention (90 days)",
        ".alter table FileTransferEvents policy retention "
        "@'{\"SoftDeletePeriod\": \"90.00:00:00\", \"Recoverability\": \"Enabled\"}'",
    ),
    # Step 9: Staging table retention (1 day)
    (
        "Set staging table retention (1 day)",
        ".alter table FileTransferEvents_Raw policy retention "
        "@'{\"SoftDeletePeriod\": \"1.00:00:00\", \"Recoverability\": \"Disabled\"}'",
    ),
    # Step 10: Dead-letter retention (30 days)
    (
        "Set dead-letter table retention (30 days)",
        ".alter table FileTransferEvents_Errors policy retention "
        "@'{\"SoftDeletePeriod\": \"30.00:00:00\", \"Recoverability\": \"Disabled\"}'",
    ),
    # Step 11: Ingestion batching (1 minute) on the staging table
    (
        "Set ingestion batching policy (1 min)",
        ".alter table FileTransferEvents_Raw policy ingestionbatching "
        "@'{\"MaximumBatchingTimeSpan\": \"00:01:00\", \"MaximumNumberOfItems\": 20, \"MaximumRawDataSizeMB\": 256}'",
    ),
    # Step 12: Materialized view — per-day counts by Status plus an average
    # and a t-digest of AgeMinutes
    (
        "Create DailySummary materialized view",
        """\
.create ifnotexists materialized-view DailySummary on table FileTransferEvents {
    FileTransferEvents
    | summarize
        TotalCount      = count(),
        OkCount         = countif(Status == "OK"),
        MissingCount    = countif(Status == "MISSING"),
        DelayedCount    = countif(Status == "DELAYED"),
        AvgAgeMinutes   = avg(AgeMinutes),
        AgeDigest       = tdigest(AgeMinutes)
    by Date = startofday(Timestamp)
}""",
    ),
    # Step 13: Materialized view retention (730 days)
    (
        "Set DailySummary retention (730 days)",
        ".alter materialized-view DailySummary policy retention "
        "@'{\"SoftDeletePeriod\": \"730.00:00:00\", \"Recoverability\": \"Enabled\"}'",
    ),
]

# Report how many commands were defined in the list above.
print(f"Defined {len(SCHEMA_COMMANDS)} schema commands ✓")

## 5. Setup — Create ADX Schema

Runs all 13 schema commands in order. All commands are idempotent — safe to re-run.

In [None]:
# Management commands go to the cluster (query) endpoint.
kcsb = build_kcsb(CLUSTER_URI)
client = KustoClient(kcsb)

print(f"Setting up ADX schema in {DATABASE}...", f"  Cluster: {CLUSTER_URI}", "", sep="\n")

total_steps = len(SCHEMA_COMMANDS)
for step_no, (label, kql) in enumerate(SCHEMA_COMMANDS, start=1):
    # Print the step header without a newline so the outcome lands on the same line.
    print(f"  [{step_no:2d}/{total_steps}] {label}...", end=" ", flush=True)
    try:
        execute_with_retry(client, DATABASE, kql)
    except KustoServiceError as e:
        error_msg = str(e)
        if "already exists" not in error_msg.lower():
            print(f"FAILED\n         {error_msg}")
            raise
        # An "already exists" error counts as a skipped step, not a failure.
        print("SKIPPED (already exists)")
    else:
        print("OK")

print()
print("Setup complete ✓  All tables, mappings, policies, and views are ready.")

## 6. Ingest Data

Choose **one** of the options below:
- **6a.** Ingest a local CSV/JSON file
- **6b.** Ingest from Azure Blob Storage

> **⚠ Note:** `QueuedIngestClient` is **fire-and-forget** — `ingest_from_file()` and `ingest_from_blob()` queue the request and return immediately. A "queued ✓" message does **not** mean data landed successfully. Run **Step 7** (Verify Ingestion) after 1–3 minutes to check for ingestion failures and confirm data arrived.

### 6a. Ingest Local File

In [None]:
# ── Configure the file to ingest ─────────────────────────────────────────────
# A relative path is resolved against the current working directory
# (presumably the notebook's folder — confirm if the kernel starts elsewhere).
LOCAL_FILE = "../samples/sample-events.csv"   # Relative to this notebook
# LOCAL_FILE = "../samples/sample-events.json"

# ── Auto-detect format & mapping from extension ──────────────────────────────
file_path = Path(LOCAL_FILE).resolve()
# Raise explicitly rather than `assert` (stripped under -O); is_file() also
# rejects a directory, which cannot be ingested.
if not file_path.is_file():
    raise FileNotFoundError(f"File not found: {file_path}")

ext = file_path.suffix.lower()
if ext == ".csv":
    data_format = DataFormat.CSV
    mapping_name = "FileTransferEvents_CsvMapping"
elif ext in (".json", ".jsonl"):
    # NOTE(review): DataFormat.JSON is presumably line-delimited JSON; a file
    # holding one JSON array or pretty-printed objects may need
    # DataFormat.MULTIJSON instead — confirm against the sample file.
    data_format = DataFormat.JSON
    mapping_name = "FileTransferEvents_JsonMapping"
else:
    raise ValueError(f"Unknown extension '{ext}' — set data_format and mapping_name manually")

print(f"File:    {file_path}")
print(f"Size:    {file_path.stat().st_size:,} bytes")
print(f"Format:  {data_format.name}")
print(f"Mapping: {mapping_name}")

In [None]:
# Queued ingestion goes through the ingest endpoint, not the query endpoint.
ingest_kcsb = build_kcsb(INGEST_URI)
ingest_client = QueuedIngestClient(ingest_kcsb)

# Only CSV input has its first record skipped.
skip_first_record = data_format == DataFormat.CSV
ingestion_props = IngestionProperties(
    database=DATABASE,
    table="FileTransferEvents_Raw",
    data_format=data_format,
    ingestion_mapping_reference=mapping_name,
    ignore_first_record=skip_first_record,
)

print(f"Ingesting {file_path.name} into FileTransferEvents_Raw...")
size_in_bytes = file_path.stat().st_size
file_descriptor = FileDescriptor(str(file_path), size_in_bytes)
ingest_client.ingest_from_file(file_descriptor, ingestion_properties=ingestion_props)

# The request is only queued at this point; rows arrive asynchronously.
print(
    "",
    "Ingestion queued ✓",
    "Data flows: staging table → update policy → FileTransferEvents",
    "Allow 1-3 minutes for rows to appear.",
    sep="\n",
)

### 6b. Ingest from Azure Blob Storage

In [None]:
# ── Configure the blob URI ───────────────────────────────────────────────────
BLOB_URI = "https://stfteventsdev.blob.core.windows.net/file-transfer-events/sample-events.csv"

# ── Auto-detect format from URI extension ────────────────────────────────────
# Drop any query string (e.g. a SAS token) before inspecting the extension.
path_part = BLOB_URI.split("?")[0]
# Compare case-insensitively so ".CSV" / ".Json" are detected, as in cell 6a.
blob_name_lower = path_part.lower()
if blob_name_lower.endswith(".csv"):
    blob_format = DataFormat.CSV
    blob_mapping = "FileTransferEvents_CsvMapping"
elif blob_name_lower.endswith((".json", ".jsonl")):
    blob_format = DataFormat.JSON
    blob_mapping = "FileTransferEvents_JsonMapping"
else:
    raise ValueError("Cannot detect format from blob URI — set blob_format and blob_mapping manually")

ingest_kcsb = build_kcsb(INGEST_URI)
ingest_client = QueuedIngestClient(ingest_kcsb)

blob_props = IngestionProperties(
    database=DATABASE,
    table="FileTransferEvents_Raw",
    data_format=blob_format,
    ingestion_mapping_reference=blob_mapping,
    ignore_first_record=(blob_format == DataFormat.CSV),
)

print("Ingesting blob into FileTransferEvents_Raw...")
# Print the URI without its query string so a SAS token is never written
# into the notebook output.
print(f"  Blob: {path_part}")
# The descriptor still receives the full URI, including any query string.
blob_descriptor = BlobDescriptor(BLOB_URI)
ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties=blob_props)

print()
print("Ingestion queued ✓")
print("Allow 1-3 minutes for rows to appear in FileTransferEvents.")

## 7. Verify Ingestion

Checks for **ingestion failures** first (`.show ingestion failures`), then queries the target table and displays the most recent rows. Validates that `Timestamp` is non-null.

> Run this cell 1–3 minutes after ingesting. If no rows appear but no failures either, wait and re-run.

In [None]:
verify_kcsb = build_kcsb(CLUSTER_URI)
verify_client = KustoClient(verify_kcsb)

# ── Step 1: Check for recent ingestion failures ──────────────────────────────
# Management command: up to 10 failures from the last 10 minutes, newest first.
FAILURE_QUERY = ".show ingestion failures | where FailedOn > ago(10m) | order by FailedOn desc | take 10"

print(f"Checking ingestion failures in {DATABASE}...")
try:
    fail_response = verify_client.execute_mgmt(DATABASE, FAILURE_QUERY)
    fail_rows = list(fail_response.primary_results[0])
except KustoServiceError as e:
    # Best-effort check: report the problem and continue to the table query.
    print(f"  Could not check ingestion failures: {e}\n")
else:
    if fail_rows:
        print(f"\n  ⚠ INGESTION FAILURES DETECTED ({len(fail_rows)} recent):\n")
        for frow in fail_rows:
            # Read the row as a plain dict so a missing column falls back to
            # "?" instead of raising; result rows expose no `columns` attribute.
            record = frow.to_dict()
            failed_on = record.get("FailedOn", "?")
            details = str(record.get("Details", "?"))[:120]
            # `.show ingestion failures` reports FailureKind rather than a
            # Status column; keep Status as a fallback.
            status = record.get("FailureKind", record.get("Status", "?"))
            print(f"    [{status}] {failed_on}")
            print(f"      {details}")
            print()
        print("  Troubleshooting: Check RBAC — the ADX cluster managed identity needs")
        print("  'Storage Blob Data Reader' and 'Storage Blob Data Contributor' on the storage account.\n")
    else:
        print("  No ingestion failures in the last 10 minutes ✓\n")

# ── Step 2: Query the target table ───────────────────────────────────────────
# The 20 rows with the latest Timestamp, newest first.
VERIFY_QUERY = """\
FileTransferEvents
| order by Timestamp desc
| take 20
| project Filename, SourcePresent, TargetPresent,
          SourceLastModifiedUtc, TargetLastModifiedUtc,
          AgeMinutes, Status, Notes, Timestamp
"""

print(f"Querying {DATABASE}.FileTransferEvents...")
response = verify_client.execute(DATABASE, VERIFY_QUERY)

result_table = response.primary_results[0]
columns = [col.column_name for col in result_table.columns]
rows = list(result_table)

if rows:
    print(f"\n  Found {len(rows)} recent rows:\n")
    # Render a plain-text table, every cell right-aligned to 20 characters.
    header = " | ".join(col.rjust(20) for col in columns)
    print("  " + header)
    print("  " + "-" * len(header))
    for row in rows:
        cells = (str(row[col]).rjust(20) for col in columns)
        print("  " + " | ".join(cells))

    # Count the rows that came back with a null Timestamp.
    null_ts = len([r for r in rows if r["Timestamp"] is None])
    print()
    if null_ts:
        print(f"  ⚠ WARNING: {null_ts} row(s) have null Timestamp!")
    else:
        print("  All rows have non-null Timestamp ✓")

    # Tally rows per Status value, keyed by the value's string form.
    status_counts = {}
    for status_text in (str(r["Status"]) for r in rows):
        status_counts[status_text] = 1 + status_counts.get(status_text, 0)
    print(f"  Status distribution: {status_counts}")
else:
    print("\n  No rows found. If you just ingested, wait 1-3 minutes and re-run this cell.")

## 8. Ad-Hoc Queries

Run any KQL query against the database. Edit the query below and execute.

In [None]:
QUERY = """\
// Daily summary — SLA adherence computed at query time
materialized_view("DailySummary")
| extend SlaAdherencePct = round(100.0 * OkCount / TotalCount, 2),
         P95AgeMinutes   = percentile_tdigest(AgeDigest, 95)
| project Date, TotalCount, OkCount, MissingCount, DelayedCount,
          AvgAgeMinutes, P95AgeMinutes, SlaAdherencePct
| order by Date desc
| take 30
"""

adhoc_kcsb = build_kcsb(CLUSTER_URI)
adhoc_client = KustoClient(adhoc_kcsb)

result = adhoc_client.execute(DATABASE, QUERY)
primary_table = result.primary_results[0]
cols = [c.column_name for c in primary_table.columns]
rows = list(primary_table)

print(f"Returned {len(rows)} rows\n")

# Plain-text table: every cell right-aligned to 18 characters.
header = " | ".join(c.rjust(18) for c in cols)
print(header)
print("-" * len(header))
for row in rows:
    cells = [str(row[c]).rjust(18) for c in cols]
    print(" | ".join(cells))

## 9. Check Dead-Letter Table

Shows the most recent rows from the `FileTransferEvents_Errors` dead-letter table. Useful for debugging failed ingestions.

In [None]:
# The ten most recent dead-letter rows, newest first.
DL_QUERY = """\
FileTransferEvents_Errors
| order by FailedOn desc
| take 10
| project FailedOn, Error, RawData, OperationId
"""

dl_kcsb = build_kcsb(CLUSTER_URI)
dl_client = KustoClient(dl_kcsb)

dl_result = dl_client.execute(DATABASE, DL_QUERY)
dl_rows = list(dl_result.primary_results[0])

if dl_rows:
    print(f"Found {len(dl_rows)} error rows:\n")
    for row in dl_rows:
        # Truncate the long fields so each entry stays readable.
        error_head = row["Error"][:80]
        print(f"  {row['FailedOn']}  |  {error_head}")
        raw_head = str(row["RawData"])[:120]
        print(f"    RawData: {raw_head}")
        print()
else:
    print("No dead-letter rows ✓")