In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, current_timestamp
from pyspark.sql.types import StringType
import urllib.request
import json
import time

# Initialize SparkSession
# getOrCreate() hands back the already-active session when one exists, so
# re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("WeatherAlerts")
    .getOrCreate()
)

# Function to fetch weather alerts using Weather.gov API
def _summarize_alert(alert):
    """Format one alert feature as ``"<headline> (sent: <timestamp>)"``."""
    # The `or` fallbacks also cover keys that are present but JSON null;
    # dict.get's default only applies when the key is absent altogether.
    props = alert.get('properties') or {}
    headline = props.get('headline') or 'No headline'
    sent = props.get('sent') or 'N/A'
    return f"{headline} (sent: {sent})"


def fetch_weather_alerts(lat, lon):
    """Return a one-line summary of the active Weather.gov alerts at a point.

    The function never raises: every failure is reported as a descriptive
    string so that it can be used directly as a Spark UDF without one bad
    row aborting the whole job.

    Args:
        lat: Latitude of the point, or None when the value is missing.
        lon: Longitude of the point, or None when the value is missing.

    Returns:
        A string that is one of:
          * ``"<headline> (sent: <timestamp>)"`` entries joined by ``"; "``,
            one entry per active alert;
          * ``"No alerts"`` when the response lists no alert features;
          * ``"Error: missing coordinates"`` when ``lat`` or ``lon`` is None
            (no request is made);
          * ``"Error: HTTP <status>"`` for a non-200 response that urlopen
            did not raise for;
          * ``"HTTPError: ..."``, ``"URLError: ..."`` or ``"Exception: ..."``
            when the request or the response parsing fails.
    """
    # Spark hands SQL NULLs to Python UDFs as None. Formatting those into the
    # URL would send "point=None,None", which the API can only reject, so
    # skip the network round-trip entirely.
    if lat is None or lon is None:
        return "Error: missing coordinates"

    try:
        url = f"https://api.weather.gov/alerts/active?point={lat},{lon}"
        # The request identifies the calling application via User-Agent.
        headers = {"User-Agent": "MyWeatherApp (johndauphine@hotmail.com)"}

        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=10) as response:
            if response.status != 200:
                return f"Error: HTTP {response.status}"

            data = json.loads(response.read().decode('utf-8'))
            alerts = data.get('features', [])
            if not alerts:
                return "No alerts"
            # Include both headline and sent date in the output
            return "; ".join(_summarize_alert(alert) for alert in alerts)
    # HTTPError is a subclass of URLError, so it has to be caught first.
    except urllib.error.HTTPError as e:
        return f"HTTPError: {e.code} {e.reason}"
    except urllib.error.URLError as e:
        return f"URLError: {e.reason}"
    except Exception as e:
        # Deliberate catch-all (bad JSON, unexpected payload shape, ...):
        # report the problem in the result column instead of failing the task.
        return f"Exception: {str(e)}"

# Register the function as a UDF
# The wrapped function performs a live HTTP request, so its result can differ
# between calls with the same arguments. Spark treats UDFs as deterministic by
# default and may then duplicate or eliminate invocations during optimization;
# marking it non-deterministic opts out of that.
fetch_weather_alerts_udf = udf(fetch_weather_alerts, StringType()).asNondeterministic()

try:
    # Example DataFrame with latitude and longitude.
    # The CSV must have a header row containing "latitude" and "longitude"
    # columns; column types are inferred from the data.
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("city-data.csv")
    )

    # Add a column with weather alerts (headline and sent date)
    df_with_alerts = df.withColumn(
        "alerts",
        fetch_weather_alerts_udf(col("latitude"), col("longitude")),
    )

    # Add a column with the current datetime.
    # current_timestamp() is fixed at the start of query evaluation, so every
    # row gets the same value rather than the time of its own HTTP request.
    df_with_alerts = df_with_alerts.withColumn("queried_at", current_timestamp())

    # Show results (this is the action that actually runs the job)
    df_with_alerts.show(truncate=False)
finally:
    # Stop SparkSession even when reading the CSV or running the job fails,
    # so a failed run does not leave the session behind.
    spark.stop()