In [1]:
import requests # om queries mee te berekenen
import pandas as pd # om dataset mee in te laden
from python_tsp.heuristics import solve_tsp_simulated_annealing # om optimale route mee te berekenen
import numpy as np # voor array berekenen
from prefect import task, Flow # om taken te verbinden aan dashboard
from prefect.executors import LocalDaskExecutor

In [2]:
@task()
def dataset_inlezen():
    df = pd.read_csv("dataset.csv", sep=";")
    datasets = []
    for current_woonplaats in df['woonplaats'].unique():
        
        # haal alleen de locaties van de huidige woonplaats op
        temp_df = df[df['woonplaats'] == current_woonplaats].sample(150).copy()
        
        temp_df['rijnummer'] =  np.arange(temp_df.shape[0]) # voeg een rijnummer toe
        datasets.append({"woonplaats": current_woonplaats, "dataset": temp_df})
        
    return datasets

In [3]:
@task()
def fetch_distance_matrix(record):
    
    # uitrekenen van coordinaat paren, van losse kolommen naar 1 lange string
    # dus van:
    # lat, lon
    # 1234, 5678
    # 9102, 3456
    # naar:
    # 1234,5678;9102,3456
    coords = ""
    for coordpair in record['dataset'][['lat','lon']].astype(str).to_dict("records"):
        coords += coordpair['lon'] + "," + coordpair['lat'] + ";"
        
    # haal de laatste ; weg
    coords = coords[:-1]
    
    # doe een api call naar de OSRM-backend instantie en haal een matrix van deze routegegevens op
    distance_matrix = np.array(requests.get("http://localhost:5000/table/v1/driving/"+ coords).json()['durations'])
    
    # sla de distance matrix op onder de key 'distance_matrix' voor de volgende taak
    record['distance_matrix'] = distance_matrix
    
    return record

In [4]:
@task()
def optimize_order(record):   
    
    # los het probleem op met behulp van de tsp_solver
    optimized_order, total_duration = solve_tsp_simulated_annealing(record['distance_matrix'])
    
    # voeg de volgorde toe als kolom aan de orginele dataset
    for index, location in enumerate(optimized_order):
        record['dataset'].loc[record['dataset']['rijnummer'] == location, 'volgorde'] = index
    
    # sla de total_duration op voor printen in de volgende taak
    record['total_duration'] = total_duration
    
    return record

In [5]:
@task()
def store_datasets(record):
    
    # sorteer de volgorde met de nieuwe gegevens om een routebook te maken   
    record['dataset'].sort_values(by="volgorde", inplace=True)
    
    record['dataset'].to_csv("prefect_" + record['woonplaats'] + ".csv", sep=";", index=False)
    logger = prefect.context.get("logger")
    logger.info(" ".join(["De totale reistijd van deze route is ", str(round(record['total_duration']/60)), "uur, dit is de route voor:", record['woonplaats']]))
    
    return record

In [67]:
# definieer de flow met prefect
with Flow("prallel tsp uitrekenen") as flow:        
    # lees dataset in
    datasets = dataset_inlezen()
    
    # map de input een paar keer met dask parallel. Dit is vergelijkbaar met een for loop
    datasets = fetch_distance_matrix.map(datasets)
    datasets = optimize_order.map(datasets)
    datasets = store_datasets.map(datasets)

# stel in dat Dask wordt gebruikt om de taken mee uit te voeren
flow.executor = LocalDaskExecutor()

# registreer het project in de GUI van prefect
flow.register(project_name="tsp-example")

Flow URL: http://localhost:8080/default/flow/172f4c7e-1bef-49e1-bf2e-69afe8a02c1f
 └── ID: b8638f7c-a847-4afa-b7ef-78ddee35f3d8
 └── Project: tsp-example
 └── Labels: ['DESKTOP-RQUUSMJ']


'b8638f7c-a847-4afa-b7ef-78ddee35f3d8'