In [None]:
# imports
from __future__ import annotations

import math
import os
import re
import xml.etree.ElementTree as ET

import requests

from common import Coordinate, Placemark, heversine_and_azimuth

In [None]:
# Azure Maps subscription key sent with every weather request.
# Prefer supplying it through the AZURE_MAPS_SUBSCRIPTION_KEY environment
# variable; the literal is kept only as a fallback so existing runs keep working.
# SECURITY: this key is committed in plain text -- rotate it and remove the fallback.
API_KEY = os.environ.get(
    "AZURE_MAPS_SUBSCRIPTION_KEY",
    "0AaKvkxb1Q452IX0NDW0zmN9TBKu1gquS5zoll4d5Ws",
)
# Endpoint that weather_data_req sends its GET requests to.
API_URL = "https://atlas.microsoft.com/weather/route/json"
# Default travel speed used by get_weather_data to turn distance into ETA minutes.
# NOTE(review): the unit is not stated anywhere in this notebook -- it must be
# "distance units of heversine_and_azimuth per hour"; confirm.
SPEED = 55

In [None]:
def parse_kml_file(filename):
    """Parse the LineString placemarks of a KML file.

    Only ``Placemark`` elements that are direct children of the root's first
    child are considered. For each one, the text of its
    ``LineString/coordinates`` element is split into comma-separated tuples and
    the first three fields of every tuple are passed, in file order, to
    ``Coordinate`` as floats.

    Args:
        filename: Path or file object accepted by ``ET.parse``.

    Returns:
        A list with one ``Placemark(name, coords)`` per placemark that has a
        non-empty ``LineString/coordinates`` element. Placemarks without one
        (for example point markers) are skipped.

    Raises:
        IndexError: If the root has no children or a tuple has fewer than
            three fields.
        ValueError: If a field cannot be converted to ``float``.
        AttributeError: If a LineString placemark has no ``name`` element.
    """
    kml_ns = '{http://www.opengis.net/kml/2.2}'
    root = ET.parse(filename).getroot()
    placemarks = []
    for child in root[0]:
        if child.tag != f'{kml_ns}Placemark':
            continue

        # Skip placemarks that carry no LineString geometry instead of
        # crashing on a None lookup.
        line_string = child.find(f'{kml_ns}LineString')
        if line_string is None:
            continue
        p_coords = line_string.find(f'{kml_ns}coordinates')
        if p_coords is None or not p_coords.text:
            continue

        name = child.find(f'{kml_ns}name').text

        # KML separates tuples with any whitespace (spaces or newlines), not
        # only newlines. Blanks around a comma on the same line are removed
        # first so that "lon, lat, alt" is still read as a single tuple.
        normalized = re.sub(r"[ \t]*,[ \t]*", ",", p_coords.text)
        coords = []
        for entry in normalized.split():
            parts = entry.split(",")
            coords.append(Coordinate(float(parts[0]), float(parts[1]), float(parts[2])))

        placemarks.append(Placemark(name, coords))

    return placemarks

In [None]:
def batch_elems(elems: list, size: int):
    """Split ``elems`` into consecutive batches of at most ``size`` items.

    Order is preserved; every batch except possibly the last holds exactly
    ``size`` items.

    Args:
        elems: Sequence to split (must support ``len`` and slicing).
        size: Maximum number of items per batch; must be at least 1.

    Returns:
        A list of lists. An empty ``elems`` yields an empty list.

    Raises:
        ValueError: If ``size`` is less than 1. Such a size used to produce an
            empty first batch followed by one-item batches.
    """
    if size < 1:
        raise ValueError(f"size must be a positive integer, got {size!r}")

    # list(...) keeps every batch a list even when elems is e.g. a tuple.
    return [list(elems[start:start + size]) for start in range(0, len(elems), size)]

In [None]:
def flat_2d(input: list[list]):
    """Flatten one level of nesting.

    Args:
        input: Iterable whose items are themselves iterable.

    Returns:
        A new list holding the items of every inner iterable, in order.
    """
    return [item for inner in input for item in inner]

In [None]:
def weather_data_req(data: list[tuple[Coordinate, float]], timeout: float = 30.0):
    """Send one weather-along-route request for a batch of waypoints.

    Each entry is serialized as ``lat,lon,eta`` and the entries are joined
    with ``:`` to form the ``query`` parameter of a GET request to ``API_URL``.

    Args:
        data: ``(coordinate, eta)`` pairs; the coordinate must expose ``lat``
            and ``lon``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The ``requests.Response`` object of a reply with status code 200.

    Raises:
        Exception: Carrying the response object, when the status code is not 200.
        requests.exceptions.RequestException: On connection errors or when the
            timeout expires.
    """
    query = ":".join(f"{point[0].lat},{point[0].lon},{point[1]}" for point in data)

    params = {
        "api-version": "1.1",
        "subscription-key": API_KEY,
        "query": query
    }

    res = requests.get(API_URL, params=params, timeout=timeout)
    if res.status_code != 200:
        raise Exception(res)

    return res

In [None]:
def get_weather_data(coords: list[Coordinate], time_offset: int = 0, speed: float = SPEED):
    """Fetch weather waypoints for every coordinate of a route.

    An ETA in minutes is derived for each coordinate from the cumulative
    distance returned by ``heversine_and_azimuth``:
    ``running_distance / speed * 60 + time_offset``, capped at 120. The
    ``(coordinate, eta)`` pairs are sent to ``weather_data_req`` in batches of
    60 and the ``"waypoints"`` lists of all JSON replies are concatenated.

    Args:
        coords: Route coordinates in travel order; each must expose ``lat``
            and ``lon``.
        time_offset: Minutes added to every ETA before the cap is applied.
        speed: Travel speed in distance units per hour.
            NOTE(review): must use the same distance unit as
            ``heversine_and_azimuth`` -- confirm.

    Returns:
        A flat list of waypoint entries, or ``[]`` when ``coords`` is empty.

    Raises:
        Exception: Propagated from ``weather_data_req`` when a request does
            not return status 200.
    """
    if len(coords) == 0:
        return []

    max_eta_minutes = 120  # ETAs above this are clamped
    batch_size = 60        # waypoints sent per request

    coords_with_time: list[tuple[Coordinate, float]] = []
    running_distance = 0

    # The first coordinate is paired with itself, so it contributes whatever
    # the helper reports for a zero-length leg.
    previous = coords[0]
    for current in coords:
        distance, _ = heversine_and_azimuth(
            float(previous.lon), float(previous.lat), float(current.lon), float(current.lat)
        )
        running_distance = running_distance + distance
        eta = min((running_distance / speed * 60) + time_offset, max_eta_minutes)
        coords_with_time.append((current, eta))
        previous = current

    # weather_data_req raises on any non-200 reply, so every response that
    # reaches this point can be decoded directly.
    responses = [
        weather_data_req(batch).json()
        for batch in batch_elems(coords_with_time, batch_size)
    ]

    # return list of waypoints
    return flat_2d(map(lambda r: r["waypoints"], responses))

In [None]:
# data: [(lat, lon, distance, azimuth, elevation, cloud_cover_%, wind_dir, wind_speed)]
def write_csv(name: str, data: list[tuple[float, float, float, float, float, int, int, int]]):
    """Write route rows to ``data/generated/<name>.csv``.

    The file starts with a line holding ``name``, followed by the column
    header and one comma-separated line per row. The output directory is
    created when it does not exist yet.

    Args:
        name: Used both as the file stem and as the first line of the file.
        data: Rows with at least eight fields; the first eight are written in
            the order of the column header.

    Raises:
        IndexError: If a row has fewer than eight fields.
        OSError: If the directory or file cannot be created.
    """
    out_dir = "data/generated"
    # A fresh checkout may not contain the output directory.
    os.makedirs(out_dir, exist_ok=True)
    path = f"{out_dir}/{name}.csv"

    with open(path, 'w') as csvfile:
        csvfile.write(f'{name}\nLat,Lon,Distance,Azimuth,Elevation,Cloud Cover,Wind Direction,Wind Speed\n')
        for row in data:
            csvfile.write(f"{row[0]},{row[1]},{row[2]},{row[3]},{row[4]},{row[5]},{row[6]},{row[7]}\n")

In [None]:
def calc_distance(coords: list[Coordinate], current_coord: Coordinate):
    """Cumulative path length from the start of ``coords`` to ``current_coord``.

    The position of ``current_coord`` is looked up with ``list.index`` (first
    match), and the leg lengths reported by ``heversine_and_azimuth`` are
    added up for every consecutive pair up to that position.

    Args:
        coords: Route coordinates; each must expose ``lat`` and ``lon``.
        current_coord: A coordinate contained in ``coords``.

    Returns:
        The summed leg lengths multiplied by 1000, or ``0`` for the first
        coordinate.

    Raises:
        ValueError: If ``current_coord`` is not in ``coords``.
    """
    last = coords.index(current_coord)

    total = 0
    for start, end in zip(coords[:last], coords[1:last + 1]):
        leg = heversine_and_azimuth(start.lon, start.lat, end.lon, end.lat)[0]
        total += leg

    # Scale to meters (presumably the helper reports kilometers -- confirm).
    return total * 1000

def calc_azimuth(coords: list[Coordinate], current_coord: Coordinate):
    """Bearing from ``current_coord`` towards the coordinate that follows it.

    The position of ``current_coord`` is looked up with ``list.index`` (first
    match). ``lat``/``lon`` are converted from degrees to radians before the
    bearing is computed with ``atan2``.

    Args:
        coords: Route coordinates; each must expose ``lat`` and ``lon``.
        current_coord: A coordinate contained in ``coords``.

    Returns:
        The bearing in degrees, normalized with ``% 360``; ``0`` when
        ``current_coord`` is the last entry and has no successor.

    Raises:
        ValueError: If ``current_coord`` is not in ``coords``.
    """
    position = coords.index(current_coord)

    # The final coordinate has nothing to point at.
    if position >= len(coords) - 1:
        return 0

    following = coords[position + 1]

    lat_from = math.radians(current_coord.lat)
    lon_from = math.radians(current_coord.lon)
    lat_to = math.radians(following.lat)
    lon_to = math.radians(following.lon)

    delta_lon = lon_to - lon_from
    y = math.sin(delta_lon) * math.cos(lat_to)
    x = math.cos(lat_from) * math.sin(lat_to) - math.sin(lat_from) * math.cos(lat_to) * math.cos(delta_lon)

    return math.degrees(math.atan2(y, x)) % 360

In [None]:
def generate_data(path_to_kml: str):
    """Build one CSV per placemark of a KML file from route and weather data.

    For every placemark returned by ``parse_kml_file`` this computes, per
    coordinate, the cumulative distance (helper distance * 1000), the bearing
    to the next coordinate and the elevation scaled by 0.3048. It then fetches
    weather via ``get_weather_data`` and hands the merged rows to ``write_csv``.

    Distance and bearing are derived by position in the route rather than by
    looking each coordinate up with ``list.index``. This keeps the pass linear
    and gives correct values when a route visits an equal coordinate twice.

    Args:
        path_to_kml: Path of the KML file to process.

    Raises:
        AssertionError: If the number of weather entries differs from the
            number of route coordinates.
    """
    print(f"[{path_to_kml}] Generating data...")
    placemarks = parse_kml_file(path_to_kml)

    for placemark in placemarks:
        print(f"[{placemark.name}] Fetching data...")
        coords = placemark.coords

        # (lat, lon, distance, azimuth, elevation)
        route_data: list[tuple[float, float, float, float, float]] = []
        running_distance = 0
        for i, point in enumerate(coords):
            if i > 0:
                prev = coords[i - 1]
                running_distance += heversine_and_azimuth(prev.lon, prev.lat, point.lon, point.lat)[0]

            # A two-item window makes calc_azimuth resolve `point` at index 0;
            # the one-item window at the end yields its "no successor" value.
            azimuth = calc_azimuth(coords[i:i + 2], point)

            # NOTE(review): 0.3048 is the feet-to-meters factor, so this assumes
            # the KML altitude is stored in feet -- confirm against the data.
            route_data.append(
                (point.lat, point.lon, running_distance * 1000, azimuth, point.elevation * 0.3048)
            )

        # (cloud_cover, wind_direction, wind_speed)
        weather_data: list[tuple[int, int, float]] = []
        for w in get_weather_data(coords):
            wind_speed = w["wind"]["speed"]
            # Any unit other than km/h is scaled by 1.60934 (miles to kilometers).
            speed_value = wind_speed["value"] if wind_speed["unit"] == "km/h" else wind_speed["value"] * 1.60934
            weather_data.append((w["cloudCover"], w["wind"]["direction"]["degrees"], speed_value))

        assert len(weather_data) == len(route_data)

        csv_data = [route + weather for route, weather in zip(route_data, weather_data)]

        write_csv(placemark.name, csv_data)
        print(f"[{placemark.name}] Created CSV!")

In [None]:
# Entry point: build the CSV files for the main route.
generate_data(path_to_kml="data/Main Route.kml")