# Spark - Batch processing

## Create spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

credentials_location = '/workspaces/de-retail-sales/creds/my-creds.json'
project_id = 'woven-edge-412500'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('gcs_bigquery') \
    .set("spark.jars", "/home/codespace/bin/gcs-connector-hadoop3-latest.jar, /home/codespace/bin/spark-3.1-bigquery-0.36.1.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location) \
    .set("spark.hadoop.google.cloud.auth.project.id", project_id)

sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .config("spark.sql.extensions", "com.google.cloud.spark.bigquery.BigQuerySparkRegistrator") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location) \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.project.id", project_id) \
    .getOrCreate()


24/03/23 01:28:52 WARN Utils: Your hostname, codespaces-eeb36d resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/23 01:28:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/23 01:28:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark

## Load and process data from gcs bucket

In [3]:
df = spark.read.parquet('gs://woven-edge-412500-de-retail-sales-bucket/retail_data/*')

                                                                                

In [4]:
df.show()

                                                                                

+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|year|month|            supplier|item_code|    item_description|item_type|retail_sales|retail_transfers|warehouse_sales|
+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|2020|    1|REPUBLIC NATIONAL...|   100009| BOOTLEG RED - 750ML|     WINE|         0.0|             0.0|            2.0|
|2020|    1|           PWSWN INC|   100024|MOMENT DE PLAISIR...|     WINE|         0.0|             1.0|            4.0|
|2020|    1|RELIABLE CHURCHIL...|     1001|S SMITH ORGANIC P...|     BEER|         0.0|             0.0|            1.0|
|2020|    1|LANTERNA DISTRIBU...|   100145|SCHLINK HAUS KABI...|     WINE|         0.0|             0.0|            1.0|
|2020|    1|DIONYSOS IMPORTS INC|   100293|SANTORINI GAVALA ...|     WINE|        0.82|             0.0|            0.0|
|2020|    1|KYSELA PERE ET FI...

In [5]:
df.count()

307645

In [6]:
import pandas as pd 
import argparse
import logging

from pyspark.sql.functions import monotonically_increasing_id

logging.basicConfig(level=logging.INFO, 
                              format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

In [7]:
# Supplier Dimension Table Creation
supplier_df = df.select("supplier").dropDuplicates().withColumnRenamed("supplier", "SUPPLIER").withColumn("supplier_id", monotonically_increasing_id() + 1)

# Item Dimension Table Creation
item_df = df.selectExpr("item_code", "item_type", "item_description").dropDuplicates().withColumnRenamed("item_code", "ITEM_CODE")

# Date Dimension Table Creation
date_df = df.select("year", "month").dropDuplicates().withColumnRenamed("year", "YEAR").withColumnRenamed("month", "MONTH").withColumn("DATE_ID", monotonically_increasing_id() + 1)

# Fact Table Creation
fact_table = df.join(supplier_df, "SUPPLIER") \
    .join(item_df, df["item_code"] == item_df["ITEM_CODE"]) \
    .join(date_df, (df["year"] == date_df["YEAR"]) & (df["month"] == date_df["MONTH"])) \
    .select(df["item_code"], supplier_df["supplier_id"], date_df["DATE_ID"], df["retail_sales"], df["retail_transfers"], df["warehouse_sales"]) \
    .dropDuplicates()

# Lowercase column names for all DataFrames
supplier_df = supplier_df.toDF(*[col.lower() for col in supplier_df.columns])
item_df = item_df.toDF(*[col.lower() for col in item_df.columns])
date_df = date_df.toDF(*[col.lower() for col in date_df.columns])
fact_table = fact_table.toDF(*[col.lower() for col in fact_table.columns])

# Show the resulting fact table
fact_table.show()
fact_table.count()


24/03/23 01:29:06 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+---------+-----------+-------+------------+----------------+---------------+
|item_code|supplier_id|date_id|retail_sales|retail_transfers|warehouse_sales|
+---------+-----------+-------+------------+----------------+---------------+
|    12483|        286|     11|         0.0|             0.0|            8.0|
|    12527|        268|     11|       12.67|            14.0|            0.0|
|    12976|        386|     11|        1.75|             3.0|           24.0|
|    13722|         68|     11|        0.24|             0.0|            0.0|
|   166280|        157|     11|        1.05|             1.0|            2.0|
|    17000|        217|     11|       29.29|            28.0|            3.0|
|    18104|        275|     11|        1.01|             1.0|            0.0|
|   185515|        376|     11|        1.32|             2.0|           26.0|
|   232662|        265|     11|        0.24|             0.0|           13.0|
|    23433|        353|     11|        31.5|            57.0|   

                                                                                

307645

In [8]:
supplier_df.show(5)
supplier_df.count()

+--------------------+-----------+
|            supplier|supplier_id|
+--------------------+-----------+
|  BARON FRANCOIS LTD|          1|
|STE MICHELLE WINE...|          2|
|AZIZ SHAFI TANNIC...|          3|
|           PWSWN INC|          4|
|  E & J GALLO WINERY|          5|
+--------------------+-----------+
only showing top 5 rows



397

In [9]:
item_df.show(5)
item_df.count()

                                                                                

+---------+---------+--------------------+
|item_code|item_type|    item_description|
+---------+---------+--------------------+
|    11274|     WINE|BLACKSTONE CALIF ...|
|    11545|     BEER|BRECKENRIDGE MANG...|
|    11860|   LIQUOR|ELIJAH CRAIG BOUR...|
|    12709|     BEER|TWO ROADS PERSIAN...|
|   134066|     WINE|TOMMASI LE ROSSE ...|
+---------+---------+--------------------+
only showing top 5 rows



                                                                                

35165

In [10]:
date_df.show(5)
date_df.count()

+----+-----+-------+
|year|month|date_id|
+----+-----+-------+
|2019|    5|      1|
|2019|    1|      2|
|2017|   10|      3|
|2017|    6|      4|
|2020|    3|      5|
+----+-----+-------+
only showing top 5 rows



24

In [11]:
# Convert to function
def data_modeling(df):
    ### Building the star schema
    df.createOrReplaceTempView("df_view")

    # Supplier Dimension Table Creation
    supplier_df = df.select("supplier").dropDuplicates().withColumnRenamed("supplier", "SUPPLIER").withColumn("supplier_id", monotonically_increasing_id() + 1)
    
    # Item Dimension Table Creation
    item_df = df.selectExpr("item_code", "item_type", "item_description").dropDuplicates().withColumnRenamed("item_code", "ITEM_CODE")
    
    # Date Dimension Table Creation
    date_df = df.select("year", "month").dropDuplicates().withColumnRenamed("year", "YEAR").withColumnRenamed("month", "MONTH").withColumn("DATE_ID", monotonically_increasing_id() + 1)
    
    # Fact Table Creation
    fact_table = df.join(supplier_df, "SUPPLIER") \
        .join(item_df, df["item_code"] == item_df["ITEM_CODE"]) \
        .join(date_df, (df["year"] == date_df["YEAR"]) & (df["month"] == date_df["MONTH"])) \
        .select(df["item_code"], supplier_df["supplier_id"], date_df["DATE_ID"], df["retail_sales"], df["retail_transfers"], df["warehouse_sales"]) \
        .dropDuplicates()

    # Lowercase column names for all DataFrames
    supplier_df = supplier_df.toDF(*[col.lower() for col in supplier_df.columns])
    item_df = item_df.toDF(*[col.lower() for col in item_df.columns])
    date_df = date_df.toDF(*[col.lower() for col in date_df.columns])
    fact_table = fact_table.toDF(*[col.lower() for col in fact_table.columns])

    return {
        "supplier": supplier_df,
        "item": item_df,
        "date": date_df,
        "fact_table": fact_table
    }

In [12]:
# Call the data_modeling function and store the result in dict_tables
star_schema = data_modeling(df)

# Print the output of the data_modeling function
for key, value in star_schema.items():
    print(f"Table: {key}")
    value.show()

Table: supplier
+--------------------+-----------+
|            supplier|supplier_id|
+--------------------+-----------+
|  BARON FRANCOIS LTD|          1|
|STE MICHELLE WINE...|          2|
|AZIZ SHAFI TANNIC...|          3|
|           PWSWN INC|          4|
|  E & J GALLO WINERY|          5|
|HARVEST IMPORTING...|          6|
|             A&E INC|          7|
|DMV DISTRIBUTING LLC|          8|
|VINTAGE VIRGINIA ...|          9|
|LYON DISTILLING C...|         10|
|             Default|         11|
|  MACK & SCHUHLE INC|         12|
|   MAISON JOMERE LTD|         13|
|PAMPA BEVERAGES L...|         14|
|THREE HENS LLC T/...|         15|
|HEAVEN HILL DISTI...|         16|
| PROXIMO SPIRITS INC|         17|
|DUCKHORN WINE COM...|         18|
|           OENOS LLC|         19|
|   CHATEAU DIANA LLC|         20|
+--------------------+-----------+
only showing top 20 rows

Table: item


                                                                                

+---------+---------+--------------------+
|item_code|item_type|    item_description|
+---------+---------+--------------------+
|    11274|     WINE|BLACKSTONE CALIF ...|
|    11545|     BEER|BRECKENRIDGE MANG...|
|    11860|   LIQUOR|ELIJAH CRAIG BOUR...|
|    12709|     BEER|TWO ROADS PERSIAN...|
|   134066|     WINE|TOMMASI LE ROSSE ...|
|    13846|     WINE|OXFORD LANDING S/...|
|    16519|   LIQUOR|SAUZA TRES GEN AN...|
|   242068|   LIQUOR|PAUL JOHN WHISKEY...|
|   242076|   LIQUOR|REDNECK RIVIERA W...|
|    26205|     BEER|NATURAL ICE CANS ...|
|    26833|     BEER|BALTIMORE BEER WO...|
|    27197|   LIQUOR|CHIVAS REGAL 12YR...|
|    28579|     BEER|NOT YOUR FATHER'S...|
|   302571|     WINE|SANTERO MANGO MOS...|
|   304749|     WINE|BARTENURA OVADIA ...|
|   311322|     WINE|BLACK BOX CAB TET...|
|   311634|     WINE|MASTROBERARDINO L...|
|   312924|     WINE|HEINZ EIFEL EISWE...|
|   318502|     WINE|FISH HOUSE S/BLC ...|
|   332218|     WINE|      EVOLET - 750ML|
+---------+



+---------+-----------+-------+------------+----------------+---------------+
|item_code|supplier_id|date_id|retail_sales|retail_transfers|warehouse_sales|
+---------+-----------+-------+------------+----------------+---------------+
|    12483|        286|     11|         0.0|             0.0|            8.0|
|    12527|        268|     11|       12.67|            14.0|            0.0|
|    12976|        386|     11|        1.75|             3.0|           24.0|
|    13722|         68|     11|        0.24|             0.0|            0.0|
|   166280|        157|     11|        1.05|             1.0|            2.0|
|    17000|        217|     11|       29.29|            28.0|            3.0|
|    18104|        275|     11|        1.01|             1.0|            0.0|
|   185515|        376|     11|        1.32|             2.0|           26.0|
|   232662|        265|     11|        0.24|             0.0|           13.0|
|    23433|        353|     11|        31.5|            57.0|   

                                                                                

## Write star schema tables to GCS bucket

In [13]:
# Define the GCS path where you want to save the Parquet files
gcs_output_path = "gs://woven-edge-412500-de-retail-sales-bucket/star-schema/"

# Save each table to GCS as Parquet files
for table_name, dataframe in star_schema.items():
    # Define the full GCS path for the table
    table_gcs_path = f"{gcs_output_path}{table_name}/"
    
    # Write the DataFrame to GCS as Parquet files
    dataframe.write.parquet(table_gcs_path, mode="overwrite")

    print(f"Table '{table_name}' saved to GCS at: {table_gcs_path}")


                                                                                

Table 'supplier' saved to GCS at: gs://woven-edge-412500-de-retail-sales-bucket/star-schema/supplier/


                                                                                

Table 'item' saved to GCS at: gs://woven-edge-412500-de-retail-sales-bucket/star-schema/item/


                                                                                

Table 'date' saved to GCS at: gs://woven-edge-412500-de-retail-sales-bucket/star-schema/date/


                                                                                

Table 'fact_table' saved to GCS at: gs://woven-edge-412500-de-retail-sales-bucket/star-schema/fact_table/


## Write tables to bigquery DWH

In [14]:
# # Call the data_modeling function and store the result in dict_tables
# star_schema = data_modeling(df)

# # Define BigQuery dataset and table names
# dataset_id = 'de_retail_sales_data'  
# table_dataframes = {
#     "supplier": star_schema["supplier"],
#     "item": star_schema["item"],
#     "date": star_schema["date"],
#     "fact_table": star_schema["fact_table"]
# }

# # Load data into BigQuery tables using Spark
# for table_name, dataframe in table_dataframes.items():
#     full_table_name = f'{project_id}.{dataset_id}.{table_name}'
#     dataframe.write \
#         .format('bigquery') \
#         .option('table', full_table_name) \
#         .mode('overwrite') \
#         .save()

#     print(f"Data loaded into BigQuery table: {full_table_name}")
