## 1. Data Processing & Cleaning
- Load and process the CSV file using PySpark
- Handle missing values
- Convert data types where needed (especially 'Sleep Duration' to numeric)
- Remove any inconsistent values
- Output the data quality metrics (nulls, value counts, basic statistics)

##### 1.a Load and process the CSV file using PySpark

In [None]:
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('StudentDepressionAnalysis').getOrCreate()
df = sparkSession.read.csv('./data/Student Depression Dataset.csv', header=True, inferSchema=True)
df.show()

In [None]:
df.filter("`Work Pressure` != 0").select('Academic Pressure','Work Pressure').show()

df.groupBy('Sleep Duration').count().show()

##### 1.b Handle missing values

I want first to output rows that have missing values and then remove them from data frame

In [None]:
from pyspark.sql import functions as F

filter_expr = F.exists(F.array(*df.columns), lambda x: x.isNull())
df.filter(filter_expr).show()

In [4]:
# remove null values 
df = df.na.drop(subset=['Financial Stress'])

# or maybe setting value to 0 would also be an option
#df = df.na.fill(value=0,subset=['Financial Stress']).

##### 1.c Convert data types where needed (especially 'Sleep Duration' to numeric)

First I want to output data type for each column just to check if everything look ok. Then I want to convert columns which don't have appopriate type

In [None]:
df.printSchema()

First I want to group 'Sleep Duration' column to check data we are working with 

In [None]:
df.groupBy('Sleep Duration').count().show()

In [None]:
from pyspark.sql.functions import when, col

#drop rows where value is set to 'Others'
df = df.filter('`Sleep Duration` != "Others"')

df = df.withColumn(
    'Sleep Duration',
    when(col('Sleep Duration') == 'More than 8 hours', 9)
    .when(col('Sleep Duration') == '7-8 hours', 7.5)
    .when(col('Sleep Duration') == '5-6 hours', 5.5)
    .when(col('Sleep Duration') == 'Less than 5 hours', 4)
)

df = df.withColumn('Sleep Duration', col('Sleep Duration').cast('float'))
df.show()

In [None]:
# verify schema
df.printSchema()

##### 1.d Remove any inconsistent values

Could not find any inconsistenct values except 'Other' in 'Dietary Habits' I but unsure what to do with it...


##### 1.e Output the data quality metrics (nulls, value counts, basic statistics)

In [None]:
from pyspark.sql.functions import countDistinct

# Value counts
df.select([(countDistinct(c).alias(f'{c}_distinct_count')) for c in df.columns]).show()

# Basic statistics
df.describe().show()

### 2. Feature Engineering - Create and output these specific features:
- Stress Index = weighted average of (Academic Pressure, Work Pressure, Financial Stress)
- Sleep Categories (Low: <6 hours, Normal: 6-8 hours, High: >8 hours)
- Age Groups (18-21, 22-25, 26-30, >30)
- Normalized versions of all numerical features (0-1 scale)
- Dummy variables for categorical columns

##### 2.a Stress Index = weighted average of (Academic Pressure, Work Pressure, Financial Stress)

In [None]:
from pyspark.sql.functions import desc, asc

df = df.withColumn('Stress Index', F.round((col('Academic Pressure') * 0.4 + col('Work Pressure') * 0.4 + col('Financial Stress') * 0.6), 2))
df.show()

##### 2.b Sleep Categories (Low: <6 hours, Normal: 6-8 hours, High: >8 hours)

In [None]:
df = df.withColumn('Sleep Category', 
                   when(col('Sleep Duration') < 6, 'Low')
                   .when((col('Sleep Duration') >= 6) & (col('Sleep Duration') <= 8), 'Normal')
                   .otherwise('High'))

df.select('Sleep Category', 'Sleep Duration').show()

##### 2.c Age Groups (18-21, 22-25, 26-30, >30)

In [None]:
df = df.withColumn('Age Group', 
                   when((col('Age') >= 18) & (col('Age') <= 21), '18-21')
                   .when((col('Age') >= 22) & (col('Age') <= 25), '22-25')
                   .when((col('Age') >= 26) & (col('Age') <= 30), '26-30')
                   .otherwise('30+'))
df.select('Age', 'Age Group').show()

##### 2.d Normalized versions of all numerical features (0-1 scale)

In [None]:
numeric_cols = ["Stress Index"]

for col_name in numeric_cols:
    min_val = df.agg(F.min(col_name)).collect()[0][0]
    max_val = df.agg(F.max(col_name)).collect()[0][0]

    df = df.withColumn(
        f"{col_name}_normalized",
        F.round((F.col(col_name) - min_val) / (max_val - min_val), 2),
    )

df.show()

##### 2.e Dummy variables for categorical columns

In [None]:
from pyspark.sql.functions import lit

gender_pivot_df = df.groupBy('id').pivot('Gender').agg(lit(1)).fillna(0)

age_group_pivot_df = df.groupBy('id').pivot('Age Group').agg(lit(1)).fillna(0)

sleep_category_pivot_df = df.groupBy('id').pivot('Sleep Category').agg(lit(1)).fillna(0)

df_joined = df.join(
        gender_pivot_df, on='id', how='inner'
    ).join(
        age_group_pivot_df, on='id', how='inner'
    ).join(
        sleep_category_pivot_df, on='id', how='inner'
    )

df_joined.show()

### 3. Outputs

##### 3.a Distribution Statistics
- Depressions by age group and profession
- CGPA statistics by sleep category

In [None]:
# Depressions by age group and profession
df.groupBy(
        'Age Group', 'Profession'
    ).agg(
        F.mean('Depression').alias('Avg Depression'), F.count('Depression').alias('Count')
    ).filter(
        F.col('Count') > 100
    ).orderBy(
        'Age Group'
    ).show()

# CGPA statistics by sleep category
df.groupBy(
        'Sleep Category'
    ).agg(
        F.mean('CGPA').alias('Avg CGPA'), F.stddev('CGPA').alias('StdDev CGPA')
    ).orderBy(
        'Sleep Category'
    ).show()


##### 3.b Correlation Outputs
- Correlation matrix for numerical variables
- Top 5 factors correlated with depression scores

In [None]:
columns_for_corelations = ['Age', 'Academic Pressure', 'CGPA', 'Study Satisfaction', 'Sleep Duration', 'Stress Index']

for idx, val1 in enumerate(columns_for_corelations):
    for val2 in range(idx+1, len(columns_for_corelations)):
        correlation = df.stat.corr(val1, columns_for_corelations[val2])
        print(f'Correlation between {val1} and {columns_for_corelations[val2]}: {correlation}')



correlations = [ (col, df.stat.corr('Depression', col)) for col in columns_for_corelations]
print(correlations)
top_5_correlations = sorted(correlations, key=lambda x: x[1], reverse=True)[:5]

print('Top 5 factors correlated with Depression Score:')
for factor, corr_value in top_5_correlations:
    print(f'{factor}: {corr_value}')


##### 3.c Aggregated Results:
- Depression aggregated by city and degree
- Stress index aggregated by age group and gender
- Academic performance metrics by sleep category

In [None]:
# Depression aggregated by city and degree
df.groupBy(
        'City', 'Degree'
    ).agg(
        F.round((F.mean('Depression') * 100), 2).alias('Depression Percentage'),
        F.count('City').alias('Count')
    ).orderBy(
        'City', 'Degree'
    ).show()

# Stress index aggregated by age group and gender
df.groupBy(
        'Age Group', 'Gender'
    ).agg(
        F.mean('Stress Index').alias('Avg Stress Index'),
        F.count('Age Group').alias('Count')
    ).orderBy(
        'Age Group', 'Gender'
    ).show()

# Academic performance metrics by sleep category
df.groupBy(
    'Sleep Category'
    ).agg(
        F.mean('CGPA').alias('Avg CGPA'),
        F.count('Sleep Category').alias('Count')
    ).orderBy(
        'Sleep Category'
    ).show()


##### 3.d Risk Analysis Output:
- List of high-risk students based on:
    * Stress index
    * Sleep duration
    * Academic/job satisfaction
    * Financial stress

In [None]:
high_stress_threshold = 4
low_sleep_threshold = 6
low_job_satisfaction_threshold = 2
high_academin_pressure_threshold = 4
high_financial_stress_threshold = 4

df.filter(
    (F.col('Stress Index') > high_stress_threshold) &
    (F.col('Sleep Duration') < low_sleep_threshold) &
    (F.col('Job Satisfaction') < low_job_satisfaction_threshold) & 
    (F.col('Academic Pressure') > high_academin_pressure_threshold) &
    (F.col('Financial Stress') > high_financial_stress_threshold)
).show()