This notebook plots a known route and returns a route card.
You can select which API service to get the route and elevations from.

In [0]:
from pyspark.sql.functions import col, expr, first
from pyspark.sql import functions as F, DataFrame, Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType, ArrayType, TimestampType
from datetime import datetime, time, timedelta
import requests 
import time
import builtins
import json
import itertools
from functools import reduce
import numpy as np

In [0]:
# --- Notebook parameters (Databricks widgets) --------------------------------
# dbutils.widgets.get() always returns a string. The numeric settings are
# deliberately kept as strings so that cells which interpolate them into SQL
# (e.g. "INTERVAL <Dwell> MINUTE") keep working; they are only validated here
# so that a typo fails immediately instead of surfacing later as null timings.


def _require_number(widget_name, raw_value):
    """Return raw_value unchanged after checking that it parses as a number.

    Args:
        widget_name: Name of the widget, used only in the error message.
        raw_value: Text read from the widget.

    Returns:
        raw_value, untouched (still a string).

    Raises:
        ValueError: if raw_value cannot be parsed by float().
    """
    try:
        float(raw_value)
    except ValueError:
        raise ValueError(
            f"Widget '{widget_name}' must be numeric, got {raw_value!r}"
        ) from None
    return raw_value


# Selects which Openings_<year>.csv file is loaded.
dbutils.widgets.dropdown("Competition_Year", "2025", ["2025", "2024", "2019", "2018", "2017"])
Competition_Year = dbutils.widgets.get("Competition_Year")

# Routing/elevation provider. The dropdown is registered once; it used to be
# declared twice with identical arguments.
dbutils.widgets.dropdown("API_Service", "Google Maps", ["Bing Maps", "Google Maps", "OpenRouteService"])
API_Service = dbutils.widgets.get("API_Service")
Mapping_ = API_Service  # legacy alias of API_Service, kept for backward compatibility

# Dwell: stop time at each checkpoint (used as minutes in the timing cells).
dbutils.widgets.text("Dwell", "7")
StopTimeAtCheckPoints = _require_number("Dwell", dbutils.widgets.get("Dwell"))

# Speed: divisor applied to the leg distance (presumably km/h -- confirm).
dbutils.widgets.text("Speed", "5.30")
Speed = _require_number("Speed", dbutils.widgets.get("Speed"))

# StartTime is parsed as a time of day; strptime fills in the default date.
dbutils.widgets.text("StartTime", "10:00:00")
StartTime = datetime.strptime(dbutils.widgets.get("StartTime"), '%H:%M:%S')
FinishTime = StartTime + timedelta(hours=7)

# Naismith: the height gain is divided by this value to get extra minutes.
dbutils.widgets.text("Naismith", "10")
AddMinutesPer10mHeight = _require_number("Naismith", dbutils.widgets.get("Naismith"))

In [0]:
# Fetch the API key for every supported mapping provider from the Databricks
# secret scope. The secrets are requested in the same order as the target
# names on the left-hand side.
(
    OpenRouteServiceAPIKey,
    BingMapsAPIKey,
    GoogleMapsAPIKey,
    OrdnanceSurveyMapsAPIKey,
) = (
    dbutils.secrets.get(scope="allpay-dwh01-scope", key=secret_name)
    for secret_name in (
        "OpenRouteServiceAPIKey",
        "BingMapsAPI-Key",
        "GoogleMapsAPIKey",
        "OrdnanceSurveyMapsAPIKey",
    )
)

In [0]:
# Service-principal credentials and tenant used for OAuth access to the
# data lake storage account.
databricksServicePrincipalClientId = dbutils.secrets.get(
    scope="allpay-dwh01-scope", key="Databricks-Client-Id"
)
databricksServicePrincipalClientSecret = dbutils.secrets.get(
    scope="allpay-dwh01-scope", key="Databricks-Client-Secret"
)
azureADTenant = dbutils.secrets.get(
    scope="allpay-dwh01-scope", key="AzureAD-Tenant-GUID"
)

# Token endpoint for the tenant and the root of the data lake container.
endpoint = f"https://login.microsoftonline.com/{azureADTenant}/oauth2/token"
Folder = "abfss://qaha7ai04@allpaydwh01datalake.dfs.core.windows.net"

# Apply the ABFS OAuth client-credentials settings to the Spark session.
for conf_key, conf_value in (
    ("fs.azure.account.auth.type", "OAuth"),
    ("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id", databricksServicePrincipalClientId),
    ("fs.azure.account.oauth2.client.secret", databricksServicePrincipalClientSecret),
    ("fs.azure.account.oauth2.client.endpoint", endpoint),
):
    spark.conf.set(conf_key, conf_value)

In [0]:
# Path of the checkpoint opening-times file for the selected competition year.
opening_file_path = f"{Folder}/Dovetrek/Openings_{Competition_Year}.csv"

# Load the CSV into a PySpark DataFrame: the first row is the header and the
# column types are inferred from the data.
openings_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(opening_file_path)
)


In [0]:
# Expand the "EEE NNN" grid reference held in BNG into full numeric
# coordinates: characters 1-3 become the easting and characters 5-7 the
# northing, each prefixed with a fixed leading digit ("4" / "3") and padded
# with "00" (e.g. "258 779" -> 425800.0, 377900.0).
# NOTE(review): the fixed "4"/"3" prefixes presumably pin the event to one
# 100 km grid square -- confirm before reusing for another area.
openings_df = (
    openings_df
    .withColumn(
        "NGR_Easting",
        F.concat(F.lit("4"), F.substring(F.col("BNG"), 1, 3), F.lit("00")).cast("double"),
    )
    .withColumn(
        "NGR_Northing",
        F.concat(F.lit("3"), F.substring(F.col("BNG"), 5, 3), F.lit("00")).cast("double"),
    )
)
display(openings_df)

CP,BNG,1000,1030,1100,1130,1200,1230,1300,1330,1400,1430,1500,1530,1600,1630,1700,NGR_Easting,NGR_Northing
Start,258 779,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,425800.0,377900.0
CP1,251 801,0,0,1,1,1,0,0,0,0,0,1,1,1,1,1,425100.0,380100.0
CP2,265 801,1,1,1,1,1,1,0,0,0,1,1,0,0,0,0,426500.0,380100.0
CP3,268 789,1,1,0,0,0,0,0,0,0,0,0,0,1,1,1,426800.0,378900.0
CP4,262 747,0,0,0,1,1,0,0,0,1,1,1,1,1,0,0,426200.0,374700.0
CP5,254 775,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,425400.0,377500.0
CP6,251 786,0,1,1,0,0,1,1,1,1,1,0,0,1,0,0,425100.0,378600.0
CP7,247 801,0,0,1,1,1,1,1,1,0,0,0,0,0,1,1,424700.0,380100.0
E1,251 807,0,1,1,1,0,0,1,1,1,0,0,1,1,1,0,425100.0,380700.0
E2R,274 814,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,427400.0,381400.0


In [0]:
# Convert every checkpoint's easting/northing to latitude/longitude with the
# BGS coordinate-conversion web service, then attach the result to openings_df.
BGS_CONVERT_URL = "http://webapps.bgs.ac.uk/data/webservices/CoordConvert_LL_BNG.cfc"
BGS_TIMEOUT_SECONDS = 30  # without a timeout a stalled request hangs the cell

# Bring the (small) checkpoint list to the driver so each one can be looked up.
gridrefs_list = openings_df.select("CP", "NGR_Easting", "NGR_Northing").collect()

# Accumulate plain tuples and build the DataFrame once at the end; unioning
# one single-row DataFrame per checkpoint grows the query plan on every pass.
latlong_rows = []
for row in gridrefs_list:
    resp = requests.get(
        BGS_CONVERT_URL,
        params={
            "method": "BNGtoLatLng",
            "easting": str(row.NGR_Easting),
            "northing": str(row.NGR_Northing),
        },
        timeout=BGS_TIMEOUT_SECONDS,
    )
    # Fail with the HTTP error rather than an opaque JSON parsing error.
    resp.raise_for_status()
    json_resp = json.loads(resp.text)

    if 'LATITUDE' in json_resp and 'LONGITUDE' in json_resp:
        # float() guards against the service returning ints or numeric
        # strings, which the 'float' schema below would reject.
        latlong_rows.append(
            (row["CP"], float(json_resp['LATITUDE']), float(json_resp['LONGITUDE']))
        )
    else:
        # The inner join below drops this checkpoint, so make that visible.
        print(f"No latitude/longitude returned for checkpoint {row['CP']}; it will be excluded")

LatLong_Df = spark.createDataFrame(
    latlong_rows,
    schema='CP string, Latitude float, Longitude float'
)

# Drop coordinates left over from a previous run of this cell so that
# re-running it does not produce duplicate Latitude/Longitude columns
# (dropping a column that does not exist is a no-op).
openings_base_df = openings_df.drop("Latitude", "Longitude")
openings_df = (
    openings_base_df
    .join(LatLong_Df, openings_base_df.CP == LatLong_Df.CP)
    .drop(LatLong_Df.CP)
)

# Show DataFrame
display(openings_df)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Build every ordered (StartCP, FinishCP) pair of distinct checkpoints together
# with the coordinates of both ends. This is more than a single known route
# needs, but the logic is reused from another part of the project and the
# checkpoint list is small, so every combination is sent through the API.

# One projection per leg end, with the columns already renamed, so a single
# cross join yields the final layout without intermediate "CP" join keys.
start_points_df = openings_df.select(
    F.col("CP").alias("StartCP"),
    F.col("Latitude").alias("StartLatitude"),
    F.col("Longitude").alias("StartLongitude"),
)
finish_points_df = openings_df.select(
    F.col("CP").alias("FinishCP"),
    F.col("Latitude").alias("FinishLatitude"),
    F.col("Longitude").alias("FinishLongitude"),
)

CP_Combinations_DF = (
    start_points_df
    .crossJoin(finish_points_df)
    # A leg from a checkpoint to itself is meaningless.
    .filter(F.col("StartCP") != F.col("FinishCP"))
    # Keep the column order the downstream cells were written against.
    .select(
        "StartCP",
        "FinishCP",
        "StartLatitude",
        "StartLongitude",
        "FinishLatitude",
        "FinishLongitude",
    )
)
display(CP_Combinations_DF)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
# Google Maps: walking distance (km) and cumulative height gain (m) for every
# checkpoint pair, using the Routes API and the Elevation API.
if API_Service == "Google Maps":

    def make_request(method, url, max_retries=10, retry_wait_seconds=60, **kwargs):
        """Send an HTTP request, waiting and retrying while the API answers 429.

        Args:
            method: HTTP verb passed to requests.request.
            url: Target URL.
            max_retries: Number of retries after a 429 before giving up.
            retry_wait_seconds: Pause between attempts.
            **kwargs: Forwarded to requests.request; a 30 second timeout is
                applied unless the caller supplies its own.

        Returns:
            The first response whose status code is not 429. Other error
            statuses are returned as-is for the caller to handle.

        Raises:
            RuntimeError: if the API is still rate limiting after max_retries.
        """
        kwargs.setdefault("timeout", 30)
        for attempt in range(max_retries + 1):
            response = requests.request(method, url, **kwargs)
            if response.status_code != 429:
                return response
            if attempt < max_retries:
                print(f"Rate limit exceeded (429). Waiting {retry_wait_seconds} seconds before retrying...")
                time.sleep(retry_wait_seconds)
        raise RuntimeError(f"Still rate limited (429) after {max_retries} retries: {url}")

    route_url = "https://routes.googleapis.com/directions/v2:computeRoutes"
    elevations_url = "https://maps.googleapis.com/maps/api/elevation/json"
    route_headers = {
        "X-Goog-Api-Key": GoogleMapsAPIKey,
        "X-Goog-FieldMask": "routes.duration,routes.distanceMeters,routes.polyline,routes.legs"
    }

    CP_Combinations_List = CP_Combinations_DF.collect()

    # Collect plain tuples and create the DataFrame once at the end instead of
    # unioning one single-row DataFrame per leg.
    distance_rows = []
    for row in CP_Combinations_List:
        StartCP, FinishCP = row["StartCP"], row["FinishCP"]

        # --- Route: walking route between the two checkpoints ---
        route_params = {
            "origin": {
                "location": {
                    "latLng": {
                        "latitude": row.StartLatitude,
                        "longitude": row.StartLongitude
                    }
                }
            },
            "destination": {
                "location": {
                    "latLng": {
                        "latitude": row.FinishLatitude,
                        "longitude": row.FinishLongitude
                    }
                }
            },
            "travelMode": "WALK",
            "languageCode": "en-GB",
            "units": "METRIC"
        }
        route_resp = make_request("POST", route_url, headers=route_headers, json=route_params)
        route_resp.raise_for_status()
        routes = route_resp.json().get("routes")
        if not routes:
            raise ValueError(f"Google Routes API returned no route for {StartCP} -> {FinishCP}")
        best_route = routes[0]

        # distanceMeters is in metres; the rest of the notebook works in km.
        travel_distance = float(best_route["legs"][0]["distanceMeters"]) / 1000

        # --- Elevation: sample 100 points along the route's encoded polyline ---
        height_polyline = best_route["polyline"]["encodedPolyline"]
        elevations_params = {
            "path": "enc:" + height_polyline,
            "samples": 100,
            "key": GoogleMapsAPIKey
        }
        elevations_resp = make_request("GET", elevations_url, params=elevations_params)
        elevations_resp.raise_for_status()
        elevation_json_resp = elevations_resp.json()
        # The Elevation API reports failures in the body's "status" field; an
        # unchecked failure would otherwise look like a flat leg (0 m gain).
        if elevation_json_resp.get("status") != "OK":
            raise ValueError(
                f"Google Elevation API failed for {StartCP} -> {FinishCP}: "
                f"{elevation_json_resp.get('status')}"
            )
        elevations = [point["elevation"] for point in elevation_json_resp["results"]]

        # Height gain counts only the uphill steps between consecutive samples.
        differences = [builtins.max(0, elevations[i+1] - elevations[i]) for i in range(len(elevations)-1)]
        height_gain = float(sum(differences))

        distance_rows.append((StartCP, FinishCP, travel_distance, height_gain))

    Distances_DF = spark.createDataFrame(
        distance_rows,
        schema='StartCP string, FinishCP string, Distance float, Height_Gain float'
    )

    display(Distances_DF)

else:
    print(f"Skipping Google Maps Cell, selected service: {API_Service}")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
# OpenRouteService: hiking distance (km) and cumulative height gain (m) for
# every checkpoint pair, using the directions and elevation/line endpoints.
if API_Service == "OpenRouteService":

    def make_request(method, url, max_retries=10, retry_wait_seconds=60, **kwargs):
        """Send an HTTP request, waiting and retrying while the API answers 429.

        Args:
            method: HTTP verb passed to requests.request.
            url: Target URL.
            max_retries: Number of retries after a 429 before giving up.
            retry_wait_seconds: Pause between attempts.
            **kwargs: Forwarded to requests.request; a 30 second timeout is
                applied unless the caller supplies its own.

        Returns:
            The first response whose status code is not 429. Other error
            statuses are returned as-is for the caller to handle.

        Raises:
            RuntimeError: if the API is still rate limiting after max_retries.
        """
        kwargs.setdefault("timeout", 30)
        for attempt in range(max_retries + 1):
            response = requests.request(method, url, **kwargs)
            if response.status_code != 429:
                return response
            if attempt < max_retries:
                print(f"Rate limit exceeded (429). Waiting {retry_wait_seconds} seconds before retrying...")
                time.sleep(retry_wait_seconds)
        raise RuntimeError(f"Still rate limited (429) after {max_retries} retries: {url}")

    route_url = "https://api.openrouteservice.org/v2/directions/foot-hiking"
    elevations_url = "https://api.openrouteservice.org/elevation/line"
    elevations_headers = {
        "Authorization": OpenRouteServiceAPIKey,
        "Content-Type": "application/json"
    }

    CP_Combinations_List = CP_Combinations_DF.collect()

    # Collect plain tuples and create the DataFrame once at the end instead of
    # unioning one single-row DataFrame per leg.
    distance_rows = []
    for row in CP_Combinations_List:
        StartCP, FinishCP = row["StartCP"], row["FinishCP"]

        # This API takes waypoints as "longitude,latitude".
        wp1 = f"{row.StartLongitude},{row.StartLatitude}"
        wp2 = f"{row.FinishLongitude},{row.FinishLatitude}"

        # --- Route ---
        route_params = {
            "start": wp1,
            "end": wp2,
            "api_key": OpenRouteServiceAPIKey
        }
        route_resp = make_request("GET", route_url, params=route_params)
        route_resp.raise_for_status()
        route_feature = route_resp.json()["features"][0]

        # The summary distance is divided by 1000 to work in km.
        travel_distance = float(route_feature["properties"]["summary"]["distance"]) / 1000

        # --- Elevation: look up a height for every vertex of the route line ---
        height_points = route_feature["geometry"]["coordinates"]
        elevations_params = {
            "format_in": "polyline",
            "format_out": "polyline",
            "dataset": "srtm",
            "geometry": height_points
        }
        elevations_resp = make_request("POST", elevations_url, headers=elevations_headers, json=elevations_params)
        elevations_resp.raise_for_status()
        elevation_coordinates = elevations_resp.json()["geometry"]
        elevations = [elev for _, _, elev in elevation_coordinates]

        # Height gain counts only the uphill steps between consecutive points.
        differences = [builtins.max(0, elevations[i+1] - elevations[i]) for i in range(len(elevations)-1)]
        # float() is required: the sum is an int when the elevations are
        # whole numbers (or the list is empty), and createDataFrame rejects an
        # int for a 'float' field.
        height_gain = float(sum(differences))

        distance_rows.append((StartCP, FinishCP, travel_distance, height_gain))

        time.sleep(2.0)  # Keep a delay between requests to reduce rate limits

    Distances_DF = spark.createDataFrame(
        distance_rows,
        schema='StartCP string, FinishCP string, Distance float, Height_Gain float'
    )

    display(Distances_DF)

else:
    print(f"Skipping OpenRouteService Cell, selected service: {API_Service}")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
%python
# Bing Maps: walking distance (km) and cumulative height gain (m) for every
# checkpoint pair, using the Routes and Elevation REST services.
if API_Service == "Bing Maps":

    # HTTPS is used because the API key travels in the query string.
    route_url = "https://dev.virtualearth.net/REST/v1/Routes/walking"
    elevations_url = "https://dev.virtualearth.net/REST/v1/Elevation/List"
    request_timeout_seconds = 30  # without a timeout a stalled request hangs the cell

    CP_Combinations_List = CP_Combinations_DF.collect()

    # Collect plain tuples and create the DataFrame once at the end instead of
    # unioning one single-row DataFrame per leg.
    distance_rows = []
    for row in CP_Combinations_List:
        StartCP = row["StartCP"]
        FinishCP = row["FinishCP"]

        # This API takes waypoints as "latitude,longitude".
        wp1 = str(row.StartLatitude) + ',' + str(row.StartLongitude)
        wp2 = str(row.FinishLatitude) + ',' + str(row.FinishLongitude)

        # --- Route ---
        route_params = {
            "wayPoint.1": wp1,
            "waypoint.2": wp2,
            "optimize": "distance",
            "avoid": "ferry",
            "routeAttributes": "routePath,excludeItinerary",
            "distanceUnit": "km",
            "key": BingMapsAPIKey
        }
        route_resp = requests.get(route_url, params=route_params, timeout=request_timeout_seconds)
        # Fail with the HTTP error rather than a KeyError on an error payload.
        route_resp.raise_for_status()
        route_resource = json.loads(route_resp.text)['resourceSets'][0]['resources'][0]
        travel_distance = float(route_resource['travelDistance'])

        # --- Elevation: one height per point of the returned route path ---
        height_points = route_resource['routePath']['line']['coordinates']
        flattened_height_points = ",".join([f"{lat},{lon}" for lat, lon in height_points])
        elevations_params = {
            "points": flattened_height_points,
            "heights": "ellipsoid",
            "key": BingMapsAPIKey
        }
        elevations_resp = requests.get(elevations_url, params=elevations_params, timeout=request_timeout_seconds)
        elevations_resp.raise_for_status()
        elevations_json_resp = json.loads(elevations_resp.text)
        elevations = elevations_json_resp['resourceSets'][0]['resources'][0]['elevations']

        # Height gain counts only the uphill steps between consecutive points.
        differences = [builtins.max(0, elevations[i+1] - elevations[i]) for i in range(len(elevations)-1)]
        # int() keeps the value compatible with the 'int' column below even if
        # the service ever returns fractional elevations.
        height_gain = int(sum(differences))

        distance_rows.append((StartCP, FinishCP, travel_distance, height_gain))

    Distances_DF = spark.createDataFrame(
        distance_rows,
        schema='StartCP string, FinishCP string, Distance float, Height_Gain int'
    )

    display(Distances_DF)

else:
    print(f"Skipping BingMaps Cell, selected service: {API_Service}")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
# Estimated walking time per leg, in minutes:
#   (Distance / Speed) * 60  +  Height_Gain / AddMinutesPer10mHeight
# The widget values arrive as text, so convert them explicitly instead of
# relying on Spark to coerce string literals inside the expression; a
# non-numeric or zero value would otherwise produce null times, not an error.
speed_value = float(Speed)
height_divisor = float(AddMinutesPer10mHeight)
if speed_value <= 0 or height_divisor <= 0:
    raise ValueError(
        f"Speed and Naismith must both be positive, got Speed={Speed!r}, "
        f"Naismith={AddMinutesPer10mHeight!r}"
    )

# NOTE(review): despite its name, AddMinutesPer10mHeight is used as a divisor
# of the height gain (one extra minute per that many metres) -- confirm that
# this is the intended reading of the "Naismith" widget.
Distances_DF = Distances_DF.withColumn(
    "TimeInMinutes",
    (col("Distance") / F.lit(speed_value)) * 60
    + (col("Height_Gain") / F.lit(height_divisor)),
)
display(Distances_DF)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
# Ordered list of control points that make up the route to be timed.
route = ["Start","CP3", "E7U", "CP4", "E8", "E6", "CP5", "E5", "E4R", "CP6", "E9U","CP2","E2R","E10U", "E1", "CP1", "CP7", "E3", "Finish"]

# Pair each control point with its successor to get one row per leg;
# RouteOrder is the zero-based position of the leg within the route.
route_legs = [
    (leg_start, leg_finish, leg_index)
    for leg_index, (leg_start, leg_finish) in enumerate(zip(route, route[1:]))
]
route_df = spark.createDataFrame(route_legs, ["StartCP", "FinishCP", "RouteOrder"])

# Look up each leg in Distances_DF. The left join keeps legs that have no
# match; their distance, height and time columns are null.
timing_results_df = route_df.join(
    Distances_DF, on=["StartCP", "FinishCP"], how="left"
).orderBy("RouteOrder")

# Running total of TimeInMinutes from the first leg up to the current one.
window_spec = Window.orderBy("RouteOrder").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
timing_results_df = timing_results_df.withColumn(
    "CumulativeTime", F.sum("TimeInMinutes").over(window_spec)
)

# Show results
display(timing_results_df)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
# Window ordered by RouteOrder (defined here but not used by the columns below).
window_spec = Window.orderBy("RouteOrder")

# Add the dwell-adjusted timings in one chain:
#   CumulativeTimeWithDwell = CumulativeTime + RouteOrder * StopTimeAtCheckPoints
#   LeavingTime             = StartTime + CumulativeTimeWithDwell (whole minutes)
#   ArrivalTime             = LeavingTime - StopTimeAtCheckPoints minutes
# NOTE(review): RouteOrder is 0 for the first leg, so its LeavingTime equals
# StartTime plus the walking time alone and its ArrivalTime falls one dwell
# earlier than that -- confirm the labels are not shifted by one dwell period.
timing_results_df = (
    timing_results_df
    .withColumn(
        "CumulativeTimeWithDwell",
        F.col("CumulativeTime") + (F.col("RouteOrder") * F.lit(StopTimeAtCheckPoints)),
    )
    .withColumn(
        "LeavingTime",
        F.to_timestamp(F.lit(StartTime))
        + F.col("CumulativeTimeWithDwell").cast("int").cast("interval minute"),
    )
    .withColumn(
        "ArrivalTime",
        F.col("LeavingTime") - F.expr(f"INTERVAL {StopTimeAtCheckPoints} MINUTE"),
    )
)

# Show the results
display(timing_results_df)


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data