In [None]:
import sys
import json
import requests
import pandas as pd
from datetime import datetime
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from awsglue.utils import getResolvedOptions

# Initialize Glue Context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# PostgreSQL connection details
postgres_connection_options = {
    "url": "jdbc:postgresql://URL",
    "user": "username",
    "password": "pw",
    "dbtable": "fact_transit_weather_traffic"
}

# Helper function to read PostgreSQL tables
def read_postgresql_table(table_name):
    return spark.read \
        .format("jdbc") \
        .option("url", postgres_connection_options["url"]) \
        .option("dbtable", table_name) \
        .option("user", postgres_connection_options["user"]) \
        .option("password", postgres_connection_options["password"]) \
        .load()

# Helper function to write data to PostgreSQL
def write_to_postgresql_table(df, table_name):
    df.write \
        .format("jdbc") \
        .option("url", postgres_connection_options["url"]) \
        .option("dbtable", table_name) \
        .option("user", postgres_connection_options["user"]) \
        .option("password", postgres_connection_options["password"]) \
        .option("truncate", True) \
        .mode("overwrite") \
        .save()

# Fetch Stop Monitoring Data and Convert to DataFrame
def fetch_511_stop_monitoring_to_dataframe():
    try:
        api_key = "apikey"
        operator_id = "AC"
        response = requests.get(
            "http://api.511.org/transit/StopMonitoring",
            params={"api_key": api_key, "agency": operator_id, "format": "json"}
        )
        response.raise_for_status()

        raw_content = response.content.decode('utf-8-sig')
        data = json.loads(raw_content)

        if 'ServiceDelivery' in data and 'StopMonitoringDelivery' in data['ServiceDelivery']:
            monitored_stop_visits = data['ServiceDelivery']['StopMonitoringDelivery']['MonitoredStopVisit']
            flattened_entries = []

            for entry in monitored_stop_visits:
                flattened_entry = {
                    'RecordedAtTime': entry.get('RecordedAtTime'),
                    'LineRef': entry.get('MonitoredVehicleJourney', {}).get('LineRef'),
                    'Longitude': entry.get('MonitoredVehicleJourney', {}).get('VehicleLocation', {}).get('Longitude'),
                    'Latitude': entry.get('MonitoredVehicleJourney', {}).get('VehicleLocation', {}).get('Latitude'),
                    'delay_duration': entry.get('MonitoredVehicleJourney', {}).get('Delay', 0)
                }
                flattened_entries.append(flattened_entry)

            return pd.DataFrame(flattened_entries)
        else:
            print("No vehicle monitoring data found in the response")
            return None

    except requests.exceptions.RequestException as e:
        print(f"Error fetching 511 Stop Monitoring data: {e}")
        return None

# Fetch Weather Data
def fetch_weather_data(lat, lon):
    try:
        api_key = "apikey"
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"lat": lat, "lon": lon, "appid": api_key}
        )
        response.raise_for_status()

        data = json.loads(response.content.decode('utf-8-sig'))
        return {
            "weather_description": data["weather"][0]["description"],
            "visibility": data.get("visibility", 0),
            "temperature": round(data["main"]["temp"] - 273.15, 2)
        }
    except requests.exceptions.RequestException as e:
        print(f"Error fetching weather data for coordinates ({lat}, {lon}): {e}")
        return None

# Define UDF for fetching weather data
def fetch_weather_udf(lat, lon):
    weather = fetch_weather_data(lat, lon)
    if weather:
        return (weather["weather_description"], weather["visibility"], weather["temperature"])
    return (None, None, None)

# Register UDF
weather_udf = udf(fetch_weather_udf, StructType([
    StructField("weather_description", StringType(), True),
    StructField("visibility", IntegerType(), True),
    StructField("temperature", FloatType(), True)
]))

# Load data from PostgreSQL
dim_event_df = read_postgresql_table("dim_event")
dim_location_df = read_postgresql_table("dim_location")
dim_event_df.show(5)
dim_location_df.show(5)

# Fetch and process stop monitoring data
stop_monitoring_pd_df = fetch_511_stop_monitoring_to_dataframe()
stop_monitoring_spark_df = spark.createDataFrame(stop_monitoring_pd_df)
stop_monitoring_spark_df.show(5)

# Join stop monitoring data with location dimension
stop_monitoring_with_location = stop_monitoring_spark_df.alias("sm").join(
    dim_location_df.alias("dl"),
    (col("sm.Latitude") == col("dl.latitude")) & (col("sm.Longitude") == col("dl.longitude")),
    "inner"
).select(
    col("sm.LineRef").alias("event_type"),
    col("dl.location_id"),
    col("sm.RecordedAtTime").alias("datetime"),
    col("sm.delay_duration"),
    col("dl.latitude"),
    col("dl.longitude")
)

# Apply weather UDF
stop_monitoring_with_weather = stop_monitoring_with_location.withColumn(
    "weather", weather_udf(col("latitude"), col("longitude"))
)

# Extract weather details from StructType column
stop_monitoring_with_weather = stop_monitoring_with_weather.select(
    col("event_type"),
    col("location_id"),
    col("datetime"),
    col("delay_duration"),
    col("latitude"),
    col("longitude"),
    col("weather.weather_description"),
    col("weather.visibility"),
    col("weather.temperature")
)
stop_monitoring_with_weather.show(n=5,truncate=False)
# Join with event dimension
fact_event_df = stop_monitoring_with_weather.alias("sm").join(
    dim_event_df.alias("de"),
    col("sm.event_type") == col("de.event_type"),
    "inner"
).select(
    col("de.event_id"),
    col("sm.location_id"),
    col("sm.datetime"),
    col("sm.delay_duration"),
    col("sm.weather_description"),
    col("sm.visibility"),
    col("sm.temperature")
)
fact_event_df.show(5)

# Write to PostgreSQL
write_to_postgresql_table(fact_event_df, "fact_transit_weather_traffic")

print("ETL process completed and fact table successfully written to PostgreSQL.")
