## **Step 2: Transforming and Cleaning Data**

### **Background & Introduction**
Data cleaning and transformation are critical steps in any ETL pipeline. Before data can be analyzed or loaded into storage systems, it must be structured, deduplicated, and properly formatted.

In this step, you will perform key transformations to prepare the video streaming dataset for analysis. This includes handling missing values, removing duplicates, converting data types, enriching records with joins, applying aggregations and window functions, and normalizing values with UDFs.

---

### **Objective:**
Perform data cleaning, deduplication, type conversions, and transformations.


In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
import logging

# Initialize Spark session
spark = SparkSession.builder.appName("VideoStreamingETL").getOrCreate()

# Reduce verbose logging
spark.sparkContext.setLogLevel("ERROR")

# Configure logging to suppress warnings
logging.getLogger("py4j").setLevel(logging.ERROR)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/11 07:05:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Define dataset paths
viewing_history_path = "video_streaming_data/viewing_history.csv"
users_path = "video_streaming_data/users.json"

# Load data
viewing_history_df = spark.read.option("header", True).csv(viewing_history_path)
users_df = spark.read.option("multiline", "true").json(users_path)

### **Task 1: Handle Missing Values**

In [3]:
from pyspark.sql.functions import col, sum as _sum, when

# Get the Null counts in viewing_history
print(" Null counts in viewing_history_df:")
viewing_history_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in viewing_history_df.columns
]).show()

# Get the Null counts in users_df
users_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in users_df.columns
]).show()

 Null counts in viewing_history_df:
+-------+--------+----------+-----------+--------------+
|user_id|video_id|watched_at|device_type|account_status|
+-------+--------+----------+-----------+--------------+
|      0|       0|         0|      34212|         34076|
+-------+--------+----------+-----------+--------------+

+-----+----+------------------+-----------------+-------+
|email|name|preferred_language|subscription_date|user_id|
+-----+----+------------------+-----------------+-------+
|    0|   0|             25376|            34231|      0|
+-----+----+------------------+-----------------+-------+



In [4]:
# Fill missing device_type with "Unknown"
viewing_history_df = viewing_history_df.na.fill(value="Unknown", subset=["device_type"])


# Drop account_status column (34,076 nulls in the counts above; not used in the tasks below)
if "account_status" in viewing_history_df.columns:
    viewing_history_df = viewing_history_df.drop("account_status")

# Drop rows missing user_id or watched_at (critical fields)
viewing_history_df = viewing_history_df.dropna(subset=["user_id", "watched_at"])

# Fill preferred_language with "Unknown"
users_df = users_df.fillna({"preferred_language": "Unknown"})

# Fill missing subscription_date with a placeholder
users_df = users_df.fillna({"subscription_date": "2020-01-01"})



In [5]:
# Verify by rerunning

from pyspark.sql.functions import col, sum as _sum, when

# Get the Null counts in viewing_history
viewing_history_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in viewing_history_df.columns
]).show()

# Get the Null counts in users_df
users_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in users_df.columns
]).show()


+-------+--------+----------+-----------+
|user_id|video_id|watched_at|device_type|
+-------+--------+----------+-----------+
|      0|       0|         0|          0|
+-------+--------+----------+-----------+

+-----+----+------------------+-----------------+-------+
|email|name|preferred_language|subscription_date|user_id|
+-----+----+------------------+-----------------+-------+
|    0|   0|                 0|                0|      0|
+-----+----+------------------+-----------------+-------+



### **Task 2: Remove Duplicates**

In [6]:
# Check for duplicates
users_total = users_df.count()
unique_user_ids = users_df.select("user_id").distinct().count()

# Print total rows, distinct user_ids, and surplus (duplicate) rows
print(f"Total rows = {users_total}")
print(f"Distinct user_ids = {unique_user_ids}")
print(f"Duplicate rows = {users_total - unique_user_ids}")


Total rows = 102000
Distinct user_ids = 100000
Duplicate rows = 2000
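
The difference between the total row count and the distinct `user_id` count gives the number of surplus rows that deduplication would remove. That is not necessarily the number of `user_id` values that occur more than once. The plain-Python sketch below illustrates the distinction on a small, hypothetical list of ids (not taken from the dataset):

```python
from collections import Counter

# Hypothetical user_id column: id 1 appears three times, id 2 twice, id 3 once
user_ids = [1, 1, 1, 2, 2, 3]

total_rows = len(user_ids)
distinct_ids = len(set(user_ids))

# Rows that deduplicating on user_id would remove
surplus_rows = total_rows - distinct_ids

# Number of user_ids that occur more than once
duplicated_ids = sum(1 for n in Counter(user_ids).values() if n > 1)

print(f"Surplus rows = {surplus_rows}")
print(f"Duplicated user_ids = {duplicated_ids}")
```

Here three rows are surplus, but only two ids are duplicated. The 2000 reported above is the first kind of number.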


In [7]:
# Remove duplicates based on user_id
users_df = users_df.dropDuplicates(["user_id"])

In [8]:
# Confirm cleanup
users_df.groupBy("user_id").count().filter(col("count") > 1).show()

+-------+-----+
|user_id|count|
+-------+-----+
+-------+-----+



### **Task 3: Convert Data Types**

In [9]:
# Check the current schema
viewing_history_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- watched_at: string (nullable = true)
 |-- device_type: string (nullable = false)



In [10]:
from pyspark.sql.functions import to_timestamp, col

# Convert user_id to IntegerType
viewing_history_df = viewing_history_df.withColumn("user_id", col("user_id").cast("int"))

# Convert watched_at to TimestampType
viewing_history_df = viewing_history_df.withColumn("watched_at", to_timestamp(col("watched_at")))

In [11]:
# Confirm the changes
viewing_history_df.printSchema()

viewing_history_df.select("user_id", "watched_at").show(5)

root
 |-- user_id: integer (nullable = true)
 |-- video_id: string (nullable = true)
 |-- watched_at: timestamp (nullable = true)
 |-- device_type: string (nullable = false)

+-------+-------------------+
|user_id|         watched_at|
+-------+-------------------+
|  21558|2024-01-01 00:00:00|
| 190681|2024-01-01 00:00:01|
| 190802|2024-01-01 00:00:02|
|  40874|2024-01-01 00:00:03|
|  92704|2024-01-01 00:00:04|
+-------+-------------------+
only showing top 5 rows



### **Task 4: Filtering & Joins**

In [12]:
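# Optional: make sure user_id has the same type (int) in both DataFrames before joining.
# viewing_history_df was already cast in Task 3, so repeating the cast there changes nothing.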

viewing_history_df = viewing_history_df.withColumn("user_id", col("user_id").cast("int"))

users_df = users_df.withColumn("user_id", col("user_id").cast("int"))

In [13]:
# Make sure both DataFrames are clean and have user_id
viewing_history_df.select("user_id").show(5, truncate=False)
users_df.select("user_id").show(5)

+-------+
|user_id|
+-------+
|21558  |
|190681 |
|190802 |
|40874  |
|92704  |
+-------+
only showing top 5 rows

+-------+
|user_id|
+-------+
| 849311|
| 521357|
| 400692|
| 798647|
| 869136|
+-------+
only showing top 5 rows



In [14]:
# Join both DataFrames

joined_df = viewing_history_df.join(users_df, on="user_id", how="inner")

In [15]:
# Preview results

joined_df.select("user_id", "video_id", "watched_at", "email", "preferred_language").show(5, truncate=False)

+-------+--------+-------------------+---------------------+------------------+
|user_id|video_id|watched_at         |email                |preferred_language|
+-------+--------+-------------------+---------------------+------------------+
|10017  |18662   |2024-01-04 18:26:07|zachary06@example.com|Spanish           |
|10017  |18485   |2024-01-01 01:41:54|zachary06@example.com|Spanish           |
|10037  |16415   |2024-01-02 18:53:51|kyle00@example.org   |Unknown           |
|10046  |7534    |2024-01-03 04:10:25|tracey42@example.org |English           |
|10046  |1315    |2024-01-03 03:23:54|tracey42@example.org |English           |
+-------+--------+-------------------+---------------------+------------------+
only showing top 5 rows



### **Task 5: Aggregations & Window Functions**

In [16]:
from pyspark.sql.functions import count

# Count total views per video (across all users)
views_per_video_df = joined_df.groupBy("video_id").agg(
    count("*").alias("views_per_video")
)

# Count how many times each user watched each video
most_watched_df = joined_df.groupBy("user_id", "video_id").agg(
    count("*").alias("watch_count")
)

# Show the (user, video) pairs with the highest watch counts
most_watched_df.orderBy("watch_count", ascending=False).show(10)




Exception in thread "block-manager-ask-thread-pool-89" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1553)
	at java.base/java.lang.System$2.start(System.java:2573)
	at java.base/jdk.internal.vm.SharedThreadContainer.start(SharedThreadContainer.java:152)
	at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:953)
	at java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1021)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1158)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)




Py4JJavaError: An error occurred while calling o262.showString.
: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1553)
	at java.base/java.lang.System$2.start(System.java:2573)
	at java.base/jdk.internal.vm.SharedThreadContainer.start(SharedThreadContainer.java:152)
	at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:953)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1364)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$4.submit(ExecutionContextImpl.scala:144)
	at org.apache.spark.sql.execution.SQLExecution$.withThreadLocalCaptured(SQLExecution.scala:219)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.relationFuture$lzycompute(BroadcastExchangeExec.scala:134)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.relationFuture(BroadcastExchangeExec.scala:131)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doPrepare(BroadcastExchangeExec.scala:203)
	at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:295)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:244)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeLike.submitBroadcastJob(BroadcastExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeLike.submitBroadcastJob$(BroadcastExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.submitBroadcastJob(BroadcastExchangeExec.scala:83)
	at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec.doMaterialize(QueryStageExec.scala:248)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:61)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:302)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:300)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:300)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [None]:
## Add Total Views per User
# Count total watch events per user
total_views_df = joined_df.groupBy("user_id").agg(
    count("*").alias("total_views")
)

# Show most active users
total_views_df.orderBy("total_views", ascending=False).show(10)

In [None]:
## Rank Top 3 Most-Watched Videos per User
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

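# Create window spec to rank videos within each user
# (video_id acts as a tiebreaker so the ranking is deterministic when watch counts tie)
window_spec = Window.partitionBy("user_id").orderBy(col("watch_count").desc(), col("video_id"))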

# Add ranking column
ranked_df = most_watched_df.withColumn("rank", row_number().over(window_spec))

# Filter to get top 3 videos per user
top_videos_per_user = ranked_df.filter(col("rank") <= 3)

# Show results
top_videos_per_user.orderBy("user_id", "rank").show(10)
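
`row_number()` gives every row in a partition a distinct number, so the `rank <= 3` filter keeps at most three videos per user even when watch counts tie. PySpark also provides `rank()` and `dense_rank()`, which handle ties differently. The sketch below emulates the three numbering schemes in plain Python on hypothetical watch counts for a single user, so it runs without a Spark session. The helper names are illustrative only and are not part of PySpark.

```python
# Hypothetical watch counts for one user's videos, already sorted descending
watch_counts = [5, 3, 3, 3, 1]

def row_number_like(sorted_values):
    """Sequential numbering; tied values still get distinct numbers."""
    return list(range(1, len(sorted_values) + 1))

def rank_like(sorted_values):
    """Tied values share a rank; the next rank skips ahead (1, 2, 2, 2, 5)."""
    ranks = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(i + 1)
    return ranks

def dense_rank_like(sorted_values):
    """Tied values share a rank; no gaps afterwards (1, 2, 2, 2, 3)."""
    ranks = []
    for i, value in enumerate(sorted_values):
        if i == 0:
            ranks.append(1)
        elif value == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks

def top_n(values, ranks, n=3):
    """Keep the values whose rank is at most n."""
    return [v for v, r in zip(values, ranks) if r <= n]

top_by_row_number = top_n(watch_counts, row_number_like(watch_counts))
top_by_rank = top_n(watch_counts, rank_like(watch_counts))
top_by_dense_rank = top_n(watch_counts, dense_rank_like(watch_counts))

print(top_by_row_number)  # [5, 3, 3]
print(top_by_rank)        # [5, 3, 3, 3]
print(top_by_dense_rank)  # [5, 3, 3, 3, 1]
```

Which scheme fits depends on whether "top 3" means exactly three rows per user (`row_number`) or every video tied for a top-three position (`rank` or `dense_rank`).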


### **Task 6: Apply UDFs & Vectorized UDFs**

In [None]:
# Preview device types
viewing_history_df.select("device_type").distinct().show()


In [None]:
## Define a Python UDF to clean device labels
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the normalization logic
def normalize_device(device):
    if device is None:
        return "Unknown"
    device = device.lower()
    if "iphone" in device:
        return "iPhone"
    elif "android" in device:
        return "Android"
    else:
        return "Other"

# Register as a PySpark UDF
normalize_device_udf = udf(normalize_device, StringType())
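
Because the UDF wraps an ordinary Python function, its logic can be checked on a few inputs before it is applied to the DataFrame. The sketch below repeats `normalize_device` so that it runs on its own. The sample labels are hypothetical; the real distinct values come from the preview cell above.

```python
def normalize_device(device):
    # Same logic as the cell above, repeated so this block is self-contained
    if device is None:
        return "Unknown"
    device = device.lower()
    if "iphone" in device:
        return "iPhone"
    elif "android" in device:
        return "Android"
    else:
        return "Other"

# Hypothetical raw labels, including the "Unknown" filler used in Task 1
samples = [None, "iPhone 14", "ANDROID tablet", "Smart TV", "Unknown"]

results = {raw: normalize_device(raw) for raw in samples}
for raw, label in results.items():
    print(repr(raw), "->", label)
```

One thing this check surfaces: Task 1 already replaced null `device_type` values with the string `"Unknown"`, so the `None` branch is not expected to fire on this DataFrame, and those filled rows are labeled `"Other"` rather than `"Unknown"`.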


In [None]:
## Apply the UDF to your DataFrame
# Create a new column with normalized values
viewing_history_df = viewing_history_df.withColumn(
    "normalized_device", normalize_device_udf(viewing_history_df["device_type"])
)


In [None]:
# Inspect the result
viewing_history_df.select("device_type", "normalized_device").distinct().show()




26/01/11 07:05:52 ERROR Utils: Uncaught exception in thread executor-heartbeater
java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1526)
	at org.apache.spark.util.Utils$.processStreamByLine(Utils.scala:1327)
	at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1302)
	at org.apache.spark.executor.ProcfsMetricsGetter.computePageSize(ProcfsMetricsGetter.scala:93)
	at org.apache.spark.executor.ProcfsMetricsGetter.<init>(ProcfsMetricsGetter.scala:47)
	at org.apache.spark.executor.ProcfsMetricsGetter$.<init>(ProcfsMetricsGetter.scala:233)
	at org.apache.spark.executor.ProcfsMetricsGetter$.<clinit>(ProcfsMetricsGetter.scala)
	at org.apache.spark.metrics.ProcessTreeMetrics$.getMetricValues(ExecutorMetricType.scala:93)
	at org.apache.spark.executor.ExecutorMetrics$.$anonfun$getCurrentMetrics$1(ExecutorMetrics.scala: