# Dataproc with BigQuery Metastore Configuration
A step-by-step guide to setting up a single-node Hadoop environment on Dataproc with a BigLake Metastore.

**Note:** Steps 1 to 3 are executed in your local environment to provision the cluster. The subsequent steps are intended to be run within the Dataproc Jupyter notebook.

## Step 1: Define Cluster Configuration
First, set the configuration for your Dataproc cluster. **Replace the placeholder values** with your Google Cloud project details.

In [None]:
# IMPORTANT: Fill in these values before running!
PROJECT_ID = "johanesa-playground-326616"  # Your Google Cloud project ID
REGION = "us-central1"      # The region to create the cluster in
CLUSTER_NAME = "my-single-node-cluster"  # A name for your Dataproc cluster
# A unique GCS bucket name. Using the project ID as a prefix is a good practice.
BUCKET_NAME = f"{PROJECT_ID}-dataproc-bucket"

# Local Hive
HIVE_DB = "my_hive_db"
HIVE_TABLE = f"{HIVE_DB}.people_hive"

# Catalog for Iceberg on Hive
ICEBERG_HIVE_CATALOG = "iceberg_on_hive"
ICEBERG_HIVE_DB = "my_iceberg_db"
ICEBERG_HIVE_TABLE = f"{ICEBERG_HIVE_CATALOG}.{ICEBERG_HIVE_DB}.people_iceberg"
ICEBERG_HIVE_FROM_BQ = f"{ICEBERG_HIVE_CATALOG}.{ICEBERG_HIVE_DB}.people_filtered_bq"
ICEBERG_HIVE_FROM_SPARK = f"{ICEBERG_HIVE_CATALOG}.{ICEBERG_HIVE_DB}.people_filtered_spark"

# Catalog for Iceberg on BigLake Metastore
BQ_DATASET = "my_iceberg_metastore"
BQ_TABLE = f"{BQ_DATASET}.people_biglake"
ICEBERG_BQ_CATALOG = "iceberg_on_bq"
ICEBERG_BIGLAKE_TABLE = f"{ICEBERG_BQ_CATALOG}.{BQ_TABLE}"
ICEBERG_BIGLAKE_FROM_SPARK = f"{ICEBERG_BQ_CATALOG}.{BQ_DATASET}.people_filtered_spark"

## Step 2: Create and Upload Initialization Script
This script downloads the necessary JARs for Iceberg and BigQuery integration and places them in Spark's classpath. The script is then uploaded to a GCS bucket to be used during cluster creation.

In [None]:
# This multi-line string contains the exact shell script content.
init_script_content = """#!/bin/bash
# install-jars.sh

# This script downloads required JARs for Iceberg + BigQuery Catalog integration.
# It places them directly in Spark's classpath.

set -e -x

# Define variables for JARs
ICEBERG_RUNTIME_URL="https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar"
BQ_CATALOG_JAR_GCS="gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar"

# Download the JARs directly into Spark's main jars directory
wget -P /usr/lib/spark/jars/ "$ICEBERG_RUNTIME_URL"
gsutil cp "$BQ_CATALOG_JAR_GCS" /usr/lib/spark/jars/
"""
# Define the local and GCS paths for the script
local_script_path = "install-jars.sh"
gcs_script_path = f"gs://{BUCKET_NAME}/scripts/{local_script_path}"

# Write the content to a local file
with open(local_script_path, "w") as f:
    f.write(init_script_content)

print(f"Initialization script created locally at: {local_script_path}")

# Upload the local script to GCS using a shell command
!gsutil cp {local_script_path} {gcs_script_path}

print(f"Successfully uploaded script to: {gcs_script_path}")

## Step 3: Create a custom Dataproc Cluster with Iceberg Catalog
This step creates the Dataproc cluster with the `--properties` flag to manually set all required Spark configurations. This ensures the default Spark session for the entire cluster is correctly configured, preventing errors in the interactive notebook environment.

In [None]:
# Define the catalog properties for the Dataproc cluster.
properties_list = [
    f"spark:spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    f"spark:spark.sql.catalog.{ICEBERG_HIVE_CATALOG}=org.apache.iceberg.spark.SparkCatalog",
    f"spark:spark.sql.catalog.{ICEBERG_HIVE_CATALOG}.type=hive",
    f"spark:spark.sql.catalog.{ICEBERG_HIVE_CATALOG}.warehouse=gs://{BUCKET_NAME}/iceberg_on_hive",
    f"spark:spark.sql.catalog.{ICEBERG_BQ_CATALOG}=org.apache.iceberg.spark.SparkCatalog",
    f"spark:spark.sql.catalog.{ICEBERG_BQ_CATALOG}.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
    f"spark:spark.sql.catalog.{ICEBERG_BQ_CATALOG}.gcp_project={PROJECT_ID}",
    f"spark:spark.sql.catalog.{ICEBERG_BQ_CATALOG}.location={REGION}",
    f"spark:spark.sql.catalog.{ICEBERG_BQ_CATALOG}.warehouse=gs://{BUCKET_NAME}/iceberg_on_bq"
]
PROPERTIES = ",".join(properties_list)

# The final gcloud command to create the cluster
# It references the GCS path of the script we just uploaded
!gcloud dataproc clusters create {CLUSTER_NAME} \
    --project {PROJECT_ID} \
    --region {REGION} \
    --single-node \
    --image-version 2.2-debian12 \
    --optional-components=JUPYTER \
    --enable-component-gateway \
    --bucket {BUCKET_NAME} \
    --initialization-actions={gcs_script_path} \
    --properties="{PROPERTIES}"

print(f"Cluster '{CLUSTER_NAME}' creation process initiated.")

## Step 4: Access Jupyter and Get Spark Session
Once the cluster is running, access the Jupyter environment. Since all configurations were set at the cluster level, you can get the pre-configured Spark session without any additional setup.

In [None]:
# Verify that the Spark session is active and configured correctly.
from pyspark.sql import SparkSession

# Get the existing, pre-configured Spark session. DO NOT use spark.stop()
spark = SparkSession.builder.getOrCreate()

print("Default Spark session is active and configured correctly. Ready to use.")

## Step 5: Create a Sample DataFrame and Write to HDFS
Create a sample DataFrame and write it to HDFS as a Parquet file. This data will be used in subsequent steps.

In [None]:
# Create a sample DataFrame that will be used throughout the notebook.
df = spark.createDataFrame(
    [('Alice', 25), ('Bob', 30), ('Charlie', 35)], ['name', 'age'])

# Write the DataFrame to HDFS as a Parquet file.
df.write.mode('overwrite').parquet('/user/my_data/people')

# Verify that the Parquet file was created in HDFS.
!hdfs dfs -ls /user/my_data/people

## Step 6: Create a Hive Table
Create a Hive table using the data stored in HDFS. The Hive warehouse location was configured at the cluster level, so this operation will succeed.

In [None]:
# Create a new Hive database if it doesn't already exist.
spark.sql(
    f"CREATE DATABASE IF NOT EXISTS {HIVE_DB} LOCATION 'hdfs:///user/hive_db'")

# Drop the table if it exists to ensure a clean slate.
spark.sql(f"DROP TABLE IF EXISTS {HIVE_TABLE}")

# Create an external Hive table pointing to the Parquet file in HDFS.
spark.sql(f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {HIVE_TABLE} (
        name STRING,
        age BIGINT
    )
    STORED AS PARQUET
    LOCATION '/user/my_data/people'
""")

print("--- Standard Hive Table ---")
# Query the Hive table to verify its contents.
spark.sql(f"SELECT * FROM {HIVE_TABLE}").show()

## Step 7: Create an Iceberg Table with Hive Metastore
Create an Iceberg table using the Hive metastore. The `iceberg_on_hive` is used for the Iceberg tables.

In [None]:
print("Verifying the pre-configured Iceberg catalog...")
# Show the databases in the Hive Iceberg catalog.
spark.sql(f"SHOW DATABASES IN {ICEBERG_HIVE_CATALOG}").show()

# Create a new database in the Hive Iceberg catalog if it doesn't already exist.
spark.sql(
    f"CREATE DATABASE IF NOT EXISTS {ICEBERG_HIVE_CATALOG}.{ICEBERG_HIVE_DB}")
# Write the DataFrame to a new Iceberg table in the Hive metastore.
spark.sql(f"SELECT * FROM {HIVE_TABLE}") \
    .write \
    .format("iceberg") \
    .mode("overwrite") \
    .saveAsTable(ICEBERG_HIVE_TABLE)

print("\n--- Iceberg Table on Internal Hive Metastore ---")
# Query the Iceberg table to verify its contents.
spark.sql(f"SELECT * FROM {ICEBERG_HIVE_TABLE}").show()

## Step 8: Create an Iceberg Table with BigLake Metastore
Create an Iceberg table using the BigLake metastore. The `iceberg_on_bq` is used for the Iceberg tables.

In [None]:
print("Verifying the pre-configured BQ catalog...")
# Show the databases in the BigQuery Iceberg catalog.
spark.sql(f"SHOW DATABASES IN {ICEBERG_BQ_CATALOG}").show()

# Create the BigQuery dataset that will act as the metastore
!bq mk --connection --location={REGION} --project_id={PROJECT_ID} --connection_type=CLOUD_RESOURCE default-{REGION}
!bq --location={REGION} mk --dataset {PROJECT_ID}:{BQ_DATASET}

# Save the DataFrame to Iceberg table in the BigLake metastore.
spark.sql(f"DROP TABLE IF EXISTS {ICEBERG_BIGLAKE_TABLE}")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS
  {ICEBERG_BIGLAKE_TABLE} ( name string,
    age int )
USING
  ICEBERG TBLPROPERTIES ('bq_connection'='projects/{PROJECT_ID}/locations/{REGION}/connections/default-{REGION}');
""")

# Save the DataFrame as an Iceberg table in the BigLake metastore.
spark.sql(f"SELECT * FROM {ICEBERG_HIVE_TABLE}") \
    .write \
    .format("iceberg") \
    .mode("overwrite") \
    .save(ICEBERG_BIGLAKE_TABLE)

# Query the Iceberg table from Spark to verify its contents.
# spark.sql(f"SELECT * FROM {ICEBERG_BIGLAKE_TABLE}").show()

sql_query = f"SELECT * FROM {BQ_TABLE}"
df = spark.read \
    .format("bigquery") \
    .option("viewsEnabled", "true") \
    .option("query", sql_query) \
    .option("materializationDataset", BQ_DATASET) \
    .load()
df.show()

## Step 9: Push Down Computation with BigQuery Connector using SQL
Now, we'll use the BigQuery Connector to execute a raw SQL query directly within the BigQuery engine. The connector will send the query to BigQuery, which will process it, and only the final results will be written to Internal Hive Metastore.

In [None]:
# Create the BigQuery SQL query as a string.
# The entire query will be executed inside the BigQuery engine.
bq_sql_query = f"""
SELECT
    name,
    age
FROM
    {BQ_TABLE}
WHERE
    age > 28
"""

print("--- Sending this SQL query to BigQuery for execution ---")
print(bq_sql_query)

# Use the 'bigquery' format with the 'query' option.
# Spark sends the query to BigQuery and loads the result set.
filtered_df = spark.read \
    .format("bigquery") \
    .option("viewsEnabled", "true") \
    .option("query", bq_sql_query) \
    .option("materializationDataset", BQ_DATASET) \
    .load()

print(f"\n--- Data returned from BigQuery query ---")
filtered_df.show()

# Finally, save the small, filtered result set back to Internal Hive Metastore.
filtered_df.write.format("iceberg").mode(
    "overwrite").saveAsTable(ICEBERG_HIVE_FROM_BQ)

print("\n--- Iceberg Table on Internal Hive Metastore ---")
spark.sql(f"SELECT * FROM {ICEBERG_HIVE_FROM_BQ}").show()

## Step 10: Push Down Computation with Serverless Spark
Now, we'll use the Serverless Spark to execute a raw SQL query. The Serverless Spark which will process it, and only the final results will be written to Internal Hive Metastore.

In [None]:
# Create the Python script content for the Spark-native job
pyspark_job_content = f"""
from pyspark.sql import SparkSession

# These values are configured via --properties in the gcloud command
ICEBERG_BIGLAKE_TABLE = "{ICEBERG_BIGLAKE_TABLE}"

# Define a new table name for the serverless job's output
OUTPUT_TABLE = f"{ICEBERG_BIGLAKE_FROM_SPARK}"

def main():
    # A standard Spark session. Its catalog config will be injected by Dataproc.
    spark = SparkSession.builder \\
        .appName("Dataproc Serverless Spark Engine Filter") \\
        .getOrCreate()

    # This is a Spark SQL query, not a BigQuery query.
    # Spark reads the data from GCS and filters it in its own engine.
    filter_query = f'''
    SELECT
        name,
        age
    FROM
        {{ICEBERG_BIGLAKE_TABLE}}
    WHERE
        age > 28
    '''

    print(f"--- Running this Spark SQL query: {{filter_query}} ---")

    filtered_df = spark.sql(filter_query)

    print("--- Filtered data computed by Spark ---")
    filtered_df.show()

    # Save the results to a different Iceberg table (one using the Hive catalog)
    print(f"--- Saving results to Iceberg table: {{OUTPUT_TABLE}} ---")
    filtered_df.write \\
        .format("iceberg") \\
        .mode("overwrite") \\
        .saveAsTable(OUTPUT_TABLE)

    print("Job completed successfully!")

if __name__ == "__main__":
    main()
"""

# Define local and GCS paths for the new job script
local_job_path = "filter_job_spark_sql.py"
gcs_job_path = f"gs://{BUCKET_NAME}/scripts/{local_job_path}"

# Write and upload the script
with open(local_job_path, "w") as f:
    f.write(pyspark_job_content)

!gsutil cp {local_job_path} {gcs_job_path}

print(f"Successfully uploaded Spark SQL job script to: {gcs_job_path}")

In [None]:
# Define the catalog properties for the serverless Dataproc cluster.
properties_list = [
    f"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    f"spark.sql.catalog.{ICEBERG_HIVE_CATALOG}=org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{ICEBERG_HIVE_CATALOG}.type=hive",
    f"spark.sql.catalog.{ICEBERG_HIVE_CATALOG}.warehouse=gs://{BUCKET_NAME}/iceberg_on_hive",
    f"spark.sql.catalog.{ICEBERG_BQ_CATALOG}=org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{ICEBERG_BQ_CATALOG}.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
    f"spark.sql.catalog.{ICEBERG_BQ_CATALOG}.gcp_project={PROJECT_ID}",
    f"spark.sql.catalog.{ICEBERG_BQ_CATALOG}.location={REGION}",
    f"spark.sql.catalog.{ICEBERG_BQ_CATALOG}.warehouse=gs://{BUCKET_NAME}/iceberg_on_bq"
]
PROPERTIES = ",".join(properties_list)

# Submit the PySpark job to a serverless Dataproc cluster.
!gcloud dataproc batches submit pyspark {gcs_job_path} \
    --project={PROJECT_ID} \
    --region={REGION} \
    --batch="serverless-spark-engine-job" \
    --version="2.2" \
    --subnet="default" \
    --jars="gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar" \
    --properties="{PROPERTIES}"

print("\nServerless batch job submitted. You can monitor its progress in the Google Cloud Console.")

In [None]:
# Query the table created by the serverless Spark job.
print(f"\n--- Data returned from Spark query ---")
spark.sql(f"SELECT * FROM {ICEBERG_BIGLAKE_FROM_SPARK}").show()

# Create a new Iceberg table from the results of the serverless Spark job.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {ICEBERG_HIVE_FROM_SPARK}
    USING iceberg
    AS
    SELECT * FROM {ICEBERG_BIGLAKE_FROM_SPARK}
""")

print("\n--- Iceberg Table on Internal Hive Metastore ---")
spark.sql(f"SELECT * FROM {ICEBERG_HIVE_FROM_SPARK}").show()

## Step 11: Clean Up Resources
Delete the Dataproc cluster to avoid incurring further charges.

In [None]:
# Delete the Dataproc cluster.
!gcloud dataproc clusters delete {CLUSTER_NAME} --region {REGION}