In [113]:
# importing necessary libraries

import pandas as pd
from sqlalchemy import create_engine
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from matplotlib import pyplot as plt

In [2]:
# read the configuration
with open('configuration.json') as f:
    conf = json.load(f)

In [3]:
# reading data from the csv file
data = pd.read_csv("database.csv")
data.head()

Unnamed: 0,Date,Time,Latitude,Longitude,Type,Depth,Depth Error,Depth Seismic Stations,Magnitude,Magnitude Type,...,Magnitude Seismic Stations,Azimuthal Gap,Horizontal Distance,Horizontal Error,Root Mean Square,ID,Source,Location Source,Magnitude Source,Status
0,01/02/1965,13:44:18,19.246,145.616,Earthquake,131.6,,,6.0,MW,...,,,,,,ISCGEM860706,ISCGEM,ISCGEM,ISCGEM,Automatic
1,01/04/1965,11:29:49,1.863,127.352,Earthquake,80.0,,,5.8,MW,...,,,,,,ISCGEM860737,ISCGEM,ISCGEM,ISCGEM,Automatic
2,01/05/1965,18:05:58,-20.579,-173.972,Earthquake,20.0,,,6.2,MW,...,,,,,,ISCGEM860762,ISCGEM,ISCGEM,ISCGEM,Automatic
3,01/08/1965,18:49:43,-59.076,-23.557,Earthquake,15.0,,,5.8,MW,...,,,,,,ISCGEM860856,ISCGEM,ISCGEM,ISCGEM,Automatic
4,01/09/1965,13:32:50,11.938,126.427,Earthquake,15.0,,,5.8,MW,...,,,,,,ISCGEM860890,ISCGEM,ISCGEM,ISCGEM,Automatic


In [4]:
# Replace NaN with None, which represents NULL in Python
# data.replace({pd.NA: None}, inplace=True)

In [5]:
# Your MySQL connection parameters
mysql_user = conf['user']
mysql_password = conf['password']
mysql_host = conf['host']
mysql_db = 'aidetic'

# Create a SQLAlchemy engine
engine = create_engine(f'mysql+mysqlconnector://{mysql_user}:{mysql_password}@{mysql_host}/{mysql_db}')

# Your table name in MySQL
table_name = 'neic_earthquakes'

# Use pandas to_sql() function to insert data into MySQL table
data.to_sql(name=table_name, con=engine, if_exists='replace', index=False)

23412

In [6]:
spark = SparkSession.builder.\
        master('local[*]').appName('aidetic_assignment').getOrCreate()

In [7]:
# MySQL connection properties
mysql_properties = {
    "url": f"jdbc:mysql://localhost:3306",
    "driver": "com.mysql.jdbc.Driver",
    "user": mysql_user,
    "password": conf['pass_mysql']
}

In [8]:
# Read data from MySQL using SQLAlchemy
query = f"SELECT * FROM {table_name}"
df = spark.read.format("jdbc").option("url", mysql_properties["url"]) \
    .option("driver", mysql_properties["driver"]) \
    .option("dbtable", "aidetic.neic_earthquakes") \
    .option("user", mysql_properties["user"]) \
    .option("password", mysql_properties["password"]) \
    .option("inferSchema", "True") \
    .load()

In [9]:
df.show(2)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL|      6.0|            MW|       

In [10]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Depth Error: double (nullable = true)
 |-- Depth Seismic Stations: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: double (nullable = true)
 |-- Magnitude Seismic Stations: double (nullable = true)
 |-- Azimuthal Gap: double (nullable = true)
 |-- Horizontal Distance: double (nullable = true)
 |-- Horizontal Error: double (nullable = true)
 |-- Root Mean Square: double (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)



###  How does the Day of a Week affect the number of earthquakes?

In [11]:
# detecting the different date format in the date column
# we are assuming that maximum date are in MM/dd/yyyy format, we are trying to detect other formats.

df.withColumn('dateformat_detection', when(col('Date').like("__/__/____"), 'known').otherwise('Unknown')) \
.filter(col('dateformat_detection')=='Unknown').select(['Date']).show(truncate=False)

+------------------------+
|Date                    |
+------------------------+
|1975-02-23T02:58:41.000Z|
|1985-04-28T02:53:41.530Z|
|2011-03-13T02:23:34.520Z|
+------------------------+



In [12]:
# converting the date column into same format to maintain homegeneity

df = df.withColumn('Date', when(to_date(col('Date'),'MM/dd/yyyy').isNotNull(), to_date(col('Date'),'MM/dd/yyyy')) \
                   .when(date_format(col('Date'),'yyyy-MM-dd').isNotNull(),date_format(col('Date'),'yyyy-MM-dd')))

In [13]:
# converting the date column type from string to date

df = df.withColumn('Date', to_date(col('Date'), 'yyyy-MM-dd'))

In [14]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Depth Error: double (nullable = true)
 |-- Depth Seismic Stations: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: double (nullable = true)
 |-- Magnitude Seismic Stations: double (nullable = true)
 |-- Azimuthal Gap: double (nullable = true)
 |-- Horizontal Distance: double (nullable = true)
 |-- Horizontal Error: double (nullable = true)
 |-- Root Mean Square: double (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)



In [15]:
# Extracting day of the week from the date column
df = df.withColumn('dayofweek', date_format(df.Date, 'EEEE'))

In [16]:
# relation with day of the week and no. of earthquakes
from pyspark.sql.functions import count, desc
dow_effect = df.groupBy('dayofweek').agg(count(col('dayofweek')).alias('no_of_earthquakes')).sort(col('no_of_earthquakes').desc())

In [17]:
dow_effect.show()

+---------+-----------------+
|dayofweek|no_of_earthquakes|
+---------+-----------------+
| Saturday|             3433|
|Wednesday|             3431|
|   Friday|             3362|
|   Monday|             3349|
| Thursday|             3306|
|   Sunday|             3287|
|  Tuesday|             3244|
+---------+-----------------+



In [18]:
# percentage difference in highest and lowest no. of earthquakes
from pyspark.sql.functions import max, min
max_value = dow_effect.agg({'no_of_earthquakes':'max'}).collect()[0][0]
min_value = dow_effect.agg({'no_of_earthquakes':'min'}).collect()[0][0]
print("Highest to lowest percent difference:", (max_value-min_value)/max_value*100)

Highest to lowest percent difference: 5.505388872706088


#### Relation between day of the week and earthquakes are as follows:
- Highest no. of earthquakes are encountered on  Saturday
- Lowest no. of earthquakes are encountered on Tuesday
- There is 5% more of earthquakes on Saturday than tuesday

###  What is the relation between Day of the month and Number of earthquakes thathappened in a year?


In [19]:
# relation with day of the month and no. of earthquakes
df = df.withColumn('dayofmonth', dayofmonth(df.Date))
dom_effect = df.groupBy('dayofmonth').agg(count(col('dayofmonth')).alias('no_of_earthquakes')).sort(col('no_of_earthquakes').desc())

In [20]:
dom_effect.show(31)

+----------+-----------------+
|dayofmonth|no_of_earthquakes|
+----------+-----------------+
|        11|              905|
|        17|              848|
|        23|              838|
|        20|              825|
|        16|              807|
|        14|              805|
|        26|              801|
|        28|              800|
|        12|              793|
|         7|              783|
|         3|              782|
|        21|              776|
|        15|              775|
|         1|              773|
|        18|              773|
|         5|              769|
|        10|              766|
|        13|              764|
|        19|              763|
|        25|              763|
|        24|              760|
|         6|              746|
|         4|              732|
|         8|              732|
|         9|              714|
|        27|              711|
|         2|              706|
|        22|              705|
|        29|              679|
|       

#### Relation of day of the month with no. of earthquakes are as follows:
- 11th day of the month has encountered highest no. of earthquakes.
- 31th day of the month has encountered the lowest no. of earthquakes. But, it also comes almost half of the times than other days.
- We can assume that the 30th day of the month has the lowest no. of earthquakes.

### What does the average frequency of earthquakes in a month from the year 1965 to 2016 tell us?


In [50]:
sql = """select month(date) as month, count(month(date))/(2016-1965) as frequency_per_month_per_year from {df} 
    where year(date) between 1965 and 2016
    group by month(date)
    order by frequency_per_month_per_year desc
    """
spark.sql(sql, df=df).show()

+-----+----------------------------+
|month|frequency_per_month_per_year|
+-----+----------------------------+
|    3|          41.450980392156865|
|    8|          39.490196078431374|
|   12|           39.23529411764706|
|   11|           38.96078431372549|
|    9|           38.92156862745098|
|    4|           38.64705882352941|
|    5|          38.509803921568626|
|   10|           38.27450980392157|
|    1|           37.07843137254902|
|    7|           36.86274509803921|
|    2|           35.86274509803921|
|    6|           35.76470588235294|
+-----+----------------------------+



### What is the relation between Year and Number of earthquakes that happened in that year?

In [72]:
sql = """
    select year(date) as year, count(1) as no_of_earthquakes
    from {df}
    group by year(date)
    order by no_of_earthquakes desc
"""
spark.sql(sql, df=df).corr('year','no_of_earthquakes')

0.7087157621721392

In [73]:
sql = """
    select year(date) as year, count(1) as no_of_earthquakes
    from {df}
    group by year(date)
    order by no_of_earthquakes desc
    limit 1
"""
spark.sql(sql, df=df).show()

+----+-----------------+
|year|no_of_earthquakes|
+----+-----------------+
|2011|              713|
+----+-----------------+



In [74]:
sql = """
    select year(date) as year, count(1) as no_of_earthquakes
    from {df}
    group by year(date)
    order by no_of_earthquakes
    limit 1
"""
spark.sql(sql, df=df).show()

+----+-----------------+
|year|no_of_earthquakes|
+----+-----------------+
|1966|              234|
+----+-----------------+



#### Relation between year and no. of earthquakes are as follows:
- There is positive relation of 0.70 between year and no. of earthquakes.
- Earthquakes are generally increasing with the increasing year
- Highest no. of earthquakes (713) are encountered in the year 2011
- Lowest no. of earthquakes (234) are encountered in the year 1966

###  How has the earthquake magnitude on average been varied over the years?

In [78]:
sql = """
    select year(date) as year, avg(magnitude) as average_magnitude
    from {df}
    group by year(date)
"""

spark.sql(sql, df=df).show()

+----+------------------+
|year| average_magnitude|
+----+------------------+
|1990| 5.860624999999987|
|1975| 5.848276699029118|
|1977| 5.783764705882346|
|2003| 5.885731958762881|
|2007| 5.886019736842098|
|1974| 5.830332409972294|
|2015| 5.913071748878919|
|2006| 5.859645669291335|
|1978|5.8185365853658455|
|2013| 5.881778741865502|
|1988| 5.848875255623711|
|1997| 5.859868421052616|
|1994|5.8774999999999915|
|1968| 6.078524590163932|
|2014| 5.886916666666659|
|1973| 5.814064837905229|
|1979| 5.828370786516847|
|1971| 5.972538860103624|
|1966| 6.040470085470085|
|2004| 5.850910683012252|
+----+------------------+
only showing top 20 rows



In [79]:
spark.sql(sql, df=df).corr('year', 'average_magnitude')

-0.34171150441072706

In [81]:
sql = """
    select year(date) as year, avg(magnitude) as average_magnitude
    from {df}
    group by year(date)
    order by average_magnitude desc
    limit 1
"""

spark.sql(sql, df=df).show()

+----+-----------------+
|year|average_magnitude|
+----+-----------------+
|1968|6.078524590163932|
+----+-----------------+



In [83]:
sql = """
    select year(date) as year, avg(magnitude) as average_magnitude
    from {df}
    group by year(date)
    order by average_magnitude
    limit 1
"""

spark.sql(sql, df=df).show()

+----+-----------------+
|year|average_magnitude|
+----+-----------------+
|1977|5.783764705882346|
+----+-----------------+



#### There is negative correlation of 0.34 with year and average magnitude
- Highest average magnitude recorded is 6.07 in the year 1968
- Lowest average magnitude recorded is 5.78 in the year 1977
- From here it can see, there is less impact average magnitude with the year.

### How does year impact the standard deviation of the earthquakes?


In [88]:
stdDF = df.groupBy(year(col('Date')).alias('year')).agg(stddev(df.Depth).alias('std'))

In [89]:
stdDF.corr('year','std')

0.29298642814638354

In [91]:
max_value = stdDF.agg({'std':'max'}).collect()[0][0]
min_value = stdDF.agg({'std':'min'}).collect()[0][0]

In [92]:
print("max_value:", max_value)
print("min_value:", min_value)

max_value: 169.08582392602557
min_value: 92.85854039532533


In [95]:
stdDF.sort(col('std').desc()).show(1)

+----+------------------+
|year|               std|
+----+------------------+
|2002|169.08582392602557|
+----+------------------+
only showing top 1 row



In [96]:
stdDF.sort(col('std')).show(1)

+----+-----------------+
|year|              std|
+----+-----------------+
|1976|92.85854039532533|
+----+-----------------+
only showing top 1 row



### With year there is slight rise in standard deviation of the earthquakes
- Minimum std is 92.85 in the year 1976
- maximun std is 169.08 in the year 2002
- Positive correlation of 0.29 between the year and standard deviation

### Does geographic location have anything to do with earthquakes?

In [97]:
geoDF = df.groupBy(['Latitude', 'Longitude']).count()

In [100]:
geoDF.sort(col('count')).show(1)

+--------+---------+-----+
|Latitude|Longitude|count|
+--------+---------+-----+
|  51.752|    175.5|    1|
+--------+---------+-----+
only showing top 1 row



In [101]:
geoDF.sort(col('count').desc()).show(1)

+--------+---------+-----+
|Latitude|Longitude|count|
+--------+---------+-----+
|    51.5|   -174.8|    4|
+--------+---------+-----+
only showing top 1 row



In [108]:
geoMax = df.groupBy(['Latitude', 'Longitude']).agg(max(col('depth')).alias('max_depth'))

In [111]:
geoMax.sort(col('max_depth')).show(1)

+--------+---------+---------+
|Latitude|Longitude|max_depth|
+--------+---------+---------+
|  34.131| -116.408|     -1.1|
+--------+---------+---------+
only showing top 1 row



In [112]:
geoMax.sort(col('max_depth').desc()).show(1)

+--------+---------+---------+
|Latitude|Longitude|max_depth|
+--------+---------+---------+
| -20.158| -179.163|    700.0|
+--------+---------+---------+
only showing top 1 row



### Geographic location and earthquakes:
- Location(-20.158, -179.169) encountered the highest depth of earthquake.
- Location(34.131, -116.408) encountered the lowest depth of earthquake.

### Where do earthquakes occur very frequently?
- Location(51.5, -174.8) has encountered maximum no. of earthquakes i.e. 4.