In [1]:
import os
import sys
import subprocess
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
# Make PySpark use this kernel's interpreter. Per the Spark configuration docs,
# PYSPARK_PYTHON selects the Python used by PySpark (driver and workers) and
# PYSPARK_DRIVER_PYTHON the one used by the driver. They are set here, before
# the SparkSession is created in the next cell.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [2]:
# Create the Spark session. getOrCreate() reuses an existing session when the
# cell is re-run, so any cached data from an earlier run is cleared right away.
spark = (
    SparkSession.builder
    .appName("HMP Spark Loader")
    .getOrCreate()
)
spark.catalog.clearCache()

In [3]:
import os

def list_available_activities(base_path="HMP_Dataset"):
    """Return the sorted names of the activity folders under ``base_path``.

    Each immediate sub-directory of ``base_path`` is treated as one activity.
    Plain files (e.g. a README) and hidden entries whose name starts with "."
    (e.g. ``.git`` when the dataset is a cloned repository) are ignored.

    Args:
        base_path: Root directory of the HMP dataset.

    Returns:
        Alphabetically sorted list of activity directory names.

    Raises:
        FileNotFoundError: if ``base_path`` does not exist (from ``os.listdir``).
    """
    return sorted(
        name
        for name in os.listdir(base_path)
        # Hidden folders such as ``.git`` are metadata, not activities.
        if not name.startswith(".") and os.path.isdir(os.path.join(base_path, name))
    )

# Example usage: list the activity folders so the subset chosen later uses real names.
available_activities = list_available_activities()
print("Available activities:", *(f"- {name}" for name in available_activities), sep="\n")

Available activities:
- Brush_teeth
- Climb_stairs
- Comb_hair
- Descend_stairs
- Drink_glass
- Eat_meat
- Eat_soup
- Getup_bed
- Liedown_bed
- Pour_water
- Sitdown_chair
- Standup_chair
- Use_telephone
- Walk


In [4]:
def load_hmp_from_repo(base_path="HMP_Dataset", selected_activities=None):
    """Read HMP accelerometer recordings into a list of ``(activity, x, y, z)`` tuples.

    Each immediate sub-directory of ``base_path`` is an activity. Every regular
    file inside it is read as whitespace-separated text, one sample per line.
    Lines that do not split into exactly three fields (e.g. blank lines) are
    skipped. Hidden entries (names starting with ".", such as ``.git`` or
    ``.DS_Store``) and nested directories are ignored. Directories and files are
    visited in sorted order, so the row order is reproducible across runs and
    platforms.

    Args:
        base_path: Root directory of the HMP dataset.
        selected_activities: Optional collection of activity names to load, or a
            single activity name as a string. When ``None`` or empty, every
            activity directory is loaded.

    Returns:
        List of ``(activity: str, x: float, y: float, z: float)`` tuples, in the
        positional order of the ``activity, x, y, z`` Spark schema used below.

    Raises:
        FileNotFoundError: if ``base_path`` does not exist.
        ValueError: if a three-field line contains a non-numeric field.
    """
    if isinstance(selected_activities, str):
        # A bare string would otherwise act as a substring filter ("Walk" in "Walk_x").
        selected_activities = [selected_activities]
    wanted = set(selected_activities) if selected_activities else None

    rows = []
    for activity in sorted(os.listdir(base_path)):
        if activity.startswith(".") or (wanted is not None and activity not in wanted):
            continue
        activity_path = os.path.join(base_path, activity)
        if not os.path.isdir(activity_path):
            continue
        for fname in sorted(os.listdir(activity_path)):
            fpath = os.path.join(activity_path, fname)
            # Skip hidden/OS metadata files (e.g. binary .DS_Store) and anything that is
            # not a regular file; open() on a directory would raise IsADirectoryError.
            if fname.startswith(".") or not os.path.isfile(fpath):
                continue
            with open(fpath, "r", encoding="utf-8") as f:
                for line in f:
                    parts = line.split()
                    if len(parts) == 3:
                        x, y, z = map(float, parts)
                        # Plain tuple (not keyword args) so it matches the explicit
                        # StructType schema positionally.
                        rows.append((activity, x, y, z))
    return rows


In [5]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schema: rows are positional (activity, x, y, z) tuples from load_hmp_from_repo.
schema = StructType([
    StructField("activity", StringType(), True),
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True),
    StructField("z", DoubleType(), True),
])

# Example: load only a subset of the activities.
selected_activities = ["Walk", "Climb_stairs", "Sitdown_chair"]
rows = load_hmp_from_repo("HMP_Dataset", selected_activities)

# The loader silently skips names it cannot find. Without this check, a typo
# here would produce a DataFrame that lacks one of the classes and raise no error.
loaded_activities = {row[0] for row in rows}
missing_activities = sorted(set(selected_activities) - loaded_activities)
assert not missing_activities, f"No samples loaded for activities: {missing_activities}"

df = spark.createDataFrame(rows, schema)

df.printSchema()
df.show(5)


root
 |-- activity: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

+------------+----+----+----+
|    activity|   x|   y|   z|
+------------+----+----+----+
|Climb_stairs| 5.0|39.0|34.0|
|Climb_stairs| 2.0|41.0|34.0|
|Climb_stairs| 5.0|39.0|34.0|
|Climb_stairs|12.0|38.0|34.0|
|Climb_stairs| 9.0|38.0|30.0|
+------------+----+----+----+
only showing top 5 rows



In [6]:
# Step 3: Encode the activity strings as numeric indices.
# With StringIndexer's default ordering (stringOrderType="frequencyDesc"), the
# most frequent activity receives index 0.0.
indexer = StringIndexer(outputCol="activityIndex", inputCol="activity")
indexer_model = indexer.fit(df)
indexed_df = indexer_model.transform(df)

# Step 4: Preview the indexed rows next to the raw axes.
indexed_df.select("activity", "activityIndex", "x", "y", "z").show(n=10)

+------------+-------------+----+----+----+
|    activity|activityIndex|   x|   y|   z|
+------------+-------------+----+----+----+
|Climb_stairs|          1.0| 5.0|39.0|34.0|
|Climb_stairs|          1.0| 2.0|41.0|34.0|
|Climb_stairs|          1.0| 5.0|39.0|34.0|
|Climb_stairs|          1.0|12.0|38.0|34.0|
|Climb_stairs|          1.0| 9.0|38.0|30.0|
|Climb_stairs|          1.0|10.0|36.0|29.0|
|Climb_stairs|          1.0|10.0|36.0|30.0|
|Climb_stairs|          1.0|12.0|36.0|30.0|
|Climb_stairs|          1.0|16.0|36.0|29.0|
|Climb_stairs|          1.0|16.0|37.0|30.0|
+------------+-------------+----+----+----+
only showing top 10 rows



In [7]:
# Check Index Mapping
# Show which activity each numeric index stands for. ``labelsArray`` (Spark >= 3.0.2)
# holds one ordered label list per input column; this indexer has a single input
# ("activity"), and position i in that list is the label encoded as index i.
# It replaces ``labels``, which is deprecated since Spark 3.1.
activity_labels = indexer_model.labelsArray[0]
print("Label mapping (index -> activity):", dict(enumerate(activity_labels)))


Label mapping (in order of index): ['Walk', 'Climb_stairs', 'Sitdown_chair']


In [10]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the activity index. This OneHotEncoder API takes list-valued
# inputCols/outputCols, even for a single column.
# dropLast=True is the default, spelled out here: the last category is dropped,
# which is why 3 activities produce 2-element vectors (see output below). The
# last-indexed activity therefore encodes as an all-zero vector.
encoder = OneHotEncoder(
    inputCols=["activityIndex"],
    outputCols=["categoryVec"],
    dropLast=True,
)
encoder_model = encoder.fit(indexed_df)
encoded = encoder_model.transform(indexed_df)

# Preview the encoding next to the label and its index.
encoded.select("activity", "activityIndex", "categoryVec").show(5, truncate=False)


+------------+-------------+-------------+
|activity    |activityIndex|categoryVec  |
+------------+-------------+-------------+
|Climb_stairs|1.0          |(2,[1],[1.0])|
|Climb_stairs|1.0          |(2,[1],[1.0])|
|Climb_stairs|1.0          |(2,[1],[1.0])|
|Climb_stairs|1.0          |(2,[1],[1.0])|
|Climb_stairs|1.0          |(2,[1],[1.0])|
+------------+-------------+-------------+
only showing top 5 rows



In [11]:
from pyspark.ml.feature import VectorAssembler

# Combine the three accelerometer axes into a single vector column named "features".
axis_columns = ['x', 'y', 'z']
vectorAssembler = VectorAssembler(outputCol='features', inputCols=axis_columns)

# Append the "features" column to the one-hot-encoded DataFrame and preview it.
features_vectorized = vectorAssembler.transform(encoded)
features_vectorized.select(
    "activity", "activityIndex", "categoryVec", "features"
).show(5, truncate=False)


+------------+-------------+-------------+----------------+
|activity    |activityIndex|categoryVec  |features        |
+------------+-------------+-------------+----------------+
|Climb_stairs|1.0          |(2,[1],[1.0])|[5.0,39.0,34.0] |
|Climb_stairs|1.0          |(2,[1],[1.0])|[2.0,41.0,34.0] |
|Climb_stairs|1.0          |(2,[1],[1.0])|[5.0,39.0,34.0] |
|Climb_stairs|1.0          |(2,[1],[1.0])|[12.0,38.0,34.0]|
|Climb_stairs|1.0          |(2,[1],[1.0])|[9.0,38.0,30.0] |
+------------+-------------+-------------+----------------+
only showing top 5 rows



In [None]:
import plotly.graph_objects as go

# Collect the Spark DataFrame into pandas for the EDA cells below. This is the
# first cell that uses ``eda_df``; defining it here lets the notebook run on a
# fresh kernel. Assumes the selected activities fit in driver memory.
eda_df = df.toPandas()

# Per-activity mean and standard deviation of each axis.
agg_df = eda_df.groupby('activity')[['x', 'y', 'z']].agg(['mean', 'std']).reset_index()
# Flatten the (axis, stat) MultiIndex into names like "x_mean", skipping ('activity', '').
agg_df.columns = ['activity'] + [f"{col}_{stat}" for col, stat in agg_df.columns.tolist()[1:]]

fig = go.Figure()
for axis in ['x', 'y', 'z']:
    fig.add_trace(go.Bar(
        name=axis.upper(),
        x=agg_df['activity'],
        y=agg_df[f'{axis}_mean'],
        # Error bars show one standard deviation around the mean.
        error_y=dict(type='data', array=agg_df[f'{axis}_std'])
    ))

fig.update_layout(barmode='group', title="Mean ± Std of Axes by Activity", xaxis_title="Activity", yaxis_title="Mean Value")
fig.show()


In [None]:
import plotly.express as px

# Plot a reproducible random subset of the samples rather than every point.
# The fraction is tunable; the fixed seed keeps the same subset on re-runs.
SCATTER_SAMPLE_FRACTION = 0.05
sampled = df.sample(withReplacement=False, fraction=SCATTER_SAMPLE_FRACTION, seed=42).toPandas()

fig = px.scatter_3d(sampled, x='x', y='y', z='z', color='activity',
                    title="3D Acceleration Scatter: x, y, z", opacity=0.6)
fig.show()


In [None]:
from sklearn.preprocessing import StandardScaler
import pandas as pd
import plotly.express as px

# Basic per-activity summary statistics for each axis (one row per activity).
features = eda_df.groupby('activity')[['x', 'y', 'z']].agg(['mean', 'std', 'max', 'min']).reset_index()
# Flatten the (axis, stat) MultiIndex into names like "x_mean", skipping ('activity', '').
features.columns = ['activity'] + [f"{col}_{stat}" for col, stat in features.columns.tolist()[1:]]

# Standardize every statistic so all dimensions share one scale on the plot.
feature_cols = list(features.columns[1:])
scaler = StandardScaler()
scaled = pd.DataFrame(scaler.fit_transform(features[feature_cols]), columns=feature_cols)
scaled['activity'] = features['activity']

# parallel_coordinates colours lines by a numeric column, so encode the activity
# names as integer codes. Only the numeric statistics are plotted as axes. (The
# original used columns[0] == "x_mean" as the colour and passed the string
# "activity" column as an axis.)
activity_cat = scaled['activity'].astype('category')
scaled['activity_code'] = activity_cat.cat.codes

fig = px.parallel_coordinates(
    scaled,
    color='activity_code',
    dimensions=feature_cols,
    title="Parallel Coordinates of Features",
)
# Label the colour bar with activity names instead of bare integer codes.
fig.update_layout(coloraxis_colorbar=dict(
    title='activity',
    tickvals=list(range(len(activity_cat.cat.categories))),
    ticktext=list(activity_cat.cat.categories),
))
fig.show()


In [None]:
# Mean of each acceleration axis per activity, drawn as grouped bars.
# NOTE(review): the ``.hvplot`` accessor exists only after ``import hvplot.pandas``,
# which this notebook never does. Add it to the imports cell.
axis_means = eda_df.groupby("activity")[["x", "y", "z"]].mean()
axis_means.hvplot.bar(rot=0, title="Mean Acceleration per Axis by Activity")

In [None]:
# Box plots of the raw x, y, z readings, grouped by activity.
# NOTE(review): requires ``import hvplot.pandas`` (not present in this notebook)
# to register the ``.hvplot`` accessor.
eda_df.hvplot.box(
    y=['x', 'y', 'z'],
    by='activity',
    height=400,
    title="Boxplot of Acceleration by Activity and Axis",
)