In [1]:
import os

import findspark

# Resolve the Spark installation from SPARK_HOME when it is set, falling back to
# the path this notebook was originally run against, rather than hard-coding a
# machine-specific absolute path. findspark must run before pyspark is imported
# because it is what puts pyspark on sys.path.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
findspark.init(SPARK_HOME)

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() reuses a session already running in this kernel, if any
spark = SparkSession.builder.appName('basics').getOrCreate()

# Source data; inferSchema=True costs an extra pass over the file to type columns
file_path = 'Dataset/earthquake_data.csv'
earthquake_data = spark.read.csv(file_path, header=True, inferSchema=True)

# Keep only the columns used by the later cells
selected_columns = ['magnitude', 'depth', 'latitude', 'longitude', 'date_time', 'tsunami']
selected_data = earthquake_data.select(selected_columns)

# Display the top rows of the selected data
selected_data.show()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/10 09:41:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/10 09:41:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/10 09:41:07 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


+---------+-------+--------+---------+----------------+-------+
|magnitude|  depth|latitude|longitude|       date_time|tsunami|
+---------+-------+--------+---------+----------------+-------+
|      7.0|   14.0| -9.7963|  159.596|22-11-2022 02:03|      1|
|      6.9|   25.0| -4.9559|  100.738|18-11-2022 13:37|      0|
|      7.0|  579.0|-20.0508| -178.346|12-11-2022 07:09|      1|
|      7.3|   37.0|-19.2918| -172.129|11-11-2022 10:48|      1|
|      6.6|624.464|-25.5948|  178.278|09-11-2022 10:14|      1|
|      7.0|  660.0|-26.0442|  178.381|09-11-2022 09:51|      1|
|      6.8|630.379|-25.9678|  178.363|09-11-2022 09:38|      1|
|      6.7|   20.0|  7.6712| -82.3396|20-10-2022 11:57|      1|
|      6.8|   20.0|   18.33| -102.913|22-09-2022 06:16|      1|
|      7.6| 26.943| 18.3667| -103.252|19-09-2022 18:05|      1|
|      6.9|   10.0| 23.1444|  121.307|18-09-2022 06:44|      1|
|      6.5|   10.0|  23.029|  121.348|17-09-2022 13:41|      1|
|      7.0|  137.0|-21.2077|  170.239|14

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialize SparkSession (assuming it's already done)
# spark = SparkSession.builder.appName('basics').getOrCreate()

# Assuming 'selected_data' is already a PySpark DataFrame

# Define the UDFs for Magnitude and Depth categories
@udf(StringType())
def magnitude_category(magnitude):
    """Map an earthquake magnitude to a descriptive class label.

    Bands (lower bound inclusive, upper bound exclusive):
    below 5 -> "Light", [5, 6) -> "Moderate", [6, 7) -> "Strong",
    [7, 8) -> "Major", 8 and above -> "Great".

    Spark hands SQL NULL to a Python UDF as ``None``; ``None < 5`` raises
    ``TypeError`` in Python 3 and would fail the whole Spark job. A NaN value
    would otherwise fail every comparison and be labelled "Great". Both cases
    therefore yield NULL (``None``) instead of a label.
    """
    # `magnitude != magnitude` is True only for NaN
    if magnitude is None or magnitude != magnitude:
        return None
    # Each branch is reached only when the previous upper bound was exceeded,
    # so the lower bounds need not be re-checked.
    if magnitude < 5:
        return "Light"
    if magnitude < 6:
        return "Moderate"
    if magnitude < 7:
        return "Strong"
    if magnitude < 8:
        return "Major"
    return "Great"

@udf(StringType())
def depth_category(depth):
    """Map an earthquake hypocentre depth to a descriptive class label.

    Bands (lower bound inclusive, upper bound exclusive):
    below 70 -> "Shallow", [70, 300) -> "Intermediate", 300 and above -> "Deep".
    NOTE(review): presumably depth is in kilometres — confirm against the dataset.

    Spark hands SQL NULL to a Python UDF as ``None``; ``None < 70`` raises
    ``TypeError`` in Python 3 and would fail the whole Spark job. A NaN value
    would otherwise fail every comparison and be labelled "Deep". Both cases
    therefore yield NULL (``None``) instead of a label.
    """
    # `depth != depth` is True only for NaN
    if depth is None or depth != depth:
        return None
    if depth < 70:
        return "Shallow"
    if depth < 300:
        return "Intermediate"
    return "Deep"

# Attach both derived label columns in a single chained transformation. The
# UDFs receive column names as strings, resolved against the frame they apply to.
# NOTE(review): this rebinds `selected_data`. The cell was executed as In [12],
# i.e. after the join cell (In [3]); under Restart & Run All it runs first, so the
# joined frame will also carry these two columns (the later explicit column
# selection does not keep them).
selected_data = (
    selected_data
    .withColumn('Magnitude Category', magnitude_category('magnitude'))
    .withColumn('Depth Category', depth_category('depth'))
)

# Preview each label next to the raw value it was derived from
selected_data.select(
    'date_time',
    'magnitude', 'Magnitude Category',
    'depth', 'Depth Category',
    'latitude', 'longitude',
).show()



[Stage 7:>                                                          (0 + 1) / 1]

+----------------+---------+------------------+-------+--------------+--------+---------+
|       date_time|magnitude|Magnitude Category|  depth|Depth Category|latitude|longitude|
+----------------+---------+------------------+-------+--------------+--------+---------+
|22-11-2022 02:03|      7.0|             Major|   14.0|       Shallow| -9.7963|  159.596|
|18-11-2022 13:37|      6.9|            Strong|   25.0|       Shallow| -4.9559|  100.738|
|12-11-2022 07:09|      7.0|             Major|  579.0|          Deep|-20.0508| -178.346|
|11-11-2022 10:48|      7.3|             Major|   37.0|       Shallow|-19.2918| -172.129|
|09-11-2022 10:14|      6.6|            Strong|624.464|          Deep|-25.5948|  178.278|
|09-11-2022 09:51|      7.0|             Major|  660.0|          Deep|-26.0442|  178.381|
|09-11-2022 09:38|      6.8|            Strong|630.379|          Deep|-25.9678|  178.363|
|20-10-2022 11:57|      6.7|            Strong|   20.0|       Shallow|  7.6712| -82.3396|
|22-09-202

                                                                                

In [3]:
# Load the damage records with the same CSV options as the earthquake file
infrastructure_damage = spark.read.csv(
    'Dataset/infrastructure_damage.csv', header=True, inferSchema=True
)

# Inner join on the timestamp string: only earthquakes with a matching damage
# row are kept.
# NOTE(review): date_time is displayed with minute resolution
# ('22-11-2022 02:03'), so two events logged in the same minute would fan out —
# confirm the key is unique on both sides.
merged_df = selected_data.join(infrastructure_damage, 'date_time', 'inner')

# Remove presumably-empty trailing columns from the damage CSV; drop() is a
# no-op for names that are not in the schema.
unwanted_columns = ("Unnamed: 4", "Unnamed: 5")
merged_ds = merged_df.drop(*unwanted_columns)

# Preview the joined rows
merged_ds.show()



+----------------+---------+-------+--------+---------+-------+----------------------------+-------------+--------------------+
|       date_time|magnitude|  depth|latitude|longitude|tsunami|number_of_buildings_impacted|economic_loss|            location|
+----------------+---------+-------+--------+---------+-------+----------------------------+-------------+--------------------+
|22-11-2022 02:03|      7.0|   14.0| -9.7963|  159.596|      1|                        1502|     10790000|    Malango, Islands|
|18-11-2022 13:37|      6.9|   25.0| -4.9559|  100.738|      0|                        2587|     28900000| Bengkulu, Indonesia|
|12-11-2022 07:09|      7.0|  579.0|-20.0508| -178.346|      1|                        2654|     23040000|                null|
|11-11-2022 10:48|      7.3|   37.0|-19.2918| -172.129|      1|                        1056|     23020000|       Neiafu, Tonga|
|09-11-2022 10:14|      6.6|624.464|-25.5948|  178.278|      1|                         706|      108000

In [4]:
# Columns carried forward to the saved output, in output order.
# NOTE(review): this rebinds `selected_columns` from the first cell with a
# different list.
selected_columns = [
    'magnitude', 'depth', 'latitude', 'longitude', 'date_time',
    'number_of_buildings_impacted', 'economic_loss', 'tsunami',
]

# DataFrame.select accepts the list directly
reduced_data = merged_ds.select(selected_columns)

# Preview the reduced frame
reduced_data.show()


+---------+-------+--------+---------+----------------+----------------------------+-------------+-------+
|magnitude|  depth|latitude|longitude|       date_time|number_of_buildings_impacted|economic_loss|tsunami|
+---------+-------+--------+---------+----------------+----------------------------+-------------+-------+
|      7.0|   14.0| -9.7963|  159.596|22-11-2022 02:03|                        1502|     10790000|      1|
|      6.9|   25.0| -4.9559|  100.738|18-11-2022 13:37|                        2587|     28900000|      0|
|      7.0|  579.0|-20.0508| -178.346|12-11-2022 07:09|                        2654|     23040000|      1|
|      7.3|   37.0|-19.2918| -172.129|11-11-2022 10:48|                        1056|     23020000|      1|
|      6.6|624.464|-25.5948|  178.278|09-11-2022 10:14|                         706|      1080000|      1|
|      7.0|  660.0|-26.0442|  178.381|09-11-2022 09:51|                         107|     40270000|      1|
|      6.8|630.379|-25.9678|  178.363

In [5]:
# Destination directory. Spark writes a directory of part-*.csv files here
# (one per partition), not a single CSV file.
output_path = "BDAS/Dataset"

# Overwrite mode replaces everything already at output_path.
# NOTE(review): confirm nothing else is stored under BDAS/Dataset — it is removed.
reduced_data.write.mode("overwrite").option("header", True).csv(output_path)
