In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

#Configuration
spark = SparkSession\
    .builder\
    .master('local[2]')\
    .appName('quake_etl')\
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate() 

In [3]:
#load data
edu = spark.read.csv(r"C:\Users\israt\OneDrive\Desktop\DM_Project\project\education.csv", header=True)

In [5]:
#delete column
delete_columns = ['ISO','Note']
education = edu.drop(*delete_columns)
education.show(5)

+----------+-----------+----------+
|      Date|    Country|    Status|
+----------+-----------+----------+
|17/02/2020|      Aruba|Fully open|
|17/02/2020|Afghanistan|Fully open|
|17/02/2020|     Angola|Fully open|
|17/02/2020|   Anguilla|Fully open|
|17/02/2020|    Albania|Fully open|
+----------+-----------+----------+
only showing top 5 rows



In [6]:
#displaying variables types
education.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Status: string (nullable = true)



In [7]:
#insert column: year
education.printSchema()
education = education.withColumn('Year', year(to_timestamp('Date','dd/MM/yyyy')))
education.show(5)

root
 |-- Date: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Status: string (nullable = true)

+----------+-----------+----------+----+
|      Date|    Country|    Status|Year|
+----------+-----------+----------+----+
|17/02/2020|      Aruba|Fully open|2020|
|17/02/2020|Afghanistan|Fully open|2020|
|17/02/2020|     Angola|Fully open|2020|
|17/02/2020|   Anguilla|Fully open|2020|
|17/02/2020|    Albania|Fully open|2020|
+----------+-----------+----------+----+
only showing top 5 rows



In [8]:
#convert string to double
education = education.withColumn('Date',education['Date'].cast(DoubleType()))
education.printSchema()

root
 |-- Date: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Year: integer (nullable = true)



In [9]:
#count the numbers of each status
status_count = education.groupBy('Status').count().withColumnRenamed('count','total Status')

status_count.show(100)

+--------------------+------------+
|              Status|total Status|
+--------------------+------------+
|      Partially open|        4858|
|      Academic break|        5571|
|          Fully open|       12897|
|Closed due to COV...|        2294|
+--------------------+------------+



In [10]:
#filtering the status with Fully open status
status_Fully_open=education.filter(education.Status == "Fully open")

status_Fully_open.show(10)

+----+--------------------+----------+----+
|Date|             Country|    Status|Year|
+----+--------------------+----------+----+
|null|               Aruba|Fully open|2020|
|null|         Afghanistan|Fully open|2020|
|null|              Angola|Fully open|2020|
|null|            Anguilla|Fully open|2020|
|null|             Albania|Fully open|2020|
|null|             Andorra|Fully open|2020|
|null|United Arab Emirates|Fully open|2020|
|null|           Argentina|Fully open|2020|
|null|             Armenia|Fully open|2020|
|null| Antigua and Barbuda|Fully open|2020|
+----+--------------------+----------+----+
only showing top 10 rows



In [11]:
#filtering the status
status_Academic_break=education.filter(education.Status == "Academic break")
status_Partially_open=education.filter(education.Status == "Partially open")
status_Closed_due_to_COVID =education.filter(education.Status == "Closed due to COVID-19")


status_Academic_break.show(10)
status_Partially_open.show(10)
status_Closed_due_to_COVID.show(10)

+----+-----------------+--------------+----+
|Date|          Country|        Status|Year|
+----+-----------------+--------------+----+
|null|          Armenia|Academic break|2020|
|null|Brunei Darussalam|Academic break|2020|
|null|           Canada|Academic break|2020|
|null|         Ethiopia|Academic break|2020|
|null|          Georgia|Academic break|2020|
|null|         Suriname|Academic break|2020|
|null|          Tunisia|Academic break|2020|
|null|         Anguilla|Academic break|2020|
|null|          Albania|Academic break|2020|
|null|            Benin|Academic break|2020|
+----+-----------------+--------------+----+
only showing top 10 rows

+----+--------------------+--------------+----+
|Date|             Country|        Status|Year|
+----+--------------------+--------------+----+
|null|               China|Partially open|2020|
|null|              Monaco|Partially open|2020|
|null|              Brazil|Partially open|2020|
|null|              Bhutan|Partially open|2020|
|null|  

In [12]:
#count total year of each year
year_count = education.groupBy('Year').count().withColumnRenamed('count','total year')

year_count.show(100)

+----+----------+
|Year|total year|
+----+----------+
|2022|     12390|
|2020|      2310|
|2021|     10920|
+----+----------+



In [13]:
#filtering the year
year_2022 =education.filter(education.Year == 2022)
year_2021 =education.filter(education.Year == 2021)
year_2020 =education.filter(education.Year == 2020)

year_2022.show(10)
year_2021.show(10)
year_2020.show(10)

+----+--------------------+--------------+----+
|Date|             Country|        Status|Year|
+----+--------------------+--------------+----+
|null|               Aruba|Academic break|2022|
|null|         Afghanistan|Academic break|2022|
|null|              Angola|Academic break|2022|
|null|            Anguilla|Academic break|2022|
|null|             Albania|Academic break|2022|
|null|             Andorra|Academic break|2022|
|null|United Arab Emirates|Academic break|2022|
|null|           Argentina|Academic break|2022|
|null|             Armenia|Academic break|2022|
|null| Antigua and Barbuda|Academic break|2022|
+----+--------------------+--------------+----+
only showing top 10 rows

+----+--------------------+--------------+----+
|Date|             Country|        Status|Year|
+----+--------------------+--------------+----+
|null|               Aruba|Academic break|2021|
|null|         Afghanistan|Academic break|2021|
|null|              Angola|Partially open|2021|
|null|        

In [14]:
# connect with mongodb by creating database

edu.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.edu').save()

education.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.education').save()

status_count.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.status_count').save()

status_Fully_open.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.status_Fully_open').save()

status_Academic_break.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.status_Academic_break').save()

status_Partially_open.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.status_Partially_open').save()

status_Closed_due_to_COVID.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.status_Closed_due_to_COVID').save()

year_2022.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.year_2022').save()

year_2021.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.year_2021').save()

year_2020.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1/education_in_covid.year_2020').save()


