In [1]:
from pyspark.sql import SparkSession

# Start Spark
spark = SparkSession.builder.appName("MidMarksAnalysis").getOrCreate()

# Load CSV
df = spark.read.csv("MIDMARKS.csv", header=True, inferSchema=True)

# Show sample
df.show(5)


+----+-------+---+----+---+----+---+----+
|S.NO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|
+----+-------+---+----+---+----+---+----+
|   1|  ALPHA| 12|   0| 17|   9| 19|  15|
|   2|  ALPHA| 19|  12| 16|  16| 18|   3|
|   3|  ALPHA| 18|  14| 18|  18| 18|  16|
|   4|  ALPHA| 15|   9| 19|  17| 19|  15|
|   5|  ALPHA| 18|  17| 19|  19| 20|  18|
+----+-------+---+----+---+----+---+----+
only showing top 5 rows


In [8]:
# Schema
df.printSchema()

df = df.withColumnRenamed("S.NO", "SNO") \
       .withColumnRenamed("M-II", "MII")
print("-----------------------")
df.describe().show()


root
 |-- SNO: integer (nullable = true)
 |-- SECTION: string (nullable = true)
 |-- DV: string (nullable = true)
 |-- MII: string (nullable = true)
 |-- PP: string (nullable = true)
 |-- BEEE: string (nullable = true)
 |-- FL: string (nullable = true)
 |-- FIMS: string (nullable = true)

-----------------------
+-------+------------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|               SNO|SECTION|                DV|               MII|                PP|              BEEE|                FL|              FIMS|
+-------+------------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|               480|    439|               479|               477|               480|               478|               479|               480|
|   mean|             240.5|   NULL|14.594017094017094|10.455531453362257|13.15811

In [9]:
from pyspark.sql.functions import col, sum
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()



+---+-------+---+---+---+----+---+----+
|SNO|SECTION| DV|MII| PP|BEEE| FL|FIMS|
+---+-------+---+---+---+----+---+----+
|  0|     41|  1|  3|  0|   2|  1|   0|
+---+-------+---+---+---+----+---+----+



In [13]:
df.describe().show(truncate=False)
df.summary().show(truncate=False)

+-------+------------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|SNO               |SECTION|DV                |MII               |PP                |BEEE              |FL                |FIMS              |
+-------+------------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|count  |480               |439    |479               |477               |480               |478               |479               |480               |
|mean   |240.5             |NULL   |14.594017094017094|10.455531453362257|13.158119658119658|13.665217391304347|15.860813704496788|14.043290043290042|
|stddev |138.70832707519762|NULL   |4.2875406486016745|6.270416807932639 |5.634913181568081 |5.4609813171354205|3.6686682018262085|4.143608150075935 |
|min    |1                 |ALPHA  |1                 |0                 |0                 |0

In [14]:
# Cleaning dataset

In [15]:
df.groupBy("Section").count().show()

+-------+-----+
|Section|count|
+-------+-----+
|   GAMA|    1|
|   ZETA|   19|
|   SGMA|    1|
|   NULL|   41|
|   BETA|   60|
|  OMEGA|   60|
|EPSILON|   60|
|  SIGMA|   59|
|  ALPHA|   60|
|  GAMMA|   59|
|  DELTA|   60|
+-------+-----+



In [17]:
df = df.na.fill({"Section":"ZETA"})

In [18]:
df.groupBy("Section").count().show()

+-------+-----+
|Section|count|
+-------+-----+
|   GAMA|    1|
|   ZETA|   60|
|   SGMA|    1|
|   BETA|   60|
|  OMEGA|   60|
|EPSILON|   60|
|  SIGMA|   59|
|  ALPHA|   60|
|  GAMMA|   59|
|  DELTA|   60|
+-------+-----+



In [19]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "SECTION",
    when(col("SECTION") == "SGMA","SIGMA")
    .when(col("SECTION") == "GAMMA", "GAMA")
    .otherwise(col("SECTION"))
)

In [21]:
df = df.withColumn(
    "DV",
    when(col("DV") == "A",0)
    .when(col("DV") == "o",0)
    .otherwise(col("DV"))
)

In [22]:
df = df.na.fill({"DV":"0"})

In [23]:
df.groupBy("Section").count().show()

+-------+-----+
|Section|count|
+-------+-----+
|   GAMA|   60|
|   ZETA|   60|
|   BETA|   60|
|  OMEGA|   60|
|EPSILON|   60|
|  SIGMA|   60|
|  ALPHA|   60|
|  DELTA|   60|
+-------+-----+



In [25]:
df.groupBy("Section").count().show()

+-------+-----+
|Section|count|
+-------+-----+
|   GAMA|   60|
|   ZETA|   60|
|   BETA|   60|
|  OMEGA|   60|
|EPSILON|   60|
|  SIGMA|   60|
|  ALPHA|   60|
|  DELTA|   60|
+-------+-----+



In [26]:
# Number of rows & columns
print("Rows:", df.count())
print("Columns:", len(df.columns))

Rows: 480
Columns: 8


In [27]:
df.groupBy("SECTION").count().orderBy("count", ascending=False).show()

+-------+-----+
|SECTION|count|
+-------+-----+
|   GAMA|   60|
|   ZETA|   60|
|   BETA|   60|
|  OMEGA|   60|
|EPSILON|   60|
|  SIGMA|   60|
|  ALPHA|   60|
|  DELTA|   60|
+-------+-----+



In [31]:
from pyspark.sql.functions import expr

subjects = ["DV", "MII", "PP", "BEEE", "FL", "FIMS"]

# Try casting each subject column to int, invalid values become NULL
for i in subjects:
    df = df.withColumn(i, expr(f"try_cast({i} as int)"))


In [32]:
df.select(subjects).show(10)


+---+---+----+----+---+----+
| DV|MII|  PP|BEEE| FL|FIMS|
+---+---+----+----+---+----+
| 12|  0|  17|   9| 19|  15|
| 19| 12|  16|  16| 18|   3|
| 18| 14|  18|  18| 18|  16|
| 15|  9|  19|  17| 19|  15|
| 18| 17|  19|  19| 20|  18|
| 17| 16|  18|  10| 15|   9|
| 15| 10|  20|  20| 15|  14|
| 17| 17|  19|  20| 19|  13|
| 10| 18|NULL|  20| 19|  15|
| 18| 19|  20|  20| 20|  15|
+---+---+----+----+---+----+
only showing top 10 rows


In [33]:
df = df.na.fill({"PP":"0"})

In [37]:
df = df.na.fill({"BEEE":"0"})
df = df.na.fill({"PP":"0"})
df = df.na.fill({"FL":"0"})
df = df.na.fill({"FIMS":"0"})
df = df.na.fill({"DV":"0"})
df = df.na.fill({"MII":"0"})

In [38]:
df.select(subjects).show(10)

+---+---+---+----+---+----+
| DV|MII| PP|BEEE| FL|FIMS|
+---+---+---+----+---+----+
| 12|  0| 17|   9| 19|  15|
| 19| 12| 16|  16| 18|   3|
| 18| 14| 18|  18| 18|  16|
| 15|  9| 19|  17| 19|  15|
| 18| 17| 19|  19| 20|  18|
| 17| 16| 18|  10| 15|   9|
| 15| 10| 20|  20| 15|  14|
| 17| 17| 19|  20| 19|  13|
| 10| 18|  0|  20| 19|  15|
| 18| 19| 20|  20| 20|  15|
+---+---+---+----+---+----+
only showing top 10 rows


In [40]:
from pyspark.sql.functions import coalesce, lit

for subj in subjects:
    df = df.withColumn(subj, coalesce(df[subj], lit(0)))

In [50]:
from pyspark.sql.functions import expr

df = df.withColumn("SNO_int", expr("try_cast(SNO as int)"))

In [51]:
df.select("SNO", "SNO_int").show(10)


+---+-------+
|SNO|SNO_int|
+---+-------+
|  1|      1|
|  2|      2|
|  3|      3|
|  4|      4|
|  5|      5|
|  6|      6|
|  7|      7|
|  8|      8|
|  9|      9|
| 10|     10|
+---+-------+
only showing top 10 rows


In [52]:
df.dtypes

[('SNO', 'int'),
 ('SECTION', 'string'),
 ('DV', 'int'),
 ('MII', 'int'),
 ('PP', 'int'),
 ('BEEE', 'int'),
 ('FL', 'int'),
 ('FIMS', 'int'),
 ('SNO_int', 'int')]

In [24]:
# Done with data cleaning

In [53]:
from pyspark.sql.functions import col, avg, min, max

# Summary stats for all numeric columns
df.describe().show()

# More precise summary
numeric_cols = [c for c, t in df.dtypes if t in ('int', 'double', 'bigint')]
df.select([avg(col(c)).alias(f"avg_{c}") for c in numeric_cols]).show()
df.select([min(col(c)).alias(f"min_{c}") for c in numeric_cols]).show()
df.select([max(col(c)).alias(f"max_{c}") for c in numeric_cols]).show()


{"ts": "2025-09-23 07:28:58.478", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[CAST_INVALID_INPUT] The value 'MP' of the type \"STRING\" cannot be cast to \"BIGINT\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018", "context": {"file": "line 5 in cell [21]", "line": "", "fragment": "otherwise", "errorClass": "CAST_INVALID_INPUT"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o481.showString.\n: org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'MP' of the type \"STRING\" cannot be cast to \"BIGINT\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018\n== DataFrame ==\n\"otherwise\" was called from\nline 5 in cell [21]\n\r\n\tat org.apache.spark.sql.errors.QueryE

NumberFormatException: [CAST_INVALID_INPUT] The value 'MP' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"otherwise" was called from
line 5 in cell [21]
