In [17]:
import requests
import json
import boto3
import pandas as pd
import time

In [2]:

# Credentials are read from environment variables instead of being pasted into
# the notebook, so they cannot leak through version control or shared copies.
# An unset variable falls back to '' (the placeholder the notebook used before).
import os

ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID', '')
SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
API_KEY = os.environ.get('QUETZAL_API_KEY', '')


In [3]:
# --- S3 storage ---
REGION = 'ca-central-1'
BUCKET = 'quetzal-immense'

# --- API Gateway endpoint and the Step Functions state machine it starts ---
# The region part of both identifiers is taken from REGION.
url = f'https://lud5uqi5j5.execute-api.{REGION}.amazonaws.com/dev'
stateMachineArn = f'arn:aws:states:{REGION}:142023388927:stateMachine:quetzal-immense'

In [4]:
# Headers sent with every API Gateway request; API_KEY goes in `x-api-key`.
headers = dict([
    ('Accept', '*/*'),
    ('Content-Type', 'application/json'),
    ('x-api-key', API_KEY),
])

# Inputs (S3 structure)

scenario/inputs/pt/links.geojson <br>
scenario/inputs/pt/nodes.geojson <br>
scenario/inputs/road/road_links.geojson <br>
scenario/inputs/road/road_nodes.geojson <br>
scenario/inputs/car_skim.csv <br>
scenario/outputs/<br>
<br>
You may see other files (such as params.json); those are either used by the web interface or created by the model.

In [5]:
periods = 'am pm'.split()  # period scenarios; each one is a top-level S3 folder (am/, pm/)

In [6]:
# boto3 session built from the configured credentials and region; the `s3`
# client is used by the listing and download cells.
session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION,
)
s3 = session.client(service_name='s3')

In [7]:
# Collect every S3 key stored under <period>/inputs/ for each period.
# A paginator is used because one list_objects_v2 call returns at most 1000
# keys, and `.get('Contents', [])` because that key is absent from the
# response when a prefix holds no objects (indexing it would raise KeyError).
inputs_list = []
paginator = s3.get_paginator('list_objects_v2')
for period in periods:
    prefix = period + '/inputs/'
    for page in paginator.paginate(Bucket=BUCKET, Prefix=prefix):
        inputs_list.extend(obj['Key'] for obj in page.get('Contents', []))

In [8]:
# Display every input key found under the period prefixes.
inputs_list

['am/inputs/car_skim.csv',
 'am/inputs/params.json',
 'am/inputs/pt/links.geojson',
 'am/inputs/pt/nodes.geojson',
 'am/inputs/road/road_links.geojson',
 'am/inputs/road/road_nodes.geojson',
 'pm/inputs/car_skim.csv',
 'pm/inputs/params.json',
 'pm/inputs/pt/links.geojson',
 'pm/inputs/pt/nodes.geojson',
 'pm/inputs/road/road_links.geojson',
 'pm/inputs/road/road_nodes.geojson']

<b>NOTE:</b> Constant inputs may be stored directly in the model's Docker image.<br>
Here, the zones, reference volumes and reference skims are stored in the Docker image, not on S3.

At the moment, only one period can be run per scenario. <br>
The workaround is to run one scenario per period, plus another scenario (running the demand model) that uses the data from those periods and returns the outputs. <br>
<br>
In this example, this means that we need to add our inputs under am/ and pm/ and run a third scenario (named example here). This scenario runs am and pm in parallel, then runs the demand model. The results will be under example/outputs/
<br>
<br>
The last scenario (example) can have any name; the period scenarios cannot, at the moment. This means that only one simulation can run at a time, since they all use the same S3 folders.

# Run

In [11]:
scenario = 'example/' # S3 prefix of the orchestrator scenario; its results are written under example/outputs/

In [12]:
# Execution input for the state machine, serialised to a JSON string.
# All three keys are required:
#   scenario_path_S3 - S3 prefix of the scenario (any name)
#   choice           - 'orchestrator' (full run) or 'test'
#   metadata         - used by the WebApp to show "last modified" information
inputs = json.dumps({
    'scenario_path_S3': scenario,
    'choice': 'orchestrator',
    'metadata': {"user_email": "test@test.com"},
})


In [13]:
# Body of the start-execution request: the state machine ARN plus the execution
# input, which is itself a JSON string (hence the double encoding in the output).
data = json.dumps({"stateMachineArn": stateMachineArn, "input": inputs})
data

'{"stateMachineArn": "arn:aws:states:ca-central-1:142023388927:stateMachine:quetzal-immense", "input": "{\\"scenario_path_S3\\": \\"example/\\", \\"choice\\": \\"orchestrator\\", \\"metadata\\": {\\"user_email\\": \\"test@test.com\\"}}"}'

POST to start a Quetzal model execution

In [14]:
# Start the execution: POST the payload to the API Gateway root, then show the success flag.
resp = requests.post(url, headers=headers, data=data)
resp.ok

True

The execution ARN is returned in the POST response; it is used to get the status of the simulation or to stop it.

In [15]:
# The start response carries the execution ARN, needed to poll, describe or abort the run.
executionArn = resp.json()['executionArn']
executionArn

'arn:aws:states:ca-central-1:142023388927:execution:quetzal-immense:fa8cb56c-9707-414d-8ecb-053e87b8b2a0'

# Polling Simulation Status

Possible execution statuses: RUNNING | SUCCEEDED | FAILED | TIMED_OUT | ABORTED

In [18]:
def polling(executionArn, interval=10):
    """Poll a Step Functions execution until it is no longer RUNNING.

    Posts ``{"executionArn": ...}`` to the ``/describe`` endpoint of the API
    Gateway (``url``), prints each returned status, and sleeps ``interval``
    seconds between calls.

    Parameters
    ----------
    executionArn : str
        ARN returned by the start-execution POST.
    interval : float, optional
        Seconds to wait between two status checks (default 10).

    Returns
    -------
    str
        The first status other than ``'RUNNING'`` (e.g. ``'SUCCEEDED'``).

    Raises
    ------
    requests.HTTPError
        If the describe endpoint answers with an HTTP error status.
    """
    payload = json.dumps({"executionArn": executionArn}, ensure_ascii=False).encode('utf-8')
    describe_url = url + '/describe'
    while True:
        # A timeout keeps a single stalled request from hanging the loop forever.
        resp = requests.post(describe_url, data=payload, headers=headers, timeout=30)
        # Surface the HTTP error instead of a KeyError on a body without 'status'.
        resp.raise_for_status()
        status = resp.json()['status']
        print(status)
        if status != 'RUNNING':
            return status
        time.sleep(interval)


polling(executionArn)

RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
SUCCEEDED


You could also fetch the Step Functions definition and poll its current step if needed. <br>
However, it is somewhat difficult to decipher and, in my opinion, not really relevant for this project.


# Outputs

In [19]:
# List (and print) every key under <scenario>/outputs/.
# The trailing '/' restricts the match to the outputs folder itself (a bare
# 'outputs' prefix would also match e.g. 'outputs_old/...'). A paginator avoids
# the 1000-key truncation of a single list_objects_v2 call, and
# `.get('Contents', [])` avoids a KeyError when the folder is empty.
prefix = scenario + 'outputs/'
ls = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=BUCKET, Prefix=prefix):
    for obj in page.get('Contents', []):
        file = obj["Key"]
        ls.append(file)
        print(file)

example/outputs/volume_am.csv
example/outputs/volume_pm.csv


In [20]:
# Load the first key listed in `ls` into a DataFrame.
# The S3 response gets its own name: reusing `data` here would overwrite the
# JSON request body that the start/abort/describe cells send, so re-running one
# of those cells afterwards would POST the S3 response dict instead.
from io import BytesIO

output_obj = s3.get_object(Bucket=BUCKET, Key=ls[0])
with BytesIO(output_obj['Body'].read()) as bio:
    df = pd.read_csv(bio)

In [21]:
# Show the loaded output table.
df

Unnamed: 0.1,Unnamed: 0,index,origin,destination,pt,car
0,0,0,zone_1,zone_278,0.00,3.334000e+01
1,1,1,zone_1,zone_298,0.00,3.872000e+01
2,2,2,zone_1,zone_352,0.00,2.277000e+01
3,3,3,zone_1,zone_513,0.00,1.906000e+01
4,4,4,zone_1,zone_565,0.00,3.161000e+01
...,...,...,...,...,...,...
19885,19885,44572,zone_98,zone_61,20.48,5.320000e+01
19886,19886,44573,zone_98,zone_69,0.00,0.000000e+00
19887,19887,44574,zone_98,zone_691,85.49,5.932060e-14
19888,19888,44575,zone_98,zone_694,27.93,-1.550426e-15


# Stop simulation

In [297]:
# Abort the execution. `data` keeps the {"executionArn": ...} JSON body because
# the describe cell below re-sends it.
data = json.dumps(dict(executionArn=executionArn))
resp = requests.post(f'{url}/abort', data=data, headers=headers)
resp.ok


True

In [298]:
# Abort response body; in the run shown it contains a `stopDate` timestamp.
resp.json()

{'stopDate': 1707756039.138}

In [299]:
# Re-query the execution status, reusing the {"executionArn": ...} body from the abort cell.
resp = requests.post(f'{url}/describe', headers=headers, data=data)
resp.ok

True

In [300]:
resp.json()['status'] # expected 'ABORTED'; any other value means the simulation had already finished or had failed before the abort

'ABORTED'

# test (faster)

<b>choice = test</b> is a simple endpoint for testing. It runs in ~5 seconds and creates a file outputs/test.txt containing the current time (timestamp in seconds).

In [36]:
# Quick smoke-test run: same required keys as the orchestrator input, but with
# choice='test' so the state machine only writes outputs/test.txt.
scenario = 'example_2/'
inputs = json.dumps({
    'scenario_path_S3': scenario,
    'choice': 'test',
    'metadata': {"user_email": "test@test.com"},
})

In [37]:
# Start the 'test' execution using the same request shape as the orchestrator run.
data = json.dumps({"stateMachineArn": stateMachineArn, "input": inputs})
resp = requests.post(url, headers=headers, data=data)
resp.ok

True

In [38]:
# Take the ARN of the 'test' execution and wait until it is no longer RUNNING.
executionArn = resp.json()['executionArn']
polling(executionArn)

RUNNING
SUCCEEDED


In [39]:
# Download the marker file written by the 'test' run and show its raw bytes.
# The S3 response gets its own name so it does not overwrite `data`, the JSON
# request body that the start/abort/describe cells POST.
test_obj = s3.get_object(Bucket=BUCKET, Key=scenario + 'outputs/test.txt')
contents = test_obj['Body'].read()
contents

b'test passed! \ntime: 1707758858'