In [None]:
#install libraries required

!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 5,482 B/110 kB 5%] [Connected to cloud.r-project.                                                                                                    Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:8 http://security.ubuntu.com/ubuntu jammy-security/multiverse amd64 Packages [44.6 kB]
Hit:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:10 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packa

In [None]:
# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.functions import concat,col,lit

In [None]:
# 1. Load dataset into a pyspark dataframe
df=spark.read.format("csv").option("header", "true").load("sample_data/database.csv")

In [None]:
df.count()

23412

In [None]:
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       null|                  null

In [None]:
df.dtypes

[('Date', 'string'),
 ('Time', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Type', 'string'),
 ('Depth', 'string'),
 ('Depth Error', 'string'),
 ('Depth Seismic Stations', 'string'),
 ('Magnitude', 'string'),
 ('Magnitude Type', 'string'),
 ('Magnitude Error', 'string'),
 ('Magnitude Seismic Stations', 'string'),
 ('Azimuthal Gap', 'string'),
 ('Horizontal Distance', 'string'),
 ('Horizontal Error', 'string'),
 ('Root Mean Square', 'string'),
 ('ID', 'string'),
 ('Source', 'string'),
 ('Location Source', 'string'),
 ('Magnitude Source', 'string'),
 ('Status', 'string')]

In [None]:
df1 = df.withColumn('date_time',
                    F.concat(F.col('Date'),F.lit(' '),F.col('Time')))
df1.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          date_time|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|01/02/1965|13:44:18|  19.246| 

In [None]:
# 2. Convert date and time columns into a timestamp column named timestamp
df1=df1.withColumn('Timestamp',F.to_timestamp("date_time", "MM/dd/yyyy HH:mm:ss"))
df1.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          date_time|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+------------

In [None]:
df1.dtypes

[('Date', 'string'),
 ('Time', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Type', 'string'),
 ('Depth', 'string'),
 ('Depth Error', 'string'),
 ('Depth Seismic Stations', 'string'),
 ('Magnitude', 'string'),
 ('Magnitude Type', 'string'),
 ('Magnitude Error', 'string'),
 ('Magnitude Seismic Stations', 'string'),
 ('Azimuthal Gap', 'string'),
 ('Horizontal Distance', 'string'),
 ('Horizontal Error', 'string'),
 ('Root Mean Square', 'string'),
 ('ID', 'string'),
 ('Source', 'string'),
 ('Location Source', 'string'),
 ('Magnitude Source', 'string'),
 ('Status', 'string'),
 ('date_time', 'string'),
 ('Timestamp', 'timestamp')]

In [None]:
df1=df1.withColumn('Magnitude', df1.Magnitude.cast('float'))
df1.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          date_time|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+------------

In [None]:
df1.dtypes

[('Date', 'string'),
 ('Time', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Type', 'string'),
 ('Depth', 'string'),
 ('Depth Error', 'string'),
 ('Depth Seismic Stations', 'string'),
 ('Magnitude', 'float'),
 ('Magnitude Type', 'string'),
 ('Magnitude Error', 'string'),
 ('Magnitude Seismic Stations', 'string'),
 ('Azimuthal Gap', 'string'),
 ('Horizontal Distance', 'string'),
 ('Horizontal Error', 'string'),
 ('Root Mean Square', 'string'),
 ('ID', 'string'),
 ('Source', 'string'),
 ('Location Source', 'string'),
 ('Magnitude Source', 'string'),
 ('Status', 'string'),
 ('date_time', 'string'),
 ('Timestamp', 'timestamp')]

In [None]:
# 3. Filter the dataset to include only earthquakes with a magnitude greater than 5.0
df2=df1.filter((df1.Type == "Earthquake") & (df1.Magnitude > 5.0))
df2.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          date_time|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+------------

In [None]:
df2.count()

23232

In [None]:
df2.select('Magnitude Type').distinct().collect()

[Row(Magnitude Type=None),
 Row(Magnitude Type='MWB'),
 Row(Magnitude Type='MWC'),
 Row(Magnitude Type='MW'),
 Row(Magnitude Type='MD'),
 Row(Magnitude Type='MB'),
 Row(Magnitude Type='MS'),
 Row(Magnitude Type='MWW'),
 Row(Magnitude Type='MWR'),
 Row(Magnitude Type='MH'),
 Row(Magnitude Type='ML')]

In [None]:
df2=df2.withColumn('Depth', df1.Magnitude.cast('float'))
df2.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          date_time|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+------------

In [None]:
df2.dtypes

[('Date', 'string'),
 ('Time', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Type', 'string'),
 ('Depth', 'float'),
 ('Depth Error', 'string'),
 ('Depth Seismic Stations', 'string'),
 ('Magnitude', 'float'),
 ('Magnitude Type', 'string'),
 ('Magnitude Error', 'string'),
 ('Magnitude Seismic Stations', 'string'),
 ('Azimuthal Gap', 'string'),
 ('Horizontal Distance', 'string'),
 ('Horizontal Error', 'string'),
 ('Root Mean Square', 'string'),
 ('ID', 'string'),
 ('Source', 'string'),
 ('Location Source', 'string'),
 ('Magnitude Source', 'string'),
 ('Status', 'string'),
 ('date_time', 'string'),
 ('Timestamp', 'timestamp')]

In [None]:
# 4. Calculate average depth and magnitude of earthquakes for each type
avg_df=df2.groupBy("Magnitude Type").agg({'Depth':'avg', 'Magnitude':'avg'})
avg_df.show()

+--------------+------------------+------------------+
|Magnitude Type|    avg(Magnitude)|        avg(Depth)|
+--------------+------------------+------------------+
|          null| 5.706666628519694| 5.706666628519694|
|           MWB| 5.907282324706373| 5.907282324706373|
|           MWC| 5.858115717989505| 5.858115717989505|
|            MW| 5.933794334619597| 5.933794334619597|
|            MD|5.9666666984558105|5.9666666984558105|
|            MB| 5.674540872499554| 5.674540872499554|
|            MS| 5.994359559374046| 5.994359559374046|
|           MWW| 6.008673713836054| 6.008673713836054|
|           MWR| 5.630769197757427| 5.630769197757427|
|            MH| 6.540000057220459| 6.540000057220459|
|            ML|5.8424615053030164|5.8424615053030164|
+--------------+------------------+------------------+



In [None]:
m_df=df2.select('Magnitude').distinct()
m_df.sort('Magnitude').show(m_df.count())

+---------+
|Magnitude|
+---------+
|      5.5|
|     5.51|
|     5.52|
|     5.53|
|     5.55|
|     5.58|
|      5.6|
|     5.64|
|     5.66|
|     5.67|
|     5.69|
|      5.7|
|     5.72|
|     5.73|
|     5.75|
|     5.77|
|      5.8|
|     5.82|
|     5.84|
|     5.88|
|     5.89|
|      5.9|
|     5.94|
|     5.97|
|      6.0|
|     6.02|
|      6.1|
|      6.2|
|      6.3|
|     6.31|
|     6.35|
|      6.4|
|     6.45|
|     6.47|
|     6.48|
|      6.5|
|     6.57|
|      6.6|
|      6.7|
|      6.8|
|      6.9|
|      7.0|
|      7.1|
|      7.2|
|      7.3|
|      7.4|
|      7.5|
|      7.6|
|      7.7|
|      7.8|
|      7.9|
|      8.0|
|      8.1|
|      8.2|
|      8.3|
|      8.4|
|      8.6|
|      8.7|
|      8.8|
|      9.1|
+---------+



In [None]:
# 5. Implement a UDF to categorize the earthquakes into levels (High, moderate, low) based on their magnitudes
def mag(x):

    if ((x >= 5.5) & (x < 6.5)):
        return 'Low'
    elif ((x >= 6.5) & (x < 7.5)):
        return 'Moderate'
    elif (x >= 7.5):
        return 'High'

In [None]:
from pyspark.sql.functions import col, udf

In [None]:
convertUDF = udf(lambda z: mag(z))

In [None]:
df2=df2.withColumn("earthquake_intensity", convertUDF(col("Magnitude")))
df2.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+-------------------+--------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          date_time|          Timestamp|earthquake_intensity|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+-------------

In [None]:
pd_df=df2.toPandas()
pd_df

  series = series.astype(t, copy=False)


Unnamed: 0,Date,Time,Latitude,Longitude,Type,Depth,Depth Error,Depth Seismic Stations,Magnitude,Magnitude Type,...,Horizontal Error,Root Mean Square,ID,Source,Location Source,Magnitude Source,Status,date_time,Timestamp,earthquake_intensity
0,01/02/1965,13:44:18,19.246,145.616,Earthquake,6.0,,,6.0,MW,...,,,ISCGEM860706,ISCGEM,ISCGEM,ISCGEM,Automatic,01/02/1965 13:44:18,1965-01-02 13:44:18,Low
1,01/04/1965,11:29:49,1.863,127.352,Earthquake,5.8,,,5.8,MW,...,,,ISCGEM860737,ISCGEM,ISCGEM,ISCGEM,Automatic,01/04/1965 11:29:49,1965-01-04 11:29:49,Low
2,01/05/1965,18:05:58,-20.579,-173.972,Earthquake,6.2,,,6.2,MW,...,,,ISCGEM860762,ISCGEM,ISCGEM,ISCGEM,Automatic,01/05/1965 18:05:58,1965-01-05 18:05:58,Low
3,01/08/1965,18:49:43,-59.076,-23.557,Earthquake,5.8,,,5.8,MW,...,,,ISCGEM860856,ISCGEM,ISCGEM,ISCGEM,Automatic,01/08/1965 18:49:43,1965-01-08 18:49:43,Low
4,01/09/1965,13:32:50,11.938,126.427,Earthquake,5.8,,,5.8,MW,...,,,ISCGEM860890,ISCGEM,ISCGEM,ISCGEM,Automatic,01/09/1965 13:32:50,1965-01-09 13:32:50,Low
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
23227,12/28/2016,08:22:12,38.3917,-118.8941,Earthquake,5.6,1.2,40,5.6,ML,...,,0.1898,NN00570710,NN,NN,NN,Reviewed,12/28/2016 08:22:12,2016-12-28 08:22:12,Low
23228,12/28/2016,09:13:47,38.3777,-118.8957,Earthquake,5.5,2,33,5.5,ML,...,,0.2187,NN00570744,NN,NN,NN,Reviewed,12/28/2016 09:13:47,2016-12-28 09:13:47,Low
23229,12/28/2016,12:38:51,36.9179,140.4262,Earthquake,5.9,1.8,,5.9,MWW,...,4.8,1.52,US10007NAF,US,US,US,Reviewed,12/28/2016 12:38:51,2016-12-28 12:38:51,Low
23230,12/29/2016,22:30:19,-9.0283,118.6639,Earthquake,6.3,1.8,,6.3,MWW,...,6,1.43,US10007NL0,US,US,US,Reviewed,12/29/2016 22:30:19,2016-12-29 22:30:19,Low


In [None]:
!pip install haversine

Collecting haversine
  Downloading haversine-2.8.1-py2.py3-none-any.whl (7.7 kB)
Installing collected packages: haversine
Successfully installed haversine-2.8.1


In [None]:
import pandas as pd
import numpy as np
import haversine as hs
from haversine import Unit
import folium

In [None]:
pd_df['Latitude']=pd_df['Latitude'].astype(float)
pd_df['Longitude']=pd_df['Longitude'].astype(float)
pd_df.dtypes

Date                                  object
Time                                  object
Latitude                             float64
Longitude                            float64
Type                                  object
Depth                                float32
Depth Error                           object
Depth Seismic Stations                object
Magnitude                            float32
Magnitude Type                        object
Magnitude Error                       object
Magnitude Seismic Stations            object
Azimuthal Gap                         object
Horizontal Distance                   object
Horizontal Error                      object
Root Mean Square                      object
ID                                    object
Source                                object
Location Source                       object
Magnitude Source                      object
Status                                object
date_time                             object
Timestamp 

In [None]:
# 6. Calculate the diatance of each earthquake from a reference location
# vectorized haversine function
def haversine(lat1, lon1, lat2, lon2, to_radians=True, earth_radius=6371):

    if to_radians:
        lat1, lon1, lat2, lon2 = np.radians([lat1, lon1, lat2, lon2])

    a = np.sin((lat2-lat1)/2.0)**2 + \
        np.cos(lat1) * np.cos(lat2) * np.sin((lon2-lon1)/2.0)**2

    return earth_radius * 2 * np.arcsin(np.sqrt(a))

In [None]:
import numpy as np

In [None]:
pd_df['distance'] = haversine(pd_df['Latitude'].shift(), pd_df['Longitude'].shift(), pd_df['Latitude'], pd_df['Longitude'])
pd_df

Unnamed: 0,Date,Time,Latitude,Longitude,Type,Depth,Depth Error,Depth Seismic Stations,Magnitude,Magnitude Type,...,Root Mean Square,ID,Source,Location Source,Magnitude Source,Status,date_time,Timestamp,earthquake_intensity,dist
0,01/02/1965,13:44:18,19.2460,145.6160,Earthquake,6.0,,,6.0,MW,...,,ISCGEM860706,ISCGEM,ISCGEM,ISCGEM,Automatic,01/02/1965 13:44:18,1965-01-02 13:44:18,Low,
1,01/04/1965,11:29:49,1.8630,127.3520,Earthquake,5.8,,,5.8,MW,...,,ISCGEM860737,ISCGEM,ISCGEM,ISCGEM,Automatic,01/04/1965 11:29:49,1965-01-04 11:29:49,Low,2772.561257
2,01/05/1965,18:05:58,-20.5790,-173.9720,Earthquake,6.2,,,6.2,MW,...,,ISCGEM860762,ISCGEM,ISCGEM,ISCGEM,Automatic,01/05/1965 18:05:58,1965-01-05 18:05:58,Low,6853.994185
3,01/08/1965,18:49:43,-59.0760,-23.5570,Earthquake,5.8,,,5.8,MW,...,,ISCGEM860856,ISCGEM,ISCGEM,ISCGEM,Automatic,01/08/1965 18:49:43,1965-01-08 18:49:43,Low,10753.699934
4,01/09/1965,13:32:50,11.9380,126.4270,Earthquake,5.8,,,5.8,MW,...,,ISCGEM860890,ISCGEM,ISCGEM,ISCGEM,Automatic,01/09/1965 13:32:50,1965-01-09 13:32:50,Low,14209.881624
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
23227,12/28/2016,08:22:12,38.3917,-118.8941,Earthquake,5.6,1.2,40,5.6,ML,...,0.1898,NN00570710,NN,NN,NN,Reviewed,12/28/2016 08:22:12,2016-12-28 08:22:12,Low,1.839439
23228,12/28/2016,09:13:47,38.3777,-118.8957,Earthquake,5.5,2,33,5.5,ML,...,0.2187,NN00570744,NN,NN,NN,Reviewed,12/28/2016 09:13:47,2016-12-28 09:13:47,Low,1.562963
23229,12/28/2016,12:38:51,36.9179,140.4262,Earthquake,5.9,1.8,,5.9,MWW,...,1.52,US10007NAF,US,US,US,Reviewed,12/28/2016 12:38:51,2016-12-28 12:38:51,Low,8353.000438
23230,12/29/2016,22:30:19,-9.0283,118.6639,Earthquake,6.3,1.8,,6.3,MWW,...,1.43,US10007NL0,US,US,US,Reviewed,12/29/2016 22:30:19,2016-12-29 22:30:19,Low,5590.691010


In [146]:
pd_df.to_csv("final_df.csv")

In [None]:
m = folium.Map()
m.save("footprint.html")

In [None]:
columns=pd_df[["Dist", "Location Source"]]

In [None]:
# 7. Visualize geographical distribution of earthquakes on worldmap using folium
# Creating the map and adding points to it
map = folium.Map(location=[columns.Latitude.mean(), columns.Longitude.mean()], zoom_start=3, control_scale=True)

for index, location_info in columns.iterrows():
    folium.Marker([location_info["Dist"], popup=location_info["Location Source"]).add_to(map)

In [145]:
display(map)