In [1]:
# import library
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# import spark
spark = SparkSession.builder \
    .appName('pyspark-run-with-gcp-bucket') \
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

# Set GCS credentials
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "/home/jovyan/work/my-project-04-421705-6cc3b65adceb.json")

# Define GCS bucket and file path
bucket_name = "example_123654"
file_name = "store.csv"
file_path = f"gs://{bucket_name}/{file_name}"

In [2]:
# Read CSV file from GCS into DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [3]:
# Define new column after ETL 
split_column = "Ship Mode"
split_by = " "
new_columns = ["class_name", "class"]

# ETL process: Split the Ship Mode column into ship_name and class_name
split_columns = [
    split(df[split_column], split_by, len(new_columns)).getItem(i).alias(new_columns[i])
    for i in range(len(new_columns))
]

df = df.select("*",*split_columns)

In [4]:
# rename columns
df = df.withColumnRenamed("Row ID","index") \
    .withColumnRenamed("Order ID","order_id") \
    .withColumnRenamed("Customer ID","customer_id") \
    .withColumnRenamed("Customer Name","customer_name") \
    .withColumnRenamed("Segment","segment") \
    .withColumnRenamed("Country","country") \
    .withColumnRenamed("City","city") \
    .withColumnRenamed("State","state") \
    .withColumnRenamed("Postal Code","postal_code") \
    .withColumnRenamed("Region","region") \
    .withColumnRenamed("Product ID","product_id") \
    .withColumnRenamed("Product Name","product_name") \
    .withColumnRenamed("Category","category") \
    .withColumnRenamed("Sub-Category","sub_category") \
    .withColumnRenamed("Sales","sales") \
    .withColumnRenamed("Quantity","quantity") \
    .withColumnRenamed("Discount","discount") \
    .withColumnRenamed("Profit","profit") 

# Coalesce the DataFrame to a single partition
df = df.coalesce(1)

# show result
df.show()

+-----+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+----------+-----+
|index|      order_id|Order Date| Ship Date|     Ship Mode|customer_id|     customer_name|    segment|      country|           city|         state|postal_code| region|     product_id|       category|sub_category|        product_name|   sales|quantity|discount|  profit|class_name|class|
+-----+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+----------+-----+
|    1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|       Claire Gute|   Consumer|United States|      Henderson|      K

In [5]:
# Transform date format from dd-mm-yyyy to be yyyy-mm-dd following ISO format
# We will do 2 columns are order_date and ship_date

# order_date column
split_column2 = "Order Date"
split_by2 = "/"

split_col = split(df[split_column2], split_by2)
df = df.withColumn('day', split_col.getItem(0))
df = df.withColumn('mon', split_col.getItem(1))
df = df.withColumn('yr', split_col.getItem(2))

# Create a new column with the "yyyy-MM-dd" format
df = df.withColumn("order_date", concat(col("yr"), lit("-"), col("mon"), lit("-"), col("day")))

# order_date column
split_column2 = "Ship Date"
split_by2 = "/"

split_col = split(df[split_column2], split_by2)
df = df.withColumn('day', split_col.getItem(0))
df = df.withColumn('mon', split_col.getItem(1))
df = df.withColumn('yr', split_col.getItem(2))

# Create a new column with the "yyyy-MM-dd" format
df = df.withColumn("ship_date", concat(col("yr"), lit("-"), col("mon"), lit("-"), col("day")))

df.show()

+-----+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+----------+-----+---+---+----+----------+----------+
|index|      order_id|Order Date| Ship Date|     Ship Mode|customer_id|     customer_name|    segment|      country|           city|         state|postal_code| region|     product_id|       category|sub_category|        product_name|   sales|quantity|discount|  profit|class_name|class|day|mon|  yr|order_date| ship_date|
+-----+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+----------+-----+---+---+----+----------+----------+
|    1|CA-2016-152156| 11/8/2016|1

In [6]:
# Select data from df
order_trans = df.select("*")

# Coalesce the DataFrame to a single partition
order_trans = order_trans.coalesce(1)

# Filter columns
order_trans = order_trans.select("index", "order_id", "order_date", "ship_date", "class_name", "customer_id", "product_id", \
                "sales", "quantity", "discount", "profit")

# Coalesce the DataFrame to a single partition
order_trans = order_trans.coalesce(1)

# Show the result
order_trans.show()

+-----+--------------+----------+----------+----------+-----------+---------------+--------+--------+--------+--------+
|index|      order_id|order_date| ship_date|class_name|customer_id|     product_id|   sales|quantity|discount|  profit|
+-----+--------------+----------+----------+----------+-----------+---------------+--------+--------+--------+--------+
|    1|CA-2016-152156| 2016-8-11|2016-11-11|    Second|   CG-12520|FUR-BO-10001798|  261.96|       2|       0| 41.9136|
|    2|CA-2016-152156| 2016-8-11|2016-11-11|    Second|   CG-12520|FUR-CH-10000454|  731.94|       3|       0| 219.582|
|    3|CA-2016-138688| 2016-12-6| 2016-16-6|    Second|   DV-13045|OFF-LA-10000240|   14.62|       2|       0|  6.8714|
|    4|US-2015-108966|2015-11-10|2015-18-10|  Standard|   SO-20335|FUR-TA-10000577|957.5775|       5|    0.45|-383.031|
|    5|US-2015-108966|2015-11-10|2015-18-10|  Standard|   SO-20335|OFF-ST-10000760|  22.368|       2|     0.2|  2.5164|
|    6|CA-2014-115812|  2014-9-6| 2014-1

In [7]:
# Select data from df
cust_data = df.select("*")

# Coalesce the DataFrame to a single partition
cust_data = cust_data.coalesce(1)

# Filter columns
cust_data = cust_data.select("customer_id", "customer_name", "segment", \
                "country", "city", "state", "postal_code", "region")

# Select distinct row values
cust_data = cust_data.distinct()

# Coalesce the DataFrame to a single partition
cust_data = cust_data.coalesce(1)

# Show the result
cust_data.show()

+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+
|customer_id|     customer_name|    segment|      country|           city|         state|postal_code| region|
+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+
|   CG-12520|       Claire Gute|   Consumer|United States|      Henderson|      Kentucky|      42420|  South|
|   DV-13045|   Darrin Van Huff|  Corporate|United States|    Los Angeles|    California|      90036|   West|
|   SO-20335|    Sean O'Donnell|   Consumer|United States|Fort Lauderdale|       Florida|      33311|  South|
|   BH-11710|   Brosina Hoffman|   Consumer|United States|    Los Angeles|    California|      90032|   West|
|   AA-10480|      Andrew Allen|   Consumer|United States|        Concord|North Carolina|      28027|  South|
|   IM-15070|      Irene Maddox|   Consumer|United States|        Seattle|    Washington|      98103|   West|
|   HP-148

In [8]:
df.createOrReplaceTempView("df")
cust_data.createOrReplaceTempView("c")

c1 = spark.sql("""
    select
        count(customer_id) as count_all
        
    from
        df
""")

c2 = spark.sql("""
    select distinct
        count(customer_id)  as count_distinct
        
    from
       c
""")


c1.show()
c2.show()

+---------+
|count_all|
+---------+
|     9994|
+---------+

+--------------+
|count_distinct|
+--------------+
|          4910|
+--------------+



In [9]:
# Select data from df
product_data = df.select("*")

# Coalesce the DataFrame to a single partition
product_data = product_data.coalesce(1)

# Filter columns
product_data = product_data.select("product_id", "product_name", "category", "sub_category")

# Select distinct row values
product_data = product_data.distinct()

# Coalesce the DataFrame to a single partition
product_data = product_data.coalesce(1)

# Show the result
product_data.show()

+---------------+--------------------+---------------+------------+
|     product_id|        product_name|       category|sub_category|
+---------------+--------------------+---------------+------------+
|FUR-BO-10001798|Bush Somerset Col...|      Furniture|   Bookcases|
|FUR-CH-10000454|Hon Deluxe Fabric...|      Furniture|      Chairs|
|OFF-LA-10000240|Self-Adhesive Add...|Office Supplies|      Labels|
|FUR-TA-10000577|Bretford CR4500 S...|      Furniture|      Tables|
|OFF-ST-10000760|Eldon Fold 'N Rol...|Office Supplies|     Storage|
|FUR-FU-10001487|Eldon Expressions...|      Furniture| Furnishings|
|OFF-AR-10002833|          Newell 322|Office Supplies|         Art|
|TEC-PH-10002275|Mitel 5320 IP Pho...|     Technology|      Phones|
|OFF-BI-10003910|DXL Angle-View Bi...|Office Supplies|     Binders|
|OFF-AP-10002892|Belkin F5C206VTEL...|Office Supplies|  Appliances|
|FUR-TA-10001539|Chromcraft Rectan...|      Furniture|      Tables|
|TEC-PH-10002033|Konftel 250 Confe...|     Techn

In [10]:
df.createOrReplaceTempView("df")
product_data.createOrReplaceTempView("p")

p1 = spark.sql("""
    select distinct
        count(product_id) as count_all
        
    from
        df
""")

p2 = spark.sql("""
    select distinct
        count(product_id)  as count_distinct
        
    from
       p
""")


p1.show()
p2.show()

+---------+
|count_all|
+---------+
|     9994|
+---------+

+--------------+
|count_distinct|
+--------------+
|          1894|
+--------------+



In [13]:
# Write the DataFrame to a single CSV file in GCS
order_trans.write.csv(f"gs://{bucket_name}/cleaned_zone/transaction_data", header=True)
cust_data.write.csv(f"gs://{bucket_name}/cleaned_zone/customer_data", header=True)
product_data.write.csv(f"gs://{bucket_name}/cleaned_zone/product_data", header=True)

In [12]:
# Stop SparkSession
spark.stop()