In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession\
      .builder\
      .master('local[2]')\
      .appName('quake_etl')\
      .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
      .getOrCreate()

In [2]:
df_load = spark.read.csv(r'database.csv', header=True)

df_load.take(1)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [3]:
lst_dropped_columns = ['Depth Error', 'Time', 'Depth Seismic Stations', 'Magnitude Error',
                       'Magnitude Seismic Stations', 'Azimuthal Gap', 'Root Mean Square', 'Source', 'Location Source', 'Magnitude Source', 'Status']

df_load = df_load.drop(*lst_dropped_columns)

In [4]:
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+-------------------+----------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|Horizontal Distance|Horizontal Error|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+-------------------+----------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|               null|            null|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|               null|            null|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|               null|            null|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|               null|            null|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|               null|    

In [5]:
df_quake_freq = df_load.groupBy('Year').count()
df_quake_freq.show(5)

+----+-----+
|Year|count|
+----+-----+
|1990|  196|
|1975|  150|
|1977|  148|
|2003|  187|
|2007|  211|
+----+-----+
only showing top 5 rows



In [6]:
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Horizontal Distance: string (nullable = true)
 |-- Horizontal Error: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [7]:
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
      .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
      .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
      .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

df_load.show()

+----------+--------+---------+----------+-----+---------+--------------+-------------------+----------------+--------------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|Horizontal Distance|Horizontal Error|                  ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+-------------------+----------------+--------------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|               null|            null|        ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|               null|            null|        ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|               null|            null|        ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|               null|            null|        ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake

In [8]:
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')

df_avg.show(5)

+----+-----------------+
|Year|    Avg_Magnitude|
+----+-----------------+
|1990|5.858163265306125|
|1975| 5.84866666666667|
|1977|5.757432432432437|
|2003|5.850802139037435|
|2007| 5.89099526066351|
+----+-----------------+
only showing top 5 rows



In [9]:
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])
df_quake_freq.show(5)

+----+-----+-----------------+-------------+
|Year|count|    Avg_Magnitude|Max_Magnitude|
+----+-----+-----------------+-------------+
|1990|  196|5.858163265306125|          7.6|
|1975|  150| 5.84866666666667|          7.8|
|1977|  148|5.757432432432437|          7.6|
|2003|  187|5.850802139037435|          7.6|
|2007|  211| 5.89099526066351|          8.4|
+----+-----+-----------------+-------------+
only showing top 5 rows



In [10]:
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, count: bigint, Avg_Magnitude: double, Max_Magnitude: double]

In [11]:
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+-------------------+----------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|Horizontal Distance|Horizontal Error|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+-------------------+----------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|               null|            null|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|               null|            null|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|               null|            null|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|               null|            null|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|               null|    

In [12]:
df_quake_freq.show(5)

+----+-----+-----------------+-------------+
|Year|count|    Avg_Magnitude|Max_Magnitude|
+----+-----+-----------------+-------------+
|1990|  196|5.858163265306125|          7.6|
|1975|  150| 5.84866666666667|          7.8|
|1977|  148|5.757432432432437|          7.6|
|2003|  187|5.850802139037435|          7.6|
|2007|  211| 5.89099526066351|          8.4|
+----+-----+-----------------+-------------+
only showing top 5 rows



In [13]:
df_load.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').save()

In [14]:
df_quake_freq.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quake_freq').save()

In [15]:
#Load test data file

In [16]:
df_test = spark.read.csv(r'query.csv', header=True)
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [17]:
df_train = spark.read.format('mongo')\
    .option('spark.mongodb.input.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').load()

df_train.show(5)

+----------+-----+-------------------+----------------+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|Horizontal Distance|Horizontal Error|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+-------------------+----------------+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|               null|            null|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|{653c3e513f6d7a60...|
|01/04/1965| 80.0|               null|            null|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|{653c3e513f6d7a60...|
|01/05/1965| 20.0|               null|            null|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|{653c3e513f6d7a60...|
|01/08/1965| 15.0|               null|            null|ISCGEM860856| -59.076|  -23.557|      5.8|   

In [18]:
df_test_clean = df_test['time', 'latitude', 'longitude', 'mag', 'depth']

df_test_clean.show(5)

+--------------------+--------+---------+---+------+
|                time|latitude|longitude|mag| depth|
+--------------------+--------+---------+---+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|5.5| 10.26|
+--------------------+--------+---------+---+------+
only showing top 5 rows



In [19]:
df_test_clean = df_test_clean.withColumnRenamed('time', 'Date')\
    .withColumnRenamed('latitude', 'Latitude')\
    .withColumnRenamed('longitude', 'Longitude')\
    .withColumnRenamed('mag', 'Magnitude')\
    .withColumnRenamed('depth', 'Depth')

df_test_clean

DataFrame[Date: string, Latitude: string, Longitude: string, Magnitude: string, Depth: string]

In [20]:
df_test_clean.show(5)

+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows



In [21]:
df_test_clean = df_test_clean.withColumn('Latitude', df_test_clean['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_test_clean['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_test_clean['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_test_clean['Magnitude'].cast(DoubleType()))

In [22]:
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [23]:
df_testing = df_test_clean['Latitude', 'Longitude', 'Magnitude', 'Depth']
df_training = df_train['Latitude', 'Longitude', 'Magnitude', 'Depth']

In [24]:
df_training.show()

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
| -13.405|  166.629|      6.7| 35.0|
|  27.357|   87.867|      5.9| 20.0|
| -13.309|  166.212|      6.0| 35.0|
| -56.452|  -27.043|      6.0| 95.0|
| -24.563|  178.487|      5.8|565.0|
|  -6.807|  108.988|      5.9|227.9|
|  -2.608|  125.952|      8.2| 20.0|
|  54.636|  161.703|      5.5| 55.0|
| -18.697| -177.864|      5.6|482.9|
|  37.523|   73.251|      6.0| 15.0|
|  -51.84|  139.741|      6.1| 10.0|
|  51.251|  178.715|      8.7| 30.3|
|  51.639|  175.055|      6.0| 30.0|
|  52.528|  172.007|      5.7| 25.0|
|  51.626|  175.746|      5.8| 25.0|
+--------+---------+---------+-----+
only showing top 20 rows



In [25]:
df_testing = df_testing.dropna()
df_training = df_training.dropna()

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [28]:
assembler = VectorAssembler(inputCols=['Latitude', 'Longitude', 'Depth'], outputCol='features')

model_reg = RandomForestRegressor(featuresCol='features', labelCol='Magnitude')

pipeline = Pipeline(stages=[assembler, model_reg])

model = pipeline.fit(df_training)

pred_results = model.transform(df_testing)

In [29]:
pred_results.show(5)

+--------+---------+---------+------+--------------------+------------------+
|Latitude|Longitude|Magnitude| Depth|            features|        prediction|
+--------+---------+---------+------+--------------------+------------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...| 5.810108702452941|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...| 5.884911777098706|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...| 5.931782846758473|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...|5.9074022011071445|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...|   5.9639844941963|
+--------+---------+---------+------+--------------------+------------------+
only showing top 5 rows



In [30]:
evaluator = RegressionEvaluator(labelCol = 'Magnitude', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_results)
print('Root Mean Squared Error (RMSE) on test data = %g' % rmse)

Root Mean Squared Error (RMSE) on test data = 0.402263


In [31]:
df_pred_results = pred_results['Latitude', 'Longitude', 'prediction']

df_pred_results = df_pred_results.withColumnRenamed('prediction', 'Pred_Magnitude')

df_pred_results = df_pred_results.withColumn('Year', lit(2017))\
    .withColumn('RMSE', lit(rmse))

df_pred_results.show(5)

+--------+---------+------------------+----+-------------------+
|Latitude|Longitude|    Pred_Magnitude|Year|               RMSE|
+--------+---------+------------------+----+-------------------+
|-36.0365|  51.9288| 5.810108702452941|2017|0.40226316031287257|
|  -4.895| -76.3675| 5.884911777098706|2017|0.40226316031287257|
|-23.2513| 179.2383| 5.931782846758473|2017|0.40226316031287257|
| 24.0151|  92.0177|5.9074022011071445|2017|0.40226316031287257|
|-43.3527| -74.5017|   5.9639844941963|2017|0.40226316031287257|
+--------+---------+------------------+----+-------------------+
only showing top 5 rows



In [33]:
df_pred_results.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.pred_results').save()