# Worldwide Earthquake Events API - Bronze Layer Processing

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

In [None]:
import requests
import json

# Build the USGS event-query URL requesting GeoJSON output.
# `start_date` and `end_date` are not defined in this cell; they must already
# exist when it runs (per the original note, they are provided by Data Factory).
# NOTE(review): a literal "-01" is appended to `start_date`, which suggests it
# arrives as a year-month string - confirm against the pipeline parameters.
url = f"https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime={start_date}-01&endtime={end_date}"

# Fetch the data. An explicit timeout is set because `requests` otherwise
# waits indefinitely on a stalled connection, which would hang the run.
response = requests.get(url, timeout=120)

# Stop here on anything other than HTTP 200. Merely printing and carrying on
# would let the later cells read whatever earthquake_data.json a previous run
# left behind and append that stale data to the bronze table again.
if response.status_code != 200:
    print("Failed to fetch data. Status code:", response.status_code)
    raise RuntimeError(
        f"Failed to fetch data. Status code: {response.status_code}"
    )

# Keep only the list stored under the 'features' key of the response body.
data = response.json()
data = data['features']

# Destination of the raw dump (local mount path of the lakehouse Files area).
file_path = '/lakehouse/default/Files/earthquake_data.json'

# Serialize the feature list as JSON; `indent=4` keeps the file human-readable.
# The encoding is stated explicitly so the result does not depend on the
# platform default.
with open(file_path, 'w', encoding='utf-8') as file:
    json.dump(data, file, indent=4)

print(f"Data successfully saved to {file_path}")

In [None]:
# Explicit schema applied when the JSON file is read into a DataFrame.
# All four top-level fields are declared as plain (nullable) strings.
# NOTE(review): "geometry" and "properties" look like nested objects in the
# GeoJSON payload; declaring them as strings presumably keeps them as raw
# text at this layer - confirm that is intended.
schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("geometry", "id", "properties", "type")
    ]
)

In [None]:
# Load "Files/earthquake_data.json" into a Spark DataFrame named `df`,
# applying the explicit `schema` and enabling the reader's multiline option
# (the file is written as one indented JSON document, not one record per line).
df = (
    spark.read
    .option("multiline", "true")
    .schema(schema)
    .json("Files/earthquake_data.json")
)

In [None]:
# Render the DataFrame in the notebook output for a visual check before it is
# written to the bronze table. `display` is not defined in this file; it is
# presumably the helper provided by the notebook runtime.
display(df)

In [None]:
# Append the rows of `df` to the 'earthquake_events_bronze' table.
# Append mode adds to any existing rows rather than replacing them.
df.write.saveAsTable('earthquake_events_bronze', mode='append')