In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("ETL")\
        .config('spark.jars.packages', 'org.postgresql:postgresql:42.6.2') \
        .getOrCreate()
sqlContext = SparkSession(spark)
#Dont Show warning only error
spark.sparkContext.setLogLevel("ERROR")

In [6]:
# đọc dữ liệu từ các bảng dim
product_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://103.245.236.247:5432/data_warehouse") \
    .option("driver", "org.postgresql.Driver")\
    .option("user", "traianthai") \
    .option("password", "123123") \
    .option("dbtable", "public.\"Dim_Product\"") \
    .load()

store_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://103.245.236.247:5432/data_warehouse") \
    .option("driver", "org.postgresql.Driver")\
    .option("user", "traianthai") \
    .option("password", "123123") \
    .option("dbtable", "public.\"Dim_Store\"") \
    .load()

time_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://103.245.236.247:5432/data_warehouse") \
    .option("driver", "org.postgresql.Driver")\
    .option("user", "traianthai") \
    .option("password", "123123") \
    .option("dbtable", "public.\"Dim_Time\"") \
    .load()

In [7]:
# đọc dữ liệu từ bảng nguồn
order_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://103.245.236.247:5432/e-commerce") \
    .option("driver", "org.postgresql.Driver")\
    .option("user", "traianthai") \
    .option("password", "123123") \
    .option("dbtable", "public.\"order\"") \
    .load()

order_product_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://103.245.236.247:5432/e-commerce") \
    .option("driver", "org.postgresql.Driver")\
    .option("user", "traianthai") \
    .option("password", "123123") \
    .option("dbtable", "public.\"order_product\"") \
    .load()

In [8]:
from pyspark.sql.functions import col, month, year, to_date, when, sum, expr

# Chuyển đổi thời gian trong order_product_df sang định dạng ngày tháng năm
order_product_df = order_product_df.withColumn("date", to_date(col("createdAt"))) \
                                   .withColumn("month", month(col("date"))) \
                                   .withColumn("quarter", when(col("month").between(1, 3), 1)
                                                          .when(col("month").between(4, 6), 2)
                                                          .when(col("month").between(7, 9), 3)
                                                          .otherwise(4)) \
                                   .withColumn("year", year(col("date"))) \
                                   .drop("createdAt")
# Đổi tên cột `id` trong các DataFrame để tránh xung đột
order_product_df = order_product_df.withColumnRenamed("id", "order_product_id")
order_df = order_df.withColumnRenamed("id", "order_id")
product_df = product_df.withColumnRenamed("id", "product_id")

# Thực hiện ánh xạ và tạo dataframe cho bảng Fact_Sales
fact_product_df = order_product_df \
    .join(order_df, order_product_df["orderId"] == order_df["order_id"]) \
    .join(time_df, (order_product_df["month"] == time_df["month"]) & 
                  (order_product_df["quarter"] == time_df["quarter"]) & 
                  (order_product_df["year"] == time_df["year"])) \
    .join(product_df, order_product_df["productsId"] == product_df["product_id"]) \
    .groupBy(
        col("time_id"),
        col("product_id"),
        col("shopId")
    ) \
    .agg(
        sum(col("quantity")).alias("Quantity"),
        sum(col("totalPrice")).alias("Revenue")
    )

# Kiểm tra dữ liệu trước khi đổ vào bảng Fact
fact_product_df.show()


+-------+----------+------+--------+-------+
|time_id|product_id|shopId|Quantity|Revenue|
+-------+----------+------+--------+-------+
|      7|        39|   131|      25| 600500|
|      6|        24|     1|      16| 553000|
|      3|        27|     1|      12| 316000|
|      2|        26|     1|      14|1708000|
|      8|       303|   111|       5|  65000|
|      7|        41|     2|       1|  24000|
|      3|        24|     1|      16| 553000|
|      9|       180|   131|      30|1200000|
|     11|        40|   111|       6| 390500|
|      2|        27|     1|      15|1535500|
|     11|        33|     1|       2| 178000|
|      5|        41|     2|       1|  24000|
|      8|        28|     1|      10|1503000|
|     11|        28|     1|      10|1503000|
|      7|        29|     1|      21|1748500|
|      7|        26|     1|      14|1708000|
|      9|        36|   190|       1|  48000|
|     11|       303|   111|       5|  65000|
|     10|        41|     2|       1|  24000|
|      4| 

In [9]:
# Ghi DataFrame vào bảng "Fact_Product" trong schema "public"
fact_product_df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://103.245.236.247:5432/data_warehouse") \
    .option("dbtable", "public.\"Fact_Product\"") \
    .option("user", "traianthai") \
    .option("password", "123123") \
    .mode("overwrite") \
    .save()


In [10]:
# Đóng SparkSession
spark.stop()