### Configure Spark-GCS and Spark-BigQuery Connector

In [1]:
# Suppress Warnings
import warnings
warnings.filterwarnings("ignore")

import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types 

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

In [2]:
# Path to the service-account JSON key, taken from the standard Google
# environment variable. Fail here with a clear message: an unset variable
# yields None, which the Spark/Hadoop configuration cells below cannot use
# as a key-file path and would only surface as a less obvious error later.
creds_location = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
if not creds_location:
    raise EnvironmentError(
        "GOOGLE_APPLICATION_CREDENTIALS is not set; export it to the path of the "
        "service-account JSON key before starting the kernel."
    )

In [3]:
# Spark configuration for the standalone cluster: master URL, application
# name, the GCS and BigQuery connector jars, and service-account auth.
#
# The auth keys carry the `spark.hadoop.` prefix because Spark only copies
# properties with that prefix into the Hadoop configuration; without it the
# two `google.cloud.auth.*` entries are never seen by the GCS connector.
conf = (
    SparkConf()
    .setMaster("spark://vm-instance-nyc-taxi.asia-southeast1-a.c.de-project-nyc-taxi.internal:7077")
    .setAppName("gcs_to_bigquery")
    .set(
        "spark.jars",
        "/home/salacjamesrhode23/connectors/gcs-connector-hadoop3-2.2.5.jar,"
        "/home/salacjamesrhode23/connectors/spark-bigquery-with-dependencies_2.12-0.42.4.jar"
    )
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", creds_location)
)

In [4]:
# Obtain the SparkContext. getOrCreate() makes this cell safe to re-run:
# constructing a second SparkContext in the same kernel raises an error,
# whereas getOrCreate() returns the already-running context (in which case
# `conf` is not re-applied).
sc = SparkContext.getOrCreate(conf=conf)

# Handle to the JVM-side Hadoop configuration used for gs:// access.
hadoop_conf = sc._jsc.hadoopConfiguration()

# Register the GCS connector classes for the gs:// scheme.
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
# Authenticate GCS access with the service-account key file.
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", creds_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")



25/11/15 02:13:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# Build (or fetch) the SparkSession using the configuration of the
# SparkContext created above.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [6]:
# Session-wide default staging bucket for BigQuery writes. Every write in
# this notebook passes "ecomm_bucket001" explicitly, so the default is kept
# in agreement with them instead of pointing at a different bucket.
spark.conf.set("temporaryGcsBucket", "ecomm_bucket001")

In [7]:
# Echo the session object as the cell output to confirm it was created.
spark

In [8]:
# Explicit stop so that "Run All" halts after the Spark setup cells.
# NOTE(review): assumes the original bare `break` (a SyntaxError) was a
# deliberate barrier; the sections below are meant to be run selectively.
raise RuntimeError(
    "Setup complete. Execution intentionally stops here; "
    "run the ingestion sections below one at a time."
)

### Data Transformation and Ingestion Logic + Write to BigQuery in parallel

#### Load Email Orders CSVs into a Spark DataFrame and Write to BigQuery

In [None]:
# Read a single email-orders CSV export with pandas and show the dtypes it infers.
# NOTE(review): presumably a reference for the explicit Spark schema defined below — confirm.
# `df_sample` is reassigned by the later sections of this notebook.
df_sample = pd.read_csv("gs://ecomm_bucket001/output_files/from_emails/email_orders_20251114_130435.csv")
df_sample.dtypes

In [None]:
# Alignment check: list the distinct `qty` values in the pandas sample.
# NOTE(review): presumably confirms the CSV columns were not shifted during parsing — confirm.
df_sample['qty'].unique()

In [None]:
# Explicit schema for the email-orders CSV files. Every column is nullable.
# price, line_total and total_amount are read as strings; qty as an integer;
# order_date and payment_date as timestamps.
# `schema` is reassigned by later sections, so run this cell right before
# the email-orders read.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('customer', types.StringType()),
        ('product', types.StringType()),
        ('sku', types.StringType()),
        ('qty', types.IntegerType()),
        ('price', types.StringType()),
        ('line_total', types.StringType()),
        ('total_amount', types.StringType()),
        ('payment_method', types.StringType()),
        ('payment_reference', types.StringType()),
        ('order_date', types.TimestampType()),
        ('payment_date', types.TimestampType()),
    ]
])

In [None]:
# Read every email-orders CSV under the prefix with the explicit schema.
# FAILFAST makes the read raise on a malformed record; multiLine plus the
# quote/escape settings allow quoted fields that contain newlines or quotes.
df_email_orders = (
    spark.read
    .schema(schema)
    .options(
        header="true",
        sep=",",
        quote='"',
        escape='"',
        multiLine="true",
        ignoreLeadingWhiteSpace="true",
        ignoreTrailingWhiteSpace="true",
        encoding="utf-8",
        mode="FAILFAST",
    )
    .csv("gs://ecomm_bucket001/output_files/from_emails/*.csv")
)

In [None]:
# Alignment check: show the distinct `qty` values as parsed by Spark.
df_email_orders.select("qty").distinct().show()

In [None]:
# Append the email orders to the raw_orders_emails staging table in BigQuery,
# staging the data through the ecomm_bucket001 GCS bucket.
# NOTE(review): append mode means re-running this cell adds the same rows again.
(
    df_email_orders.write
    .format("bigquery")
    .options(
        table="de-project-nyc-taxi.ecomm_staging.raw_orders_emails",
        temporaryGcsBucket="ecomm_bucket001",
    )
    .mode("append")
    .save()
)

#### Load Database Orders CSVs into a Spark DataFrame and Write to BigQuery

In [None]:
# Read a single database-orders CSV export with pandas and show the dtypes it infers.
# NOTE(review): presumably a reference for the explicit Spark schema defined below — confirm.
df_sample = pd.read_csv("gs://ecomm_bucket001/output_files/from_database/orders_20251114_140244.csv")
df_sample.dtypes

In [None]:
# Alignment check: list the distinct `year` values in the pandas sample.
# NOTE(review): presumably confirms the CSV columns were not shifted during parsing — confirm.
df_sample['year'].unique()

In [None]:
# Explicit schema for the database-orders CSV files. Every column is nullable.
# `schema` is reassigned by other sections, so run this cell right before
# the database-orders read.
schema = types.StructType([
    types.StructField('order_number', types.StringType(), True),
    types.StructField('order_date', types.TimestampType(), True),
    types.StructField('year', types.IntegerType(), True),
    types.StructField('billing_name', types.StringType(), True),
    types.StructField('lineitem_name', types.StringType(), True),
    types.StructField('lineitem_qty', types.IntegerType(), True),
    types.StructField('payment_method', types.StringType(), True),
    types.StructField('payment_reference', types.StringType(), True),
    types.StructField('payment_date', types.TimestampType(), True),
    types.StructField('fulfillment_date', types.TimestampType(), True),
    types.StructField('first_name', types.StringType(), True),
    types.StructField('last_name', types.StringType(), True),
    types.StructField('email', types.StringType(), True),
    types.StructField('address_company', types.StringType(), True),
    types.StructField('address_city', types.StringType(), True),
    types.StructField('address_province', types.StringType(), True),
    types.StructField('address_zip', types.StringType(), True),
    types.StructField('phone', types.StringType(), True),
    types.StructField('product_sku', types.StringType(), True),
    types.StructField('product_description', types.StringType(), True),
    types.StructField('vendor', types.StringType(), True),
    types.StructField('product_category', types.StringType(), True),
    # DoubleType rather than FloatType: a 32-bit float cannot hold typical
    # prices exactly enough, and the error becomes visible once the value is
    # stored in BigQuery's 64-bit FLOAT column (e.g. 19.99 -> 19.989999771...).
    types.StructField('unit_price', types.DoubleType(), True),
    types.StructField('image_src', types.StringType(), True)
    ])

In [None]:
# Read every database-orders CSV under the prefix with the explicit schema.
# FAILFAST makes the read raise on a malformed record; multiLine plus the
# quote/escape settings allow quoted fields that contain newlines or quotes.
df_database_orders = (
    spark.read
    .schema(schema)
    .options(
        header="true",
        sep=",",
        quote='"',
        escape='"',
        multiLine="true",
        ignoreLeadingWhiteSpace="true",
        ignoreTrailingWhiteSpace="true",
        encoding="utf-8",
        mode="FAILFAST",
    )
    .csv("gs://ecomm_bucket001/output_files/from_database/*.csv")
)

In [None]:
# Alignment check: show the distinct `year` values as parsed by Spark.
df_database_orders.select("year").distinct().show()

In [None]:
# Append the database orders to the raw_orders_postgres staging table in
# BigQuery, staging the data through the ecomm_bucket001 GCS bucket.
# NOTE(review): append mode means re-running this cell adds the same rows again.
(
    df_database_orders.write
    .format("bigquery")
    .options(
        table="de-project-nyc-taxi.ecomm_staging.raw_orders_postgres",
        temporaryGcsBucket="ecomm_bucket001",
    )
    .mode("append")
    .save()
)

#### Load Faker Orders Parquet Files into a Spark DataFrame and Write to BigQuery

In [9]:
# Read every faker-orders Parquet file under the prefix. Parquet files embed
# their own schema, so the CSV-only `header` / `inferSchema` options that were
# set here had no effect and have been removed.
df_faker_orders = spark.read.parquet(
    "gs://ecomm_bucket001/output_files/from_faker/*.parquet"
)

                                                                                

In [10]:
# Append the faker orders to the raw_orders_faker staging table in BigQuery,
# staging the data through the ecomm_bucket001 GCS bucket.
# The frame is first redistributed into 15 partitions.
# NOTE(review): presumably 15 was chosen to match the cluster's parallelism — confirm.
# NOTE(review): append mode means re-running this cell adds the same rows again.
(
    df_faker_orders.repartition(15)
    .write
    .format("bigquery")
    .options(
        table="de-project-nyc-taxi.ecomm_staging.raw_orders_faker",
        temporaryGcsBucket="ecomm_bucket001",
    )
    .mode("append")
    .save()
)

                                                                                

#### Load the Product Dimension CSV into a Spark DataFrame and Write to BigQuery

In [None]:
# Read the products CSV with pandas and show the dtypes it infers.
# NOTE(review): presumably a reference for the explicit Spark schema defined below — confirm.
df_sample = pd.read_csv("gs://ecomm_bucket001/output_files/from_api/products.csv")
df_sample.dtypes

In [None]:
# Alignment check: list the distinct `Unit Price` values in the pandas sample.
# NOTE(review): presumably confirms the CSV columns were not shifted during parsing — confirm.
df_sample['Unit Price'].unique()

In [None]:
# Explicit schema for the products CSV. Every column is nullable.
# `schema` is reassigned by other sections, so run this cell right before
# the products read.
schema = types.StructType([
    types.StructField('Image Src', types.StringType(), True),
    types.StructField('Product Category', types.StringType(), True),
    types.StructField('Product Description', types.StringType(), True),
    types.StructField('Product SKU', types.StringType(), True),
    types.StructField('Title', types.StringType(), True),
    # DoubleType rather than FloatType: a 32-bit float cannot hold typical
    # prices exactly enough, and the error becomes visible once the value is
    # stored in BigQuery's 64-bit FLOAT column (e.g. 19.99 -> 19.989999771...).
    types.StructField('Unit Price', types.DoubleType(), True),
    types.StructField('Vendor', types.StringType(), True)
    ])

In [None]:
# Read the products CSV with the explicit schema.
# FAILFAST makes the read raise on a malformed record; multiLine plus the
# quote/escape settings allow quoted fields that contain newlines or quotes.
df_products = (
    spark.read
    .schema(schema)
    .options(
        header="true",
        sep=",",
        quote='"',
        escape='"',
        multiLine="true",
        ignoreLeadingWhiteSpace="true",
        ignoreTrailingWhiteSpace="true",
        encoding="utf-8",
        mode="FAILFAST",
    )
    .csv("gs://ecomm_bucket001/output_files/from_api/products.csv")
)

In [None]:
# Alignment check: show the distinct `Unit Price` values as parsed by Spark.
df_products.select('Unit Price').distinct().show()

In [None]:
# Append the products to the raw_products staging table in BigQuery,
# staging the data through the ecomm_bucket001 GCS bucket.
# NOTE(review): append mode means re-running this cell adds the same rows again.
(
    df_products.write
    .format("bigquery")
    .options(
        table="de-project-nyc-taxi.ecomm_staging.raw_products",
        temporaryGcsBucket="ecomm_bucket001",
    )
    .mode("append")
    .save()
)

#### Load the Customers Dimension CSV into a Spark DataFrame and Write to BigQuery

In [None]:
# Read the customers CSV with pandas and show the dtypes it infers.
# NOTE(review): presumably a reference for the explicit Spark schema defined below — confirm.
df_sample = pd.read_csv("gs://ecomm_bucket001/output_files/from_api/customers.csv")
df_sample.dtypes

In [None]:
# Alignment check: list the distinct `Address Zip` values in the pandas sample.
# NOTE(review): presumably confirms the CSV columns were not shifted during parsing — confirm.
df_sample['Address Zip'].unique()

In [None]:
# Explicit schema for the customers CSV. Every column is nullable.
# NOTE(review): 'Address Zip' is an integer here, while the database-orders
# schema types `address_zip` as a string; an integer zip cannot keep leading
# zeros. Confirm which is intended before changing the BigQuery column type.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('Address City', StringType()),
        ('Address Company', StringType()),
        ('Address Province', StringType()),
        ('Address Zip', IntegerType()),
        ('Email', StringType()),
        ('First Name', StringType()),
        ('Full Name', StringType()),
        ('Last Name', StringType()),
        ('Phone', StringType()),
    ]
])

In [None]:
# Read the customers CSV with the explicit schema.
# FAILFAST makes the read raise on a malformed record; multiLine plus the
# quote/escape settings allow quoted fields that contain newlines or quotes.
df_customers = (
    spark.read
    .schema(schema)
    .options(
        header="true",
        sep=",",
        quote='"',
        escape='"',
        multiLine="true",
        ignoreLeadingWhiteSpace="true",
        ignoreTrailingWhiteSpace="true",
        encoding="utf-8",
        mode="FAILFAST",
    )
    .csv("gs://ecomm_bucket001/output_files/from_api/customers.csv")
)

In [None]:
# Alignment check: show the distinct `Address Zip` values as parsed by Spark.
df_customers.select('Address Zip').distinct().show()

In [None]:
# Append the customers to the raw_customers staging table in BigQuery,
# staging the data through the ecomm_bucket001 GCS bucket.
# NOTE(review): append mode means re-running this cell adds the same rows again.
(
    df_customers.write
    .format("bigquery")
    .options(
        table="de-project-nyc-taxi.ecomm_staging.raw_customers",
        temporaryGcsBucket="ecomm_bucket001",
    )
    .mode("append")
    .save()
)