# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [None]:
# Set up Spark and GlueContext
import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, year, month, quarter, weekofyear, dayofmonth, date_format
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# Spark & GlueContext
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext; it is passed to DynamicFrame.fromDF below.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext; used below for spark.read.
spark = glueContext.spark_session
# Job handle bound to this GlueContext; not referenced again in the visible cells.
job = Job(glueContext)





#### Example: Read Data from S3 bucket


In [None]:
# Location of the source CSV in S3.
input_path = "s3://project-data-preperations/wearables_health_6mo_daily.csv"

# Treat the first line as column names and let Spark infer the column types,
# then print the schema that was inferred.
df = spark.read.csv(input_path, header=True, inferSchema=True)
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- region: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- resting_hr_bpm: double (nullable = true)
 |-- avg_hr_day_bpm: double (nullable = true)
 |-- hrv_rmssd_ms: double (nullable = true)
 |-- spo2_avg_pct: double (nullable = true)
 |-- sbp_mmHg: double (nullable = true)
 |-- dbp_mmHg: double (nullable = true)
 |-- steps: double (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- calories_kcal: double (nullable = true)
 |-- workout_type: string (nullable = true)
 |-- workout_minutes: integer (nullable = true)
 |-- caffeine_mg: double (nullable = true)
 |-- alcohol_units: double (nullable = true)
 |-- screen_time_min: double (nullable = true)
 |-- sleep_duration_hours: doub

#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [None]:
# Round-trip: Spark DataFrame -> Glue DynamicFrame -> Spark DataFrame.
dyf = DynamicFrame.fromDF(df, glueContext, "wearables_dynamicframe")
df = dyf.toDF()

# Print the first 20 rows (the default row count of show()).
df.show(n=20)

+-------+-------------------+---+------+------------------+------------+---------+---------+----+------------------+-----------------+------------------+-----------------+------------------+-----------------+-------+-----------+-------------+------------+---------------+------------------+-------------------+------------------+--------------------+------------------+-----------------+--------------------------+-------------------+--------------------+---------------------+------------+-------------------+--------+
|user_id|               date|age|gender|            region|device_model|height_cm|weight_kg| bmi|    resting_hr_bpm|   avg_hr_day_bpm|      hrv_rmssd_ms|     spo2_avg_pct|          sbp_mmHg|         dbp_mmHg|  steps|distance_km|calories_kcal|workout_type|workout_minutes|       caffeine_mg|      alcohol_units|   screen_time_min|sleep_duration_hours|  sleep_efficiency|sleep_latency_min|wake_after_sleep_onset_min|sleep_stage_rem_pct|sleep_stage_deep_pct|sleep_stage_light_pct|str

#### Find the columns that contain null values


In [None]:
# One aggregate per column: the number of rows in which that column is null.
null_flag_sums = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]

# Single-row result with one count per original column.
null_counts = df.select(*null_flag_sums)
null_counts.show(truncate=False)

+-------+----+---+------+------+------------+---------+---------+---+--------------+--------------+------------+------------+--------+--------+-----+-----------+-------------+------------+---------------+-----------+-------------+---------------+--------------------+----------------+-----------------+--------------------------+-------------------+--------------------+---------------------+------------+-------------------+----+
|user_id|date|age|gender|region|device_model|height_cm|weight_kg|bmi|resting_hr_bpm|avg_hr_day_bpm|hrv_rmssd_ms|spo2_avg_pct|sbp_mmHg|dbp_mmHg|steps|distance_km|calories_kcal|workout_type|workout_minutes|caffeine_mg|alcohol_units|screen_time_min|sleep_duration_hours|sleep_efficiency|sleep_latency_min|wake_after_sleep_onset_min|sleep_stage_rem_pct|sleep_stage_deep_pct|sleep_stage_light_pct|stress_score|mindfulness_minutes|mood|
+-------+----+---+------+------+------------+---------+---------+---+--------------+--------------+------------+------------+--------+----

#### Show the null count of each column vertically (one row per column)


In [None]:
# One aggregate per column: the number of rows in which that column is null.
null_counts = df.select(
    *[
        F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
        for name in df.columns
    ]
)

# Unpivot the single wide row into (column_name, num_nulls) rows with
# Spark SQL's stack(n, 'name1', `name1`, 'name2', `name2`, ...).
name_value_pairs = ", ".join(f"'{name}', `{name}`" for name in df.columns)
stack_expr = f"stack({len(df.columns)}, {name_value_pairs}) as (column_name, num_nulls)"

null_counts_vertical = null_counts.selectExpr(stack_expr)
null_counts_vertical.show(truncate=False)

+---------------+---------+
|column_name    |num_nulls|
+---------------+---------+
|user_id        |0        |
|date           |0        |
|age            |0        |
|gender         |0        |
|region         |0        |
|device_model   |0        |
|height_cm      |0        |
|weight_kg      |0        |
|bmi            |0        |
|resting_hr_bpm |0        |
|avg_hr_day_bpm |1057     |
|hrv_rmssd_ms   |1057     |
|spo2_avg_pct   |0        |
|sbp_mmHg       |0        |
|dbp_mmHg       |0        |
|steps          |1057     |
|distance_km    |1057     |
|calories_kcal  |1057     |
|workout_type   |0        |
|workout_minutes|0        |
+---------------+---------+
only showing top 20 rows


### Clean Null Values

In [None]:
# Numeric columns to forward-fill.
numeric_cols_with_nulls = [
    'avg_hr_day_bpm', 'hrv_rmssd_ms', 'steps', 'distance_km',
    'calories_kcal', 'sleep_duration_hours', 'sleep_efficiency',
    'sleep_latency_min', 'wake_after_sleep_onset_min',
    'sleep_stage_rem_pct', 'sleep_stage_deep_pct', 'sleep_stage_light_pct',
]

# Per-user frame, ordered by date, from the user's first row up to the current row.
windowSpec = (
    Window.partitionBy("user_id")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Forward fill: each listed column takes the most recent non-null value seen so
# far for that user; every other column passes through unchanged, in place.
cols_to_fill = set(numeric_cols_with_nulls)
df = df.select(
    *[
        F.last(F.col(name), ignorenulls=True).over(windowSpec).alias(name)
        if name in cols_to_fill
        else F.col(name)
        for name in df.columns
    ]
)

# Rows that are still null had no earlier non-null value to copy; drop them.
df = df.na.drop(subset=numeric_cols_with_nulls)

# Re-count nulls in the filled columns to confirm the clean-up.
null_counts = df.select(
    *[
        F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
        for name in numeric_cols_with_nulls
    ]
)

# Unpivot the counts into (column_name, num_nulls) rows.
name_value_pairs = ", ".join(f"'{name}', `{name}`" for name in numeric_cols_with_nulls)
null_counts_verticals = null_counts.selectExpr(
    f"stack({len(numeric_cols_with_nulls)}, {name_value_pairs}) as (column_name, num_nulls)"
)

null_counts_verticals.show(truncate=False)


+--------------------------+---------+
|column_name               |num_nulls|
+--------------------------+---------+
|avg_hr_day_bpm            |0        |
|hrv_rmssd_ms              |0        |
|steps                     |0        |
|distance_km               |0        |
|calories_kcal             |0        |
|sleep_duration_hours      |0        |
|sleep_efficiency          |0        |
|sleep_latency_min         |0        |
|wake_after_sleep_onset_min|0        |
|sleep_stage_rem_pct       |0        |
|sleep_stage_deep_pct      |0        |
|sleep_stage_light_pct     |0        |
+--------------------------+---------+


In [None]:
# Reduce the date column to a calendar date.
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Calendar attributes derived from the date, added in this order.
calendar_parts = [
    ("year", year("date")),
    ("month", month("date")),
    ("quarter", quarter("date")),
    ("week", weekofyear("date")),
    ("day", dayofmonth("date")),
    ("month_name", date_format("date", "MMMM")),
    ("day_name", date_format("date", "EEEE")),
]
for part_name, part_expr in calendar_parts:
    df = df.withColumn(part_name, part_expr)

# Spot-check the derived columns.
preview_cols = ["date"] + [part_name for part_name, _ in calendar_parts]
df.select(*preview_cols).show(n=5)

+----------+----+-----+-------+----+---+----------+---------+
|      date|year|month|quarter|week|day|month_name| day_name|
+----------+----+-----+-------+----+---+----------+---------+
|2025-05-11|2025|    5|      2|  19| 11|       May|   Sunday|
|2025-05-12|2025|    5|      2|  20| 12|       May|   Monday|
|2025-05-13|2025|    5|      2|  20| 13|       May|  Tuesday|
|2025-05-14|2025|    5|      2|  20| 14|       May|Wednesday|
|2025-05-15|2025|    5|      2|  20| 15|       May| Thursday|
+----------+----+-----+-------+----+---+----------+---------+
only showing top 5 rows


In [None]:

# Integer key in yyyyMMdd form (e.g. 20250511) derived from the date column.
date_key_expr = date_format(col("date"), "yyyyMMdd").cast("int")
df = df.withColumn("date_key", date_key_expr)

df.select("date", "date_key").show(n=5)

+----------+--------+
|      date|date_key|
+----------+--------+
|2025-05-11|20250511|
|2025-05-12|20250512|
|2025-05-13|20250513|
|2025-05-14|20250514|
|2025-05-15|20250515|
+----------+--------+
only showing top 5 rows


In [None]:
# Calendar attributes that make up the date dimension.
dim_date_cols = [
    'date_key', 'year', 'month', 'quarter',
    'week', 'day', 'month_name', 'day_name',
]

# One row per distinct combination of those attributes.
dim_date = df.select(*dim_date_cols).distinct()

dim_date.show(n=5)


+--------+----+-----+-------+----+---+----------+--------+
|date_key|year|month|quarter|week|day|month_name|day_name|
+--------+----+-----+-------+----+---+----------+--------+
|20250601|2025|    6|      2|  22|  1|      June|  Sunday|
|20250603|2025|    6|      2|  23|  3|      June| Tuesday|
|20250708|2025|    7|      3|  28|  8|      July| Tuesday|
|20250724|2025|    7|      3|  30| 24|      July|Thursday|
|20250821|2025|    8|      3|  34| 21|    August|Thursday|
+--------+----+-----+-------+----+---+----------+--------+
only showing top 5 rows


In [None]:
# Order the rows by user, then by date within each user.
df = df.sort("user_id", "date")





In [None]:
# Attributes that describe a user.
dim_user_cols = ['user_id', 'age', 'gender', 'region', 'device_model', 'height_cm', 'weight_kg', 'bmi']

# Keep a single row per user_id.
# NOTE(review): dropDuplicates on a subset keeps an arbitrary row per key; if
# weight_kg / bmi change from day to day for a user, which value ends up here
# is not deterministic -- confirm these attributes are constant per user.
dim_user = df.select(dim_user_cols).dropDuplicates(["user_id"])

# Unpartitioned window ordered by user_id, so the surrogate key runs 1..N in
# user_id order across the whole dimension.
w = Window.orderBy("user_id")

# Use F.row_number(): only `F` is imported from pyspark.sql.functions, so a
# bare row_number() call raises NameError on a fresh kernel.
dim_user = dim_user.withColumn("user_sk", F.row_number().over(w))






In [None]:
# Final column order for the user dimension: surrogate key first.
dim_user_output_cols = [
    "user_sk", "user_id", "age", "gender", "region",
    "device_model", "height_cm", "weight_kg", "bmi",
]
dim_user = dim_user.select(*dim_user_output_cols)
dim_user.show(n=5, truncate=False)



+-------+-------+---+------+------------------+--------------+---------+---------+----+
|user_sk|user_id|age|gender|region            |device_model  |height_cm|weight_kg|bmi |
+-------+-------+---+------+------------------+--------------+---------+---------+----+
|1      |U0001  |35 |female|TR-EasternAnatolia|FitPulse X    |170      |58.7     |20.3|
|2      |U0002  |23 |male  |TR-Marmara        |SleepSense 2  |174      |63.6     |21.0|
|3      |U0003  |27 |male  |TR-Aegean         |HealthBand Pro|188      |65.6     |18.6|
|4      |U0004  |31 |male  |TR-Aegean         |FitPulse X    |169      |81.3     |28.5|
|5      |U0005  |34 |male  |TR-EasternAnatolia|PulseOne Mini |180      |69.9     |21.6|
+-------+-------+---+------+------------------+--------------+---------+---------+----+
only showing top 5 rows


In [None]:
# join على user_id لإحضار user_sk لكل صف
# Left-join on user_id to attach user_sk to every daily row.
# The join starts from df rather than fact_daily: fact_daily is only assigned
# in later cells, so reading it here raises NameError when the notebook is run
# top to bottom, and re-joining a frame that already carries user_sk yields
# two user_sk columns (any later select("user_sk") is then ambiguous).
fact_daily_with_sk = df.join(
    dim_user.select("user_id", "user_sk"),
    on="user_id",
    how="left",  # left join so that every row of df is kept
)





In [None]:
# Put user_sk first and keep the remaining columns in their existing order.
remaining_cols = [name for name in fact_daily_with_sk.columns if name != "user_sk"]
cols = ["user_sk", *remaining_cols]
fact_daily_with_sk = fact_daily_with_sk.select(*cols)





In [None]:
# Spot-check that every row received a surrogate key.
preview_cols = ["user_sk", "user_id", "date_key", "steps"]
fact_daily_with_sk.select(*preview_cols).show(n=10, truncate=False)

# From here on the keyed frame is the fact table.
fact_daily = fact_daily_with_sk

+-------+-------+--------+-------+
|user_sk|user_id|date_key|steps  |
+-------+-------+--------+-------+
|1      |U0001  |20250511|7279.0 |
|1      |U0001  |20250512|9413.0 |
|1      |U0001  |20250513|8231.0 |
|1      |U0001  |20250514|8677.0 |
|1      |U0001  |20250515|12036.0|
|1      |U0001  |20250516|7946.0 |
|1      |U0001  |20250517|6744.0 |
|1      |U0001  |20250518|7340.0 |
|1      |U0001  |20250519|6331.0 |
|1      |U0001  |20250520|6624.0 |
+-------+-------+--------+-------+
only showing top 10 rows


In [None]:
# Print the column names and types of the current fact_daily frame.
fact_daily.printSchema()


root
 |-- user_sk: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_key: integer (nullable = true)
 |-- steps: double (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- calories_kcal: double (nullable = true)
 |-- workout_minutes: integer (nullable = true)
 |-- resting_hr_bpm: double (nullable = true)
 |-- avg_hr_day_bpm: double (nullable = true)
 |-- hrv_rmssd_ms: double (nullable = true)
 |-- sbp_mmHg: double (nullable = true)
 |-- dbp_mmHg: double (nullable = true)
 |-- spo2_avg_pct: double (nullable = true)
 |-- sleep_duration_hours: double (nullable = true)
 |-- sleep_efficiency: double (nullable = true)
 |-- sleep_latency_min: double (nullable = true)
 |-- wake_after_sleep_onset_min: double (nullable = true)
 |-- sleep_stage_rem_pct: double (nullable = true)
 |-- sleep_stage_deep_pct: double (nullable = true)
 |-- sleep_stage_light_pct: double (nullable = true)
 |-- stress_score: integer (nullable = true)
 |-- mindfulness_minutes: integer 

In [None]:
# Move the user surrogate key to the front of the fact table.
# The key column is named user_sk in every cell that creates or reads it;
# no cell creates a user_sk_fk column, so selecting that name cannot resolve.
cols = ["user_sk"] + [c for c in fact_daily.columns if c != "user_sk"]
fact_daily = fact_daily.select(cols)





In [None]:
# Attach the surrogate key: dim_user supplies the user_sk for each user_id.
# Both columns must be brought across; joining only user_id leaves df_fact
# without the user_sk column that fact_cols selects below.
df_fact = df.join(dim_user.select("user_id", "user_sk"), on="user_id", how="left")

# Fact table columns: the two dimension keys followed by the daily measures.
fact_cols = [
    'user_sk', 'date_key', 'steps','distance_km','calories_kcal','workout_minutes',
    'resting_hr_bpm','avg_hr_day_bpm','hrv_rmssd_ms','sbp_mmHg','dbp_mmHg','spo2_avg_pct',
    'sleep_duration_hours','sleep_efficiency','sleep_latency_min','wake_after_sleep_onset_min',
    'sleep_stage_rem_pct','sleep_stage_deep_pct','sleep_stage_light_pct',
    'stress_score','mindfulness_minutes','screen_time_min','caffeine_mg','alcohol_units','mood','workout_type'
]

fact_daily = df_fact.select(fact_cols)

fact_daily.show(5, truncate=False)



+-------+--------+-------+-----------+-------------+---------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+--------------------+------------------+-----------------+--------------------------+-------------------+--------------------+---------------------+------------+-------------------+------------------+------------------+-------------------+--------+------------+
|user_id|date_key|steps  |distance_km|calories_kcal|workout_minutes|resting_hr_bpm   |avg_hr_day_bpm   |hrv_rmssd_ms      |sbp_mmHg          |dbp_mmHg         |spo2_avg_pct     |sleep_duration_hours|sleep_efficiency  |sleep_latency_min|wake_after_sleep_onset_min|sleep_stage_rem_pct|sleep_stage_deep_pct|sleep_stage_light_pct|stress_score|mindfulness_minutes|screen_time_min   |caffeine_mg       |alcohol_units      |mood    |workout_type|
+-------+--------+-------+-----------+-------------+---------------+-----------------+-----------------+----------------

In [None]:
# Preview the date dimension.
dim_date.show(n=5)

+--------+----+-----+-------+----+---+----------+--------+
|date_key|year|month|quarter|week|day|month_name|day_name|
+--------+----+-----+-------+----+---+----------+--------+
|20250601|2025|    6|      2|  22|  1|      June|  Sunday|
|20250603|2025|    6|      2|  23|  3|      June| Tuesday|
|20250708|2025|    7|      3|  28|  8|      July| Tuesday|
|20250724|2025|    7|      3|  30| 24|      July|Thursday|
|20250821|2025|    8|      3|  34| 21|    August|Thursday|
+--------+----+-----+-------+----+---+----------+--------+
only showing top 5 rows


In [None]:
# Preview the user dimension.
dim_user.show(n=5)


+-------+-------+---+------+------------------+--------------+---------+---------+----+
|user_sk|user_id|age|gender|            region|  device_model|height_cm|weight_kg| bmi|
+-------+-------+---+------+------------------+--------------+---------+---------+----+
|      1|  U0001| 35|female|TR-EasternAnatolia|    FitPulse X|      170|     58.7|20.3|
|      2|  U0002| 23|  male|        TR-Marmara|  SleepSense 2|      174|     63.6|21.0|
|      3|  U0003| 27|  male|         TR-Aegean|HealthBand Pro|      188|     65.6|18.6|
|      4|  U0004| 31|  male|         TR-Aegean|    FitPulse X|      169|     81.3|28.5|
|      5|  U0005| 34|  male|TR-EasternAnatolia| PulseOne Mini|      180|     69.9|21.6|
+-------+-------+---+------+------------------+--------------+---------+---------+----+
only showing top 5 rows


In [None]:
# Preview the fact table.
fact_daily.show(n=2)

+-------+--------+------+-----------+-------------+---------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+------------------+-----------------+--------------------------+-------------------+--------------------+---------------------+------------+-------------------+-----------------+-----------------+-------------------+--------+------------+
|user_id|date_key| steps|distance_km|calories_kcal|workout_minutes|   resting_hr_bpm|   avg_hr_day_bpm|     hrv_rmssd_ms|          sbp_mmHg|         dbp_mmHg|     spo2_avg_pct|sleep_duration_hours|  sleep_efficiency|sleep_latency_min|wake_after_sleep_onset_min|sleep_stage_rem_pct|sleep_stage_deep_pct|sleep_stage_light_pct|stress_score|mindfulness_minutes|  screen_time_min|      caffeine_mg|      alcohol_units|    mood|workout_type|
+-------+--------+------+-----------+-------------+---------------+-----------------+-----------------+-----------------+-------

In [None]:

# Show the first two values of every fact column, one column at a time.
# Columns are selected by name, so the names in fact_daily must be unique.
for column_name in fact_daily.columns:
    print(f"Column: {column_name}")
    single_column = fact_daily.select(column_name)
    single_column.show(n=2, truncate=False)


AnalysisException: Reference 'user_sk' is ambiguous, could be: user_sk, user_sk.


In [None]:
# Each table and the S3 prefix it is written to, in write order.
export_targets = [
    (dim_date, "s3://project-data-preperations/dim_date"),
    (dim_user, "s3://project-data-preperations/dim_user"),
    (fact_daily, "s3://project-data-preperations/fact_daily"),
]

# Write each table as CSV with a header row, replacing any existing output.
for table_df, target_path in export_targets:
    table_df.write.csv(target_path, mode="overwrite", header=True)



