# Imports

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import StorageLevel
from pyspark.sql.functions import col, sum, count
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler

# Create spark session

In [2]:
spark = SparkSession.builder \
    .appName("YourAppName") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.cores", "2") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

print("Spark session created with custom configurations.")

Spark session created with custom configurations.


# Import data

In [3]:
dfs_stress=spark.read.csv('data/stress_detection.csv', header=True, inferSchema=True)

In [4]:
dfs_stress.printSchema()

root
 |-- participant_id: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- PSS_score: integer (nullable = true)
 |-- Openness: double (nullable = true)
 |-- Conscientiousness: double (nullable = true)
 |-- Extraversion: double (nullable = true)
 |-- Agreeableness: double (nullable = true)
 |-- Neuroticism: double (nullable = true)
 |-- sleep_time: double (nullable = true)
 |-- wake_time: double (nullable = true)
 |-- sleep_duration: double (nullable = true)
 |-- PSQI_score: integer (nullable = true)
 |-- call_duration: double (nullable = true)
 |-- num_calls: integer (nullable = true)
 |-- num_sms: integer (nullable = true)
 |-- screen_on_time: double (nullable = true)
 |-- skin_conductance: double (nullable = true)
 |-- accelerometer: double (nullable = true)
 |-- mobility_radius: double (nullable = true)
 |-- mobility_distance: double (nullable = true)



In [5]:
dfs_stress.show()

+--------------+---+---------+-----------+-----------------+------------+-------------+-----------+-----------+-----------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+
|participant_id|day|PSS_score|   Openness|Conscientiousness|Extraversion|Agreeableness|Neuroticism| sleep_time|  wake_time|sleep_duration|PSQI_score|call_duration|num_calls|num_sms|screen_on_time|skin_conductance|accelerometer|mobility_radius|mobility_distance|
+--------------+---+---------+-----------+-----------------+------------+-------------+-----------+-----------+-----------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+
|             1|  1|       34| 2.32273211|      4.332193156| 1.185877791|  1.570213395|3.782093775|7.726792258|  5.1906604|   6.572068901|         1|  3.924526753|       12|     32|   10.70371376|     3.115729826| 

#### Persist dataframe

In [6]:
dfs_stress.persist(StorageLevel.MEMORY_AND_DISK).count()

3000

# Nettoyage des données

In [7]:
missing_counts = dfs_stress.select([sum(col(c).isNull().cast("int")).alias(c) for c in dfs_stress.columns])
missing_counts.show()

+--------------+---+---------+--------+-----------------+------------+-------------+-----------+----------+---------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+
|participant_id|day|PSS_score|Openness|Conscientiousness|Extraversion|Agreeableness|Neuroticism|sleep_time|wake_time|sleep_duration|PSQI_score|call_duration|num_calls|num_sms|screen_on_time|skin_conductance|accelerometer|mobility_radius|mobility_distance|
+--------------+---+---------+--------+-----------------+------------+-------------+-----------+----------+---------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+
|             0|  0|        0|       0|                0|           0|            0|          0|         0|        0|             0|         0|            0|        0|      0|             0|               0|            0|           

Il n'y a pas de données manquantes

In [8]:
duplicate_counts = dfs_stress.groupBy("participant_id", "day").count()
duplicates = duplicate_counts.filter(col("count") > 1)
duplicates.show()

+--------------+---+-----+
|participant_id|day|count|
+--------------+---+-----+
+--------------+---+-----+



Il n'y a pas de doublons

#### Valeurs aberrantes

In [9]:
# participant_id : 1 à 100
out_of_range_participant_id = dfs_stress.filter((col("participant_id") < 1) | (col("participant_id") > 100))
out_of_range_participant_id.show()

# day : 1 à 30
out_of_range_day = dfs_stress.filter((col("day") < 1) | (col("day") > 30))
out_of_range_day.show()

# PSS_score : 10 à 40
out_of_range_pss_score = dfs_stress.filter((col("PSS_score") < 10) | (col("PSS_score") > 40))
out_of_range_pss_score.show()

# Openness, Conscientiousness, Extraversion, Agreeableness, Neuroticism : 1.0 à 5.0
traits = ["Openness", "Conscientiousness", "Extraversion", "Agreeableness", "Neuroticism"]
for trait in traits:
    dfs_stress.filter((col(trait) < 1.0) | (col(trait) > 5.0)).show()

# sleep_time : 5.0 à 9.0
out_of_range_sleep_time = dfs_stress.filter((col("sleep_time") < 5.0) | (col("sleep_time") > 9.0))
out_of_range_sleep_time.show()

# wake_time : 5.0 à 9.0
out_of_range_wake_time = dfs_stress.filter((col("wake_time") < 5.0) | (col("wake_time") > 9.0))
out_of_range_wake_time.show()

# sleep_duration : 6.0 à 9.0
out_of_range_sleep_duration = dfs_stress.filter((col("sleep_duration") < 6.0) | (col("sleep_duration") > 9.0))
out_of_range_sleep_duration.show()

# PSQI_score : 1 à 5
out_of_range_psqi_score = dfs_stress.filter((col("PSQI_score") < 1) | (col("PSQI_score") > 5))
out_of_range_psqi_score.show()

# call_duration : 0 à 60
out_of_range_call_duration = dfs_stress.filter((col("call_duration") < 0) | (col("call_duration") > 60))
out_of_range_call_duration.show()

# num_calls : 0 à 20
out_of_range_num_calls = dfs_stress.filter((col("num_calls") < 0) | (col("num_calls") > 20))
out_of_range_num_calls.show()

# num_sms : 0 à 50
out_of_range_num_sms = dfs_stress.filter((col("num_sms") < 0) | (col("num_sms") > 50))
out_of_range_num_sms.show()

# screen_on_time : 1.0 à 12.0
out_of_range_screen_on_time = dfs_stress.filter((col("screen_on_time") < 1.0) | (col("screen_on_time") > 12.0))
out_of_range_screen_on_time.show()

# skin_conductance : 0.5 à 5.0 µS
out_of_range_skin_conductance = dfs_stress.filter((col("skin_conductance") < 0.5) | (col("skin_conductance") > 5.0))
out_of_range_skin_conductance.show()

# accelerometer : 0.1 à 2.5 g
out_of_range_accelerometer = dfs_stress.filter((col("accelerometer") < 0.1) | (col("accelerometer") > 2.5))
out_of_range_accelerometer.show()

# mobility_radius : 0.1 à 1.5 km
out_of_range_mobility_radius = dfs_stress.filter((col("mobility_radius") < 0.1) | (col("mobility_radius") > 1.5))
out_of_range_mobility_radius.show()

# mobility_distance : 0.5 à 5.0 km
out_of_range_mobility_distance = dfs_stress.filter((col("mobility_distance") < 0.5) | (col("mobility_distance") > 5.0))
out_of_range_mobility_distance.show()



+--------------+---+---------+--------+-----------------+------------+-------------+-----------+----------+---------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+
|participant_id|day|PSS_score|Openness|Conscientiousness|Extraversion|Agreeableness|Neuroticism|sleep_time|wake_time|sleep_duration|PSQI_score|call_duration|num_calls|num_sms|screen_on_time|skin_conductance|accelerometer|mobility_radius|mobility_distance|
+--------------+---+---------+--------+-----------------+------------+-------------+-----------+----------+---------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+
+--------------+---+---------+--------+-----------------+------------+-------------+-----------+----------+---------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+-----------

Il n'y pas de valeurs aberrantes

# 3. Normalisation des variables numériques

on supprime les colonnes inutiles

In [10]:
dfs_stress = dfs_stress.drop("participant_id", "day")

### Assembler les colonnes numériques pour la normalisation

In [11]:
numeric_cols = ["PSS_score", "Openness", "Conscientiousness", "Extraversion", 
                "Agreeableness", "Neuroticism", "sleep_time", "wake_time", 
                "sleep_duration", "PSQI_score", "call_duration", "num_calls", 
                "num_sms", "screen_on_time", "skin_conductance", "accelerometer", 
                "mobility_radius", "mobility_distance"]

In [12]:
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features")
dfs_stress = assembler.transform(dfs_stress)

In [13]:
dfs_stress.persist(StorageLevel.MEMORY_AND_DISK).count()

3000

### Appliquer MinMaxScaler pour normaliser les données

In [14]:
scaler = MinMaxScaler(inputCol="numeric_features", outputCol="scaled_numeric_features")
scaler_model = scaler.fit(dfs_stress)
dfs_stress = scaler_model.transform(dfs_stress)

In [15]:
dfs_stress.persist(StorageLevel.MEMORY_AND_DISK).count()

3000

###  Assembler toutes les caractéristiques en une seule colonne `features`

In [16]:
feature_cols = ["scaled_numeric_features"]
assembler_final = VectorAssembler(inputCols=feature_cols, outputCol="features")
dfs_stress = assembler_final.transform(dfs_stress)

In [17]:
dfs_regression=dfs_stress.select("features", "PSS_score")
dfs_regression.persist(StorageLevel.MEMORY_AND_DISK).count()

3000

In [18]:
dfs_regression.show()

+--------------------+---------+
|            features|PSS_score|
+--------------------+---------+
|[0.82758620689655...|       34|
|[0.93103448275862...|       37|
|[0.68965517241379...|       30|
|[0.20689655172413...|       16|
|[0.75862068965517...|       32|
|[0.62068965517241...|       28|
|[1.0,0.4730534583...|       39|
|[0.31034482758620...|       19|
|[0.41379310344827...|       22|
|[0.17241379310344...|       15|
|[0.58620689655172...|       27|
|[0.03448275862068...|       11|
|[0.0,0.2365329049...|       10|
|[0.03448275862068...|       11|
|[0.31034482758620...|       19|
|[0.41379310344827...|       22|
|[0.24137931034482...|       17|
|[0.13793103448275...|       14|
|[0.44827586206896...|       23|
|[0.89655172413793...|       36|
+--------------------+---------+
only showing top 20 rows



In [19]:
dfs_stress.show()

+---------+-----------+-----------------+------------+-------------+-----------+-----------+-----------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+--------------------+-----------------------+--------------------+
|PSS_score|   Openness|Conscientiousness|Extraversion|Agreeableness|Neuroticism| sleep_time|  wake_time|sleep_duration|PSQI_score|call_duration|num_calls|num_sms|screen_on_time|skin_conductance|accelerometer|mobility_radius|mobility_distance|    numeric_features|scaled_numeric_features|            features|
+---------+-----------+-----------------+------------+-------------+-----------+-----------+-----------+--------------+----------+-------------+---------+-------+--------------+----------------+-------------+---------------+-----------------+--------------------+-----------------------+--------------------+
|       34| 2.32273211|      4.332193156| 1.185877791|  1.570213395|3.782

# Analyse Exploratoire des Données

### Calculer les statistiques descriptives pour toutes les colonnes numériques

In [20]:
dfs_stress.describe().show()

+-------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+
|summary|        PSS_score|         Openness| Conscientiousness|      Extraversion|     Agreeableness|       Neuroticism|        sleep_time|         wake_time|    sleep_duration|        PSQI_score|     call_duration|        num_calls|           num_sms|   screen_on_time|  skin_conductance|     accelerometer|   mobility_radius|mobility_distance|
+-------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+----------------

### Visualisation exploratoire

In [21]:
import plotly.express as px

# Extraire les données de PySpark sans conversion complète en Pandas
x_data = dfs_stress.select("Openness").rdd.flatMap(lambda x: x).collect()
y_data = dfs_stress.select("PSS_score").rdd.flatMap(lambda x: x).collect()

# Créer un scatter plot avec Plotly
fig = px.scatter(x=x_data, y=y_data, labels={'x': 'Openness', 'y': 'PSS_score'},
                 title="Relation entre Openness et PSS_score")
fig.show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 53.0 failed 1 times, most recent failure: Lost task 0.0 in stage 53.0 (TID 45) (132.213.96.104 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more
