In [13]:
import requests
from requests.exceptions import HTTPError, Timeout, RequestException

def make_nws_request(endpoint, user_agent, timeout=10):
    """Send a GET request to the National Weather Service API and return the decoded JSON.

    Source: https://www.pythonsnacks.com/p/a-guide-on-using-the-national-weather-service-api-with-python

    Args:
        endpoint: Full URL of the NWS API resource to fetch.
        user_agent: Value sent as the ``User-Agent`` request header.
        timeout: Seconds to wait for the server, passed straight to ``requests.get``.
            Without an explicit timeout ``requests`` never times out, so the call
            could hang forever and the ``Timeout`` handler below could never run.

    Returns:
        The parsed JSON body on success, or ``None`` if the request failed or the
        body was not valid JSON (the error is printed instead of raised).
    """
    headers = {"User-Agent": user_agent}

    try:
        response = requests.get(endpoint, headers=headers, timeout=timeout)
        # Raise HTTPError for bad responses (4xx or 5xx)
        response.raise_for_status()
        return response.json()

    except HTTPError as http_err:
        # Take the status from the exception's own response object rather than
        # relying on the local `response` having been bound in the try block.
        status = getattr(http_err.response, "status_code", "unknown")
        print(f"HTTP error occurred: {http_err} - Status code: {status}")
    except Timeout as timeout_err:
        print(f"Request timed out: {timeout_err}")
    except RequestException as req_err:
        print(f"Request error: {req_err}")
    except ValueError as json_err:
        # requests < 2.27 raises a plain ValueError (not a RequestException)
        # when the body cannot be decoded as JSON.
        print(f"Invalid JSON in response: {json_err}")

    return None  # Return None if an error occurred

In [14]:
# Extract alert data from the NWS API and save the raw features to a local JSON file
import json

user_agent = 'jacks2224fs@gmail.com'  # sent as the User-Agent header by make_nws_request
endpoint = 'https://api.weather.gov/alerts?area=MI&limit=500'

alert_data = make_nws_request(endpoint, user_agent)

# make_nws_request prints the error and returns None on failure; stop here with a
# clear message instead of an opaque TypeError on alert_data['features'] below.
if alert_data is None:
    raise RuntimeError(f"NWS request to {endpoint} failed; see the error printed above.")

# Summarize rather than dumping the whole payload into the notebook output.
print(f"Fetched {len(alert_data['features'])} alert features from {endpoint}")

with open('alert_data.json', 'w', encoding='utf-8') as f:
    json.dump(alert_data['features'], f)




In [15]:
# send raw json to bucket
import json
from google.cloud import storage

def upload_to_gcs(bucket_name, destination_blob_name, alert_data, content_type="application/json"):
    """Upload the ``features`` list of an NWS alert payload to Cloud Storage as JSON.

    Args:
        bucket_name: Name of the target GCS bucket.
        destination_blob_name: Object name to write inside the bucket.
        alert_data: Parsed NWS response; only ``alert_data['features']`` is uploaded.
        content_type: MIME type recorded on the blob. ``Blob.upload_from_string``
            defaults to ``text/plain``, which mislabels a JSON object.
    """
    # Initialize a storage client and resolve the target blob
    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(destination_blob_name)

    # Serialize only the feature list, not the surrounding response envelope
    alert_data_str = json.dumps(alert_data['features'])

    blob.upload_from_string(alert_data_str, content_type=content_type)

    print(f"File uploaded to {destination_blob_name} in bucket {bucket_name}.")


# Destination for the raw alert features in Cloud Storage.
bucket_name = "nws-alerts"
destination_blob_name = "daily-alerts-load.json"

upload_to_gcs(
    bucket_name=bucket_name,
    destination_blob_name=destination_blob_name,
    alert_data=alert_data,
)


File uploaded to daily-alerts-load.json in bucket nws-alerts.


In [16]:
# Load the raw alert features into Spark for transformation
from pyspark.sql import SparkSession

# Step 1: Create (or reuse) a SparkSession
spark = SparkSession.builder.appName("JSONtoBigQuery").getOrCreate()

# Step 2: Choose the JSON source.
# NOTE(review): reading the gs:// path presumably needs the GCS connector configured
# for this Spark session - flip READ_FROM_GCS once that is confirmed. Until then the
# local copy saved by the extract cell is read, as before.
READ_FROM_GCS = False
gcs_json_file_path = "gs://nws-alerts/daily-alerts-load.json"
json_file_path = "alert_data.json"
source_path = gcs_json_file_path if READ_FROM_GCS else json_file_path

# multiline=true parses the file as a single JSON document (the saved feature list)
# rather than treating each line as a separate record.
df = spark.read.option("multiline", "true").json(source_path)

# Step 2.1: Check the inferred schema
df.printSchema()


root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- @id: string (nullable = true)
 |    |-- @type: string (nullable = true)
 |    |-- affectedZones: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- areaDesc: string (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- certainty: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- effective: string (nullable = true)
 |    |-- ends: string (nullable = true)
 |    |-- event: string (nullable = true)
 |    |-- expires: string (nullable = true)
 |    |-- geocode: struct (nullable = true)
 |    |    |-- SAME: array (nullab

In [17]:
# Preview the first 10 raw rows (long values are truncated in the display).
df.show(n=10)

+--------+--------------------+--------------------+-------+
|geometry|                  id|          properties|   type|
+--------+--------------------+--------------------+-------+
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
|    NULL|https://api.weath...|{https://api.weat...|Feature|
+--------+--------------------+--------------------+-------+
only showing top 10 rows



In [18]:
from pyspark.sql.functions import explode, col
# Clean df: flatten the nested `properties` struct into top-level alert columns.
# The first four columns are renamed or pulled from deeper paths; the rest keep
# their field name. Order matches the original select.
alert_columns = [
    col('properties.@id').alias('id'),
    col('properties.@type').alias('type'),
    col('properties.areaDesc'),
    col('properties.geocode.SAME').alias('FIPS'),
] + [
    col(f'properties.{field}')
    for field in (
        'affectedZones', 'references', 'sent', 'effective', 'onset',
        'expires', 'ends', 'status', 'messageType', 'category',
        'severity', 'certainty', 'urgency', 'event', 'senderName',
        'headline', 'description', 'instruction', 'response', 'parameters',
    )
]

df2 = df.select(*alert_columns)

df2.show(10)

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+-----------+--------+--------+---------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+--------+--------------------+
|                  id|    type|            areaDesc|                FIPS|       affectedZones|          references|                sent|           effective|               onset|             expires|                ends|status|messageType|category|severity|certainty| urgency|               event|         senderName|            headline|         description|         instruction|response|          parameters|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

In [None]:
# Load transformed data into BQ

In [None]:
# Insert new data into historical alerts table in BQ

In [None]:
# orchestrate steps w airflow