# Student Scores Project | Big Data Analytics w/ Hadoop and Apache Spark

<img src = "https://hadoopinrealworld.com/wp-content/uploads/2017/09/Spark-vs-Hadoop-Comparison-Chart.png" width = "500">

This project shows the combined capabilities of Hadoop and Apache Spark by computing some statistics on a student score dataset. Combining the strengths of these two projects (i.e., Hadoop HDFS + Apache Spark) is a widely accepted practice among data teams these days.

### What is Hadoop?

The Apache Hadoop Project is an open source project and consists of four main modules:

*  HDFS – Hadoop Distributed File System.
*  MapReduce. The processing component of the Hadoop ecosystem. It scales horizontally but is relatively slow, because it writes intermediate results to disk. Over the last decade, many professionals have switched to Apache Spark or Flink instead of MapReduce.
*  YARN. Yet Another Resource Negotiator. It manages cluster computing resources and schedules jobs.
*  Hadoop Common. Also called Hadoop Core, it provides the shared utilities that support all other Hadoop components.
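
To make the map, shuffle, and reduce phases concrete, here is a toy in-memory imitation in plain Scala. It is not the Hadoop MapReduce API: `mapPhase`, `shufflePhase`, and `reducePhase` are hypothetical names, and the (student, score) records are made up. In real Hadoop, the shuffle moves intermediate data through local disk and the network, which is a large part of why MapReduce is slower than in-memory engines.

```scala
// Toy, in-memory imitation of the MapReduce phases (NOT the Hadoop API).
// The (student, score) records below are hypothetical.
val records: Seq[(String, Int)] = Seq(("Ann", 10), ("Bob", 7), ("Ann", 5))

// Map phase: turn each input record into (key, value) pairs.
// Here every record already is a single (student, score) pair.
def mapPhase(in: Seq[(String, Int)]): Seq[(String, Int)] =
  in.map { case (student, score) => (student, score) }

// Shuffle phase: bring all values with the same key together.
// Hadoop does this by writing map output to local disk and fetching it over the network.
def shufflePhase(pairs: Seq[(String, Int)]): Map[String, Seq[Int]] =
  pairs.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2) }

// Reduce phase: combine the values of each key, here by summing them.
def reducePhase(grouped: Map[String, Seq[Int]]): Map[String, Int] =
  grouped.map { case (key, values) => key -> values.sum }

val totals: Map[String, Int] = reducePhase(shufflePhase(mapPhase(records)))
println(totals)
```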

Among these modules, this project focuses on Hadoop HDFS, the file system that manages the storage of large datasets. It can handle both structured and unstructured data, and it stores the data on disk, distributed across the nodes of the cluster.

### What is Apache Spark?

Apache Spark is an open source project. It uses RAM for caching and processing data and is designed for fast performance. The Resilient Distributed Dataset (RDD) is Spark's core data structure. Spark consists of five main components:

*  Apache Spark Core. Spark Core is the basis and includes functions of *scheduling, task dispatching, input and output operations, etc.* 
*  Spark Streaming. It is used for processing of live data streams with data sources of Kafka, Kinesis, Flume, etc.
*  Spark SQL. It is used for working with structured data; it uses information about the structure of the data and about the computation being performed to optimize processing.
*  Machine Learning Library (MLlib) includes machine learning algorithms.
*  GraphX is for facilitating graph analytics tasks.

In this notebook, we use Apache Spark's DataFrame API, which runs on top of Spark Core, because Spark uses memory to speed up the computations.
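
For reference, Spark Core's own API is the RDD. A minimal sketch, assuming `spark-sql` is on the classpath (`master("local[*]")` is only for running outside Databricks; on Databricks, `getOrCreate()` returns the notebook's existing session): hypothetical (student, score) pairs are cached in memory and summed per key with `reduceByKey`.

```scala
import org.apache.spark.sql.SparkSession

// local[*] is only for running outside Databricks; there, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("spark-core-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical (student, score) pairs distributed across the cluster as an RDD.
val scoresRdd = sc.parallelize(Seq(("Ann", 10), ("Bob", 7), ("Ann", 5)))

// cache() marks the RDD to be kept in memory once the first action computes it.
scoresRdd.cache()

// reduceByKey sums the scores per student (this step involves a shuffle).
val totals: Map[String, Int] = scoresRdd.reduceByKey(_ + _).collect().toMap
println(totals)
```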

## Dataset ▶

The dataset consists of student scores for different subjects in CSV format. There are 40 rows of data plus one header row. It is deliberately small, for the sake of practicing the functions; in the big data world, data sizes can reach petabytes. The four columns are:

| Student | Subject | Class Score | Test Score |
|---------|---------|-------------|------------|
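
Each row maps naturally onto a Scala case class. The sketch below assumes integer scores and simple comma-separated lines without quoted fields; `ScoreRow`, `parseLine`, and `parseCsv` are hypothetical names, and any sample values are made up rather than taken from the dataset. In the notebook itself, Spark's CSV reader does this parsing and also handles quoting, headers, and type inference.

```scala
import scala.util.Try

// Hypothetical row type; integer scores are an assumption (the notebook infers the types).
final case class ScoreRow(student: String, subject: String, classScore: Int, testScore: Int)

// Parse one comma-separated line; assumes no quoted fields or embedded commas.
// Returns None for lines that do not have four fields or whose scores are not integers.
def parseLine(line: String): Option[ScoreRow] =
  line.split(",", -1).map(_.trim) match {
    case Array(student, subject, cls, test) =>
      for {
        c <- Try(cls.toInt).toOption
        t <- Try(test.toInt).toOption
      } yield ScoreRow(student, subject, c, t)
    case _ => None
  }

// Drop the header line and keep only the rows that parse.
def parseCsv(lines: Seq[String]): Seq[ScoreRow] =
  lines.drop(1).flatMap(line => parseLine(line).toList)
```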


*Please note that: the project herein is inspired by the lecture notes of "Big Data Analytics with Hadoop and Apache Spark" by Kumaran Ponnambalam.*

## Target 🎯

This notebook walks you through (1) loading the data into HDFS and (2) processing the data with Spark. Here's the outline:

* Import functions
* Data load 
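    * Parquet file + gzip codec  
    We prefer Parquet in HDFS because it is columnar (it reads column by column), stores the schema, and is compressible and splittable, which makes it ideal for analytics.  
    We prefer the gzip codec because it provides good compression with moderate performance, which suits analytical workloads. A raw gzip file is not splittable, but Parquet compresses data within each column chunk, so gzip-compressed Parquet files remain splittable.  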
    * Schema optimization
* Data processing
    * Computing total score
    * Printing total score for physics
    * Computing avg total score
    * Finding student with highest score

# 1 - Data Loading 📊

Reading the CSV file, storing it in HDFS as partitioned Parquet, and reading the Parquet data back 🤯

```scala
// 1 - Read the CSV file
val filepath = "dbfs:/FileStore/FileStore/st_scores.csv"

// Read the raw CSV file into a Spark DataFrame,
// then rename the score columns to drop the spaces in their names
val raw_stdata = spark.read.format("csv")
                      .option("inferSchema", "true")
                      .option("header", "true")
                      .load(filepath)
                      .withColumnRenamed("Class Score", "ClassScore")
                      .withColumnRenamed("Test Score", "TestScore")

// Check the inferred schema to confirm the columns were parsed as expected
raw_stdata.printSchema()

// Show the first 5 rows
raw_stdata.show(5)
```
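
`inferSchema` makes Spark read the data one extra time to guess the column types. One way to carry out the outline's *schema optimization* step is to declare the schema up front. A sketch, assuming both score columns are integers (the notebook infers them, so check the `printSchema()` output first); `scoreSchema` and `readScores` are hypothetical names. With `header` set to `true` and a user-supplied schema, Spark skips the header line and, with the default `enforceSchema` behavior, applies the schema's column names by position, so the renaming step is not needed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumed types: the notebook infers them, so IntegerType for the scores is an assumption.
val scoreSchema: StructType = StructType(Seq(
  StructField("Student", StringType, nullable = true),
  StructField("Subject", StringType, nullable = true),
  StructField("ClassScore", IntegerType, nullable = true),
  StructField("TestScore", IntegerType, nullable = true)
))

// Hypothetical helper: read the CSV with a fixed schema instead of inferSchema.
// header=true skips the header line; with the default enforceSchema=true the schema's
// names are applied by position, so "Class Score" arrives as ClassScore without renaming.
def readScores(spark: SparkSession, path: String): DataFrame =
  spark.read
    .format("csv")
    .option("header", "true")
    .schema(scoreSchema)
    .load(path)
```

In the notebook, `val raw_stdata = readScores(spark, filepath)` would then replace the `inferSchema` read and the two `withColumnRenamed` calls.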

```scala
// 2 - Create a partitioned store in HDFS
// Store the data as Parquet (columnar, self-describing schema, compressible, splittable => ideal for analytics)
// with gzip compression (good compression ratio, moderate performance => good for analytical workloads)
// and partition it by Subject, which has only a small number of distinct values
val fileout = "dbfs:/FileStore/FileStore/partitioned_st"

raw_stdata.write
          .format("parquet")
          .mode("overwrite")
          .option("compression", "gzip")
          .partitionBy("Subject")
          .save(fileout)
```
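
`partitionBy("Subject")` writes one Hive-style subdirectory per distinct value, named `Subject=<value>`; the `Subject` column itself is not stored inside the Parquet files but is recovered from the directory names on read. The helper below is a hypothetical plain-Scala illustration of that layout (`partitionDir` is a made-up name, the subject values are made up, and it ignores the escaping Spark applies to special characters).

```scala
// Hypothetical illustration of the Hive-style layout produced by partitionBy("Subject").
// Real Spark also escapes special characters in values; this sketch does not.
def partitionDir(base: String, column: String, value: String): String =
  s"$base/$column=$value"

val fileout = "dbfs:/FileStore/FileStore/partitioned_st"
val subjects = Seq("Physics", "Math", "Physics") // made-up Subject values, one per row

// One directory per distinct value; rows with the same Subject land in the same directory.
val dirs: Seq[String] = subjects.map(s => partitionDir(fileout, "Subject", s)).distinct
dirs.foreach(println)
```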

```scala
// 3 - Read the partitioned data back into a DataFrame
val st_data = spark.read
                   .parquet(fileout)

println("# of partitions in dataset : " + st_data.rdd.getNumPartitions)
```

The Parquet data is created correctly at the path given above and is then read back into a partitioned DataFrame. The number of partitions in the DataFrame is printed as 4. This count depends on how Spark splits the input files, which is governed by the cluster's default parallelism and by settings such as `spark.sql.files.maxPartitionBytes`; it can be customized.
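
The partition count can be changed after reading, or tuned for future reads. A minimal sketch, assuming a local `SparkSession` (on Databricks, `getOrCreate()` returns the notebook's session) and a small made-up DataFrame standing in for `st_data`: `repartition(n)` shuffles into exactly `n` partitions, `coalesce(n)` merges partitions without a full shuffle, and `spark.sql.files.maxPartitionBytes` (128 MB by default) caps the bytes of input files packed into one read partition.

```scala
import org.apache.spark.sql.SparkSession

// local[*] is for running outside Databricks; there, getOrCreate() returns the notebook's session.
val spark = SparkSession.builder().master("local[*]").appName("partitions-sketch").getOrCreate()
import spark.implicits._

// Made-up stand-in for st_data with the notebook's column names.
val rows = Seq(("Ann", "Math", 30, 50), ("Bob", "Physics", 20, 40))
val df = rows.toDF("Student", "Subject", "ClassScore", "TestScore")

// repartition(n): full shuffle into exactly n partitions.
val more = df.repartition(8)
// coalesce(n): merges existing partitions without a full shuffle (it can only lower the count).
val fewer = more.coalesce(2)

println(s"repartition(8): ${more.rdd.getNumPartitions}, coalesce(2): ${fewer.rdd.getNumPartitions}")

// For later file reads: cap the bytes packed into one input partition (64 MB here).
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
```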

In the following section, let's perform the data analytics.

# 2 - Data Processing 🔧

Computing total scores, average scores, top scores, and more.

## 2.1 Let's find the total score: class score + test score

```scala
// 1 - Total score: class score + test score
// withColumn is a narrow (map-style) transformation that adds a column to every row
val tot_score = st_data.withColumn("TotScore",
                        st_data.col("ClassScore")
                        + st_data.col("TestScore"))

tot_score.show(5)

println("--------Explain--------")
tot_score.explain()
println("--------End of Explain--------")
println("# of partitions in dataset : " + tot_score.rdd.getNumPartitions)
```

**Observation**  
Let's look at the execution plan to understand how Spark executed this operation. Alternatively, open the Spark job view and then the DAG visualization. Either way, this is a simple map operation. In this case, the DAG contains:

* `FileScanRDD` → `MapPartitionsRDD` (steps 1-2), as part of *Scan parquet*
* `MapPartitionsRDD` (step 3), as part of *WholeStageCodegen*
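
The same row-wise logic on plain Scala collections shows why this is a simple map: each output row depends only on its own input row, so no data has to move between partitions. `Score`, `Total`, and `withTotal` are hypothetical names, the rows are made up, and integer scores are an assumption.

```scala
// Hypothetical row types; integer scores are an assumption.
final case class Score(student: String, subject: String, classScore: Int, testScore: Int)
final case class Total(student: String, subject: String, classScore: Int, testScore: Int, totScore: Int)

// A row-wise map: each output row is computed from exactly one input row,
// so (like withColumn) it needs no data movement between partitions.
def withTotal(rows: Seq[Score]): Seq[Total] =
  rows.map(r => Total(r.student, r.subject, r.classScore, r.testScore, r.classScore + r.testScore))

val sample = Seq(Score("Ann", "Math", 30, 50), Score("Bob", "Physics", 20, 40)) // made-up rows
withTotal(sample).foreach(println)
```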

## 2.2 Let's print the total scores for physics

```scala
// 2 - Print the total scores for physics
// Subject is the partition column, so this filter should be applied as partition pruning
import spark.implicits._ // for the $"col" syntax (pre-imported in Databricks notebooks)

val physics_score = tot_score.filter($"Subject" === "Physics")

physics_score.show()

println("--------Explain--------")
physics_score.explain()
println("--------End of Explain--------")
println("# of partitions in dataset : " + physics_score.rdd.getNumPartitions)
```

**Observations**  
A simple filter on the Subject column returns the Physics scores for all students. Because Subject is the partition column, the execution plan shows the filter as a partition filter (`PartitionFilters` in the Parquet scan node), so only the Physics partition was read.  
Since `tot_score` is not cached, Spark reads the data source again and recomputes the total scores.  
In the following part, we will use **caching** for the analytics.
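
The partition pruning observed above can be pictured with a plain-Scala model of the partitioned store: the predicate on the partition column is checked against directory names first, and only matching directories are scanned. This is a hypothetical illustration (the `partitions` map, `prunedScan`, and all values are made up), not how Spark's file index is implemented.

```scala
// Hypothetical model of the partitioned store: directory name -> (student, total score) rows.
val partitions: Map[String, Seq[(String, Int)]] = Map(
  "Subject=Math"    -> Seq(("Ann", 80)),
  "Subject=Physics" -> Seq(("Ann", 70), ("Bob", 65))
)

// Partition pruning: evaluate the Subject predicate on directory names first,
// then scan only the directories that match. Returns (scanned dirs, rows read).
def prunedScan(subject: String): (Seq[String], Seq[(String, Int)]) = {
  val matching = partitions.keys.toSeq.filter(_ == s"Subject=$subject")
  (matching, matching.flatMap(dir => partitions(dir)))
}

val (scanned, rows) = prunedScan("Physics")
println(s"scanned $scanned, read ${rows.size} rows")
```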

## 2.3 Computing average total score with caching

In this part, we will also use aggregation with Spark.

```scala
// 3 - Cache tot_score and use the aggregation functions groupBy and avg
// cache() is lazy: the data is stored when the first action (show) runs
tot_score.cache()

// Group by student and average each student's total score across subjects
val avgscore = tot_score.groupBy("Student")
                  .avg("TotScore")

avgscore.show()

println("--------Explain--------")
avgscore.explain()
println("--------End of Explain--------")
println("# of partitions in dataset : " + avgscore.rdd.getNumPartitions)
```

**Observations**  
The execution plan shows an `InMemoryTableScan`, which means `tot_score` was not recomputed and the cache worked as intended.

The Spark jobs show shuffle read and write because `groupBy` is a wide transformation: rows with the same Student key must be moved to the same partition, and the shuffle runs when the `show()` action triggers the job. Thanks to the caching, some stages have been skipped. This is the desired outcome.
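
`groupBy("Student").avg("TotScore")` computes, per student, the sum of the total scores divided by their count; in Spark, the shuffle is what brings each student's rows together. The same computation on plain Scala collections, with made-up (student, total score) pairs and a hypothetical `averageByStudent` helper:

```scala
// Made-up (student, total score) pairs, one per subject.
val totals: Seq[(String, Int)] = Seq(("Ann", 80), ("Ann", 70), ("Bob", 60))

// groupBy brings each student's rows together (the shuffle step in Spark),
// then the average is the sum of the group's scores divided by their count.
def averageByStudent(rows: Seq[(String, Int)]): Map[String, Double] =
  rows.groupBy(_._1).map { case (student, group) =>
    student -> group.map(_._2).sum.toDouble / group.size
  }

println(averageByStudent(totals))
```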

## 2.4 Computing top student analytics

There are multiple ways to compute top student analytics.

Our roadmap is given below:

1 - We will find the top student for each subject. This raises the question of whether the data should be repartitioned first. Our answer is no: we do not need to repartition in this case, because no DataFrames produced by an action are reused and the result does not require any further transformations.

2 - We determine the top score for each subject by using groupBy and save these top scores.

3 - We join the top-score DataFrame with the total-score DataFrame on both the subject and the total score value. This extracts the students who achieved the top score.

4 - finally, we will print the results.
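
Steps 2 to 4 can be sketched in plain Scala: first the maximum total per subject, then a "join" that keeps every row matching its subject's maximum, so ties produce several students. `SubjectTotal`, `topBySubject`, and `topStudents` are hypothetical names, and the sample rows are made up.

```scala
// Hypothetical rows: (student, subject, total score).
final case class SubjectTotal(student: String, subject: String, totScore: Int)

// Step 2: highest total score per subject (groupBy + max).
def topBySubject(rows: Seq[SubjectTotal]): Map[String, Int] =
  rows.groupBy(_.subject).map { case (subject, group) => subject -> group.map(_.totScore).max }

// Step 3: keep every row whose score equals its subject's maximum,
// which mirrors the join on (Subject, TotScore) and keeps ties.
def topStudents(rows: Seq[SubjectTotal]): Seq[(String, String, Int)] = {
  val tops = topBySubject(rows)
  rows.filter(r => tops.get(r.subject).contains(r.totScore))
      .map(r => (r.subject, r.student, r.totScore))
}

val sample = Seq(
  SubjectTotal("Ann", "Math", 90), SubjectTotal("Bob", "Math", 90), SubjectTotal("Cid", "Math", 70),
  SubjectTotal("Ann", "Physics", 60), SubjectTotal("Bob", "Physics", 75)
)
topStudents(sample).foreach(println) // step 4: print the results
```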

```scala
// Import Spark SQL functions such as max
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"col" syntax (pre-imported in Databricks notebooks)

// Find the top total score per subject
val top = tot_score.groupBy("Subject")
                   .agg(max("TotScore").alias("Tops"))

top.show()

// Join back on subject and score to find the students who achieved each top score
val top_student = tot_score.as("t1")
                    .join(top.as("t2"),
                          $"t2.Tops" === $"t1.TotScore"
                          && $"t2.Subject" === $"t1.Subject")
                    .select("t1.Subject", "t1.Student", "Tops")

top_student.show()

println("--------Explain--------")
top_student.explain()
println("--------End of Explain--------")
println("# of partitions in dataset : " + top_student.rdd.getNumPartitions)
```

**Observations**  
The students with top scores are printed correctly, and two students share the top score in math.  
The execution plan shows that the cache is used.  
The Spark optimizer chose a **broadcast hash join**, broadcasting the top-score DataFrame.  
The join also triggered a re-execution of the aggregation. This part can be improved by caching the top-score DataFrame as well.
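
Following the suggestion to add another cache, here is a sketch that caches the aggregated `top` DataFrame so that `top.show()` and the join can reuse the cached result instead of re-running the aggregation. It assumes a local `SparkSession` (on Databricks, `getOrCreate()` returns the notebook's session) and a small made-up stand-in for `tot_score`; as with `tot_score`, the cache is only filled by the first action.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

// local[*] is for running outside Databricks; there, getOrCreate() returns the notebook's session.
val spark = SparkSession.builder().master("local[*]").appName("cache-top-sketch").getOrCreate()
import spark.implicits._

// Made-up stand-in for tot_score.
val rows = Seq(("Ann", "Math", 90), ("Bob", "Math", 90), ("Ann", "Physics", 60))
val totScore = rows.toDF("Student", "Subject", "TotScore")

// Cache the aggregate; the first action (top.show()) fills the cache,
// and the join below can then read it instead of re-running the aggregation.
val top = totScore.groupBy("Subject").agg(max("TotScore").alias("Tops")).cache()
top.show()

val topStudent = totScore.as("t1")
  .join(top.as("t2"), $"t2.Tops" === $"t1.TotScore" && $"t2.Subject" === $"t1.Subject")
  .select("t1.Subject", "t1.Student", "Tops")

topStudent.show()
topStudent.explain() // look for InMemoryTableScan on the aggregated side
```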

# 3 - Conclusions


In this notebook, we started with a CSV file including student scores by subject for a school year. It had four attributes: student name, subject, class score, and test score.

First, we loaded the data into HDFS in Parquet format with gzip compression, partitioned by Subject, and then read it back.
Then, we computed the total score for each student per subject, printed the physics scores, computed each student's average total score across all subjects, and found the highest-scoring student(s) for each subject.

Additionally, further analytics can be done on the data by using Spark SQL functions, applying ML models, and graphing.

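For the mechanics of aggregation with Spark SQL, import the following package:

`import org.apache.spark.sql.functions._`

* `RelationalGroupedDataset` (returned by `groupBy`; called `GroupedData` in Spark 1.x and in PySpark)
    * `.mean()`
    * `.sum()`
    * `.count()`
    * Other aggregations
    * `.agg(exprs)`
        * exprs as a `Map` of column name to aggregate function name (a dict in PySpark)
        * exprs as a list of `Column` expressions
        * advanced expressions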
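
A short sketch of these aggregation forms in Scala, where `groupBy` returns a `RelationalGroupedDataset`. It assumes a local `SparkSession` (on Databricks, `getOrCreate()` returns the notebook's session) and a made-up DataFrame with the notebook's column names; the aliases `AvgTot` and `MaxTest` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, max}

// local[*] is for running outside Databricks; there, getOrCreate() returns the notebook's session.
val spark = SparkSession.builder().master("local[*]").appName("agg-sketch").getOrCreate()
import spark.implicits._

// Made-up data with the notebook's column names.
val rows = Seq(("Ann", "Math", 30, 60), ("Ann", "Physics", 20, 40), ("Bob", "Math", 25, 45))
val totScore = rows.toDF("Student", "Subject", "ClassScore", "TestScore")
  .withColumn("TotScore", col("ClassScore") + col("TestScore"))

// groupBy returns a RelationalGroupedDataset
val grouped = totScore.groupBy("Student")

// Shortcut aggregations
val means  = grouped.mean("TotScore")
val sums   = grouped.sum("TotScore")
val counts = grouped.count()

// agg with a Map of column name -> aggregate function name (the "dict" form)
val byMap = grouped.agg(Map("TotScore" -> "avg", "TestScore" -> "max"))

// agg with Column expressions, which allows aliases and arbitrary expressions
val byExprs = grouped.agg(avg("TotScore").alias("AvgTot"), max("TestScore").alias("MaxTest"))

byExprs.show()
```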

End of Notebook