# Use Case Project

In [1]:
import os

#Create a Spark Session
from pyspark.sql import SparkSession
import findspark

findspark.init()

spark = SparkSession\
            .builder\
            .appName("SparkWriterJob")\
            .config("spark.sql.shuffle.partitions", 2)\
            .config("spark.default.parallelism", 2)\
            .master("local[2]")\
            .getOrCreate()
print(spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/06 17:39:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/06 17:39:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


3.5.1


## Reading files into Spark

In [16]:
#Read the raw CSV file int a Spark DataFrame
#    Use inferSchema to infer the schema automatically from the CSV file

student_scores_data = spark\
                .read\
                .option("inferSchema", "true")\
                .option("header", "true")\
                .csv("datasets/student_scores.csv")

# Print the schema for verification
student_scores_data.printSchema();

#Print the first 5 records for verification
student_scores_data.show(5)

root
 |-- Student: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- ClassScore: double (nullable = true)
 |-- TestScore: double (nullable = true)

+-------+---------+----------+---------+
|Student|  Subject|ClassScore|TestScore|
+-------+---------+----------+---------+
|   Katy|     Math|      0.95|     2.37|
|   Katy|Chemistry|       0.5|     1.18|
|   Katy|  Physics|      0.48|     1.37|
|   Katy|  Biology|      0.75|     2.79|
|   Mike|     Math|      0.45|     1.79|
+-------+---------+----------+---------+
only showing top 5 rows



## Writing to HDFS as a parquet file, use GZIP as a compression codec

In [17]:
student_scores_data.write\
            .option("compression", "gzip")\
            .parquet(path="dummy_student_hdfs/raw_parquet",
                    mode="overwrite");

## Write to HDFS with partitioning

In [18]:
student_scores_data.write\
            .option("compression", "gzip")\
            .partitionBy("Student")\             # I choose student column to be partitioned upon
            .parquet(path="dummy_student_hdfs/partitioned_parquet",
                    mode="overwrite");

SyntaxError: unexpected character after line continuation character (2479332955.py, line 3)

In [19]:
from pyspark.sql.functions import col

#Use a partition attribute for filtering
john_df = student_scores_data.where(col("Student") == 'John')
john_df.show(5)

+-------+---------+----------+---------+
|Student|  Subject|ClassScore|TestScore|
+-------+---------+----------+---------+
|   John|     Math|      0.27|      1.2|
|   John|Chemistry|      0.44|     2.76|
|   John|  Physics|      0.82|      2.8|
|   John|  Biology|      0.41|     2.71|
+-------+---------+----------+---------+



In [24]:
john_df_physics = john_df.filter(col("Subject") == 'Physics').withColumn("totalscore", col('ClassScore') + col ('TestScore'))

john_df_physics

DataFrame[Student: string, Subject: string, ClassScore: double, TestScore: double, totalscore: double]

In [25]:
# Print total score for each student in Physics
john_df_physics.show()

+-------+-------+----------+---------+------------------+
|Student|Subject|ClassScore|TestScore|        totalscore|
+-------+-------+----------+---------+------------------+
|   John|Physics|      0.82|      2.8|3.6199999999999997|
+-------+-------+----------+---------+------------------+



In [29]:
student_scores_data.show()

+-------+---------+----------+---------+
|Student|  Subject|ClassScore|TestScore|
+-------+---------+----------+---------+
|   Katy|     Math|      0.95|     2.37|
|   Katy|Chemistry|       0.5|     1.18|
|   Katy|  Physics|      0.48|     1.37|
|   Katy|  Biology|      0.75|     2.79|
|   Mike|     Math|      0.45|     1.79|
|   Mike|Chemistry|      0.39|     1.21|
|   Mike|  Physics|      0.34|     2.72|
|   Mike|  Biology|      0.57|     2.35|
|    Bob|     Math|      0.36|     2.37|
|    Bob|Chemistry|      0.86|     1.26|
|    Bob|  Physics|      0.93|     2.89|
|    Bob|  Biology|      0.52|     2.87|
|   Lisa|     Math|      0.33|     2.86|
|   Lisa|Chemistry|      0.64|     1.05|
|   Lisa|  Physics|      0.42|     2.34|
|   Lisa|  Biology|      0.39|     1.53|
|   John|     Math|      0.27|      1.2|
|   John|Chemistry|      0.44|     2.76|
|   John|  Physics|      0.82|      2.8|
|   John|  Biology|      0.41|     2.71|
+-------+---------+----------+---------+



In [30]:
student_scores_data_physics = student_scores_data.filter(col('Subject') == 'Physics')
student_scores_data_physics.show()

+-------+-------+----------+---------+
|Student|Subject|ClassScore|TestScore|
+-------+-------+----------+---------+
|   Katy|Physics|      0.48|     1.37|
|   Mike|Physics|      0.34|     2.72|
|    Bob|Physics|      0.93|     2.89|
|   Lisa|Physics|      0.42|     2.34|
|   John|Physics|      0.82|      2.8|
+-------+-------+----------+---------+



In [32]:
student_scores_data_physics = student_scores_data_physics.withColumn("totalscore", col('ClassScore') + col ('TestScore'))
student_scores_data_physics.show()

+-------+-------+----------+---------+------------------+
|Student|Subject|ClassScore|TestScore|        totalscore|
+-------+-------+----------+---------+------------------+
|   Katy|Physics|      0.48|     1.37|              1.85|
|   Mike|Physics|      0.34|     2.72|              3.06|
|    Bob|Physics|      0.93|     2.89|3.8200000000000003|
|   Lisa|Physics|      0.42|     2.34|              2.76|
|   John|Physics|      0.82|      2.8|3.6199999999999997|
+-------+-------+----------+---------+------------------+



In [34]:
# Compute avg total score for each student across all subjects
student_scores_data = student_scores_data.withColumn("totalscore", col('ClassScore') + col ('TestScore'))
student_scores_data.show()

+-------+---------+----------+---------+------------------+
|Student|  Subject|ClassScore|TestScore|        totalscore|
+-------+---------+----------+---------+------------------+
|   Katy|     Math|      0.95|     2.37|3.3200000000000003|
|   Katy|Chemistry|       0.5|     1.18|              1.68|
|   Katy|  Physics|      0.48|     1.37|              1.85|
|   Katy|  Biology|      0.75|     2.79|              3.54|
|   Mike|     Math|      0.45|     1.79|              2.24|
|   Mike|Chemistry|      0.39|     1.21|               1.6|
|   Mike|  Physics|      0.34|     2.72|              3.06|
|   Mike|  Biology|      0.57|     2.35|              2.92|
|    Bob|     Math|      0.36|     2.37|              2.73|
|    Bob|Chemistry|      0.86|     1.26|              2.12|
|    Bob|  Physics|      0.93|     2.89|3.8200000000000003|
|    Bob|  Biology|      0.52|     2.87|              3.39|
|   Lisa|     Math|      0.33|     2.86|              3.19|
|   Lisa|Chemistry|      0.64|     1.05|

In [37]:
from pyspark.sql.functions import col, avg
# agg: This function applies one or more aggregate functions to each group created by groupBy. 
# Aggregates include functions like sum(), count(), avg(), etc.
# alias("avg_total_score"): This renames the result of the average calculation to avg_total_score. Without this, the result would have the default name avg(totalscore), which can be less readable. 
# Using alias makes the column name more meaningful.
avg_total_scores = student_scores_data.groupby('Student').agg(avg('totalscore').alias("avg_total_score")) 
avg_total_scores.show()

+-------+------------------+
|Student|   avg_total_score|
+-------+------------------+
|   Mike|             2.455|
|   Lisa|2.3899999999999997|
|   John|            2.8525|
|   Katy|            2.5975|
|    Bob|             3.015|
+-------+------------------+



In [43]:
# Find the student with highest score for each subject

high_score = student_scores_data.groupby('Subject').max('totalscore')
high_score.show()

+---------+------------------+
|  Subject|   max(totalscore)|
+---------+------------------+
|  Biology|              3.54|
|     Math|3.3200000000000003|
|Chemistry|3.1999999999999997|
|  Physics|3.8200000000000003|
+---------+------------------+



In [46]:
# Step 1: Compute the max total score for each subject
high_score = student_scores_data.groupby('Subject').agg({"totalscore": "max"}).withColumnRenamed("max(totalscore)", "max_totalscore")
high_score.show()

+---------+------------------+
|  Subject|    max_totalscore|
+---------+------------------+
|  Biology|              3.54|
|     Math|3.3200000000000003|
|Chemistry|3.1999999999999997|
|  Physics|3.8200000000000003|
+---------+------------------+



In [48]:
# Step 2: Join with the original DataFrame to get the student name
high_score_with_student = student_scores_data.alias("original").join(
    high_score.alias("max_scores"),
    (col("original.Subject") == col("max_scores.Subject")) & (col("original.totalscore") == col("max_scores.max_totalscore")),
    how='inner'
)

# Show the result
high_score_with_student.select("original.Student", "original.Subject", "original.totalscore").show()

+-------+---------+------------------+
|Student|  Subject|        totalscore|
+-------+---------+------------------+
|   Katy|     Math|3.3200000000000003|
|   Katy|  Biology|              3.54|
|    Bob|  Physics|3.8200000000000003|
|   John|Chemistry|3.1999999999999997|
+-------+---------+------------------+

