In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pyspark.sql.functions as fn
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler as sparkMinMaxScaler
from pyspark.ml import Pipeline as SparkPipeline
from pyspark.sql.types import *
from pyspark.sql import Window
import sys
import warnings
from functools import reduce
warnings.filterwarnings('ignore')

In [2]:
from pyspark.sql import SparkSession

def create_spark_session(app_name="PAMAP2 Analysis",
                         master_url="spark://spark-master:7077",
                         executor_memory="2g",
                         driver_memory="2g",
                         hdfs_url="hdfs://namenode:8020"):
    """
    Build (or reuse, via getOrCreate) the SparkSession for this notebook.

    Args:
        app_name: application name shown in the Spark UI.
        master_url: Spark master to connect to.
        executor_memory: value for ``spark.executor.memory``.
        driver_memory: value for ``spark.driver.memory``.
        hdfs_url: value for ``spark.hadoop.fs.defaultFS``.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    # Applied in this order onto the builder.
    extra_settings = [
        ("spark.executor.memory", executor_memory),
        ("spark.driver.memory", driver_memory),
        ("spark.hadoop.fs.defaultFS", hdfs_url),
    ]

    session_builder = SparkSession.builder.appName(app_name).master(master_url)
    for key, value in extra_settings:
        session_builder = session_builder.config(key, value)

    return session_builder.getOrCreate()

In [3]:
def define_schema():
    """
    Build the explicit Spark schema for the raw PAMAP2 data files.

    Column order: ``timestamp``, ``activityID``, ``heartrate``, then 17 columns
    for each of the hand, chest and ankle units (temperature, Acc16 x3,
    Acc6 x3, Gyro x3, Magne x3, Orientation x4) -- 54 columns in total.
    ``activityID`` is an IntegerType; every other column is a DoubleType.
    All fields are nullable.

    Returns:
        StructType describing the 54 columns.
    """
    column_names = [
        "timestamp", "activityID", "heartrate",
        # hand unit
        "handTemperature",
        "handAcc16_1", "handAcc16_2", "handAcc16_3",
        "handAcc6_1", "handAcc6_2", "handAcc6_3",
        "handGyro1", "handGyro2", "handGyro3",
        "handMagne1", "handMagne2", "handMagne3",
        "handOrientation1", "handOrientation2", "handOrientation3", "handOrientation4",
        # chest unit
        "chestTemperature",
        "chestAcc16_1", "chestAcc16_2", "chestAcc16_3",
        "chestAcc6_1", "chestAcc6_2", "chestAcc6_3",
        "chestGyro1", "chestGyro2", "chestGyro3",
        "chestMagne1", "chestMagne2", "chestMagne3",
        "chestOrientation1", "chestOrientation2", "chestOrientation3", "chestOrientation4",
        # ankle unit
        "ankleTemperature",
        "ankleAcc16_1", "ankleAcc16_2", "ankleAcc16_3",
        "ankleAcc6_1", "ankleAcc6_2", "ankleAcc6_3",
        "ankleGyro1", "ankleGyro2", "ankleGyro3",
        "ankleMagne1", "ankleMagne2", "ankleMagne3",
        "ankleOrientation1", "ankleOrientation2", "ankleOrientation3", "ankleOrientation4",
    ]

    fields = []
    for name in column_names:
        # Only the activity label is integral; every measurement is a double.
        field_type = IntegerType() if name == "activityID" else DoubleType()
        fields.append(StructField(name, field_type, True))

    return StructType(fields)

def load_data(spark, hdfs_path):
    """
    Read the space-delimited PAMAP2 files at ``hdfs_path`` with the schema from define_schema().

    Files are read as headerless CSV with a single-space delimiter; lines that do
    not match the schema are dropped (DROPMALFORMED). The result is cached and
    its row count is printed (which also materialises the cache).

    Args:
        spark: active SparkSession.
        hdfs_path: path or glob of the input files.

    Returns:
        The cached DataFrame.
    """
    schema = define_schema()

    print("Đang load dữ liệu từ HDFS...")
    reader_options = {
        "delimiter": " ",
        "header": "false",
        "mode": "DROPMALFORMED",
    }
    df = (
        spark.read
        .options(**reader_options)
        .schema(schema)
        .csv(hdfs_path)
        .cache()
    )

    row_total = df.count()
    print(f"Đã load {row_total} dòng dữ liệu từ {hdfs_path}")
    return df

In [4]:
def basic_cleaning(df):
    """
    Drop rows whose ``activityID`` is null or 0, then cache the result.

    Args:
        df: DataFrame with an ``activityID`` column.

    Returns:
        The filtered, cached DataFrame (its row count is printed).
    """
    print("Đang thực hiện basic cleaning...")

    activity = df.activityID
    has_label = activity.isNotNull() & (activity != 0)
    df_cleaned = df.filter(has_label).cache()

    kept_rows = df_cleaned.count()
    print(f"Số dòng sau khi lọc activityID != 0 và != null: {kept_rows}")

    return df_cleaned

In [5]:
def remove_irrelevant_features(df, extra_cols=None):
    """
    Keep only the columns used for activity classification and cache the result.

    The kept columns are the label (``activityID``), ``heartrate`` and the
    accelerometer (Acc16 / Acc6), gyroscope and magnetometer axes of the hand,
    chest and ankle units. Columns such as ``timestamp``, the temperatures and
    the orientations are dropped unless listed in ``extra_cols``.

    Args:
        df: input Spark DataFrame containing (at least) the kept columns.
        extra_cols: optional iterable of additional column names to keep. They
            are appended after the default list and duplicates are ignored.
            Pass e.g. ``['timestamp']`` if a later step still needs to order
            rows by time.

    Returns:
        The projected DataFrame, cached; its row count is printed.
    """
    print("Đang loại bỏ các features không liên quan...")

    # Columns kept for activity classification.
    relevant_features = [
        'activityID', 'heartrate',
        'handAcc16_1', 'handAcc16_2', 'handAcc16_3',
        'handAcc6_1', 'handAcc6_2', 'handAcc6_3',
        'chestAcc16_1', 'chestAcc16_2', 'chestAcc16_3',
        'chestAcc6_1', 'chestAcc6_2', 'chestAcc6_3',
        'ankleAcc16_1', 'ankleAcc16_2', 'ankleAcc16_3',
        'ankleAcc6_1', 'ankleAcc6_2', 'ankleAcc6_3',
        'handGyro1', 'handGyro2', 'handGyro3',
        'chestGyro1', 'chestGyro2', 'chestGyro3',
        'ankleGyro1', 'ankleGyro2', 'ankleGyro3',
        'handMagne1', 'handMagne2', 'handMagne3',
        'chestMagne1', 'chestMagne2', 'chestMagne3',
        'ankleMagne1', 'ankleMagne2', 'ankleMagne3'
    ]

    selected_cols = list(relevant_features)
    for extra in (extra_cols or []):
        if extra not in selected_cols:
            selected_cols.append(extra)

    df_selected = df.select(*selected_cols).cache()

    print(f"Giữ lại {len(selected_cols)} / {len(df.columns)} cột")
    print(f"Số dòng dữ liệu sau khi lọc cột: {df_selected.count()}")

    return df_selected

In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col, coalesce
import sys

def fill_missing_values_ffill_bfill(df, time_col='timestamp', activity_col='activityID'):
    """
    Fill gaps with a forward fill, falling back to a backward fill.

    Rows are partitioned by ``activity_col`` and ordered by ``time_col``. For
    every column other than ``time_col`` and ``activity_col``:

    * NaN values in double/float columns are first turned into nulls. Spark's
      CSV reader parses the text ``NaN`` as a floating-point NaN (not null),
      and ``ignorenulls`` only skips nulls, so NaN gaps would otherwise never
      be filled.
    * forward fill: the last non-null value at or before the current row;
    * backward fill (only where no earlier value exists): the first non-null
      value at or after the current row.

    If ``time_col`` is not a column of ``df`` (e.g. dropped by an earlier
    column-selection step), rows are ordered by a generated
    ``monotonically_increasing_id``, i.e. by their current position in the
    DataFrame's partitions. The helper column is dropped before returning.

    NOTE(review): partitioning is by activity only, so rows from different
    recordings that share an activityID share one window and values may be
    carried across recording boundaries -- confirm this is acceptable.

    Args:
        df: input Spark DataFrame.
        time_col: column that orders rows inside each partition.
        activity_col: column that partitions the rows.

    Returns:
        DataFrame with the same columns in the same order; a value stays null
        only when its whole partition has no non-null value for that column.
    """
    print("Đang điền missing values bằng forward + backward fill (tối ưu)...")

    # Columns to fill: everything except the ordering and partitioning keys.
    cols_to_fill = [c for c in df.columns if c not in [time_col, activity_col]]

    order_col = time_col
    if time_col not in df.columns:
        order_col = "__row_order"
        print(f"Không tìm thấy cột '{time_col}'; sắp xếp theo thứ tự dòng hiện tại.")
        df = df.withColumn(order_col, fn.monotonically_increasing_id())

    ordered = Window.partitionBy(activity_col).orderBy(order_col)
    window_ffill = ordered.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    window_bfill = ordered.rowsBetween(Window.currentRow, Window.unboundedFollowing)

    float_cols = {name for name, dtype in df.dtypes if dtype in ("double", "float")}

    filled = {}
    for c in cols_to_fill:
        value = fn.col(c)
        if c in float_cols:
            # NaN is "missing" too; make it null so ignorenulls can skip it.
            value = fn.when(fn.isnan(value), fn.lit(None)).otherwise(value)
        ffill_col = fn.last(value, ignorenulls=True).over(window_ffill)
        # first (not last) non-null value looking forward = nearest next value.
        bfill_col = fn.first(value, ignorenulls=True).over(window_bfill)
        filled[c] = fn.coalesce(ffill_col, bfill_col)

    # One projection for all columns, so Spark evaluates the shared window once.
    if filled:
        df = df.withColumns(filled)

    if order_col != time_col:
        df = df.drop(order_col)

    print(f"Đã hoàn tất điền missing cho {len(cols_to_fill)} cột.")
    return df

In [7]:
from pyspark.sql import functions as fn
from pyspark.sql import DataFrame

def remove_outliers_iqr(df: DataFrame, outlier_cols: list, quantile_approx=0.05) -> DataFrame:
    """
    Remove outlier rows with the IQR rule, column by column, per activityID.

    Columns are processed in the given order, and each column's bounds are
    computed on the output of the previous column's filter. For a column ``c``
    and each ``activityID`` group, Q1/Q3 are estimated with
    ``percentile_approx``. Rows outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` are
    dropped. Rows whose ``c`` value is null fail the comparison and are
    dropped as well.

    Args:
        df: input DataFrame; must contain an ``activityID`` column.
        outlier_cols: names of the columns to filter; names not in ``df`` are
            skipped with a message.
        quantile_approx: precision of the quantile estimate. A value in (0, 1)
            is treated as the target relative error and converted to Spark's
            integer ``accuracy`` argument (``round(1 / quantile_approx)``, so
            the default 0.05 gives 20). A value >= 1 is used as ``accuracy``
            directly. Spark requires ``accuracy`` to be a positive integer, so
            a fractional value cannot be passed through unchanged.

    Returns:
        The filtered DataFrame. If at least one column was processed it is
        cached; intermediate per-column results are unpersisted.

    Raises:
        ValueError: if ``quantile_approx`` is not positive.
    """
    if quantile_approx <= 0:
        raise ValueError(f"quantile_approx must be positive, got {quantile_approx}")
    if quantile_approx < 1:
        accuracy = int(round(1.0 / quantile_approx))
    else:
        accuracy = int(quantile_approx)

    print("Đang loại bỏ outliers bằng IQR...")

    current = df
    current_count = None  # row count of `current`, reused as the next "before"

    for col_name in outlier_cols:
        if col_name not in current.columns:
            print(f"Bỏ qua cột không tồn tại: {col_name}")
            continue

        print(f"Đang xử lý cột: {col_name}")

        # Q1 / Q3 per activityID -> IQR bounds (one small row per activity).
        bounds = (
            current.groupBy("activityID")
            .agg(
                fn.percentile_approx(col_name, 0.25, accuracy).alias("q1"),
                fn.percentile_approx(col_name, 0.75, accuracy).alias("q3"),
            )
            .withColumn("iqr", fn.col("q3") - fn.col("q1"))
            .select(
                "activityID",
                (fn.col("q1") - 1.5 * fn.col("iqr")).alias("lower_bound"),
                (fn.col("q3") + 1.5 * fn.col("iqr")).alias("upper_bound"),
            )
        )

        if current_count is None:
            current_count = current.count()
        before = current_count

        value = fn.col(col_name)
        # Cache each stage so the next column does not recompute the whole
        # lineage (upstream windows + every previous join/filter).
        filtered = (
            current.join(fn.broadcast(bounds), on="activityID", how="left")
            .filter((value >= fn.col("lower_bound")) & (value <= fn.col("upper_bound")))
            .drop("lower_bound", "upper_bound")
            .cache()
        )
        after = filtered.count()

        print(f"Cột {col_name}: Đã loại {before - after} dòng outlier ({before} → {after})")

        # Release the previous intermediate stage (never the caller's input).
        if current is not df:
            current.unpersist()
        current = filtered
        current_count = after

    print("Hoàn tất xử lý outliers.")
    return current

In [8]:
def create_feature_engineering(df):
    """
    Append six Euclidean-norm features in a single ``withColumns`` call.

    The three accelerometer magnitudes use the Acc16 axes of the hand, chest
    and ankle units; the three gyroscope magnitudes use the Gyro axes. They
    are appended in the order hand/chest/ankle acc, then hand/chest/ankle gyro.

    Args:
        df: DataFrame containing the referenced sensor columns.

    Returns:
        ``df`` with the six ``*_magnitude`` columns added.
    """
    print("Đang tạo features mới (tối ưu)...")

    # (new column name, (x, y, z) source columns)
    magnitude_specs = [
        ('hand_acc_magnitude', ('handAcc16_1', 'handAcc16_2', 'handAcc16_3')),
        ('chest_acc_magnitude', ('chestAcc16_1', 'chestAcc16_2', 'chestAcc16_3')),
        ('ankle_acc_magnitude', ('ankleAcc16_1', 'ankleAcc16_2', 'ankleAcc16_3')),
        ('hand_gyro_magnitude', ('handGyro1', 'handGyro2', 'handGyro3')),
        ('chest_gyro_magnitude', ('chestGyro1', 'chestGyro2', 'chestGyro3')),
        ('ankle_gyro_magnitude', ('ankleGyro1', 'ankleGyro2', 'ankleGyro3')),
    ]

    magnitudes = {
        name: fn.sqrt(fn.col(x)**2 + fn.col(y)**2 + fn.col(z)**2)
        for name, (x, y, z) in magnitude_specs
    }
    df = df.withColumns(magnitudes)

    print("Đã tạo 6 features magnitude mới (hiệu năng cao)")
    return df

In [9]:
def balance_dataset(df, seed=42):
    """
    Down-sample every activity towards the size of the smallest one.

    Each ``activityID`` gets the sampling fraction ``min_count / count``, which
    ``df.stat.sampleBy`` applies per row. Because sampleBy samples each row
    independently, the resulting class sizes are approximately (not exactly)
    equal.

    Args:
        df: DataFrame with an ``activityID`` column.
        seed: random seed passed to ``sampleBy`` (default 42, as before).

    Returns:
        The sampled DataFrame, or ``df`` unchanged when it has no rows.
    """
    print("Đang cân bằng dataset...")

    # One row per activity: collect directly. The original cached this small
    # DataFrame and never released the cache.
    activity_stats = df.groupBy("activityID").count().collect()

    if not activity_stats:
        # min() of an empty sequence would raise; nothing to balance.
        print("Dataset rỗng, bỏ qua bước cân bằng.")
        return df

    # Size of the smallest activity class.
    min_samples = min(row["count"] for row in activity_stats)

    print("Phân bố activities:")
    for row in activity_stats:
        print(f"Activity {row['activityID']}: {row['count']} samples")

    # Per-activity keep probability.
    fractions = {row["activityID"]: min_samples / row["count"] for row in activity_stats}

    balanced_df = df.stat.sampleBy("activityID", fractions=fractions, seed=seed)

    print(f"Dataset sau khi cân bằng: {balanced_df.count()} samples")
    return balanced_df

In [10]:
def prepare_for_ml(df):
    """
    Assemble, scale and index the data for Spark ML.

    Every column except ``activityID`` is assembled into a ``features`` vector.
    ``MinMaxScaler`` rescales that vector into ``scaled_features``, and
    ``StringIndexer`` turns ``activityID`` into a numeric ``label``. All three
    stages are fitted together as one Pipeline.

    Args:
        df: DataFrame with an ``activityID`` column; every other column is used
            as a feature. VectorAssembler accepts only numeric, boolean or
            vector columns.

    Returns:
        (final_df, pipeline_model): ``final_df`` has the columns
        ``scaled_features``, ``label`` and ``activityID``; ``pipeline_model``
        is the fitted PipelineModel, which can be reapplied to new data.

    Raises:
        ValueError: if ``df`` has no column besides ``activityID``.
    """
    print("Đang chuẩn bị dữ liệu cho ML...")

    # Features are every column except the label source.
    feature_cols = [c for c in df.columns if c != 'activityID']
    if not feature_cols:
        raise ValueError("prepare_for_ml: no feature columns besides 'activityID'")

    # The import cell binds these classes only as ``sparkMinMaxScaler`` /
    # ``SparkPipeline``, so the bare names ``MinMaxScaler`` / ``Pipeline`` were
    # undefined here (NameError). Import them under the names used below.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
    label_indexer = StringIndexer(inputCol="activityID", outputCol="label")

    pipeline = Pipeline(stages=[assembler, scaler, label_indexer])

    # Fit all stages on df, then transform it.
    pipeline_model = pipeline.fit(df)
    df_transformed = pipeline_model.transform(df)

    # Keep only the columns needed downstream.
    final_df = df_transformed.select("scaled_features", "label", "activityID")

    print("Dữ liệu đã sẵn sàng cho Machine Learning")
    return final_df, pipeline_model

In [11]:
# Pipeline driver: create the session, load the raw .dat files from HDFS, then
# run each cleaning / feature step defined above, in order, rebinding `df`.
# NOTE(review): in the recorded run below, create_spark_session() failed with a
# Py4JJavaError ("Can only call getServletHandlers on a running MetricsSystem"),
# so none of the later steps executed. This looks like a cluster/connection
# problem (e.g. unreachable master_url) rather than a code bug; TODO confirm.
spark = create_spark_session()
# Glob over every protocol file.
hdfs_path = "hdfs://namenode:8020/user/student/pamap2/protocol/*.dat"
# Directory form of the same location; not used by the steps in this cell.
path = "hdfs://namenode:8020/user/student/pamap2/protocol/"
df = load_data(spark, hdfs_path)
df = basic_cleaning(df)
df = remove_irrelevant_features(df)
# NOTE(review): 'timestamp' is not among the columns kept by
# remove_irrelevant_features, yet it is the default time_col of
# fill_missing_values_ffill_bfill; confirm the intended ordering column.
df = fill_missing_values_ffill_bfill(df)
# IQR-filter every remaining column except the label.
outlier_cols = [c for c in df.columns if c != 'activityID']
df = remove_outliers_iqr(df, outlier_cols)
df = create_feature_engineering(df)
df = balance_dataset(df)
final_df, pipeline_model = prepare_for_ml(df)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:89)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:603)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [2]:
# HDFS directory holding the PAMAP2 protocol .dat files (the next cell reads
# them through an explicit glob literal rather than this variable).
path = "hdfs://namenode:8020/user/student/pamap2/protocol/"

In [3]:
# Read the raw files as plain text (a single string column named `value`, one
# row per line), presumably to eyeball the delimiter and the raw value format.
# NOTE(review): relies on a `spark` session from an earlier cell; the execution
# counter restarted at In [2], so confirm the session exists on a fresh kernel.
df_raw = spark.read.text("hdfs://namenode:8020/user/student/pamap2/protocol/*.dat")
df_raw.show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------