'''
Graded Challenge 6

Nama  : Nabila Sulistiowati
Batch : CODA-RMT-003

Melakukan proses data cleaning dan transformasi data dengan PySpark

'''

In [1]:
# Make the local Spark installation importable *before* touching pyspark:
# findspark.init() edits sys.path, so it only has an effect when it runs ahead of
# the first `import pyspark...` statement.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# NOTE: this `sum` is pyspark.sql.functions.sum and shadows Python's built-in sum()
# for the rest of the notebook; the null-count cells use this Spark aggregate.
from pyspark.sql.functions import col, lit, sum, to_date
from pyspark.sql.types import DateType

# Initialize the Spark session. The PostgreSQL JDBC driver is fetched through
# spark.jars.packages so DataFrame.write.jdbc can connect to the database later.
spark = (
    SparkSession.builder
    .appName("WriteToPostgres")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

In [2]:
# CSV files to load; each one becomes an entry in `dfs`, keyed by its file name
# without the ".csv" extension.
csv_files = ["fact_transaction.csv", "dim_users.csv", "dim_products.csv", "dim_date.csv"]

# table name -> Spark DataFrame read from the matching CSV file
dfs = {}

for csv_path in csv_files:
    table_name = csv_path.replace(".csv", "")
    # header=True takes column names from the first row; inferSchema=True lets Spark
    # scan the data to choose column types (e.g. date_id is inferred as date).
    dfs[table_name] = spark.read.csv(csv_path, header=True, inferSchema=True)
    print(f"Data {table_name} loaded successfully!")

# Preview the first 5 rows of every table, in the order they were loaded
for table_name, frame in dfs.items():
    frame.show(5)

Data fact_transaction loaded successfully!
Data dim_users loaded successfully!
Data dim_products loaded successfully!
Data dim_date loaded successfully!
+--------------+--------+-------+----------+----------+------------------+--------+------------------+
|transaction_id|order_id|user_id|product_id|   date_id|      retail_price|quantity|      total_amount|
+--------------+--------+-------+----------+----------+------------------+--------+------------------+
|        109027|   75280|  59996|      9347|2025-02-06|               6.5|       2|              13.0|
|          1332|     921|    744|      9028|2025-02-06| 9.989999771118164|       2|19.979999542236328|
|        126309|   87159|  69618|      9209|2025-02-06|              10.0|       3|              30.0|
|         78859|   54430|  43491|     10767|2025-02-06|              10.5|       4|              42.0|
|        102830|   70958|  56592|      9420|2025-02-06|11.989999771118164|       3| 35.96999931335449|
+--------------+-------

In [3]:
# Inspect each table's structure: column names, inferred types and nullability
# =============================================
for table_name in ("fact_transaction", "dim_users", "dim_products", "dim_date"):
    dfs[table_name].printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date_id: date (nullable = true)
 |-- retail_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_amount: double (nullable = true)

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_brand: string (nullable = true)
 |-- retail_price: double (nullable = true)
 |-- cost: double (nullable = true)

root
 |-- date_id: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- wee

In [4]:
# Count missing (null) values in every column of every table
# =============================================
def count_nulls(frame):
    """Return a one-row DataFrame with the number of nulls in each column of `frame`."""
    per_column = [sum(col(name).isNull().cast("int")).alias(name) for name in frame.columns]
    return frame.select(per_column)


for table_name in ("dim_users", "dim_products", "dim_date", "fact_transaction"):
    count_nulls(dfs[table_name]).show()

+-------+----------+---------+-----+---+----+
|user_id|first_name|last_name|email|age|city|
+-------+----------+---------+-----+---+----+
|      0|         0|        0|    0|  0|   0|
+-------+----------+---------+-----+---+----+

+----------+------------+-------------+------------+----+
|product_id|product_name|product_brand|retail_price|cost|
+----------+------------+-------------+------------+----+
|         0|           2|           24|           0|   0|
+----------+------------+-------------+------------+----+

+-------+----+-------+-----+---+------------+-----------+--------+----------+----------+
|date_id|year|quarter|month|day|week_of_year|day_of_week|day_name|month_name|is_weekend|
+-------+----+-------+-----+---+------------+-----------+--------+----------+----------+
|      0|   0|      0|    0|  0|           0|          0|       0|         0|         0|
+-------+----+-------+-----+---+------------+-----------+--------+----------+----------+

+--------------+--------+-------

In [5]:
# Handle missing values in dim_products
# =============================================
# The null counts above show 2 missing product_name values and 24 missing
# product_brand values. Rows without a product_name are dropped. A missing brand is
# filled with "Unknown" so those products are kept.
# dropna() is restricted to product_name on purpose: a bare dropna() would also
# discard every row whose brand is null, which would make the fillna() a no-op.
# NOTE(review): dropped product_ids may still be referenced by fact_transaction —
# confirm before loading into Postgres.
dfs["dim_products"] = (
    dfs["dim_products"]
    .dropna(subset=["product_name"])
    .fillna({"product_brand": "Unknown"})
)
dfs["dim_products"]

DataFrame[product_id: int, product_name: string, product_brand: string, retail_price: double, cost: double]

In [6]:
# Re-check dim_products after handling missing values: each column should report 0
# =================================================
products = dfs["dim_products"]
null_count_exprs = [sum(col(name).isNull().cast("int")).alias(name) for name in products.columns]
products.select(null_count_exprs).show()

+----------+------------+-------------+------------+----+
|product_id|product_name|product_brand|retail_price|cost|
+----------+------------+-------------+------------+----+
|         0|           0|            0|           0|   0|
+----------+------------+-------------+------------+----+



In [7]:
# Check for fully duplicated rows: group on all columns and keep groups seen more than once
# =============================================
for table_name in ("dim_users", "dim_products", "dim_date", "fact_transaction"):
    frame = dfs[table_name]
    duplicate_groups = frame.groupBy(frame.columns).count().where(col("count") > 1)
    duplicate_groups.show()

+-------+----------+---------+-----+---+----+-----+
|user_id|first_name|last_name|email|age|city|count|
+-------+----------+---------+-----+---+----+-----+
+-------+----------+---------+-----+---+----+-----+

+----------+------------+-------------+------------+----+-----+
|product_id|product_name|product_brand|retail_price|cost|count|
+----------+------------+-------------+------------+----+-----+
+----------+------------+-------------+------------+----+-----+

+-------+----+-------+-----+---+------------+-----------+--------+----------+----------+-----+
|date_id|year|quarter|month|day|week_of_year|day_of_week|day_name|month_name|is_weekend|count|
+-------+----+-------+-----+---+------------+-----------+--------+----------+----------+-----+
+-------+----+-------+-----+---+------------+-----------+--------+----------+----------+-----+

+--------------+--------+-------+----------+-------+------------+--------+------------+-----+
|transaction_id|order_id|user_id|product_id|date_id|retail_

In [8]:
# Cast column types so join keys agree across tables and dates are DATE values
# ===============================================
# to_date() is called without a format: Spark then parses ISO dates such as
# "2025-02-06" and leaves columns that are already DATE unchanged. A format string
# like "timestamp" is not a valid Spark datetime pattern, and it would break as soon
# as date_id arrived as a string instead of an inferred date.
# NOTE(review): "float" is single precision, so prices/costs may lose precision.
# It is kept as-is because the target Postgres column types are not visible here;
# consider a DecimalType for monetary values.

# fact_transaction: IDs become strings so they match the dimension keys cast below
dfs["fact_transaction"] = (
    dfs["fact_transaction"]
    .withColumn("transaction_id", col("transaction_id").cast("string"))
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("user_id", col("user_id").cast("string"))
    .withColumn("product_id", col("product_id").cast("string"))
    .withColumn("date_id", to_date(col("date_id")))
    .withColumn("retail_price", col("retail_price").cast("float"))
    .withColumn("total_amount", col("total_amount").cast("float"))
)

# dim_users: key cast to string to match fact_transaction.user_id
dfs["dim_users"] = dfs["dim_users"].withColumn("user_id", col("user_id").cast("string"))

# dim_products: key cast to string to match fact_transaction.product_id
dfs["dim_products"] = (
    dfs["dim_products"]
    .withColumn("product_id", col("product_id").cast("string"))
    .withColumn("retail_price", col("retail_price").cast("float"))
    .withColumn("cost", col("cost").cast("float"))
)

# dim_date: date_id normalized to DATE, the same way as fact_transaction.date_id
dfs["dim_date"] = dfs["dim_date"].withColumn("date_id", to_date(col("date_id")))

In [9]:
# Re-inspect the schemas to confirm that the type casts took effect
# =============================================
tables_in_order = ["fact_transaction", "dim_users", "dim_products", "dim_date"]
for name in tables_in_order:
    dfs[name].printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- date_id: date (nullable = true)
 |-- retail_price: float (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_amount: float (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_brand: string (nullable = false)
 |-- retail_price: float (nullable = true)
 |-- cost: float (nullable = true)

root
 |-- date_id: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week_of_year

In [10]:
# Data transformation: rename generic column names to descriptive key names
# ==============================
# Each entry maps a table to (old generic name, descriptive key name).
# withColumnRenamed() is a no-op when the old name is absent. The schemas printed
# earlier already use the descriptive names, so a rename only takes effect if a
# source file arrives with the generic names. The assert makes sure every key
# column exists before anything is written to Postgres.
column_renames = {
    "dim_users": ("id", "user_id"),
    "dim_products": ("id", "product_id"),
    "dim_date": ("date", "date_id"),
    "fact_transaction": ("id", "transaction_id"),
}
for table_name, (old_name, new_name) in column_renames.items():
    dfs[table_name] = dfs[table_name].withColumnRenamed(old_name, new_name)
    assert new_name in dfs[table_name].columns, (
        f"{table_name} is missing key column {new_name!r}"
    )


In [19]:
# PostgreSQL JDBC connection settings
# ============================
# Connection details come from environment variables, so credentials do not have to
# be stored in the notebook. Set POSTGRES_DB to the name of your target database.
# The fallbacks reproduce the previously hardcoded values.
# NOTE(review): the fallback user/password are still defaults checked into the
# notebook — set the environment variables outside of local development.
import os

pg_host = os.environ.get("POSTGRES_HOST", "host.docker.internal")
pg_port = os.environ.get("POSTGRES_PORT", "5433")
pg_db = os.environ.get("POSTGRES_DB", "rmt003")

postgres_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}"
postgres_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

In [20]:
# Write the DataFrames to PostgreSQL
# ================================
# Dimension tables are loaded before the fact table. If the target schema declares
# foreign keys from Fact_Table_Transaction to the dimension tables, the referenced
# rows then already exist when the fact rows are inserted.
# NOTE(review): mode="append" inserts rows on every run. Re-running this cell will
# duplicate rows, or fail on primary-key constraints if the tables have them —
# truncate the target tables before a full reload.
target_tables = [
    # (key in `dfs`, target table name in Postgres)
    ("dim_users", "Dim_Users"),
    ("dim_products", "Dim_Products"),
    ("dim_date", "Dim_Date"),
    ("fact_transaction", "Fact_Table_Transaction"),
]
for df_key, table_name in target_tables:
    dfs[df_key].write.jdbc(
        url=postgres_url,
        table=table_name,
        mode="append",
        properties=postgres_properties,
    )

Google Slides presentation: https://docs.google.com/presentation/d/1oIBnYoNSXCo1nLAcFTrNXUyYaA4LFB48UMXcTcXKc-c/edit?usp=sharing