# GCS to BigQuery Data Ingestion Pipeline

Created: 2026-01-20

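This notebook contains PySpark code that ingests payment data from Google Cloud Storage (GCS) into BigQuery. It reads the source files, adds an `ingestion_timestamp` column, appends the rows to the target table, and then reads the table back to verify the write.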

## Configuration
- **Source**: Google Cloud Storage (GCS)
- **Destination**: BigQuery (prd-dagen.payments_v1)
- **Data Format**: Parquet by default; CSV, JSON and ORC are also supported via `DATA_FORMAT`
- **Processing**: PySpark with DataFrame API

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging
from datetime import datetime

# Logging: configure the root logger at INFO level and obtain a logger for
# this notebook/module, then record the pipeline start time.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# %-style arguments are formatted by the logging module; the rendered message
# is identical to an f-string interpolation of the same value.
logger.info("Starting GCS to BigQuery ingestion pipeline at %s", datetime.now())

In [None]:
# Spark session: build (or reuse) a session named for this pipeline and pull
# in the spark-bigquery connector package, which provides the "bigquery"
# data source used by the write and verification cells.
# NOTE(review): if a SparkSession already exists in this kernel, getOrCreate()
# returns it and spark.jars.packages may not take effect — confirm on your cluster.
spark = (
    SparkSession.builder
    .appName("gcs-to-bigquery-payments")
    .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.1",
    )
    .getOrCreate()
)

# Show Spark's own log output down to INFO level.
spark.sparkContext.setLogLevel("INFO")

logger.info("Spark session initialized successfully")

In [None]:
# Configuration — every value a user is expected to edit lives here.

# --- Source (GCS) ---
GCS_BUCKET = "gs://your-gcs-bucket"        # TODO: replace with your GCS bucket
GCS_PATH = f"{GCS_BUCKET}/payments-data"   # folder holding the payment files
DATA_FORMAT = "parquet"                    # one of: parquet, csv, json, orc

# --- Destination (BigQuery) ---
BQ_PROJECT = "prd-dagen"
BQ_DATASET = "payments_v1"
BQ_TABLE = "payments_raw"                  # TODO: replace with your target table
# Staging bucket passed to the connector as temporaryGcsBucket.
# NOTE(review): the spark-bigquery connector's examples pass a bare bucket name
# (no "gs://" prefix) for this option — confirm the expected format before
# switching to an indirect write.
BQ_TEMP_BUCKET = "gs://your-temp-bucket"

logger.info(
    "Configuration: GCS_PATH=%s, BQ_TABLE=%s.%s.%s",
    GCS_PATH, BQ_PROJECT, BQ_DATASET, BQ_TABLE,
)

In [None]:
# Load the source files from GCS using the reader that matches DATA_FORMAT
# (case-insensitive). Unsupported formats raise ValueError, which is logged
# and re-raised like any other read failure.
try:
    logger.info(f"Reading {DATA_FORMAT.upper()} data from GCS: {GCS_PATH}")

    source_format = DATA_FORMAT.lower()
    readers = {
        "parquet": lambda path: spark.read.parquet(path),
        # CSV: first line is the header; column types are inferred from the data.
        "csv": lambda path: (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(path)
        ),
        "json": lambda path: spark.read.json(path),
        "orc": lambda path: spark.read.orc(path),
    }
    if source_format not in readers:
        raise ValueError(f"Unsupported data format: {DATA_FORMAT}")
    df = readers[source_format](GCS_PATH)

    # count() triggers a full pass over the source data.
    logger.info(f"Successfully read data from GCS. Row count: {df.count()}")
    logger.info(f"Schema: {df.schema}")

except Exception as e:
    logger.error(f"Error reading from GCS: {str(e)}")
    raise

In [None]:
# Validation / cleaning: add an ingestion_timestamp column (Spark's
# current_timestamp()) and report the resulting row count. Failures are
# logged and re-raised.
try:
    logger.info("Starting data validation and cleaning")

    df = df.withColumn("ingestion_timestamp", current_timestamp())

    # Optional, data-dependent steps — enable and adapt to your schema:
    #   null handling:   df = df.fillna(0, subset=["amount"])
    #                    df = df.fillna("UNKNOWN", subset=["category"])
    #   de-duplication:  df = df.dropDuplicates()

    final_row_count = df.count()
    logger.info(f"Data validation complete. Final row count: {final_row_count}")

except Exception as err:
    logger.error(f"Error during data validation: {str(err)}")
    raise

In [None]:
# Display a sample of the processed data, the total row count and the schema.
# NOTE(review): show(truncate=False) writes full payment rows into the notebook
# output — check for sensitive fields before saving or sharing the notebook.
logger.info("Sample of processed data:")
df.show(5, truncate=False)
print(f"\nTotal Rows: {df.count()}")
# printSchema() prints the schema tree itself and returns None, so it is called
# on its own; interpolating it into an f-string would print the tree first and
# then "Schema:\nNone".
print("Schema:")
df.printSchema()

In [None]:
# Write the processed DataFrame to the BigQuery table in append mode using the
# connector's direct write method. Failures are logged and re-raised.
try:
    target_table = f"{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}"
    logger.info(f"Writing data to BigQuery: {target_table}")

    # Count BEFORE save(): if counting fails, it fails before anything has
    # been appended. A count taken after save() could fail once the rows were
    # already in the table and be reported below as a write error — and since
    # the mode is append, re-running on that error would duplicate rows.
    rows_to_write = df.count()

    (
        df.write
        .format("bigquery")
        .option("table", target_table)
        # NOTE(review): temporaryGcsBucket is presumably only used by the
        # connector's indirect write path, not writeMethod=direct — confirm.
        .option("temporaryGcsBucket", BQ_TEMP_BUCKET)
        .option("writeMethod", "direct")
        .mode("append")
        .save()
    )

    logger.info(f"Successfully wrote {rows_to_write} rows to BigQuery")
    print(f"✓ Data successfully ingested to BigQuery table: {target_table}")

except Exception as e:
    logger.error(f"Error writing to BigQuery: {str(e)}")
    raise

In [None]:
# Verification: read the BigQuery table back and check that it holds at least
# as many rows as the batch DataFrame `df`. Any failure, including a failed
# check, is logged and re-raised.
try:
    logger.info("Verifying data in BigQuery")

    target_table = f"{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}"
    verification_df = (
        spark.read
        .format("bigquery")
        .option("table", target_table)
        .load()
    )

    row_count = verification_df.count()

    # The batch is appended, so after a successful write the table should
    # contain at least the batch's rows; fewer means the data did not land.
    # (Recounting `df` re-reads the source data from GCS.)
    batch_row_count = df.count()
    if row_count < batch_row_count:
        raise RuntimeError(
            f"BigQuery table {target_table} contains {row_count} rows, "
            f"fewer than the {batch_row_count} rows in this batch"
        )

    logger.info(f"Verification successful. Total rows in BigQuery table: {row_count}")
    print(f"✓ Verification complete. Table contains {row_count} rows.")

except Exception as e:
    logger.error(f"Error during verification: {str(e)}")
    raise

In [None]:
# Cleanup: record completion time, then stop the Spark session.
logger.info("Ingestion pipeline completed at %s", datetime.now())

spark.stop()
logger.info("Spark session stopped")