In [0]:
# 03_4_stedi_etl_pipeline
# Loads raw tables from bronze and keeps everything deterministic for job runs.

from pyspark.sql.functions import col, regexp_extract, when, lit

# Obtain the session before first use so this cell also runs on a fresh kernel
# where `spark` is not predefined; getOrCreate() hands back the already-active
# session when one exists.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Always set the current schema explicitly so unqualified table names don't
# depend on whatever schema the cluster "feels like" today.
spark.sql("USE bronze")

# Raw inputs, read as-is; no filtering or casting happens in this cell.
device_raw = spark.table("device_messages_raw")
steps_raw  = spark.table("rapid_step_tests_raw")

# count() is a Spark action, so each print below runs a job against the table.
print("Loaded:", device_raw.count(), "device rows")
print("Loaded:", steps_raw.count(), "step-test rows")


In [0]:
from pyspark.sql import SparkSession
# Reuse the active session (or start one) and pin the working schema to bronze.
spark = SparkSession.builder.getOrCreate()
spark.sql("USE bronze")

# Read both raw tables in one step; names resolve against the schema set above.
device_raw, steps_raw = (
    spark.table(_table_name)
    for _table_name in ("device_messages_raw", "rapid_step_tests_raw")
)

# Report how many rows each input contributed.
for _frame, _label in ((device_raw, "device rows"), (steps_raw, "step-test rows")):
    print("Loaded:", _frame.count(), _label)


In [0]:
from pyspark.sql.functions import col, regexp_extract, when, lit


In [0]:
from pyspark.sql.functions import col, when, lit

# Epoch values below this bound are treated as seconds; values at or above it
# are treated as milliseconds.
# Heuristic: seconds epochs are ~1e9–1e10; milliseconds epochs are ~1e12–1e13.
EPOCH_SECONDS_UPPER_BOUND = 10**11


def _to_epoch_ms(epoch):
    """Normalize an epoch Column to milliseconds.

    Args:
        epoch: Column holding an epoch value already cast to long.

    Returns:
        Column equal to ``epoch * 1000`` when the value is below
        ``EPOCH_SECONDS_UPPER_BOUND`` and ``epoch`` unchanged otherwise.
        A NULL input stays NULL (the comparison is NULL, so the
        ``otherwise`` branch returns the NULL input).
    """
    return when(epoch < lit(EPOCH_SECONDS_UPPER_BOUND), epoch * lit(1000)).otherwise(epoch)


# Clean distance (e.g., "75cm" -> 75) and keep the columns needed for labeling/output.
# regexp_extract yields "" when no digits are present; only cast non-empty matches so
# such rows become NULL instead of raising when ANSI mode is enabled.
_distance_digits = regexp_extract(col("distance"), r"(\d+)", 1)

device_prepped = (
    device_raw
    .withColumn(
        "distance_cm",
        when(_distance_digits != lit(""), _distance_digits.cast("int"))
    )
    .select("timestamp", "device_id", "sensor_type", "distance_cm")
)

# Step-test windows per device, with both bounds normalized to millisecond epoch.
steps_window = steps_raw.select(
    "device_id",
    _to_epoch_ms(col("start_time").cast("long")).alias("start_time"),
    _to_epoch_ms(col("stop_time").cast("long")).alias("stop_time"),
)

display(steps_window.limit(5))


In [0]:
# Quick sanity check: list the names in scope that look like pipeline variables
# (anything ending in "_df", or mentioning "final", "device" or "steps").
[
    name
    for name in dir()
    if name.endswith("_df") or any(tag in name for tag in ("final", "device", "steps"))
]


In [0]:
from pyspark.sql.functions import col, when, lit

# Label every device reading by whether it falls inside ANY step-test window of
# the same device.
#
# A plain left join on the window condition emits one output row per matching
# window, so a reading covered by overlapping or duplicated windows would be
# duplicated. left_semi / left_anti keep each reading exactly once: semi returns
# the readings with at least one matching window, anti returns the rest.
_readings = device_prepped.alias("d")
_windows = steps_window.alias("s")
_in_window = (
    (col("d.device_id") == col("s.device_id")) &
    col("d.timestamp").between(col("s.start_time"), col("s.stop_time"))
)

final_df = (
    _readings.join(_windows, _in_window, "left_semi")
    .withColumn("step_label", lit("step"))
    .withColumn("source_label", lit("step"))
    .unionByName(
        _readings.join(_windows, _in_window, "left_anti")
        .withColumn("step_label", lit("no_step"))
        .withColumn("source_label", lit("device"))
    )
    .select(
        "timestamp",
        "device_id",
        "sensor_type",
        "distance_cm",
        "step_label",
        "source_label"
    )
)


In [0]:
from pyspark.sql.functions import col, from_unixtime, to_timestamp

# Replace the epoch-millisecond `timestamp` with a proper timestamp column.
# Casting fractional seconds straight to timestamp keeps the millisecond part;
# from_unixtime works on whole seconds and round-trips through a formatted
# string, which drops the sub-second precision.
final_df_renamed = (
    final_df
    .withColumn("timestamp_ts", (col("timestamp") / 1000).cast("timestamp"))
    .select(
        col("timestamp_ts").alias("timestamp"),
        "device_id",
        "sensor_type",
        "distance_cm",
        "step_label",
        "source_label"
    )
)

display(final_df_renamed.limit(20))


In [0]:
from pyspark.sql.functions import col, to_timestamp, from_unixtime

# Add a human-readable timestamp column next to the raw value (epoch ms -> timestamp).
# Casting fractional seconds straight to timestamp keeps the millisecond part;
# from_unixtime works on whole seconds and round-trips through a formatted
# string, which drops the sub-second precision.
final_df_readable = (
    final_df
    .withColumn("timestamp_ts", (col("timestamp") / 1000).cast("timestamp"))
)

display(final_df_readable.select("timestamp", "timestamp_ts", "device_id", "sensor_type", "distance_cm", "step_label").limit(20))


In [0]:
from pyspark.sql.functions import col, when, lit

# Label every device reading by whether it falls inside ANY step-test window of
# the same device (this variant produces `step_label` only).
#
# A plain left join on the window condition emits one output row per matching
# window, so a reading covered by overlapping or duplicated windows would be
# duplicated. left_semi / left_anti keep each reading exactly once: semi returns
# the readings with at least one matching window, anti returns the rest.
_readings = device_prepped.alias("d")
_windows = steps_window.alias("s")
_in_window = (
    (col("d.device_id") == col("s.device_id")) &
    col("d.timestamp").between(col("s.start_time"), col("s.stop_time"))
)

final_df = (
    _readings.join(_windows, _in_window, "left_semi")
    .withColumn("step_label", lit("step"))
    .unionByName(
        _readings.join(_windows, _in_window, "left_anti")
        .withColumn("step_label", lit("no_step"))
    )
    .select(
        "timestamp",
        "device_id",
        "sensor_type",
        "distance_cm",
        "step_label"
    )
)


In [0]:
# Match on device_id and timestamp window to label step/no_step.
#
# A plain left join on the window condition emits one output row per matching
# window, so a reading covered by overlapping or duplicated windows would be
# duplicated and inflate the counts below. left_semi / left_anti keep each
# reading exactly once: semi returns the readings with at least one matching
# window, anti returns the rest.
_readings = device_prepped.alias("d")
_windows = steps_window.alias("s")
_in_window = (
    (col("d.device_id") == col("s.device_id")) &
    col("d.timestamp").between(col("s.start_time"), col("s.stop_time"))
)

final_df = (
    _readings.join(_windows, _in_window, "left_semi")
    .withColumn("step_label", lit("step"))
    .withColumn("source_label", lit("step"))
    .unionByName(
        _readings.join(_windows, _in_window, "left_anti")
        .withColumn("step_label", lit("no_step"))
        .withColumn("source_label", lit("device"))
    )
    .select(
        "timestamp",
        "device_id",
        "sensor_type",
        "distance_cm",
        "step_label",
        "source_label"
    )
)

# Preview a sample, then show the label distributions as a quick validation.
display(final_df.limit(20))
final_df.groupBy("step_label").count().show()
final_df.groupBy("source_label").count().show()


In [0]:
# Register the labeled DataFrame as a temp view named `final_df` so the %sql
# cells can read it by that name; an existing view of the same name is replaced.
final_df.createOrReplaceTempView("final_df")


In [0]:
%sql
-- Persist every row of the `final_df` temp view as the table labeled_step_test,
-- replacing the table if it already exists.
-- NOTE(review): the table name is unqualified, so it resolves against the current
-- schema (the notebook runs `USE bronze` earlier) — confirm that is the intended target.
CREATE OR REPLACE TABLE labeled_step_test AS
SELECT * FROM final_df;


In [0]:
%sql
-- Validation: total number of rows written to labeled_step_test.
SELECT COUNT(*) AS row_count
FROM labeled_step_test;


In [0]:
%sql
-- Validation: row count per step_label value.
SELECT step_label, COUNT(*) AS cnt
FROM labeled_step_test
GROUP BY step_label;


In [0]:
%sql
-- Validation: up to 50 rows whose step_label is NULL or not one of 'step' / 'no_step'.
-- The explicit IS NULL test is needed because NOT IN never evaluates to true for a NULL label.
SELECT *
FROM labeled_step_test
WHERE step_label NOT IN ('step', 'no_step')
   OR step_label IS NULL
LIMIT 50;


In [0]:
%sql
-- Validation: row count per source_label value.
SELECT source_label, COUNT(*) AS cnt
FROM labeled_step_test
GROUP BY source_label;


In [0]:
%sql
-- Validation: up to 50 rows whose source_label is NULL or not one of 'device' / 'step'.
-- The explicit IS NULL test is needed because NOT IN never evaluates to true for a NULL label.
SELECT *
FROM labeled_step_test
WHERE source_label NOT IN ('device', 'step')
   OR source_label IS NULL
LIMIT 50;


When you automate pipelines that handle health-related data, you are responsible for protecting privacy and for not exposing identifiers such as device IDs more than necessary. You also need validation on every run so that bad labels or schema changes don't quietly spread downstream. Because people tend to trust automated outputs, document what the labels really mean and where they can be wrong. Watch for bias from missing data or uneven participation, and avoid medical claims: this is activity labeling, not a diagnosis.