In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [4]:
# Configur spark session
spark = SparkSession\
    .builder\
    .master('local[4]')\
    .appName('quake_etl')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate()

In [6]:
# Loading dataset
df_load = spark.read.csv(r"C:\1_My_things\1. Sem 7\BDAD\database.csv", header = True)
# Preview dr_load
df_load.take(1)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [8]:
lst_dropped_columns = [
    'Depth Error',
    'Time',
    'Depth Seismic Stations',
    'Magnitude Error',
    'Magnitude Seismic Stations',
    'Azimuthal Gap',
    'Horizontal Distance',
    'Horizontal Error',
    'Root Mean Square',
    'Source',
    'Location Source',
    'Magnitude Source',
    'Status'
]

# Drop all the columns listed above and assign new dataframe to df_drop
df_load = df_load.drop(*lst_dropped_columns)

# Preview df_drop
df_load.show(10)

+----------+--------+---------+----------+-----+---------+--------------+---------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|             ID|
+----------+--------+---------+----------+-----+---------+--------------+---------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|   ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|   ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|   ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|   ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|   ISCGEM860890|
|01/10/1965| -13.405|  166.629|Earthquake|   35|      6.7|            MW|   ISCGEM860922|
|01/12/1965|  27.357|   87.867|Earthquake|   20|      5.9|            MW|   ISCGEM861007|
|01/15/1965| -13.309|  166.212|Earthquake|   35|        6|            MW|   ISCGEM861111|
|01/16/196

In [10]:
# Creating year field and add it to the dataframe
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
# preview df_load
df_load.show(10)

+----------+--------+---------+----------+-----+---------+--------------+---------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|             ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+---------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|   ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|   ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|   ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|   ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|   ISCGEM860890|1965|
|01/10/1965| -13.405|  166.629|Earthquake|   35|      6.7|            MW|   ISCGEM860922|1965|
|01/12/1965|  27.357|   87.867|Earthquake|   20|      5.9|            MW|   ISCGEM861007|1965|
|01/15/1965| -13.309|  166.212|Earthquake|   35|  

In [12]:
# Build the quakes frequiency dataframe using year filed and count for each year
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')
# Preview df_quake_freq
df_quake_freq.show(10)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
|1974|   147|
|2015|   175|
|2006|   176|
|1978|   158|
|2013|   202|
+----+------+
only showing top 10 rows

