# In [None]:
# Load the raw student-habits CSV with a header row and inferred column types.
# Spark readers on Databricks take DBFS paths with the "dbfs:" scheme (or a bare
# "/mnt/..."). The "/dbfs/..." prefix is the local FUSE mount intended for
# non-Spark file APIs (open(), pandas); Spark would resolve it as dbfs:/dbfs/...
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("dbfs:/mnt/data/students_habit_short.csv")
)

df.show()
df.printSchema()
from pyspark.sql.functions import when

# Keep only the three columns used by the rest of the notebook.
df_selected = df.select("Name", "Age", "StudyHours")

# Keep students studying more than two hours; a null StudyHours compares as
# null, which where() treats as false, so those rows are dropped as well.
df_filtered = df_selected.where(df_selected["StudyHours"] > 2)

# "Teen" for Age < 20, "Adult" for everything else (a null Age also falls
# through to otherwise()).
age_group_expr = when(df_filtered["Age"] < 20, "Teen").otherwise("Adult")
df_with_agegroup = df_filtered.withColumn("AgeGroup", age_group_expr)

df_with_agegroup.show()
# Spark's max is imported under an alias so it does not shadow Python's
# built-in max() for every later cell in this notebook.
from pyspark.sql.functions import avg, max as spark_max

# Average and maximum study hours for each AgeGroup.
df_agg = df_with_agegroup.groupBy("AgeGroup").agg(
    avg("StudyHours").alias("AvgStudyHours"),
    spark_max("StudyHours").alias("MaxStudyHours"),
)

df_agg.show()
from pyspark.sql.functions import rand

# Students ordered by study hours, highest first.
df_sorted = df_with_agegroup.orderBy(df_with_agegroup.StudyHours.desc())
df_sorted.show()
print(f"Total Students: {df_with_agegroup.count()}")

top5 = df_sorted.take(5)
print("Top 5 students with highest study hours:")
for row in top5:
    print(row)

# take(3) on its own returns whichever rows Spark produces first, which is not
# a random selection. Order by a seeded rand() so the pick is random yet
# repeatable for the same input partitioning. This sorts the whole frame, which
# is acceptable for small inputs.
random3 = df_with_agegroup.orderBy(rand(seed=42)).take(3)
print("Random 3 students:")
for row in random3:
    print(row)
from pyspark import StorageLevel

# Cache the filtered/labelled frame before the Parquet write and the later
# cells use it. Otherwise every action re-runs its lineage from the CSV scan.
df_with_agegroup.persist(StorageLevel.MEMORY_AND_DISK)

# count() is an action, so it materialises the cache.
print(f"Persisted Students Count: {df_with_agegroup.count()}")

df_with_agegroup.write.mode("overwrite").parquet("/tmp/students_with_agegroup")

# Deliberately not unpersisted here. Releasing the cache right after count()
# would throw it away before any later cell could reuse it. Call
# df_with_agegroup.unpersist() once downstream processing is finished.

# Partition bookkeeping:
# - repartition(n) performs a full shuffle into exactly n partitions.
# - coalesce(n) only merges existing partitions, so it can lower the count but
#   never raise it.
n_original_partitions = df_with_agegroup.rdd.getNumPartitions()
print(f"Original partitions: {n_original_partitions}")

df_repartitioned = df_with_agegroup.repartition(4)
n_repartitioned = df_repartitioned.rdd.getNumPartitions()
print(f"Partitions after repartition: {n_repartitioned}")

df_coalesced = df_repartitioned.coalesce(2)
n_coalesced = df_coalesced.rdd.getNumPartitions()
print(f"Partitions after coalesce: {n_coalesced}")

from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

# Bucket StudyHours: <= 2 -> "Low", <= 5 -> "Medium", anything else -> "High".
# NOTE(review): df_with_agegroup was already filtered to StudyHours > 2, so the
# "Low" bucket can never be produced here. Confirm whether that is intended.
study_hours = col("StudyHours")
df_study_category = df_with_agegroup.withColumn(
    "StudyCategory",
    when(study_hours <= 2, "Low")
    # Only evaluated when the first condition did not hold, so "> 2" is implied.
    .when(study_hours <= 5, "Medium")
    .otherwise("High"),
)

# Global ranking by StudyHours, highest first. rank() gives tied rows the same
# rank and leaves gaps after ties. With no partitionBy, Spark evaluates this
# window in a single partition, which only scales to small frames.
windowSpec = Window.orderBy(study_hours.desc())

df_ranked = df_study_category.withColumn("StudyRank", rank().over(windowSpec))
df_ranked.show()

# `count` was previously never imported, so this cell raised NameError.
from pyspark.sql.functions import count, isnan, isnull

# Per-column count of missing values in the raw input `df`. The filtered frame
# would hide them: the StudyHours > 2 filter already dropped every row whose
# StudyHours is null.
# isnan() is only meaningful for float/double columns, and Spark's analyzer may
# reject it on other types, so the other columns are checked for null only.
_float_cols = {name for name, dtype in df.dtypes if dtype in ("float", "double")}
df.select([
    count(when(isnull(c) | isnan(c), c)).alias(c)
    if c in _float_cols
    else count(when(isnull(c), c)).alias(c)
    for c in df.columns
]).show()

# Fill missing StudyHours with 0.
# NOTE(review): df_with_agegroup only holds rows with StudyHours > 2, so it has
# no null StudyHours and this fill is effectively a no-op. To impute before
# filtering, apply fillna to the raw `df` instead.
df_filled = df_with_agegroup.fillna({'StudyHours': 0})
df_filled.show()


# Bernoulli sample of roughly 30% of the rows. The fraction is approximate, not
# an exact row count; the seed makes it repeatable for the same input.
df_sampled = df_with_agegroup.sample(fraction=0.3, seed=42)
df_sampled.cache()
df_sampled.show()

# Print the physical plan that Spark executes for df_with_agegroup.
df_with_agegroup.explain()

# Summary statistics (count, mean, stddev, min, max) for the columns.
df_with_agegroup.describe().show()

df_with_agegroup.write.format("delta").mode("overwrite").save("/tmp/delta/students")

# Release cached data once the notebook is done with it. unpersist() is
# harmless if a frame is not currently cached.
df_sampled.unpersist()
df_with_agegroup.unpersist()
