<a href="https://colab.research.google.com/github/sharathchandra919-bot/Login-Data-Pipeline-Bronze-Silver-Gold-Architecture-/blob/main/login_data_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Raw CSV

  ↓
Bronze Layer
  - Raw ingestion
  - No assumptions
  
  ↓
  Silver Layer
  - Column standardization
  - Time-of-day validation
  - Business-key deduplication
  - Data quality checks

  ↓
Gold Layer
  - Aggregated metrics
  - Engagement vs satisfaction insights


In [37]:
!apt-get install openjdk-11-jdk -qq
!pip install pyspark




In [24]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("LoginPipeline") \
    .getOrCreate()


In [2]:
#BRONZE LAYER

bronze_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/Time-Wasters on Social Media.csv")
)

bronze_df.printSchema()
bronze_df.show(5)


root
 |-- UserID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Debt: boolean (nullable = true)
 |-- Owns Property: boolean (nullable = true)
 |-- Profession: string (nullable = true)
 |-- Demographics: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Total Time Spent: integer (nullable = true)
 |-- Number of Sessions: integer (nullable = true)
 |-- Video ID: integer (nullable = true)
 |-- Video Category: string (nullable = true)
 |-- Video Length: integer (nullable = true)
 |-- Engagement: integer (nullable = true)
 |-- Importance Score: integer (nullable = true)
 |-- Time Spent On Video: integer (nullable = true)
 |-- Number of Videos Watched: integer (nullable = true)
 |-- Scroll Rate: integer (nullable = true)
 |-- Frequency: string (nullable = true)
 |-- ProductivityLoss: integer (nullable = true)
 |-- Satisfaction: integer 

In [40]:
#SILVER LAYER
import re

def normalize_column_names(df):
  new_cols = []
  for col in df.columns:
    col_clean = col.lower()
    col_clean = re.sub(r'[^\w]+', "_",col_clean)
    # col_clean = col_clean.strip("_")
    new_cols.append(col_clean)
  df = df.toDF(*new_cols)
  return df

silver_df = normalize_column_names(bronze_df)
# silver_df.printSchema()
silver_df.show(5)

+------+---+------+-------------+------+-----+-------------+-------------+------------+---------+----------------+------------------+--------+--------------+------------+----------+----------------+-------------------+------------------------+-----------+---------+----------------+------------+---------------+----------+-------+----------+------------+---------------+---------------+--------------+
|userid|age|gender|     location|income| debt|owns_property|   profession|demographics| platform|total_time_spent|number_of_sessions|video_id|video_category|video_length|engagement|importance_score|time_spent_on_video|number_of_videos_watched|scroll_rate|frequency|productivityloss|satisfaction|   watch_reason|devicetype|     os|watch_time|self_control|addiction_level|currentactivity|connectiontype|
+------+---+------+-------------+------+-----+-------------+-------------+------------+---------+----------------+------------------+--------+--------------+------------+----------+---------------

In [41]:
from pyspark.sql.functions import col,try_to_timestamp,current_timestamp,when

silver_ts_df = (
    silver_df.withColumn("event_ts",try_to_timestamp(col("watch_time")))
    .withColumn("event_ts",when(col("event_ts") <= current_timestamp(),col("event_ts")).otherwise(None))
)

silver_ts_df.select("watch_time","event_ts").show(5,truncate= False)
silver_ts_df.filter(col("event_ts").isNull()).count()
silver_ts_df.filter(col("watch_time").isNotNull()).count()


+----------+--------+
|watch_time|event_ts|
+----------+--------+
|9:00 PM   |NULL    |
|5:00 PM   |NULL    |
|2:00 PM   |NULL    |
|9:00 PM   |NULL    |
|8:00 AM   |NULL    |
+----------+--------+
only showing top 5 rows


1000

In [42]:
silver_df = silver_df.withColumnRenamed("watch_time","watch_time_of_the_day")

from pyspark.sql.functions import col,regexp_extract
silver_df = silver_df.withColumn("is_valid_time",regexp_extract(
    col("watch_time_of_the_day"),
    r"^(1[0-2]|[1-9]):[0-5][0-9]\s?(AM|PM)$",
    0
) != ""
)

silver_df = silver_df.filter(col("is_valid_time"))

# silver_df.show()        # no reassignment
# silver_df.printSchema()



In [43]:
silver_df = silver_df.dropDuplicates(["userid","platform","watch_time_of_the_day","video_id"])


In [44]:
# GOLD LAYER
from pyspark.sql.functions import sum as spark_sum

gold_df_platform = silver_df.groupBy("platform") \
                            .agg(spark_sum("total_time_spent").alias("total_time_watched")) \
                            .orderBy(col("total_time_watched").desc())

gold_df_platform.show()

gold_df_platform.count()

+---------+------------------+
| platform|total_time_watched|
+---------+------------------+
|   TikTok|             41297|
|  YouTube|             38205|
|Instagram|             37609|
| Facebook|             34295|
+---------+------------------+



4

In [45]:
from pyspark.sql.functions import avg,count

gold_engagement = silver_df.groupBy("engagement").agg(
    avg("satisfaction").alias("avg_satisfaction"),
    count("userid").alias("user_count")
).orderBy("engagement")

gold_engagement.show()
silver_df.select("engagement", "satisfaction").summary().show()


+----------+----------------+----------+
|engagement|avg_satisfaction|user_count|
+----------+----------------+----------+
|        15|             5.5|         2|
|        45|             7.0|         1|
|        48|             3.0|         1|
|        56|             4.0|         2|
|        63|             7.0|         1|
|        70|             3.0|         1|
|        92|             4.0|         1|
|       131|             9.0|         1|
|       142|             7.0|         1|
|       143|             4.0|         1|
|       158|             7.0|         1|
|       170|             2.0|         1|
|       171|             7.0|         1|
|       184|             4.0|         1|
|       236|             4.0|         1|
|       246|             4.0|         1|
|       260|             5.0|         1|
|       276|             4.0|         1|
|       279|             2.0|         1|
|       303|             3.0|         1|
+----------+----------------+----------+
only showing top

In [46]:
total_rows = silver_df.count()

null_check = silver_df.select([count(col(c))/total_rows for c in silver_df.columns])
null_check.show()


#We define acceptable null thresholds and fail the pipeline if exceeded.



+----------------------+-------------------+----------------------+------------------------+----------------------+--------------------+-----------------------------+--------------------------+----------------------------+------------------------+--------------------------------+----------------------------------+------------------------+------------------------------+----------------------------+--------------------------+--------------------------------+-----------------------------------+----------------------------------------+---------------------------+-------------------------+--------------------------------+----------------------------+----------------------------+--------------------------+------------------+-------------------------------------+----------------------------+-------------------------------+-------------------------------+------------------------------+-----------------------------+
|(count(userid) / 1000)|(count(age) / 1000)|(count(gender) / 1000)|(count(locat

In [47]:
silver_df.filter(
    (col("engagement") < 0) | (col("satisfaction") < 0)
).count()


0

In [49]:
#spark implementation
from pyspark.sql.functions import current_timestamp, lit

gold_engagement_satisfaction = (
    gold_engagement
    .withColumn("metric_date", lit(None).cast("date"))
    .withColumn("created_at", current_timestamp())
)

gold_engagement_satisfaction.printSchema()
gold_engagement_satisfaction.show()



root
 |-- engagement: integer (nullable = true)
 |-- avg_satisfaction: double (nullable = true)
 |-- user_count: long (nullable = false)
 |-- metric_date: date (nullable = true)
 |-- created_at: timestamp (nullable = false)

+----------+----------------+----------+-----------+--------------------+
|engagement|avg_satisfaction|user_count|metric_date|          created_at|
+----------+----------------+----------+-----------+--------------------+
|        15|             5.5|         2|       NULL|2025-12-31 09:36:...|
|        45|             7.0|         1|       NULL|2025-12-31 09:36:...|
|        48|             3.0|         1|       NULL|2025-12-31 09:36:...|
|        56|             4.0|         2|       NULL|2025-12-31 09:36:...|
|        63|             7.0|         1|       NULL|2025-12-31 09:36:...|
|        70|             3.0|         1|       NULL|2025-12-31 09:36:...|
|        92|             4.0|         1|       NULL|2025-12-31 09:36:...|
|       131|             9.0|      