<a href="https://colab.research.google.com/github/stratoskar/Path-Based-Traffic-Flow-Prediction/blob/main/Python_Code/Map_Matching.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Handle data files
import pandas as pd

# These libraries needed for communicating with the Map Matching API
import json
import random
import requests
from datetime import timedelta

# These libraries are for connecting to the database
import psycopg2
from sqlalchemy import create_engine

In [None]:
# Load the trajectory points (already split into trajectories) from disk
input_csv = 'splitted_data.csv'
all_data = pd.read_csv(input_csv)

In [None]:
# Order the points by taxi, then trajectory, then timestamp,
# and renumber the rows 0..n-1 so that labels match positions
sort_keys = ['Taxi ID','Traj ID','Date Time']
all_data = all_data.sort_values(sort_keys).reset_index(drop=True)

In [None]:
# View the dimensions of the dataset as a (rows, columns) tuple
all_data.shape

In [None]:
# A trajectory is identified by its ('Taxi ID', 'Traj ID') pair:
# keep one row per pair and count the remaining rows
pair_columns = ['Taxi ID', 'Traj ID']
unique_pairs = all_data[pair_columns].drop_duplicates()
count_unique_pairs = unique_pairs.shape[0]
print(f"Number of trajectories in the dataset: {count_unique_pairs}")

In [None]:
# Parse the 'Date Time' strings into pandas datetime values
all_data['Date Time'] = pd.to_datetime(all_data['Date Time'])

# Report the time span covered by the dataset
earliest_date = all_data['Date Time'].min()
latest_date = all_data['Date Time'].max()
print("Min date is: " + str(earliest_date))
print("Max date is: " + str(latest_date))

In [None]:
# Coordinates to send to the Valhalla API: keep only latitude/longitude
# and rename the columns to the "lat"/"lon" keys used in the request body
meili_column_names = {"Latitude": "lat", "Longitude": "lon"}
df_for_meili = all_data[['Latitude','Longitude']].rename(columns=meili_column_names)

### Map matching is done using the Valhalla Meili API.

Each trajectory is sent to the API as input, and the response describes the exact path that the trajectory followed. The paths are expressed as sequences of OSM Way IDs.

#### Sources:

- Installation using Docker: https://ikespand.github.io/posts/meili/
- Paper about Valhalla: https://link.springer.com/article/10.1007/s42979-022-01340-5#Tab5
- API documentation: https://valhalla.github.io/valhalla/api/map-matching/api-reference/#matched-point-items

In [None]:
# Map-match every trajectory with Valhalla Meili and store the edges it visited
# in PostgreSQL, one table per trajectory named "table_<Taxi ID>_<Traj ID>".

# Settings of the local Valhalla server (identical for every request)
VALHALLA_URL = "http://localhost:8002/trace_attributes"
REQUEST_HEADERS = {'Content-type': 'application/json'}

# Head and tail of the request body; the trajectory coordinates go in between
MEILI_HEAD = '{"shape":'
MEILI_TAIL = ""","search_radius": 200, "sigma_z": 10, "beta": 10,"shape_match":"map_snap", "costing":"auto",
                        "filters":{"attributes":["edge.way_id","edge.speed","edge.length","edge.road_class"],"action":"include"},
                        "format":"osrm"}"""

# Database settings
# NOTE(review): the credentials are hard-coded; consider loading them from the environment
DB_SETTINGS = {
    "database": "visited_segments",
    "user": "postgres",
    "password": "sobadata2",
    "host": "localhost",
    "port": "5432",
}
DB_URL = "postgresql://{user}:{password}@{host}:{port}/{database}".format(**DB_SETTINGS)

# Columns of the map matched results, in the order they are written
SEGMENT_COLUMNS = ['taxi_id', 'traj_id', 'osm_way_id', 'edge_length', 'edge_speed',
                   'road_class', 'expected_start_time', 'expected_end_time']


def _segment_rows(taxi_id, traj_id, edges, first_timestamp, last_timestamp):
    """Build one result row per matched edge of a trajectory.

    The trajectory duration (last timestamp - first timestamp) is split evenly
    over the edges: the first edge starts at ``first_timestamp`` and every
    following edge starts when the previous one ends.

    Every value is converted to ``str`` because all table columns are VARCHAR.
    ``edges`` must not be empty.
    """
    interval = (last_timestamp - first_timestamp).total_seconds()
    # Expected time spent on each edge (equal for all edges of the trajectory)
    step = timedelta(seconds=interval / len(edges))

    rows = []
    start_time = first_timestamp
    for edge in edges:
        end_time = start_time + step
        rows.append({
            'taxi_id': str(taxi_id),
            'traj_id': str(traj_id),
            'osm_way_id': str(edge['way_id']),
            'edge_length': str(edge['length']),
            'edge_speed': str(edge['speed']),
            'road_class': str(edge['road_class']),
            'expected_start_time': str(start_time),
            'expected_end_time': str(end_time),
        })
        start_time = end_time
    return rows


# Open the database resources once for the whole run instead of once per
# trajectory, and make sure they are released when the loop ends or fails
engine = create_engine(DB_URL)
connection = psycopg2.connect(**DB_SETTINGS)
try:
    # groupby visits each (Taxi ID, Traj ID) trajectory in a single pass over the data
    for (taxi_id, traj_id), trajectory in all_data.groupby(['Taxi ID', 'Traj ID'], sort=False):

        # This is the name of the table that stores this trajectory
        name = "table_"+str(taxi_id)+'_'+str(traj_id)

        # Input to API: the coordinates of this trajectory, selected by index label
        passed_data = df_for_meili.loc[trajectory.index]

        # Construction of the request (the JSON is sent as a string)
        meili_request_body = MEILI_HEAD + passed_data.to_json(orient='records') + MEILI_TAIL

        # Sending the request
        r = requests.post(VALHALLA_URL, data=meili_request_body, headers=REQUEST_HEADERS)

        if r.status_code != 200:
            # Valhalla could not match this trajectory; report it and move on
            print(f"Map matching failed for {name} (HTTP {r.status_code})")
            continue

        # Parsing the JSON response
        edges = json.loads(r.text).get('edges') or []
        if not edges:
            # Without edges the trajectory duration cannot be split (division by zero)
            print(f"No matched edges returned for {name}; skipped")
            continue

        # Create the table if it doesn't exist
        # NOTE(review): the table name is interpolated into the SQL text; it is
        # built from the dataset's own IDs, which are assumed to be trusted
        with connection.cursor() as cursor:
            cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {name} (
                Taxi_ID VARCHAR(255) NOT NULL,
                Traj_ID VARCHAR(255) NOT NULL,
                OSM_Way_ID VARCHAR(255) NOT NULL,
                Edge_Length VARCHAR(255) NOT NULL,
                Edge_Speed VARCHAR(255) NOT NULL,
                Road_Class VARCHAR(255) NOT NULL,
                Expected_Start_Time VARCHAR(255) NOT NULL,
                Expected_End_Time VARCHAR(255) NOT NULL
            )
            """)

        # Commit the changes to the database
        connection.commit()

        # Fill the dataframe with the information that the API gave
        timestamps = trajectory['Date Time']
        visited_segments = pd.DataFrame(
            _segment_rows(taxi_id, traj_id, edges, timestamps.iloc[0], timestamps.iloc[-1]),
            columns=SEGMENT_COLUMNS,
        )

        # Try to pass this information to the database table; a failure is
        # reported but does not stop the remaining trajectories
        try:
            visited_segments.to_sql(name, engine, if_exists='append', index=False)
        except Exception as error:
            print(error)
finally:
    connection.close()
    engine.dispose()

The results of the map matching process are stored in a PostgreSQL database because of the huge volume of data produced. In the subsequent code files, we download the data from the database, aggregate it, and construct a single CSV file that gathers all of the data together.