**Setting up necessary libraries**

In [1]:
pip install pyspark





[notice] A new release of pip is available: 23.1.2 -> 23.3.1
[notice] To update, run: F:\Data Engineering\Pyspark\my_venv\Scripts\python.exe -m pip install --upgrade pip


In [2]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 23.3.1
[notice] To update, run: F:\Data Engineering\Pyspark\my_venv\Scripts\python.exe -m pip install --upgrade pip


In [3]:
import findspark
findspark.init()

In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName('Spark-Assesment').master('local[2]').getOrCreate()

**Reading the File** 

In [159]:
df = spark.read.option('header', True).option('inferSchema', True).csv("database.csv")                              # Reading database.csv file in to a dataframe

In [160]:
dfTable = df.createOrReplaceTempView('neic_earthquakes')                   #Creating and storing the csv data into the table 'neic_earthquakes'

In [161]:
spark.sql("SELECT * FROM neic_earthquakes limit 2").show()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|01-02-1965|2023-11-22 13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|           

In [150]:
df2 = spark.sql("SELECT * from neic_earthquakes")                 #Reading the data from a table into a Pyspark dataframe

In [151]:
df2.select('Type').distinct().show()

+-----------------+
|             Type|
+-----------------+
|        Explosion|
|       Rock Burst|
|Nuclear Explosion|
|       Earthquake|
+-----------------+



In [152]:
from pyspark.sql.functions import dayofweek
df3 = df.withColumn('day_of_week', dayofweek('Time'))

In [153]:
df3.select('day_of_week').distinct().show()              #Day of Week 1 == Monday ; 2 == tuesday...

+-----------+
|day_of_week|
+-----------+
|          1|
|          4|
+-----------+



In [189]:
tempdf = df3.filter(df3.Type == 'Earthquake')

In [190]:
tempdf2 = tempdf.select('Date', 'day_of_week', 'Type')                    #Selecting Required columns

In [191]:
from pyspark.sql.functions import count                                                      # Day of a Week affecting the number of earthquakes
tempdf2.groupBy('day_of_week').agg(count("*").alias('count_of_earthquakes')).show()            

+-----------+--------------------+
|day_of_week|count_of_earthquakes|
+-----------+--------------------+
|          1|                   3|
|          4|               23229|
+-----------+--------------------+



In [208]:
from pyspark.sql.functions import *                                  #Extracting Year column and day of month into a pyspark dateframe
tempDF = df.withColumn("Time", col("Time").cast("timestamp"))
tempDF = tempDF.withColumn("Year", year("Time")).withColumn("Day_of_month", day("Time"))

In [194]:
#Question:-
## What is the relation between Day of the month and Number of earthquakes that happened in a year?

day_of_month_result = tempDF.groupBy('Year', 'Day_of_month').agg({'Type':"count"})

In [210]:
#Question : What is the relation between Year and Number of earthquakes that happened in that year?
yearly_results = tempDF.groupBy("Year").agg({"Type": "count"})

In [212]:
#Question: How has the earthquake magnitude on average been varied over the years?
magnitude_over_years = tempDF.groupBy("Year").agg({"Magnitude": "avg"})

In [214]:
# Question : How does year impact the standard deviation of the earthquakes?
stddev_results = tempDF.groupBy('Year').agg(stddev("Magnitude").alias("Magnitude_StdDev"))

In [219]:
# Question: Does geographic location have anything to do with earthquakes?
location_results = tempDF.groupBy("Location Source").agg({'Type':'count'})

In [221]:
#Question: What is the relation between Magnitude, Magnitude Type , Status and Root Mean Square of the earthquakes?
relation_status_results = tempDF.groupBy("Magnitude Type", "Status").agg({"Magnitude":"avg", "Root Mean Square":"avg"})

In [213]:
magnitude_over_years.show()

+----+-----------------+
|Year|   avg(Magnitude)|
+----+-----------------+
|1975|              5.6|
|2023|5.882558417702829|
|1985|              5.6|
|2011|              5.8|
+----+-----------------+



In [216]:
stddev_results.show()

+----+------------------+
|Year|  Magnitude_StdDev|
+----+------------------+
|1975|              NULL|
|2023|0.4230843439717061|
|1985|              NULL|
|2011|              NULL|
+----+------------------+



In [220]:
location_results.show()

+---------------+-----------+
|Location Source|count(Type)|
+---------------+-----------+
|             CI|         61|
|            CAR|          1|
|            MDD|          2|
|            UCR|          1|
|           GCMT|         56|
|         ISCGEM|       2581|
|            OTT|          1|
|           RSPR|          3|
|           CASC|          4|
|             UW|          6|
|            ROM|          7|
|            TEH|          7|
|              B|          2|
|            SJA|          1|
|            BOU|          1|
|           AEIC|         40|
|            ATH|         14|
|            JMA|          3|
|              U|          1|
|            TUL|          2|
+---------------+-----------+
only showing top 20 rows



In [222]:
relation_status_results.show()

+--------------+---------+------------------+---------------------+
|Magnitude Type|   Status|    avg(Magnitude)|avg(Root Mean Square)|
+--------------+---------+------------------+---------------------+
|            ML| Reviewed| 5.814675324675323|  0.46337894736842095|
|            MH| Reviewed| 6.540000000000001|  0.11379999999999998|
|            MW| Reviewed| 5.896276988912613|   1.0882880094043805|
|            MW|Automatic| 6.008523827973685|                 NULL|
|           MWC| Reviewed|5.8579087994299375|   1.0121028716830256|
|          NULL|Automatic| 5.706666666666666|                 NULL|
|            MD| Reviewed| 5.966666666666668|  0.19833333333333333|
|           MWC|Automatic| 5.885454545454545|                 NULL|
|            MB| Reviewed|  5.68295666046264|    0.989051266941661|
|           MWR| Reviewed|  5.63076923076923|   0.9381818181818183|
|           MWW| Reviewed| 6.008673726676766|    0.962074795081968|
|            MS| Reviewed| 5.994359576968296|   

In [223]:
yearly_results.show()

+----+-----------+
|Year|count(Type)|
+----+-----------+
|1975|          1|
|2023|      23409|
|1985|          1|
|2011|          1|
+----+-----------+



In [224]:
day_of_month_result.show()

+----+------------+-----------+
|Year|Day_of_month|count(Type)|
+----+------------+-----------+
|1985|          28|          1|
|1975|          23|          1|
|2011|          13|          1|
|2023|          22|      23409|
+----+------------+-----------+

