# Part 2: Python Data Transformation
In this section, I use PySpark to convert the raw numeric timestamps (epoch milliseconds) into human-readable date/time values for easier analysis.

This Python section mirrors the SQL steps, but also prepares a compact features table (avg/min/max/variance of distance within each test window) that we will reuse in the ML weeks.

```python
from pyspark.sql import functions as F

# Load the two Bronze tables and tag each row with the table it came from
dm = spark.read.table("workspace.bronze.device_message_raw").withColumn("source_table", F.lit("device_message_raw"))
rt = spark.read.table("workspace.bronze.rapid_step_test_raw").withColumn("source_table", F.lit("rapid_step_test_raw"))

dm.printSchema()
rt.printSchema()

# `timestamp` holds epoch milliseconds; from_unixtime expects whole seconds,
# so divide by 1000 and floor to a bigint before formatting
df_refined = dm.withColumn(
    "readable_time",
    F.from_unixtime(F.floor(F.col("timestamp").cast("bigint") / 1000)),
)
display(df_refined.select("source_table", "device_id", "timestamp", "readable_time").limit(5))
```
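The cell above treats `timestamp` as epoch milliseconds: dividing by 1000 gives epoch seconds, which `from_unixtime` renders as a `yyyy-MM-dd HH:mm:ss` string in the Spark session time zone. As a quick sanity check, here is the same arithmetic in plain Python on a made-up value. `readable_time` is a hypothetical helper, and it formats in UTC, so it only matches Spark's output when the session time zone is UTC.

```python
from datetime import datetime, timezone


def readable_time(ts_ms: int) -> str:
    """Format an epoch-millisecond timestamp as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    seconds = ts_ms // 1000  # drop the millisecond part, like floor(ts / 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# Hypothetical sample value, not taken from the dataset
print(readable_time(1_700_000_000_123))  # 2023-11-14 22:13:20
```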

### Per-device descriptive stats

Message count and average distance for each device, ordered from the most to the fewest messages.

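```python
# Clean the raw readings:
# - if `distance` arrives with non-numeric characters (for example a unit suffix),
#   keep only digits and dots before casting to double
# - cast `timestamp` to bigint epoch milliseconds
dm_clean = (dm
    .withColumn("distance_cm", F.regexp_replace(F.col("distance").cast("string"), "[^0-9.]", "").cast("double"))
    .withColumn("ts_ms", F.col("timestamp").cast("bigint")))

# Message count and average distance per device, busiest devices first
dm_stats = (dm_clean.groupBy("device_id")
    .agg(F.count("*").alias("n"), F.avg("distance_cm").alias("avg_cm"))
    .orderBy(F.desc("n")))
display(dm_stats.limit(20))
```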
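To make the cleaning and aggregation concrete, here is the same logic in plain Python on a few hypothetical rows (the device IDs and distance strings are invented). `re.sub` with the same `[^0-9.]` pattern stands in for `regexp_replace`, and dictionaries accumulate the count and mean per device, roughly mirroring `groupBy(...).agg(count("*"), avg(...))`: every row is counted, while values that cannot be parsed are treated as missing and left out of the average. Note that the pattern also strips minus signs, so `-5cm` becomes `5.0`. The helper names are hypothetical, and the sketch does not sort the result.

```python
import re
from collections import defaultdict


def clean_distance(raw):
    """Keep only digits and dots, then parse as float; unparseable values become None."""
    digits = re.sub(r"[^0-9.]", "", str(raw))
    try:
        return float(digits)
    except ValueError:  # empty string, or something like "1.2.3"
        return None


def per_device_stats(rows):
    """Return {device_id: (n, avg_cm)}; n counts every row, the average skips None values."""
    counts = defaultdict(int)
    values = defaultdict(list)
    for device_id, raw_distance in rows:
        counts[device_id] += 1
        cm = clean_distance(raw_distance)
        if cm is not None:
            values[device_id].append(cm)
    return {
        d: (counts[d], sum(values[d]) / len(values[d]) if values[d] else None)
        for d in counts
    }


# Hypothetical rows: (device_id, raw distance)
device_rows = [("dev-a", "40cm"), ("dev-a", "44.0"), ("dev-a", "n/a"), ("dev-b", 12)]
print(per_device_stats(device_rows))  # {'dev-a': (3, 42.0), 'dev-b': (1, 12.0)}
```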

### Explode stepPoints to long form

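This is the Python version of the SQL explode step: `posexplode` turns each element of the `step_points` array into its own row and keeps its 0-based position as `step_index`.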

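```python
from pyspark.sql import functions as F

# Re-read the test table; this replaces the earlier `rt`, so it no longer has source_table
rt = spark.table("workspace.bronze.rapid_step_test_raw")

# posexplode emits one row per element of step_points:
# step_index is the 0-based position, step_ms is the element value
rt_exploded = rt.select("customer", "device_id", "start_time", "stop_time",
                        F.posexplode("step_points").alias("step_index", "step_ms"))
display(rt_exploded.limit(20))
```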
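`posexplode` turns one test row with an N-element `step_points` array into N rows, adding the 0-based array position as `step_index`; rows whose array is empty or null produce no output (that is the difference from `posexplode_outer`). A plain-Python equivalent built on `enumerate`, using a hypothetical `pos_explode` helper and an invented test row, looks like this:

```python
def pos_explode(test_row):
    """Yield one dict per element of test_row['step_points'], mimicking posexplode."""
    keys = ("customer", "device_id", "start_time", "stop_time")
    # An empty or missing (None) array yields no rows at all
    for step_index, step_ms in enumerate(test_row.get("step_points") or []):
        out = {k: test_row[k] for k in keys}
        out["step_index"] = step_index
        out["step_ms"] = step_ms
        yield out


# Hypothetical test row; the values are illustrative only
test_row = {"customer": "c-1", "device_id": "dev-a",
            "start_time": 1000, "stop_time": 5000,
            "step_points": [850, 920, 880]}
for row in pos_explode(test_row):
    print(row["step_index"], row["step_ms"])
# 0 850
# 1 920
# 2 880
```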

### Windowed join: feature prep

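This cell mirrors the SQL windowed join: each device message is matched to the test window of the same device that contains its timestamp and labeled `step` or `no_step`. The labeled result is saved as the Silver table and is the input for the features table.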

```python
# 1. Prepare messages (left side of the join)
msgs = dm_clean.select("source_table", "device_id", "ts_ms", "distance_cm")

# 2. Left-join each message to any test window of the same device whose
#    [start_time, stop_time] range contains ts_ms (between() is inclusive).
#    Messages with no matching window are kept, with nulls in the test columns.
#    Dropping the test table's copy of device_id leaves a single device_id column,
#    which avoids a duplicate "column already exists" error when saving.
curated_df = msgs.alias("m").join(
    rt.alias("t"),
    (F.col("m.device_id") == F.col("t.device_id")) &
    (F.col("m.ts_ms").between(F.col("t.start_time"), F.col("t.stop_time"))),
    how="left"
).drop(rt.device_id)

# 3. Label: rows that matched a test window are "step", all others "no_step"
curated_labeled = curated_df.withColumn(
    "label",
    F.when(F.col("t.start_time").isNotNull(), "step").otherwise("no_step")
)

# 4. Display a sample to verify the labels
display(curated_labeled.select("m.source_table", "device_id", "ts_ms", "label").limit(20))
```
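The join condition plus the `label` column amount to a simple rule: a message is `step` if some test window for the same device contains its `ts_ms` (inclusive at both ends, like `between`), otherwise `no_step`. Because it is a left join, a message that falls inside two overlapping windows produces one row per window. A plain-Python sketch of that rule, with a hypothetical `label_messages` helper and invented messages and windows:

```python
def label_messages(messages, windows):
    """Left-join messages to test windows on device_id and start_time <= ts_ms <= stop_time.

    messages: list of (device_id, ts_ms); windows: list of (device_id, start_time, stop_time).
    Returns (device_id, ts_ms, start_time, label) tuples; start_time is None when unmatched.
    """
    out = []
    for device_id, ts_ms in messages:
        matches = [w for w in windows if w[0] == device_id and w[1] <= ts_ms <= w[2]]
        if matches:
            # one output row per matching window, as in a SQL left join
            out.extend((device_id, ts_ms, w[1], "step") for w in matches)
        else:
            # no window matched: keep the message with a null window and "no_step"
            out.append((device_id, ts_ms, None, "no_step"))
    return out


# Hypothetical data
windows = [("dev-a", 1000, 2000)]
messages = [("dev-a", 999), ("dev-a", 1000), ("dev-a", 2000), ("dev-b", 1500)]
for row in label_messages(messages, windows):
    print(row)
# ('dev-a', 999, None, 'no_step')
# ('dev-a', 1000, 1000, 'step')
# ('dev-a', 2000, 1000, 'step')
# ('dev-b', 1500, None, 'no_step')
```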

```python
# Verification: count rows per label
display(curated_labeled.groupBy("label").count())
```

```python
# Save the curated dataset as the Silver layer table
curated_labeled.write.mode("overwrite").saveAsTable("workspace.silver.curated_stedi_data")
```
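The compact features table described at the start of this section (average, minimum, maximum and variance of distance within each test window) is not built by the cells above. Below is a minimal plain-Python sketch of that aggregation, assuming each window is identified by its `(device_id, start_time)` pair and that only `step` rows with a parsed distance are aggregated; the helper name and sample rows are hypothetical. In PySpark the same idea would be a `groupBy` on the window columns with `F.avg`, `F.min`, `F.max` and `F.variance`. `F.variance` is the sample variance, which is also what `statistics.variance` computes here.

```python
from collections import defaultdict
from statistics import mean, variance


def window_features(labeled_rows):
    """Aggregate distance_cm per test window.

    labeled_rows: iterable of (device_id, start_time, distance_cm, label) tuples,
    a hypothetical flattened view of the curated table.
    Returns {(device_id, start_time): {"avg_cm", "min_cm", "max_cm", "var_cm"}}.
    """
    groups = defaultdict(list)
    for device_id, start_time, distance_cm, label in labeled_rows:
        # keep only messages inside a test window with a usable distance
        if label == "step" and distance_cm is not None:
            groups[(device_id, start_time)].append(distance_cm)
    return {
        key: {
            "avg_cm": mean(vals),
            "min_cm": min(vals),
            "max_cm": max(vals),
            # sample variance needs at least two readings
            "var_cm": variance(vals) if len(vals) > 1 else None,
        }
        for key, vals in groups.items()
    }


curated_rows = [("dev-a", 1000, 40.0, "step"), ("dev-a", 1000, 44.0, "step"),
                ("dev-a", 1000, 42.0, "step"), ("dev-a", None, 90.0, "no_step")]
print(window_features(curated_rows))
# {('dev-a', 1000): {'avg_cm': 42.0, 'min_cm': 40.0, 'max_cm': 44.0, 'var_cm': 4.0}}
```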

### Quick Visual Check

Creating a simple line plot of distances over time for one device.

```python
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Pick one device and pull its first 1,000 readings (in time order) to the driver
sample_device = dm_clean.select("device_id").first()["device_id"]

pdf = (dm_clean
       .filter(F.col("device_id") == sample_device)
       .orderBy("ts_ms")
       .limit(1000)
       .select("ts_ms", "distance_cm")
       .toPandas())

plt.figure(figsize=(10, 5))
plt.plot(pdf["ts_ms"], pdf["distance_cm"], color="blue", marker="o", markersize=2)
plt.title(f"Distance Variation - Device: {sample_device}")
plt.xlabel("Time (ms)")
plt.ylabel("Distance (cm)")
plt.grid(True)
plt.show()
```

# Final Reflection
* What was easy: Navigating the Catalog UI to upload Parquet files was straightforward, as was performing basic SQL queries to verify the Bronze layer data.

* What was confusing: Understanding the difference between a managed table and an external table in the Databricks workspace took some time. Reconciling column naming differences (snake_case vs. camelCase) between the raw data and the lab instructions also required careful debugging.

* Ethics Risk: The customer and device_id columns could be used to identify specific individuals. If this data were leaked, it would expose their personal physical activity levels and daily routines. If we mislabel or incorrectly clean sensor data, an AI model trained on it could give inaccurate physical assessments, which could potentially harm a user's health.

### Ethics Check

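- Are we labeling data fairly? Yes. By using a left join, we keep both step and no_step rows, so the model also learns what "standing still" looks like instead of only seeing movement. This removes one obvious source of bias, but the label counts should still be checked for a large imbalance between the two classes.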

- Are we protecting identity? We are using device_id and customer (UUIDs) instead of names or PII (Personally Identifiable Information). However, we must ensure these IDs cannot be linked back to a master customer list outside this environment.

- Are we avoiding medical claims? Yes. We are strictly labeling physical movement (mechanical steps), not diagnosing health conditions or making medical "gait" assessments.