Install and import the required libraries:
1. Pyspark
2. Pandas
3. Folium

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=9b2529e917b615b6c0fe29d13c45adfbbe355dc6dfc3a3a7b0e0a1c5aaeb928e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [104]:
!pip install folium



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *

# Start a Spark session for this notebook, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName("assignment")
    .getOrCreate()
)

In [149]:
import folium
from folium.plugins import MarkerCluster
import pandas as pd

1. Load the dataset into a PySpark DataFrame.

In [122]:
# Read the raw CSV; the first line of the file supplies the column names.
DF = spark.read.option("header", True).csv("database.csv")

2. Convert the Date and Time columns into a timestamp column named Timestamp.

In [123]:
# Join the Date and Time text columns with a single space so they can be
# parsed together as one timestamp string.
DF = DF.withColumn(
    "Timestamp",
    concat_ws(" ", "Date", "Time"),
)

In [124]:
# Parse the combined "Date Time" text and render it as "yyyy-MM-dd HH:mm:ss".
# The day position of the output pattern must be "dd" (day of month); "ss"
# there would print the seconds value a second time instead of the day.
# NOTE(review): the parse pattern assumes day-first dates (dd/MM/yyyy). If the
# file actually stores month-first dates, days and months are swapped and rows
# with a "month" above 12 will presumably not parse - confirm against database.csv.
DF = DF.withColumn(
    "Timestamp",
    date_format(
        to_timestamp(col("Timestamp"), "dd/MM/yyyy HH:mm:ss"),
        "yyyy-MM-dd HH:mm:ss",
    ),
)

In [125]:
# The CSV is read without schema inference, so the measurement columns are
# converted to double before any numeric comparison or aggregation.
for numeric_column in ("Magnitude", "Latitude", "Longitude", "Depth"):
    DF = DF.withColumn(numeric_column, col(numeric_column).cast('double'))

3. Filter the dataset to include only earthquakes with a magnitude greater than 5.0.

In [126]:
# Keep only events whose magnitude is strictly greater than 5.0, as the task
# statement requires (an event of exactly 5.0 is excluded). Rows with a null
# magnitude do not satisfy the comparison and are dropped as well.
DF = DF.filter(col("Magnitude") > 5.0)

4. Calculate the average depth and magnitude of earthquakes for each earthquake type.

In [145]:
# One row per event type with the mean depth and mean magnitude of its events.
DF_avg = (
    DF.groupBy("Type")
    .agg(
        avg("Depth").alias("avg_depth"),
        avg("Magnitude").alias("avg_magnitude"),
    )
)
DF_avg.show()

+-----------------+-----------------+-----------------+
|             Type|        avg_depth|    avg_magnitude|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



5. Implement a UDF to categorize the earthquakes into levels (e.g., Low, Moderate, High) based on their magnitudes.

In [128]:
def Category(magnitude):
    """Map an earthquake magnitude to a coarse severity level.

    The cut-offs follow the usual magnitude classes: the scale starts with
    "minor" for magnitudes between 3.0 and 3.9, where earthquakes generally
    begin to be felt, and ends with "great" for magnitudes of 8.0 and above,
    where significant damage is expected.

    Args:
        magnitude: Magnitude as a number, or None for a missing value.

    Returns:
        "Low" for magnitudes below 4.0, "Moderate" for magnitudes from 4.0 up
        to (but not including) 8.0, "High" for 8.0 and above, and None when
        the magnitude is None.
    """
    # Comparing None with a float raises TypeError, so a missing magnitude is
    # passed through as a missing category instead of failing the call.
    if magnitude is None:
        return None
    if magnitude < 4.0:
        return "Low"
    if magnitude < 8.0:
        return "Moderate"
    return "High"

# Wrap the plain Python classifier as a Spark UDF that returns a string.
categoryUDF = udf(Category, StringType())

# Label every row with the level derived from its magnitude.
DF = DF.withColumn(
    "Category",
    categoryUDF(col("Magnitude")),
)

In [101]:
# Preview the first ten rows to check the derived Timestamp and Category columns.
DF.show(n=10)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+---------------+---------+---------------+----------------+---------+-------------------+--------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|             ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|Category|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+---------------+---------+---------------+----------------+---------+-------------------+--------+
|01/02/1965|13:44:1

6. Calculate the distance of each earthquake from a reference location (e.g., (0, 0)).

In [129]:
# Great-circle (haversine) distance, in kilometres, from the reference point.
# Latitude/longitude are angles on a sphere, so the Euclidean norm of the raw
# degree values is not a distance; the haversine formula is used instead.
REFERENCE_LAT, REFERENCE_LON = 0.0, 0.0  # assumed reference point (0, 0)
EARTH_RADIUS_KM = 6371.0088              # mean Earth radius

lat_rad = radians(col("Latitude"))
lon_rad = radians(col("Longitude"))
ref_lat_rad = radians(lit(REFERENCE_LAT))
ref_lon_rad = radians(lit(REFERENCE_LON))

# a = sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2)
haversine_term = (
    sin((lat_rad - ref_lat_rad) / 2) ** 2
    + cos(ref_lat_rad) * cos(lat_rad) * sin((lon_rad - ref_lon_rad) / 2) ** 2
)

# d = 2 * R * asin(sqrt(a)); the square root is capped at 1.0 so floating-point
# rounding near antipodal points cannot push asin outside its domain.
DF = DF.withColumn(
    "Distance_from_Reference",
    lit(2 * EARTH_RADIUS_KM) * asin(least(lit(1.0), sqrt(haversine_term))),
)


7. Visualize the geographical distribution of earthquakes on a world map using appropriate libraries (e.g., Basemap or Folium).

In [146]:
# Bring the latitude values to the driver as a plain Python list.
lat = [row[0] for row in DF.select("Latitude").collect()]


In [131]:
# Bring the longitude values to the driver as a plain Python list.
lon = [row[0] for row in DF.select("Longitude").collect()]

In [140]:
# World map centred on (0, 0). The keyword is `zoom_start`; the misspelled
# `zoom_starts` is not a folium.Map parameter, so the requested zoom level was
# not applied.
# NOTE: the name `map` shadows the Python builtin; it is kept because the
# following cells refer to it.
map = folium.Map(location=[0, 0], tiles='CartoDB dark_matter', zoom_start=1)

# Markers are added to a cluster layer rather than directly to the map.
marker_cluster = MarkerCluster().add_to(map)

In [141]:
# Drop one marker per event into the cluster layer, then display the map.
for latitude, longitude in zip(lat, lon):
    marker = folium.Marker([latitude, longitude])
    marker.add_to(marker_cluster)
map

8. Please include the final CSV in the repository.

In [151]:
# Write the interactive map to a standalone HTML file.
map.save("map.html")

# Materialise the Spark DataFrame on the driver as a pandas DataFrame.
df = DF.toPandas()

In [157]:
# Export the processed data without the pandas row index.
df.to_csv(path_or_buf='final_data.csv', index=False)