## Data Analysis of the Earthquake Data Source

### Environment Setup


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Reuse the active Spark session if one exists; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

### Data Import

In [43]:
# Location of the raw earthquake JSON file, relative to the working directory.
filepath = 'data/raw/earthquake_raw.json'

# Read in multiline mode (a JSON record may span several lines).
# PERMISSIVE mode keeps malformed records instead of failing the whole read.
raw_df = (
    spark.read
    .option('multiline', 'true')
    .option('mode', 'PERMISSIVE')
    .json(filepath)
)

# Produce one row per entry of the `features` array, then flatten each entry
# into its coordinates, its id and every field of its `properties` struct.
# Only the exploded column is projected, so a separate drop of `features`
# is unnecessary.
earth_quake_df = (
    raw_df
    .select(F.explode(F.col('features')).alias('Exp_RESULTS'))
    .select(
        'Exp_RESULTS.geometry.coordinates',
        'Exp_RESULTS.id',
        'Exp_RESULTS.properties.*',
    )
)


### Data Analysis

#### Data Schema

In [44]:
# Column names of the flattened earthquake frame, in schema order.
df_columns = earth_quake_df.columns

# Summary of the frame's dimensions (count() triggers a Spark job).
print(
    f'Number of rows: {earth_quake_df.count()}\n'
    f'Number of columns: {len(df_columns)}'
)

# Header line followed by one column name per line.
print("\n".join(['Columns: ', *df_columns]))

# Tree view of column names, types and nullability.
earth_quake_df.printSchema()

Number of rows: 149
Number of columns: 28
Columns: 
coordinates
id
alert
cdi
code
detail
dmin
felt
gap
ids
mag
magType
mmi
net
nst
place
rms
sig
sources
status
time
title
tsunami
type
types
tz
updated
url
root
 |-- coordinates: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- id: string (nullable = true)
 |-- alert: string (nullable = true)
 |-- cdi: double (nullable = true)
 |-- code: string (nullable = true)
 |-- detail: string (nullable = true)
 |-- dmin: double (nullable = true)
 |-- felt: long (nullable = true)
 |-- gap: double (nullable = true)
 |-- ids: string (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- mmi: double (nullable = true)
 |-- net: string (nullable = true)
 |-- nst: long (nullable = true)
 |-- place: string (nullable = true)
 |-- rms: double (nullable = true)
 |-- sig: long (nullable = true)
 |-- sources: string (nullable = true)
 |-- status: string (nullable = true)
 |-- time: long (null

#### Null Values


In [45]:
# Count the nulls of every column in one aggregation (a single Spark job)
# instead of running a separate filter + count job per column.
# `when` without `otherwise` yields null for non-matching rows and `count`
# skips nulls, so each aggregate equals the number of null entries in
# that column.
null_counts_row = earth_quake_df.agg(*[
    F.count(F.when(earth_quake_df[name].isNull(), 1)).alias(name)
    for name in earth_quake_df.columns
]).first()

# Mapping of column name -> number of null values, in column order.
null_values = null_counts_row.asDict()

for key, value in null_values.items():
    print(key, '->', value)

coordinates -> 0
id -> 0
alert -> 149
cdi -> 140
code -> 0
detail -> 0
dmin -> 49
felt -> 140
gap -> 31
ids -> 0
mag -> 0
magType -> 0
mmi -> 148
net -> 0
nst -> 35
place -> 0
rms -> 0
sig -> 0
sources -> 0
status -> 0
time -> 0
title -> 0
tsunami -> 0
type -> 0
types -> 0
tz -> 149
updated -> 0
url -> 0


#### Duplicates

In [46]:
# Rows left after collapsing exact duplicates (all columns are compared).
unique_rows = earth_quake_df.dropDuplicates().count()

# Total number of rows, duplicates included.
total_rows = earth_quake_df.count()

# The difference is the number of fully duplicated rows.
duplicates_rows = total_rows - unique_rows

print(duplicates_rows)

0


### Report of Data Analysis

#### Steps in data transformation

##### Lowercase column content
- All string columns

##### Rename columns
- cdi -> max_intensity
- alert -> alert_type
- dmin -> epicenter_horizontal_distance
- felt -> people_felt_earthquake
- gap -> azimuthal_gap
- ids -> earthquake_id
- mag -> magnitude
- magType -> magnitude_type
- nst -> nr_seismic_stations
- rms -> root_mean_square
- sig -> earthquake_impact_estimation

##### Split column content
- Split the array in the coordinates column into three new columns -> longitude, latitude, depth

##### Replace Null values
- alert -> green
- cdi -> 0.0
- dmin -> 0.0
- gap -> 0.0
- nst -> 0
- felt -> 0

##### Drop columns
- code
- detail
- mmi
- net
- sources
- title
- types
- tz

##### Data type conversion
- time -> timestamp
- updated -> timestamp
- felt -> integer
- nst -> integer
- sig -> integer
- tsunami -> integer


##### Column content transformation
- Convert magType values to:
    - md -> duration
    - ml -> local
    - ms -> surface wave
    - mw -> w-phase
    - me -> energy
    - mi -> p-wave
    - mb -> short-period body wave
    - mlg -> short-period surface wave