# dbt + Spark on Dataproc → BigQuery Demo

This notebook demonstrates an end-to-end workflow:
1. Create a Dataproc cluster with Spark Thrift server
2. Use dbt-spark to run transformations on the cluster
3. Write the final results to BigQuery using the Spark BigQuery connector

## Architecture
```
Local dbt → Dataproc Spark (Thrift) → Hive Tables (GCS) → BigQuery Tables
```

## Prerequisites
- GCP project with billing enabled
- Google Cloud SDK installed and authenticated (the notebook calls the `gcloud`, `gsutil`, and `bq` command-line tools)
- Python 3.8+
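- A local clone of the jaffle-shop-classic repository, e.g. at `/Users/[username]/Developer/git/jaffle-shop-classic` (set `JAFFLE_SHOP_PATH` in the Configuration cell to its actual location)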

## 1. Configuration

In [None]:
import os
import subprocess
import time
import json
from pathlib import Path

# GCP Configuration
PROJECT_ID = "johanesa-playground-326616"
REGION = "us-central1"
ZONE = f"{REGION}-a"
BQ_LOCATION = "US"  # passed as --location when the BigQuery dataset is created

# Resource Names
CLUSTER_NAME = "dbt-spark-demo-cluster"
GCS_BUCKET = "johanesa-dbt-spark-demo"
BQ_DATASET = "jaffle_shop_demo"
HIVE_WAREHOUSE = f"gs://{GCS_BUCKET}/hive-warehouse"

# Paths
# Resolve the jaffle-shop-classic checkout from the JAFFLE_SHOP_PATH environment
# variable when it is set, otherwise from ~/Developer/git/jaffle-shop-classic,
# so that no user-specific absolute path (or unfilled placeholder) is baked in.
JAFFLE_SHOP_PATH = Path(
    os.environ.get("JAFFLE_SHOP_PATH")
    or Path.home() / "Developer" / "git" / "jaffle-shop-classic"
).expanduser()
DBT_PROFILES_DIR = Path.home() / ".dbt"

print(f"✓ Project ID: {PROJECT_ID}")
print(f"✓ Region: {REGION}")
print(f"✓ Cluster: {CLUSTER_NAME}")
print(f"✓ GCS Bucket: {GCS_BUCKET}")
print(f"✓ BQ Dataset: {BQ_DATASET} (location: {BQ_LOCATION})")

## 2. Helper Functions

In [None]:
def run_command(cmd, check=True, capture_output=True):
    """Execute ``cmd`` through the shell, echo its output, and return the result.

    Args:
        cmd: Command line to run (interpreted by the shell).
        check: When true, a non-zero exit status raises ``CalledProcessError``
            after the output has been echoed.
        capture_output: Forwarded to ``subprocess.run``; when false the
            streams are not captured, so nothing is echoed from here.

    Returns:
        The ``subprocess.CompletedProcess`` for the command.

    Raises:
        subprocess.CalledProcessError: If ``check`` is true and the command
            exits with a non-zero status.
    """
    print(f"Running: {cmd}")

    # check=False here on purpose: the output is printed before any exception.
    completed = subprocess.run(
        cmd,
        shell=True,
        check=False,
        capture_output=capture_output,
        text=True,
    )
    failed = completed.returncode != 0

    if completed.stdout:
        print(completed.stdout)

    # stderr is labelled by outcome: an error on failure, a warning otherwise.
    if completed.stderr:
        label = "ERROR" if failed else "Warning"
        print(f"{label}: {completed.stderr}")

    if failed and check:
        raise subprocess.CalledProcessError(
            completed.returncode,
            cmd,
            output=completed.stdout,
            stderr=completed.stderr,
        )

    return completed


def wait_for_cluster(cluster_name, project_id, region, timeout=600):
    """Poll ``gcloud dataproc clusters describe`` until the cluster is RUNNING.

    Args:
        cluster_name: Name of the Dataproc cluster.
        project_id: GCP project that owns the cluster.
        region: Dataproc region of the cluster.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The cluster description parsed from gcloud's JSON output (a dict).

    Raises:
        RuntimeError: If the cluster reports the ERROR state.
        TimeoutError: If the cluster is not RUNNING within ``timeout`` seconds.
    """
    print(f"Waiting for cluster {cluster_name} to be ready...")
    # A monotonic clock keeps the timeout correct if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = run_command(
            f"gcloud dataproc clusters describe {cluster_name} --project={project_id} --region={region} --format=json", check=False)
        # A failed describe (e.g. cluster not visible yet) is retried after the sleep.
        if result.returncode == 0:
            cluster_info = json.loads(result.stdout)
            status_info = cluster_info.get('status', {})
            status = status_info.get('state')
            print(f"Cluster status: {status}")

            if status == 'RUNNING':
                print("✓ Cluster is ready!")
                return cluster_info
            elif status == 'ERROR':
                # The Dataproc API exposes the failure reason as `detail`;
                # `details` is still consulted as a fallback.
                error_details = (
                    status_info.get('detail')
                    or status_info.get('details')
                    or 'Unknown error'
                )
                raise RuntimeError(
                    f"Cluster creation failed! Status: ERROR. Details: {error_details}")

        time.sleep(10)

    raise TimeoutError(
        f"Cluster did not become ready within {timeout} seconds")


def get_cluster_master_ip(cluster_name, project_id, region, zone=None):
    """Return the ``networkIP`` of the cluster's first master instance.

    The master instance name is read from the Dataproc cluster description,
    then the address of that VM's first network interface is looked up via
    ``gcloud compute instances describe``.

    Args:
        cluster_name: Name of the Dataproc cluster.
        project_id: GCP project that owns the cluster.
        region: Dataproc region of the cluster.
        zone: Compute Engine zone of the master VM. Defaults to the
            notebook-level ``ZONE`` constant when omitted.

    Returns:
        The IP address as a string.

    Raises:
        subprocess.CalledProcessError: If either gcloud call fails.
        RuntimeError: If gcloud succeeds but returns an empty value.
    """
    if zone is None:
        zone = ZONE

    result = run_command(
        f"gcloud dataproc clusters describe {cluster_name} --project={project_id} --region={region} --format='value(config.masterConfig.instanceNames[0])'")
    master_instance = result.stdout.strip()
    if not master_instance:
        # Without this guard the next gcloud call would run with an empty instance name.
        raise RuntimeError(
            f"Could not determine the master instance of cluster {cluster_name}")

    result = run_command(
        f"gcloud compute instances describe {master_instance} --project={project_id} --zone={zone} --format='value(networkInterfaces[0].networkIP)'")
    master_ip = result.stdout.strip()
    if not master_ip:
        raise RuntimeError(
            f"Could not determine the IP address of instance {master_instance}")
    return master_ip


# Confirmation that this cell ran, i.e. the helper functions above now exist in the kernel.
print("✓ Helper functions defined")

## 3. Verify Prerequisites

In [None]:
import sys

# run_command raises CalledProcessError if gcloud is missing or exits non-zero,
# so reaching the next line means the CLI is usable.
result = run_command("gcloud --version")
print("✓ gcloud CLI is installed\n")

# Compare gcloud's active project with the one this notebook targets.
result = run_command("gcloud config get-value project")
current_project = result.stdout.strip()
print(f"Current project: {current_project}")

project_matches = current_project == PROJECT_ID
if not project_matches:
    print(f"Setting project to {PROJECT_ID}...")
    run_command(f"gcloud config set project {PROJECT_ID}")

# The dbt project must be available locally before any dbt command is run.
if JAFFLE_SHOP_PATH.exists():
    print(f"✓ jaffle-shop-classic found at {JAFFLE_SHOP_PATH}")
else:
    raise FileNotFoundError(
        f"jaffle-shop-classic not found at {JAFFLE_SHOP_PATH}")

print(f"✓ Python version: {sys.version}")

## 4. Create GCS Bucket

In [None]:
# Create the bucket only when `gsutil ls -b` cannot find it.
bucket_uri = f"gs://{GCS_BUCKET}"
result = run_command(f"gsutil ls -b {bucket_uri}", check=False)
bucket_exists = result.returncode == 0

if bucket_exists:
    print(f"✓ Bucket already exists: {bucket_uri}")
else:
    print(f"Creating GCS bucket: {GCS_BUCKET}")
    run_command(f"gsutil mb -p {PROJECT_ID} -l {REGION} {bucket_uri}")
    print(f"✓ Bucket created: {bucket_uri}")

## 5. Create Dataproc Cluster

In [None]:
# Probe for an existing cluster first so the cell can be re-run safely.
describe_cmd = f"gcloud dataproc clusters describe {CLUSTER_NAME} --project={PROJECT_ID} --region={REGION}"
result = run_command(describe_cmd, check=False)
cluster_exists = result.returncode == 0

if not cluster_exists:
    print(f"Creating Dataproc cluster: {CLUSTER_NAME}")
    print("This may take 3-5 minutes...\n")
    # One flag per entry for readability; joined into a single command line.
    create_cmd = " ".join([
        f"gcloud dataproc clusters create {CLUSTER_NAME}",
        f"--project={PROJECT_ID}",
        f"--region={REGION}",
        f"--zone={ZONE}",
        "--single-node",
        "--master-machine-type=n1-standard-4",
        "--master-boot-disk-size=100GB",
        "--image-version=2.1-debian11",
        f"--properties=hive:hive.metastore.warehouse.dir={HIVE_WAREHOUSE}",
        "--enable-component-gateway",
        "--optional-components=JUPYTER",
    ])
    run_command(create_cmd)
    print("\n✓ Cluster creation initiated")
else:
    print(f"⚠ Cluster {CLUSTER_NAME} already exists")

# Block until the cluster reports RUNNING; raises on ERROR or timeout.
cluster_info = wait_for_cluster(CLUSTER_NAME, PROJECT_ID, REGION)
print("\n✓ Cluster is ready!")

## 6. Start Spark Thrift Server

In [None]:
# Port the Spark Thrift server must listen on; the dbt profile connects to 10001.
THRIFT_PORT = 10001
THRIFT_STARTUP_TIMEOUT = 120  # seconds to wait for the port to open
THRIFT_POLL_INTERVAL = 10     # seconds between checks

print("Starting Spark Thrift Server...")
# The listening port is set explicitly via --hiveconf so that it matches the
# port that is verified below and used by dbt, rather than relying on defaults.
start_cmd = (
    f"gcloud compute ssh {CLUSTER_NAME}-m --project={PROJECT_ID} --zone={ZONE} "
    f"--command='sudo /usr/lib/spark/sbin/start-thriftserver.sh "
    f"--hiveconf hive.server2.thrift.port={THRIFT_PORT} "
    f"--master yarn --deploy-mode client "
    f"--conf spark.sql.warehouse.dir={HIVE_WAREHOUSE} "
    f"--conf spark.hadoop.hive.metastore.warehouse.dir={HIVE_WAREHOUSE}'"
)
# NOTE(review): check=False presumably tolerates re-running the cell while the
# server is already up; the port check below is the actual success criterion.
run_command(start_cmd, check=False)

print("Waiting for Thrift server to start...")
verify_cmd = f"gcloud compute ssh {CLUSTER_NAME}-m --project={PROJECT_ID} --zone={ZONE} --command='netstat -tuln | grep {THRIFT_PORT}'"

# Poll instead of relying on one fixed sleep: startup time varies.
thrift_ready = False
deadline = time.monotonic() + THRIFT_STARTUP_TIMEOUT
while True:
    time.sleep(THRIFT_POLL_INTERVAL)
    result = run_command(verify_cmd, check=False)
    if result.returncode == 0:
        thrift_ready = True
        break
    if time.monotonic() >= deadline:
        break

if thrift_ready:
    print(f"✓ Thrift server is running on port {THRIFT_PORT}")
else:
    # Reported as a warning rather than an exception: the check itself can fail
    # (e.g. netstat unavailable on the image) while the server is running.
    print(
        f"⚠ Could not confirm that the Thrift server is listening on port {THRIFT_PORT} "
        f"after {THRIFT_STARTUP_TIMEOUT} seconds; dbt may fail to connect. "
        "Check the Thrift server logs on the cluster master.")

master_ip = get_cluster_master_ip(CLUSTER_NAME, PROJECT_ID, REGION)
print(f"\n✓ Cluster master IP: {master_ip}")

## 7. Install dbt-spark

In [None]:
import sys

print("Installing dbt-spark...\n")
# Invoke pip through the kernel's own interpreter so the packages land in the
# environment this notebook runs in, and quote the extras spec so the shell
# cannot treat the square brackets as a glob pattern.
run_command(f'"{sys.executable}" -m pip install -q dbt-core "dbt-spark[PyHive]"')
result = run_command("dbt --version")
print("\n✓ dbt-spark installed")

## 8. Configure dbt Profile

In [None]:
import getpass
import shutil

import yaml

# parents=True so a profiles directory with missing ancestors is still created.
DBT_PROFILES_DIR.mkdir(parents=True, exist_ok=True)
username = getpass.getuser()

# Thrift connection settings for the `jaffle_shop` profile, pointing dbt at the
# Spark Thrift server on the cluster master.
profiles_config = {
    'jaffle_shop': {
        'target': 'dev',
        'outputs': {
            'dev': {
                'type': 'spark',
                'method': 'thrift',
                'schema': 'default',
                'host': master_ip,
                'port': 10001,
                'user': username,
                'connect_timeout': 60,
                'connect_retries': 3,
            }
        },
    }
}

profiles_path = DBT_PROFILES_DIR / 'profiles.yml'
existing_profiles = {}
if profiles_path.exists():
    # Rewriting the file drops comments/formatting and replaces any existing
    # `jaffle_shop` entry, so keep a one-time copy of the pre-notebook state.
    backup_path = profiles_path.with_name(profiles_path.name + '.bak')
    if not backup_path.exists():
        shutil.copy2(profiles_path, backup_path)
        print(f"  Existing profiles backed up to {backup_path}")

    with open(profiles_path, 'r') as f:
        existing_profiles = yaml.safe_load(f) or {}
    if not isinstance(existing_profiles, dict):
        raise ValueError(
            f"{profiles_path} does not contain a YAML mapping; refusing to overwrite it")

# Merge: other profiles are preserved, `jaffle_shop` is replaced.
existing_profiles.update(profiles_config)
with open(profiles_path, 'w') as f:
    yaml.dump(existing_profiles, f, default_flow_style=False)

print(f"✓ dbt profile created at {profiles_path}")
print(f"  Host: {master_ip}, Port: 10001, Schema: default")

## 9. Test dbt Connection

In [None]:
print("Testing dbt connection...\n")
# dbt is run from inside the project directory so it picks up the project there.
os.chdir(JAFFLE_SHOP_PATH)
# check=False: the outcome is reported below instead of raising from run_command.
result = run_command("dbt debug", check=False)
if result.returncode == 0:
    print("\n✓ dbt connection successful!")
else:
    # Make the failure explicit instead of ending the cell silently.
    print(
        f"\n⚠ dbt debug failed (exit code {result.returncode}). "
        "Review the output above before running the following cells.")

## 10. Run dbt Seed

In [None]:
# Run `dbt seed` from the project directory; run_command raises if dbt exits non-zero.
print("Running dbt seed...\n")
os.chdir(JAFFLE_SHOP_PATH)
seed_cmd = "dbt seed"
run_command(seed_cmd)
print("\n✓ Seed data loaded!")

## 11. Run dbt Transformations

In [None]:
# Run `dbt run` from the project directory; run_command raises if dbt exits non-zero.
print("Running dbt transformations...\n")
os.chdir(JAFFLE_SHOP_PATH)
transform_cmd = "dbt run"
run_command(transform_cmd)
print("\n✓ Transformations complete!")

## 12. Create BigQuery Dataset

In [None]:
print(f"Creating BigQuery dataset: {BQ_DATASET}\n")
# `bq show --dataset` succeeds only when this dataset exists. (`bq ls -d NAME`
# would interpret NAME as a project, so it cannot serve as an existence check.)
result = run_command(
    f"bq show --dataset {PROJECT_ID}:{BQ_DATASET}", check=False)
if result.returncode != 0:
    run_command(
        f"bq mk --project_id={PROJECT_ID} --location={BQ_LOCATION} {BQ_DATASET}")
    print(f"✓ Dataset created: {PROJECT_ID}.{BQ_DATASET}")
else:
    print(f"✓ Dataset exists: {PROJECT_ID}.{BQ_DATASET}")

## 13. Write to BigQuery

In [None]:
script_path = "/tmp/write_to_bq.py"

# The PySpark job is generated from this template. Doubled braces ({{table}})
# survive as single braces in the generated script's own f-strings, and the
# newline escape is written as \\n so that the generated source contains the
# two characters "\n" rather than a raw line break inside a string literal.
pyspark_script = f'''from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Write to BigQuery").config("spark.sql.warehouse.dir", "{HIVE_WAREHOUSE}").enableHiveSupport().getOrCreate()
tables = ["customers", "orders"]
for table in tables:
    print(f"Writing {{table}} to BigQuery...")
    df = spark.table(f"default.{{table}}")
    df.show(5)
    df.write.format("bigquery").option("table", f"{PROJECT_ID}.{BQ_DATASET}.{{table}}").option("temporaryGcsBucket", "{GCS_BUCKET}").mode("overwrite").save()
    print(f"✓ {{table}} written")
print("\\n✓ All tables written!")
spark.stop()
'''

# Fail fast locally if the generated script is not valid Python; compile() only
# parses the source, so pyspark does not need to be installed here.
compile(pyspark_script, script_path, "exec")

# The script contains non-ASCII characters, so pin the encoding explicitly.
with open(script_path, 'w', encoding="utf-8") as f:
    f.write(pyspark_script)

# NOTE(review): `jobs submit pyspark` is given the same local path below, so
# this copy to the master may be redundant — confirm before removing it.
run_command(
    f"gcloud compute scp {script_path} {CLUSTER_NAME}-m:/tmp/write_to_bq.py --project={PROJECT_ID} --zone={ZONE}")
print("\nSubmitting PySpark job...\n")
run_command(
    f"gcloud dataproc jobs submit pyspark /tmp/write_to_bq.py --cluster={CLUSTER_NAME} --project={PROJECT_ID} --region={REGION}")
print("\n✓ Tables written to BigQuery!")

## 14. Verify Data in BigQuery

In [None]:
# Preview the first rows of each exported table; the banner layout is the same
# for every table except that later banners are preceded by a blank line.
banner = "=" * 60
print("Verifying data in BigQuery...\n")

for position, table_name in enumerate(("customers", "orders")):
    print(banner if position == 0 else "\n" + banner)
    print(f"{table_name.upper()} TABLE")
    print(banner)
    run_command(
        f"bq query --project_id={PROJECT_ID} --use_legacy_sql=false 'SELECT * FROM {BQ_DATASET}.{table_name} LIMIT 5'")

print("\n✓ Data verification complete!")

## 15. Cleanup (Optional)

Run this cell to delete the Dataproc cluster and save costs.

In [None]:
# Only the cluster is removed; --quiet skips gcloud's interactive confirmation.
delete_cmd = f"gcloud dataproc clusters delete {CLUSTER_NAME} --project={PROJECT_ID} --region={REGION} --quiet"

print("Deleting Dataproc cluster...")
run_command(delete_cmd)
print(f"\n✓ Cluster {CLUSTER_NAME} deleted")

# Remind the user about the resources this cell leaves behind.
leftover_note = f"\nNote: GCS bucket ({GCS_BUCKET}) and BigQuery dataset ({BQ_DATASET}) were NOT deleted."
print(leftover_note)
print("Delete them manually if needed.")

## Summary

This notebook demonstrated:
1. ✓ Created Dataproc cluster with Spark Thrift server
2. ✓ Configured dbt-spark to connect via Thrift
3. ✓ Ran dbt transformations on Spark
4. ✓ Wrote results to BigQuery using the Spark BigQuery connector

### Resources Created
- Dataproc Cluster: `dbt-spark-demo-cluster`
- GCS Bucket: `johanesa-dbt-spark-demo`
- BigQuery Dataset: `jaffle_shop_demo`
- BigQuery Tables: `customers`, `orders`