In [20]:
import sys, os
sys.path.append(os.path.abspath('/spark-data'))
from pyspark.sql import SparkSession
from utils.Data_Ingestion.data_ingestion import load_files
from utils.Data_Transformation.transformation import add_age_column
from utils.Data_Transformation.transformation import add_state_columns
from utils.postgres_setup import save_dfs_to_postgres_upsert

In [21]:
# Build (or reuse) the Spark session used by every cell below.
# - spark.driver.memory / spark.executor.memory: 4g each.
# - spark.jars.packages: Maven coordinates for the PostgreSQL JDBC driver and
#   the spark-excel reader.
# NOTE(review): getOrCreate() hands back an already-running session if one
# exists, so restart the kernel after changing any of these settings.
spark = (
    SparkSession.builder
    .appName("Transformations")  # previously misspelled as "Tranfromations"
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config(
        "spark.jars.packages",
        "org.postgresql:postgresql:42.2.20,com.crealytics:spark-excel_2.12:3.4.1_0.19.0",
    )
    .getOrCreate()
)


In [22]:
# Load the cleaned CSV files from the shared data directory. The log output
# below shows one "Successfully loaded" line per file.
cleaned_csv_dir = "/spark-data/Cleaned_Csv_File"
dfs = load_files(spark, cleaned_csv_dir)

2024-09-09 07:51:51,338 - INFO - Successfully loaded CSV file: /spark-data/Cleaned_Csv_File/df_review_cleaned.csv
2024-09-09 07:51:51,529 - INFO - Successfully loaded CSV file: /spark-data/Cleaned_Csv_File/df_product_cleaned.csv
2024-09-09 07:51:51,706 - INFO - Successfully loaded CSV file: /spark-data/Cleaned_Csv_File/df_order_item_cleaned.csv
2024-09-09 07:51:51,885 - INFO - Successfully loaded CSV file: /spark-data/Cleaned_Csv_File/df_customer_cleaned.csv
2024-09-09 07:51:52,064 - INFO - Successfully loaded CSV file: /spark-data/Cleaned_Csv_File/df_order_cleaned.csv


In [23]:
# Unpack the individual tables from the loader's result.
# NOTE(review): the keys carry a doubled "df_" prefix; presumably load_files
# prepends "df_" to each file stem (e.g. df_customer_cleaned.csv) — confirm.
df_customer, df_product, df_order, df_review, df_order_item = (
    dfs["df_df_customer_cleaned"],
    dfs["df_df_product_cleaned"],
    dfs["df_df_order_cleaned"],
    dfs["df_df_review_cleaned"],
    dfs["df_df_order_item_cleaned"],
)

In [24]:
# Preview the raw customer table: first 20 rows, long values truncated.
df_customer.show(n=20, truncate=True)

+-----------+-----------+---------+--------------------+--------------------+-----------------+-------------+--------+----------+
|customer_id| first_name|last_name|               email|        phone_number|             city|        state|zip_code|birth_date|
+-----------+-----------+---------+--------------------+--------------------+-----------------+-------------+--------+----------+
|          1|    Crystal|  Edwards|mcmahonemily@exam...|   +1 (511) 782-4381|    Fergusonville|         Iowa|   91344|1995-02-28|
|          2|   Jennifer|    Jones|douglasedwards@ex...|   +1 (918) 720-2495|     Jenniferport|   New Jersey|   59371|1951-01-27|
|          3|  Catherine|    Crane|douglaskevin@exam...|+1 (381) 601-2272...|  Jenniferchester|West Virginia|   49812|1966-10-07|
|          4|     Lauren|    Irwin|aguirresteven@exa...|Invalid phone number|      New Melissa|     Michigan|   17279|1936-07-28|
|          5|Christopher|   Nelson|dustin50@example.org|+1 (735) 814-4145...|       Hannah

In [25]:
# Derive an `age` column from `birth_date`. The result gets its own name, so
# df_customer still refers to the unmodified frame after this cell.
age_added_customer_df = add_age_column(
    df_customer,
    "birth_date",
)

2024-09-09 07:51:52,221 - INFO - Starting to add 'age' column based on the birth date column.
2024-09-09 07:51:52,226 - INFO - Column 'birth_date' found. Calculating age and also ensure that birth date column is date type if it is a string type then convert this into date type
2024-09-09 07:51:52,263 - INFO - Age column added successfully.


In [26]:
# Check the new `age` column: first 20 rows, long values truncated.
age_added_customer_df.show(n=20, truncate=True)

+-----------+-----------+---------+--------------------+--------------------+-----------------+-------------+--------+----------+---+
|customer_id| first_name|last_name|               email|        phone_number|             city|        state|zip_code|birth_date|age|
+-----------+-----------+---------+--------------------+--------------------+-----------------+-------------+--------+----------+---+
|          1|    Crystal|  Edwards|mcmahonemily@exam...|   +1 (511) 782-4381|    Fergusonville|         Iowa|   91344|1995-02-28| 29|
|          2|   Jennifer|    Jones|douglasedwards@ex...|   +1 (918) 720-2495|     Jenniferport|   New Jersey|   59371|1951-01-27| 73|
|          3|  Catherine|    Crane|douglaskevin@exam...|+1 (381) 601-2272...|  Jenniferchester|West Virginia|   49812|1966-10-07| 57|
|          4|     Lauren|    Irwin|aguirresteven@exa...|Invalid phone number|      New Melissa|     Michigan|   17279|1936-07-28| 88|
|          5|Christopher|   Nelson|dustin50@example.org|+1 (73

In [27]:
# Rebind df_customer to the age-augmented frame so the PostgreSQL save at the
# end of the notebook writes the `age` column as well.
df_customer = age_added_customer_df

In [28]:
# Preview the order table before the state columns are added.
df_order.show(n=20, truncate=True)

+--------+-----------+----------+------------+--------------------+--------------------+
|order_id|customer_id|order_date|total_amount|    shipping_address|     billing_address|
+--------+-----------+----------+------------+--------------------+--------------------+
|     131|       4958|2020-05-10|         569|7886 Higgins Moto...|14788 Robert Land...|
|     297|         46|2020-03-11|        1331|PSC 4470, Box 299...|16387 Novak Shoal...|
|     330|       1082|2025-05-03|        4010|758 Sarah Plaza P...|938 Brian Road La...|
|      62|       2934|2024-06-01|        1727|7971 Vicki Key Ap...|465 Kramer Way Ap...|
|     266|       3929|2022-05-30|         680|57853 Chen Way La...|055 Butler Extens...|
|      43|        118|2021-09-26|        4884|9822 Anne Trace A...|1511 Hill Keys Ap...|
|      95|       4222|2023-08-27|        3765|Unit 4050 Box 161...|06811 Sean Crossi...|
|     167|       3800|2022-12-09|        6259|16060 Mason Creek...|196 Joshua Meadow...|
|     300|       1337

In [29]:
# Add `shipping_state` and `billing_state`, extracted from the two address
# columns (see the log lines and the table printed below).
df_order_add_states = add_state_columns(
    df_order,
    shipping_col="shipping_address",
    billing_col="billing_address",
)

2024-09-09 07:51:52,570 - INFO - Starting to add state columns based on the specified address columns.
2024-09-09 07:51:52,576 - INFO - Columns 'shipping_address' and 'billing_address' found. Extracting state abbreviations...
2024-09-09 07:51:52,596 - INFO - Extracted 'shipping_state' from column 'shipping_address'.
2024-09-09 07:51:52,615 - INFO - Extracted 'billing_state' from column 'billing_address'.
2024-09-09 07:51:52,617 - INFO - State columns added successfully.


In [30]:
# Check the extracted state columns: first 20 rows, long values truncated.
df_order_add_states.show(n=20, truncate=True)

+--------+-----------+----------+------------+--------------------+--------------------+--------------+-------------+
|order_id|customer_id|order_date|total_amount|    shipping_address|     billing_address|shipping_state|billing_state|
+--------+-----------+----------+------------+--------------------+--------------------+--------------+-------------+
|     131|       4958|2020-05-10|         569|7886 Higgins Moto...|14788 Robert Land...|            MP|           CO|
|     297|         46|2020-03-11|        1331|PSC 4470, Box 299...|16387 Novak Shoal...|            AP|           IA|
|     330|       1082|2025-05-03|        4010|758 Sarah Plaza P...|938 Brian Road La...|            FM|           FL|
|      62|       2934|2024-06-01|        1727|7971 Vicki Key Ap...|465 Kramer Way Ap...|            CT|           VA|
|     266|       3929|2022-05-30|         680|57853 Chen Way La...|055 Butler Extens...|            MO|           OR|
|      43|        118|2021-09-26|        4884|9822 Anne 

In [31]:
# Rebind df_order to the frame with the state columns so the PostgreSQL save
# at the end of the notebook writes `shipping_state` and `billing_state`.
df_order = df_order_add_states

In [32]:
# Preview the order-item table (no transformation is applied to it here).
df_order_item.show(n=20, truncate=True)

+--------+-------------+-----+----------+--------+
|order_id|order_item_id|price|product_id|quantity|
+--------+-------------+-----+----------+--------+
|    1493|          618|   28|      2098|       3|
|    4734|          766|   21|      2864|       9|
|    2582|          809|   19|      1622|       6|
|    4589|         1181|    8|      2274|       8|
|    4954|         1258|   32|      3693|       8|
|     114|         1321|   45|      1739|       7|
|     381|         1717|   91|       429|       9|
|    2697|         1903|   24|      3119|       9|
|    2807|         2003|   83|      4378|       8|
|     652|         2139|    1|      2086|       6|
|    3399|         2255|   84|      2035|       1|
|    1419|         2263|   99|      4911|       6|
|    4270|         2354|   98|      1845|       3|
|    3523|         2374|   32|      1740|       2|
|    1453|         2590|   49|      1694|       0|
|    3588|         2768|   15|      4873|       0|
|    3554|         3147|   98| 

In [33]:
# Preview the product table (no transformation is applied to it here).
df_product.show(n=20, truncate=True)

+----------+-------------+---------+-----+--------------------+---------+
|product_id| product_name| category|price|         description|inventory|
+----------+-------------+---------+-----+--------------------+---------+
|         1|          bag|       no|448.0|Summer now health...|      655|
|         2|        happy|   answer|844.0|Employee discuss ...|      230|
|         3|         same|      his|681.0|Million mother pu...|      831|
|         4|      prevent|   around| 34.0|Because under mea...|      804|
|         5|        quite|   summer|663.0|Approach Mr task ...|      351|
|         6|        which|  purpose|386.0|Leave charge mode...|       10|
|         7|       beyond|   beyond|879.0|Shoulder power in...|      282|
|         8|     although|     home|650.0|Exactly continue ...|      818|
|         9|environmental| although|  7.0|Really admit kind...|      128|
|        10|       assume|professor|945.0|Bar truth here gr...|      168|
|        11|      usually|   figure|43

In [34]:
# Preview the order-item table.
# NOTE(review): this repeats the earlier order-item preview cell and prints the
# same rows; consider dropping one of the two.
df_order_item.show(n=20, truncate=True)

+--------+-------------+-----+----------+--------+
|order_id|order_item_id|price|product_id|quantity|
+--------+-------------+-----+----------+--------+
|    1493|          618|   28|      2098|       3|
|    4734|          766|   21|      2864|       9|
|    2582|          809|   19|      1622|       6|
|    4589|         1181|    8|      2274|       8|
|    4954|         1258|   32|      3693|       8|
|     114|         1321|   45|      1739|       7|
|     381|         1717|   91|       429|       9|
|    2697|         1903|   24|      3119|       9|
|    2807|         2003|   83|      4378|       8|
|     652|         2139|    1|      2086|       6|
|    3399|         2255|   84|      2035|       1|
|    1419|         2263|   99|      4911|       6|
|    4270|         2354|   98|      1845|       3|
|    3523|         2374|   32|      1740|       2|
|    1453|         2590|   49|      1694|       0|
|    3588|         2768|   15|      4873|       0|
|    3554|         3147|   98| 

In [35]:
# Preview the review table (no transformation is applied to it here).
df_review.show(n=20, truncate=True)

+-----------+----------+------+-----------+---------+--------------------+
|customer_id|product_id|rating|review_date|review_id|         review_text|
+-----------+----------+------+-----------+---------+--------------------+
|        166|      3108|     1| 2024-02-12|      363|Realize option ap...|
|       1662|      3924|     5| 2024-05-21|      493|Goal rich travel ...|
|       4238|       281|     4| 2023-01-01|      619|Fish future kind ...|
|        520|       498|     2| 2023-07-03|      678|Computer itself t...|
|         58|      1360|     1| 2024-02-01|     1290|Into upon enough ...|
|       2277|      3726|     3| 2023-12-11|     1580|Word peace race m...|
|       3290|      2684|     5| 2024-06-20|     1627|Market process pr...|
|       4476|      2171|     1| 2023-07-07|     1939|South song race p...|
|         63|      4951|     2| 2022-12-29|     2547|Tv character seni...|
|         84|       983|     2| 2024-03-12|     2572|Six the admit bes...|
|        572|       416| 

# Save all transformed DataFrames to the PostgreSQL database

In [36]:
# PostgreSQL connection details.
#
# Credentials are read from the environment instead of being committed with the
# notebook:
#   POSTGRES_JDBC_URL  - JDBC URL (defaults to the local docker host database)
#   POSTGRES_USER      - database user (defaults to "postgres")
#   POSTGRES_PASSWORD  - database password (no default; must be provided)
# NOTE(review): the password that used to be hardcoded in this cell has been
# exposed in the notebook history and should be rotated.
jdbc_url = os.environ.get(
    "POSTGRES_JDBC_URL",
    "jdbc:postgresql://host.docker.internal:5432/spark_data",
)
properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", ""),
    "driver": "org.postgresql.Driver",
}
if not properties["password"]:
    # Surface the misconfiguration here rather than as an opaque JDBC
    # authentication error in the save cell.
    print("WARNING: POSTGRES_PASSWORD is not set; using an empty password.")

In [37]:
# Key column(s) for each table passed to the upsert call. Note that the
# `orders` table is keyed by `order_id`, not `orders_id`.
unique_keys = dict(
    review=['review_id'],
    product=['product_id'],
    order_item=['order_item_id'],
    customer=['customer_id'],
    orders=['order_id'],
)


In [38]:
# Upsert every table into PostgreSQL. The keyword names mirror the keys of
# unique_keys and appear in the confirmation messages printed below.
# NOTE(review): presumably they are also the target table names — confirm in
# utils.postgres_setup.
save_dfs_to_postgres_upsert(
    spark,
    jdbc_url,
    properties,
    unique_key_columns=unique_keys,
    review=df_review,
    product=df_product,
    order_item=df_order_item,
    customer=df_customer,
    orders=df_order,
)

DataFrame review upserted into PostgreSQL.
DataFrame product upserted into PostgreSQL.
DataFrame order_item upserted into PostgreSQL.
DataFrame customer upserted into PostgreSQL.
DataFrame orders upserted into PostgreSQL.
