# Sheets to BigQuery - Spark Development Version

This notebook is designed for development and will be converted to a Python script for production execution on Dataproc Serverless.

It reads data from Google Sheets and writes to BigQuery using Spark DataFrames.

- **Development:** Use this notebook for interactive development.
- **Production:** A DAG converts this notebook to a `.py` script for execution on Dataproc Serverless.

In [None]:
# Parameters - Dual mode: batch (sys.argv) or interactive (defaults)
import sys
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Decide between batch mode (arguments passed by Dataproc) and interactive
# mode (notebook defaults).
#
# A Jupyter/IPython kernel is itself started with command-line arguments
# (typically "-f <connection-file>.json"), so checking only
# len(sys.argv) > 1 would misread an interactive session as batch mode and
# use "-f" as the project ID. Only a first argument that is not an option
# flag is therefore treated as the project ID.
cli_args = sys.argv[1:]
if cli_args and not cli_args[0].startswith("-"):
    # BATCH MODE: positional arguments are <project> [<region>]
    GCP_PROJECT = cli_args[0]
    GCP_REGION = cli_args[1] if len(cli_args) > 1 else "us-central1"
    logger.info(f"Batch mode: project={GCP_PROJECT}, region={GCP_REGION}")
else:
    # INTERACTIVE MODE: Use defaults for development
    GCP_PROJECT = "johanesa-playground-326616"  # UPDATE THIS for your project
    GCP_REGION = "us-central1"
    logger.info(f"Interactive mode: project={GCP_PROJECT}, region={GCP_REGION}")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from google.cloud import storage
import os

# Record which project this run targets before any remote calls are made.
logger.info(f"Starting Spark notebook execution for project: {GCP_PROJECT}")

# GCS URI of the Drive/Sheets service-account key; the bucket name is
# derived from the project ID.
notebooks_bucket = f"{GCP_PROJECT}-notebooks"
credentials_path = f"gs://{notebooks_bucket}/credentials/drive-api.json"

In [None]:
# INTERACTIVE MODE ONLY - This cell is skipped during notebook-to-script conversion
# Builds a Spark Connect session backed by a Dataproc Serverless interactive session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session, SparkConnectConfig

# TODO: Replace with your actual session template
session_template_path = f'projects/{GCP_PROJECT}/locations/{GCP_REGION}/sessionTemplates/runtime-00000b96da90'

# Session settings: enable Spark Connect and point at the runtime template.
session_config = Session()
session_config.spark_connect_session = SparkConnectConfig()
session_config.session_template = session_template_path

# Reuse a matching session if one exists, otherwise create it.
spark = (
    DataprocSparkSession.builder
    .projectId(GCP_PROJECT)
    .location(GCP_REGION)
    .dataprocSessionConfig(session_config)
    .getOrCreate()
)

logger.info(f"Interactive Spark session created: {spark.version}")

In [None]:
# BATCH MODE - Create standard Spark session for Dataproc Serverless batch
# Runs in both modes: an existing `spark` (set by the interactive cell) is
# kept as-is; otherwise a plain SparkSession is built.
try:
    # Bare name lookup: raises NameError when no session has been created yet
    spark
except NameError:
    # No session yet, so this is a batch run
    spark = (
        SparkSession.builder
        .appName("sheets-to-bigquery")
        .getOrCreate()
    )
    logger.info(f"Batch Spark session created: {spark.version}")
else:
    logger.info("Using existing Spark session (interactive mode)")

logger.info(f"BigQuery project: {GCP_PROJECT}")
logger.info(f"BigQuery location: {GCP_REGION}")

In [None]:
# Authenticate with Google Sheets API
scope = ["https://spreadsheets.google.com/feeds",
         "https://www.googleapis.com/auth/drive"]

# Download credentials from GCS to a local file. The download is skipped
# when a copy already exists in the working directory.
# NOTE(review): the service-account key is left on local disk afterwards.
local_creds = "drive-api.json"
if not os.path.exists(local_creds):
    logger.info(f"Downloading credentials from {credentials_path}")

    # Parse GCS path (gs://bucket-name/path/to/file) from credentials_path so
    # the object downloaded is always the one that was just logged, instead
    # of a second hard-coded copy of the bucket and object names.
    gcs_prefix = "gs://"
    if not credentials_path.startswith(gcs_prefix):
        raise ValueError(
            f"credentials_path must be a gs:// URI, got: {credentials_path}")
    bucket_name, path_separator, blob_name = (
        credentials_path[len(gcs_prefix):].partition("/"))
    if not bucket_name or not blob_name:
        raise ValueError(
            f"credentials_path must include a bucket and an object name, got: {credentials_path}")

    # Download using Python GCS client
    storage_client = storage.Client(project=GCP_PROJECT)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.download_to_filename(local_creds)

    logger.info("Credentials downloaded successfully")

# Build service-account credentials from the key file and authorize gspread
credentials = ServiceAccountCredentials.from_json_keyfile_name(
    local_creds, scope)
googleClient = gspread.authorize(credentials)

logger.info("Successfully authenticated with Google Sheets API")

In [None]:
# Load legacy_charges data
# The sheet ID can be overridden through the environment.
LEGACY_CHARGES_SHEET_ID = os.getenv(
    "LEGACY_CHARGES_SHEET_ID",
    "1kQENu6sumzEQX60fjQtgmXvwPGlUfaNRgW7v_TWFUXo"
)

# Read every row of the first worksheet with numeric conversion disabled
sheet = googleClient.open_by_key(LEGACY_CHARGES_SHEET_ID)
worksheet = sheet.get_worksheet(0)
legacy_charges = worksheet.get_all_records(numericise_ignore=['all'])

# Clean data: blank cells become None, installment_count becomes an int
for row in legacy_charges:
    if row['mid_label'] == '':
        row['mid_label'] = None
    raw_installments = row['installment_count']
    if raw_installments == '':
        row['installment_count'] = None
    else:
        row['installment_count'] = int(raw_installments)

# Convert to Spark DataFrame and pin installment_count to an int column
legacy_charges_df = spark.createDataFrame(legacy_charges)
legacy_charges_df = legacy_charges_df.withColumn(
    "installment_count", col("installment_count").cast("int"))

logger.info(
    f"Successfully loaded {legacy_charges_df.count()} records from legacy_charges sheet")

In [None]:
# Load merchant_send_mid_label data
# The sheet ID can be overridden through the environment.
MERCHANT_SEND_MID_LABEL_SHEET_ID = os.getenv(
    "MERCHANT_SEND_MID_LABEL_SHEET_ID",
    "1_8sm8QciAU3T8oDlNS1Pfj-GQlmlJBrAi1TYdnnMlkw"
)

# Read all records from the first worksheet of the spreadsheet
sheet = googleClient.open_by_key(MERCHANT_SEND_MID_LABEL_SHEET_ID)
worksheet = sheet.get_worksheet(0)
merchant_send_mid_label = worksheet.get_all_records()

# Convert the list of row dicts to a Spark DataFrame
merchant_send_mid_label_df = spark.createDataFrame(
    merchant_send_mid_label
)

logger.info(
    f"Successfully loaded {merchant_send_mid_label_df.count()} records from merchant_send_mid_label sheet")

In [None]:
# Load merchant_excluded data
# The sheet ID can be overridden through the environment.
MERCHANT_EXCLUDED_SHEET_ID = os.getenv(
    "MERCHANT_EXCLUDED_SHEET_ID",
    "1orVBlPP77HTt9d8x-lC1Oo5xrPp0r1FgVUQ-43DYqYc"
)

# Read all records from the first worksheet of the spreadsheet
sheet = googleClient.open_by_key(MERCHANT_EXCLUDED_SHEET_ID)
worksheet = sheet.get_worksheet(0)
merchant_excluded = worksheet.get_all_records()

# Convert the list of row dicts to a Spark DataFrame
merchant_excluded_df = spark.createDataFrame(
    merchant_excluded
)

logger.info(
    f"Successfully loaded {merchant_excluded_df.count()} records from merchant_excluded sheet")

In [None]:
# Preview data: print the first five legacy_charges rows as a sanity check
legacy_charges_df.show(n=5)

In [None]:
# Write to BigQuery temp tables using Spark BigQuery connector
# Each DataFrame overwrites the table of the same name in the `temp` dataset;
# the writes run in the order listed.
temp_table_frames = (
    ("legacy_charges", legacy_charges_df),
    ("merchant_send_mid_label", merchant_send_mid_label_df),
    ("merchant_excluded", merchant_excluded_df),
)

for temp_table_name, temp_table_df in temp_table_frames:
    (
        temp_table_df.write
        .format("bigquery")
        .option("table", f"{GCP_PROJECT}.temp.{temp_table_name}")
        .option("writeMethod", "direct")
        .mode("overwrite")
        .save()
    )

logger.info("Successfully wrote all dataframes to BigQuery temp tables")

In [None]:
# Create final results table using Spark SQL
# Destination "dataset.table"; can be overridden through the environment.
RESULTS_TABLE = os.getenv("RESULTS_TABLE", "temp.filtered_legacy_charges")

# Register temp views so the SQL below can refer to the DataFrames by name
for view_name, view_df in (
    ("legacy_charges", legacy_charges_df),
    ("merchant_send_mid_label", merchant_send_mid_label_df),
    ("merchant_excluded", merchant_excluded_df),
):
    view_df.createOrReplaceTempView(view_name)

# Keep only legacy charges whose business_id appears in neither merchant
# list, and stamp each row with the processing time.
filtered_charges_sql = """
SELECT
    lc.*,
    CURRENT_TIMESTAMP() as processed_at
FROM
    legacy_charges lc
LEFT JOIN
    merchant_send_mid_label msml ON lc.business_id = msml.business_id
LEFT JOIN
    merchant_excluded me ON lc.business_id = me.business_id
WHERE
    msml.business_id IS NULL AND me.business_id IS NULL
"""
result_df = spark.sql(filtered_charges_sql)

# Write results to BigQuery, replacing any existing table
(
    result_df.write
    .format("bigquery")
    .option("table", f"{GCP_PROJECT}.{RESULTS_TABLE}")
    .option("writeMethod", "direct")
    .mode("overwrite")
    .save()
)

logger.info(f"Successfully created table {RESULTS_TABLE}")

In [None]:
# Get row count and log completion
count = result_df.count()

# Derive the console link from RESULTS_TABLE ("dataset.table") rather than
# hard-coding the default, so the link stays correct when RESULTS_TABLE is
# overridden through the environment. With the default value the URL is
# unchanged.
results_dataset, dataset_separator, results_table_name = RESULTS_TABLE.partition(".")

logger.info(f"Successfully created table {RESULTS_TABLE} with {count} records")
logger.info(
    f"View results: https://console.cloud.google.com/bigquery?project={GCP_PROJECT}&d={results_dataset}&t={results_table_name}&page=table")
logger.info("Notebook execution completed successfully")

# Stop Spark session
spark.stop()