## Import üîó

In [40]:
import pyspark
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, year, weekofyear, rand

## Create a Spark Session instance üìú

In [2]:
spark = SparkSession.builder.appName('quakewatch').getOrCreate()

24/05/11 13:23:44 WARN Utils: Your hostname, Mutasim.local resolves to a loopback address: 127.0.0.1; using 10.48.137.171 instead (on interface en0)
24/05/11 13:23:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/11 13:23:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Some dummy config to remove Spark warnings! (IMO, some of them look like errors and distract me while coding).

%%html
<style>
    div.output_stderr {
    display: none;
}
</style>

## Connect Spark with GCS üçü

In [4]:
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile","./keys/keys.json")

## Read files from GCS üß¨

In [62]:
bucket_name="quakewatch"
path=f"gs://{bucket_name}/significant_earthquakes.csv"

In [8]:
df = spark.read.csv(path, header=True)

                                                                                

In [15]:
# Selecting required columns and renaming them with correct data types
cleanedDF = df.select(
    col("ID").cast("string").alias("id"),
    col("Mag").cast("float").alias("magnitude"),
    col("nst").cast("integer").alias("felt"),
    col("Depth").cast("float").alias("depth"),
    col("Latitude").cast("float").alias("latitude"),
    col("Longitude").cast("float").alias("longitude"),
    col("gap").cast("float").alias("distanceKM"),
    col("locationSource").cast("string").alias("location"),
    col("Place").cast("string").alias("city"),  # Assuming Place represents the city
    col("Place").cast("string").alias("country"),  # Assuming Place represents the country
    col("MagSource").cast("string").alias("continent"),
    year(col("Time")).cast("integer").alias("year"),
    weekofyear(col("Time")).cast("integer").alias("week"),
    col("Time").cast("timestamp").alias("date")
)

In [23]:
cleanedDF.count()

                                                                                

37331

In [29]:
cleanedDF = cleanedDF.withColumn("id", (rand()*100000).cast("integer"))

In [35]:
cleanedDF.head(1)

                                                                                

[Row(id=11657, magnitude=6.099999904632568, felt=119, depth=38.6150016784668, latitude=-6.598599910736084, longitude=132.0762939453125, distanceKM=51.0, location='us', city='130 km SW of Tual, Indonesia', country='130 km SW of Tual, Indonesia', continent='us', year=2023, week=7, date=datetime.datetime(2023, 2, 17, 15, 37, 34, 868000))]

# Filter earthquakes with magnitude greater than 7.0 ü™Ñ

In [36]:
filteredDF = cleanedDF.filter(cleanedDF["magnitude"] > 7.0)
filteredDF.show()

[Stage 18:>                                                         (0 + 1) / 1]

+-----+---------+----+------+--------+---------+----------+--------+--------------------+--------------------+---------+----+----+--------------------+
|   id|magnitude|felt| depth|latitude|longitude|distanceKM|location|                city|             country|continent|year|week|                date|
+-----+---------+----+------+--------+---------+----------+--------+--------------------+--------------------+---------+----+----+--------------------+
|90084|      7.5| 135|  10.0| 38.0235|   37.203|      17.0|      us|4 km SSE of Ekin√∂...|4 km SSE of Ekin√∂...|       us|2023|   6|2023-02-06 16:24:...|
|24261|      7.8| 216|17.943| 37.1662|  37.0421|      17.0|      us|27 km E of Nurda?...|27 km E of Nurda?...|       us|2023|   6|2023-02-06 07:17:...|
| 6990|      7.6| 115|105.14| -7.0652| 130.0047|      21.0|      us|Pulau Pulau Tanim...|Pulau Pulau Tanim...|       us|2023|   2|2023-01-09 23:47:...|
|72905|      7.3| 149|  37.0|-19.2881|-172.1471|      21.0|      us|205 km ESE of Nei.

                                                                                

# Sample 5% of the earthquakes ‚ù§Ô∏è

In [37]:
sampledDF = cleanedDF.sample(fraction=0.05, seed=42)
sampledDF.show()

[Stage 19:>                                                         (0 + 1) / 1]

+-----+---------+----+------+--------+---------+----------+--------+--------------------+--------------------+---------+----+----+--------------------+
|   id|magnitude|felt| depth|latitude|longitude|distanceKM|location|                city|             country|continent|year|week|                date|
+-----+---------+----+------+--------+---------+----------+--------+--------------------+--------------------+---------+----+----+--------------------+
|32982|      6.0|  59|20.094| 38.0302|  37.9636|      63.0|      us|      Central Turkey|      Central Turkey|       us|2023|   6|2023-02-06 16:26:...|
|56476|      5.6| 104|  32.0|-58.7309| -25.4268|      57.0|      us|South Sandwich Is...|South Sandwich Is...|       us|2022|  49|2022-12-11 12:40:...|
|43572|      5.5| 215|  10.0|-26.6668|-114.0671|      57.0|      us|                NULL|                NULL|       us|2022|  49|2022-12-06 00:16:...|
|56726|      5.7| 103|  25.0| -9.8314| 159.7962|      32.0|      us|     Solomon Islands

                                                                                

# Calculate the total felt reports for all earthquakes ‚õìÔ∏è

In [43]:
total_felt_reports = cleanedDF.select(F.sum(cleanedDF["felt"])).collect()[0][0]
print("Total felt reports:", total_felt_reports)

[Stage 27:>                                                         (0 + 2) / 2]

Total felt reports: 1983940




# # Calculate the average depth of earthquakes üß¨

In [54]:
average_depth = cleanedDF.select(F.avg(cleanedDF["depth"])).collect()[0][0]
print("Average depth:", average_depth)



Average depth: 58.58334600287308


                                                                                

# Filter and Group By using Spark SQL ü¶ã

In [58]:
# Register DataFrame as a temporary view
cleanedDF.createOrReplaceTempView("earthquakes")

# Filter earthquakes with magnitude greater than or equal to 6.0
filtered_query = """
    SELECT * 
    FROM earthquakes 
    WHERE magnitude >= 6.0

"""
filtered_sql_df = spark.sql(filtered_query)
filtered_sql_df.show()

[Stage 48:>                                                         (0 + 1) / 1]

+-----+---------+----+-------+--------+---------+----------+--------+--------------------+--------------------+---------+----+----+--------------------+
|   id|magnitude|felt|  depth|latitude|longitude|distanceKM|location|                city|             country|continent|year|week|                date|
+-----+---------+----+-------+--------+---------+----------+--------+--------------------+--------------------+---------+----+----+--------------------+
|11657|      6.1| 119| 38.615| -6.5986| 132.0763|      51.0|      us|130 km SW of Tual...|130 km SW of Tual...|       us|2023|   7|2023-02-17 15:37:...|
|90590|      6.1| 148| 20.088| 12.3238| 123.8662|      47.0|      us|Masbate region, P...|Masbate region, P...|       us|2023|   7|2023-02-16 00:10:...|
| 7084|      6.1|  47|374.033|-29.5218|-177.9727|     178.0|      us|Kermadec Islands,...|Kermadec Islands,...|       us|2023|   7|2023-02-13 15:18:...|
|81104|      6.0| 116|   10.0| 38.0605|   36.537|      29.0|      us|5 km NE of G√

                                                                                

In [60]:
# Group earthquakes by country and calculate the total number of earthquakes in each country
grouped_query = """
    SELECT country, COUNT(*) 
    AS total_earthquakes 
    FROM earthquakes 
    GROUP BY country
"""
grouped_sql_df = spark.sql(grouped_query)
grouped_sql_df.show()



+--------------------+-----------------+
|             country|total_earthquakes|
+--------------------+-----------------+
|11 km E of Dolore...|                1|
|110 km WNW of Kan...|                1|
|Mariana Islands r...|               89|
|23 km NE of K?sh,...|                1|
|230 km SSE of Kat...|                1|
|38 km ESE of Fala...|                1|
|179 km NNW of Ara...|                1|
|28 km SW of Puert...|                1|
|85 km WNW of Sola...|                1|
|66 km WSW of Raba...|                1|
|25 km SE of Lata,...|                2|
|80 km S of P√Ωrgos...|                1|
|8 km NNE of El Co...|                1|
|60 km WSW of Vill...|                1|
|72 km SSE of Fark...|                1|
|9 km NW of Manokw...|                1|
|105 km SSE of Lat...|                1|
|45 km SSE of Sina...|                1|
|177 km E of Tadin...|                1|
|176 km W of Lata,...|                2|
+--------------------+-----------------+
only showing to

                                                                                

## Using a custom function with `yield` to filter earthquakes based on a certain condition üçó

In [61]:
# Define a custom function to filter earthquakes based on magnitude threshold
def filter_earthquakes_by_magnitude_threshold(df, threshold):
    for row in df.collect():
        if row["magnitude"] >= threshold:
            yield row

# Filter earthquakes with magnitude greater than or equal to a threshold
threshold = 5.0
filtered_earthquakes = filter_earthquakes_by_magnitude_threshold(cleanedDF, threshold)

# Print the filtered earthquakes
for earthquake in filtered_earthquakes:
    print("Earthquake with magnitude {} is greater than or equal to {}".format(earthquake["magnitude"], threshold))

                                                                                

Earthquake with magnitude 6.099999904632568 is greater than or equal to 5.0
Earthquake with magnitude 5.599999904632568 is greater than or equal to 5.0
Earthquake with magnitude 6.099999904632568 is greater than or equal to 5.0
Earthquake with magnitude 5.699999809265137 is greater than or equal to 5.0
Earthquake with magnitude 5.599999904632568 is greater than or equal to 5.0
Earthquake with magnitude 6.099999904632568 is greater than or equal to 5.0
Earthquake with magnitude 5.599999904632568 is greater than or equal to 5.0
Earthquake with magnitude 5.900000095367432 is greater than or equal to 5.0
Earthquake with magnitude 5.5 is greater than or equal to 5.0
Earthquake with magnitude 5.5 is greater than or equal to 5.0
Earthquake with magnitude 5.5 is greater than or equal to 5.0
Earthquake with magnitude 5.5 is greater than or equal to 5.0
Earthquake with magnitude 5.5 is greater than or equal to 5.0
Earthquake with magnitude 6.0 is greater than or equal to 5.0
Earthquake with magn

Earthquake with magnitude 5.849999904632568 is greater than or equal to 5.0
Earthquake with magnitude 5.71999979019165 is greater than or equal to 5.0
Earthquake with magnitude 5.920000076293945 is greater than or equal to 5.0
Earthquake with magnitude 6.5 is greater than or equal to 5.0
Earthquake with magnitude 6.269999980926514 is greater than or equal to 5.0
Earthquake with magnitude 5.929999828338623 is greater than or equal to 5.0
Earthquake with magnitude 6.809999942779541 is greater than or equal to 5.0
Earthquake with magnitude 6.269999980926514 is greater than or equal to 5.0
Earthquake with magnitude 5.75 is greater than or equal to 5.0
Earthquake with magnitude 6.369999885559082 is greater than or equal to 5.0
Earthquake with magnitude 6.239999771118164 is greater than or equal to 5.0
Earthquake with magnitude 6.570000171661377 is greater than or equal to 5.0
Earthquake with magnitude 6.389999866485596 is greater than or equal to 5.0
Earthquake with magnitude 6.099999904632

# Store cleaned earthquake data back to Google Cloud Storage (GCS) ü¶Ñ

In [64]:
# Define the output path for storing the transformed data in GCS
output_path = "gs://{}/transformed_significant_earthquake_data".format(bucket_name)

# Write the transformed DataFrame to CSV files in GCS
cleanedDF.write \
    .option("header", "true") \
    .csv(output_path)

print("Transformed earthquake data has been stored in GCS at:", output_path)

                                                                                

Transformed earthquake data has been stored in GCS at: gs://quakewatch/transformed_significant_earthquake_data
