# Prerequisites

In [2]:
import $cp.`/usr/local/spark/jars/*`

// Set log level
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

// Create spark session
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Test")
  .master("local[*]")
  .config("spark.driver.memory", "2g")
  .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

// imports
import spark.implicits._

println("Spark ready")

Spark ready


[32mimport [39m[36m$cp.$[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m
[32mimport [39m[36morg.apache.spark.sql.SparkSession[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@30bd2373
[32mimport [39m[36mspark.implicits._[39m

In [3]:
// class for pretty pandas like output
implicit class DataFrameExtensions(df: org.apache.spark.sql.DataFrame) {
  def showPretty(numRows: Int = 20): Unit = {
    val html = "<table border='1' style='border-collapse: collapse;'>" +
      "<tr style='background-color: #f0f0f0;'>" + 
      df.columns.map(c => s"<th style='padding: 5px;'>$c</th>").mkString +
      "</tr>" +
      df.take(numRows).map { row =>
        "<tr>" + row.toSeq.map(v => s"<td style='padding: 5px;'>$v</td>").mkString + "</tr>"
      }.mkString +
      "</table>"
    
    kernel.publish.html(html)
  }
}

defined [32mclass[39m [36mDataFrameExtensions[39m

# HW1

### Dataset description

Working with dataset https://www.kaggle.com/datasets/mahdimashayekhi/mental-health
It is a synthetic dataset of global mental health survey responses from 10,000 individuals. The reasons I chose this dataset are an increasing problem of mental issues with every new generation and influence from a recently read book "The anxious generation"

I couldn't find a real dataset that matches requirements of the class so decided to proceed with a synthetic one which contains continious values and not only categorical.

In [62]:
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val path = "/Users/admin/Downloads/mental_health_dataset.csv"

val schema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("gender", StringType, true),
  StructField("employment_status", StringType, true),
  StructField("work_environment", StringType, true),
  StructField("mental_health_history", StringType, true),
  StructField("seeks_treatment", StringType, true),
  StructField("stress_level", IntegerType, true),
  StructField("sleep_hours", DoubleType, true),
  StructField("physical_activity_days", IntegerType, true),
  StructField("depression_score", IntegerType, true),
  StructField("anxiety_score", IntegerType, true),
  StructField("social_support_score", IntegerType, true),
  StructField("productivity_score", DoubleType, true),
  StructField("mental_health_risk", StringType, true)
))

    
val df = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("ignoreLeadingWhiteSpace", "true")
  .option("ignoreTrailingWhiteSpace", "true")
  .schema(schema)
  .csv(path)

println("                                               Top 5 rows of df")
df.showPretty(5)
println("\n\n")

// Check number of not null values
val notNullCounts = df.select(
  df.columns.map(c => count(col(c)).alias(c)): _*
)

println("                                               Not NULL counts")
notNullCounts.showPretty()
println("\n\n")


// Check all unique values for categorical columns
val categoricalCols = df.schema.fields
  .filter(_.dataType == StringType)
  .map(_.name)

val uniqueArraysDf = df.select(
  categoricalCols.map { c =>
    collect_set(col(c)).as(c)
  }: _*
)

println("                                  Unique values of categorical columns")

uniqueArraysDf.showPretty()
println("\n\n")


                                               Top 5 rows of df


age,gender,employment_status,work_environment,mental_health_history,seeks_treatment,stress_level,sleep_hours,physical_activity_days,depression_score,anxiety_score,social_support_score,productivity_score,mental_health_risk
56,Male,Employed,On-site,Yes,Yes,6,6.2,3,28,17,54,59.7,High
46,Female,Student,On-site,No,Yes,10,9.0,4,30,11,85,54.9,High
32,Female,Employed,On-site,Yes,No,7,7.7,2,24,7,62,61.3,Medium
60,Non-binary,Self-employed,On-site,No,No,4,4.5,4,6,0,95,97.0,Low
25,Female,Self-employed,On-site,Yes,Yes,3,5.4,0,24,12,70,69.0,High





                                               Not NULL counts


age,gender,employment_status,work_environment,mental_health_history,seeks_treatment,stress_level,sleep_hours,physical_activity_days,depression_score,anxiety_score,social_support_score,productivity_score,mental_health_risk
10000,10000,10000,10000,10000,10000,10000,10000,10000,10000,10000,10000,10000,10000





                                  Unique values of categorical columns


gender,employment_status,work_environment,mental_health_history,seeks_treatment,mental_health_risk
"WrappedArray(Prefer not to say, Female, Non-binary, Male)","WrappedArray(Self-employed, Student, Employed, Unemployed)","WrappedArray(Remote, On-site, Hybrid)","WrappedArray(No, Yes)","WrappedArray(No, Yes)","WrappedArray(Medium, Low, High)"







[32mimport [39m[36morg.apache.spark.sql.{DataFrame}[39m
[32mimport [39m[36morg.apache.spark.sql.types._[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[36mpath[39m: [32mString[39m = [32m"/Users/admin/Downloads/mental_health_dataset.csv"[39m
[36mschema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"age"[39m, IntegerType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"gender"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"employment_status"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"work_environment"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"mental_health_history"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"seeks_treatment"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"stress_level"[39m, IntegerType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"sleep_hours"[39m, DoubleType, [32mtrue[39m,

### First look

As we see data contains no missing values. For the fun purpose let's generate them. Also could cutoff some gender values, but according to the book it should be a correlation between all specified genders and mental health score.

We shouldn't just randomly generate missing values. Purpose is to avoid [MNAR, but have MCAR and MAR](https://stefvanbuuren.name/fimd/sec-MCAR.html)

Assumptions for this dataset:
    
    - mental_health_history	- people don’t disclose
    - seeks_treatment	- sensitive question
    - depression_score	- not all respondents complete test
    - anxiety_score	        - same
    - sleep_hours	        - recall bias
    - social_support_score	- optional questionnaire

In [63]:
// Generate missing values

val seed = 42

val dfWithMissing = df
  // Sensitive categorical fields
  .withColumn(
    "mental_health_history",
    when(rand(seed) < 0.12, lit(null)).otherwise(col("mental_health_history"))
  )
  .withColumn(
    "seeks_treatment",
    when(rand(seed + 1) < 0.10, lit(null)).otherwise(col("seeks_treatment"))
  )

  // Psychometric scores (MAR: more missing if stress is high (assuming missingness correlates with stress (realistic)))
  .withColumn(
    "depression_score",
    when(col("stress_level") >= 7 && rand(seed + 2) < 0.25, lit(null))
      .when(rand(seed + 3) < 0.05, lit(null))
      .otherwise(col("depression_score"))
  )
  .withColumn(
    "anxiety_score",
    when(col("stress_level") >= 7 && rand(seed + 4) < 0.25, lit(null))
      .when(rand(seed + 5) < 0.05, lit(null))
      .otherwise(col("anxiety_score"))
  )

  // Recall-based
  .withColumn(
    "sleep_hours",
    when(rand(seed + 6) < 0.08, lit(null)).otherwise(col("sleep_hours"))
  )
  .withColumn(
    "social_support_score",
    when(rand(seed + 7) < 0.10, lit(null)).otherwise(col("social_support_score"))
  )


[36mseed[39m: [32mInt[39m = [32m42[39m
[36mdfWithMissing[39m: [32mDataFrame[39m = [age: int, gender: string ... 12 more fields]

In [64]:
val notNullCounts = dfWithMissing.select(
  df.columns.map(c => count(col(c)).alias(c)): _*
)

println("                                               Not NULL counts")
notNullCounts.showPretty()

                                               Not NULL counts


age,gender,employment_status,work_environment,mental_health_history,seeks_treatment,stress_level,sleep_hours,physical_activity_days,depression_score,anxiety_score,social_support_score,productivity_score,mental_health_risk
10000,10000,10000,10000,8719,8995,10000,9221,10000,8528,8547,9030,10000,10000


[36mnotNullCounts[39m: [32mDataFrame[39m = [age: bigint, gender: bigint ... 12 more fields]

Now we have some missing values

Let's create outliers. Outliers should be: rare (1–3%), asymmetric, domain-plausible. Let's procceed with:

    - sleep_hours	         - very low (1–3h) or very high (12–16h)  
    - depression_score	 - maxed out (35-40)
    - anxiety_score	         - same
    - productivity_score     - near zero


In [67]:
val dfWithOutliers = dfWithMissing
  // Sleep hours: extreme insomnia or hypersomnia
  .withColumn(
    "sleep_hours",
    round(
      when(rand(seed + 8) < 0.015, lit(1) + rand(seed + 9) * 2)   // 1–3 hours
        .when(rand(seed + 10) < 0.015, lit(12) + rand(seed + 11) * 4) // 12–16 hours
        .otherwise(col("sleep_hours")),
      1
    )
  )
  // Depression score spikes
  .withColumn(
    "depression_score",
    when(rand(seed + 12) < 0.02, lit(35) + (rand(seed + 13) * 5).cast(IntegerType))
      .otherwise(col("depression_score"))
  )

  // Anxiety score spikes
  .withColumn(
    "anxiety_score",
    when(rand(seed + 14) < 0.02, lit(30) + (rand(seed + 15) * 7).cast(IntegerType))
      .otherwise(col("anxiety_score"))
  )

  // Productivity: collapse or unrealistic inflation
  .withColumn(
    "productivity_score",
    round(
        when(rand(seed + 16) < 0.015, rand(seed + 17) * 10)      // near zero
          .otherwise(col("productivity_score")),
        1
    )
  )

val finalDf = dfWithOutliers

[36mdfWithOutliers[39m: [32mDataFrame[39m = [age: int, gender: string ... 12 more fields]
[36mfinalDf[39m: [32mDataFrame[39m = [age: int, gender: string ... 12 more fields]

In [68]:
val continuousColumns = finalDf.schema.fields
  .filter(_.dataType != StringType)
  .map(_.name)

finalDf.select(continuousColumns.map(col): _*).describe().showPretty()

summary,age,stress_level,sleep_hours,physical_activity_days,depression_score,anxiety_score,social_support_score,productivity_score
count,10000.0,10000.0,9249.0,10000.0,8548.0,8580.0,9030.0,10000.0
mean,41.5576,5.572,6.530749270191372,3.5057,15.572765559195132,11.054428904428905,50.19656699889258,76.28565000000013
stddev,13.749581351398849,2.887741315455824,1.8459097126727808,2.282737084917989,9.44771712206466,7.124486977124483,29.291024666123764,16.40523703753545
min,18.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0
max,65.0,10.0,16.0,7.0,39.0,36.0,100.0,100.0


[36mcontinuousColumns[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"age"[39m,
  [32m"stress_level"[39m,
  [32m"sleep_hours"[39m,
  [32m"physical_activity_days"[39m,
  [32m"depression_score"[39m,
  [32m"anxiety_score"[39m,
  [32m"social_support_score"[39m,
  [32m"productivity_score"[39m
)

As we see we received outliers (like sleep hours = 1)
Let's proceed with the actual task:

Calculate the main statistical characteristics for each of the numerical features:
- Mode, median, mathematical expectation;
- Variance;
- 25%, 75%, 50% quantiles.

Let's create a function which will output following metrics.

In [71]:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

def numericSummary(df: DataFrame, numericCols: Seq[String])(implicit spark: SparkSession): DataFrame = {

  val rows = numericCols.map { colName =>
    val quantiles = Seq(0.0, 0.25, 0.5, 0.75, 1.0)
    val quantileValues = df.stat.approxQuantile(colName, quantiles.toArray, 0.01)

    val modeValue = df.groupBy(col(colName)).count()
      .orderBy(desc("count"))
      .limit(1)
      .collect()
      .headOption
      .map(_.get(0) match {
        case n: Number => n.doubleValue()
        case _ => null
      })
      .getOrElse(null)

    val meanValue = df.agg(avg(col(colName))).collect()(0).getDouble(0)
    val varianceValue = df.agg(variance(col(colName))).collect()(0).getDouble(0)
    val stdDevValue = df.agg(stddev(col(colName))).collect()(0).getDouble(0)

    Row(
      colName,
      modeValue,
      quantileValues(2),       // Median
      meanValue,
      stdDevValue,
      varianceValue,
      quantileValues(0),       // min
      quantileValues(1),       // 25%
      quantileValues(3),       // 75%
      quantileValues(4),       // max
    )
  }

  val outputSchema = StructType(Seq(
    StructField("feature", StringType, nullable = false),
    StructField("mode", DoubleType, nullable = true),
    StructField("median", DoubleType, nullable = true),
    StructField("mean", DoubleType, nullable = true),
    StructField("stdDevValue", DoubleType, nullable = true),
    StructField("variance", DoubleType, nullable = true),
    StructField("min", DoubleType, nullable = true),
    StructField("25%", DoubleType, nullable = true),
    StructField("75%", DoubleType, nullable = true),
    StructField("max", DoubleType, nullable = true),
  ))

  spark.createDataFrame(spark.sparkContext.parallelize(rows), outputSchema)
}


[32mimport [39m[36morg.apache.spark.sql.{DataFrame, Row, SparkSession}[39m
[32mimport [39m[36morg.apache.spark.sql.types._[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
defined [32mfunction[39m [36mnumericSummary[39m

In [72]:
numericSummary(finalDf, continuousColumns)(spark).showPretty()

feature,mode,median,mean,stdDevValue,variance,min,25%,75%,max
age,43.0,41.0,41.5576,13.749581351398849,189.05098733873496,18.0,30.0,53.0,65.0
stress_level,10.0,6.0,5.572,2.887741315455824,8.339049904990532,1.0,3.0,8.0,10.0
sleep_hours,,6.4,6.530749270191372,1.8459097126727808,3.4073826673397085,1.0,5.4,7.5,16.0
physical_activity_days,1.0,3.0,3.5057,2.282737084917989,5.2108885988598805,0.0,2.0,5.0,7.0
depression_score,,15.0,15.572765559195132,9.44771712206466,89.25935881855378,0.0,8.0,23.0,39.0
anxiety_score,,11.0,11.054428904428905,7.124486977124483,50.75831468721636,0.0,5.0,16.0,36.0
social_support_score,,49.0,50.19656699889258,29.291024666123764,857.9641259914707,0.0,24.0,75.0,100.0
productivity_score,100.0,76.9,76.28565000000013,16.40523703753545,269.13180225772487,0.0,65.3,88.5,100.0


As we can see:
1. Age is evenly distributed between people 18-65 years
2. Stress level is evenly disributed from 1 to 10 so the most common stress level is still 10
3. Median sleep time is 6.4 hours. The distribution is most likely Gaussian as variance is low.
4. Physical activity days are also most likely evenly distibuted
5. All scores have high deviation which can mean distibution accross all values with spikes of some specific values (like 100 for productivity score)


Some columns can be shown as numerical:
    
    mental_health_risk column can be showed as numerical low = 0, medium = 1, high = 2.
    mental_health_history can be showed as numerical no = 0, yes = 1
    seeks_treatment can be showed as numerical no = 0, yes = 1
    
 

In [75]:
val dfEncoded = finalDf.select(
  when(col("mental_health_risk") === "Low", 0)
    .when(col("mental_health_risk") === "Medium", 1)
    .when(col("mental_health_risk") === "High", 2)
    .otherwise(null).alias("mental_health_risk"),
  when(col("mental_health_history") === "No", 0)
    .when(col("mental_health_history") === "Yes", 1)
    .otherwise(null).alias("mental_health_history"),
  when(col("seeks_treatment") === "No", 0)
    .when(col("seeks_treatment") === "Yes", 1)
    .otherwise(null).alias("seeks_treatment")
)

numericSummary(dfEncoded, Seq("mental_health_risk", "mental_health_history", "seeks_treatment"))(spark).showPretty()

feature,mode,median,mean,stdDevValue,variance,min,25%,75%,max
mental_health_risk,1.0,1.0,1.063,0.6378649443014696,0.4068716871687168,0.0,1.0,1.0,2.0
mental_health_history,0.0,0.0,0.3026723248078908,0.4594409630078261,0.2110859984895586,0.0,0.0,1.0,1.0
seeks_treatment,0.0,0.0,0.397331851028349,0.489372941139736,0.2394858755197555,0.0,0.0,1.0,1.0


[36mdfEncoded[39m: [32mDataFrame[39m = [mental_health_risk: int, mental_health_history: int ... 1 more field]

We can see that most people (> 50%) have medium risk for mental health. Less then 50% percent of people have mental health history and seek treatment.

For categorical columns we can only calculate mode (the most frequent value and number of unique values.

In [76]:
def categoricalSummary(df: DataFrame, catCols: Seq[String]): DataFrame = {
  val rows = catCols.map { colName =>
    val countsDF = df.groupBy(col(colName)).count()
    val modeRow = countsDF.orderBy(desc("count")).limit(1).collect().headOption
    val modeValue = modeRow.map(_.get(0)).getOrElse(null)
    val uniqueCount = df.select(col(colName)).distinct().count()

    Row(colName, modeValue, uniqueCount)
  }

  val schema = StructType(Seq(
    StructField("feature", StringType, nullable = false),
    StructField("mode", StringType, nullable = true),
    StructField("unique_count", LongType, nullable = true)
  ))

  df.sparkSession.createDataFrame(df.sparkSession.sparkContext.parallelize(rows), schema)
}

val categoricalCols = Seq(
  "gender",
  "employment_status",
  "work_environment"
)

categoricalSummary(
    finalDf.select(  
        "gender",
        "employment_status",
        "work_environment"
    ), categoricalCols).showPretty()

feature,mode,unique_count
gender,Male,4
employment_status,Employed,4
work_environment,On-site,3


defined [32mfunction[39m [36mcategoricalSummary[39m
[36mcategoricalCols[39m: [32mSeq[39m[[32mString[39m] = [33mList[39m(
  [32m"gender"[39m,
  [32m"employment_status"[39m,
  [32m"work_environment"[39m
)

As we see most people in the distribution are males. The most frequent employment status - Employed, work_environment - offline