In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL") \
    .config("spark.jars", "/home/jovyan/drivers/postgresql-42.7.3.jar") \
    .getOrCreate()

In [2]:
from pyspark.sql.functions import to_date, col , when , explode , coalesce , lit

In [3]:
social_df = spark.read.option("multiline", "true").json("social_media_info.json") 
social_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- date_created: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- posts: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comments: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- comment: string (nullable = true)
 |    |    |    |    |-- timestamp: string (nullable = true)
 |    |    |    |    |-- user_id: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- post_id: string (nullable = true)
 |    |    |-- post_text: string (nullable = true)
 |    |    |-- reactions: struct (nullable = true)
 |    |    |    |-- angry: long (nullable = true)
 |    |    |    |-- haha: long (nullable = true)
 |    |    |    |-- like: long (nullable = true)
 |    |    |    |-- love: long (nullable = true)
 |    |    |    |-- sad: long (nullable

In [4]:
social_df.show()

+---+-------------------+--------------------+------+-----------------+--------------------+--------------------+-------------------+
|age|       date_created|               email|gender|             name|               posts|             user_id|           username|
+---+-------------------+--------------------+------+-----------------+--------------------+--------------------+-------------------+
| 57|2010-03-22 13:20:34|stephenstimothy@e...|female|Austin Chavez Jr.|                  []|b2ddab24-f19c-458...|           edward74|
| 66|2018-03-28 04:18:48| rmeyers@example.org|female| Donald Mcfarland|                  []|33d9493d-7073-494...|      aprilthompson|
| 57|2018-11-13 04:34:21|jefferybeard@exam...|  male|      Joshua Hull|                  []|abd87864-b134-42a...|            kduncan|
| 32|2015-07-16 14:57:48|daviskrista@examp...|  male|    Joseph Santos|[{[{Citizen produ...|4b30cfe3-d4af-4fc...|         courtney76|
| 36|2023-03-25 19:40:01|ruthgonzalez@exam...|female|    Brand

In [5]:
users_dimension = social_df["user_id" , "username" , "age" , "date_created" ,"email" , "gender" ,  "name"  ]

In [7]:
users_dimension.show()

+--------------------+-------------------+---+-------------------+--------------------+------+-----------------+
|             user_id|           username|age|       date_created|               email|gender|             name|
+--------------------+-------------------+---+-------------------+--------------------+------+-----------------+
|b2ddab24-f19c-458...|           edward74| 57|2010-03-22 13:20:34|stephenstimothy@e...|female|Austin Chavez Jr.|
|33d9493d-7073-494...|      aprilthompson| 66|2018-03-28 04:18:48| rmeyers@example.org|female| Donald Mcfarland|
|abd87864-b134-42a...|            kduncan| 57|2018-11-13 04:34:21|jefferybeard@exam...|  male|      Joshua Hull|
|4b30cfe3-d4af-4fc...|         courtney76| 32|2015-07-16 14:57:48|daviskrista@examp...|  male|    Joseph Santos|
|e5577616-a1ea-45e...|     richardgoodwin| 36|2023-03-25 19:40:01|ruthgonzalez@exam...|female|    Brandy Phelps|
|fce1b46d-224e-413...|        murphydavid| 49|2017-10-14 13:44:35|rhodeschristopher...|female|  

In [8]:
posts_df = social_df.select("user_id", "username", explode("posts").alias("post"))

In [9]:
posts_dimension = posts_df.select(
    "user_id",
    "post.post_id",
    "post.post_text",
    "post.location",
    "post.timestamp",
    "post.shares",
    "post.tags",
    "post.comments" ,
   col("post.reactions.angry").alias("reaction_angry"),
    col("post.reactions.haha").alias("reaction_haha"),
    col("post.reactions.like").alias("reaction_like"),
    col("post.reactions.love").alias("reaction_love"),
    col("post.reactions.sad").alias("reaction_sad"),
    col("post.reactions.wow").alias("reaction_wow")

).withColumn(
    "total_reactions",
      coalesce(col("reaction_angry"), lit(0)) +
    coalesce(col("reaction_haha"), lit(0)) +
    coalesce(col("reaction_like"), lit(0)) +
    coalesce(col("reaction_love"), lit(0)) +
    coalesce(col("reaction_sad"), lit(0)) +
    coalesce(col("reaction_wow"), lit(0))
)

In [10]:
posts_dimension.show()

+--------------------+--------------------+--------------------+--------+-------------------+------+--------------------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+
|             user_id|             post_id|           post_text|location|          timestamp|shares|                tags|            comments|reaction_angry|reaction_haha|reaction_like|reaction_love|reaction_sad|reaction_wow|total_reactions|
+--------------------+--------------------+--------------------+--------+-------------------+------+--------------------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+
|4b30cfe3-d4af-4fc...|e47dcbb7-ddf9-48c...|Church claim bloo...|    NULL|2022-10-08 01:41:04|     2|               [art]|[{Citizen product...|          NULL|         NULL|         NULL|         NULL|        NULL|        NULL|              0|
|4b30cfe3-d4af-4fc...|6cb5bc0e-7

In [11]:
comments_df = posts_dimension.select("post_id", explode("comments").alias("comment"))

In [12]:
comments_dimension = comments_df.select(
    "post_id",
    "comment.comment",
    "comment.timestamp",
    "comment.user_id",
)

In [13]:
comments_dimension.show()

+--------------------+--------------------+-------------------+--------------------+
|             post_id|             comment|          timestamp|             user_id|
+--------------------+--------------------+-------------------+--------------------+
|e47dcbb7-ddf9-48c...|Citizen productio...|2014-02-10 03:52:11|d36126d3-cdaa-41a...|
|e47dcbb7-ddf9-48c...|Field piece garde...|2019-11-17 12:23:03|34b6b3f1-02d6-417...|
|e47dcbb7-ddf9-48c...|With decision law...|2021-01-12 08:46:48|2cee5448-9069-49f...|
|e47dcbb7-ddf9-48c...|Along agreement a...|2022-12-05 07:35:54|8a222916-ddb3-4c1...|
|6cb5bc0e-7548-44e...|Join respond win ...|2014-04-01 06:50:05|83f1199f-bd9f-4a6...|
|6cb5bc0e-7548-44e...|Early performance...|2013-03-27 01:58:55|20d4d6cd-baba-417...|
|6cb5bc0e-7548-44e...|Organization anot...|2023-06-10 16:09:37|7d5adfb0-f724-476...|
|6cb5bc0e-7548-44e...|Add account succe...|2023-06-25 17:23:10|4fe13c30-1b2c-4e2...|
|4362b53c-03a6-496...|At speak close ag...|2012-01-20 12:19:41|5d

In [14]:
initial_interaction_df =  social_df.select(  "user_id" , explode("posts").alias("post"))

In [15]:

flattened_initial_interaction_df =initial_interaction_df.select(
    "user_id",
    "post.post_id",
    "post.timestamp",
    "post.shares",
    "post.tags",
    "post.comments",
  col("post.reactions.angry").alias("reaction_angry"),
    col("post.reactions.haha").alias("reaction_haha"),
    col("post.reactions.like").alias("reaction_like"),
    col("post.reactions.love").alias("reaction_love"),
    col("post.reactions.sad").alias("reaction_sad"),
    col("post.reactions.wow").alias("reaction_wow")
    
).withColumn(
    "total_reactions",
      coalesce(col("reaction_angry"), lit(0)) +
    coalesce(col("reaction_haha"), lit(0)) +
    coalesce(col("reaction_like"), lit(0)) +
    coalesce(col("reaction_love"), lit(0)) +
    coalesce(col("reaction_sad"), lit(0)) +
    coalesce(col("reaction_wow"), lit(0))
)


In [16]:
second_interaction_df = flattened_initial_interaction_df.select("user_id" , "post_id"  , "timestamp" , "shares" , "tags" ,  "reaction_angry" , "reaction_haha" ,  "reaction_like" , "reaction_love" ,  "reaction_sad" ,  "reaction_wow" , "total_reactions" , explode("comments").alias("comment"))

In [17]:
interaction_fact = second_interaction_df.select(
  "user_id" , 
    "post_id"  , 
     col("timestamp").alias("post_timestamp") ,
    "shares" , 
    "tags" ,  
    "reaction_angry" ,
    "reaction_haha" ,  
    "reaction_like" , 
    "reaction_love" ,
    "reaction_sad" , 
    "reaction_wow" ,
    "total_reactions" ,
    col("comment.timestamp").alias("comment_timestamp"),
    col("comment.user_id").alias("comment_user_id")
    
)

In [18]:
interaction_fact.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- post_id: string (nullable = true)
 |-- post_timestamp: string (nullable = true)
 |-- shares: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- reaction_angry: long (nullable = true)
 |-- reaction_haha: long (nullable = true)
 |-- reaction_like: long (nullable = true)
 |-- reaction_love: long (nullable = true)
 |-- reaction_sad: long (nullable = true)
 |-- reaction_wow: long (nullable = true)
 |-- total_reactions: long (nullable = false)
 |-- comment_timestamp: string (nullable = true)
 |-- comment_user_id: string (nullable = true)



In [19]:
interaction_fact.show()

+--------------------+--------------------+-------------------+------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+-------------------+--------------------+
|             user_id|             post_id|     post_timestamp|shares|                tags|reaction_angry|reaction_haha|reaction_like|reaction_love|reaction_sad|reaction_wow|total_reactions|  comment_timestamp|     comment_user_id|
+--------------------+--------------------+-------------------+------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+-------------------+--------------------+
|4b30cfe3-d4af-4fc...|e47dcbb7-ddf9-48c...|2022-10-08 01:41:04|     2|               [art]|          NULL|         NULL|         NULL|         NULL|        NULL|        NULL|              0|2014-02-10 03:52:11|d36126d3-cdaa-41a...|
|4b30cfe3-d4af-4fc...|e47dcbb7-ddf9-48c...|2022-10-08 01:41:04|     2|  

In [20]:
interaction_fact =interaction_fact.fillna({"shares" : 0  })

In [21]:
interaction_fact =interaction_fact.fillna({
    "reaction_like": 0,
    "reaction_love": 0,
    "reaction_sad": 0,
    "reaction_angry": 0,
    "reaction_wow": 0,
    "reaction_haha": 0  })

In [22]:
posts_dimension = posts_dimension.fillna({"location" : "unknown"})

In [23]:


email_validation_df = users_dimension.withColumn(
    "is_valid_email",
    when(
        col("email").rlike(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"),
        True
    ).otherwise(False)
)

In [24]:
email_validation_df.show()

+--------------------+-------------------+---+-------------------+--------------------+------+-----------------+--------------+
|             user_id|           username|age|       date_created|               email|gender|             name|is_valid_email|
+--------------------+-------------------+---+-------------------+--------------------+------+-----------------+--------------+
|b2ddab24-f19c-458...|           edward74| 57|2010-03-22 13:20:34|stephenstimothy@e...|female|Austin Chavez Jr.|          true|
|33d9493d-7073-494...|      aprilthompson| 66|2018-03-28 04:18:48| rmeyers@example.org|female| Donald Mcfarland|          true|
|abd87864-b134-42a...|            kduncan| 57|2018-11-13 04:34:21|jefferybeard@exam...|  male|      Joshua Hull|          true|
|4b30cfe3-d4af-4fc...|         courtney76| 32|2015-07-16 14:57:48|daviskrista@examp...|  male|    Joseph Santos|          true|
|e5577616-a1ea-45e...|     richardgoodwin| 36|2023-03-25 19:40:01|ruthgonzalez@exam...|female|    Brandy

In [25]:
invalid_email_df = email_validation_df.filter(col("is_valid_email") == False)

In [26]:
invalid_email_df.show()

+-------+--------+---+------------+-----+------+----+--------------+
|user_id|username|age|date_created|email|gender|name|is_valid_email|
+-------+--------+---+------------+-----+------+----+--------------+
+-------+--------+---+------------+-----+------+----+--------------+



so there is no invalid entry in email column 

In [27]:


users_dimension  = users_dimension.withColumn("user_creation_date", to_date(col("date_created"))).drop("date_created")


In [28]:

posts_dimension = posts_dimension.withColumn("post_date", to_date(col("timestamp"))).drop("timestamp" , "comments")


In [29]:

interaction_fact = interaction_fact.withColumn("post_date", to_date(col("post_timestamp"))).drop("post_timestamp")


In [30]:
interaction_fact = interaction_fact.withColumn("comment_date", to_date(col("comment_timestamp"))).drop("comment_timestamp")


In [31]:
interaction_fact.show()

+--------------------+--------------------+------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+--------------------+----------+------------+
|             user_id|             post_id|shares|                tags|reaction_angry|reaction_haha|reaction_like|reaction_love|reaction_sad|reaction_wow|total_reactions|     comment_user_id| post_date|comment_date|
+--------------------+--------------------+------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+--------------------+----------+------------+
|4b30cfe3-d4af-4fc...|e47dcbb7-ddf9-48c...|     2|               [art]|             0|            0|            0|            0|           0|           0|              0|d36126d3-cdaa-41a...|2022-10-08|  2014-02-10|
|4b30cfe3-d4af-4fc...|e47dcbb7-ddf9-48c...|     2|               [art]|             0|            0|            0|            0|        

In [32]:
posts_dimension.show()

+--------------------+--------------------+--------------------+--------+------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+----------+
|             user_id|             post_id|           post_text|location|shares|                tags|reaction_angry|reaction_haha|reaction_like|reaction_love|reaction_sad|reaction_wow|total_reactions| post_date|
+--------------------+--------------------+--------------------+--------+------+--------------------+--------------+-------------+-------------+-------------+------------+------------+---------------+----------+
|4b30cfe3-d4af-4fc...|e47dcbb7-ddf9-48c...|Church claim bloo...| unknown|     2|               [art]|          NULL|         NULL|         NULL|         NULL|        NULL|        NULL|              0|2022-10-08|
|4b30cfe3-d4af-4fc...|6cb5bc0e-7548-44e...|Behind practice l...|  Sydney|    94|[tech, fashion, s...|          NULL|         NULL|         NULL|        

In [33]:
users_dimension.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://postgres_warehouse:5432/warehouse") \
  .option("dbtable", "users_dimension") \
  .option("user", "warehouse") \
  .option("password", "warehouse") \
  .option("driver", "org.postgresql.Driver") \
  .mode("overwrite") \
  .save()

In [34]:
posts_dimension.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://postgres_warehouse:5432/warehouse") \
  .option("dbtable", "posts_dimension") \
  .option("user", "warehouse") \
  .option("password", "warehouse") \
  .option("driver", "org.postgresql.Driver") \
  .mode("overwrite") \
  .save()

In [35]:
comments_dimension.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://postgres_warehouse:5432/warehouse") \
  .option("dbtable", "comments_dimension") \
  .option("user", "warehouse") \
  .option("password", "warehouse") \
  .option("driver", "org.postgresql.Driver") \
  .mode("overwrite") \
  .save()

In [36]:
interaction_fact.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://postgres_warehouse:5432/warehouse") \
  .option("dbtable", "interaction_fact") \
  .option("user", "warehouse") \
  .option("password", "warehouse") \
  .option("driver", "org.postgresql.Driver") \
  .mode("overwrite") \
  .save()