### Connect spark and deltalake

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("DeltaLake with Hive Integration") \
    .master("local[*]") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,io.delta:delta-spark_2.13:4.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", "file:///C:/Uni_Master/Project/tgp-mdik-2/hive/") \
    .config("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=metastore_db;create=true") \
    .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") \
    .config("spark.python.worker.timeout", "1200") \
    .config("spark.network.timeout", "1200s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.python.worker.reuse", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()

df_twcs = spark.read.format("delta").load("hdfs://localhost:9000/delta_twcs")
print(df_twcs)

DataFrame[message: string, timestamp_kafka: timestamp]


### Preprocessing tweets data

In [3]:
import pandas as pd 
import numpy as np 
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StopWordsRemover

import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

Map delta format data to dataframe

In [4]:
df_parsed = df_twcs.select(
    col("timestamp_kafka"),
    from_json(
        col("message"),
        "tweet_id string, author_id string, inbound string, created_at string, text string, response_tweet_id string, in_response_to_tweet_id string"
    ).alias("data"))


df_column = df_parsed.select(
    col("timestamp_kafka"),
    col("data.tweet_id").alias("tweet_id"),
    col("data.author_id").alias("author_id"),
    (col("data.inbound") == "True").alias("inbound"),
    col("data.created_at").alias("created_at"),
    col("data.text").alias("text"),
    col("data.response_tweet_id").alias("response_tweet_id"),
    col("data.in_response_to_tweet_id").alias("in_response_to_tweet_id")
).filter(col("tweet_id").isNotNull())


In [5]:
df_parsed.show(10, truncate=False)
df_column.show(10, truncate=False)
df_parsed.printSchema()

+-----------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp_kafka        |data                                                                                                                                                                                                                                                                                                                                   |
+-----------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Convert to lowercase

In [6]:
df_twcs_clean = df_column.withColumn("text_clean", lower(col("text")))
df_twcs_clean.show(10, truncate=False)

+-----------------------+--------+------------+-------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp_kafka        |tweet_id|author_id   |inbound|created_at                    |text                                                                                                                                                                                                                                                               |respo

Remove punctuation

In [7]:
df_twcs_clean = df_twcs_clean.withColumn(
    "text_clean", regexp_replace(col("text"), r"[^\w\s]", "")
)

df_twcs_clean.show(10, truncate=False)

+-----------------------+--------+------------+-------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp_kafka        |tweet_id|author_id   |inbound|created_at                    |text                                                                                                                                                                                                                                                               |response_tweet_id|

Remove stopwords

In [8]:
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

df_twcs_clean = df_twcs_clean.withColumn("words", split(col("text_clean"), " ")) \
                  .transform(stop_words_remover.transform) \
                  .withColumn("text_clean", concat_ws(" ", col("filtered_words"))) \
                  .drop("words", "filtered_words")

In [9]:
df_twcs_clean.show(10, truncate=False)

+-----------------------+--------+------------+-------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp_kafka        |tweet_id|author_id   |inbound|created_at                    |text                                                                                                                                                                                                                                                               |response_tweet_id|in_response_to_tweet_id|text_clean                                    

Tokenization

In [10]:
df_twcs_clean = df_twcs_clean.withColumn(
    "tokens", 
    expr("filter(split(text_clean, ' '), x -> NOT x RLIKE '^[0-9]+$')")
)

df_twcs_clean.select("text_clean", "tokens").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text_clean                                                                                                                                                                      |tokens                                                                                                                                                                                     |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------

In [11]:
df_twcs_clean.select("text_clean").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text_clean                                                                                                                                                                      |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|246955 Thanks contacting us understand important alarms working properly able readd alarm without issue                                                                         |
|AppleSupport Yes added back                                                                                                                                                     |
|246955 Lets keep eye see issue comes back let us know Also havent already done wed recommend making sure

## Sentiment Analysis

Create label

In [12]:
nltk.download('opinion_lexicon')
from nltk.corpus import opinion_lexicon

[nltk_data] Downloading package opinion_lexicon to
[nltk_data]     C:\Users\mrifq\AppData\Roaming\nltk_data...
[nltk_data]   Package opinion_lexicon is already up-to-date!


In [13]:
positive_keywords = list(opinion_lexicon.positive())
negative_list = list(opinion_lexicon.negative())
custom_negative = ["sucks", "bad", "hate", "worst", "terrible", "awful", "disappointing", "fuck", "shit"]
negative_keywords = negative_list + custom_negative

df_twcs_clean = df_twcs_clean.withColumn(
    "sentiment",
    when(
        size(array_intersect(col("tokens"), array([lit(x) for x in positive_keywords]))) > 0, "positive"
    ).when(
        size(array_intersect(col("tokens"), array([lit(x) for x in negative_keywords]))) > 0, "negative"
    ).otherwise("neutral")
)

In [14]:
df_twcs_clean.select("author_id","text_clean", "sentiment").show(1000, truncate=False)

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|author_id      |text_clean                                                                                                                                                                                                                  |sentiment|
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|AppleSupport   |246955 Thanks contacting us understand important alarms working properly able readd alarm without issue                                                                                                                     |positive |
|246

### Save to Data Warehouse

Create database

In [15]:
spark.sql("CREATE DATABASE IF NOT EXISTS db_tgp2")

DataFrame[]

Schema-on-Write

In [16]:
spark.sql("USE db_tgp2")
spark.sql("CREATE TABLE IF NOT EXISTS customer_tweets (id BIGINT GENERATED ALWAYS AS IDENTITY, author_id STRING, inbound STRING, created_at STRING, text STRING, text_clean STRING, response_tweet_id STRING, in_response_to_tweet_id STRING, sentiment STRING) USING hive")

DataFrame[]

Write dataframe to Hive Table

In [17]:
df_twcs_clean.write.mode("overwrite").format("hive").saveAsTable("db_tgp2.customer_tweets")

Test inserted data

In [18]:
df = spark.sql("SELECT * FROM customer_tweets LIMIT 1000")
df.show()

+--------------------+--------+------------+-------+--------------------+--------------------+--------------------+-----------------------+--------------------+--------------------+---------+
|     timestamp_kafka|tweet_id|   author_id|inbound|          created_at|                text|   response_tweet_id|in_response_to_tweet_id|          text_clean|              tokens|sentiment|
+--------------------+--------+------------+-------+--------------------+--------------------+--------------------+-----------------------+--------------------+--------------------+---------+
|2025-06-26 15:54:...| 2555292|AppleSupport|  false|Thu Nov 16 16:05:...|@246955 Thanks fo...|             2555293|                2555294|246955 Thanks con...|[Thanks, contacti...| positive|
|2025-06-26 15:54:...| 2555293|      246955|   true|Thu Nov 16 19:28:...|@AppleSupport Yes...|             2555295|                2555292|AppleSupport Yes ...|[AppleSupport, Ye...|  neutral|
|2025-06-26 15:54:...| 2555295|AppleSupp

### Save cleaned data to Data Lake

In [19]:
df_twcs_clean.write.mode("overwrite").format("parquet").save("hdfs://localhost:9000/hdfs_twcs_clean")

In [20]:
df2 = spark.read.parquet("hdfs://localhost:9000/hdfs_twcs_clean")
df2.show(10, truncate=False)


+-----------------------+--------+------------+-------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|timestamp_kafka        |tweet_id|author_id   |inbound|created_at                    |text                                                                                                                                                  

In [21]:
df_superstore = spark.read.format("delta").load("hdfs://localhost:9000/delta_superstore")

In [22]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

def parse_superstore(df):
    superstore_schema = StructType([
        StructField("Row ID", StringType(), True),
        StructField("Order ID", StringType(), True),
        StructField("Order Date", StringType(), True),
        StructField("Ship Date", StringType(), True),
        StructField("Ship Mode", StringType(), True),
        StructField("Customer ID", StringType(), True),
        StructField("Customer Name", StringType(), True),
        StructField("Segment", StringType(), True),
        StructField("Country", StringType(), True),
        StructField("City", StringType(), True),
        StructField("State", StringType(), True),
        StructField("Postal Code", StringType(), True),
        StructField("Region", StringType(), True),
        StructField("Product ID", StringType(), True),
        StructField("Category", StringType(), True),
        StructField("Sub-Category", StringType(), True),
        StructField("Product Name", StringType(), True),
        StructField("Sales", StringType(), True),
        StructField("Quantity", StringType(), True),
        StructField("Discount", StringType(), True),
        StructField("Profit", StringType(), True)
    ])
    
    parsed_superstore = df.select(
        col("timestamp_kafka"),
        from_json(col("message"), superstore_schema).alias("parsed_data")
    ).select(
        col("timestamp_kafka"),
        col("parsed_data.Row ID").cast(IntegerType()).alias("row_id"),
        col("parsed_data.Order ID").alias("order_id"),
        to_date(col("parsed_data.Order Date"), "M/d/yyyy").alias("order_date"),
        to_date(col("parsed_data.Ship Date"), "M/d/yyyy").alias("ship_date"),
        col("parsed_data.Ship Mode").alias("ship_mode"),
        col("parsed_data.Customer ID").alias("customer_id"),
        col("parsed_data.Customer Name").alias("customer_name"),
        col("parsed_data.Segment").alias("segment"),
        col("parsed_data.Country").alias("country"),
        col("parsed_data.City").alias("city"),
        col("parsed_data.State").alias("state"),
        col("parsed_data.Postal Code").cast(IntegerType()).alias("postal_code"),
        col("parsed_data.Region").alias("region"),
        col("parsed_data.Product ID").alias("product_id"),
        col("parsed_data.Category").alias("category"),
        col("parsed_data.Sub-Category").alias("sub_category"),
        col("parsed_data.Product Name").alias("product_name"),
        col("parsed_data.Sales").cast(DecimalType(10,2)).alias("sales"),
        col("parsed_data.Quantity").cast(IntegerType()).alias("quantity"),
        col("parsed_data.Discount").cast(DecimalType(5,4)).alias("discount"),
        col("parsed_data.Profit").cast(DecimalType(10,2)).alias("profit")
    )
    
    return parsed_superstore
df_parsed_superstore = parse_superstore(df_superstore)

In [23]:
all_null_rows = df_parsed_superstore.filter(
    col("row_id").isNull() & 
    col("order_id").isNull() & 
    col("customer_name").isNull()
)
all_null_rows.show(10, truncate=False)
print(f"Total null: {all_null_rows.count()}")

+-----------------------+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|timestamp_kafka        |row_id|order_id|order_date|ship_date|ship_mode|customer_id|customer_name|segment|country|city|state|postal_code|region|product_id|category|sub_category|product_name|sales|quantity|discount|profit|
+-----------------------+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|2025-06-26 15:42:39.472|NULL  |NULL    |NULL      |NULL     |NULL     |NULL       |NULL         |NULL   |NULL   |NULL|NULL |NULL       |NULL  |NULL      |NULL    |NULL        |NULL        |NULL |NULL    |NULL    |NULL  |
|2025-06-26 15:42:39.473|NULL  |NULL    |NULL      |NULL     |NULL     |NULL       |NULL         |NULL   |NULL  

In [24]:
df_parsed_superstore=df_parsed_superstore.dropna()
print(df_parsed_superstore.count())

9668


In [25]:
spark.sql("USE db_tgp2")
spark.sql("DROP TABLE IF EXISTS dim_date")
spark.sql("CREATE TABLE dim_date (date_id INT, date INT, month INT, year INT) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_ship")
spark.sql("CREATE TABLE dim_ship (ship_id INT, ship_mode STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_city")
spark.sql("CREATE TABLE dim_city (city_id INT, city_name STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_customer")
spark.sql("CREATE TABLE dim_customer (customer_id INT, customer_name STRING, zipcode STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_state")
spark.sql("CREATE TABLE dim_state (state_id INT, state_name STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_segment")
spark.sql("CREATE TABLE dim_segment (segment_id INT, segment STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_region")
spark.sql("CREATE TABLE dim_region (region_id INT, region STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_product")
spark.sql("CREATE TABLE dim_product (product_id INT, product_name STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_product_category")
spark.sql("CREATE TABLE dim_product_category (category_id INT, product_category STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_product_subcategory")
spark.sql("CREATE TABLE dim_product_subcategory (subcategory_id INT, product_subcategory STRING) USING hive")

spark.sql("DROP TABLE IF EXISTS order_fact")
spark.sql("CREATE TABLE order_fact (order_id INT, sales DECIMAL(10,2), quantity INT, profits DECIMAL(10,2), discount DECIMAL(5,2)) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_product_hierarchy")
spark.sql("CREATE TABLE dim_product_hierarchy (product_id INT, category_id INT, subcategory_id INT) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_order")
spark.sql("CREATE TABLE dim_order (order_id INT, product_id INT, customer_id INT, ship_mode_id INT, order_date_id INT, shipment_date_id INT) USING hive")

spark.sql("DROP TABLE IF EXISTS dim_customer_location")
spark.sql("CREATE TABLE dim_customer_location (customer_id INT, city_id INT, state_id INT, segment_id INT, region_id INT) USING hive")

DataFrame[]

In [26]:
from pyspark.sql.window import Window
spark.sql("USE db_tgp2")

DataFrame[]

In [27]:
# dim_date
dates = df_parsed_superstore.select("order_date") \
    .union(df_parsed_superstore.select(col("ship_date").alias("order_date"))) \
    .distinct()

windowSpec = Window.orderBy("order_date")

dim_date = dates.withColumn("date_id", row_number().over(windowSpec)).select(
    col("date_id"),
    col("order_date").alias("date"),
    dayofmonth("order_date").alias("day"),
    month("order_date").alias("month"),
    year("order_date").alias("year")
)

dim_date.write.mode("overwrite").saveAsTable("dim_date")

spark.sql("SELECT * FROM dim_date").show()


+-------+----------+---+-----+----+
|date_id|      date|day|month|year|
+-------+----------+---+-----+----+
|      1|2014-01-03|  3|    1|2014|
|      2|2014-01-04|  4|    1|2014|
|      3|2014-01-05|  5|    1|2014|
|      4|2014-01-06|  6|    1|2014|
|      5|2014-01-07|  7|    1|2014|
|      6|2014-01-08|  8|    1|2014|
|      7|2014-01-09|  9|    1|2014|
|      8|2014-01-10| 10|    1|2014|
|      9|2014-01-11| 11|    1|2014|
|     10|2014-01-12| 12|    1|2014|
|     11|2014-01-13| 13|    1|2014|
|     12|2014-01-14| 14|    1|2014|
|     13|2014-01-15| 15|    1|2014|
|     14|2014-01-16| 16|    1|2014|
|     15|2014-01-17| 17|    1|2014|
|     16|2014-01-18| 18|    1|2014|
|     17|2014-01-19| 19|    1|2014|
|     18|2014-01-20| 20|    1|2014|
|     19|2014-01-21| 21|    1|2014|
|     20|2014-01-23| 23|    1|2014|
+-------+----------+---+-----+----+
only showing top 20 rows


In [28]:
# dim_ship
dim_ship = df_parsed_superstore.select("ship_mode").distinct()
windowSpec = Window.orderBy("ship_mode")
dim_ship = dim_ship.withColumn("ship_id", row_number().over(windowSpec)).select("ship_id", "ship_mode")
dim_ship.write.mode("overwrite").saveAsTable("dim_ship")

spark.sql("SELECT * FROM dim_ship").show()

+-------+--------------+
|ship_id|     ship_mode|
+-------+--------------+
|      1|   First Class|
|      2|      Same Day|
|      3|  Second Class|
|      4|Standard Class|
+-------+--------------+



In [29]:
# dim_city
dim_city = df_parsed_superstore.select("city").distinct()
windowSpec = Window.orderBy("city")
dim_city = dim_city.withColumn("city_id", row_number().over(windowSpec)).select(
    col("city_id"), col("city").alias("city_name")
)
dim_city.write.mode("overwrite").saveAsTable("dim_city")
spark.sql("SELECT * FROM dim_city").show()

+-------+-----------------+
|city_id|        city_name|
+-------+-----------------+
|      1|         Aberdeen|
|      2|          Abilene|
|      3|            Akron|
|      4|      Albuquerque|
|      5|       Alexandria|
|      6|            Allen|
|      7|        Allentown|
|      8|          Altoona|
|      9|         Amarillo|
|     10|          Anaheim|
|     11|          Andover|
|     12|        Ann Arbor|
|     13|          Antioch|
|     14|           Apopka|
|     15|     Apple Valley|
|     16|         Appleton|
|     17|        Arlington|
|     18|Arlington Heights|
|     19|           Arvada|
|     20|        Asheville|
+-------+-----------------+
only showing top 20 rows


In [30]:
# dim_customer
dim_customer = df_parsed_superstore.select("customer_id", "customer_name").distinct()
windowSpec = Window.orderBy("customer_name")
dim_customer = dim_customer.withColumn("customer_id", row_number().over(windowSpec)).select(
    col("customer_id"), col("customer_name")
)
dim_customer.write.mode("overwrite").saveAsTable("dim_customer")
spark.sql("SELECT * FROM dim_customer").show()

+-----------+--------------------+
|customer_id|       customer_name|
+-----------+--------------------+
|          1|       Aaron Bergman|
|          2|       Aaron Hawkins|
|          3|      Aaron Smayling|
|          4|     Adam Bellavance|
|          5|           Adam Hart|
|          6|  Adam Shillingsburg|
|          7|       Adrian Barton|
|          8|         Adrian Hane|
|          9|        Adrian Shami|
|         10|         Aimee Bixby|
|         11|         Alan Barnes|
|         12|      Alan Dominguez|
|         13|         Alan Haines|
|         14|          Alan Hwang|
|         15|   Alan Schoenberger|
|         16|        Alan Shonely|
|         17|Alejandro Ballentine|
|         18|     Alejandro Grove|
|         19|    Alejandro Savely|
|         20| Aleksandra Gannaway|
+-----------+--------------------+
only showing top 20 rows


In [31]:
# dim_state
dim_state = df_parsed_superstore.select("state").distinct()
windowSpec = Window.orderBy("state")
dim_state = dim_state.withColumn("state_id", row_number().over(windowSpec)).select(
    col("state_id"), col("state").alias("state_name")
)
dim_state.write.mode("overwrite").saveAsTable("dim_state")
spark.sql("SELECT * FROM dim_state").show()

+--------+--------------------+
|state_id|          state_name|
+--------+--------------------+
|       1|             Alabama|
|       2|             Arizona|
|       3|            Arkansas|
|       4|          California|
|       5|            Colorado|
|       6|         Connecticut|
|       7|            Delaware|
|       8|District of Columbia|
|       9|             Florida|
|      10|             Georgia|
|      11|               Idaho|
|      12|            Illinois|
|      13|             Indiana|
|      14|                Iowa|
|      15|              Kansas|
|      16|            Kentucky|
|      17|           Louisiana|
|      18|               Maine|
|      19|            Maryland|
|      20|       Massachusetts|
+--------+--------------------+
only showing top 20 rows


In [32]:
# dim_segment
dim_segment = df_parsed_superstore.select("segment").distinct()
windowSpec = Window.orderBy("segment")
dim_segment = dim_segment.withColumn("segment_id", row_number().over(windowSpec)).select("segment_id", "segment")
dim_segment.write.mode("overwrite").saveAsTable("dim_segment")
spark.sql("SELECT * FROM dim_segment").show()

+----------+-----------+
|segment_id|    segment|
+----------+-----------+
|         1|   Consumer|
|         2|  Corporate|
|         3|Home Office|
+----------+-----------+



In [33]:
# dim_region
dim_region = df_parsed_superstore.select("region").distinct()
windowSpec = Window.orderBy("region")
dim_region = dim_region.withColumn("region_id", row_number().over(windowSpec)).select("region_id", "region")
dim_region.write.mode("overwrite").saveAsTable("dim_region")
spark.sql("SELECT * FROM dim_region").show()

+---------+-------+
|region_id| region|
+---------+-------+
|        1|Central|
|        2|   East|
|        3|  South|
|        4|   West|
+---------+-------+



In [34]:
# dim_product
dim_product = df_parsed_superstore.select("product_id", "product_name").distinct()
windowSpec = Window.orderBy("product_name")
dim_product = dim_product.withColumn("product_id", row_number().over(windowSpec)).select("product_id", "product_name")
dim_product.write.mode("overwrite").saveAsTable("dim_product")
spark.sql("SELECT * FROM dim_product").show()

+----------+--------------------+
|product_id|        product_name|
+----------+--------------------+
|         1|"While you Were O...|
|         2|#10 Gummed Flap W...|
|         3|#10 Self-Seal Whi...|
|         4|#10 White Busines...|
|         5|#10- 4 1/8" x 9 1...|
|         6|#10- 4 1/8" x 9 1...|
|         7|#10- 4 1/8" x 9 1...|
|         8|#10-4 1/8" x 9 1/...|
|         9|#6 3/4 Gummed Fla...|
|        10|1.7 Cubic Foot Co...|
|        11|1/4 Fold Party De...|
|        12|12 Colored Short ...|
|        13|12-1/2 Diameter R...|
|        14|14-7/8 x 11 Blue ...|
|        15|2300 Heavy-Duty T...|
|        16|24 Capacity Maxi ...|
|        17|24-Hour Round Wal...|
|        18|  3-ring staple pack|
|        19|3.6 Cubic Foot Co...|
|        20|36X48 HARDFLOOR C...|
+----------+--------------------+
only showing top 20 rows


In [35]:
# dim_product_category
dim_product_category = df_parsed_superstore.select("category").distinct()
windowSpec = Window.orderBy("category")
dim_product_category = dim_product_category.withColumn("category_id", row_number().over(windowSpec)).select(
    col("category_id"), col("category").alias("product_category")
)
dim_product_category.write.mode("overwrite").saveAsTable("dim_product_category")
spark.sql("SELECT * FROM dim_product_category").show()

+-----------+----------------+
|category_id|product_category|
+-----------+----------------+
|          1|       Furniture|
|          2| Office Supplies|
|          3|      Technology|
+-----------+----------------+



In [36]:
# dim_product_subcategory
dim_product_subcategory = df_parsed_superstore.select("sub_category").distinct()
windowSpec = Window.orderBy("sub_category")
dim_product_subcategory = dim_product_subcategory.withColumn("subcategory_id", row_number().over(windowSpec)).select(
    col("subcategory_id"), col("sub_category").alias("product_subcategory")
)
dim_product_subcategory.write.mode("overwrite").saveAsTable("dim_product_subcategory")
spark.sql("SELECT * FROM dim_product_subcategory").show()

+--------------+-------------------+
|subcategory_id|product_subcategory|
+--------------+-------------------+
|             1|        Accessories|
|             2|         Appliances|
|             3|                Art|
|             4|            Binders|
|             5|          Bookcases|
|             6|             Chairs|
|             7|            Copiers|
|             8|          Envelopes|
|             9|          Fasteners|
|            10|        Furnishings|
|            11|             Labels|
|            12|           Machines|
|            13|              Paper|
|            14|             Phones|
|            15|            Storage|
|            16|           Supplies|
|            17|             Tables|
+--------------+-------------------+



In [37]:
# dim_product_hierarchy
product_hier = df_parsed_superstore.select("product_name", "category", "sub_category").distinct()

product_lookup = spark.sql("SELECT product_id, product_name FROM dim_product")
category_lookup = spark.sql("SELECT category_id, product_category FROM dim_product_category")
subcategory_lookup = spark.sql("SELECT subcategory_id, product_subcategory FROM dim_product_subcategory")

product_hierarchy = product_hier.join(
    product_lookup, product_hier.product_name == product_lookup.product_name, "inner"
).join(
    category_lookup, product_hier.category == category_lookup.product_category, "inner"
).join(
    subcategory_lookup, product_hier.sub_category == subcategory_lookup.product_subcategory, "inner"
).select(
    product_lookup.product_id,
    category_lookup.category_id,
    subcategory_lookup.subcategory_id
)

product_hierarchy.write.mode("overwrite").saveAsTable("dim_product_hierarchy")
spark.sql("SELECT * FROM dim_product_hierarchy").show()

+----------+-----------+--------------+
|product_id|category_id|subcategory_id|
+----------+-----------+--------------+
|      1014|          2|            16|
|      1430|          2|            13|
|       590|          1|            10|
|       187|          2|            11|
|      1296|          2|             8|
|       184|          2|            11|
|      1303|          2|            13|
|      1219|          2|             3|
|      1241|          3|             1|
|      1256|          2|            15|
|       881|          1|            10|
|       147|          3|            14|
|        98|          2|            16|
|      1397|          2|            11|
|       989|          3|             1|
|      1451|          2|            15|
|         6|          2|             8|
|         5|          2|             8|
|       739|          1|             6|
|       108|          2|            13|
+----------+-----------+--------------+
only showing top 20 rows


In [38]:
# dim_customer_location
cust_loc = df_parsed_superstore.select("customer_name", "city", "state", "segment", "region").distinct()

customer_lookup = spark.sql("SELECT customer_id, customer_name FROM dim_customer")
city_lookup = spark.sql("SELECT city_id, city_name FROM dim_city")
state_lookup = spark.sql("SELECT state_id, state_name FROM dim_state")
segment_lookup = spark.sql("SELECT segment_id, segment FROM dim_segment")
region_lookup = spark.sql("SELECT region_id, region FROM dim_region")

customer_location = cust_loc.join(
    customer_lookup, cust_loc.customer_name == customer_lookup.customer_name, "inner"
).join(
    city_lookup, cust_loc.city == city_lookup.city_name, "inner"
).join(
    state_lookup, cust_loc.state == state_lookup.state_name, "inner"
).join(
    segment_lookup, cust_loc.segment == segment_lookup.segment, "inner"
).join(
    region_lookup, cust_loc.region == region_lookup.region, "inner"
).select(
    customer_lookup.customer_id,
    city_lookup.city_id,
    state_lookup.state_id,
    segment_lookup.segment_id,
    region_lookup.region_id
)

customer_location.write.mode("overwrite").saveAsTable("dim_customer_location")
spark.sql("SELECT * FROM dim_customer_location").show()

+-----------+-------+--------+----------+---------+
|customer_id|city_id|state_id|segment_id|region_id|
+-----------+-------+--------+----------+---------+
|        792|    428|      36|         1|        4|
|        201|    449|      46|         3|        4|
|        361|     61|      32|         1|        3|
|        145|    374|      37|         1|        2|
|        160|    374|      37|         1|        2|
|        476|     84|      34|         1|        2|
|        674|     81|      12|         3|        1|
|        728|    148|      34|         1|        2|
|        687|    217|       9|         3|        3|
|        480|    330|      31|         1|        2|
|        618|    208|      42|         1|        1|
|        310|     81|      12|         1|        1|
|        255|    406|      45|         2|        3|
|        757|    518|       7|         2|        2|
|        220|    395|       4|         1|        4|
|        721|    438|       4|         1|        4|
|        232

In [39]:
# order_fact
order_fact = df_parsed_superstore.groupBy("order_id").agg(
    sum("sales").alias("sales"),
    sum("quantity").alias("quantity"), 
    sum("profit").alias("profits"),
    sum(col("sales") * col("discount")).alias("total_discount_amount")
).withColumn("id", row_number().over(Window.orderBy("order_id")))  # Surrogate key

order_fact = order_fact.select("id", "order_id", "sales", "quantity", "profits", "total_discount_amount")

order_fact.write.mode("overwrite").saveAsTable("order_fact")
spark.sql("SELECT * FROM order_fact").show()

+---+--------------+------+--------+-------+---------------------+
| id|      order_id| sales|quantity|profits|total_discount_amount|
+---+--------------+------+--------+-------+---------------------+
|  1|CA-2014-100006|377.97|       3| 109.61|             0.000000|
|  2|CA-2014-100090|699.19|       9| -19.09|           139.838000|
|  3|CA-2014-100293| 91.06|       6|  31.87|            18.212000|
|  4|CA-2014-100328|  3.93|       1|   1.33|             0.786000|
|  5|CA-2014-100363| 21.38|       5|   7.72|             4.276000|
|  6|CA-2014-100391| 14.62|       2|   6.73|             0.000000|
|  7|CA-2014-100678|697.08|      11|  61.80|           171.122000|
|  8|CA-2014-100706|129.44|       8|  17.72|             0.000000|
|  9|CA-2014-100762|508.62|      11| 219.08|             0.000000|
| 10|CA-2014-100860| 18.75|       5|   9.00|             0.000000|
| 11|CA-2014-100867|321.55|       6|  20.10|            64.310000|
| 12|CA-2014-100881|302.38|       3|  22.68|            60.476

In [40]:
from pyspark.sql.functions import col, dayofmonth, month, year, row_number
from pyspark.sql.window import Window

base_order = df_parsed_superstore.select(
    "order_id", "product_name", "customer_name", "ship_mode", "order_date", "ship_date"
)

# Step 2: Ambil dimensi
order_fact = spark.sql("SELECT id AS dim_order_id, order_id FROM order_fact")
product = spark.sql("SELECT product_id, product_name FROM dim_product")
customer = spark.sql("SELECT customer_id, customer_name FROM dim_customer")
ship = spark.sql("SELECT ship_id, ship_mode FROM dim_ship")
date = spark.sql("SELECT date_id, date, day, month, year FROM dim_date")

dim_order = base_order \
    .join(order_fact, on="order_id", how="inner") \
    .join(product, on="product_name", how="inner") \
    .join(customer, on="customer_name", how="inner") \
    .join(ship, on="ship_mode", how="inner") \
    .join(
        date.alias("od"),
        col("order_date") == col("od.date"),
        "inner"
    ) \
    .join(
        date.alias("sd"),
        col("ship_date") == col("sd.date"),
        "inner"
    ) \
    .select(
        col("dim_order_id"),
        col("product_id").alias("dim_product_id"),
        col("customer_id").alias("dim_customer_id"),
        col("ship_id").alias("dim_ship_mode_id"),
        col("od.date_id").alias("dim_order_date_id"),
        col("sd.date_id").alias("dim_shipment_date_id")
    ).dropDuplicates()


# Step 4: Join semua
# dim_order = base_order \
#     .join(order_fact, on="order_id", how="inner") \
#     .join(product, on="product_name", how="inner") \
#     .join(ship, on="ship_mode", how="inner") \
#     .join(
#         date.alias("od"),
#         (dayofmonth("order_date") == col("od.day")) &
#         (month("order_date") == col("od.month")) &
#         (year("order_date") == col("od.year")),
#         "inner"
#     ) \
#     .join(
#         date.alias("sd"),
#         (dayofmonth("ship_date") == col("sd.date")) &
#         (month("ship_date") == col("sd.month")) &
#         (year("ship_date") == col("sd.year")),
#         "inner"
#     ) \
#     .select(
#         col("dim_order_id"),
#         col("product_id").alias("dim_product_id"),
#         col("customer_id").alias("dim_customer_id"),
#         col("ship_id").alias("dim_ship_mode_id"),
#         col("od.date_id").alias("dim_order_date_id"),
#         col("sd.date_id").alias("dim_shipment_date_id")
#     ).dropDuplicates()

# Step 5: Simpan ke Hive
dim_order.write.mode("overwrite").saveAsTable("dim_order")
# Cek hasilnya
spark.sql("SELECT * FROM dim_order ORDER BY dim_order_id").show()


+------------+--------------+---------------+----------------+-----------------+--------------------+
|dim_order_id|dim_product_id|dim_customer_id|dim_ship_mode_id|dim_order_date_id|dim_shipment_date_id|
+------------+--------------+---------------+----------------+-----------------+--------------------+
|           1|            46|            224|               4|              237|                 243|
|           2|          1628|            246|               4|              178|                 182|
|           2|           826|            246|               4|              178|                 182|
|           3|          1657|            564|               4|               66|                  70|
|           4|          1268|            355|               4|               24|                  30|
|           5|          1557|            376|               4|               90|                  96|
|           5|           327|            376|               4|               90|  