Trial one: Using SparkML

In [1]:
import pyspark
from pyspark.sql.functions import col, when, regexp_extract, count, isnan, collect_set, lit
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

sc = SparkContext.getOrCreate(conf=conf)

sqlContext = SQLContext(sc)

spark = sqlContext.sparkSession.builder.getOrCreate()



In [30]:
col_names = ["subject_no", "Gender", "mean_hr", "avnn_ms", "sdnn_ms", 
    "nn50", "pnn50", "rmssd", "lf", "lf_norm", "hf_ms2", "hf_norm", "lf_hf_ratio", "Stress_level"]

In [31]:
df = spark.read.csv("D:\Spring 25\Projects in Biomedical AI\zenflow\dataset\ECG (EO, AC1, AC2).csv",header=True, inferSchema= True).toDF(*col_names)
df.show(4)

+----------+------+-------+--------+-------+----+-------+-------+--------+-------+--------+-------+-----------+------------+
|subject_no|Gender|mean_hr| avnn_ms|sdnn_ms|nn50|  pnn50|  rmssd|      lf|lf_norm|  hf_ms2|hf_norm|lf_hf_ratio|Stress_level|
+----------+------+-------+--------+-------+----+-------+-------+--------+-------+--------+-------+-----------+------------+
|         1|Female|85.8474|698.9147|45.8957|  46|10.7477|29.6913|412.1663|46.8523|467.3008|53.1197|      0.882|          EO|
|         2|Female|88.3727|678.9429|23.8804|   0|    0.0|11.6837|314.3801|87.1339| 46.3382|12.8431|     6.7845|          EO|
|         3|Female|79.4924|754.7887|50.0888|  71|17.4877| 37.805|612.5444|45.8684|722.3938|54.0941|     0.8479|          EO|
|         4|Female|78.8327|761.1057|41.4575|  27| 6.4593|27.0164|446.1722|64.1144|249.5922|35.8661|     1.7876|          EO|
+----------+------+-------+--------+-------+----+-------+-------+--------+-------+--------+-------+-----------+------------+


In [32]:
df.printSchema()

root
 |-- subject_no: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- mean_hr: double (nullable = true)
 |-- avnn_ms: double (nullable = true)
 |-- sdnn_ms: double (nullable = true)
 |-- nn50: integer (nullable = true)
 |-- pnn50: double (nullable = true)
 |-- rmssd: double (nullable = true)
 |-- lf: double (nullable = true)
 |-- lf_norm: double (nullable = true)
 |-- hf_ms2: double (nullable = true)
 |-- hf_norm: double (nullable = true)
 |-- lf_hf_ratio: double (nullable = true)
 |-- Stress_level: string (nullable = true)



In [46]:
#Handling binary variables Gender and one hot encoding stress_level
df_drop = df.withColumn("Gender", 
            when(df["Gender"] == "Male", 1)
            .when(df["Gender"] == "Female", 0)
             .otherwise(df["Gender"]))

df_drop = df_drop.withColumn("Stress_level",
                             when(df_drop['Stress_level'] == 'EO',0)
                             .when(df_drop['Stress_level'] == 'AC1',1)
                             .when(df_drop['Stress_level'] == 'AC2',2))

In [47]:
df_binary = (df_drop.withColumn("gender", df_drop['Gender'].cast('integer')))
df_binary = (df_binary.withColumn("stress_level", df_drop['Stress_level'].cast('integer')))

In [48]:
#df_binary.show(3)

#Distinct entries check
distinct_rows = df_binary.select(collect_set("stress_level")).first()[0]
print(distinct_rows)

[0, 1, 2]


Preprocessing done, assembling vectors and input in model

In [51]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Assembling all features into a single vector
assembler = VectorAssembler(inputCols=df_binary.columns,outputCol="features")

# Scaling features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")


In [53]:
pipeline = Pipeline(stages= [assembler, scaler])
pipeline_model = pipeline.fit(df_binary)
df_encoded = pipeline_model.transform(df_binary)

In [55]:
df_encoded.show(3)

+----------+------+-------+--------+-------+----+-------+-------+--------+-------+--------+-------+-----------+------------+--------------------+--------------------+
|subject_no|gender|mean_hr| avnn_ms|sdnn_ms|nn50|  pnn50|  rmssd|      lf|lf_norm|  hf_ms2|hf_norm|lf_hf_ratio|stress_level|            features|     scaled_features|
+----------+------+-------+--------+-------+----+-------+-------+--------+-------+--------+-------+-----------+------------+--------------------+--------------------+
|         1|     0|85.8474|698.9147|45.8957|  46|10.7477|29.6913|412.1663|46.8523|467.3008|53.1197|      0.882|           0|[1.0,0.0,85.8474,...|[0.08626790448707...|
|         2|     0|88.3727|678.9429|23.8804|   0|    0.0|11.6837|314.3801|87.1339| 46.3382|12.8431|     6.7845|           0|[2.0,0.0,88.3727,...|[0.17253580897414...|
|         3|     0|79.4924|754.7887|50.0888|  71|17.4877| 37.805|612.5444|45.8684|722.3938|54.0941|     0.8479|           0|[3.0,0.0,79.4924,...|[0.25880371346121...

In [56]:
df_ready = df_encoded.select('stress_level', 'scaled_features')
df_ready.show(4)

+------------+--------------------+
|stress_level|     scaled_features|
+------------+--------------------+
|           0|[0.08626790448707...|
|           0|[0.17253580897414...|
|           0|[0.25880371346121...|
|           0|[0.34507161794828...|
+------------+--------------------+
only showing top 4 rows



Random Forest  Classifier 

In [59]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_data, test_data = df_ready.randomSplit([0.8, 0.2], seed=42)

In [60]:
rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="stress_level", numTrees=50, maxDepth=5, seed=42)
model = rf.fit(train_data)

In [None]:
predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="stress_level", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)


Test Accuracy: 0.95


In [63]:
print(f"Test Accuracy!!!!!!: {accuracy:.2f}")

Test Accuracy!!!!!!: 0.95
