In [1]:
import math

import folium
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, concat, to_timestamp, lit, avg, udf, sqrt
from pyspark.sql.types import StringType

In [2]:
# Start (or reuse) the Spark session for this notebook
spark = SparkSession.builder.appName('Aditic DE Assignment').getOrCreate()

# Load the raw earthquake CSV (header row, comma-separated, column types
# inferred from the data) and keep only the columns relevant to the analysis.
SELECTED_COLUMNS = ["Date", "Time", "Latitude", "Longitude", "Type", "Depth", "Magnitude"]
df = (
    spark.read
    .csv("./data/database.csv", header=True, sep=",", inferSchema=True)
    .select(*[col(column_name) for column_name in SELECTED_COLUMNS])
)

24/02/12 02:33:30 WARN Utils: Your hostname, tomato-pie resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/02/12 02:33:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/12 02:33:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Normalise Time to HH:mm:ss, then parse "<Date> <Time>" into a single
# Timestamp column using the MM/dd/yyyy HH:mm:ss pattern.
df = (
    df
    .withColumn("Time", date_format(col("Time"), "HH:mm:ss"))
    .withColumn(
        "Timestamp",
        to_timestamp(concat(col("Date"), lit(" "), col("Time")), "MM/dd/yyyy HH:mm:ss"),
    )
)

df.show(5)

+----------+--------+--------+---------+----------+-----+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Magnitude|          Timestamp|
+----------+--------+--------+---------+----------+-----+---------+-------------------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|      6.0|1965-01-02 13:44:18|
|01/04/1965|11:29:49|   1.863|  127.352|Earthquake| 80.0|      5.8|1965-01-04 11:29:49|
|01/05/1965|18:05:58| -20.579| -173.972|Earthquake| 20.0|      6.2|1965-01-05 18:05:58|
|01/08/1965|18:49:43| -59.076|  -23.557|Earthquake| 15.0|      5.8|1965-01-08 18:49:43|
|01/09/1965|13:32:50|  11.938|  126.427|Earthquake| 15.0|      5.8|1965-01-09 13:32:50|
+----------+--------+--------+---------+----------+-----+---------+-------------------+
only showing top 5 rows



In [4]:
# Keep only events whose magnitude is strictly above the threshold
MIN_MAGNITUDE = 5.0
magnitude_above_min = col("Magnitude") > MIN_MAGNITUDE
df = df.where(magnitude_above_min)

df.show(5)

+----------+--------+--------+---------+----------+-----+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Magnitude|          Timestamp|
+----------+--------+--------+---------+----------+-----+---------+-------------------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|      6.0|1965-01-02 13:44:18|
|01/04/1965|11:29:49|   1.863|  127.352|Earthquake| 80.0|      5.8|1965-01-04 11:29:49|
|01/05/1965|18:05:58| -20.579| -173.972|Earthquake| 20.0|      6.2|1965-01-05 18:05:58|
|01/08/1965|18:49:43| -59.076|  -23.557|Earthquake| 15.0|      5.8|1965-01-08 18:49:43|
|01/09/1965|13:32:50|  11.938|  126.427|Earthquake| 15.0|      5.8|1965-01-09 13:32:50|
+----------+--------+--------+---------+----------+-----+---------+-------------------+
only showing top 5 rows



In [5]:
# Mean depth and mean magnitude for each event Type
type_aggregations = [
    avg("Depth").alias("avg_Depth"),
    avg("Magnitude").alias("avg_Magnitude"),
]
avg_df = df.groupBy("Type").agg(*type_aggregations)

avg_df.show(5)

[Stage 4:>                                                          (0 + 1) / 1]

+-----------------+-----------------+-----------------+
|             Type|        avg_Depth|    avg_Magnitude|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



                                                                                

In [6]:
# Persist the per-Type averages. Spark writes a directory at this path (part
# files inside); a header row is included so the columns remain identifiable.
avg_df.write.format("csv").option("header", "true").mode("overwrite").save("./output/avg.csv")

In [7]:
# Categorizing the earthquakes by magnitude band:
#   magnitude <  low_threshold                    -> "Low"
#   low_threshold <= magnitude < high_threshold   -> "Medium"
#   magnitude >= high_threshold                   -> "High"
def categorize(magnitude, low_threshold=6.0, high_threshold=8.0):
    """Map a magnitude value to a "Low" / "Medium" / "High" category.

    Args:
        magnitude: numeric magnitude, or None for a missing value.
        low_threshold: inclusive lower bound of the "Medium" band.
        high_threshold: inclusive lower bound of the "High" band.

    Returns:
        "Low", "Medium" or "High"; None when ``magnitude`` is None or NaN,
        so missing values surface as NULL instead of a misleading label.
    """
    # None previously raised TypeError (None < 6) inside the UDF and failed the
    # Spark task; NaN fell through every comparison and was labelled "High".
    if magnitude is None or math.isnan(magnitude):
        return None
    if magnitude < low_threshold:
        return "Low"
    # Reaching here already implies magnitude >= low_threshold.
    if magnitude < high_threshold:
        return "Medium"
    return "High"


# Backward-compatible alias for the original (misspelled) name.
catagorize = categorize

# Wrap the Python categorizer as a string-returning Spark UDF and attach its
# result to every row as the "Category" column.
categorize_udf = udf(catagorize, returnType=StringType())
cat_df = df.withColumn("Category", categorize_udf(col("Magnitude")))

In [8]:
# Distance of each event from the point (Latitude=0, Longitude=0), computed as
# the plain Euclidean norm of the two coordinate columns.
# NOTE(review): this is in coordinate units (presumably degrees), not a
# great-circle distance in km — confirm that is the intended metric.
squared_norm = col("Latitude") ** 2 + col("Longitude") ** 2
dis_df = cat_df.withColumn("Distance", sqrt(squared_norm))

dis_df.show(5)

[Stage 10:>                                                         (0 + 1) / 1]

+----------+--------+--------+---------+----------+-----+---------+-------------------+--------+------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Magnitude|          Timestamp|Category|          Distance|
+----------+--------+--------+---------+----------+-----+---------+-------------------+--------+------------------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|      6.0|1965-01-02 13:44:18|  Medium| 146.8823609968195|
|01/04/1965|11:29:49|   1.863|  127.352|Earthquake| 80.0|      5.8|1965-01-04 11:29:49|     Low|127.36562594750595|
|01/05/1965|18:05:58| -20.579| -173.972|Earthquake| 20.0|      6.2|1965-01-05 18:05:58|  Medium|175.18490809713035|
|01/08/1965|18:49:43| -59.076|  -23.557|Earthquake| 15.0|      5.8|1965-01-08 18:49:43|     Low| 63.59957566682344|
|01/09/1965|13:32:50|  11.938|  126.427|Earthquake| 15.0|      5.8|1965-01-09 13:32:50|     Low|126.98937818967381|
+----------+--------+--------+---------+----------+-----+---------+-----

                                                                                

In [9]:
# Persist the enriched events (Category + Distance). Spark writes a directory
# at this path (part files inside); a header row keeps the columns identifiable.
dis_df.write.format("csv").option("header", "true").mode("overwrite").save("./output/final_output.csv")

                                                                                

In [10]:
# Bring only the columns the map needs to the driver as a pandas DataFrame;
# toPandas() collects every row, so avoid shipping unused columns.
pd_df = dis_df.select("Latitude", "Longitude", "Category").toPandas()

# Folium's location validation rejects NaN coordinates, so drop events that
# have no usable location instead of failing midway through the loop.
pd_df = pd_df.dropna(subset=["Latitude", "Longitude"])

# Create a Folium map centered around the world. Named earthquake_map rather
# than `map` so the Python builtin map() is not shadowed for later cells.
earthquake_map = folium.Map(location=[0, 0], zoom_start=2)

# Marker colour per magnitude category; anything else (e.g. a NULL category)
# falls back to gray.
category_colors = {
    'Low': 'green',
    'Medium': 'orange',
    'High': 'red'
}

# Add a small circle marker for each earthquake location. itertuples yields
# plain tuples in the column order selected above and is much cheaper than
# iterrows, which builds a Series per row.
for latitude, longitude, category in pd_df.itertuples(index=False, name=None):
    folium.CircleMarker(
        location=[latitude, longitude],
        radius=2,
        fill=True,
        color=category_colors.get(category, 'gray'),
        fill_opacity=0.7,
    ).add_to(earthquake_map)

# Save the map as an HTML file
earthquake_map.save('./output/earthquakes_map.html')

                                                                                