## Aidetic Data Engineer Assignment

#### Install all required libraries

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType
from pyspark.sql.functions import col, to_timestamp, avg
from pyspark.sql.functions import udf

####  Initialize Spark session

In [2]:
spark = SparkSession.builder.appName("Aidetic").config("spark.driver.memory", "4g").config("spark.executor.memory", "2g").master("local[*]").getOrCreate()

#### Step 1: Load the dataset into a PySpark DataFrame

In [3]:
folder_path = "C:/Users/Navas/PycharmProjects/Data_Engineer_Assessment/"
input_path = folder_path + "Data/database.csv"
output_path = folder_path + "Output"

df = spark.read.csv(input_path, header=True, inferSchema=True)
df= df.select("Date","Time","Latitude","Longitude","Type","Depth","Magnitude")

In [4]:
print("Total count : ",df.count())
df.show(5)
df.printSchema()
for col_name in df.columns:
    null_count = df.filter(col(col_name).isNull()).count()
    print(f"Column '{col_name}' has {null_count} NULL values.")

Total count :  23412
+----------+-------------------+--------+---------+----------+-----+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Magnitude|
+----------+-------------------+--------+---------+----------+-----+---------+
|01/02/1965|2024-02-06 13:44:18|  19.246|  145.616|Earthquake|131.6|      6.0|
|01/04/1965|2024-02-06 11:29:49|   1.863|  127.352|Earthquake| 80.0|      5.8|
|01/05/1965|2024-02-06 18:05:58| -20.579| -173.972|Earthquake| 20.0|      6.2|
|01/08/1965|2024-02-06 18:49:43| -59.076|  -23.557|Earthquake| 15.0|      5.8|
|01/09/1965|2024-02-06 13:32:50|  11.938|  126.427|Earthquake| 15.0|      5.8|
+----------+-------------------+--------+---------+----------+-----+---------+
only showing top 5 rows

root
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magn

#### Step 2: Convert the Date and Time columns into a timestamp column named Timestamp

In [5]:
df=df.withColumn("only_time",F.date_format(df["time"],"HH:mm:ss"))
df.show(3)

df_timestamp = df.withColumn("Timestamp", to_timestamp(F.concat_ws(" ", col("Date"), col("only_time")), "MM/dd/yyyy HH:mm:ss")).drop("only_time")
df_timestamp.show(3)

+----------+-------------------+--------+---------+----------+-----+---------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Magnitude|only_time|
+----------+-------------------+--------+---------+----------+-----+---------+---------+
|01/02/1965|2024-02-06 13:44:18|  19.246|  145.616|Earthquake|131.6|      6.0| 13:44:18|
|01/04/1965|2024-02-06 11:29:49|   1.863|  127.352|Earthquake| 80.0|      5.8| 11:29:49|
|01/05/1965|2024-02-06 18:05:58| -20.579| -173.972|Earthquake| 20.0|      6.2| 18:05:58|
+----------+-------------------+--------+---------+----------+-----+---------+---------+
only showing top 3 rows

+----------+-------------------+--------+---------+----------+-----+---------+-------------------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Magnitude|          Timestamp|
+----------+-------------------+--------+---------+----------+-----+---------+-------------------+
|01/02/1965|2024-02-06 13:44:18|  19.246|  145.616|Eart

#### Step 3: Filter the dataset to include only earthquakes with a magnitude greater than 5.0

In [6]:
df_timestamp.select("magnitude").describe().show()
df_filtered = df_timestamp.filter(col("Magnitude")> 5.0)
print("Count after filter : ",df_filtered.count())

+-------+-------------------+
|summary|          magnitude|
+-------+-------------------+
|  count|              23412|
|   mean|  5.882530753460003|
| stddev|0.42306563931579794|
|    min|                5.5|
|    max|                9.1|
+-------+-------------------+

Count after filter :  23412


#### Step 4: Calculate the average depth and magnitude of earthquakes for each earthquake type

In [7]:
df_avg = df_filtered.groupBy("Type").agg(avg("Depth").alias("Average_Depth"), avg("Magnitude").alias("Average_Magnitude"))
df_avg.show()

+-----------------+-----------------+-----------------+
|             Type|    Average_Depth|Average_Magnitude|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



#### Step 5: Implement a UDF to categorize the earthquakes into levels

In [8]:
def categorize_magnitude(magnitude):
    if magnitude < 6.0:
        return "Low"
    elif 6.0 <= magnitude < 8.0:
        return "Moderate"
    else:
        return "High"

categorize_magnitude_udf = udf(categorize_magnitude, StringType())
df_categorized = df_filtered.withColumn("MagnitudeLevel", categorize_magnitude_udf(col("Magnitude")))

#### Step 6: Calculate the distance of each earthquake from a reference location

In [72]:
df_distance = df_filtered.withColumn("DistanceFromReference", F.sqrt((col("Latitude") - 0)**2 + (col("Longitude") - 0)**2))
df_distance.show(3)

+----------+-------------------+--------+---------+----------+-----+---------+-------------------+---------------------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Magnitude|          Timestamp|DistanceFromReference|
+----------+-------------------+--------+---------+----------+-----+---------+-------------------+---------------------+
|01/02/1965|2024-02-05 13:44:18|  19.246|  145.616|Earthquake|131.6|      6.0|1965-01-02 13:44:18|    146.8823609968195|
|01/04/1965|2024-02-05 11:29:49|   1.863|  127.352|Earthquake| 80.0|      5.8|1965-01-04 11:29:49|   127.36562594750595|
|01/05/1965|2024-02-05 18:05:58| -20.579| -173.972|Earthquake| 20.0|      6.2|1965-01-05 18:05:58|   175.18490809713035|
+----------+-------------------+--------+---------+----------+-----+---------+-------------------+---------------------+
only showing top 3 rows



#### Step 7: Visualize the geographical distribution of earthquakes

In [80]:
import folium
from folium.plugins import MarkerCluster
# Create a Folium map centered at a specific location (e.g., world center)
earthquake_map = folium.Map(location=[0, 0], zoom_start=2)

# Create a MarkerCluster to group earthquake markers for better visualization
marker_cluster = MarkerCluster().add_to(earthquake_map)

# Add markers for each earthquake to the map
for row in df_distance.collect():
    folium.Marker(
        location=[row["Latitude"], row["Longitude"]],
        popup=f"Magnitude: {row['Magnitude']}, Depth: {row['Depth']}",
    ).add_to(marker_cluster)
earthquake_map

OSError: [Errno 22] Invalid argument

#### Step 8: Save the final CSV


In [17]:
df.coalesce(1).write.mode("overwrite").option("header", "true").format("csv").save(output_path)

#### Stop the Spark session


In [None]:
spark.stop()