# Data Integration by PySpark

In [1]:
from pyspark.sql import SparkSession

In [9]:
spark = SparkSession.builder \
    .appName("Data Merge") \
    .getOrCreate()

In [10]:
merged_df1 = user_activity_df.join(recommendation_df, on='userID', how='left')
merged_df2 = merged_df1.join(nearby_place_df, on='postID', how='left')
final_df = merged_df2.join(engagement_type_df, (merged_df2.userID == engagement_type_df.userID) & (merged_df2.postID == engagement_type_df.postID), how='left')

final_df = final_df.drop(engagement_type_df.userID).drop(engagement_type_df.postID)
final_df.show()

+------+------+----------+------------+-------------------+----------------+-----------+--------+--------+--------------+-------------------+
|postID|userID|activityID|        page|          timestamp|recommendationID|nearPlaceID| placeID| eventID|engagementType|          timestamp|
+------+------+----------+------------+-------------------+----------------+-----------+--------+--------+--------------+-------------------+
|   469|user20|   Comment|Profile Page|2024-05-01 00:06:00|           rec63|       NULL|    NULL|    NULL|          NULL|               NULL|
|   365|user20|   Comment|Profile Page|2024-05-01 00:06:00|           rec75|      np133|place129|    NULL|          NULL|               NULL|
|   365|user20|   Comment|Profile Page|2024-05-01 00:06:00|           rec75|        np7| place72|    NULL|          NULL|               NULL|
|   365|user20|   Comment|Profile Page|2024-05-01 00:06:00|           rec75|      np181|place182|    NULL|          NULL|               NULL|
|   36

In [15]:
user_activity_df = spark.read.csv('user_activity_data.csv', header=True, inferSchema=True) \
    .withColumnRenamed("timestamp", "user_activity_timestamp")
recommendation_df = spark.read.csv('recommendation_data.csv', header=True, inferSchema=True)
nearby_place_df = spark.read.csv('nearby_place_data.csv', header=True, inferSchema=True)
engagement_type_df = spark.read.csv('engagementType_data.csv', header=True, inferSchema=True) \
    .withColumnRenamed("timestamp", "engagement_timestamp")

In [16]:
merged_df1 = user_activity_df.join(recommendation_df, on='userID', how='left')

merged_df2 = merged_df1.join(nearby_place_df, on='postID', how='left')

final_df = merged_df2.join(engagement_type_df, 
                           (merged_df2.userID == engagement_type_df.userID) & 
                           (merged_df2.postID == engagement_type_df.postID), 
                           how='left')

final_df = final_df.drop(engagement_type_df.userID).drop(engagement_type_df.postID)

In [17]:
output_path = 'merged_data.csv'
final_df.write.csv(output_path, header=True, mode='overwrite')

In [19]:
from pyspark.sql.functions import col, count, to_date

In [23]:
from pyspark.sql.functions import col, isnan, when, count

In [24]:
spark = SparkSession.builder \
    .appName("Inspect Missing Values") \
    .getOrCreate()

In [26]:
merged_df = spark.read.csv('merged_data.csv', header=True, inferSchema=True)

numeric_columns = [column for column, dtype in merged_df.dtypes if dtype in ['int', 'double', 'float']]
all_columns = merged_df.columns

missing_value_counts = merged_df.select(
    [count(when(col(c).isNull() | (col(c).cast('string') == ''), c)).alias(c) for c in all_columns]
)

missing_value_counts.show()

merged_df.printSchema()

rows_with_missing_values = merged_df.filter(
    col("nearPlaceID").isNull() |
    col("placeID").isNull() |
    col("eventID").isNull() |
    col("engagementType").isNull() |
    col("engagement_timestamp").isNull()
)

rows_with_missing_values.show()

+------+------+----------+----+-----------------------+----------------+-----------+-------+-------+--------------+--------------------+
|postID|userID|activityID|page|user_activity_timestamp|recommendationID|nearPlaceID|placeID|eventID|engagementType|engagement_timestamp|
+------+------+----------+----+-----------------------+----------------+-----------+-------+-------+--------------+--------------------+
|     0|     0|         0|   0|                      0|               0|       3234|   3234|  19089|         19089|               19089|
+------+------+----------+----+-----------------------+----------------+-----------+-------+-------+--------------+--------------------+

root
 |-- postID: integer (nullable = true)
 |-- userID: string (nullable = true)
 |-- activityID: string (nullable = true)
 |-- page: string (nullable = true)
 |-- user_activity_timestamp: timestamp (nullable = true)
 |-- recommendationID: string (nullable = true)
 |-- nearPlaceID: string (nullable = true)
 |-- 

In [43]:
from pyspark.sql.functions import col, lit, when, count, last, coalesce
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import random

In [34]:
spark = SparkSession.builder \
    .appName("Handle Missing Values") \
    .getOrCreate()

In [35]:
merged_df = spark.read.csv('merged_data.csv', header=True, inferSchema=True)

### Fill `nearPlaceID` and `placeID` with the nearest value using forward fill

In [36]:
#merged_df = merged_df.fillna(method='ffill', subset=['nearPlaceID', 'placeID'])

window_spec = Window.orderBy("postID").rowsBetween(Window.unboundedPreceding, 0)
merged_df = merged_df.withColumn("nearPlaceID", last(col("nearPlaceID"), ignorenulls=True).over(window_spec))
merged_df = merged_df.withColumn("placeID", last(col("placeID"), ignorenulls=True).over(window_spec))

### Generate random values for `eventID` and `engagementType`

In [37]:
event_ids = [f"event{i}" for i in range(501)]
engagement_types = ["Like", "Save", "Share"]

### UDF to generate random values for `eventID` and `engagementType`

In [38]:
random_event_id_udf = F.udf(lambda: random.choice(event_ids), returnType=F.StringType())
random_engagement_type_udf = F.udf(lambda: random.choice(engagement_types), returnType=F.StringType())

### Fill missing values for `eventID` and `engagementType` with random values

In [39]:
merged_df = merged_df.withColumn("eventID", when(col("eventID").isNull(), random_event_id_udf()).otherwise(col("eventID")))
merged_df = merged_df.withColumn("engagementType", when(col("engagementType").isNull(), random_engagement_type_udf()).otherwise(col("engagementType")))

### Fill `engagement_timestamp` with forward fill method

In [40]:
window_spec_timestamp = Window.orderBy("postID").rowsBetween(Window.unboundedPreceding, 0)
merged_df = merged_df.withColumn("engagement_timestamp", last(col("engagement_timestamp"), ignorenulls=True).over(window_spec_timestamp))

In [41]:
output_path_cleaned = 'cleaned_merged_data.csv'
merged_df.write.csv(output_path_cleaned, header=True, mode='overwrite')

In [42]:
merged_df = spark.read.csv('cleaned_merged_data.csv', header=True, inferSchema=True)

numeric_columns = [column for column, dtype in merged_df.dtypes if dtype in ['int', 'double', 'float']]
all_columns = merged_df.columns

missing_value_counts = merged_df.select(
    [count(when(col(c).isNull() | (col(c).cast('string') == ''), c)).alias(c) for c in all_columns]
)

missing_value_counts.show()

merged_df.printSchema()

rows_with_missing_values = merged_df.filter(
    col("nearPlaceID").isNull() |
    col("placeID").isNull() |
    col("eventID").isNull() |
    col("engagementType").isNull() |
    col("engagement_timestamp").isNull()
)

rows_with_missing_values.show()

+------+------+----------+----+-----------------------+----------------+-----------+-------+-------+--------------+--------------------+
|postID|userID|activityID|page|user_activity_timestamp|recommendationID|nearPlaceID|placeID|eventID|engagementType|engagement_timestamp|
+------+------+----------+----+-----------------------+----------------+-----------+-------+-------+--------------+--------------------+
|     0|     0|         0|   0|                      0|               0|        101|    101|      0|             0|                 191|
+------+------+----------+----+-----------------------+----------------+-----------+-------+-------+--------------+--------------------+

root
 |-- postID: integer (nullable = true)
 |-- userID: string (nullable = true)
 |-- activityID: string (nullable = true)
 |-- page: string (nullable = true)
 |-- user_activity_timestamp: timestamp (nullable = true)
 |-- recommendationID: string (nullable = true)
 |-- nearPlaceID: string (nullable = true)
 |-- 

In [44]:
merged_df = spark.read.csv('cleaned_merged_data.csv', header=True, inferSchema=True)

In [45]:
window_spec_ffill = Window.orderBy("postID").rowsBetween(Window.unboundedPreceding, 0)
window_spec_bfill = Window.orderBy("postID").rowsBetween(0, Window.unboundedFollowing)

merged_df = merged_df.withColumn("nearPlaceID_ffill", last(col("nearPlaceID"), ignorenulls=True).over(window_spec_ffill))
merged_df = merged_df.withColumn("placeID_ffill", last(col("placeID"), ignorenulls=True).over(window_spec_ffill))
merged_df = merged_df.withColumn("nearPlaceID_bfill", last(col("nearPlaceID_ffill"), ignorenulls=True).over(window_spec_bfill))
merged_df = merged_df.withColumn("placeID_bfill", last(col("placeID_ffill"), ignorenulls=True).over(window_spec_bfill))
merged_df = merged_df.withColumn("nearPlaceID", coalesce(col("nearPlaceID_ffill"), col("nearPlaceID_bfill")))
merged_df = merged_df.withColumn("placeID", coalesce(col("placeID_ffill"), col("placeID_bfill")))

event_ids = [f"event{i}" for i in range(501)]
engagement_types = ["Like", "Save", "Share"]

random_event_id_udf = F.udf(lambda: random.choice(event_ids), returnType=F.StringType())
random_engagement_type_udf = F.udf(lambda: random.choice(engagement_types), returnType=F.StringType())

merged_df = merged_df.withColumn("eventID", when(col("eventID").isNull(), random_event_id_udf()).otherwise(col("eventID")))
merged_df = merged_df.withColumn("engagementType", when(col("engagementType").isNull(), random_engagement_type_udf()).otherwise(col("engagementType")))
merged_df = merged_df.withColumn("engagement_timestamp_ffill", last(col("engagement_timestamp"), ignorenulls=True).over(window_spec_ffill))
merged_df = merged_df.withColumn("engagement_timestamp_bfill", last(col("engagement_timestamp_ffill"), ignorenulls=True).over(window_spec_bfill))
merged_df = merged_df.withColumn("engagement_timestamp", coalesce(col("engagement_timestamp_ffill"), col("engagement_timestamp_bfill")))
merged_df = merged_df.drop("nearPlaceID_ffill", "nearPlaceID_bfill", "placeID_ffill", "placeID_bfill", "engagement_timestamp_ffill", "engagement_timestamp_bfill")

In [46]:
output_path_cleaned = 'cleaned_merged_data.csv'
merged_df.write.csv(output_path_cleaned, header=True, mode='overwrite')

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 50374)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 271,

In [None]:
spark.stop()