In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
credentials_location = '/home/olalekan/data_Engineering_Journey/02_workflow_orchestration/keys/credk.json'

spark = SparkSession.builder \
    .appName("BigQueryAccess") \
    .config("spark.jars", "/home/olalekan/data_Engineering_Journey/05_batch_processing/code/lib/gcs-connector-hadoop3-latest.jar,/home/olalekan/DE-project-1/spark_kestra_docker/jars/spark-bigquery-with-dependencies_2.12-0.30.0.jar") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location) \
    .getOrCreate()


25/04/04 22:36:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


spark.conf.set('temporaryGcsBucket', 'olalekan-de2753')

In [3]:
# Read from BigQuery
df_transact = spark.read \
    .format("bigquery") \
    .option("table", "my-de-journey.Fashion_retail_dataset.transactions_partitioned") \
    .load()

In [4]:
df_transact.show()

                                                                                

+-------------------+----------+---------+----+----------+--------+-------------------+--------+------+-------+----------+--------+----------------+--------------+--------+
|          InvoiceID|CustomerID|ProductID|Size|Unit Price|Quantity|               Date|Discount|  COGS|StoreID|EmployeeID|Currency|Transaction Type|Payment Method|COGS_usd|
+-------------------+----------+---------+----+----------+--------+-------------------+--------+------+-------+----------+--------+----------------+--------------+--------+
|RET-US-001-03881400|     23235|    17879|   M|     145.0|       1|2023-01-01 00:00:00|     0.0|-145.0|      1|         5|     USD|          Return|   Credit Card|  -145.0|
|RET-US-001-03881122|      1004|    16086|   M|      71.0|       2|2023-01-01 00:00:00|     0.0|-142.0|      1|        11|     USD|          Return|   Credit Card|  -142.0|
|RET-US-001-03881450|     35099|    15998|   S|     138.5|       1|2023-01-01 00:00:00|     0.0|-138.5|      1|         6|     USD|    

In [5]:
df_transact.printSchema()

root
 |-- InvoiceID: string (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- ProductID: long (nullable = true)
 |-- Size: string (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Discount: double (nullable = true)
 |-- COGS: double (nullable = true)
 |-- StoreID: long (nullable = true)
 |-- EmployeeID: long (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- COGS_usd: double (nullable = true)



In [6]:
# Perform an aggregation
aggregated_df = df_transact.groupBy("ProductID").agg(
   F.round(F.sum("COGS_usd"), 2).alias("total_COGS"),  # Round the sum of COGS to 2 decimal places
    F.sum("Quantity").alias("total_sales")
)

In [7]:
aggregated_df.head(10)

                                                                                

[Row(ProductID=964, total_COGS=3252.78, total_sales=167),
 Row(ProductID=1677, total_COGS=4634.31, total_sales=183),
 Row(ProductID=2529, total_COGS=3392.27, total_sales=155),
 Row(ProductID=2509, total_COGS=6055.75, total_sales=182),
 Row(ProductID=2250, total_COGS=4066.45, total_sales=123),
 Row(ProductID=2214, total_COGS=4258.56, total_sales=125),
 Row(ProductID=1806, total_COGS=10073.2, total_sales=208),
 Row(ProductID=26, total_COGS=647.09, total_sales=36),
 Row(ProductID=2453, total_COGS=3281.76, total_sales=126),
 Row(ProductID=29, total_COGS=4509.87, total_sales=129)]

In [8]:
df_product = spark.read \
    .format("bigquery") \
    .option("table", "my-de-journey.Fashion_retail_dataset.products") \
    .load()

In [9]:
# Perform an inner join with the product table on column ProductID
product_rev_df = aggregated_df.join(df_product, on="ProductID", how="inner")

In [10]:
product_rev_df.head(5)

                                                                                

[Row(ProductID=964, total_COGS=3252.78, total_sales=167, Category='Feminine', Description EN='Boho Lace Beige Printed', Sizes='S|M|L', Production Cost=6.61),
 Row(ProductID=1677, total_COGS=4634.31, total_sales=183, Category='Masculine', Description EN='Luxury Burgundy Suede With Glitter', Sizes='M|L|XL|XXL', Production Cost=17.87),
 Row(ProductID=2529, total_COGS=3392.27, total_sales=155, Category='Feminine', Description EN='High Waist Skirt With Buttons', Sizes='S|M|L|XL', Production Cost=15.09),
 Row(ProductID=2509, total_COGS=6055.75, total_sales=182, Category='Masculine', Description EN='Casual Jacquard Black With Zipper', Sizes='M|L|XL|XXL', Production Cost=24.45),
 Row(ProductID=2250, total_COGS=4066.45, total_sales=123, Category='Masculine', Description EN='Classic Beige Tricot With Fringes', Sizes='M|L|XL', Production Cost=14.76)]

In [11]:
product_rev_df = product_rev_df.withColumnRenamed("Description EN", "Description")
product_rev_df = product_rev_df.withColumnRenamed("Production Cost", "Production_cost")

In [12]:
product_rev_df.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "olalekan-de2753") \
    .option("writeMethod", "direct") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .mode("overwrite") \
    .save('my-de-journey.Fashion_retail_dataset.Product_Revenue')

                                                                                

In [13]:
df_customer = spark.read \
    .format("bigquery") \
    .option("table", "my-de-journey.Fashion_retail_dataset.customers") \
    .load()

In [14]:
# Perform an aggregation with customer
agg_cust_df = df_transact.groupBy("CustomerID").agg(
    F.round(F.sum("COGS_usd"), 2).alias("total_COGS"),  # Round the sum of COGS to 2 decimal places
    F.sum("Quantity").alias("total_sales")
)

In [15]:
agg_cust_df.head()

                                                                                

Row(CustomerID=73287, total_COGS=560.6, total_sales=15)

In [16]:
# Perform an inner join with the customer table on column CustomerID
customer_rev_df = agg_cust_df.join(df_customer, on="CustomerID", how="inner")

In [17]:
customer_rev_df.head(5)

                                                                                

[Row(CustomerID=6, total_COGS=474.5, total_sales=11, Name='Steven Chavez', City='New York', Country='United States', Gender='M', DOB=datetime.date(1988, 1, 6)),
 Row(CustomerID=6, total_COGS=474.5, total_sales=11, Name='Steven Chavez', City='New York', Country='United States', Gender='M', DOB=datetime.date(1988, 1, 6)),
 Row(CustomerID=7, total_COGS=23.65, total_sales=2, Name='Steven Snyder', City='New York', Country='United States', Gender='M', DOB=datetime.date(2000, 10, 27)),
 Row(CustomerID=7, total_COGS=23.65, total_sales=2, Name='Steven Snyder', City='New York', Country='United States', Gender='M', DOB=datetime.date(2000, 10, 27)),
 Row(CustomerID=9, total_COGS=181.0, total_sales=4, Name='Edwin Bryant', City='New York', Country='United States', Gender='M', DOB=datetime.date(1999, 7, 19))]

In [18]:
customer_rev_df.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "olalekan-de2753") \
    .option("writeMethod", "direct") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .mode("overwrite") \
    .save('my-de-journey.Fashion_retail_dataset.Customer_Revenue')

                                                                                

In [19]:
df_employee = spark.read \
    .format("bigquery") \
    .option("table", "my-de-journey.Fashion_retail_dataset.employees") \
    .load()

In [20]:
# Perform an aggregation with employee
agg_emp_df = df_transact.groupBy("EmployeeID").agg(
    F.round(F.sum("COGS_usd"), 2).alias("total_COGS"),  # Round the sum of COGS to 2 decimal places
    F.sum("Quantity").alias("total_sales")
)

In [21]:
# Perform an inner join with the customer table on column CustomerID
employee_rev_df = agg_emp_df.join(df_employee, on="EmployeeID", how="inner")

In [22]:
employee_rev_df.head(5)

                                                                                

[Row(EmployeeID=29, total_COGS=1315221.32, total_sales=32912, StoreID=3, Name='Cynthia Serrano', Position='Sales Associate'),
 Row(EmployeeID=191, total_COGS=1092710.61, total_sales=34839, StoreID=16, Name='Christina Mccarthy', Position='Sales Associate'),
 Row(EmployeeID=222, total_COGS=395539.92, total_sales=12292, StoreID=19, Name='Michael Gardner', Position='Sales Associate'),
 Row(EmployeeID=243, total_COGS=1362143.63, total_sales=34793, StoreID=21, Name='David Sauvage', Position='Sales Associate'),
 Row(EmployeeID=278, total_COGS=740725.59, total_sales=19334, StoreID=24, Name='Arthur Hamon', Position='Sales Associate')]

In [23]:
employee_rev_df.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "olalekan-de2753") \
    .option("writeMethod", "direct") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .mode("overwrite") \
    .save('my-de-journey.Fashion_retail_dataset.Employee_Revenue')

                                                                                

In [24]:
# Perform an aggregation on store
agg_prod_df = df_transact.groupBy("StoreID").agg(
    F.round(F.sum("COGS_usd"), 2).alias("total_COGS"),  # Round the sum of COGS to 2 decimal places
    F.sum("Quantity").alias("total_sales")
)

In [25]:
# Load in the store table from BigQuery
df_store = spark.read \
    .format("bigquery") \
    .option("table", "my-de-journey.Fashion_retail_dataset.stores") \
    .load()

In [26]:
# Perform an inner join with the store table on column ProductID
store_rev_df = agg_prod_df.join(df_store, on="StoreID", how="inner")


In [27]:
# Removing whitespaces from column name
store_rev_df = store_rev_df.withColumnRenamed("Number of Employees", "Employee_Count")


In [28]:
# Upload to BigQuery as Store_Revenue
store_rev_df.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "olalekan-de2753") \
    .option("writeMethod", "direct") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .mode("overwrite") \
    .save('my-de-journey.Fashion_retail_dataset.Store_Revenue')

                                                                                