# Use Sedona and OSRM to calculate route distance and duration

In previous tutorials, we calculated the straight-line (as-the-crow-flies) distance between two points. If we need the real route distance and travel duration, the default Sedona functions can't provide them.

In this tutorial, we will use Sedona and [osrm-backend](https://github.com/Project-OSRM/osrm-backend) to find the best route to a destination, then calculate its distance and duration.

> A guide on how to deploy OSRM: https://github.com/pengfei99/OSRM-deployement

As in the previous tutorials, the dataset is the French commune dataset released by INSEE.

- Step 1: Calculate the centroid of each French commune
- Step 2: Convert each centroid (geometry point) to GPS coordinates (doubles), because OSRM-backend exposes a REST API that takes plain coordinates
- Step 3: Build a matrix of start points and end points
- Step 4: Create a Spark UDF
- Step 5: Use the UDF to calculate the distance and duration


In [1]:
from sedona.spark import *
from sedona.sql import st_functions as stf
from pathlib import Path
import requests
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, split

Skipping SedonaKepler import, verify if keplergl is installed
Skipping SedonaPyDeck import, verify if pydeck is installed


In [2]:
# build a sedona session offline
jar_folder = Path(r"/home/pengfei/git/PySparkCommonFunc/jars")
jar_list = [str(jar) for jar in jar_folder.iterdir() if jar.is_file()]
jar_path = ",".join(jar_list)

# build a sedona session (sedona = 1.5.1)
config = SedonaContext.builder() \
    .master("local[*]") \
    .config("spark.driver.memory","6G") \
    .config('spark.jars', jar_path). \
    getOrCreate()
# config = SedonaContext.builder(). \
#     config('spark.jars.packages',
#            'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
#            'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
#     getOrCreate()

# create a sedona context
sedona = SedonaContext.create(config)

24/05/16 15:31:14 WARN Utils: Your hostname, pengfei-Virtual-Machine resolves to a loopback address: 127.0.1.1; using 10.50.2.80 instead (on interface eth0)
24/05/16 15:31:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/16 15:31:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

## Step 0: Intro of the OSRM API

We get the distance and duration from the OSRM API. Here we only use the simple `route` service.

A test query:

```shell
# below is the general form
# route/v1 is the service name and API version
# driving is the routing profile (mode of transport); it is followed by the GPS coordinates
# of the starting point and the end point, each written as longitude,latitude
# steps can be true or false; if true, the response will contain the route itinerary
curl "http://<host>:<port>/route/v1/driving/<start_longitude>,<start_latitude>;<end_longitude>,<end_latitude>?steps=true"

curl "http://10.50.5.84:5000/route/v1/driving/2.309167,48.819552;2.467290,48.758568?steps=false"

# an example
# start_gps = lat:48.819552,long:2.309167
# end_gps = lat:48.758568,long:2.467290
curl "https://maps-api.casd.local/route/v1/driving/2.309167,48.819552;2.467290,48.758568?steps=false"
```
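
The URL pattern above can also be assembled programmatically. The following is a minimal sketch; the helper name `build_route_url` and its `scheme` parameter are assumptions made for this illustration and are not part of OSRM or of this notebook. The point it shows is that OSRM expects each coordinate as `longitude,latitude`.

```python
def build_route_url(host: str, start: tuple, end: tuple,
                    steps: bool = False, scheme: str = "http") -> str:
    """Build an OSRM route URL. `start` and `end` are (latitude, longitude) pairs."""
    start_lat, start_lon = start
    end_lat, end_lon = end
    # OSRM wants "longitude,latitude", the reverse of the usual lat/long reading order
    coords = f"{start_lon},{start_lat};{end_lon},{end_lat}"
    steps_flag = "true" if steps else "false"
    return f"{scheme}://{host}/route/v1/driving/{coords}?steps={steps_flag}"


url = build_route_url("10.50.5.84:5000", (48.819552, 2.309167), (48.758568, 2.467290))
print(url)
```

Taking `(latitude, longitude)` pairs as input and swapping them inside the helper keeps the order mix-up in one place.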

You should receive the response below.
The `routes` array contains the route itinerary that OSRM returned. In it, you can find:
- "distance": 18677.9: the route distance between the two points is 18677.9 meters
- "duration": 1115.1: the itinerary duration is 1115.1 seconds, or about 18.6 minutes

> The duration is calculated using the maximum authorized speed, without considering traffic jams or other complications. In reality, the itinerary will likely take more time.

```json
{
  "code": "Ok",
  "routes": [
    {
      "geometry": "g`~hHc_bM^eDiRk\\zVinBrIqSdBy_AcIge@vHi\\bAyc@o_@}_B_W_i@{Ik`@tWua@dIaS`U{rAnBemAs@}oAbEan@lGeK`RgGfh@p@nUfFbi@b_@dY~Iz_@fBxj@oDrl@ee@vNgSbN__@lQk{@lLsRpL_I",
      "legs": [
        {
          "steps": [],
          "summary": "",
          "weight": 1115.1,
          "duration": 1115.1,
          "distance": 18677.9
        }
      ],
      "weight_name": "routability",
      "weight": 1115.1,
      "duration": 1115.1,
      "distance": 18677.9
    }
  ],
  "waypoints": [
    {
      "hint": "zrIqgdCyKoEIAAAARwAAANsAAAAAAAAAD1xjQMnb60GQULZCAAAAAAgAAABHAAAA2wAAAAAAAABtIgAAEDwjAMfs6AIvPCMAYO3oAgkAHwbkNR0k",
      "distance": 17.166158355,
      "name": "",
      "location": [2.309136, 48.819399]
    },
    {
      "hint": "V5UEgP___38OAAAAMQAAAJQBAACcAgAAzx6vQSYxWULcY0pElQ6ZRA4AAAAxAAAAlAEAAJwCAABtIgAAja8lAD8B6ALapSUAKP_nAhIAHwrkNR0k",
      "distance": 191.955244684,
      "name": "",
      "location": [2.469773, 48.759103]
    }
  ]
}
```
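
Besides the route itself, each entry in `waypoints` carries a `distance` field: the distance in meters between the coordinate we sent and the point on the road network that OSRM snapped it to. The sketch below, written under the assumption that a large snap distance is worth flagging, pulls the key numbers out of a trimmed copy of the response above. The helper name `summarize_route` and the 100 m threshold are choices made for this example only.

```python
import json

# Trimmed copy of the response shown above (geometry and hints removed)
sample = """
{"code": "Ok",
 "routes": [{"distance": 18677.9, "duration": 1115.1}],
 "waypoints": [
   {"distance": 17.166158355, "location": [2.309136, 48.819399]},
   {"distance": 191.955244684, "location": [2.469773, 48.759103]}]}
"""


def summarize_route(response: dict, snap_limit_m: float = 100.0) -> dict:
    """Return distance (km), duration (min) and the indexes of far-snapped waypoints."""
    route = response["routes"][0]
    far_snaps = [i for i, wp in enumerate(response["waypoints"])
                 if wp["distance"] > snap_limit_m]
    return {
        "distance_km": round(route["distance"] / 1000, 2),
        "duration_min": round(route["duration"] / 60, 2),
        "far_snaps": far_snaps,
    }


summary = summarize_route(json.loads(sample))
print(summary)
```

Here the second waypoint was moved by roughly 192 m, so it is the one reported in `far_snaps`.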

You can call the REST API with `curl` (the `-k` flag skips TLS certificate verification and only matters for `https` URLs):
```shell
curl -k "http://127.0.0.1:5000/route/v1/driving/2.309167,48.819552;2.467290,48.758568?steps=false"
```

In [21]:
! curl -k "https://maps-api.casd.local/route/v1/driving/2.309167,48.819552;2.467290,48.758568?steps=false"


{"code":"Ok","routes":[{"geometry":"g`~hHc_bM^eDiRk\\zVinBrIqSdBy_AcIge@vHi\\bAyc@o_@}_B_W_i@{Ik`@tWua@dIaS`U{rAnBemAs@}oAbEan@lGeK`RgGfh@p@nUfFbi@b_@dY~Iz_@fBxj@oDrl@ee@vNgSbN__@lQk{@lLsRpL_I","legs":[{"steps":[],"summary":"","weight":1115.1,"duration":1115.1,"distance":18677.9}],"weight_name":"routability","weight":1115.1,"duration":1115.1,"distance":18677.9}],"waypoints":[{"hint":"zrIqgdCyKoEIAAAARwAAANsAAAAAAAAAD1xjQMnb60GQULZCAAAAAAgAAABHAAAA2wAAAAAAAABtIgAAEDwjAMfs6AIvPCMAYO3oAgkAHwbkNR0k","distance":17.166158355,"name":"","location":[2.309136,48.819399]},{"hint":"V5UEgP___38OAAAAMQAAAJQBAACcAgAAzx6vQSYxWULcY0pElQ6ZRA4AAAAxAAAAlAEAAJwCAABtIgAAja8lAD8B6ALapSUAKP_nAhIAHwrkNR0k","distance":191.955244684,"name":"","location":[2.469773,48.759103]}]}

## Step 1: Calculate the centroid of each French commune

In [5]:
fr_zone_file_path= "/home/pengfei/data_set/kaggle/geospatial/communes_fr_geoparquet"

In [6]:
fr_zone_df = sedona.read.format("geoparquet").load(fr_zone_file_path)
fr_zone_df.cache()
fr_zone_df.show()


+--------------------+--------------------+--------------------+-----------------+-----+
|            geometry|           wikipedia|             surf_ha|              nom|insee|
+--------------------+--------------------+--------------------+-----------------+-----+
|POLYGON ((9.32016...|fr:Pie-d'Orezza  ...|     573.00000000...|     Pie-d'Orezza|2B222|
|POLYGON ((9.20010...|fr:Lano          ...|     824.00000000...|             Lano|2B137|
|POLYGON ((9.27757...|fr:Cambia        ...|     833.00000000...|           Cambia|2B051|
|POLYGON ((9.25119...|fr:Érone         ...|     393.00000000...|            Érone|2B106|
|POLYGON ((9.28339...|fr:Oletta        ...|    2674.00000000...|           Oletta|2B185|
|POLYGON ((9.30951...|fr:Canari (Haute-...|    1678.00000000...|           Canari|2B058|
|POLYGON ((9.30101...|fr:Olmeta-di-Tuda...|    1753.00000000...|   Olmeta-di-Tuda|2B188|
|POLYGON ((9.32662...|fr:Campana       ...|     236.00000000...|          Campana|2B052|
|POLYGON ((9.33944...

                                                                                

In [7]:
centroid_df = fr_zone_df.withColumn("centroid",stf.ST_Centroid(col("geometry"))).select("nom","insee","centroid").withColumnRenamed("centroid","geometry")
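
For intuition about what a polygon centroid is: for a simple polygon it is commonly taken to be the area-weighted center, which the shoelace formula gives directly. The sketch below is a plain-Python illustration on planar coordinates. It is not Sedona's implementation, and `polygon_centroid` is a hypothetical helper used only here.

```python
def polygon_centroid(ring: list) -> tuple:
    """Area-weighted centroid of a simple polygon given as a list of (x, y) vertices."""
    # close the ring if the first vertex is not repeated at the end
    if ring[0] != ring[-1]:
        ring = ring + [ring[0]]
    area2 = cx = cy = 0.0
    for (x0, y0), (x1, y1) in zip(ring, ring[1:]):
        cross = x0 * y1 - x1 * y0
        area2 += cross              # accumulates twice the signed area
        cx += (x0 + x1) * cross
        cy += (y0 + y1) * cross
    # centroid = sum / (6 * area) = sum / (3 * area2)
    return cx / (3 * area2), cy / (3 * area2)


square_centroid = polygon_centroid([(0, 0), (2, 0), (2, 2), (0, 2)])
print(square_centroid)
```

For a concave commune outline the centroid can fall outside the polygon or far from any road, which is one reason to keep an eye on how far OSRM has to move the point to reach the road network.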

In [8]:
centroid_df.show()

+-----------------+-----+--------------------+
|              nom|insee|            geometry|
+-----------------+-----+--------------------+
|     Pie-d'Orezza|2B222|POINT (9.33815086...|
|             Lano|2B137|POINT (9.23535777...|
|           Cambia|2B051|POINT (9.30210765...|
|            Érone|2B106|POINT (9.26661425...|
|           Oletta|2B185|POINT (9.33384508...|
|           Canari|2B058|POINT (9.34524454...|
|   Olmeta-di-Tuda|2B188|POINT (9.36394979...|
|          Campana|2B052|POINT (9.34042768...|
|Carcheto-Brustico|2B063|POINT (9.36026336...|
|         Ampriani|2B015|POINT (9.35701808...|
|         Pianello|2B213|POINT (9.35641690...|
|            Zuani|2B364|POINT (9.34092227...|
|     Pietraserena|2B226|POINT (9.35471346...|
|     Piedipartino|2B221|POINT (9.34491216...|
|         Montbolo|66113|POINT (2.63221051...|
|       Targasonne|66202|POINT (1.98851907...|
|         L'Albère|66001|POINT (2.89587079...|
|       Mont-Louis|66117|POINT (2.11967214...|
|          Es

## Step 2: Convert the centroid to GPS coordinates

OSRM-backend exposes a REST API that takes plain longitude and latitude values, so we convert each centroid (geometry point) to GPS coordinates (doubles).

In [11]:
converted_centroid_df = centroid_df.withColumn("longitude",stf.ST_X(col("geometry"))).withColumn("latitude",stf.ST_Y(col("geometry"))).drop("geometry")

In [16]:
converted_centroid_df.cache()
converted_centroid_df.show()


+-----------------+-----+------------------+------------------+
|              nom|insee|         longitude|          latitude|
+-----------------+-----+------------------+------------------+
|     Pie-d'Orezza|2B222| 9.338150861836196|42.374292014354154|
|             Lano|2B137| 9.235357777014519| 42.37887024991088|
|           Cambia|2B051| 9.302107656444328| 42.36875223806091|
|            Érone|2B106|  9.26661425039706|42.375563316535825|
|           Oletta|2B185|  9.33384508224219|42.641774511917404|
|           Canari|2B058| 9.345244547654016|42.843017113153394|
|   Olmeta-di-Tuda|2B188| 9.363949798662757| 42.61232393952698|
|          Campana|2B052| 9.340427687694566| 42.38826970859529|
|Carcheto-Brustico|2B063| 9.360263365997817| 42.35520610104405|
|         Ampriani|2B015| 9.357018084967732|  42.2540399256354|
|         Pianello|2B213| 9.356416901011539| 42.29772067884147|
|            Zuani|2B364| 9.340922275473961| 42.26546661866408|
|     Pietraserena|2B226| 9.354713461428

                                                                                

In [13]:
converted_centroid_df.printSchema()

root
 |-- nom: string (nullable = true)
 |-- insee: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)


In [13]:
converted_centroid_path = "/home/pengfei/data_set/kaggle/geospatial/converted_centroid_of_french_commune"
converted_centroid_df=sedona.read.parquet(converted_centroid_path)

In [14]:
new_df = converted_centroid_df.repartition(4)

In [15]:
print(converted_centroid_df.rdd.getNumPartitions())

3


In [16]:
print(new_df.rdd.getNumPartitions())

4


## Step 3: Build the matrix  

In [22]:
# build a commune code list, which will be used as the starting points of the matrix
insee_code_list = ["75056","92049"]
commune_df = converted_centroid_df.filter(col("insee").isin(insee_code_list))
commune_df.show()

+---------+-----+------------------+-----------------+
|      nom|insee|         longitude|         latitude|
+---------+-----+------------------+-----------------+
|    Paris|75056|2.3428764301940275|48.85662219553845|
|Montrouge|92049|2.3171758940549156|48.81520615999795|
+---------+-----+------------------+-----------------+


                                                                                

In [23]:
full_code_list = [row.insee for row in converted_centroid_df.select(col("insee")).collect()]

In [24]:
print(f"code list length: {len(full_code_list)}")
print(f"first element of the code list: {full_code_list[0]}")

code list length: 34955
first element of the code list: 2B222


In [25]:
commune_matrix_df = (commune_df.alias("add1")
                  .join(converted_centroid_df.alias("add2"),col("add1.insee")!=col("add2.insee"),"inner")
                  .select(col("add1.longitude").alias("source_long"),col("add1.latitude").alias("source_lat"),col("add1.insee").alias("source_insee"),col("add1.nom").alias("source_nom"),col("add2.longitude").alias("dest_long"),col("add2.latitude").alias("dest_lat"),col("add2.insee").alias("dest_insee"),col("add2.nom").alias("dest_nom")))
commune_matrix_df.show()

+------------------+-----------------+------------+----------+------------------+------------------+----------+-----------------+
|       source_long|       source_lat|source_insee|source_nom|         dest_long|          dest_lat|dest_insee|         dest_nom|
+------------------+-----------------+------------+----------+------------------+------------------+----------+-----------------+
|2.3428764301940275|48.85662219553845|       75056|     Paris| 9.338150861836196|42.374292014354154|     2B222|     Pie-d'Orezza|
|2.3428764301940275|48.85662219553845|       75056|     Paris| 9.235357777014519| 42.37887024991088|     2B137|             Lano|
|2.3428764301940275|48.85662219553845|       75056|     Paris| 9.302107656444328| 42.36875223806091|     2B051|           Cambia|
|2.3428764301940275|48.85662219553845|       75056|     Paris|  9.26661425039706|42.375563316535825|     2B106|            Érone|
|2.3428764301940275|48.85662219553845|       75056|     Paris|  9.33384508224219|42.641774

                                                                                

> The commune_matrix_df contains two starting points, "75056" (Paris) and "92049" (Montrouge); the end points are all other French communes.

In [20]:
commune_matrix_df.count()

69908
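
The count matches what the join condition implies: each of the 2 source communes is paired with every commune except itself, so 2 × (34955 − 1) = 69908 rows. The plain-Python sketch below reproduces that logic on a toy list. It assumes every INSEE code appears exactly once, and `build_pairs` is a hypothetical helper for this illustration.

```python
from itertools import product


def build_pairs(sources: list, all_codes: list) -> list:
    """Pair every source code with every other code, skipping self-pairs."""
    return [(src, dst) for src, dst in product(sources, all_codes) if src != dst]


all_codes = ["75056", "92049", "2B222", "2B137"]
pairs = build_pairs(["75056", "92049"], all_codes)
print(len(pairs))

# Same arithmetic as the real matrix: sources * (total - 1)
expected_full_size = 2 * (34955 - 1)
print(expected_full_size)
```

The number of OSRM calls grows with the number of sources times the number of communes, which is why the sources are processed a few at a time later on.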

## Step 4: Create a Spark UDF

There are two ways to declare a Spark UDF; here I use the decorator approach. Before we start, we must define the functions that calculate the actual distance and duration:


In [26]:
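def get_route(lat_start:str, long_start:str, lat_end:str, long_end:str, show_steps:str="false")->dict:
    """
    This function takes the GPS coordinates of a starting point and an end point, then calls the OSRM REST API.
    It returns the API JSON response if the response status is 200, otherwise it returns None.
    :param lat_start: latitude of the starting point
    :type lat_start: str
    :param long_start: longitude of the starting point
    :type long_start: str
    :param lat_end: latitude of the end point
    :type lat_end: str
    :param long_end: longitude of the end point
    :type long_end: str
    :param show_steps: "true" to include the route itinerary in the response, otherwise "false"
    :type show_steps: str
    :return: the parsed JSON response, or None if the request failed
    :rtype: dict
    """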
    host="maps-api.casd.local"
    start_point = f"{long_start},{lat_start}"
    end_point= f"{long_end},{lat_end}"
    # Define the URL
    url = f"https://{host}/route/v1/driving/{start_point};{end_point}?steps={show_steps}"
    
    # Make the GET request
    response = requests.get(url,verify=False)
    json_response = None
    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Parse the JSON body of the response
        json_response = response.json()
    else:
        print("Error:", response.status_code)
    return json_response
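
When a function like this runs inside a Spark job, a single failed or hanging request can stall a whole task. `requests.get` accepts a `timeout` argument, and a small retry loop can absorb transient failures. The sketch below shows only the retry part, with the HTTP call passed in as a function so the example runs without a server. The names `fetch_with_retry` and `flaky_fetch` and the example URL are assumptions for this illustration, not part of the notebook.

```python
import time
from typing import Callable, Optional


def fetch_with_retry(fetch: Callable[[str], dict], url: str,
                     attempts: int = 3, pause_s: float = 0.0) -> Optional[dict]:
    """Call fetch(url) up to `attempts` times; return None if every call raises."""
    for attempt in range(attempts):
        try:
            return fetch(url)
        except Exception:
            # wait before the next try, but not after the last one
            if attempt < attempts - 1:
                time.sleep(pause_s)
    return None


# A fake fetcher that fails twice, then succeeds, standing in for the HTTP call
calls = {"n": 0}


def flaky_fetch(url: str) -> dict:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated network failure")
    return {"code": "Ok", "routes": [{"distance": 18677.9, "duration": 1115.1}]}


result = fetch_with_retry(flaky_fetch, "http://osrm.example/route/v1/driving/0,0;1,1")
print(result, calls["n"])
```

Returning `None` after the last failed attempt keeps the same contract as `get_route`, so the parsing step only has to handle one failure case.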

In [27]:
# an example of get_route
start_long = "2.309167"
start_lat = "48.819552"
end_long = "2.467290"
end_lat = "48.758568"
route_json = get_route(start_lat,start_long,end_lat,end_long)



In [28]:
print(type(route_json))
print(route_json)

<class 'dict'>
{'code': 'Ok', 'routes': [{'geometry': 'g`~hHc_bM^eDiRk\\zVinBrIqSdBy_AcIge@vHi\\bAyc@o_@}_B_W_i@{Ik`@tWua@dIaS`U{rAnBemAs@}oAbEan@lGeK`RgGfh@p@nUfFbi@b_@dY~Iz_@fBxj@oDrl@ee@vNgSbN__@lQk{@lLsRpL_I', 'legs': [{'steps': [], 'summary': '', 'weight': 1115.1, 'duration': 1115.1, 'distance': 18677.9}], 'weight_name': 'routability', 'weight': 1115.1, 'duration': 1115.1, 'distance': 18677.9}], 'waypoints': [{'hint': 'zrIqgdCyKoEIAAAARwAAANsAAAAAAAAAD1xjQMnb60GQULZCAAAAAAgAAABHAAAA2wAAAAAAAABtIgAAEDwjAMfs6AIvPCMAYO3oAgkAHwbkNR0k', 'distance': 17.166158355, 'name': '', 'location': [2.309136, 48.819399]}, {'hint': 'V5UEgP___38OAAAAMQAAAJQBAACcAgAAzx6vQSYxWULcY0pElQ6ZRA4AAAAxAAAAlAEAAJwCAABtIgAAja8lAD8B6ALapSUAKP_nAhIAHwrkNR0k', 'distance': 191.955244684, 'name': '', 'location': [2.469773, 48.759103]}]}


In [29]:
def parse_route_json(input_route:dict)->tuple:
    """
    This function parses the OSRM json response, and returns distance(meter), duration(minute).
    It returns (0, 0) if the response is missing or contains no route.
    :param input_route: the json response returned by get_route, or None
    :type input_route: dict
    :return: tuple of distance and duration
    :rtype: (float,float)
    """
    # get_route returns None when the request fails, and "routes" may be absent or empty
    routes = input_route.get("routes") if input_route else None
    if routes:
        route = routes[0]
        # the raw distance is in meter
        distance = route["distance"]
        # the raw duration is in second
        # the returned duration is in minutes
        duration = round((route["duration"]/60), 2)
    else:
        distance = 0
        duration = 0
    return distance, duration

In [30]:
# an example of parse_route_json
dis1,dur1= parse_route_json(route_json)

In [31]:
print(f"distance has type: {type(dis1)}, value: {dis1}")
print(f"duration has type: {type(dur1)}, value: {dur1}")


distance has type: <class 'float'>, value: 18677.9
duration has type: <class 'float'>, value: 18.58


Now we can define a function that encapsulates the two functions above.

In [32]:
def calculate_distance_duration(lat_start:str,long_start:str,lat_end:str,long_end:str)->(float,float):
    """
    This function takes the GPS coordinates of a starting point and an end point, then calls the OSRM REST API.
    It parses the response and returns the distance(meter) and duration(minutes)
    :param lat_start: latitude of the starting point
    :type lat_start: str
    :param long_start: longitude of the starting point
    :type long_start: str
    :param lat_end: latitude of the end point
    :type lat_end: str
    :param long_end: longitude of the end point
    :type long_end: str
    :return: tuple of distance(meter) and duration(minutes)
    :rtype: (float,float)
    """
    route = get_route(lat_start,long_start,lat_end,long_end)
    return parse_route_json(route)

In [33]:
dis2,dur2 = calculate_distance_duration(start_lat,start_long,end_lat,end_long)



In [34]:
print(f"distance has type: {type(dis2)}, value: {dis2}")
print(f"duration has type: {type(dur2)}, value: {dur2}")

distance has type: <class 'float'>, value: 18677.9
duration has type: <class 'float'>, value: 18.58


In [35]:
def calculate_distance_duration_str(lat_start:str,long_start:str,lat_end:str,long_end:str) -> str:
    """
    This function is a wrapper of calculate_distance_duration. It returns a string "distance;duration".
    A single withColumn call then yields both values, so the OSRM API is called only once per row.
    :param lat_start: latitude of the starting point
    :type lat_start: str
    :param long_start: longitude of the starting point
    :type long_start: str
    :param lat_end: latitude of the end point
    :type lat_end: str
    :param long_end: longitude of the end point
    :type long_end: str
    :return: the distance(meter) and duration(minutes) joined by ";"
    :rtype: str
    """
    distance, duration= calculate_distance_duration(lat_start,long_start,lat_end,long_end)
    return f"{distance};{duration}"


In [36]:
res1 = calculate_distance_duration_str(start_lat,start_long,end_lat,end_long)



In [37]:
print(f"result has type: {type(res1)}, value: {res1}")

result has type: <class 'str'>, value: 18677.9;18.58
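
Packing both values into one string is compact, but whoever reads the result gets text back and has to convert it to numbers again. The plain-Python sketch below shows that reverse step. `parse_distance_duration` is a hypothetical helper for this illustration and is not used elsewhere in the notebook.

```python
def parse_distance_duration(value: str) -> tuple:
    """Split a "distance;duration" string and convert both parts to float."""
    distance_str, duration_str = value.split(";")
    return float(distance_str), float(duration_str)


distance_m, duration_min = parse_distance_duration("18677.9;18.58")
print(distance_m, duration_min)
```

The same applies to columns obtained by splitting the string in Spark: they hold strings until they are explicitly cast to a numeric type.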


Now we have all we need to create the udf.

In [38]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


@udf(returnType=StringType()) 
def get_distance_duration(lat_start:str,long_start:str,lat_end:str,long_end:str):
    return calculate_distance_duration_str(lat_start,long_start,lat_end,long_end)

## Step 5: Calculate the distance with the UDF

In [39]:
# let's do a test
commune_matrix_distance_df=commune_matrix_df.withColumn("distance_duration",get_distance_duration(col("source_lat"),col("source_long"),col("dest_lat"),col("dest_long"))).select("source_nom","source_insee","dest_nom","dest_insee","distance_duration")

In [40]:
commune_matrix_distance_df.show()



+----------+------------+-----------------+----------+-----------------+
|source_nom|source_insee|         dest_nom|dest_insee|distance_duration|
+----------+------------+-----------------+----------+-----------------+
|     Paris|       75056|     Pie-d'Orezza|     2B222|  1189597.6;919.4|
|     Paris|       75056|             Lano|     2B137| 1176819.3;904.53|
|     Paris|       75056|           Cambia|     2B051| 1182215.2;911.62|
|     Paris|       75056|            Érone|     2B106|   1177287;905.13|
|     Paris|       75056|           Oletta|     2B185| 1168612.9;903.37|
|     Paris|       75056|           Canari|     2B058|  1195574.3;933.4|
|     Paris|       75056|   Olmeta-di-Tuda|     2B188|  1171248.2;902.4|
|     Paris|       75056|          Campana|     2B052| 1182214.5;909.74|
|     Paris|       75056|Carcheto-Brustico|     2B063|   1193160;921.69|
|     Paris|       75056|         Ampriani|     2B015| 1211285.7;947.56|
|     Paris|       75056|         Pianello|     2B2



In [41]:
clean_distance_duration_df = (commune_matrix_distance_df
                              .withColumn("distance(meter)", split(col("distance_duration"),";")[0])
                              .withColumn("duration(minutes)", split(col("distance_duration"),";")[1])
                              .drop("distance_duration"))

In [42]:
clean_distance_duration_df.show()



+----------+------------+-----------------+----------+---------------+-----------------+
|source_nom|source_insee|         dest_nom|dest_insee|distance(meter)|duration(minutes)|
+----------+------------+-----------------+----------+---------------+-----------------+
|     Paris|       75056|     Pie-d'Orezza|     2B222|      1189597.6|            919.4|
|     Paris|       75056|             Lano|     2B137|      1176819.3|           904.53|
|     Paris|       75056|           Cambia|     2B051|      1182215.2|           911.62|
|     Paris|       75056|            Érone|     2B106|        1177287|           905.13|
|     Paris|       75056|           Oletta|     2B185|      1168612.9|           903.37|
|     Paris|       75056|           Canari|     2B058|      1195574.3|            933.4|
|     Paris|       75056|   Olmeta-di-Tuda|     2B188|      1171248.2|            902.4|
|     Paris|       75056|          Campana|     2B052|      1182214.5|           909.74|
|     Paris|       75



Now we can refactor all the above logic into a single function.

The `calculate_distance_duration_matrix_in_patch` function takes a sublist (batch) of the given code list at each iteration and does the join.
The `calculate_distance_duration_matrix` function takes only one code of the code list at each iteration and does the join.
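
The batching itself is plain list slicing with a step. The sketch below isolates that part so it can be checked without Spark; `split_into_batches` is a hypothetical helper named for this illustration.

```python
def split_into_batches(codes: list, batch_size: int) -> list:
    """Cut `codes` into consecutive sublists of at most `batch_size` items."""
    return [codes[i:i + batch_size] for i in range(0, len(codes), batch_size)]


batches = split_into_batches(["75056", "92049", "2B222", "2B137", "66113"], 2)
print(batches)
```

The last batch is simply shorter when the list length is not a multiple of the batch size, so no code is dropped.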

In [10]:
def calculate_distance_duration_matrix_in_patch(insee_code_list:list, centroid_df:DataFrame, output_file_path:str,patch_size:int=4, worker_number:int=16):
    # split the input insee code list into patches (batches) of patch_size codes
    for i in range(0, len(insee_code_list), patch_size):
        patch_code_list = insee_code_list[i:i+patch_size]
        # 1. build the source commune df with the given insee code
        commune_df = centroid_df.filter(col("insee").isin(patch_code_list))
        # 2. build a matrix which joins the given source communes with all other communes
        commune_matrix_df=commune_df.alias("add1").join(centroid_df.alias("add2"),col("add1.insee")!=col("add2.insee"),"inner").select(col("add1.longitude").alias("source_long"),col("add1.latitude").alias("source_lat"),col("add1.insee").alias("source_insee"),col("add1.nom").alias("source_nom"),col("add2.longitude").alias("dest_long"),col("add2.latitude").alias("dest_lat"),col("add2.insee").alias("dest_insee"),col("add2.nom").alias("dest_nom"))
        # repartition the df to match cluster worker number
        commune_matrix_df = commune_matrix_df.repartition(worker_number)
        # 3. calculate the distance and duration 
        distance_duration_df=commune_matrix_df.withColumn("distance_duration",get_distance_duration(col("source_lat"),col("source_long"),col("dest_lat"),col("dest_long"))).select("source_nom","source_insee","dest_nom","dest_insee","distance_duration").withColumn("distance(meter)", split(col("distance_duration"),";")[0]).withColumn("duration(minutes)", split(col("distance_duration"),";")[1]).drop("distance_duration")
        # 4. write the result into a parquet file
        distance_duration_df.write.mode("append").partitionBy("source_insee").parquet(output_file_path)
        

In [11]:
def calculate_distance_duration_matrix(insee_code_list:list, centroid_df:DataFrame, output_file_path:str):
    # process the input insee codes one at a time
    for insee_code in insee_code_list:
        # 1. build the source commune df with the given insee code
        commune_df = centroid_df.filter(col("insee")==insee_code)
        # 2. build a matrix which joins the given source commune with all other communes
        commune_matrix_df=commune_df.alias("add1").join(centroid_df.alias("add2"),col("add1.insee")!=col("add2.insee"),"inner").select(col("add1.longitude").alias("source_long"),col("add1.latitude").alias("source_lat"),col("add1.insee").alias("source_insee"),col("add1.nom").alias("source_nom"),col("add2.longitude").alias("dest_long"),col("add2.latitude").alias("dest_lat"),col("add2.insee").alias("dest_insee"),col("add2.nom").alias("dest_nom"))
        # 3. calculate the distance and duration 
        distance_duration_df=commune_matrix_df.withColumn("distance_duration",get_distance_duration(col("source_lat"),col("source_long"),col("dest_lat"),col("dest_long"))).select("source_nom","source_insee","dest_nom","dest_insee","distance_duration").withColumn("distance(meter)", split(col("distance_duration"),";")[0]).withColumn("duration(minutes)", split(col("distance_duration"),";")[1]).drop("distance_duration")
        # 4. write the result into a parquet file
        distance_duration_df.write.mode("append").partitionBy("source_insee").parquet(output_file_path)

Let's test it:

In [20]:
# input argument
code_list = ["92049"]
file_path = "/tmp/duration_test_with_patch"

calculate_distance_duration_matrix_in_patch(code_list, new_df, file_path)


24/05/16 16:22:13 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 15526)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_3059399/972932387.py", line 7, in get_distance_duration
NameError: name 'calculate_distance_duration_str' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.a


Py4JJavaError: An error occurred while calling o291.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 25.0 failed 1 times, most recent failure: Lost task 2.0 in stage 25.0 (TID 15528) (10.50.2.80 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_3059399/972932387.py", line 7, in get_distance_duration
NameError: name 'calculate_distance_duration_str' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
	... 42 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_3059399/972932387.py", line 7, in get_distance_duration
NameError: name 'calculate_distance_duration_str' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


NameError: name 'get_distance_duration' is not defined
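
The trace above reports `NameError: name 'calculate_distance_duration_str' is not defined`, raised inside `get_distance_duration` on the executor. In a notebook this usually means the cell that defines the helper had not been run when the UDF was created, so the name was missing when Spark serialized the function. The following is a minimal sketch of defining both functions together. The helper's real signature is not shown in this notebook, so the parameters, the `OSRM_BASE_URL` value, the comma-separated return format, and the injectable `fetch` argument are all assumptions. It uses the stdlib `urllib` instead of `requests` only to stay dependency-free.

```python
import json
from typing import Callable, Optional
from urllib.request import urlopen

# Assumption: a local OSRM server; replace with your own deployment URL.
OSRM_BASE_URL = "http://localhost:5000"


def fetch_json(url: str) -> dict:
    """Fetch a URL and decode its JSON body."""
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)


def calculate_distance_duration_str(
    lon1: float,
    lat1: float,
    lon2: float,
    lat2: float,
    fetch: Callable[[str], dict] = fetch_json,
) -> Optional[str]:
    """Return "distance_in_meters,duration_in_minutes", or None on failure.

    Hypothetical helper: the name comes from the traceback, the body is a guess.
    """
    # The OSRM route service expects longitude first, then latitude.
    url = (
        f"{OSRM_BASE_URL}/route/v1/driving/"
        f"{lon1},{lat1};{lon2},{lat2}?overview=false"
    )
    try:
        payload = fetch(url)
    except Exception:
        # A failed request should not abort the whole Spark job.
        return None
    if payload.get("code") != "Ok" or not payload.get("routes"):
        return None
    route = payload["routes"][0]
    distance_m = round(route["distance"], 1)         # OSRM returns meters
    duration_min = round(route["duration"] / 60, 2)  # OSRM returns seconds
    return f"{distance_m},{duration_min}"


def get_distance_duration(lon1, lat1, lon2, lat2):
    """Function to register as the Spark UDF; defined after its helper."""
    return calculate_distance_duration_str(lon1, lat1, lon2, lat2)
```

With both functions defined in the same cell, `get_distance_duration` can then be wrapped with `pyspark.sql.functions.udf` and a `StringType()` return type. Re-run that cell before any cell that triggers the job.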

# Read the result parquet file

In [4]:

prod_result_file_path = "/home/pengfei/data_set/fr_commune_distance/duration_prod_final"

df = sedona.read.parquet(prod_result_file_path)
df.show()

                                                                                

24/05/16 15:33:38 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.



+----------+--------------------+----------+---------------+-----------------+------------+
|source_nom|            dest_nom|dest_insee|distance(meter)|duration(minutes)|source_insee|
+----------+--------------------+----------+---------------+-----------------+------------+
|  Docelles|  Marignac-Lasclares|     31317|       932124.3|           596.25|       88135|
|  Docelles|Guigneville-sur-E...|     91293|       391917.3|           267.23|       88135|
|  Docelles|       Esmery-Hallon|     80284|       419241.2|           296.73|       88135|
|  Docelles|          Foulangues|     60249|       472930.4|           324.05|       88135|
|  Docelles|               Sassy|     14669|       657827.7|           434.03|       88135|
|  Docelles|        Saint-Castin|     64472|      1091895.6|           672.12|       88135|
|  Docelles|Bernay-Neuvy-en-C...|     72219|       596010.4|            383.9|       88135|
|  Docelles|              Bierne|     59082|       552869.5|           370.73|  
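
A quick sanity check on the rows above is to convert each distance to kilometers and derive the implied average speed. This is a sketch in plain Python, so it runs without a Spark session. The two sample rows are copied from the table, and the helper name `summarize_route` is hypothetical.

```python
# Two rows copied from the table above:
# (source, destination, distance in meters, duration in minutes)
sample_rows = [
    ("Docelles", "Marignac-Lasclares", 932124.3, 596.25),
    ("Docelles", "Saint-Castin", 1091895.6, 672.12),
]


def summarize_route(distance_m: float, duration_min: float) -> tuple:
    """Return (distance in km, average speed in km/h), rounded to 1 decimal."""
    distance_km = distance_m / 1000
    duration_h = duration_min / 60
    return round(distance_km, 1), round(distance_km / duration_h, 1)


for source, dest, distance_m, duration_min in sample_rows:
    km, speed = summarize_route(distance_m, duration_min)
    print(f"{source} -> {dest}: {km} km, about {speed} km/h on average")
```

Average speeds in the 90 to 100 km/h range are plausible for long trips that mostly use motorways. A value far outside that range would suggest a unit mix-up between seconds and minutes.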

                                                                                

In [5]:
total_row = df.count()
print(f"total row: {total_row}")


total row: 382326852


                                                                                

In [6]:
code_insee_df = df.select("source_insee").distinct()

In [7]:
row_number = code_insee_df.count()

                                                                                

In [7]:
# orderBy() needs at least one column; the sample below is unsorted, so show it directly
code_insee_df.show()




+------------+
|source_insee|
+------------+
|       07200|
|       62646|
|       59569|
|       02747|
|       43105|
|       07198|
|       07340|
|       42097|
|       60445|
|       59256|
|       59346|
|       02726|
|       77440|
|       59600|
|       59625|
|       77293|
|       59408|
|       2B213|
|       02056|
|       02718|
+------------+


                                                                                

In [8]:
print(f"total code count:{row_number}")

total code count:10934
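
The two counts printed so far (382326852 rows and 10934 distinct source codes) give the average number of destination rows per source commune. The sketch below works that out in plain Python with the values copied from the outputs above. The variable names are illustrative only.

```python
total_rows = 382_326_852   # from df.count()
source_count = 10_934      # distinct source_insee values

# Integer division plus remainder shows whether every source
# has exactly the same number of destination rows.
rows_per_source, remainder = divmod(total_rows, source_count)
average = total_rows / source_count

print(f"average destinations per source: {average:.1f}")
print(f"even split: {remainder == 0}")
```

The remainder is not zero, so at least some source communes have fewer destination rows than others. This can happen, for example, when the routing service finds no route for a pair. This notebook does not confirm the cause.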
