In [64]:
import sys
import json

# Default execution parameters, used unchanged when the notebook runs interactively.
params = {'exec_id': 0, 'exclusions': []}
default = {'scenario': 'test', 'training_folder': '../..', 'params': params}

# Inside a Jupyter kernel argv[0] contains 'ipykernel': run manually with the
# defaults. Otherwise the first command-line argument is a JSON object whose
# top-level keys override the defaults (shallow merge).
manual = 'ipykernel' in sys.argv[0]
if manual:
	argv = default
else:
	argv = dict(default, **json.loads(sys.argv[1]))

In [65]:
# Add path to quetzal
sys.path.insert(0, '../../../../quetzal/')
import os
import numba as nb

# True only when AWS_EXECUTION_ENV is set to a non-empty value in the environment.
on_lambda = bool(os.environ.get('AWS_EXECUTION_ENV'))
# Thread count taken from numba's configuration; used below to size the work split.
num_cores = nb.config.NUMBA_NUM_THREADS

In [66]:
scenario = argv['scenario']
training_folder = argv['training_folder']
# On AWS Lambda the inputs are read straight from the training folder; when
# running locally they are read from ../scenarios/<scenario>/ instead.
if on_lambda:
	input_folder = training_folder
else:
	input_folder = f'../scenarios/{scenario}/'
# Folder that receives the per-machine links/nodes files written further down.
parallel_folder = os.path.join(input_folder, 'parallel')
# exist_ok=True creates the folder only when it is missing and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(parallel_folder, exist_ok=True)

In [67]:
# Display the effective run parameters (bare expression: inspection only).
argv['params']

{'exec_id': 0, 'exclusions': []}

In [68]:
# Per-run parameters, with fallbacks for keys the caller left out.
run_params = argv['params']
exec_id = run_params.get('exec_id', 0)
exclusions = run_params.get('exclusions', [])

In [69]:
import geopandas as gpd
from quetzal.engine.add_network_mapmatching import duplicate_nodes


In [70]:
# Load the network tables and key each of them by its 'index' column.
links_path = os.path.join(input_folder, 'links.geojson')
links = gpd.read_file(links_path).set_index('index')
nodes_path = os.path.join(input_folder, 'nodes.geojson')
nodes = gpd.read_file(nodes_path).set_index('index')

In [71]:
links, nodes = duplicate_nodes(links, nodes)
# Separate the links whose route_type is excluded from the ones that get chunked.
is_excluded = links['route_type'].isin(exclusions)
excluded_links = links[is_excluded]
links = links[~is_excluded]

In [72]:
# Distinct trip ids among the links kept after the exclusions were removed.
trip_list = links['trip_id'].unique()
# Number of distinct trips to distribute across machines.
num_trips = len(trip_list)

In [73]:
# Floor of trips per core; passed to get_num_machine as the total iteration count.
# NOTE(review): presumably one iteration handles num_cores trips in parallel — confirm.
tot_num_iteration = num_trips // num_cores


def get_num_machine(num_it, target_it=20, choices=(12, 8, 4, 2, 1)):
	"""Pick the machine count from ``choices`` closest to ``num_it / target_it``.

	Args:
		num_it: total number of iterations to distribute.
		target_it: desired number of iterations per machine.
		choices: allowed machine counts; on a tie the earliest entry wins.

	Returns:
		The element of ``choices`` nearest to the ideal machine count.

	Raises:
		ValueError: if ``choices`` is empty.
	"""
	candidates = list(choices)
	if not candidates:
		raise ValueError('choices must contain at least one machine count')
	ideal = num_it / target_it
	# No sentinel distance or fallback value: the result is always one of the
	# candidates, even when the ideal count is far away from all of them.
	# min() returns the first minimum, so ties resolve to the earliest entry.
	return min(candidates, key=lambda candidate: abs(ideal - candidate))


# Number of machines to launch, aiming at about 20 iterations per machine.
num_machine = get_num_machine(tot_num_iteration, target_it=20, choices=[12, 8, 4, 2, 1])

In [74]:
print('num it per machine', tot_num_iteration / num_machine)

# Split the trips into exactly num_machine contiguous chunks whose sizes differ
# by at most one. Slicing with a rounded chunk length could produce an extra,
# smaller chunk that no exec_id in range(num_machine) would ever pick up, or a
# zero slice step when there are far fewer trips than machines.
# chunk_length is the minimum chunk size; the first `remainder` chunks get one more.
chunk_length, remainder = divmod(len(trip_list), num_machine)
chunks = []
start = 0
for j in range(num_machine):
	stop = start + chunk_length + (1 if j < remainder else 0)
	chunks.append(trip_list[start:stop])
	start = stop
assert len(chunks) == num_machine
# Sanity check: every trip ends up in exactly one chunk.
sum(len(c) for c in chunks) == len(trip_list)

num it per machine 26.5


True

In [75]:
for i, trips in enumerate(chunks):
	print(i)
	# Links belonging to the trips of chunk i.
	in_chunk = links['trip_id'].isin(trips)
	tlinks = links[in_chunk]
	# Every node referenced as an endpoint ('a' or 'b') by those links.
	nodes_set = set(tlinks['a'].unique()) | set(tlinks['b'].unique())
	node_ids = nodes.reset_index()['index']
	tnodes = nodes[node_ids.isin(nodes_set).values]
	# Write the chunk's links, then its nodes, as links_<i> / nodes_<i>.
	links_out = os.path.join(input_folder, 'parallel', f'links_{i}.geojson')
	nodes_out = os.path.join(input_folder, 'parallel', f'nodes_{i}.geojson')
	tlinks.to_file(links_out, driver='GeoJSON')
	tnodes.to_file(nodes_out, driver='GeoJSON')

# Excluded links (if any) are written to their own pair of files.
if len(excluded_links):
	# Every node referenced as an endpoint ('a' or 'b') by the excluded links.
	nodes_set = set(excluded_links['a'].unique()) | set(excluded_links['b'].unique())
	node_ids = nodes.reset_index()['index']
	tnodes = nodes[node_ids.isin(nodes_set).values]
	excluded_links_out = os.path.join(input_folder, 'parallel', 'links_excluded.geojson')
	excluded_nodes_out = os.path.join(input_folder, 'parallel', 'nodes_excluded.geojson')
	excluded_links.to_file(excluded_links_out, driver='GeoJSON')
	tnodes.to_file(excluded_nodes_out, driver='GeoJSON')
# return num_machine


0
1


In [76]:
def deep_update(mapping, *updating_mappings):
	"""Return a copy of ``mapping`` with the updating mappings merged in recursively.

	Adapted from Pydantic:
	https://github.com/pydantic/pydantic/blob/fd2991fe6a73819b48c906e3c3274e8e47d0f761/pydantic/utils.py#L200

	Later mappings take precedence. When both the current value and the new
	value for a key are dicts they are merged recursively; in every other case
	the new value replaces the current one. ``mapping`` itself is not modified.
	"""
	merged = mapping.copy()
	for overrides in updating_mappings:
		for key, value in overrides.items():
			both_dicts = (
				key in merged
				and isinstance(merged[key], dict)
				and isinstance(value, dict)
			)
			merged[key] = deep_update(merged[key], value) if both_dicts else value
	return merged

In [77]:
# This is returned to main.py and placed in the step function event:
# one exec_id per machine, numbered from 0 to num_machine - 1.
ls = list(range(num_machine))
d = {
	'launcher_arg': {
		'params': {'exec_id': ls},
	},
}
print('RETURN:', d)

RETURN: {'launcher_arg': {'params': {'exec_id': [0, 1]}}}
