### This notebook is the first step toward creating the input for our project


__Main Steps__

* _Libraries_: Run requirements.txt to install the required libraries. It is recommended to use a venv.

* _Load traffic event data_: Traffic events taken from https://smoosavi.org/datasets/lstw - Please run the get_files.sh script to download the required files

* _Load Sunlight data_: Sunlight data was downloaded from https://sunrise-sunset.org/, and it is downloaded/stored in this code

* _Load Weather event data_: Weather events are downloaded dynamically from copernicus(default)/oikolab for specific lat and long. The copernicus data is obtained at 5km grid level, while oikolab can only be obtained at 25km level unless the user has a paid subscription.

* _Construct Feature Vectors for pairs of City-Geohash_: This would be an initial feature csv that only contains time, traffic, and weather information 

#### Imports

In [1]:
from multiprocessing import Pool, cpu_count, Lock
from urllib.request import Request, urlopen
from dotenv import load_dotenv
from bs4 import BeautifulSoup
from datetime import datetime
import xarray as xr
import pandas as pd
import numpy as np
import tempfile
import requests
import cdsapi
import pprint
import pytz
import math
import csv
import ast
import os

#### Defaults

In [2]:
from common import cities, city_keys, years, months

# defaulting to 1st date for both
start_date = pd.to_datetime(f"{years[0]}-{months[0]}-01 00:00:00")
end_date = pd.to_datetime(f"{years[-1]}-{months[-1]}-01 00:00:00")
if start_date == end_date:
    end_date = pd.to_datetime(f"{years[-1]}-{months[-1]}-02 00:00:00")
    month_ranges = [f"{years[0]}-{months[0]}-01/{years[-1]}-{months[-1]}-02"]
else:
    month_ranges = [ # costly requests can take longer, might as well get one month at a time...
        (f'{d.strftime('%Y-%m-01')}/{(d + pd.offsets.MonthBegin(1)).strftime('%Y-%m-01')}')
        for d in pd.date_range(start=start_date, end=end_date, freq='MS') if d < end_date
    ]

class W_MODEL_API:
    OIKO = "OIKO"
    ECMFW = "ECMFW"

In [None]:
month_ranges

In [3]:
dirs = ['../data', '../data/traffic', '../data/weather', '../data/input', '../data/sample_daylight']
for dir in dirs:
    if not os.path.exists(dir):
        os.makedirs(dir)

load_dotenv(dotenv_path=os.path.abspath('../prod.env'))
api_key = os.environ['OIKOLAB_API']
url = 'https://api.oikolab.com/weather' # this is paid, use at your own discretion
weather_model = W_MODEL_API.OIKO

max_cores = len(city_keys) if len(city_keys) < cpu_count() else cpu_count()

In [None]:
with open(os.path.join(os.environ['HOME'],'.cdsapirc'), 'w') as file:
    file.write(f'url: https://cds.climate.copernicus.eu/api\n')
    file.write(f'key: {os.environ['CDS_API']}\n')


weather_model = W_MODEL_API.ECMFW

cdsClient = cdsapi.Client()

# ecmwf_paramters = '141.128/164.128/165.128/166.128/167.128/228089.128/228090.128/228219.128'
# [ # https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#heading-Parameterlistings
#     "snow_depth", #141 sd
#     "total_cloud_cover", #164 tcc
#     "10m_u_component_of_wind", #165 u10
#     "10m_v_component_of_wind", #166 v10
#     "2m_temperature", # 167 t2m
#     "total_column_rain_water", #228089 tcrw
#     "total_column_snow_water", #228089 tcsw
#     "large_scale_rain_rate", #228219 lsrr
#     "large_scale_snowfall_rate_water_equivalent" #228221 lssfr
# ]

ecmwf_variables = [
    [
        '2m_temperature',
        '10m_u_component_of_wind',
        '10m_v_component_of_wind',
        'instantaneous_10m_wind_gust',
        'snow_depth', 
        'total_cloud_cover'
    ],
    [
        'total_precipitation'
    ],
    # [ # ignoring snowfall in favor of snow depth and precip?
    #     'snowfall'
    # ],
]

#### Sunrise Sunset Data

In [None]:
for (city, city_vals) in cities.items():
    file = open(f"../data/sample_daylight/{city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv", "w")
    wr_csv = csv.writer(file)
    is_header = False
    s_month = 1
    for year in range(years[0], years[-1]+1):
        s_end = months[-1] if (len(years) > 1 and years[-1] == year) or (len(years)==1) else 12
        s_start = months[0] if (s_month==1) else 1
        s_month = 0
        for month in range(s_start, s_end+1, 1):
            req = Request(
                url = f'https://sunrise-sunset.org/us/{city_vals['sunrise']}/{year}/{month}', 
                headers={'User-Agent': 'Mozilla/5.0'} # weirdly enough we get auth errors w/o this??
            )
            html = urlopen(req).read()
            soup =  BeautifulSoup(html)
            for span in soup.find_all("span", {'class':'tooltip'}): 
                span.decompose()
            
            table = soup.select_one("table#month")

            if not is_header:
                th_all = table.select("tr.headers th")
                headers = [th.text for th in th_all]
                headers.append('Cities')
                wr_csv.writerow(headers)
                is_header = True
            
            tr_all = table.select("tr.day")
            day = 1
            for tr in tr_all:
                td_all = [td.text for td in tr.select("th,td")]
                td_all[0] = f'{year}-{month}-{day}'
                day += 1
                td_all.append(city)
                wr_csv.writerow(td_all)

    print(file.name)
    file.close()

#### Read the traffic events

In [7]:
t_events = pd.read_csv('../TrafficEvents_Aug16_Dec20_Publish.csv') # This is the latest version of LSTW dataset
# get the data from https://smoosavi.org/datasets/lstw

t_events['StartTime(UTC)'] = pd.to_datetime(t_events['StartTime(UTC)'], utc=True)
t_events['EndTime(UTC)'] = pd.to_datetime(t_events['EndTime(UTC)'], utc=True)
t_events['StartTime'] = t_events['StartTime(UTC)']
t_events['EndTime'] = t_events['EndTime(UTC)']
t_events = t_events[(t_events['StartTime(UTC)'] >= start_date.tz_localize('UTC')) & (t_events['EndTime(UTC)'] < end_date.tz_localize('UTC'))]

t_events.head()

#### Clean the traffic data

In [11]:
def get_subset(city_vals):
    tzone = pytz.timezone(city_vals['timezone'])
    crds = city_vals['coordinates']
    
    subset_all = t_events[(t_events['LocationLat']>crds[0]) & (t_events['LocationLat']<crds[2]) & (t_events['LocationLng']>crds[1]) & 
                    (t_events['LocationLng']<crds[3])]
    subset_all['StartTime'].dt.tz_convert(tzone)
    subset_all['EndTime'].dt.tz_convert(tzone)
    subset_all = subset_all.copy()
    subset_all['Type'] = subset_all['Type'].str.replace('-', '')
    
    subset_accidents = subset_all[(subset_all['Type']=='Accident')]

    return subset_all, subset_accidents

In [12]:
def clean_traffic(city):
    city_vals = cities[city]
    subset_all, subset_accidents = get_subset(city_vals)
    subset_all.to_csv(f'../data/traffic/TD_{city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv', index=False)
    if len(subset_all):
        print(f'For {city} we have {len(subset_all)} incidents, with {len(subset_accidents)} accidents! ratio {len(subset_accidents)*1.0/len(subset_all):.2f}')
    return subset_all

In [13]:
tt_events = {}
with Pool(max_cores) as p:
    for i, event in enumerate(p.map(clean_traffic, city_keys)):
        tt_events[city_keys[i]] = event

In [14]:
for city in city_keys:
    pprint.pprint(tt_events[city].head(2))

In [15]:
del t_events # major abuser of memory removed??

#### Get the weather data

In [16]:
class OIKO:
    def __init__(self, city):
        self.city = city

    def get_weather_data(self, sdate, edate):
        lat = []
        long = []
        city_vals = cities[self.city]

        for (la, lo) in city_vals['weather']:
            lat.append(la)
            long.append(lo)
        
        response = requests.get(url,
            params={'param': ['temperature', '10m_wind_gust', 'relative_humidity', 'wind_speed', 'wind_direction', 'total_cloud_cover', 'total_precipitation', 'snowfall'],
                    'lat': lat,
                    'lon': long,
                    'start': sdate,
                    'end': edate,
                    'format': 'csv'},
                    headers={'api-key': api_key}
            )
        
        output = None
        with tempfile.NamedTemporaryFile(mode='w+', delete=True) as tmp:
            tmp.write(response.text)
            output = pd.read_csv(tmp.name)
            
        
        return output

    def sample_weather(self):
        tzone = pytz.timezone(cities[self.city]['timezone'])
        traffic = pd.read_csv(f'../data/traffic/TD_{self.city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv')
        if (len(traffic) == 0):
            return pd.DataFrame()
        
        # since weather is obtained based on utc, we use utc. shouldn't matter if the local time is different
        traffic['StartTime(UTC)'] = pd.to_datetime(traffic['StartTime(UTC)'])
        traffic = traffic.sort_values('StartTime(UTC)').set_index('StartTime(UTC)')
        f_out = self.get_weather_data(traffic.index.min().date(), traffic.index.max().date())
        f_out.columns = [col.partition(' ')[0] for col in f_out.columns] # ignore the suffix in col names
        f_out['datetime'] = pd.to_datetime(f_out['datetime'], utc=True)
        f_out['datetime(UTC)'] = f_out['datetime']
        f_out['datetime'].dt.tz_convert(tzone)
        f_out[['LocationLat', 'LocationLng']] = f_out['coordinates'].apply(lambda x: pd.Series(ast.literal_eval(x)) if isinstance(x, str) else pd.Series(x) if isinstance(x, (list, tuple)) and len(x) == 2 else pd.Series([None, None]))
        f_out.drop(columns=['coordinates'], inplace=True)
        f_out.to_csv(f'../data/weather/WD_{self.city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv')
        return f_out

In [17]:
class ECMWF:
    def __init__(self, city):
        self.city = city

    def get_weather_data(self):
        coords = cities[self.city]['coordinates']
        datasets = []
        output_nc = []
        output_dirs = []
        index = 0
        for date in month_ranges:
            curr_var_set = []
            for vars in ecmwf_variables:
                output_dirs.append(tempfile.TemporaryDirectory(delete=True))
                output_nc.append(os.path.join(output_dirs[index].name, f'era5_data{index}.nc'))
                cdsClient.retrieve(
                    'reanalysis-era5-single-levels',
                    {
                        'product_type': 'reanalysis',
                        'format': 'netcdf',
                        'variable': vars,
                        'date': date,
                        'time': '00/to/23/by/1',
                        'area': coords,
                        'grid': [0.05, 0.05],
                        'data_format': 'netcdf'
                    },
                    output_nc[index]
                )
                curr_var_set.append(xr.open_dataset(output_nc[index], engine='netcdf4').load())
                index += 1
            
            
            datasets.append(xr.merge(curr_var_set))
            del curr_var_set
        
        df = (xr.merge(datasets)).to_dataframe().reset_index()
        for i in range(len(ecmwf_variables)*len(month_ranges)):
            if i < len(output_nc):
                os.remove(output_nc[i])
            else:
                break

        del datasets
        return df

    def sample_weather(self):
        tzone = pytz.timezone(cities[self.city]['timezone'])
        f_out = self.get_weather_data()
        f_out['time'] = pd.to_datetime(f_out['valid_time'], utc=True)
        f_out['time(UTC)'] = f_out['time']
        f_out['time'].dt.tz_convert(tzone)
        f_out.to_csv(f'../data/weather/WD_{self.city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv', index=False)
        return f_out

In [18]:
def get_weather_api(city):
    if weather_model == W_MODEL_API.OIKO:
        # for unit of measurements please refer https://docs.oikolab.com/parameters/
        return OIKO(city)
    else:
        # rate limited to one request per user, so might as well perform single threaded operation
        # for unit of measurements please refer https://cds.climate.copernicus.eu/datasets
        return ECMWF(city)

In [19]:
def sample_weather(city):
    return get_weather_api(city).sample_weather()

In [20]:
wt_events = {}
with Pool(max_cores) as p:
    for i, event in enumerate(p.map(sample_weather, city_keys)):
        wt_events[city_keys[i]] = event

In [21]:
for city in city_keys:
    print(city, wt_events[city].head(2))

#### Generate final input csv

In [5]:
ttype_dict = {
    'Construction': 0, # let's ignore insig events atm
    'Event': 1,
    'FlowIncident': 2,
    'LaneBlocked': 3,
    'BrokenVehicle': 4,
    'Congestion': 5,
    'Accident': 6
}

class Traffic_Event:
    Severity: int = 0
    StartTime: datetime = None
    EndTime: datetime = None
    LocationLat: float = 0
    LocationLng: float = 0
    Type: int = -1
    # Accident: int = 0
    # BrokenVehicle: int  = 0
    # Congestion: int  = 0
    # Construction: int  = 0
    # Event: int  = 0
    # FlowIncident: int  = 0
    # LaneBlocked: int  = 0

    def __init__(self, trafficRow, weatherRow, daylightRow):
        self.Type = ttype_dict[trafficRow['Type']]
        if (self.Type < 2):
            return
        self.StartTime = trafficRow['StartTime']
        self.EndTime = trafficRow['EndTime']
        self.Sunrise = daylightRow['Twilight Start']
        self.Sunset = daylightRow['Twilight End']
        # self.Severity = trafficRow['Severity'] # we can't predict severity, so it's alright i guess??
        if (weather_model == W_MODEL_API.OIKO):
            self.Snowfall = weatherRow['snowfall']
            self.Precipitation = weatherRow['total_precipitation']
            self.CloudCover = weatherRow['total_cloud_cover']
            self.WindDirection = weatherRow['wind_direction']
            self.WindSpeed = weatherRow['wind_speed']
            self.RelativeHumidity = weatherRow['relative_humidity']
            self.WindGust = weatherRow['10m_wind_gust']
            self.Temperature = weatherRow['temperature']

            ### I urge you to modify our code to make these parts work. We are only using ECMWF, so you might have to manually fix the grid
            self.LocationLat = trafficRow['LocationLat']
            self.LocationLng = trafficRow['LocationLng']
        else:
            self.LocationLat = weatherRow['latitude'] # closest 5km grid lat
            self.LocationLng = weatherRow['longitude'] # closest 5km grid lng
            self.SnowDept = weatherRow['sd']
            # if 'sf' in weatherRow:
            #     self.SnowFall = weatherRow['sf']
            self.CloudCover = weatherRow['tcc']
            self.UWind = weatherRow['u10']
            self.VWind = weatherRow['v10']
            if 'i10fg' in weatherRow:
                self.Gust = weatherRow['i10fg']
            self.Temperature = weatherRow['t2m']
            if 'tp' in weatherRow:
                self.Precipitation = weatherRow['tp']

In [6]:
def haversine(lat1, lon1, lat2, lon2):
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    lat2 = np.radians(lat2)
    lon2 = np.radians(lon2)

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    return 6371000 * c

In [7]:
file_lock = Lock()

In [8]:
def write_file(path, events: pd.DataFrame):
    with file_lock:
        curr_events = pd.DataFrame()
        if os.path.exists(path) and os.path.getsize(path):
            curr_events = pd.read_csv(path)
        
        if curr_events.empty or not curr_events.columns.equals(events.columns):
            events.to_csv(path, index=False)
        else:    
            pd.concat([events, curr_events], ignore_index=True).to_csv(path, index=False)

In [9]:
tt_events = {}
wt_events = {}
dl_events = {}
for city in city_keys:
    d_curr = pd.read_csv(f"../data/sample_daylight/{city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv")
    d_curr['Twilight Start'] = pd.to_datetime(d_curr['Twilight Start'], format='%I:%M:%S %p').dt.strftime('%H:%M:%S')
    d_curr['Twilight End'] = pd.to_datetime(d_curr['Twilight End'], format='%I:%M:%S %p').dt.strftime('%H:%M:%S')
    d_curr['Day'] = pd.to_datetime(d_curr['Day']).dt.date
    t_curr = pd.read_csv(f'../data/traffic/TD_{city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv')
    t_curr['StartTime(UTC)'] = pd.to_datetime(t_curr['StartTime(UTC)'], utc=True)
    t_curr['StartDay'] = t_curr['StartTime(UTC)'].dt.date
    t_curr['EndTime(UTC)'] = pd.to_datetime(t_curr['EndTime(UTC)'], utc=True)
    t_curr['StartTime(UTC)'] = t_curr['StartTime(UTC)'].apply(lambda x: x.replace(minute=0, second=0, microsecond=0))
    t_curr['EndTime(UTC)'] = t_curr['EndTime(UTC)'].apply(lambda x: x.replace(minute=0, second=0, microsecond=0))
    w_curr = pd.read_csv(f'../data/weather/WD_{city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv')
    w_curr['time(UTC)'] = pd.to_datetime(w_curr['time(UTC)'], utc=True)
    tt_events[city] = t_curr
    wt_events[city] = w_curr
    dl_events[city] = d_curr

In [10]:
def sample_input(args):
    city = args[0]
    start_i = args[1]
    end_i = args[2]
    file = args[3]
    t_curr = tt_events[city]
    w_curr = wt_events[city]
    d_curr = dl_events[city]
    
    events = []
    for t_index in range(start_i, min(len(t_curr), end_i)):
        row = t_curr.iloc[t_index]
        d_match = d_curr[(d_curr['Day'] == row['StartDay'])]
        w_match = w_curr[(w_curr['time(UTC)'] == row['StartTime(UTC)'])].copy()
        w_match['distance'] = haversine(w_match['latitude'], w_match['longitude'], row['LocationLat'], row['LocationLng'])
        if w_match.empty or d_match.empty:
            continue
        
        w_match = w_match[(w_match['distance'].min() == w_match['distance'])]
        te = Traffic_Event(row, w_match.iloc[0], d_match.iloc[0])
        if te.Type < 2:
            continue
        events.append(vars(te))
    
    if len(events) > 0:
        write_file(file, pd.DataFrame(events))

    del events

In [None]:
with Pool(cpu_count()) as p:
    for city in city_keys:
        events = []
        if (len(t_curr) == 0 or len(w_curr) == 0):
            continue

        file = f'../data/input/{city}_{years[0]}{months[0]}01_{years[-1]}{months[-1]}01.csv'
        if os.path.exists(file):
            os.remove(file)
        per_cpu = len(tt_events[city])//cpu_count()
        count = math.ceil(len(tt_events[city])/per_cpu)
        args = [[city, per_cpu*i, per_cpu*(i+1), file] for i in range(count)]
        print(city, count)
        p.map(sample_input, args)

In [29]:
# ## helper terminal??
# # !echo '--- reading traffic events...'
# # !head -n 3 TrafficEvents_Aug16_Dec20_Publish.csv
# # !echo '--- reading weather events...'
# # !head -n 3 WeatherEvents_Aug16_Dec20_Publish.csv
# # !echo '--- traffic event types'
# # !tail -n +2 TrafficEvents_Aug16_Dec20_Publish.csv | awk -F, '{print $2}' | sort | uniq
# # !echo '--- weather event types'
# # !tail -n +2 WeatherEvents_Aug16_Dec20_Publish.csv | awk -F, '{print $2}' | sort | uniq
# # !echo '--- weather event severity'
# # !tail -n +2 WeatherEvents_Aug16_Dec20_Publish.csv | awk -F, '{print $3}' | sort | uniq
# # !tail -n+2 ../data/traffic/TD_Austin*.csv | awk -F, '{print $9,$10}' | sort | uniq | wc -l
# # !tail -n+2 ../data/traffic/TD_Austin*.csv | wc -l


# ## helper account details??
# rr = requests.get('https://api.oikolab.com/account',
#                  headers={'api-key': api_key}
#                  )

# rr.text