# Kronodroid Pipeline Runner (Notebook)

Notebook equivalent of `tools/scripts/run_kronodroid_pipeline.py`.

Data flow:
```
Kaggle → dlt → Parquet/Avro → MinIO → Spark + dbt → Iceberg (LakeFS) → Feast → (optional) LakeFS commit
```

## 1) Environment setup

- Loads `.env` from repo root (same behavior as the script).
- Ensures the repo is importable from `notebooks/`.
- Sets `LAKEFS_BRANCH` from the parameters cell.
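The notebook hands `.env` handling to `krono.load_env_file`, and its exact parsing rules are not shown here. Below is a minimal sketch of a `.env` loader for readers unfamiliar with the convention. It assumes simple `KEY=VALUE` lines, optionally prefixed with `export` and optionally quoted. The `parse_env_file` and `apply_env` helpers are hypothetical and may differ from the script, in particular on whether variables already set in the shell get overridden.

```python
import os
from pathlib import Path


def parse_env_file(path: Path) -> dict:
    """Parse simple KEY=VALUE lines (hypothetical helper); blanks and '#' comments are ignored."""
    values = {}
    if not path.exists():
        return values  # a missing .env is treated as "nothing to load"
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        key, _, value = line.partition("=")
        value = value.strip()
        # Strip one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        values[key.strip()] = value
    return values


def apply_env(values: dict, override: bool = False) -> None:
    """Export parsed values; by default, variables already set in the environment win."""
    for key, value in values.items():
        if override or key not in os.environ:
            os.environ[key] = value
```

With `override=False`, an exported shell variable such as `LAKEFS_REPOSITORY` takes precedence over the file. This design choice is common, but the script itself is not confirmed to follow it.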

In [None]:
import os
import sys
from pathlib import Path

# Ensure repo root is importable when running from the notebooks directory.
REPO_ROOT = Path.cwd().parent if Path.cwd().name == "notebooks" else Path.cwd()
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from tools.scripts import run_kronodroid_pipeline as krono

# Load env vars from `.env` (if present)
krono.load_env_file(REPO_ROOT / ".env")

print(f"Repo root: {REPO_ROOT}")
print(f"LAKEFS_REPOSITORY: {os.getenv('LAKEFS_REPOSITORY', 'kronodroid')}")

## 2) Run parameters

Mirror the CLI flags from `tools/scripts/run_kronodroid_pipeline.py`.

**Important Note on Spark Connectivity:**

When using `TRANSFORM_RUNNER = "spark-operator"`:
- The transformations run in the Kind cluster using Spark Operator (no local Spark needed)
- However, **Feast with Spark offline store still requires Spark connectivity** to register features and materialize
- Setting `SKIP_SPARK_CHECK = True` bypasses the connectivity check, but Feast may still fail if Spark is unavailable

**Options:**
1. **Use Spark Thrift Server** (recommended): Deploy it even when using spark-operator
   ```bash
   kubectl apply -k infra/k8s/kind/addons/spark-thrift/
   kubectl -n dfp wait --for=condition=ready pod -l app=spark-thrift-server --timeout=120s
   ```
2. **Skip Feast steps**: Set `SKIP_FEAST_APPLY = True` and `SKIP_MATERIALIZE = True`
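3. **Accept Feast failures**: Keep `SKIP_SPARK_CHECK = True` and handle Feast errors yourself (Step 3 raises `RuntimeError` if `feast apply` fails; Step 4 only prints a warning if materialization fails)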
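Before you pick an option, check whether anything is listening where Feast expects Spark. Below is a minimal sketch of a TCP reachability check. It assumes the Spark Thrift Server is reachable on `localhost:10000`, the usual Thrift/HiveServer2 default (for example through a port-forward). The host, the port, and the `spark_thrift_reachable` helper are assumptions and are not part of the script. A successful connection only shows that the port is open, not that Spark SQL queries will succeed.

```python
import socket


def spark_thrift_reachable(host: str = "localhost", port: int = 10000, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection tries every resolved address and raises OSError
        # (including timeouts) if none of them accepts the connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, if `spark_thrift_reachable()` returns `False`, option 2 (setting `SKIP_FEAST_APPLY` and `SKIP_MATERIALIZE`) avoids a Feast failure later in the run.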

In [None]:
# Core parameters
BRANCH = "main"
TRANSFORM_RUNNER = "spark-operator"  # "dbt" or "spark-operator"
DBT_TARGET = "dev"  # "dev" (embedded Spark) or "thrift" (Spark server)
K8S_NAMESPACE = "dfp"  # used when TRANSFORM_RUNNER == "spark-operator"
SPARK_IMAGE = "apache/spark:3.5.7-python3"  # used when TRANSFORM_RUNNER == "spark-operator"
SPARK_TIMEOUT_SECONDS = 60 * 30  # used when TRANSFORM_RUNNER == "spark-operator"
FILE_FORMAT = "parquet"  # "parquet" or "avro" (dlt loader uses parquet for avro requests)

# Step toggles
SKIP_INGESTION = False
SKIP_DBT = False
SKIP_FEAST_APPLY = False
SKIP_MATERIALIZE = False
SKIP_COMMIT = False

# Spark connectivity check: skipped by default with spark-operator, because the transformations
# need no local Spark. Feast still tries to connect to Spark, so if it fails, start the
# Spark Thrift Server or skip the Feast steps.
SKIP_SPARK_CHECK = TRANSFORM_RUNNER == "spark-operator"

# Materialize-only mode (equivalent to `--materialize-only`)
MATERIALIZE_ONLY = False
MATERIALIZE_DAYS = 30

os.environ["LAKEFS_BRANCH"] = BRANCH

print(
    {
        "BRANCH": BRANCH,
        "TRANSFORM_RUNNER": TRANSFORM_RUNNER,
        "DBT_TARGET": DBT_TARGET,
        "FILE_FORMAT": FILE_FORMAT,
        "SKIP_SPARK_CHECK": SKIP_SPARK_CHECK,
        "MATERIALIZE_ONLY": MATERIALIZE_ONLY,
        "MATERIALIZE_DAYS": MATERIALIZE_DAYS,
    }
)
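A typo in one of these string parameters only shows up when the relevant step runs. A small pre-flight check catches such mistakes earlier. `validate_params` is a hypothetical helper. Its allowed values are copied from the comments in the cell above, and the script may validate these parameters differently or not at all.

```python
# Allowed values, copied from the comments in the parameters cell.
ALLOWED_VALUES = {
    "TRANSFORM_RUNNER": {"dbt", "spark-operator"},
    "DBT_TARGET": {"dev", "thrift"},
    "FILE_FORMAT": {"parquet", "avro"},
}


def validate_params(params: dict) -> list:
    """Return a list of problems; an empty list means the parameters look consistent."""
    problems = []
    for name, allowed in ALLOWED_VALUES.items():
        value = params.get(name)
        if value not in allowed:
            problems.append(f"{name}={value!r} is not one of {sorted(allowed)}")
    for name in ("MATERIALIZE_DAYS", "SPARK_TIMEOUT_SECONDS"):
        value = params.get(name)
        if not isinstance(value, int) or value <= 0:
            problems.append(f"{name} must be a positive integer, got {value!r}")
    return problems
```

Inside the notebook, `validate_params(globals())` checks the live parameter values.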

## 3) Optional: quick dependency check

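The pipeline calls the `dbt` and `feast` CLIs. This cell also checks for `kubectl`, which the Spark Operator workflow needs. A `None` value means the tool is not on `PATH`.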

In [None]:
import shutil

print({"dbt": shutil.which("dbt"), "feast": shutil.which("feast"), "kubectl": shutil.which("kubectl")})
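The tools you need depend on the configuration. Below is a sketch of a per-configuration check. It assumes `dbt` is needed only by the `dbt` runner, `kubectl` only by the `spark-operator` runner, and `feast` unless both Feast steps are skipped. This mapping is inferred from the parameters cell rather than from the script. Both helper names are hypothetical.

```python
import shutil


def required_tools(transform_runner: str, skip_dbt: bool, skip_feast: bool) -> list:
    """List the CLIs this configuration is assumed to need, in pipeline order."""
    needed = []
    if transform_runner == "dbt" and not skip_dbt:
        needed.append("dbt")
    if transform_runner == "spark-operator":
        needed.append("kubectl")  # also used by the status and cleanup cells
    if not skip_feast:
        needed.append("feast")
    return needed


def missing_tools(needed: list, which=shutil.which) -> list:
    """Return the tools from `needed` that `which` cannot find on PATH."""
    return [tool for tool in needed if which(tool) is None]
```

In the notebook: `missing_tools(required_tools(TRANSFORM_RUNNER, SKIP_DBT, SKIP_FEAST_APPLY and SKIP_MATERIALIZE))`.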

## 4) Spark Operator Status Check (when using spark-operator)

These cells use the same monitoring functions as `spark_operator_test.ipynb`.

In [None]:
# Check Spark Operator deployment (only relevant when using spark-operator)
if TRANSFORM_RUNNER == "spark-operator":
    print("Checking Spark Operator deployment...\n")
    
    deployment = krono.kubectl_get_json("deployment", "spark-operator", K8S_NAMESPACE)
    
    if deployment:
        containers = deployment.get("spec", {}).get("template", {}).get("spec", {}).get("containers", [])
        if containers:
            image = containers[0].get("image", "unknown")
            krono.print_status("success", f"Spark Operator image: {image}")
        
        status = deployment.get("status", {})
        ready = status.get("readyReplicas", 0)
        desired = status.get("replicas", 0)
        
        if ready >= 1:
            krono.print_status("success", f"Replicas: {ready}/{desired} ready")
        else:
            krono.print_status("warning", f"Replicas: {ready}/{desired} ready")
    else:
        krono.print_status("error", "Spark Operator not found!")
        print("\n  To install: kubectl apply -k infra/k8s/kind/addons/spark-operator/")
        print("  Or: task spark-operator:up")
else:
    print("Using dbt runner - Spark Operator check skipped")

In [None]:
# List existing SparkApplications
if TRANSFORM_RUNNER == "spark-operator":
    apps = krono.list_spark_applications(K8S_NAMESPACE)
    
    if apps:
        print(f"Existing SparkApplications in namespace '{K8S_NAMESPACE}':\n")
        for app in apps:
            state_icon = {
                'COMPLETED': '[OK]',
                'RUNNING': '[RUN]',
                'SUBMITTED': '[SUB]',
                'FAILED': '[FAIL]',
                'SUBMISSION_FAILED': '[FAIL]'
            }.get(app['state'], '[?]')
            print(f"  {state_icon:7s} {app['name']:50s} {app['created']}")
    else:
        print("No SparkApplications found")
else:
    print("Using dbt runner - skipped")
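Step 2b needs the name of the application to monitor. One way to pick it from this listing is to take the most recently created application. This minimal sketch makes three assumptions. First, each entry returned by `krono.list_spark_applications` has the `name` and `created` keys that the cell above uses. Second, `created` is a Kubernetes-style ISO-8601 UTC timestamp. Third, the `kronodroid-transform` prefix, taken from the example name in Step 2b, matches your runs, which is not guaranteed. `newest_app` is hypothetical.

```python
from typing import Optional


def newest_app(apps: list, prefix: str = "") -> Optional[dict]:
    """Return the most recently created app whose name starts with `prefix`, or None."""
    candidates = [a for a in apps if a.get("name", "").startswith(prefix) and a.get("created")]
    # Timestamps like "2025-01-12T12:34:56Z" share one format, so string order is time order.
    return max(candidates, key=lambda a: a["created"], default=None)
```

To fill in Step 2b: `APP_NAME_TO_MONITOR = (newest_app(apps, "kronodroid-transform") or {}).get("name")`.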

## 5) Run the pipeline step-by-step

Either run the cells below one at a time, or run the full-pipeline cell in section 6 to mirror the script's behavior. Do not do both in the same session, or the pipeline runs twice.

### Step 1: dlt ingestion (Kaggle → MinIO)

In [None]:
if not MATERIALIZE_ONLY and not SKIP_INGESTION:
    ok = krono.run_dlt_ingestion(file_format=FILE_FORMAT)
    if not ok:
        raise RuntimeError("dlt ingestion failed")
else:
    print("Skipped")

### Step 2: transformations (MinIO → Iceberg on LakeFS)

In [None]:
# Runs the transformations. With spark-operator, note the SparkApplication name printed in
# this cell's output and copy it into APP_NAME_TO_MONITOR (Step 2b) to monitor it.

if not MATERIALIZE_ONLY and not SKIP_DBT:
    if TRANSFORM_RUNNER == "dbt":
        ok = krono.run_dbt_spark_transformations(target=DBT_TARGET)
    else:
        ok = krono.run_kubeflow_spark_operator_transformations(
            branch=BRANCH,
            namespace=K8S_NAMESPACE,
            spark_image=SPARK_IMAGE,
            timeout_seconds=SPARK_TIMEOUT_SECONDS,
        )
    if not ok:
        raise RuntimeError("transformations failed")
else:
    print("Skipped")
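With `TRANSFORM_RUNNER = "spark-operator"`, the transformations are submitted as a `SparkApplication` custom resource. This notebook does not show the manifest the script generates. The sketch below only illustrates the general shape of a `sparkoperator.k8s.io/v1beta2` resource built from the parameters cell. Several details are assumptions: the application file path, the `spark` service account, the resource sizes, and passing the branch as a `LAKEFS_BRANCH` environment variable.

```python
from datetime import datetime, timezone


def build_spark_application(
    namespace: str,
    spark_image: str,
    main_application_file: str,
    branch: str,
    spark_version: str = "3.5.7",
    name_prefix: str = "kronodroid-transform",
) -> dict:
    """Return a minimal SparkApplication manifest as a plain dict (hypothetical layout)."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    # Assumption: the job reads the target LakeFS branch from the environment.
    branch_env = [{"name": "LAKEFS_BRANCH", "value": branch}]
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": f"{name_prefix}-{stamp}", "namespace": namespace},
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "image": spark_image,
            "imagePullPolicy": "IfNotPresent",  # reuse an image pre-loaded with `kind load`
            "mainApplicationFile": main_application_file,
            "sparkVersion": spark_version,
            "restartPolicy": {"type": "Never"},
            "driver": {
                "cores": 1,
                "memory": "1g",
                "serviceAccount": "spark",  # assumption: an SA allowed to create executor pods
                "env": branch_env,
            },
            "executor": {"instances": 1, "cores": 1, "memory": "1g", "env": branch_env},
        },
    }
```

`kubectl` accepts JSON manifests, so you could submit such a dict by piping `json.dumps(manifest)` to `kubectl apply -f -`.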

### Step 2b: Monitor SparkApplication (interactive)

Use these cells to monitor a running SparkApplication or debug failures.

In [None]:
# Check status of a specific SparkApplication
# Replace with actual app name from Step 2 output (e.g., "kronodroid-transform-20250112-123456")
APP_NAME_TO_MONITOR = None  # Set to app name if you want to monitor

if APP_NAME_TO_MONITOR and TRANSFORM_RUNNER == "spark-operator":
    status = krono.get_spark_application_status(APP_NAME_TO_MONITOR, K8S_NAMESPACE)
    
    if status['found']:
        state = status['state']
        state_icons = {
            'COMPLETED': 'success',
            'RUNNING': 'info',
            'SUBMITTED': 'pending',
            'FAILED': 'error',
            'SUBMISSION_FAILED': 'error',
        }
        icon = state_icons.get(state, 'info')
        krono.print_status(icon, f"State: {state}")
        
        if status['spark_application_id']:
            print(f"    Spark Application ID: {status['spark_application_id']}")
        
        if status['driver_info']:
            driver = status['driver_info']
            print(f"    Driver Pod: {driver.get('podName', 'N/A')}")
            if driver.get('webUIAddress'):
                print(f"    Spark UI: {driver.get('webUIAddress')}")
        
        if status['error_message']:
            print(f"    Error: {status['error_message']}")
    else:
        krono.print_status('error', f"SparkApplication '{APP_NAME_TO_MONITOR}' not found")
else:
    print("Set APP_NAME_TO_MONITOR to monitor a specific SparkApplication")

In [None]:
# Get driver logs for a SparkApplication
if APP_NAME_TO_MONITOR and TRANSFORM_RUNNER == "spark-operator":
    print(f"Driver logs for {APP_NAME_TO_MONITOR} (last 100 lines):\n")
    print("=" * 80)
    logs = krono.get_driver_logs(APP_NAME_TO_MONITOR, K8S_NAMESPACE, tail=100)
    if logs:
        print(logs)
    else:
        print("No logs available (driver pod may not exist yet or has been cleaned up)")
    print("=" * 80)
else:
    print("Set APP_NAME_TO_MONITOR to view driver logs")

In [None]:
# Live monitor a SparkApplication until completion
# This will block until the app completes, fails, or times out

LIVE_MONITOR_APP = None  # Set to app name to monitor live
MONITOR_TIMEOUT = 300  # seconds

if LIVE_MONITOR_APP and TRANSFORM_RUNNER == "spark-operator":
    print(f"Monitoring {LIVE_MONITOR_APP} (timeout: {MONITOR_TIMEOUT}s)...\n")
    result = krono.monitor_spark_application_status(
        LIVE_MONITOR_APP,
        namespace=K8S_NAMESPACE,
        timeout_seconds=MONITOR_TIMEOUT,
        poll_interval=5,
        verbose=True
    )
    print(f"\nResult: {result}")
else:
    print("Set LIVE_MONITOR_APP to monitor a SparkApplication live")
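At its core, a live monitor is a poll-until-terminal loop. Below is a minimal sketch of such a loop. It assumes the terminal states listed in the cleanup cells (`COMPLETED`, `FAILED`, `SUBMISSION_FAILED`). `wait_for_terminal_state` is hypothetical, and `monitor_spark_application_status` is not necessarily implemented this way. The clock and sleep functions are injectable, so the loop can be tested without waiting.

```python
import time
from typing import Callable

TERMINAL_STATES = {"COMPLETED", "FAILED", "SUBMISSION_FAILED"}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    timeout_seconds: float = 300,
    poll_interval: float = 5,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until it returns a terminal state; return "TIMEOUT" after the deadline."""
    deadline = clock() + timeout_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            return "TIMEOUT"
        sleep(poll_interval)
```

For example: `wait_for_terminal_state(lambda: krono.get_spark_application_status(LIVE_MONITOR_APP, K8S_NAMESPACE)["state"])`.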

### Step 3: Feast apply (register feature definitions)

In [None]:
if not MATERIALIZE_ONLY and not SKIP_FEAST_APPLY:
    check_spark = not SKIP_SPARK_CHECK
    ok = krono.run_feast_apply(check_spark=check_spark)
    if not ok:
        raise RuntimeError("feast apply failed")
else:
    print("Skipped")

### Step 4: Feast materialize (offline → online store)

In [None]:
if not SKIP_MATERIALIZE:
    check_spark = not SKIP_SPARK_CHECK
    ok = krono.run_feast_materialize(days_back=MATERIALIZE_DAYS, check_spark=check_spark)
    if not ok:
        print("WARNING: feature materialization failed")
else:
    print("Skipped")
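`MATERIALIZE_DAYS` controls how far back materialization reaches. The `feast materialize` CLI takes explicit start and end timestamps. Below is a minimal sketch that turns a days-back value into such a command. It assumes the window is `[now - days_back, now]` in UTC. `run_feast_materialize` may build its command differently, and `materialize_command` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def materialize_command(days_back: int, now: Optional[datetime] = None) -> List[str]:
    """Build a `feast materialize START END` command covering the last `days_back` days."""
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(days=days_back)
    fmt = "%Y-%m-%dT%H:%M:%S"  # whole seconds, UTC, written without an offset suffix
    return ["feast", "materialize", start.strftime(fmt), end.strftime(fmt)]
```

Run the command from the Feast feature repository (for example with `subprocess.run(cmd, check=True, cwd=...)`). This notebook does not show the repository path.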

### Step 5: Commit to LakeFS (Iceberg tables → commit)

Uses `engines.spark_engine.dfp_spark.iceberg_catalog.commit_iceberg_changes`.

In [None]:
from datetime import datetime

if not MATERIALIZE_ONLY and not SKIP_COMMIT:
    krono.commit_to_lakefs(
        branch=BRANCH,
        message=f"Notebook run: Iceberg tables updated {datetime.now().isoformat()}",
    )
else:
    print("Skipped")
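`krono.commit_to_lakefs` wraps the Iceberg-aware helper named above. To inspect or reproduce a commit by hand, lakeFS also exposes a REST endpoint, `POST /api/v1/repositories/{repository}/branches/{branch}/commits`. Below is a minimal sketch that prepares this request with the standard library. It assumes basic authentication with a lakeFS access key pair. The endpoint URL and credentials are placeholders. This approach also bypasses any Iceberg-specific handling that `commit_iceberg_changes` performs.

```python
import base64
import json
import urllib.parse
import urllib.request
from typing import Optional


def build_lakefs_commit_request(
    endpoint: str,
    repository: str,
    branch: str,
    message: str,
    access_key: str,
    secret_key: str,
    metadata: Optional[dict] = None,
) -> urllib.request.Request:
    """Prepare, but do not send, a POST to the lakeFS commit endpoint."""
    # Branch names may contain '/', so encode each path segment fully.
    repo = urllib.parse.quote(repository, safe="")
    ref = urllib.parse.quote(branch, safe="")
    url = f"{endpoint.rstrip('/')}/api/v1/repositories/{repo}/branches/{ref}/commits"
    # lakeFS commit metadata is a string-to-string map.
    body = json.dumps({"message": message, "metadata": metadata or {}}).encode("utf-8")
    token = base64.b64encode(f"{access_key}:{secret_key}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )
```

Passing the request to `urllib.request.urlopen` would send it.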

## 6) Run full pipeline (script-like)

Convenience cell mirroring the script's `main()` flow.

In [None]:
from datetime import datetime  # needed for the commit message even if Step 5 was not run

success = True
check_spark = not SKIP_SPARK_CHECK

if MATERIALIZE_ONLY:
    success = krono.run_feast_materialize(days_back=MATERIALIZE_DAYS, check_spark=check_spark)
else:
    if not SKIP_INGESTION:
        success = krono.run_dlt_ingestion(file_format=FILE_FORMAT)
        if not success:
            raise RuntimeError("Pipeline failed at dlt ingestion")

    if not SKIP_DBT:
        if TRANSFORM_RUNNER == "dbt":
            success = krono.run_dbt_spark_transformations(target=DBT_TARGET)
        else:
            success = krono.run_kubeflow_spark_operator_transformations(
                branch=BRANCH,
                namespace=K8S_NAMESPACE,
                spark_image=SPARK_IMAGE,
                timeout_seconds=SPARK_TIMEOUT_SECONDS,
            )
        if not success:
            raise RuntimeError("Pipeline failed at transformations")

    if not SKIP_FEAST_APPLY:
        success = krono.run_feast_apply(check_spark=check_spark)
        if not success:
            raise RuntimeError("Pipeline failed at feast apply")

    if not SKIP_MATERIALIZE:
        ok = krono.run_feast_materialize(days_back=MATERIALIZE_DAYS, check_spark=check_spark)
        if not ok:
            print("WARNING: feature materialization failed")

    if not SKIP_COMMIT:
        krono.commit_to_lakefs(
            branch=BRANCH,
            message=f"Notebook run: Iceberg tables updated {datetime.now().isoformat()}",
        )

print({"success": bool(success)})
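The skip toggles interact with `MATERIALIZE_ONLY`, so it helps to preview which steps the cell above will run. The hypothetical `planned_steps` helper below mirrors that cell's control flow. In materialize-only mode it runs only materialization and ignores every `SKIP_*` toggle, including `SKIP_MATERIALIZE`. Otherwise, each step runs unless its toggle is set.

```python
def planned_steps(
    materialize_only: bool,
    skip_ingestion: bool,
    skip_dbt: bool,
    skip_feast_apply: bool,
    skip_materialize: bool,
    skip_commit: bool,
) -> list:
    """Return, in order, the steps the full-pipeline cell would run."""
    if materialize_only:
        # The full-pipeline cell ignores every SKIP_* toggle in this mode.
        return ["materialize"]
    toggles = [
        ("ingestion", skip_ingestion),
        ("transformations", skip_dbt),
        ("feast apply", skip_feast_apply),
        ("materialize", skip_materialize),
        ("lakefs commit", skip_commit),
    ]
    return [step for step, skipped in toggles if not skipped]
```

In the notebook: `planned_steps(MATERIALIZE_ONLY, SKIP_INGESTION, SKIP_DBT, SKIP_FEAST_APPLY, SKIP_MATERIALIZE, SKIP_COMMIT)`.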

## 7) Cleanup SparkApplications

Use these cells to clean up completed or failed SparkApplications.

In [None]:
# List all SparkApplications with their states
if TRANSFORM_RUNNER == "spark-operator":
    apps = krono.list_spark_applications(K8S_NAMESPACE)
    
    terminal_states = {'COMPLETED', 'FAILED', 'SUBMISSION_FAILED'}
    to_cleanup = [app for app in apps if app['state'] in terminal_states]
    
    if to_cleanup:
        print(f"SparkApplications available for cleanup in '{K8S_NAMESPACE}':\n")
        for app in to_cleanup:
            print(f"  - {app['name']} ({app['state']})")
        print(f"\nTotal: {len(to_cleanup)} application(s)")
    else:
        print("No completed/failed SparkApplications to clean up")
else:
    print("Using dbt runner - skipped")

In [None]:
# Delete a specific SparkApplication
APP_TO_DELETE = None  # Set to app name to delete

if APP_TO_DELETE and TRANSFORM_RUNNER == "spark-operator":
    print(f"Deleting SparkApplication: {APP_TO_DELETE}")
    if krono.delete_spark_application(APP_TO_DELETE, K8S_NAMESPACE):
        krono.print_status('success', f"Deleted {APP_TO_DELETE}")
    else:
        krono.print_status('error', f"Failed to delete {APP_TO_DELETE}")
else:
    print("Set APP_TO_DELETE to delete a SparkApplication")

In [None]:
# Delete ALL completed/failed SparkApplications (use with caution!)
CLEANUP_ALL = False  # Set to True to delete all completed/failed apps

if CLEANUP_ALL and TRANSFORM_RUNNER == "spark-operator":
    apps = krono.list_spark_applications(K8S_NAMESPACE)
    terminal_states = {'COMPLETED', 'FAILED', 'SUBMISSION_FAILED'}
    to_cleanup = [app for app in apps if app['state'] in terminal_states]
    
    if to_cleanup:
        print(f"Deleting {len(to_cleanup)} SparkApplication(s)...\n")
        for app in to_cleanup:
            if krono.delete_spark_application(app['name'], K8S_NAMESPACE):
                krono.print_status('success', f"Deleted {app['name']}")
            else:
                krono.print_status('error', f"Failed to delete {app['name']}")
    else:
        print("No applications to clean up")
else:
    print("Set CLEANUP_ALL = True to delete all completed/failed SparkApplications")
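`CLEANUP_ALL` removes every finished application, including recent failures you may still want to debug. A more conservative variant deletes only terminal applications older than a cutoff. This sketch assumes `created` is an ISO-8601 UTC timestamp ending in `Z`. `select_for_cleanup` and its 24-hour default are hypothetical. It only selects names, so you can pass the result to `krono.delete_spark_application` one by one after reviewing it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

TERMINAL_STATES = {"COMPLETED", "FAILED", "SUBMISSION_FAILED"}


def select_for_cleanup(
    apps: list,
    older_than: timedelta = timedelta(hours=24),
    now: Optional[datetime] = None,
) -> list:
    """Return names of terminal apps created at least `older_than` before `now`."""
    now = now or datetime.now(timezone.utc)
    selected = []
    for app in apps:
        if app.get("state") not in TERMINAL_STATES:
            continue  # never touch running or submitted apps
        # fromisoformat() does not accept a trailing "Z" before Python 3.11.
        created = datetime.fromisoformat(app["created"].replace("Z", "+00:00"))
        if now - created >= older_than:
            selected.append(app["name"])
    return selected
```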

## Quick Reference Commands

Useful kubectl commands for debugging:

```bash
# List all SparkApplications
kubectl -n dfp get sparkapplication

# Describe a SparkApplication
kubectl -n dfp describe sparkapplication <app-name>

# View driver logs
kubectl -n dfp logs <app-name>-driver

# Follow driver logs
kubectl -n dfp logs -f <app-name>-driver

# View Spark Operator logs
kubectl -n dfp logs deployment/spark-operator

# Delete a SparkApplication
kubectl -n dfp delete sparkapplication <app-name>

# Pre-load Spark image into kind
docker pull apache/spark:3.5.7-python3
kind load docker-image apache/spark:3.5.7-python3 --name dfp-kind
```