## USGS - Earthquake 1965 - 2016

In [1]:
# setup absolute path to location of package Starts and config-file 
from inspect import getsourcefile
import os.path as path, sys
current_dir = path.dirname(path.abspath(getsourcefile(lambda:0)))
sys.path.insert(0, current_dir[:current_dir.rfind(path.sep)])

# import data in Pandas DataFrame
from Starts.startml import *
from Starts.startspk import *

local_kwargs {'data_path': './data/earthquake-database.csv', 'drop_obj_col': False, 'nan_drop_col': False, 'nan_drop_row': False, 'nan_zero': False, 'nan_mean': True, 'nan_mean_neighbors': False}


In [2]:
train_data=data[0]
train_data.head()

Unnamed: 0,Date,Time,Latitude,Longitude,Type,Depth,Depth Error,Depth Seismic Stations,Magnitude,Magnitude Type,...,Magnitude Seismic Stations,Azimuthal Gap,Horizontal Distance,Horizontal Error,Root Mean Square,ID,Source,Location Source,Magnitude Source,Status
0,01/02/1965,13:44:18,19.246,145.616,Earthquake,131.6,,,6.0,MW,...,,,,,,ISCGEM860706,ISCGEM,ISCGEM,ISCGEM,Automatic
1,01/04/1965,11:29:49,1.863,127.352,Earthquake,80.0,,,5.8,MW,...,,,,,,ISCGEM860737,ISCGEM,ISCGEM,ISCGEM,Automatic
2,01/05/1965,18:05:58,-20.579,-173.972,Earthquake,20.0,,,6.2,MW,...,,,,,,ISCGEM860762,ISCGEM,ISCGEM,ISCGEM,Automatic
3,01/08/1965,18:49:43,-59.076,-23.557,Earthquake,15.0,,,5.8,MW,...,,,,,,ISCGEM860856,ISCGEM,ISCGEM,ISCGEM,Automatic
4,01/09/1965,13:32:50,11.938,126.427,Earthquake,15.0,,,5.8,MW,...,,,,,,ISCGEM860890,ISCGEM,ISCGEM,ISCGEM,Automatic


In [3]:
# create data with features: Time, Latitude, longitude, depth, magnitude, location source
data = train_data[['Date', 'Time', 'Latitude', 'Longitude', 'Depth', 'Magnitude', 'Location Source']]

# export data in csv-format without header
data.to_csv('./data/earthquake.csv', header=False)

# import data using Spark
spk = StartSPK(app_name="usgs", path_file="./data/earthquake.csv", rdd=True)
dat, sc, ss = spk.get_dat()
dat

./data/earthquake.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
type(dat)

pyspark.rdd.RDD

In [5]:
# show the first 10 elements in RDD dat.collect() to show all
dat.take(10)

['0,01/02/1965,13:44:18,19.246,145.616,131.6,6.0,ISCGEM',
 '1,01/04/1965,11:29:49,1.8630000000000002,127.352,80.0,5.8,ISCGEM',
 '2,01/05/1965,18:05:58,-20.579,-173.972,20.0,6.2,ISCGEM',
 '3,01/08/1965,18:49:43,-59.076,-23.557,15.0,5.8,ISCGEM',
 '4,01/09/1965,13:32:50,11.937999999999999,126.427,15.0,5.8,ISCGEM',
 '5,01/10/1965,13:36:32,-13.405,166.62900000000002,35.0,6.7,ISCGEM',
 '6,01/12/1965,13:32:25,27.357,87.867,20.0,5.9,ISCGEM',
 '7,01/15/1965,23:17:42,-13.309000000000001,166.21200000000002,35.0,6.0,ISCGEM',
 '8,01/16/1965,11:32:37,-56.452,-27.043000000000003,95.0,6.0,ISCGEM',
 '9,01/17/1965,10:43:17,-24.563000000000002,178.487,565.0,5.8,ISCGEM']

#### Tasks: RDD & pairRDD 
#### given location e.g. US, find/ count how many (data points) rows which happened at this location

In [6]:
def parseLine(line):
    fields = line.split(',')
    idx = int(fields[0])
    date = str(fields[1])
    time = str(fields[2])
    lat = float(fields[3])
    lon = float(fields[4])
    dep = float(fields[5])
    mag = float(fields[6])
    loc = str(fields[7])
    return (loc, date, time, lat, lon, dep, mag)

rdd = dat.map(parseLine)
rdd.take(5)

[('ISCGEM', '01/02/1965', '13:44:18', 19.246, 145.616, 131.6, 6.0),
 ('ISCGEM', '01/04/1965', '11:29:49', 1.8630000000000002, 127.352, 80.0, 5.8),
 ('ISCGEM', '01/05/1965', '18:05:58', -20.579, -173.972, 20.0, 6.2),
 ('ISCGEM', '01/08/1965', '18:49:43', -59.076, -23.557, 15.0, 5.8),
 ('ISCGEM', '01/09/1965', '13:32:50', 11.937999999999999, 126.427, 15.0, 5.8)]

In [7]:
rdd.count()

23412

In [8]:
# filter location 'US' out and count how many locations earthquake happened
usrdd = rdd.filter(lambda r: 'US' in r[0])
othersrdd = rdd.filter(lambda r: 'US' not in r[0])
usrdd.count()

20354

In [9]:
# other locations
othersrdd.count()

3058

In [10]:
usrdd.take(5)

[('US', '01/01/1973', '11:42:37', -35.513000000000005, -16.211, 33.0, 6.0),
 ('US', '01/02/1973', '00:53:20', -9.854, 117.427, 66.0, 5.5),
 ('US', '01/03/1973', '02:58:17', -27.715999999999998, -63.261, 563.0, 5.6),
 ('US', '01/03/1973', '14:31:05', 39.114000000000004, 71.889, 33.0, 5.5),
 ('US', '01/05/1973', '13:54:29', -38.997, 175.232, 150.0, 6.2)]

In [11]:
usrdd.take(1)[0][5]

33.0

In [12]:
# Location in US with max and min magnitude 
usrdd.max(lambda x: x[6]), usrdd.min(lambda x: x[6])

(('US', '12/26/2004', '00:58:53', 3.295, 95.98200000000001, 30.0, 9.1),
 ('US', '01/02/1973', '00:53:20', -9.854, 117.427, 66.0, 5.5))

In [13]:
# Location in US with max and min depth 
usrdd.max(lambda x: x[5]), usrdd.min(lambda x: x[5])

(('US', '10/22/1985', '19:14:02', -20.158, -179.16299999999998, 700.0, 5.5),
 ('US', '02/16/1973', '05:02:58', 49.835, 78.232, 0.0, 5.6))

In [14]:
type(usrdd)

pyspark.rdd.PipelinedRDD

In [15]:
usrdd.take(2)

[('US', '01/01/1973', '11:42:37', -35.513000000000005, -16.211, 33.0, 6.0),
 ('US', '01/02/1973', '00:53:20', -9.854, 117.427, 66.0, 5.5)]

#### Tasks: SparkSQL

In [16]:
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
# from pyspark.sql.types import DateType
# from pyspark.sql.types import TimestampType
from pyspark.sql.types import DoubleType

# create schema for field
schema = StructType([StructField("Country", StringType(), True), 
                     StructField("Date", StringType(), True), 
                     StructField("Time", StringType(), True), 
                     StructField("Latitude", DoubleType(), True),
                     StructField("Longitude", DoubleType(), True), 
                     StructField("Depth", DoubleType(), True),
                     StructField("Magnitude", DoubleType(), True)
                     ])
schema

StructType(List(StructField(Country,StringType,true),StructField(Date,StringType,true),StructField(Time,StringType,true),StructField(Latitude,DoubleType,true),StructField(Longitude,DoubleType,true),StructField(Depth,DoubleType,true),StructField(Magnitude,DoubleType,true)))

In [17]:
# convert rdd object into DataFrame object and execute SQL operations on DataFrame object
df = ss.createDataFrame(usrdd, schema).cache()
df

DataFrame[Country: string, Date: string, Time: string, Latitude: double, Longitude: double, Depth: double, Magnitude: double]

In [18]:
type(df)

pyspark.sql.dataframe.DataFrame

In [19]:
df.take(1)

[Row(Country='US', Date='01/01/1973', Time='11:42:37', Latitude=-35.513000000000005, Longitude=-16.211, Depth=33.0, Magnitude=6.0)]

In [20]:
# register the DataFrame as a table for SQL-query
df.createOrReplaceTempView("EarthQuake")

In [21]:
# query data with condition via SQL-command
loc_mags = ss.sql("SELECT * FROM EarthQuake WHERE Magnitude >= 9")
for loc in loc_mags.collect():
    print(loc)

Row(Country='US', Date='12/26/2004', Time='00:58:53', Latitude=3.295, Longitude=95.98200000000001, Depth=30.0, Magnitude=9.1)
Row(Country='US', Date='03/11/2011', Time='05:46:24', Latitude=38.297, Longitude=142.373, Depth=29.0, Magnitude=9.1)


In [22]:
loc_depth = ss.sql("SELECT * FROM EarthQuake WHERE Magnitude >=7 and Depth>=650")
loc_depth.count()

2

In [23]:
loc_depth.collect()

[Row(Country='US', Date='08/19/2002', Time='11:08:24', Latitude=-23.884, Longitude=178.495, Depth=675.4, Magnitude=7.7),
 Row(Country='US', Date='05/30/2015', Time='11:23:02', Latitude=27.8386, Longitude=140.4931, Depth=664.0, Magnitude=7.8)]