In [5]:
import requests
from requests.structures import CaseInsensitiveDict
import json
import numpy as np
from strava_key_gen import access_token
from typing import Dict, Tuple
import pandas as pd
import polyline
from geopy import distance
import os 
import time
import plotly.express as px
from physics.params import rider_weight,frontal_area,coef_drag,grade,weight_else,coef_rolling_resistance,air_density,coef_drag,drive_losses
from physics.functions import get_power_given_speed, converge_on_speed_given_power


# Strava segment id analysed by this notebook.
# NOTE: the name shadows the builtin id(); run_api_calls and get_rise_run_grade
# read this module-level value directly when building their cache paths.
id = 6677392

# segment retrieval (Strava API + on-disk JSON cache)
def check_if_segment_in_db(id:int) -> bool:
    """Report whether the current working directory has an entry named after ``id``.

    The "db" is simply the working directory: a segment counts as cached when
    an entry whose name equals ``str(id)`` is listed there.
    """
    wanted = str(id)
    return any(entry == wanted for entry in os.listdir())

def return_segment_from_api(id: int, access_token: str) -> None:
    """Download a Strava segment and cache its JSON at ``<id>/strava_json.json``.

    Args:
        id: Strava segment id; also used as the cache directory name.
        access_token: OAuth bearer token sent in the Authorization header.

    Raises:
        requests.HTTPError: if Strava answers with a 4xx/5xx status.

    The response is validated and parsed *before* anything is written, so a
    failed request (expired token, unknown segment, rate limit) no longer
    leaves an error payload or an empty file in the cache directory.
    """
    url = f"https://www.strava.com/api/v3/segments/{id}"
    headers = CaseInsensitiveDict()
    headers["Authorization"] = f"Bearer {access_token}"
    resp = requests.get(url, headers=headers)
    # Without this check an error body would be cached and reused forever.
    resp.raise_for_status()
    payload = resp.json()
    if not check_if_segment_in_db(id):
        os.mkdir(str(id))
    with open(str(id) + '/strava_json.json', 'w') as f:
        json.dump(payload, f)

def get_name_lat_lon_from_segment_from_json(id:int) ->Tuple[str,list]:
    """Load the cached segment JSON and return its name and decoded route.

    Args:
        id: segment id; the file read is ``<id>/strava_json.json``.

    Returns:
        ``(name, points)`` where ``name`` is the JSON's ``'name'`` field and
        ``points`` is ``polyline.decode`` applied to ``['map']['polyline']``.
    """
    # Context manager closes the handle; the original left it open.
    with open(str(id) + '/strava_json.json') as f:
        segment = json.load(f)
    return segment['name'], polyline.decode(segment['map']['polyline'])

def retrieve_segment(id: int, access_token:str) -> Tuple[str, list]:
    """Return ``(name, points)`` for a segment, downloading it on first use.

    Args:
        id: Strava segment id.
        access_token: bearer token, only used when the segment is not cached.

    Returns:
        The tuple produced by ``get_name_lat_lon_from_segment_from_json``
        (the annotation previously claimed a bare ``list``).
    """
    if not check_if_segment_in_db(id):
        return_segment_from_api(id, access_token)
    return get_name_lat_lon_from_segment_from_json(id)
    

def build_api_call_string(lat_lon, max_locations=100):
    """Split points into pipe-separated ``lat,lon`` strings, one per API request.

    Args:
        lat_lon: sequence of points; element 0 is latitude, element 1 longitude.
        max_locations: maximum number of points per string (default 100, the
            batch size the original code hard-coded).

    Returns:
        A list of strings such as ``"54.4,-3.2|54.5,-3.3"``, each holding at
        most ``max_locations`` points. Empty input yields an empty list.

    The previous implementation always indexed ``range(0, 100)`` for the first
    batch and therefore raised IndexError for any non-empty input with fewer
    than 100 points; slicing handles every length.
    """
    batches = []
    for start in range(0, len(lat_lon), max_locations):
        batch = lat_lon[start:start + max_locations]
        batches.append('|'.join(str(point[0]) + ',' + str(point[1]) for point in batch))
    return batches

# elevations and lat lons 
def get_elevations_lat_lon(string: str):
    """Query the OpenTopoData ``eudem25m`` dataset for one batch of locations.

    ``string`` is inserted verbatim as the ``locations`` query value. Each
    entry of the response's ``'results'`` becomes ``[elevation, lat, lng]``.
    """
    endpoint = f"https://api.opentopodata.org/v1/eudem25m?locations={string}"
    results = requests.get(endpoint).json()['results']
    return [
        [entry['elevation'], entry['location']['lat'], entry['location']['lng']]
        for entry in results
    ]

def check_if_elevation_lat_lon_exists(id):
    """Return True when ``<id>/elevation_lat_lon.npy`` is present on disk."""
    cache_file = f"{id!s}/elevation_lat_lon.npy"
    return os.path.exists(cache_file)


def run_api_calls(name:str, lat_lon: list, segment_id=None) -> np.array:
    """Return the ``[elevation, lat, lng]`` array for a segment, cached on disk.

    Args:
        name: segment name, only used in the progress message.
        lat_lon: route points passed to ``build_api_call_string``.
        segment_id: id whose directory holds ``elevation_lat_lon.npy``. When
            omitted, the module-level ``id`` is used, which is what this
            function previously did unconditionally; passing it explicitly
            lets callers work on a segment other than the global one.

    Returns:
        Array with one row per returned location. It is loaded from the cache
        file when present, otherwise fetched batch by batch and saved.
    """
    if segment_id is None:
        segment_id = id  # module-level segment id (backward-compatible default)
    cache_path = str(segment_id)+'/elevation_lat_lon.npy'

    if not check_if_elevation_lat_lon_exists(segment_id):
        print(f"getting data for {name}")
        calls = build_api_call_string(lat_lon)
        result_list = []
        for call in calls:
            time.sleep(1) # sleep for 1 second to prevent api limit
            result_list += get_elevations_lat_lon(call)
        ell_np=np.array([np.array(xi) for xi in result_list])
        np.save(cache_path, ell_np)
        return ell_np

    return np.load(cache_path)



# deltas 
def check_if_rise_run_grade_exists(id):
    """Return True when ``<id>/rise_run_grade.npy`` is present on disk."""
    cache_file = f"{id!s}/rise_run_grade.npy"
    return os.path.exists(cache_file)

def get_distance_delta(point1: list, point2: list) -> float:
    """Distance between two points in metres, as computed by geopy.

    Both points are handed straight to ``distance.distance``; callers in this
    notebook pass ``[lat, lng]`` slices.
    """
    separation = distance.distance(point1, point2)
    # geopy reports kilometres; scale to metres.
    return separation.km * 1000

def get_elevation_delta(point1_elev, point2_elev) -> float:
    """Elevation change going from point 1 to point 2 (positive means uphill)."""
    climb = point2_elev - point1_elev
    return climb

def get_rise_run_grade(seg_data: np.array, segment_id=None):
    """Return per-step ``[rise, run, grade_percent]`` rows, cached on disk.

    Args:
        seg_data: rows of ``[elevation, lat, lng]``; consecutive rows form one
            step.
        segment_id: id whose directory holds ``rise_run_grade.npy``. When
            omitted, the module-level ``id`` is used, which is what this
            function previously did unconditionally.

    Returns:
        Float array of shape ``(len(seg_data) - 1, 3)``, or ``(0, 3)`` when
        there are fewer than two points. It is loaded from the cache file when
        present, otherwise computed and saved.

    Changes from the earlier version: rows are collected in a list and
    converted once (``np.append`` in a loop copies the whole array each time),
    and a step with zero horizontal run gets grade 0.0 instead of the inf/nan
    that ``rise / 0`` produced.
    """
    if segment_id is None:
        segment_id = id  # module-level segment id (backward-compatible default)
    cache_path = str(segment_id)+'/rise_run_grade.npy'

    if check_if_rise_run_grade_exists(segment_id):
        return np.load(cache_path)

    rows = []
    for here, there in zip(seg_data[:-1], seg_data[1:]):
        run = get_distance_delta(here[1:3], there[1:3])
        rise = get_elevation_delta(here[0], there[0])
        # Repeated coordinates give run == 0; the grade is undefined there.
        grade = rise / run * 100 if run else 0.0
        rows.append([rise, run, grade])

    # reshape keeps the (0, 3) shape when there are no steps at all
    rrg_np = np.array(rows, dtype=float).reshape(-1, 3)
    np.save(cache_path, rrg_np)
    return rrg_np


# calculations 
def get_time_to_complete_seconds(distance: float, speed: float):
    """Seconds needed to cover ``distance`` metres at ``speed`` km/h."""
    kilometres = distance / 1000
    seconds = 3600 * kilometres / speed
    return seconds

    
def get_time_in_array(target_power, distance, grade):
    """Seconds to ride one step of ``distance`` metres at ``grade`` and ``target_power``.

    The speed comes from ``converge_on_speed_given_power`` using the rider and
    environment constants imported from ``physics.params``. The ``grade``
    argument is the per-step value and takes precedence over the imported
    constant of the same name inside this function.
    """
    fixed_params = dict(
        rider_weight=rider_weight,
        weight_else=weight_else,
        coef_rolling_resistance=coef_rolling_resistance,
        air_density=air_density,
        frontal_area=frontal_area,
        coef_drag=coef_drag,
        drive_losses=drive_losses,
    )
    speed = converge_on_speed_given_power(
        target_power=target_power, grade=grade, **fixed_params
    )
    return get_time_to_complete_seconds(distance, speed)

def get_time_for_segment(target_power, rrg):
    """Total seconds for the segment: the sum of every step's time.

    Column 1 of ``rrg`` is used as the step distance and column 2 as its grade.
    """
    total_seconds = 0
    for step in range(len(rrg)):
        total_seconds += get_time_in_array(target_power, rrg[step, 1], rrg[step, 2])
    return total_seconds

def get_segment_summary(rrg):
    """Return ``(distance_km, average_grade_percent)`` for a segment.

    Args:
        rrg: 2-D array whose columns are rise, run and per-step grade.

    Returns:
        Total run in kilometres and the average gradient in percent, both
        rounded to two decimals.

    The average gradient is total rise over total run. The previous unweighted
    mean of the per-step grades gave short steps the same weight as long ones,
    which misstates the gradient when points are unevenly spaced, and it was
    nan for an empty array or when any step grade was nan. A segment with zero
    total run reports 0.0.
    """
    total_rise = rrg[:, 0].sum()
    total_run = rrg[:, 1].sum()
    dist = round(total_run / 1000, 2)
    mean_grade = round(total_rise / total_run * 100, 2) if total_run else 0.0
    return dist, mean_grade

def main_runner(id:int, access_token: str):
    """Plot w/kg against completion time (minutes) for one Strava segment.

    Loads the segment, its elevations and per-step grades, then evaluates
    efforts from 3 w/kg upward in 0.1 steps via ``np.arange(3, 8.1, 0.1)`` and
    shows a Plotly scatter titled with the segment name, length and gradient.

    NOTE: ``run_api_calls`` and ``get_rise_run_grade`` are called without an
    id here and fall back to the module-level ``id``, so the ``id`` argument
    should match that global.
    """
    name, lat_lon = retrieve_segment(id, access_token)
    rrg = get_rise_run_grade(run_api_calls(name, lat_lon))

    d, m = get_segment_summary(rrg)

    watts_per_kg = list(np.arange(3, 8.1, 0.1))
    minutes = [
        get_time_for_segment(rider_weight * effort, rrg) / 60
        for effort in watts_per_kg
    ]

    fig = px.scatter(
        y=watts_per_kg,
        x=minutes,
        title=f'{name}, {d} km, {m}% average gradient.',
        labels={'x': 'Time (minutes)', 'y': 'w/kg'},
    )
    fig.show()


In [6]:
# Run the whole pipeline for the configured segment and show the w/kg-vs-time chart.
main_runner(id, access_token)


getting data for OFFICIAL 100Climbs No84 Hardknott Pass


In [13]:
# main_runner keeps lat_lon local, so load the segment again at notebook scope
# for the map cells below.
name, lat_lon = retrieve_segment(id, access_token)

In [7]:
# Split the decoded route points into parallel latitude / longitude lists.
lat = [point[0] for point in lat_lon]
lon = [point[1] for point in lat_lon]

In [8]:
# Draw the route points on a geographic scatter map.
fig = px.scatter_geo(lat = lat, lon = lon)
fig.show()