## Sales Business Process Details

1. The user browses products on the platform
2. The user adds products to the shopping cart
3. The user checks out and places an order
4. The system creates and records the order details, such as product, price, and shipping information
5. The order is processed and prepared at the distribution center
6. The order is shipped to the user
7. The order is received by the user
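
The later steps leave a trace in the order timestamps (`created_at`, `shipped_at`, `delivered_at`) that appear in the order data further down. As a rough sketch, the furthest step an order has reached could be derived from those fields. The function name and the stage labels are hypothetical and are not part of the dataset.

```python
from typing import Optional


def order_stage(created_at: Optional[str],
                shipped_at: Optional[str],
                delivered_at: Optional[str]) -> str:
    """Return the furthest process step an order has reached.

    Each argument is a timestamp string, or None when the event
    has not happened yet. The labels are illustrative only.
    """
    if delivered_at is not None:
        return "delivered"   # step 7: received by the user
    if shipped_at is not None:
        return "shipped"     # step 6: on its way to the user
    if created_at is not None:
        return "created"     # steps 3-5: placed and being prepared
    return "browsing"        # steps 1-2: no order exists yet


# Example with one shipped but not yet delivered order.
print(order_stage("2024-08-24 03:57:46", "2024-08-25 04:10:00", None))
```

A cancelled order would still report `created` here, so the `status` column remains the authoritative field; the sketch only shows how the steps map onto timestamps.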

## Data Modeling

1. Fact Table

In this context, we will use a single fact table that focuses on sales:

Fact_Penjualan  
* order_id (foreign key to Dim_Order)
* users_id (foreign key to Dim_Users)
* products_id (foreign key to Dim_Products)
* distribution_center_id (foreign key to Dim_Distribusi)
* jumlah_item (number of items sold or returned)
* harga_satuan (unit price per item)

2. Dimension Table <br>
Dimension tables store descriptive information related to the fact table:

Dimension Table: Dim_Users

Dim_Users: Customer information.
* users_id (Primary Key)
* first_name, last_name, email, age, gender, state, city, country


Dimension Table: Dim_Products

Dim_Products: Product information.
* products_id (Primary Key)
* product_name, cost, category, brand, retail_price, department, sku


Dimension Table: Dim_Order

Dim_Order: Order information.
* order_id (Primary Key)
* status, created_at, returned_at, shipped_at, delivered_at, num_of_item


Dimension Table: Dim_OrderItems

Dim_OrderItems: Order item information.
* Orderitems_id (Primary Key)
* status, created_at, returned_at, shipped_at, delivered_at, sale_price


Dimension Table: Dim_Distribusi

Dim_Distribusi: Distribution center information.
* distribusi_id (Primary Key)
* distribusi_geo, distribusi_name, latitude, longitude


Explanation:
* This data warehouse design uses a star schema. It was chosen for its simplicity and because it makes analytical queries easy to write. The fact table Fact_Penjualan sits at the center, surrounded by dimension tables that connect to it directly.
* Each dimension table provides descriptive context for the sales data. For example, Dim_Users describes who bought a product, Dim_Products describes which product was bought, and so on.
* The relationship between each dimension table and the fact table is one-to-many: one record in a dimension table can be linked to many records in the fact table.
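
To make the one-to-many relationship concrete, the sketch below builds a cut-down version of the star schema in an in-memory SQLite database and runs one analytical query against it. SQLite is only a stand-in for the warehouse, the tables carry a subset of the columns listed above, and the sample rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite database, used purely as a stand-in for the warehouse.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Dim_Users (
    users_id INTEGER PRIMARY KEY, first_name TEXT, country TEXT);
CREATE TABLE Dim_Products (
    products_id INTEGER PRIMARY KEY, product_name TEXT, category TEXT);
CREATE TABLE Fact_Penjualan (
    order_id INTEGER,
    users_id INTEGER REFERENCES Dim_Users(users_id),
    products_id INTEGER REFERENCES Dim_Products(products_id),
    jumlah_item INTEGER,
    harga_satuan REAL);
""")

# Illustrative rows only; they are not taken from the dataset.
conn.executemany("INSERT INTO Dim_Users VALUES (?, ?, ?)",
                 [(1, "Ana", "Brasil"), (2, "Budi", "Brasil")])
conn.executemany("INSERT INTO Dim_Products VALUES (?, ?, ?)",
                 [(10, "Cap", "Accessories"), (11, "Dress", "Plus")])
# Product 10 appears in two fact rows: one dimension row, many fact rows.
conn.executemany("INSERT INTO Fact_Penjualan VALUES (?, ?, ?, ?, ?)",
                 [(100, 1, 10, 2, 5.0),
                  (101, 1, 11, 1, 20.0),
                  (102, 2, 10, 1, 5.0)])


def revenue_by_category(connection):
    """Join the fact table to Dim_Products and sum revenue per category."""
    query = """
        SELECT p.category, SUM(f.jumlah_item * f.harga_satuan) AS revenue
        FROM Fact_Penjualan AS f
        JOIN Dim_Products AS p ON p.products_id = f.products_id
        GROUP BY p.category
        ORDER BY p.category
    """
    return connection.execute(query).fetchall()


print(revenue_by_category(conn))
```

Every analytical question follows the same shape: start from the fact table, join only the dimensions the question needs, then group by a descriptive column.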

Google Slide : https://docs.google.com/presentation/d/1VA2u2p26w_f5fUNoq7upYZj-XMQjbReAogvrbkN6zXA/edit#slide=id.g3303b384963_0_429


In [2]:
# import library
import pyspark

In [3]:
conf = pyspark.SparkConf()

In [4]:
sc = pyspark.SparkContext(conf=conf)
sc

In [5]:
sc.version

'4.0.0-preview2'

In [6]:
sc.master

'local[*]'

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark

In [24]:
# Load the CSV data and view it

In [46]:
FactTable = spark.read.csv('csv/FactTable.csv', header=True, inferSchema=True)
FactTable.show()

+--------+---+----+----+-----------+------------+
|order_id| id|id_1|id_2|num_of_item|retail_price|
+--------+---+----+----+-----------+------------+
|       7|  7|   7|   7|          1|        39.5|
|       2|  2|   2|   2|          1|        69.5|
|       5|  5|   5|   5|          1|        94.0|
|       8|  8|   8|   8|          1|       168.0|
|       9|  9|   9|   9|          1|        54.0|
|       6|  6|   6|   6|          1|       132.0|
|       1|  1|   1|   1|          2|        49.0|
|       3|  3|   3|   3|          1|        69.5|
|       4|  4|   4|   4|          1|       108.0|
|      10| 10|  10|  10|          1|        59.5|
+--------+---+----+----+-----------+------------+



In [47]:
Dim_Users = spark.read.csv('csv/Dim_Users.csv', header=True, inferSchema=True)
Dim_Users.show()

+-----+----------+---------+--------------------+---+------+-----+--------------+-------+
|   id|first_name|last_name|               email|age|gender|state|          city|country|
+-----+----------+---------+--------------------+---+------+-----+--------------+-------+
| 7351|    Teresa| Robinson|teresarobinson@ex...| 55|     F| Acre|          null| Brasil|
|58395|      Paul|  Mueller|paulmueller@examp...| 28|     M| Acre|          null| Brasil|
|58804|     Angel|     Lane|angellane@example...| 69|     F| Acre|          null| Brasil|
|19200|     James|     Bell|jamesbell@example...| 63|     M| Acre|          null| Brasil|
|95648|     Julia|  Richard|juliarichard@exam...| 37|     F| Acre|          null| Brasil|
|79611|    Victor|      Cox|victorcox@example...| 59|     M| Acre|          null| Brasil|
|68040|      John|   Obrien|johnobrien@exampl...| 13|     M| Acre|          null| Brasil|
| 5811|    Ashley|  Johnson|ashleyjohnson@exa...| 66|     F| Acre|          null| Brasil|
|16079|   

In [10]:
Dim_Products = spark.read.csv('csv/Dim_Products.csv', header=True, inferSchema=True)
Dim_Products.show()

+-----+--------------------+------------------+-----------+-----+------------------+----------+--------------------+
|   id|                name|              cost|   category|brand|      retail_price|department|                 sku|
+-----+--------------------+------------------+-----------+-----+------------------+----------+--------------------+
|13842|Low Profile Dyed ...| 2.518749990849756|Accessories|   MG|              6.25|     Women|EBD58B8A3F1D72F42...|
|13928|Low Profile Dyed ...|2.3383499148894105|Accessories|   MG| 5.949999809265137|     Women|2EAC42424D12436BD...|
|14115|Enzyme Regular So...| 4.879559879379869|Accessories|   MG|10.989999771118164|     Women|EE364229B2791D1EF...|
|14157|Enzyme Regular So...| 4.648769887297898|Accessories|   MG|10.989999771118164|     Women|00BD13095D06C20B1...|
|14273|Washed Canvas Ivy...| 6.507929886473045|Accessories|   MG|15.989999771118164|     Women|F531DC20FDE20B7AD...|
|15674|Low Profile Dyed ...|3.1062499998370185|       Plus|   MG

In [48]:
Dim_Orders = spark.read.csv('csv/Dim_Orders.csv', header=True, inferSchema=True)
Dim_Orders.show()

+--------+---------+--------------------+-----------+----------+------------+-----------+
|order_id|   status|          created_at|returned_at|shipped_at|delivered_at|num_of_item|
+--------+---------+--------------------+-----------+----------+------------+-----------+
|       7|Cancelled| 2021-03-13 07:52:00|       NULL|      NULL|        NULL|          1|
|      14|Cancelled| 2021-09-12 14:02:00|       NULL|      NULL|        NULL|          3|
|      24|Cancelled| 2022-12-07 15:16:00|       NULL|      NULL|        NULL|          1|
|      26|Cancelled| 2023-10-03 04:22:00|       NULL|      NULL|        NULL|          1|
|      79|Cancelled| 2025-01-28 14:21:00|       NULL|      NULL|        NULL|          1|
|      90|Cancelled| 2024-07-01 10:37:00|       NULL|      NULL|        NULL|          1|
|     140|Cancelled| 2023-02-19 16:58:00|       NULL|      NULL|        NULL|          2|
|     144|Cancelled| 2024-04-04 18:47:00|       NULL|      NULL|        NULL|          2|
|     153|

In [12]:
Dim_OrderItem = spark.read.csv('csv/Dim_OrderItem.csv', header=True, inferSchema=True)
Dim_OrderItem.show()

+------+---------+-------------------+-------------------+-------------------+-------------------+
|    id|   status|         created_at|        returned_at|         shipped_at|       delivered_at|
+------+---------+-------------------+-------------------+-------------------+-------------------+
| 73167| Complete|2023-04-20 09:54:08|               NULL|2023-04-18 13:05:00|2023-04-21 07:48:00|
|139830| Complete|2021-01-14 22:31:39|               NULL|2021-01-14 15:42:00|2021-01-19 13:16:00|
|156347| Complete|2024-08-01 10:48:09|               NULL|2024-08-03 04:33:00|2024-08-07 10:01:00|
| 88338|  Shipped|2024-08-24 03:57:46|               NULL|2024-08-25 04:10:00|               NULL|
|148166|Cancelled|2024-02-23 06:42:03|               NULL|               NULL|               NULL|
| 76351| Complete|2022-06-09 06:38:34|               NULL|2022-06-09 15:30:00|2022-06-14 06:28:00|
|125401| Complete|2021-11-05 13:58:23|               NULL|2021-11-07 09:43:00|2021-11-12 01:40:00|
|132246| C

In [49]:
Dim_Distribution = spark.read.csv('csv/Dim_distribution.csv', header=True, inferSchema=True)
Dim_Distribution.show()

+---+------------------------+--------------------+--------+---------+
| id|distribution_center_geom|                name|latitude|longitude|
+---+------------------------+--------------------+--------+---------+
|  4|    POINT(-118.25 34.05)|      Los Angeles CA|   34.05|  -118.25|
|  6|    POINT(-73.7834 40...|Port Authority of...|  40.634| -73.7834|
|  1|    POINT(-89.9711 35...|          Memphis TN| 35.1174| -89.9711|
|  3|    POINT(-95.3698 29...|          Houston TX| 29.7604| -95.3698|
|  7|    POINT(-75.1667 39...|     Philadelphia PA|   39.95| -75.1667|
|  5|    POINT(-90.0667 29...|      New Orleans LA|   29.95| -90.0667|
|  9|    POINT(-79.9333 32...|       Charleston SC| 32.7833| -79.9333|
|  8|    POINT(-88.0431 30...|           Mobile AL| 30.6944| -88.0431|
|  2|    POINT(-87.6847 41...|          Chicago IL| 41.8369| -87.6847|
| 10|    POINT(-81.1167 32...|         Savannah GA| 32.0167| -81.1167|
+---+------------------------+--------------------+--------+---------+



In [27]:
# Check the schema of each table

In [50]:
FactTable.printSchema()
Dim_Users.printSchema()
Dim_Products.printSchema()
Dim_Orders.printSchema()
Dim_OrderItem.printSchema()
Dim_Distribution.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- num_of_item: integer (nullable = true)
 |-- retail_price: double (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

root
 |-- products_id: integer (nullable = true)
 |-- products_name: string (nullable = true)
 |-- cost: double (nullable = true)
 |-- category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- retail_price: double (nullable = true)
 |-- department: string (nullable = true)
 |-- sku: string (nullable = true)

root
 |-- order_id: integer (nullable = true)
 |-- status: string (nullable = tr

In [27]:
# Data Cleaning

In [62]:
# Rename the FactTable columns
FactTable = FactTable.withColumnRenamed("id", "user_id")
FactTable = FactTable.withColumnRenamed("id_1", "products_id")
FactTable = FactTable.withColumnRenamed("id_2", "distribution_id")

In [51]:
# Rename the id columns
Dim_Users = Dim_Users.withColumnRenamed("id", "user_id")
Dim_Products = Dim_Products.withColumnRenamed("id", "products_id")
Dim_OrderItem = Dim_OrderItem.withColumnRenamed("id", "OrderItem_id")

In [59]:
# Rename the remaining columns
Dim_Products = Dim_Products.withColumnRenamed("name", "products_name")
Dim_Distribution = Dim_Distribution.withColumnRenamed("id", "distribution_id")
Dim_Distribution = Dim_Distribution.withColumnRenamed("name", "distribution_center_name")
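
The repeated `withColumnRenamed` calls above could also be driven by a single mapping per table. The helper below is a minimal sketch of that idea; `rename_columns` is a hypothetical name, and the function relies only on the object exposing `withColumnRenamed(existing, new)`, as a PySpark DataFrame does.

```python
from functools import reduce


def rename_columns(df, mapping):
    """Apply every old -> new rename in `mapping` to `df`, one after another.

    `df` is expected to be a PySpark DataFrame (or any object with a
    compatible withColumnRenamed method); a new DataFrame is returned.
    """
    return reduce(
        lambda current, pair: current.withColumnRenamed(pair[0], pair[1]),
        mapping.items(),
        df,
    )


# Mapping that matches the FactTable renames in the cells above.
FACT_RENAMES = {
    "id": "user_id",
    "id_1": "products_id",
    "id_2": "distribution_id",
}
```

With this in place, the fact table renames reduce to `FactTable = rename_columns(FactTable, FACT_RENAMES)`, and each dimension table gets its own small mapping.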

In [63]:
FactTable.columns

['order_id',
 'user_id',
 'products_id',
 'distribution_id',
 'num_of_item',
 'retail_price']

In [54]:
Dim_Users.columns

['user_id',
 'first_name',
 'last_name',
 'email',
 'age',
 'gender',
 'state',
 'city',
 'country']

In [55]:
Dim_Products.columns

['products_id',
 'products_name',
 'cost',
 'category',
 'brand',
 'retail_price',
 'department',
 'sku']

In [56]:
Dim_Orders.columns

['order_id',
 'status',
 'created_at',
 'returned_at',
 'shipped_at',
 'delivered_at',
 'num_of_item']

In [57]:
Dim_OrderItem.columns

['OrderItem_id',
 'status',
 'created_at',
 'returned_at',
 'shipped_at',
 'delivered_at']

In [60]:
Dim_Distribution.columns

['distribution_id',
 'distribution_center_geom',
 'distribution_center_name',
 'latitude',
 'longitude']

In [38]:
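from pyspark.sql import SparkSession

# Initialize the Spark session with the PostgreSQL JDBC driver.
# The key is case-sensitive and the value must be a full Maven coordinate
# (groupId:artifactId:version). The setting is read only when Spark starts,
# so restart the kernel if a session is already running.
spark = SparkSession.builder \
    .appName("WriteToPostgres") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()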

In [41]:
# Access the SparkContext
sc = spark.sparkContext

# List the loaded JAR files
print("JAR Files:")
print(sc._jsc.sc().listJars())

JAR Files:
List()
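
An empty `List()` means no extra JARs were registered with the context. Spark's `spark.jars.packages` setting expects comma-separated Maven coordinates in the form `groupId:artifactId:version`, so a quick format check before starting the session can catch a malformed value early. The helper names below are hypothetical, and the check is deliberately simple: it only counts the colon-separated parts and does not confirm that the artifact exists.

```python
import re

# Assumption: a coordinate has exactly three non-empty, colon-separated
# parts (groupId:artifactId:version). Classifiers are not handled here.
_COORDINATE = re.compile(r"^[^:\s]+:[^:\s]+:[^:\s]+$")


def is_maven_coordinate(value: str) -> bool:
    """Return True when `value` looks like groupId:artifactId:version."""
    return bool(_COORDINATE.match(value))


def malformed_packages(packages: str) -> list:
    """Return the entries of a comma-separated list that fail the check."""
    entries = [entry.strip() for entry in packages.split(",")]
    return [entry for entry in entries if not is_maven_coordinate(entry)]


# A two-part value is missing the artifactId and is reported as malformed.
print(malformed_packages("org.postgresql:postgresql:42.6.0, org.postgresql:42.6.0"))
```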


In [61]:
# PostgreSQL
postgres_url = "jdbc:postgresql://host.docker.internal:5432/datawarehouse_thelook"
postgres_properties = {
    "user": "postgres",
    "password": "Intaka",
    "driver": "org.postgresql.Driver"
}

# Write the DataFrame to PostgreSQL
FactTable.write.jdbc(url=postgres_url, table="FactTable", mode="overwrite", properties=postgres_properties)

Py4JJavaError: An error occurred while calling o175.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:47)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:112)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:112)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:112)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:265)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:269)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:53)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:77)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:126)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:742)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:222)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:126)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:140)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:135)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:326)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:135)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:100)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:157)
	at org.apache.spark.sql.internal.DataFrameWriterImpl.runCommand(DataFrameWriterImpl.scala:614)
	at org.apache.spark.sql.internal.DataFrameWriterImpl.saveToV1Source(DataFrameWriterImpl.scala:271)
	at org.apache.spark.sql.internal.DataFrameWriterImpl.saveInternal(DataFrameWriterImpl.scala:239)
	at org.apache.spark.sql.internal.DataFrameWriterImpl.save(DataFrameWriterImpl.scala:125)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:334)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
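
The `ClassNotFoundException` for `org.postgresql.Driver` indicates that the PostgreSQL JDBC driver was not on Spark's classpath when the write ran; the URL and credentials were never reached. Independently of that fix, the connection settings could be assembled by a small helper that reads the password from the environment instead of writing it into the notebook. This is a sketch under assumptions: `build_jdbc_config` is a hypothetical name, and using the `PGPASSWORD` environment variable here is a choice made for the example.

```python
import os


def build_jdbc_config(host, port, database, user, password_env="PGPASSWORD"):
    """Return (url, properties) in the shape DataFrameWriter.jdbc expects.

    The password comes from the environment variable named by
    `password_env` (an assumption), falling back to an empty string.
    """
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": user,
        "password": os.environ.get(password_env, ""),
        "driver": "org.postgresql.Driver",
    }
    return url, properties


# Same host, port, database, and user as in the cell above.
postgres_url, postgres_properties = build_jdbc_config(
    "host.docker.internal", 5432, "datawarehouse_thelook", "postgres"
)
print(postgres_url)
```

The returned pair can be passed unchanged to `FactTable.write.jdbc(url=postgres_url, table="FactTable", mode="overwrite", properties=postgres_properties)` once the driver is available.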
