In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, to_timestamp, lit, avg, udf, sqrt
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Create a Spark session
spark = SparkSession.builder.master("local[2]").appName("Aidetic").getOrCreate()
csv_path = "data/database.csv"

custom_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
    StructField("Type", StringType(), True),
    StructField("Depth", DoubleType(), True),
    StructField("Depth Error", DoubleType(), True),
    StructField("Depth Seismic Stations", DoubleType(), True),
    StructField("Magnitude", DoubleType(), True),
    StructField("Magnitude Type", StringType(), True),
    StructField("Magnitude Error", DoubleType(), True),
    StructField("Magnitude Seismic Stations", DoubleType(), True),
    StructField("Azimuthal Gap", DoubleType(), True),
    StructField("Horizontal Distance", DoubleType(), True),
    StructField("Horizontal Error", DoubleType(), True),
    StructField("Root Mean Square", DoubleType(), True),
    StructField("ID", StringType(), True),
    StructField("Source", StringType(), True),
    StructField("Location Source", StringType(), True),
    StructField("Magnitude Source", StringType(), True),
    StructField("Status", StringType(), True),
])


# Read the CSV file into a DataFrame
# df = spark.read.csv(csv_path, header=True, inferSchema=True)
df = spark.read.csv(csv_path, header=True, schema=custom_schema)
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL

In [3]:
df = df.withColumn("Timestamp", to_timestamp(concat(col("Date"), lit(" "), col("Time")), "MM/dd/yyyy HH:mm:ss"))
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|01/02/1965|13:44:18|  19.246| 

In [4]:
df = df.filter(df.Magnitude > 5.0)
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|01/02/1965|13:44:18|  19.246| 

In [5]:
unique_values = df.select("Type").distinct().collect()
unique_values
result = df.groupBy("Type").agg(avg("Depth").alias("Average_Depth"), avg("Magnitude").alias("Average_Magnitude"))
result.show()

+-----------------+-----------------+-----------------+
|             Type|    Average_Depth|Average_Magnitude|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



In [6]:
def categorize_magnitude(magnitude):
    if magnitude < 5.0:
        return "Low"
    elif magnitude < 7.0:
        return "Moderate"
    else:
        return "High"

# Register the UDF
categorize_magnitude_udf = udf(categorize_magnitude, StringType())

# Apply the UDF to create a new column "Magnitude_Level"
df = df.withColumn("Magnitude_Level", categorize_magnitude_udf("Magnitude"))
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+---------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|Magnitude_Level|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+

In [10]:
reference_location = (0.0, 0.0)
df = df.withColumn("Distance_From_Reference", sqrt((col("Latitude") - reference_location[0])**2 + (col("Longitude") - reference_location[1])**2))
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+---------------+-----------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|Magnitude_Level|Distance_From_Reference|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------

In [45]:
df.write.csv("Final", header=True)

In [None]:
import folium
from folium.plugins import MarkerCluster 
map_center = [0, 0]
mymap = folium.Map(location=map_center, zoom_start=2)

# Create a MarkerCluster layer for better performance
marker_cluster = MarkerCluster().add_to(mymap)

# Iterate through the DataFrame and add markers to the map
for row in df.collect():
    lat, lon, mag, depth = row["Latitude"], row["Longitude"], row["Magnitude"], row["Depth"]
    folium.Marker(location=[lat, lon], popup=f"Magnitude: {mag}, Depth: {depth}").add_to(marker_cluster)

# Save the map to an HTML file
mymap.save("earthquake_map.html")