# Initializers

These cells should be run before attempting to request any data from [weather.gov](http://weather.gov/), as they provide necessary functions and variables for the requesting and formatting code.

### Note:

If you migrate the python code from this notebook to a regular python file (`.py`), replace all occurences of `StopExecution(..)` calls with `quit()` to get the expected behaviour.

In [1]:
class StopExecution(Exception):
    def _render_traceback_(self):
        return []

### Libraries

In [2]:
from dataclasses            import dataclass
from datetime               import datetime

import math
import pytz
import requests
import time

import matplotlib.pyplot    as plt
import numpy                as np

import private              as private_constants

### Constants

If you're not me, you won't have a python file called "private.py" in the same folder. To make this work, replace the values of `private_constants.lattitude` and `private_constants.longitude` with **your** lattitude and longitude. To obtain this information, just go to [weather.gov](http://weather.gov) and get the forecast for your area by providing your zip code. On the forecast page, it'll list your location's lattitude and longitude. Also: comment out the line in the section above (should be **line 12**, or the last line in the cell) that says `import private as private_constants`. Alternatively, you can add a file named `private.py` that contains just two entries, `lattitude` and `longitude` with the values.

The API only supports up to $4$ decimals of precision, so round to the nearest digit. For example:

$$123.456789\quad\rightarrow\quad 123.4568$$

**If you do not respect the API, it will return an invalid response.**

In [3]:
HTML_OK_CODE            = 200
HTML_ERROR_CODE         = 404

LATTITUDE               = private_constants.lattitude
LONGITUDE               = private_constants.longitude

DATETIME_FORMAT         = "%Y-%m-%d %H:%M"
LOCAL_TIME_ZONE         = pytz.timezone(private_constants.timezone)

LAST_TIME_QUERIED       = None #TODO:: Replace with checks for data file to alleviate the dependence on the notebook's memory
FORECAST_VALID_UNTIL    = None

DEGREE_SIGN             = u'\N{DEGREE SIGN}'

### DateTime

In [4]:
def convert_API_dates_to_datetime(startDate: str):
    startDate, _, junk = startDate.partition("+")
    date, time = startDate.split(sep="T")
    hour, minute, *more_junk = time.split(sep=":")

    # Uncomment to see how something like "2024-12-16T18:00:00-08:00" is parsed
    # print(date)
    # print(time)
    # print(time.split(sep=":"))

    #Create the datetime object to handle timezone changing and formatting for us
    dt = datetime.strptime(f'{date} {hour}:{minute}', DATETIME_FORMAT)

    dt = LOCAL_TIME_ZONE.localize(dt)
    #dt = dt.astimezone(LOCAL_TIME_ZONE)
    
    return dt.strftime(DATETIME_FORMAT)

### Wind Chill

In [7]:
def calculate_wind_chill(temperature, wind_speed):
    #Source: https://www.weather.gov/epz/wxcalc_windchill
    if temperature <= 50:
        return round(35.74 + (0.6125 * temperature) - (35.75 * pow(wind_speed, 0.16))+ (0.4275 * temperature * pow(wind_speed, 0.16)))
    else:
        return 0

### JSON Parsing

In [5]:
@dataclass
class ForecastData:
    name: str
    time: datetime

    temperature: int
    temperature_unit: str
    wind_chill: int

    wind_speed: int
    wind_unit: str
    wind_direction: str

    #dew_point: float
    percent_precipitation: int

    forecast_short: str
    forecast_detail: str

In [6]:
def parse_raw_json_to_dataclass(raw_data):
    list_of_forecast_data = { }

    for data in raw_data:
        converted_time = convert_API_dates_to_datetime(data["startTime"])
        #print(data["windSpeed"].split(sep=" "))
        
        wind_data = data["windSpeed"].split(sep=" ")

        wind_speed = int(wind_data[0])
        wind_unit = wind_data[1]

        if len(wind_data) > 2:
            wind_speed = round((int(wind_data[0]) + int(wind_data[2]))/2)
            wind_unit = wind_data[3]
            #print(f'{wind_speed}, {wind_unit}')

        list_of_forecast_data[converted_time] = ForecastData(
            name                    = data["name"],
            time                    = converted_time,

            temperature             = data["temperature"],
            temperature_unit        = data["temperatureUnit"],
            wind_chill              = calculate_wind_chill(temperature=data["temperature"], wind_speed=wind_speed),

            wind_speed              = wind_speed,
            wind_unit               = wind_unit,
            wind_direction          = data["windDirection"],
            
            #dew_point               = data["dewpoint"]["value"],
            percent_precipitation   = data["probabilityOfPrecipitation"]["value"],

            forecast_short          = data["shortForecast"],
            forecast_detail         = data["detailedForecast"]
        )

    return list_of_forecast_data

### Data Request Helpers

In [8]:
def is_good_response(response, error_message):
    if response.status_code == HTML_ERROR_CODE:
        print(f'{error_message}')
        print(f'\nError message: \n  {response.json()["detail"]}')
        return False
    else:
        print(f'\t\tData recieved.')
        return True

def is_reasonable_to_request_new_forecast():
    global LAST_TIME_QUERIED, FORECAST_VALID_UNTIL
    
    if LAST_TIME_QUERIED is None:
        LAST_TIME_QUERIED = datetime.now()

    print(f'Last time queried:            {datetime.strftime(LAST_TIME_QUERIED, DATETIME_FORMAT)}')
    
    if (FORECAST_VALID_UNTIL is None) == True:
        print(f'No current forecast data, request is reasonable.')
        return True

    print(f'Forecast Data is valid until: {datetime.strftime(FORECAST_VALID_UNTIL, DATETIME_FORMAT)}\n')

    time_difference = FORECAST_VALID_UNTIL - LAST_TIME_QUERIED

    if (time_difference.total_seconds() <= 0) == True:
        return True
    else:
        print(f'Please wait until {FORECAST_VALID_UNTIL.strftime(DATETIME_FORMAT)} before trying to get new forecast data.')
        return False

# Data Request

**Note:** Be very careful about running this cell often. It should have built in functionality to prevent too many requests to the API, but if you wipe the notebook's memory, you'll be overriding the protection measures. Try to re-run as infrequently as possible.

In [None]:
response = None

if is_reasonable_to_request_new_forecast() == True:
    points_url = f'https://api.weather.gov/points/{LATTITUDE},{LONGITUDE}'
    print(f'Getting data for {LATTITUDE,LONGITUDE}...\n')

    response = requests.get(url=points_url)

if response is None:
    #You're most likely attempting to retrieve forecast data while the currently loaded data set is still valid
    raise StopExecution

LAST_TIME_QUERIED = datetime.now()

if (response.status_code == HTML_ERROR_CODE) == True:
    print("Something went wrong with requesting data. Please verify the lattitude and longitude are correct before retrying.")
    print(f'\tService Error Message:\n\t\t{response.json()["detail"]}')
    raise StopExecution
    
if (response.status_code == HTML_OK_CODE) == True:
    points_data = response.json()

    forecast_urls = {
        "extended" : points_data["properties"]["forecast"],
        "hourly" : points_data["properties"]["forecastHourly"]
    }

    raw_forecast_data = {
        "extended" : { },
        "hourly" : { }
    }

    for forecast_type, forecast_url in forecast_urls.items():
        #Add delay to prevent requesting too quickly, otherwise Weather.gov will return invalid responses
        time_delay = 5
        print(f'\tWaiting {time_delay} seconds before sending request to avoid exceeding the request limit.')
        time.sleep(time_delay)

        #Request the forecast
        print(f'\t\tRequesting the {forecast_type} forecast...')
        raw_forecast_data[forecast_type] = requests.get(url=forecast_url)

        if is_good_response(raw_forecast_data[forecast_type], f'Verify that the grid coordinates for the specified lattitude and longitude {LATTITUDE,LONGITUDE} were retreived correctly.'):
            print(f'\tRequest "{forecast_type}" completed.\n')
        else:
            #We exit the loop if there is an issue to avoid sending another invalid request
            raise StopExecution
    
    FORECAST_VALID_UNTIL = datetime.strptime(convert_API_dates_to_datetime(raw_forecast_data["hourly"].json()["properties"]["validTimes"]), DATETIME_FORMAT)
    print(f'\nThe following forecast data is considered valid until: {FORECAST_VALID_UNTIL.strftime(DATETIME_FORMAT)}')

# Data Conversion

Now that we have the data, we can format it into something readable.

In [10]:
hourly_data = { }
extended_data = { }

dates_and_times = { }

In [11]:
for type, raw_data in raw_forecast_data.items():
    unparsed_data = raw_data.json()
    if type == "hourly":
        hourly_data = parse_raw_json_to_dataclass(unparsed_data["properties"]["periods"])
    else:
        extended_data = parse_raw_json_to_dataclass(unparsed_data["properties"]["periods"])



for forecast_datetime in hourly_data.keys():
    fdate, ftime = forecast_datetime.split(sep= " ")
    current_date = fdate

    if fdate not in dates_and_times:
        dates_and_times[fdate] = [ ]
    
    if ftime not in dates_and_times[fdate]:
        dates_and_times[fdate].append(ftime)

temperature_unit = hourly_data[list(hourly_data.keys())[0]].temperature_unit

# Hourly Forecast

### As Raw Text

In [12]:
def print_forecast_as_text():
    for dates, times in dates_and_times.items():
        print(f'{dates}\'s Forecast by the Hour:')
        for t in times:
            rebuilt_date_time = f'{dates} {t}'
            print(f'\t{t}: {hourly_data[rebuilt_date_time].forecast_short}')
            print(f'\t\tTemperature:    {hourly_data[rebuilt_date_time].temperature} {hourly_data[rebuilt_date_time].temperature_unit}')
            print(f'\t\tChance of rain: {hourly_data[rebuilt_date_time].percent_precipitation} %')
        print("---\n")

#### Output

In [None]:
print_forecast_as_text()

### Column Text

In [14]:
def print_forecast_as_column_text():
    for dates, times in dates_and_times.items():
        print(f'{dates}\'s Forecast by the Hour:')
        for t in times:
            print(f'{t}:')
        for t in times:
            rebuilt_date_time = f'{dates} {t}'
            print(f'{hourly_data[rebuilt_date_time].forecast_short}')
            print(f'Temperature:    {hourly_data[rebuilt_date_time].temperature} {hourly_data[rebuilt_date_time].temperature_unit}')
            print(f'Chance of rain: {hourly_data[rebuilt_date_time].percent_precipitation} %')
        print("---\n")

#### Output

In [None]:
print_forecast_as_column_text()

### As a Markdown Table

In [16]:
def print_forecast_as_markdown():
    for dates, times in dates_and_times.items():
        print(f'# {dates}\'s Forecast by the Hour:\n')
        print(f'| Time | ({DEGREE_SIGN}{temperature_unit}): Temperature, Wind Chill | Chance of Rain (%) | Forecast Comments |')
        print(f'| ---- | ---------------------------------------------- | ------------------ | ----------------- |')
        for t in times:
            rebuilt_date_time = f'{dates} {t}'
            temperature_and_windchill = f'{hourly_data[rebuilt_date_time].temperature}'
            if (hourly_data[rebuilt_date_time].wind_chill == 0) == False:
                temperature_and_windchill += f', {hourly_data[rebuilt_date_time].wind_chill}'
            forecast_comments = hourly_data[rebuilt_date_time].forecast_short
            if (hourly_data[rebuilt_date_time].forecast_detail == "") == False:
                forecast_comments += f'<br>{hourly_data[rebuilt_date_time].forecast_detail}'
            print(f'| {t} | {temperature_and_windchill} | {hourly_data[rebuilt_date_time].percent_precipitation} | {forecast_comments} |')
        print("\n")

#### Output

In [None]:
print_forecast_as_markdown()

# Graphs

In [18]:
todays_date = next(iter(dates_and_times))
todays_date = list(dates_and_times.keys())[1]

In [None]:
hourly_times = np.array(dates_and_times[todays_date])
hourly_temps = [ ]
hourly_windchill = [ ]

for hour in hourly_times:
    rebuilt_dt_str = f'{todays_date} {hour}'
    hourly_temps.append(hourly_data[rebuilt_dt_str].temperature)
    if (hourly_data[rebuilt_dt_str].wind_chill > 0) == True:
        hourly_windchill.append(hourly_data[rebuilt_dt_str].wind_chill)
    else:
        hourly_windchill.append(float('nan'))

hourly_temperature_vs_windchill = [ ]

for temp, chill in zip(hourly_temps, hourly_windchill):
    if math.isnan(chill) == False:
        hourly_temperature_vs_windchill.append((temp - chill) * 0.5)
    else:
        hourly_temperature_vs_windchill.append(float('nan'))

hourly_temps = np.array(hourly_temps)
hourly_windchill = np.array(hourly_windchill)
hourly_temperature_vs_windchill = np.array(hourly_temperature_vs_windchill)

print(todays_date)
# for i in range(0, len(dates_and_times[todays_date])):
#     if (hourly_windchill[i] > 0) == True:
#         print(f'At {hourly_times[i]} it feels like {hourly_windchill[i]}{DEGREE_SIGN}F, when its really {hourly_temps[i]}{DEGREE_SIGN}F')
#     else:
#         print(f'At {hourly_times[i]} its {hourly_temps[i]}{DEGREE_SIGN}F')

In [20]:
lolims = np.zeros(shape=np.shape(hourly_windchill), dtype=bool)
uplims = np.zeros(shape=np.shape(hourly_windchill), dtype=bool)

for i in range(0, len(dates_and_times[todays_date])):
    if (hourly_windchill[i] > 0) == True:
        uplims[i] = True

windchill_linestyle = 'dotted'

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=1)

ax.errorbar(
    x=hourly_times,
    y=hourly_temps,
    yerr=hourly_temperature_vs_windchill,
    uplims=uplims,
    marker='o',
    markersize=8,
    linestyle=windchill_linestyle
)

ax.plot(
    hourly_times,
    hourly_windchill,
    marker='o',
    markersize=8,
    linestyle=windchill_linestyle
)
ax.set_xlim((hourly_times[0], hourly_times[-1]))

ymin = hourly_temps.min() - 1
if np.nanmin(hourly_windchill) < ymin:
    ymin = np.nanmin(hourly_windchill) - 1
ymax = hourly_temps.max() + 1

ax.set_ylim((ymin, ymax))

ax.set_title('Hourly Temperature Forecast (with Windchill)')
plt.xticks(rotation=45)
plt.savefig('Hourly Forecast.png', dpi=300, format='png', facecolor='w')