# Apache Spark Installation

In [1]:
# Initialise findspark before any pyspark import in the cells below.
import findspark
findspark.init()
# Last expression of the cell: its return value (the pyspark location) is
# what the notebook displays as this cell's output.
findspark.find()

'c:\\Users\\natha\\miniconda3\\envs\\myenv\\lib\\site-packages\\pyspark'

# Project

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse, if one already exists) the Spark session used by every
# later cell of this notebook.
spark = (
    SparkSession.builder
    .appName("final-proj")
    .getOrCreate()
)

In [3]:
# constants: relative locations of the two input CSV files
# Both paths use forward slashes. A backslash separator followed by "k" is an
# invalid string escape sequence in Python and only resolves as a path on
# Windows; forward slashes work on every platform.
earthquake_catalogue_path = "datasource/katalog_gempa.csv"
bps_path = "datasource/bps_2022.csv"

In [4]:
# Read the earthquake catalogue; the first CSV line supplies the column names.
# No schema is given or inferred, so every column is loaded as a string.
earthquake_data = (
    spark.read
    .option("header", True)
    .csv(earthquake_catalogue_path)
)
earthquake_data.show(10)

+----------+------------+-----+------+-----+---+--------------------+-------+----+-----+-------+----+-----+
|       tgl|          ot|  lat|   lon|depth|mag|              remark|strike1|dip1|rake1|strike2|dip2|rake2|
+----------+------------+-----+------+-----+---+--------------------+-------+----+-----+-------+----+-----+
|2008/11/01|21:02:43.058|-9.18|119.06|   10|4.9|Sumba Region - In...|   null|null| null|   null|null| null|
|2008/11/01|20:58:50.248|-6.55|129.64|   10|4.6|           Banda Sea|   null|null| null|   null|null| null|
|2008/11/01|17:43:12.941|-7.01|106.63|  121|3.7|    Java - Indonesia|   null|null| null|   null|null| null|
|2008/11/01|16:24:14.755|-3.30|127.85|   10|3.2|   Seram - Indonesia|   null|null| null|   null|null| null|
|2008/11/01|16:20:37.327|-6.41|129.54|   70|4.3|           Banda Sea|   null|null| null|   null|null| null|
|2008/11/01|14:47:00.029|-7.37|105.31|   18|3.3|    Java - Indonesia|   null|null| null|   null|null| null|
|2008/11/01|13:04:38.742| 0.

In [5]:
# Read the BPS per-province table; the first CSV line supplies the column
# names. No schema is given or inferred, so every column is loaded as a string.
bps_data = (
    spark.read
    .option("header", True)
    .csv(bps_path)
)
bps_data.show(10)

+--------------------+---------------+-----------------------------------+-------------------+---------------------------------+----------------------------+
|            Provinsi|Jumlah Penduduk|Laju Pertumbuhan Penduduk per Tahun|Persentase Penduduk|Kepadatan Penduduk per km persegi|Rasio Jenis Kelamin Penduduk|
+--------------------+---------------+-----------------------------------+-------------------+---------------------------------+----------------------------+
|                Aceh|         5407.9|                               1.43|               1.96|                               95|                       100.9|
|      Sumatera Utara|        15115.2|                               1.21|               5.48|                              209|                       100.7|
|      Sumatera Barat|         5640.6|                               1.09|               2.05|                              134|                       101.5|
|                Riau|         6614.4|              

## Earthquake Data Processing

In [6]:
# Earthquake data processing: remove the six strike/dip/rake columns, which
# are null in every row previewed above, and preview the slimmer frame.
col_to_drop = [
    "strike1", "dip1", "rake1",
    "strike2", "dip2", "rake2",
]
earthquake_data = earthquake_data.drop(*col_to_drop)
earthquake_data.show(10)

+----------+------------+-----+------+-----+---+--------------------+
|       tgl|          ot|  lat|   lon|depth|mag|              remark|
+----------+------------+-----+------+-----+---+--------------------+
|2008/11/01|21:02:43.058|-9.18|119.06|   10|4.9|Sumba Region - In...|
|2008/11/01|20:58:50.248|-6.55|129.64|   10|4.6|           Banda Sea|
|2008/11/01|17:43:12.941|-7.01|106.63|  121|3.7|    Java - Indonesia|
|2008/11/01|16:24:14.755|-3.30|127.85|   10|3.2|   Seram - Indonesia|
|2008/11/01|16:20:37.327|-6.41|129.54|   70|4.3|           Banda Sea|
|2008/11/01|14:47:00.029|-7.37|105.31|   18|3.3|    Java - Indonesia|
|2008/11/01|13:04:38.742| 0.10| 98.55|   12|4.7|Northern Sumatra ...|
|2008/11/01|10:23:51.646|-7.07|129.67|  135|4.8|           Banda Sea|
|2008/11/01|09:50:32.503|-3.32|128.02|   10|2.3|   Seram - Indonesia|
|2008/11/01|06:50:52.220|-4.43|127.45|   10|3.2|           Banda Sea|
+----------+------------+-----+------+-----+---+--------------------+
only showing top 10 

# Manual Geocoder


In [8]:
# manual geocoder
# datasource: https://gadm.org/download_country.html#google_vignette
import geopandas as gpd

# NOTE(review): absolute path on the original author's machine; it has to be
# adjusted before this cell can run anywhere else.
# NOTE(review): the "_4" suffix suggests the most detailed GADM level, while
# the geocoders below only read NAME_1 — a coarser file may be enough; confirm.
gadm_path = r'D:\nieru\project\big-data\gadm\gadm41_IDN_4.json'
gadm = gpd.read_file(gadm_path)


In [9]:
from shapely.geometry import Point
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def is_in_province(latitude, longitude, province_geometry, buffer_distance=0.1):
    """Tell whether a coordinate lies inside a geometry grown by a margin.

    Args:
        latitude: Y coordinate of the location.
        longitude: X coordinate of the location.
        province_geometry: Geometry object exposing ``buffer`` (presumably a
            shapely geometry taken from a GeoDataFrame row).
        buffer_distance: Margin passed to ``province_geometry.buffer``, in the
            geometry's own coordinate units.

    Returns:
        The result of ``contains`` on the buffered geometry for the point.
    """
    # Note the (x, y) order: longitude first, then latitude.
    location = Point(longitude, latitude)
    return province_geometry.buffer(buffer_distance).contains(location)

def reverse_geocode_with_buffer(lat, lon, gadm_data, buffer_size=0.01):
    """Look up the ``NAME_1`` of the first row whose geometry touches a point.

    The point is grown into a small area of radius ``buffer_size`` (in
    coordinate units) so that locations slightly outside a boundary can still
    match.

    Args:
        lat: Y coordinate of the location.
        lon: X coordinate of the location.
        gadm_data: Table with ``geometry`` and ``NAME_1`` columns that supports
            ``iterrows()``.
        buffer_size: Radius of the search area around the point.

    Returns:
        ``NAME_1`` of the first intersecting row in iteration order, or
        ``None`` when no row intersects.
    """
    search_area = Point(lon, lat).buffer(buffer_size)
    matching_names = (
        record['NAME_1']
        for _, record in gadm_data.iterrows()
        if record['geometry'].intersects(search_area)
    )
    # The generator is lazy, so iteration stops at the first hit.
    return next(matching_names, None)

def reverse_geocoder_with_buffer(latitude, longitude, buffer=0.01):
    """Return the ``NAME_1`` of the first ``gadm`` row near a coordinate.

    The coordinate is grown into an area of radius ``buffer`` (coordinate
    units) and compared against every row of the module-level ``gadm`` table.
    This function is called once per row from a Spark UDF, so it must not
    print and must tolerate missing values.

    Args:
        latitude: Y coordinate; a number or a numeric string.
        longitude: X coordinate; a number or a numeric string.
        buffer: Radius of the search area around the point.

    Returns:
        ``NAME_1`` of the first intersecting row in iteration order, or
        ``None`` when a coordinate is missing or no row intersects.

    Raises:
        ValueError: If a coordinate is a string that is not a valid number.
    """
    # A missing coordinate cannot be geocoded; report "no match" instead of
    # letting Point() raise and abort the whole Spark job.
    if latitude is None or longitude is None:
        return None

    # The catalogue is read without a schema, so coordinates arrive as
    # strings; convert explicitly rather than relying on implicit coercion.
    search_area = Point(float(longitude), float(latitude)).buffer(buffer)

    for _, row in gadm.iterrows():
        if row['geometry'].intersects(search_area):
            return row['NAME_1']

    return None

def reverse_geocoder(latitude, longitude):
    """Return the ``NAME_1`` of the first ``gadm`` row containing a coordinate.

    Every row geometry of the module-level ``gadm`` table is tested with
    ``is_in_province`` using a margin of 0.5 coordinate units.

    Args:
        latitude: Y coordinate of the location.
        longitude: X coordinate of the location.

    Returns:
        ``NAME_1`` of the first matching row in iteration order, or ``None``
        when no row matches.
    """
    for _, row in gadm.iterrows():
        if is_in_province(latitude, longitude, row['geometry'], buffer_distance=0.5):
            return row['NAME_1']

    return None

def reverse_geocoder_ind(latitude, longitude):
    """Return the ``Propinsi`` of the first row whose geometry contains a point.

    NOTE(review): this reads the module-level ``indo_prov_geojson`` table,
    which is not defined in any of the visible cells; it must be loaded before
    this function is called.

    Args:
        latitude: Y coordinate of the location.
        longitude: X coordinate of the location.

    Returns:
        ``Propinsi`` of the first containing row in iteration order, or
        ``None`` when no row contains the point.
    """
    location = Point(longitude, latitude)
    containing_names = (
        record['Propinsi']
        for _, record in indo_prov_geojson.iterrows()
        if record['geometry'].contains(location)
    )
    # The generator is lazy, so iteration stops at the first hit.
    return next(containing_names, None)


# User-defined function for Spark: maps (lat, lon) to a province name string
# via reverse_geocoder_with_buffer, using a search radius of 0.8 coordinate
# units around each point.
get_province_udf = udf(
    lambda lat, lon: reverse_geocoder_with_buffer(lat, lon, 0.8),
    StringType(),
)

In [None]:
# Spot check of the geocoder on a single point. This coordinate is the
# catalogue row remarked "Java - Indonesia"; it is not Jakarta.
latitude = -7.01
longitude = 106.63
# Other coordinates tried (lat|lon):
# -3.76|127.38
# -9.18|119.06
#  0.10| 98.55

# NOTE(review): reverse_geocoder_ind depends on `indo_prov_geojson`, which is
# not defined in the visible cells; load it before running this cell.
province_name = reverse_geocoder_ind(latitude, longitude)

if province_name:
    print(f"Province: {province_name}")
else:
    print("Location not found in dataset.")
    

Province: JAWA BARAT


In [10]:
# Keep at most this many catalogue rows as a smaller working sample.
sample_row_limit = 5000
sampleEarthquakeData = earthquake_data.limit(sample_row_limit)

In [11]:
# Add a 'province' column to the sampled rows. The UDF is applied to
# sampleEarthquakeData itself: deriving the column from the full
# earthquake_data would overwrite the sample and geocode every row.
sampleEarthquakeData = sampleEarthquakeData.withColumn(
    'province',
    get_province_udf(sampleEarthquakeData['lat'], sampleEarthquakeData['lon']),
)
sampleEarthquakeData.show()

+----------+------------+-----+------+-----+---+--------------------+-----------------+
|       tgl|          ot|  lat|   lon|depth|mag|              remark|         province|
+----------+------------+-----+------+-----+---+--------------------+-----------------+
|2008/11/01|21:02:43.058|-9.18|119.06|   10|4.9|Sumba Region - In...|NusaTenggaraBarat|
|2008/11/01|20:58:50.248|-6.55|129.64|   10|4.6|           Banda Sea|             null|
|2008/11/01|17:43:12.941|-7.01|106.63|  121|3.7|    Java - Indonesia|           Banten|
|2008/11/01|16:24:14.755|-3.30|127.85|   10|3.2|   Seram - Indonesia|           Maluku|
|2008/11/01|16:20:37.327|-6.41|129.54|   70|4.3|           Banda Sea|             null|
|2008/11/01|14:47:00.029|-7.37|105.31|   18|3.3|    Java - Indonesia|           Banten|
|2008/11/01|13:04:38.742| 0.10| 98.55|   12|4.7|Northern Sumatra ...|    SumateraBarat|
|2008/11/01|10:23:51.646|-7.07|129.67|  135|4.8|           Banda Sea|           Maluku|
|2008/11/01|09:50:32.503|-3.32|1

In [None]:
# earthquake_data = earthquake_data.withColumn('province', get_province_udf(earthquake_data['lat'], earthquake_data['lon']))
# earthquake_data.show(100)

In [12]:
# Second name bound to the same DataFrame object; this assignment does not
# copy any data.
earthquake_data_cpy = sampleEarthquakeData

In [None]:
# from pyspark.sql.functions import col, sum as sum_
# # check null counts
# null_counts = earthquake_data_cpy.select([sum_(col('province').isNull().cast("int")).alias(c) for c in earthquake_data_cpy.columns])

# # Show result
# null_counts.show()

In [13]:
# null_count = earthquake_data_cpy.filter(earthquake_data_cpy['province'].isNull()).count()

# print(null_count)

In [None]:
#export to csv
# earthquake_data_cpy.toPandas().to_csv("gempa_catalogue_with_province.csv", index=False)