# Washington State DOT Traffic API

source: [https://wsdot.wa.gov/traffic/api](https://wsdot.wa.gov/traffic/api)

In [43]:
import pandas as pd
import json
import logging
import os
import requests

from dotenv import load_dotenv
from time import sleep
from sqlalchemy import create_engine, text

## Import env variables

In [6]:
# load the .env file
load_dotenv()

# API variables
api_key = os.getenv("API_KEY")

# MySQL database variables
db_user = os.getenv("AZURE_USERNAME")
db_pwd = os.getenv("AZURE_PWD")
db_host = os.getenv("AZURE_URL")
db_port = os.getenv("AZURE_PORT")
db_database = os.getenv("AZURE_DB")

## URLs to Acces APIs

In [7]:
TRAVEL_TIMES_URL = "http://wsdot.wa.gov/Traffic/api/TravelTimes/TravelTimesREST.svc/GetTravelTimesAsJson?AccessCode={ACCESSCODE}"

TRAFFIC_ALERTS_URL = "http://www.wsdot.wa.gov/Traffic/api/HighwayAlerts/HighwayAlertsREST.svc/GetAlertsAsJson?AccessCode={ACCESSCODE}"

WEATHER_INFORMATION_URL = "http://wsdot.wa.gov/Traffic/api/WeatherInformation/WeatherInformationREST.svc/GetCurrentWeatherInformationAsJson?AccessCode={ACCESSCODE}"

In [16]:
# customer function to get api data
def get_api_data(url, access_key):
    # create the url with the access key
    url_api = url.format(ACCESSCODE=access_key)
    response = requests.get(url_api)

    # check if request was successful
    if response.status_code == 200:
        # print("Data fetched successfully.")
        data = response.json()
        df = pd.DataFrame(data)
    else:
        print(f"Failed to fetch data. Status code: {response.status_code}")

    # return the dataframe and the status code
    return df, response.status_code


# function to convert dictionary columns from api to json
def convert_dict_to_json(dataframe):

    for col in dataframe.columns:
        if dataframe[col].apply(lambda x: isinstance(x, dict)).all():
            dataframe[col] = dataframe[col].apply(lambda x: json.dumps(x))

In [41]:
# def upload_to_history_table(dataframe, table_name, conn):

#     try:
#         dataframe.to_sql(
#             table_name,
#             con=conn,
#             if_exists="append",
#             index=False,
#         )
#         print(f"{table_name} uploaded successfully.")
#     except Exception as e:
#         print(f"Failed to upload {table_name}. Error: {e}")


# def update_raw_table(dataframe, table_name, conn):

#     try:
#         dataframe.to_sql(
#             table_name,
#             con=conn,
#             if_exists="replace",
#             index=False,
#         )
#         print(f"{table_name} uploaded successfully.")
#     except Exception as e:
#         print(f"Failed to upload {table_name}. Error: {e}")


def upload_dataframe(dataframe, table_name, conn, mode="append"):
    try:
        dataframe.to_sql(
            table_name,
            con=conn,
            if_exists=mode,
            index=False,
        )
        logging.info(f"{table_name} ({mode}) uploaded successfully.")
    except Exception as e:
        logging.error(f"Failed to upload {table_name}. Error: {e}")

In [20]:
# function to get api data
def api_update(
    travel_times_url, traffic_alerts_url, weather_information_url, access_key
):

    # Get current time
    timestamp = pd.Timestamp.now()
    df_tt, response_tt = get_api_data(travel_times_url, access_key)
    df_ta, response_ta = get_api_data(traffic_alerts_url, access_key)
    df_wa, response_wa = get_api_data(weather_information_url, access_key)

    if response_tt == 200 and response_ta == 200 and response_wa == 200:
        status = "Success"
    else:
        status = "Failed"

    # add pull_date_time
    df_tt["timestamp"] = timestamp
    df_ta["timestamp"] = timestamp
    df_wa["timestamp"] = timestamp

    # convert dict columns to json
    convert_dict_to_json(df_tt)
    convert_dict_to_json(df_ta)
    convert_dict_to_json(df_wa)

    df_api_fetch = pd.DataFrame(
        [
            {
                "timestamp": timestamp,
                "travel_times_response": response_tt,
                "traffic_alerts_response": response_ta,
                "weather_alerts_response": response_wa,
                "status": status,
            }
        ]
    )

    return df_api_fetch, df_tt, df_ta, df_wa, status

# Extract and Load

In [44]:
def extract_load_transform():
    SLEEP_TIME = 5

    try:
        # fetch data
        df_api_fetch, df_tt, df_ta, df_wa, status = api_update(
            TRAVEL_TIMES_URL, TRAFFIC_ALERTS_URL, WEATHER_INFORMATION_URL, api_key
        )

        # connect to dateabase
        connection_url = (
            f"mysql+pymysql://{db_user}:{db_pwd}" f"@{db_host}:{db_port}/{db_database}"
        )
        engine = create_engine(connection_url)

        with engine.connect() as conn:
            print("*** Connected to database ***")

            # Append history tables
            print("*** Appending to history tables ***")
            history_tables = {
                "api_fetch_hist": df_api_fetch,
                "time_travel_hist": df_tt,
                "traffic_alerts_hist": df_ta,
                "weather_alerts_hist": df_wa,
            }
            for table, df in history_tables.items():
                upload_dataframe(df, table, conn, "append")

            if status != "Success":
                logging.warning(
                    "API update was unsuccessful. Skipping raw table updates."
                )
                return None

            # Update raw tables
            print("*** Updating raw tables ***")
            raw_tables = {
                "api_fetch": df_api_fetch,
                "time_travel_raw": df_tt,
                "traffic_alerts_raw": df_ta,
                "weather_alerts_raw": df_wa,
            }
            for table, df in raw_tables.items():
                upload_dataframe(df, table, conn, "replace")

            # Wait before calling stored procedures
            sleep(SLEEP_TIME)

            # Call stored procedures
            stored_procedures = [
                "CALL TransformTravelTime",
                "CALL TransformTrafficAlerts",
                "CALL TransformWeatherAlerts",
            ]
            results = [conn.execute(text(sp)) for sp in stored_procedures]

            # Print results
            for sp, result in zip(stored_procedures, results):
                print(f"{sp} result: {result}")

    except Exception as e:
        print("Database operation failed: ", e)
        return None

    return None


extract_load_transform()

*** Connected to database ***
*** Appending to history tables ***
*** Updating raw tables ***
Database operation failed:  (pymysql.err.OperationalError) (1054, "Unknown column 'raw.id' in 'field list'")
[SQL: CALL TransformWeatherAlerts]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
