In [None]:
from pyspark.sql import*
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Read the raw earthquake CSV from DBFS, treating the first line as the header.
# No schema is supplied, so the reader's default column types apply.
df_load = spark.read.load(
    "dbfs:/FileStore/ProjectPyspark/database.csv",
    format="csv",
    header="true",
)

# Number of rows loaded
df_load.count()


23412

In [None]:
# Peek at the first five raw rows (returned as a list of Row objects)
df_load.head(5)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic'),
 Row(Date='01/04/1965', Time='11:29:49', Latitude='1.863', Longitude='127.352', Type='Earthquake', Depth='80', Depth Error=None, Depth Seismic Stations=None, Magnitude='5.8', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860737', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic'),
 Row(Date='01/05/1965', Time='18:05:58', Latitude='-20.579', Longitude='-173.972', Type='Ear

In [None]:
# Columns from df_load that the rest of the notebook does not use
lst_dropped_columns = [
    'Depth Error',
    'Time',
    'Depth Seismic Stations',
    'Magnitude Error',
    'Magnitude Seismic Stations',
    'Azimuthal Gap',
    'Horizontal Distance',
    'Horizontal Error',
    'Root Mean Square',
    'Source',
    'Location Source',
    'Magnitude Source',
    'Status',
]

# drop() takes the column names as varargs, hence the unpacking
df_loaddb = df_load.drop(*lst_dropped_columns)

# Preview the trimmed dataframe (df_loaddb)
df_loaddb.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [None]:
# Create a year column and add it to the dataframe.
# The Date strings are month-first (e.g. '01/02/1965', '01/04/1965', ... are
# consecutive January days), so they must be parsed with 'MM/dd/yyyy'.
# Parsing them as 'dd/MM/yyyy' turns every date whose day is greater than 12
# into a null, which silently drops most rows from the per-year statistics.
df_loaddb = df_loaddb.withColumn('Year', year(to_timestamp('Date', 'MM/dd/yyyy')))
df_loaddb.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [None]:
# Yearly earthquake frequency: one row per Year with the number of records
# observed in that year, exposed as the 'Counts' column.
df_quake_freq = (
    df_loaddb
    .groupBy("Year")
    .agg(count("*").alias("Counts"))
)

# Preview the frequency table
df_quake_freq.show(5)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
+----+------+
only showing top 5 rows



In [None]:
# Inspect the df_loaddb schema before casting, to see which columns still
# need to be converted from string to a numeric type
df_loaddb.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [None]:
# Convert the coordinate, depth and magnitude columns from string to double.
# withColumn with an existing name replaces that column in place.
df_loaddb = (
    df_loaddb
    .withColumn('Latitude', col('Latitude').cast('double'))
    .withColumn('Longitude', col('Longitude').cast('double'))
    .withColumn('Depth', col('Depth').cast('double'))
    .withColumn('Magnitude', col('Magnitude').cast('double'))
)

# Preview df_loaddb after the casts
df_loaddb.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [None]:
# Re-inspect the df_loaddb schema to confirm the numeric casts took effect
df_loaddb.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [None]:
# Per-year magnitude statistics (maximum and mean), kept in their own
# dataframe so they can be joined onto df_quake_freq afterwards.
# NOTE: max/avg are the Spark SQL functions brought in by the star import of
# pyspark.sql.functions, not the Python builtins.
df_max_avg = (
    df_loaddb
    .groupBy('Year')
    .agg(
        max(col('Magnitude')).alias('Max_Magnitude'),
        avg(col('Magnitude')).alias('Avg_Magnitude'),
    )
)
df_max_avg.show(5)

+----+-------------+-----------------+
|Year|Max_Magnitude|    Avg_Magnitude|
+----+-------------+-----------------+
|1990|          7.6|5.858163265306125|
|1975|          7.8| 5.84866666666667|
|1977|          7.6|5.757432432432437|
|2003|          7.6|5.850802139037435|
|2007|          8.4| 5.89099526066351|
+----+-------------+-----------------+
only showing top 5 rows



In [None]:
# Attach the per-year magnitude statistics to the per-year counts.
# Joining on the column name keeps a single 'Year' column in the result.
df_quake_freq = df_quake_freq.join(df_max_avg, on='Year', how='inner')

# Preview the combined yearly summary
df_quake_freq.show(5)

+----+------+-------------+-----------------+
|Year|Counts|Max_Magnitude|    Avg_Magnitude|
+----+------+-------------+-----------------+
|1990|   196|          7.6|5.858163265306125|
|1975|   150|          7.8| 5.84866666666667|
|1977|   148|          7.6|5.757432432432437|
|2003|   187|          7.6|5.850802139037435|
|2007|   211|          8.4| 5.89099526066351|
+----+------+-------------+-----------------+
only showing top 5 rows



In [None]:
# Remove rows containing nulls.
# Spark DataFrames are immutable: dropna() returns a new DataFrame instead of
# modifying the receiver, so the result must be assigned back. Without the
# assignment the call is a no-op and the null rows are still present when the
# dataframes are written out later.
df_loaddb = df_loaddb.dropna()
df_quake_freq = df_quake_freq.dropna()

DataFrame[Year: int, Counts: bigint, Max_Magnitude: double, Avg_Magnitude: double]

In [None]:
# Preview df_loaddb (first five rows) before it is written to MongoDB
df_loaddb.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [None]:
# Preview df_quake_freq (first five rows) before it is written to MongoDB
df_quake_freq.show(5)

+----+------+-------------+-----------------+
|Year|Counts|Max_Magnitude|    Avg_Magnitude|
+----+------+-------------+-----------------+
|1990|   196|          7.6|5.858163265306125|
|1975|   150|          7.8| 5.84866666666667|
|1977|   148|          7.6|5.757432432432437|
|2003|   187|          7.6|5.850802139037435|
|2007|   211|          8.4| 5.89099526066351|
+----+------+-------------+-----------------+
only showing top 5 rows



In [None]:
# Build the tables/collections in MongoDB.
# Persist df_loaddb as the Quake.quakes collection, replacing any existing data.
# NOTE(review): the connection URI embeds a username and password in the
# notebook; consider loading it from a secret store or environment variable.
(
    df_loaddb.write
    .format('mongodb')
    .option("spark.mongodb.connection.uri", "mongodb+srv://mongouser:mongopwd@hostname.mongodb.net/")
    .option("database", "Quake")
    .option("collection", "quakes")
    .mode('overwrite')
    .save()
)

In [None]:
# Persist the yearly summary (df_quake_freq) as the Quake.quake_freq
# collection, replacing any existing data.
# NOTE(review): the connection URI embeds a username and password in the
# notebook; consider loading it from a secret store or environment variable.
(
    df_quake_freq.write
    .format('mongodb')
    .option('spark.mongodb.connection.uri', "mongodb+srv://mongouser:mongopwd@hostname.mongodb.net/")
    .option("database", "Quake")
    .option("collection", "quake_freq")
    .mode('overwrite')
    .save()
)

Section: Machine Learning with Spark

In [None]:
# Read the held-out test CSV from DBFS, treating the first line as the header
df_test = spark.read.load(
    "dbfs:/FileStore/ProjectPyspark/query.csv",
    format="csv",
    header="true",
)

# Look at a single raw record of df_test
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [None]:
# Read the training data back from the Quake.quakes MongoDB collection.
# NOTE(review): the connection URI embeds a username and password in the
# notebook; consider loading it from a secret store or environment variable.
df_train = (
    spark.read
    .format('mongodb')
    .option('spark.mongodb.connection.uri', "mongodb+srv://mongouser:mongopwd@hostname.mongodb.net/")
    .option("database", "Quake")
    .option("collection", "quakes")
    .load()
)

# Preview the first rows of df_train
df_train.show(5)

+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|65f24936bdf037582...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|65f24936bdf037582...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|65f24936bdf037582...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|65f24936bdf037582...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|65f24936bdf037582...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [None]:
# Keep only the test columns that have a counterpart in the training data
df_test_clean = df_test.select('time', 'latitude', 'longitude', 'mag', 'depth')

# Preview the reduced test dataframe
df_test_clean.show(5)

+--------------------+--------+---------+---+------+
|                time|latitude|longitude|mag| depth|
+--------------------+--------+---------+---+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|5.5| 10.26|
+--------------------+--------+---------+---+------+
only showing top 5 rows



In [None]:
# Rename the test columns so they match the training column names
for _old_name, _new_name in {
    'time': 'Date',
    'latitude': 'Latitude',
    'longitude': 'Longitude',
    'mag': 'Magnitude',
    'depth': 'Depth',
}.items():
    df_test_clean = df_test_clean.withColumnRenamed(_old_name, _new_name)

# Preview the renamed test dataframe
df_test_clean.show(5)

+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows



In [None]:
# Inspect the df_test_clean schema before casting, to see which columns still
# need to be converted from string to a numeric type
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)



In [None]:
# Convert the coordinate, depth and magnitude columns from string to double.
# withColumn with an existing name replaces that column in place.
df_test_clean = (
    df_test_clean
    .withColumn('Latitude', col('Latitude').cast('double'))
    .withColumn('Longitude', col('Longitude').cast('double'))
    .withColumn('Depth', col('Depth').cast('double'))
    .withColumn('Magnitude', col('Magnitude').cast('double'))
)

In [None]:
# Re-inspect the df_test_clean schema to confirm the numeric casts took effect
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [None]:
# Preview df_test_clean after the casts (show() with no argument uses its default row limit)
df_test_clean.show()

+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|  10.0|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9| 106.0|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|  32.0|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
|2017-01-03T21:52:...|-19.3733| 176.0518|      6.9|  12.0|
|2017-01-03T21:55:...|-19.3977| 175.9532|      5.7|  10.0|
|2017-01-03T22:40:...|-19.1207| 176.1875|      6.0|  10.0|
|2017-01-03T23:34:...|-18.9749| 176.2872|      5.5| 19.36|
|2017-01-04T01:06:...|-17.8694| 167.1235|      5.6|  22.1|
|2017-01-04T20:03:...|-18.7942| 176.2567|      5.7|   7.0|
|2017-01-06T06:30:...|-22.3176|  -67.795|      5.8| 172.0|
|2017-01-08T08:52:...| -6.2269| 147.4769|      5.9|  61.0|
|2017-01-08T14:20:...| -54.327|-135.8585|      5.6|  10.

In [None]:
# Build the modelling dataframes: both sides keep the same four columns,
# in the same order, so the fitted pipeline can be applied to the test set.
df_testing = df_test_clean.select('Latitude', 'Longitude', 'Magnitude', 'Depth')
df_training = df_train.select('Latitude', 'Longitude', 'Magnitude', 'Depth')

In [None]:
# Preview the first five rows of the training dataframe
df_training.show(5)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
+--------+---------+---------+-----+
only showing top 5 rows



In [None]:
# Preview the first five rows of the testing dataframe
df_testing.show(5)

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
| 24.0151|  92.0177|      5.7|  32.0|
|-43.3527| -74.5017|      5.5| 10.26|
+--------+---------+---------+------+
only showing top 5 rows



In [None]:
# Discard every row that has a null in any column; the filtered result is
# assigned back because dataframes are never modified in place.
df_testing = df_testing.na.drop()
df_training = df_training.na.drop()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Assemble the predictor columns into the single vector column that Spark ML
# estimators expect as input.
assembler = VectorAssembler(inputCols=['Latitude', 'Longitude', 'Depth'], outputCol='features')

# Create the model: a random forest regressing Magnitude on the feature vector.
# Random forests sample rows and features at random, so a fixed seed is set to
# make the fitted model (and the RMSE reported later) reproducible across runs.
model_reg = RandomForestRegressor(featuresCol='features', labelCol='Magnitude', seed=42)

# Chain the assembler with the model in a pipeline so the same feature
# preparation is applied at fit time and at prediction time.
pipeline = Pipeline(stages=[assembler, model_reg])

# Train the model on the historical data
model = pipeline.fit(df_training)

# Make the prediction: adds 'features' and 'prediction' columns to df_testing
pred_results = model.transform(df_testing)

In [None]:
# Preview the first five rows of pred_results, which carries the model's
# 'prediction' column alongside the input columns
pred_results.show(5)

+--------+---------+---------+------+--------------------+------------------+
|Latitude|Longitude|Magnitude| Depth|            features|        prediction|
+--------+---------+---------+------+--------------------+------------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...| 5.850136724800972|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...| 5.869637793167875|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...| 5.896591561887881|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...| 5.931774818629581|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...|5.9204458792780486|
+--------+---------+---------+------+--------------------+------------------+
only showing top 5 rows



In [None]:
# Score the predictions against the actual magnitudes using RMSE.
# Project rule of thumb: the model is considered useful when RMSE < 0.5.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='Magnitude',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(pred_results)
print('Root Mean Squared Error (RMSE) on test data = %g' % (rmse,))

Root Mean Squared Error (RMSE) on test data = 0.403077


Create the Prediction Dataset

In [None]:
# Build the prediction dataset in a single projection:
#   - keep the coordinates,
#   - expose the model output as 'Pred_Magnitude',
#   - tag every row with the constant year 2017 and the model's RMSE.
df_pred_results = pred_results.select(
    'Latitude',
    'Longitude',
    col('prediction').alias('Pred_Magnitude'),
    lit(2017).alias('Year'),
    lit(rmse).alias('RMSE'),
)

# Preview the prediction dataset
df_pred_results.show(5)

+--------+---------+------------------+----+-------------------+
|Latitude|Longitude|    Pred_Magnitude|Year|               RMSE|
+--------+---------+------------------+----+-------------------+
|-36.0365|  51.9288| 5.850136724800972|2017|0.40307689171713734|
|  -4.895| -76.3675| 5.869637793167875|2017|0.40307689171713734|
|-23.2513| 179.2383| 5.896591561887881|2017|0.40307689171713734|
| 24.0151|  92.0177| 5.931774818629581|2017|0.40307689171713734|
|-43.3527| -74.5017|5.9204458792780486|2017|0.40307689171713734|
+--------+---------+------------------+----+-------------------+
only showing top 5 rows



In [None]:
# Persist the prediction dataset (df_pred_results) as the Quake.pred_results
# collection in MongoDB, replacing any existing data.
# NOTE(review): the connection URI embeds a username and password in the
# notebook; consider loading it from a secret store or environment variable.
(
    df_pred_results.write
    .format('mongodb')
    .option("spark.mongodb.connection.uri", "mongodb+srv://mongouser:mongopwd@hostname.mongodb.net/")
    .option("database", "Quake")
    .option("collection", "pred_results")
    .mode('overwrite')
    .save()
)