In [1]:
import findspark

findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# configure spark session

spark = SparkSession.builder.master('local[2]').appName('quake_etl')\
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/Earthquake.quake") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/Earthquake.quake") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate()

In [3]:
# load data set
df_load = spark.read.csv('docs\database.csv', header=True)

print(df_load.count())
# Preview data
df_load.take(1)

23412


[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [4]:
# Drop coluns not required
lst_dropped_columns = ['Depth Error', 'Time', 'Horizontal Distance', 'Depth Seismic Stations', 'Magnitude Error', 'Magnitude Seismic Stations', 'Root Mean Square', 'Azimuthal Gap', 'Source', 'Location Source', 'Magnitude Source', 'Status', 'Horizontal Error']

df_load = df_load.drop(*lst_dropped_columns)

df_load.show(5)

print(df_load.count())

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows

23412


In [5]:
# create year field and add to column
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
df_load.show(5)

print(df_load.count())

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows

23412


In [6]:
# Build quakes frequency df by year
df_quake_frequency = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')

In [7]:
df_quake_frequency.show(5)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
+----+------+
only showing top 5 rows



In [8]:
# Preview df_load schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [9]:
# change data types
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
            .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
            .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
            .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

# Preview dataframe

df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [10]:
# Preview df_load schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [11]:
# Add max and average magnitude to df_quake

df_max = df_load.groupBy("Year").max('Magnitude').withColumnRenamed('max(Magnitude)' , 'Max_Magnitude')
df_avg = df_load.groupBy("Year").avg('Magnitude').withColumnRenamed('avg(Magnitude)' , 'Avg_Magnitude')
df_avg.show(5)

+----+-----------------+
|Year|    Avg_Magnitude|
+----+-----------------+
|1990|5.858163265306125|
|1975| 5.84866666666667|
|1977|5.757432432432437|
|2003|5.850802139037435|
|2007| 5.89099526066351|
+----+-----------------+
only showing top 5 rows



In [12]:
# join df_max_and df_avg to df_quake
df_quake_frequency = df_quake_frequency.join(df_avg, ["Year"]).join(df_max, ["Year"])

# preview new dataframe

df_quake_frequency.show(5)

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
|2003|   187|5.850802139037435|          7.6|
|2007|   211| 5.89099526066351|          8.4|
+----+------+-----------------+-------------+
only showing top 5 rows



In [13]:
# drop null values

df_load = df_load.dropna()
print(df_load.count())
df_quake_frequency = df_quake_frequency.dropna()

df_quake_frequency.show(5)

9201
+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
|2003|   187|5.850802139037435|          7.6|
|2007|   211| 5.89099526066351|          8.4|
+----+------+-----------------+-------------+
only showing top 5 rows



In [14]:
# load dataframe to mongo db
df_load.write.format('mongo')\
    .mode('overwrite')\
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/Earthquake.quake").save()


In [15]:
# load quake_frequency dataframe to mongo db
df_quake_frequency.write.format('mongo')\
    .mode('overwrite')\
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/Earthquake.quake_frequency").save()


In [16]:
"""
Machine Learning section

Training data: What we have collected

Testing data: data for year 2017
"""

'\nMachine Learning section\n\nTraining data: What we have collected\n\nTesting data: data for year 2017\n'

In [17]:
# load test data for year 2017
df_test = spark.read.csv('docs\query.csv', header=True)

# preview data
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [18]:
# load tranining data
df_train = spark.read.format('mongo')\
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/Earthquake.quake").load()

# Preview train data
df_train.show(5)


+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|{64250fbdba8bac66...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|{64250fbdba8bac66...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|{64250fbdba8bac66...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|{64250fbdba8bac66...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|{64250fbdba8bac66...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [19]:
# select test data fields
df_test_clean = df_test['Time', 'latitude', 'longitude', 'mag', 'depth']

# Preview test_clean
df_test_clean.show(5)


+--------------------+--------+---------+---+------+
|                Time|latitude|longitude|mag| depth|
+--------------------+--------+---------+---+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|5.5| 10.26|
+--------------------+--------+---------+---+------+
only showing top 5 rows



In [20]:
# Rename test data fields
df_test_clean = df_test_clean.withColumnRenamed('time', 'Date')\
    .withColumnRenamed('latitude', 'Latitude')\
    .withColumnRenamed('longitude', 'Longitude')\
    .withColumnRenamed('mag', 'Magnitude')\
    .withColumnRenamed('depth', 'Depth')\
# Preview test_clean
df_test_clean.show(5)


+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows



In [21]:
 # Preview schema
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)



In [22]:
# change data types of test data

df_test_clean = df_test_clean.withColumn('Latitude', df_test_clean['Latitude'].cast(DoubleType()))\
            .withColumn('Longitude', df_test_clean['Longitude'].cast(DoubleType()))\
            .withColumn('Depth', df_test_clean['Depth'].cast(DoubleType()))\
            .withColumn('Magnitude', df_test_clean['Magnitude'].cast(DoubleType()))

df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [23]:
df_testing = df_test_clean['Latitude', 'Longitude', 'Magnitude', 'Depth']
df_training = df_train['Latitude', 'Longitude', 'Magnitude', 'Depth']

In [24]:
# Preview training data
df_training.show(5)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
+--------+---------+---------+-----+
only showing top 5 rows



In [25]:
# Preview testing data
df_testing.show(5)

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
| 24.0151|  92.0177|      5.7|  32.0|
|-43.3527| -74.5017|      5.5| 10.26|
+--------+---------+---------+------+
only showing top 5 rows



In [26]:
# drop rows with null
df_training = df_training.dropna()
df_training = df_training.dropna()

In [27]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [28]:
# Select features to pasrse into model and create the feature vector
assembler = VectorAssembler(inputCols=["Latitude", "Longitude", "Depth"], outputCol="features")

# Create Model
model_reg = RandomForestRegressor(featuresCol="features", labelCol="Magnitude")

# chain model into a pipeline
pipeline = Pipeline(stages=[assembler, model_reg])

# Train the model using training df for all years
model = pipeline.fit(df_training)

# make the prediction
pred_results = model.transform(df_testing)

In [29]:
# Preview prediction dataframe
pred_results.show(5)

+--------+---------+---------+------+--------------------+-----------------+
|Latitude|Longitude|Magnitude| Depth|            features|       prediction|
+--------+---------+---------+------+--------------------+-----------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...|5.804875388513735|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...|5.875455085675417|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...|5.872604388266402|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...|5.894525768871877|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...|5.888053342195419|
+--------+---------+---------+------+--------------------+-----------------+
only showing top 5 rows



In [30]:
# Evaluate model and check erroer matrix
# rmse should be less than 0.5
evaluator = RegressionEvaluator(labelCol='Magnitude', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_results)

rmse

0.4026552619476222