<a href="https://colab.research.google.com/github/shahrukhvarzgani/Pyspark_Project/blob/main/Pyspark_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Creating the Spark environment**

In [None]:
import os

os.environ["PYSPARK_HADOOP_VERSION"] = "3"
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark==3.5.1
!pip install findspark==2.0.1
!pip install pandas==2.2.2
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

Collecting pyspark==3.5.1
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=84b88062461d35feb84c7748737ffa58e53468c75a0ff37201cca0aea70a8d20
  Stored in directory: /root/.cache/pip/wheels/95/13/41/f7f135ee114175605fb4f0a89e7389f3742aa6c1e1a5bcb657
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.5
    Uninstalling pyspark-3.5.5:
      Successfully uninstalled pyspark-3.5.5
Successfully installed pyspark-3.5.1
Collecting findspark==2.0.1
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading 

In [None]:
# Set (idempotently) the environment variables Spark needs.

# Local folder that stands in for HDFS; the Parquet output is written here.
os.makedirs("dummy_hdfs", exist_ok=True)

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Check that JAVA_HOME points at the Java 8 JDK
print(os.environ["JAVA_HOME"])

# HADOOP_HOME is only needed on Windows:
# print(os.environ["HADOOP_HOME"])

# Prepend the JDK's bin directory to PATH only when it is missing, so running
# this cell (after the install cell, or twice) does not duplicate the entry.
java_bin = os.environ["JAVA_HOME"] + "/bin"
current_path = os.environ.get("PATH", "")
if java_bin not in current_path.split(os.pathsep):
    os.environ["PATH"] = java_bin + (os.pathsep + current_path if current_path else "")
# Check that JAVA_HOME (and HADOOP_HOME on Windows) are in the PATH
print(os.environ["PATH"])

/usr/lib/jvm/java-8-openjdk-amd64
/usr/lib/jvm/java-8-openjdk-amd64/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin:/opt/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin


In [None]:
# Create (or reuse) a local Spark session.
import findspark

# findspark is meant to be initialised *before* pyspark is imported, so that
# the Spark installation it locates is the one pyspark gets loaded from.
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SparkWriterJob")
    # Small dataset: use 2 shuffle partitions instead of Spark's default of 200.
    .config("spark.sql.shuffle.partitions", 2)
    .config("spark.default.parallelism", 2)
    # Run locally with 2 worker threads, matching the partition settings above.
    .master("local[2]")
    .getOrCreate()
)
print(spark.version)

3.5.1


**Mounting the drive**

In [None]:
# Mount Google Drive so the dataset under MyDrive/Pyspark/datasets is readable.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


**Loading the student scores CSV**

In [None]:
# Read the student scores CSV from Drive. The first row is used as the header,
# and Spark infers column types (ClassScore/TestScore come out as doubles).
STUDENT_SCORES_CSV = "/content/drive/MyDrive/Pyspark/datasets/student_scores.csv"

raw_student_data = spark.read.csv(STUDENT_SCORES_CSV, header="true", inferSchema="true")

raw_student_data.printSchema()
raw_student_data.show(5)

root
 |-- Student: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- ClassScore: double (nullable = true)
 |-- TestScore: double (nullable = true)

+-------+---------+----------+---------+
|Student|  Subject|ClassScore|TestScore|
+-------+---------+----------+---------+
|   Katy|     Math|      0.95|     2.37|
|   Katy|Chemistry|       0.5|     1.18|
|   Katy|  Physics|      0.48|     1.37|
|   Katy|  Biology|      0.75|     2.79|
|   Mike|     Math|      0.45|     1.79|
+-------+---------+----------+---------+
only showing top 5 rows



**Writing the data as partitioned Parquet (to the local `dummy_hdfs` folder)**

In [None]:
# Persist the raw data as gzip-compressed Parquet, with one sub-directory per
# Subject value, replacing any output left over from a previous run.
(
    raw_student_data.write
    .mode("Overwrite")
    .partitionBy("Subject")
    .option("compression", "gzip")
    .parquet("dummy_hdfs/student_scores.parquet")
)

**Bucketing the student data by subject**

In [None]:
# Save the data as a managed Parquet table, bucketed by Subject into 4 buckets.
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails as soon as Student_Table already exists.
(
    raw_student_data.write
    .format("parquet")
    .mode("overwrite")
    .bucketBy(4, "Subject")
    .saveAsTable("Student_Table")
)


**Calculating the total score**

In [None]:
from pyspark.sql.functions import col

# Add TotalScore = ClassScore + TestScore to every (Student, Subject) row.
total_score = raw_student_data.withColumn("TotalScore", col("ClassScore") + col("TestScore"))

# Total score in Physics. Filter the frame that actually carries TotalScore;
# filtering raw_student_data would silently drop the new column.
Physics_score = total_score.where(col("Subject") == "Physics")
Physics_score.show()

# Show the execution plan (the Subject filter appears as a DataFilter on the scan)
print("\n---------Explain----------")
Physics_score.explain()
print("--------End Explain--------\n")


+-------+-------+----------+---------+------------------+
|Student|Subject|ClassScore|TestScore|        TotalScore|
+-------+-------+----------+---------+------------------+
|   Katy|Physics|      0.48|     1.37|              1.85|
|   Mike|Physics|      0.34|     2.72|              3.06|
|    Bob|Physics|      0.93|     2.89|3.8200000000000003|
|   Lisa|Physics|      0.42|     2.34|              2.76|
|   John|Physics|      0.82|      2.8|3.6199999999999997|
+-------+-------+----------+---------+------------------+


---------Explain----------
== Physical Plan ==
*(1) Project [Student#111, Subject#112, ClassScore#113, TestScore#114, (ClassScore#113 + TestScore#114) AS TotalScore#730]
+- *(1) Filter (isnotnull(Subject#112) AND (Subject#112 = Physics))
   +- FileScan csv [Student#111,Subject#112,ClassScore#113,TestScore#114] Batched: false, DataFilters: [isnotnull(Subject#112), (Subject#112 = Physics)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Pyspar

**Finding average of total score across all the subjects of each student**

In [None]:
from pyspark.sql.functions import avg

# total_score feeds several aggregations below, so cache it. For DataFrames,
# persist() defaults to a memory-and-disk storage level (see the plan output).
total_score.persist()

# Average TotalScore across all subjects, per student.
average_score = (
    total_score
    .groupBy("Student")
    .agg(avg("TotalScore").alias("AverageScore"))
)

print("\n---------Explain----------")
average_score.explain()
print("--------End Explain--------\n")
average_score.show(5)


---------Explain----------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Student#111], functions=[avg(TotalScore#761)])
   +- Exchange hashpartitioning(Student#111, 2), ENSURE_REQUIREMENTS, [plan_id=907]
      +- HashAggregate(keys=[Student#111], functions=[partial_avg(TotalScore#761)])
         +- InMemoryTableScan [Student#111, TotalScore#761]
               +- InMemoryRelation [Student#111, Subject#112, ClassScore#113, TestScore#114, TotalScore#761], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Project [Student#111, Subject#112, ClassScore#113, TestScore#114, (ClassScore#113 + TestScore#114) AS TotalScore#761]
                        +- FileScan csv [Student#111,Subject#112,ClassScore#113,TestScore#114] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Pyspark/datasets/student_scores.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Stud

**Finding the top student in each subject**

In [None]:
from pyspark.sql.functions import col

# Highest TotalScore reached in each subject.
top_score_df = (
    total_score
    .groupBy("Subject")
    .max("TotalScore")
    .withColumnRenamed("max(TotalScore)", "TotalScore")
)
top_score_df.show()

# Join the per-subject maximum back to the scores to find which student(s)
# reached it. A tie yields one row per tied student. Exact double equality is
# safe here because max() returns one of the original TotalScore values unchanged.
top_score = (
    total_score.alias("a")
    .join(
        top_score_df.alias("b"),
        (col("a.Subject") == col("b.Subject"))
        & (col("a.TotalScore") == col("b.TotalScore")),
    )
    .select(col("a.Student"), col("a.Subject"), col("a.TotalScore"))
)

top_score.show()

+---------+------------------+
|  Subject|        TotalScore|
+---------+------------------+
|  Biology|              3.54|
|     Math|3.3200000000000003|
|Chemistry|3.1999999999999997|
|  Physics|3.8200000000000003|
+---------+------------------+

+-------+---------+------------------+
|Student|  Subject|        TotalScore|
+-------+---------+------------------+
|   Katy|     Math|3.3200000000000003|
|   Katy|  Biology|              3.54|
|    Bob|  Physics|3.8200000000000003|
|   John|Chemistry|3.1999999999999997|
+-------+---------+------------------+

