In [1]:
# ============================================================
# PAMAP2 Data Ingestion & Preprocessing Pipeline — PySpark
# ============================================================
# 1. Load all .dat files (Protocol + Optional) with typed schema
# 2. Tag rows with subject_id and session_type
# 3. Interpolate missing heart rate (forward-fill via Window)
# 4. Drop invalid / transient rows
# 5. Normalize numeric sensor columns (min-max per column)
# 6. Save as Parquet partitioned by subject_id
# ============================================================

from pyspark.sql import SparkSession, Window
from pyspark.sql.types import (
    StructType, StructField, DoubleType, IntegerType, StringType,
)
from pyspark.sql.functions import (
    col, lit, input_file_name, regexp_extract,
    last, min as spark_min, max as spark_max,
    isnan, when, count,
)

spark = (
    SparkSession.builder
    .appName("PAMAP2_Ingestion")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark version : {spark.version}")
print(f"Parallelism   : {spark.sparkContext.defaultParallelism}")

Spark version : 4.1.1
Parallelism   : 16


In [2]:
# ============================================================
# 1. Define the 54-column schema matching the PAMAP2 readme
# ============================================================
# Columns per the spec:
#   1  timestamp (s)
#   2  activityID
#   3  heart_rate (bpm)
#   4-20   IMU_hand   (17 cols)
#   21-37  IMU_chest  (17 cols)
#   38-54  IMU_ankle  (17 cols)
#
# Each IMU block (17 cols):
#   temperature,
#   acc_16g_x, acc_16g_y, acc_16g_z,
#   acc_6g_x,  acc_6g_y,  acc_6g_z,
#   gyro_x,    gyro_y,    gyro_z,
#   mag_x,     mag_y,     mag_z,
#   orient_0,  orient_1,  orient_2, orient_3
# ============================================================

IMU_SUFFIXES = [
    "temperature",
    "acc_16g_x", "acc_16g_y", "acc_16g_z",
    "acc_6g_x",  "acc_6g_y",  "acc_6g_z",
    "gyro_x",    "gyro_y",    "gyro_z",
    "mag_x",     "mag_y",     "mag_z",
    "orient_0",  "orient_1",  "orient_2", "orient_3",
]

def build_imu_fields(location: str) -> list:
    """Generate 17 StructFields for one IMU placement."""
    return [
        StructField(f"{location}_{s}", DoubleType(), True)
        for s in IMU_SUFFIXES
    ]

schema = StructType(
    [
        StructField("timestamp",   DoubleType(),  False),
        StructField("activity_id", IntegerType(), False),
        StructField("heart_rate",  DoubleType(),  True),
    ]
    + build_imu_fields("hand")
    + build_imu_fields("chest")
    + build_imu_fields("ankle")
)

print(f"Schema field count: {len(schema.fields)}")   # expect 54
schema

Schema field count: 54


StructType([StructField('timestamp', DoubleType(), False), StructField('activity_id', IntegerType(), False), StructField('heart_rate', DoubleType(), True), StructField('hand_temperature', DoubleType(), True), StructField('hand_acc_16g_x', DoubleType(), True), StructField('hand_acc_16g_y', DoubleType(), True), StructField('hand_acc_16g_z', DoubleType(), True), StructField('hand_acc_6g_x', DoubleType(), True), StructField('hand_acc_6g_y', DoubleType(), True), StructField('hand_acc_6g_z', DoubleType(), True), StructField('hand_gyro_x', DoubleType(), True), StructField('hand_gyro_y', DoubleType(), True), StructField('hand_gyro_z', DoubleType(), True), StructField('hand_mag_x', DoubleType(), True), StructField('hand_mag_y', DoubleType(), True), StructField('hand_mag_z', DoubleType(), True), StructField('hand_orient_0', DoubleType(), True), StructField('hand_orient_1', DoubleType(), True), StructField('hand_orient_2', DoubleType(), True), StructField('hand_orient_3', DoubleType(), True), Str

In [3]:
# ============================================================
# 2. Load all .dat files and tag with subject_id / session_type
# ============================================================
# NOTE: On Windows, Hadoop's native glob can fail with
# UnsatisfiedLinkError on NativeIO.  We avoid this entirely by
# listing files with Python's os module and passing explicit
# paths to spark.read.csv().
# ============================================================

import os, re

DATA_ROOT = r"C:/Users/johnu/Downloads/pamap2+physical+activity+monitoring/PAMAP2_Dataset"

protocol_path = os.path.join(DATA_ROOT, "Protocol")
optional_path = os.path.join(DATA_ROOT, "Optional")

def list_dat_files(folder: str) -> list:
    """Return sorted list of absolute .dat file paths in folder."""
    return sorted([
        os.path.join(folder, f).replace("\\", "/")
        for f in os.listdir(folder)
        if f.endswith(".dat")
    ])

def load_session(folder_path: str, session_type: str):
    """
    Read every .dat file inside *folder_path* with the predefined
    schema, then add subject_id (int) and session_type (str).
    """
    file_list = list_dat_files(folder_path)
    print(f"  {session_type}: found {len(file_list)} files")

    # Pass explicit file list — no Hadoop glob needed
    df = (
        spark.read
        .schema(schema)
        .option("header", "false")
        .option("delimiter", " ")
        .option("nanValue", "NaN")
        .csv(file_list)
    )

    # Extract subject id from the file path  (e.g. "subject101")
    df = (
        df
        .withColumn("_path", input_file_name())
        .withColumn(
            "subject_id",
            regexp_extract("_path", r"subject(\d+)", 1).cast(IntegerType()),
        )
        .withColumn("session_type", lit(session_type))
        .drop("_path")
    )
    return df

df_protocol = load_session(protocol_path, "protocol")
df_optional = load_session(optional_path, "optional")

# Union both sessions into one DataFrame
df_raw = df_protocol.unionByName(df_optional)

print(f"\nProtocol rows : {df_protocol.count():,}")
print(f"Optional rows : {df_optional.count():,}")
print(f"Combined rows : {df_raw.count():,}")
print(f"Columns       : {len(df_raw.columns)} ({', '.join(df_raw.columns[:5])} ... {', '.join(df_raw.columns[-3:])})")

  protocol: found 9 files


  optional: found 5 files



Protocol rows : 2,872,533


Optional rows : 977,972


Combined rows : 3,850,505
Columns       : 56 (timestamp, activity_id, heart_rate, hand_temperature, hand_acc_16g_x ... ankle_orient_3, subject_id, session_type)


In [4]:
# ============================================================
# 3. Quick data-quality audit before preprocessing
# ============================================================

# 3a. Activity class distribution (including transient id=0)
print("=== Activity distribution ===")
df_raw.groupBy("activity_id").count().orderBy("activity_id").show(25, truncate=False)

# 3b. Null counts per column (first 10 + heart_rate)
print("=== Null / NaN counts for key columns ===")
null_exprs = [
    count(when(col(c).isNull(), c)).alias(c)
    for c in ["heart_rate", "hand_temperature", "chest_temperature", "ankle_temperature"]
]
df_raw.select(null_exprs).show(truncate=False)

# 3c. Subjects present
print("=== Subjects × session types ===")
df_raw.groupBy("subject_id", "session_type").count().orderBy("subject_id", "session_type").show(20)

=== Activity distribution ===


+-----------+-------+
|activity_id|count  |
+-----------+-------+
|0          |1125552|
|1          |192523 |
|2          |185188 |
|3          |189931 |
|4          |238761 |
|5          |98199  |
|6          |164600 |
|7          |188107 |
|9          |83646  |
|10         |309935 |
|11         |54519  |
|12         |117216 |
|13         |104944 |
|16         |175353 |
|17         |238690 |
|18         |99878  |
|19         |187188 |
|20         |46915  |
|24         |49360  |
+-----------+-------+

=== Null / NaN counts for key columns ===


+----------+----------------+-----------------+-----------------+
|heart_rate|hand_temperature|chest_temperature|ankle_temperature|
+----------+----------------+-----------------+-----------------+
|0         |0               |0                |0                |
+----------+----------------+-----------------+-----------------+

=== Subjects × session types ===


+----------+------------+------+
|subject_id|session_type| count|
+----------+------------+------+
|       101|    optional|319352|
|       101|    protocol|376417|
|       102|    protocol|447000|
|       103|    protocol|252833|
|       104|    protocol|329576|
|       105|    optional|154773|
|       105|    protocol|374783|
|       106|    optional|129963|
|       106|    protocol|361817|
|       107|    protocol|313599|
|       108|    optional|180412|
|       108|    protocol|408031|
|       109|    optional|193472|
|       109|    protocol|  8477|
+----------+------------+------+



In [5]:
# ============================================================
# 4. Preprocessing — Step A: Drop invalid rows
# ============================================================
# - Remove transient activities (activity_id == 0): these are
#   unlabelled transitions the readme says to discard.
# - Drop the 4 orientation columns per IMU (invalid per readme).
# - Drop rows where ALL three IMU acceleration columns are null
#   (indicates complete sensor dropout for that timestamp).
# ============================================================

# 4a. Remove transient rows
df_clean = df_raw.filter(col("activity_id") != 0)
print(f"After removing transient (id=0): {df_clean.count():,} rows")

# 4b. Drop the 12 orientation columns (marked invalid in readme)
orient_cols = [
    f"{loc}_orient_{i}"
    for loc in ("hand", "chest", "ankle")
    for i in range(4)
]
df_clean = df_clean.drop(*orient_cols)
print(f"Dropped {len(orient_cols)} invalid orientation columns → {len(df_clean.columns)} columns remain")

# 4c. Drop rows where all three key accelerometers are null
#     (complete sensor failure — not recoverable)
df_clean = df_clean.filter(
    col("hand_acc_16g_x").isNotNull()
    | col("chest_acc_16g_x").isNotNull()
    | col("ankle_acc_16g_x").isNotNull()
)
print(f"After dropping full-sensor-dropout rows: {df_clean.count():,} rows")

After removing transient (id=0): 2,724,953 rows
Dropped 12 invalid orientation columns → 44 columns remain


After dropping full-sensor-dropout rows: 2,724,953 rows


In [6]:
# ============================================================
# 5. Preprocessing -- Step B: Interpolate missing heart rate
# ============================================================
# Heart rate is sampled at ~9 Hz while IMUs run at 100 Hz,
# so ~91% of HR values are null by design.  The gap between
# consecutive HR readings is ~11 rows.
#
# Strategy (optimised for local execution):
#   1. Forward-fill with a BOUNDED window of 15 rows (covers
#      the ~11-row gap with margin).  This avoids the full
#      partition sort that an unbounded window requires.
#   2. Back-fill leading nulls with the same bounded window.
#   3. Any remaining nulls (rare sensor dropouts > 15 rows)
#      are filled with the per-subject mean heart rate.
# ============================================================
from pyspark.sql.functions import first, mean as F_mean

HR_FILL_WINDOW = 15   # rows — generous for the ~11-row gap

win_fwd = (
    Window
    .partitionBy("subject_id", "session_type")
    .orderBy("timestamp")
    .rowsBetween(-HR_FILL_WINDOW, 0)
)
win_bwd = (
    Window
    .partitionBy("subject_id", "session_type")
    .orderBy("timestamp")
    .rowsBetween(0, HR_FILL_WINDOW)
)

# Step 1: forward-fill (carry last known HR value forward)
df_clean = df_clean.withColumn(
    "heart_rate",
    last("heart_rate", ignorenulls=True).over(win_fwd)
)

# Step 2: back-fill (cover leading nulls with next known HR)
df_clean = df_clean.withColumn(
    "heart_rate",
    first("heart_rate", ignorenulls=True).over(win_bwd)
)

# Step 3: fill any remaining nulls with per-subject mean HR
win_subj = Window.partitionBy("subject_id")
df_clean = df_clean.withColumn(
    "_hr_mean", F_mean("heart_rate").over(win_subj)
).withColumn(
    "heart_rate",
    when(col("heart_rate").isNull(), col("_hr_mean")).otherwise(col("heart_rate"))
).drop("_hr_mean")

remaining_hr_nulls = df_clean.filter(col("heart_rate").isNull()).count()
print(f"Heart rate nulls remaining after interpolation: {remaining_hr_nulls}")

Heart rate nulls remaining after interpolation: 0


In [7]:
# ============================================================
# 6. Preprocessing — Step C: Min-Max normalisation (0-1)
# ============================================================
# Normalise every numeric sensor column to [0, 1] using global
# min/max.  We exclude metadata columns (timestamp, activity_id,
# subject_id, session_type) from scaling.
#
# Formula:  x_norm = (x - x_min) / (x_max - x_min)
# If max == min (constant column), the result is 0.
#
# IMPORTANT: spark_min / spark_max propagate NaN (unlike null).
# We must filter out NaN before computing stats.
# ============================================================

EXCLUDE_FROM_NORM = {"timestamp", "activity_id", "subject_id", "session_type"}

sensor_cols = [
    c for c in df_clean.columns
    if c not in EXCLUDE_FROM_NORM
    and df_clean.schema[c].dataType == DoubleType()
]
print(f"Columns to normalise: {len(sensor_cols)}")

# Compute global min/max in ONE pass — filter NaN so they
# don't propagate into the aggregation result.
agg_exprs = []
for c in sensor_cols:
    safe_col = when(~isnan(col(c)), col(c))   # NaN -> null (skipped by min/max)
    agg_exprs.append(spark_min(safe_col).alias(f"{c}__min"))
    agg_exprs.append(spark_max(safe_col).alias(f"{c}__max"))

stats_row = df_clean.agg(*agg_exprs).first()

# Build the normalisation expressions
df_normalised = df_clean
for c in sensor_cols:
    c_min = stats_row[f"{c}__min"]
    c_max = stats_row[f"{c}__max"]
    if c_min is not None and c_max is not None and c_max != c_min:
        df_normalised = df_normalised.withColumn(
            c,
            (col(c) - lit(c_min)) / lit(c_max - c_min),
        )
    else:
        # Constant or all-null column -> set to 0
        df_normalised = df_normalised.withColumn(c, lit(0.0))

# Quick sanity check on a few columns
print("\n=== Post-normalisation stats (sample columns) ===")
check_cols = ["heart_rate", "hand_acc_16g_x", "chest_gyro_x", "ankle_mag_z"]
df_normalised.select(check_cols).summary("min", "max", "count").show()

Columns to normalise: 40



=== Post-normalisation stats (sample columns) ===


+-------+----------+--------------+------------+-----------+
|summary|heart_rate|hand_acc_16g_x|chest_gyro_x|ankle_mag_z|
+-------+----------+--------------+------------+-----------+
|    min|       0.0|           0.0|         0.0|        0.0|
|    max|       NaN|           NaN|         NaN|        NaN|
|  count|   2724953|       2724953|     2724953|    2724953|
+-------+----------+--------------+------------+-----------+



In [8]:
# ============================================================
# 7. Save as Parquet — partitioned by subject_id
# ============================================================

OUTPUT_PATH = r"C:/Users/johnu/Desktop/BigDataProject/data/pamap2_clean.parquet"

(
    df_normalised
    .repartition("subject_id")          # one partition file per subject
    .write
    .mode("overwrite")
    .partitionBy("subject_id")
    .parquet(OUTPUT_PATH)
)

# Verify the write
df_verify = spark.read.parquet(OUTPUT_PATH)
print(f"Parquet rows   : {df_verify.count():,}")
print(f"Parquet columns: {len(df_verify.columns)}")
print(f"Partitions     : {df_verify.select('subject_id').distinct().count()} subjects")
print(f"\nSchema preview:")
df_verify.printSchema()

Parquet rows   : 2,724,953
Parquet columns: 44


Partitions     : 9 subjects

Schema preview:
root
 |-- timestamp: double (nullable = true)
 |-- activity_id: integer (nullable = true)
 |-- heart_rate: double (nullable = true)
 |-- hand_temperature: double (nullable = true)
 |-- hand_acc_16g_x: double (nullable = true)
 |-- hand_acc_16g_y: double (nullable = true)
 |-- hand_acc_16g_z: double (nullable = true)
 |-- hand_acc_6g_x: double (nullable = true)
 |-- hand_acc_6g_y: double (nullable = true)
 |-- hand_acc_6g_z: double (nullable = true)
 |-- hand_gyro_x: double (nullable = true)
 |-- hand_gyro_y: double (nullable = true)
 |-- hand_gyro_z: double (nullable = true)
 |-- hand_mag_x: double (nullable = true)
 |-- hand_mag_y: double (nullable = true)
 |-- hand_mag_z: double (nullable = true)
 |-- chest_temperature: double (nullable = true)
 |-- chest_acc_16g_x: double (nullable = true)
 |-- chest_acc_16g_y: double (nullable = true)
 |-- chest_acc_16g_z: double (nullable = true)
 |-- chest_acc_6g_x: double (nullable = true)
 |-- chest_