In [1]:
import os
import logging
from io import BytesIO
import time
import zipfile
import numpy as np
import boto3
from datetime import datetime, timezone
from time import gmtime, strftime
import json
import pandas as pd
import matplotlib.pyplot as plt
import pickle

In [9]:
import math

In [2]:
# Offset, in hours, between UTC and the local timezone. It is multiplied by
# 3600 and added to the epoch timestamps parsed from the log query results.
timezone_offset = 0

### Function Name List

In [10]:
# Lambda function names follow the pattern "<prefix>_f<index>", indexed from 1.
function_prefix = "Structures"
function_count = 4
function_name_list = [
    f"{function_prefix}_f{index}" for index in range(1, function_count + 1)
]
print(function_name_list)

['Structures_f1', 'Structures_f2', 'Structures_f3', 'Structures_f4']


In [11]:
# Memory size configured for each function, keyed by its short name. These
# values are passed as `mem` when the model Functions are created further down
# (presumably in MB, matching the Memory_Size column of the Lambda logs).
mem_config_list = dict(
    f1=1280,
    f2=896,
    f3=1536,
    f4=1088,
)

# Execute the Application

In [3]:
# Step Functions client used for every start_execution call in this notebook.
sfn_client = boto3.client('stepfunctions')

In [4]:
# ARN of the "Parallel" state machine targeted by the start_execution calls below.
stateMachineArn='arn:aws:states:us-east-2:499537426559:stateMachine:Parallel'

The serverless application workflow can be found in the README.md.

## Test Run

In [5]:
# Single test execution; as the last expression of the cell, the
# start_execution response is displayed as the cell output.
sfn_client.start_execution(
    stateMachineArn=stateMachineArn
)

{'executionArn': 'arn:aws:states:us-east-2:499537426559:express:Parallel:c4e305af-9ce9-4201-af65-5d05a5ce4ca3:8400be4f-627b-4d00-8ef6-ec2f0e5ca338',
 'startDate': datetime.datetime(2022, 1, 2, 19, 36, 15, 936000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': 'ebe4ec54-1135-490d-b10c-883cb1794af0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ebe4ec54-1135-490d-b10c-883cb1794af0',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '176'},
  'RetryAttempts': 0}}

## Configure Logging

In [6]:
# Send INFO-level records to AppExecution.log, each prefixed with a
# "YYYY-mm-dd HH:MM:SS.mmm" timestamp.
logging.basicConfig(
    filename='AppExecution.log',
    encoding='utf-8',
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

## Execute Parallel

In [7]:
# Seed NumPy's global random number generator.
np.random.seed(256)

In [8]:
# Start 5000 executions, pausing 10 seconds after each one, and log the
# 1-based run number, HTTP status code and request id of every call.
for run_index in range(5000):
    response = sfn_client.start_execution(stateMachineArn=stateMachineArn)
    response_metadata = response.get('ResponseMetadata', {})
    RequestId = response_metadata.get('RequestId')
    StatusCode = response_metadata.get('HTTPStatusCode', 'ERR')
    logging.info(f'{run_index+1} {StatusCode} {RequestId}')
    time.sleep(10)

## Get the start time and the end time

In [15]:
# Read the first and last records of the execution log directly in Python
# instead of shelling out to `head`/`tail`: this works on any platform and
# does not leave os.popen pipes open.
with open('AppExecution.log', encoding='utf-8') as log_file:
    log_lines = [line for line in log_file if line.strip()]
if not log_lines:
    raise ValueError('AppExecution.log contains no log records')


def _leading_log_timestamp(line):
    """Parse the '<date> <time>.<msecs>' prefix of a log line into a naive datetime."""
    return datetime.strptime(' '.join(line.split(' ')[:2]), '%Y-%m-%d %H:%M:%S.%f')


# NOTE: the 'exeuction' spelling is kept because other cells use this name.
app_exeuction_start_time = _leading_log_timestamp(log_lines[0])
app_execution_end_time = _leading_log_timestamp(log_lines[-1])

In [16]:
# Display the start time parsed from the first log record.
app_exeuction_start_time

datetime.datetime(2022, 1, 2, 19, 37, 47, 703000)

In [17]:
# Display the end time parsed from the last log record.
app_execution_end_time

datetime.datetime(2022, 1, 3, 9, 43, 38, 849000)

In [18]:
# Convert to whole epoch seconds; this value is passed as startTime to the
# log queries below.
# NOTE: the name is rebound from a datetime to an int, so re-running this cell
# without first re-running the parsing cell raises an error.
app_exeuction_start_time = int(datetime.timestamp(app_exeuction_start_time))

In [19]:
# Convert to whole epoch seconds and add 20 seconds of padding; this value is
# passed as endTime to the log queries below (presumably so records of the
# last execution are still inside the window — confirm).
# NOTE: the name is rebound from a datetime to an int, so the cell cannot be
# re-run on its own.
app_execution_end_time = int(datetime.timestamp(app_execution_end_time)) + 20

# Retrieve Logs

In [20]:
# CloudWatch Logs client used for the start_query / get_query_results calls below.
logclient = boto3.client('logs')

## Query Step Functions Logs

In [21]:
# Query the state machine's log group for the ExecutionStarted and
# ExecutionSucceeded events inside the experiment window. 5000 executions
# emit two matching events each, which is exactly the 10000-row limit
# requested here.
sfn_log_group = '/aws/vendedlogs/states/{}-Logs'.format('Parallel')
sfn_event_query = "fields type, @timestamp| filter type = 'ExecutionStarted' or type = 'ExecutionSucceeded' | sort id desc"
query_sfn_Parallel = logclient.start_query(
    logGroupName=sfn_log_group,
    queryString=sfn_event_query,
    startTime=app_exeuction_start_time,
    endTime=app_execution_end_time,
    limit=10000,
)

In [25]:
# start_query is asynchronous: poll until the query leaves the
# Scheduled/Running states so that the results are complete before they are
# parsed, instead of fetching once and hoping the query has finished.
while True:
    query_results_sfn_Parallel = logclient.get_query_results(
        queryId=query_sfn_Parallel['queryId']
    )
    if query_results_sfn_Parallel['status'] not in ('Scheduled', 'Running'):
        break
    time.sleep(1)
# Any terminal status other than Complete means the results cannot be trusted.
if query_results_sfn_Parallel['status'] != 'Complete':
    raise RuntimeError(
        'Step Functions log query ended with status '
        + query_results_sfn_Parallel['status']
    )

In [26]:
def _sfn_event_epochs(event_type):
    """Return the sorted epoch timestamps of the query rows whose type equals event_type.

    Each result row holds the event type in its first field and the timestamp
    string in its second; the timestamp is shifted by timezone_offset hours.
    """
    epochs = []
    for row in query_results_sfn_Parallel['results']:
        if row[0]['value'] == event_type:
            parsed = datetime.strptime(row[1]['value'], '%Y-%m-%d %H:%M:%S.%f')
            epochs.append(datetime.timestamp(parsed)+timezone_offset*3600)
    return np.sort(epochs)


Parallel_starttimestamp = _sfn_event_epochs('ExecutionStarted')
Parallel_endtimestamp = _sfn_event_epochs('ExecutionSucceeded')

In [27]:
# Pair the i-th sorted start timestamp with the i-th sorted end timestamp and
# write them to CSV. The DataFrame constructor requires both arrays to have
# the same length.
pd.DataFrame({'Start': Parallel_starttimestamp, 'End':Parallel_endtimestamp}).to_csv('Parallel_SFN_Logs.csv', index=False)


In [28]:
# Reload the Step Functions start/end timestamps from the CSV file.
Parallel_sfn_logs = pd.read_csv('Parallel_SFN_Logs.csv', low_memory=False)

In [29]:
# Display (rows, columns) of the reloaded Start/End table.
Parallel_sfn_logs.shape

(5000, 2)

## Query Lambda Function Logs

### Functions for parsing Logs

In [34]:
def lambda_report_log_to_dict(log):
    """Convert one Lambda REPORT query-result row into a flat dict.

    Parameters
    ----------
    log : sequence of dict
        One row of query results. ``log[0]['value']`` holds the timestamp
        string in ``%Y-%m-%d %H:%M:%S.%f`` format and ``log[1]['value']`` the
        tab-separated REPORT message.

    Returns
    -------
    dict
        Keys ``RequestId``, ``Duration``, ``Billed_Duration``, ``Memory_Size``,
        ``Max_Memory_Used`` and ``UTC_Timestamp``. ``UTC_Timestamp`` is in
        epoch seconds, keeps the fractional part of the logged time, and is
        shifted by ``timezone_offset`` hours.
    """
    # Every tab-separated field has the form "<label>: <value> <unit>"; the
    # first five fields are read by position.
    fields = [part.split(': ') for part in log[1]['value'].split('\t')]
    logged_at = datetime.strptime(log[0]['value'], "%Y-%m-%d %H:%M:%S.%f")
    return {
        'RequestId': fields[0][1],
        'Duration': float(fields[1][1].split(' ')[0]),
        'Billed_Duration': int(fields[2][1].split(' ')[0]),
        'Memory_Size': int(fields[3][1].split(' ')[0]),
        'Max_Memory_Used': int(fields[4][1].split(' ')[0]),
        # datetime.timestamp() keeps the sub-second part, which the previous
        # time.mktime(...timetuple()) conversion silently dropped.
        'UTC_Timestamp': logged_at.timestamp() + timezone_offset*3600,
    }

### Prepare Logs

In [35]:
# Start one query per Lambda log group, selecting the REPORT lines written
# inside the experiment window, oldest first.
report_query = "fields @timestamp, @message| filter @message like 'REPORT'| sort @timestamp asc"
query_lambda = []
for function_name in function_name_list:
    started_query = logclient.start_query(
        logGroupName='/aws/lambda/{}'.format(function_name),
        queryString=report_query,
        startTime=app_exeuction_start_time,
        endTime=app_execution_end_time,
        limit=10000,
    )
    query_lambda.append(started_query)
    # Pause between submissions.
    time.sleep(4)
# Extra wait after the last submission, before the results are fetched.
time.sleep(10)

### Retrieve Logs

In [36]:
# The queries run asynchronously, so poll each one until it leaves the
# Scheduled/Running states instead of relying on fixed sleeps; otherwise
# get_query_results may hand back partial results.
query_lambda_results = []
for q in query_lambda:
    while True:
        query_result = logclient.get_query_results(queryId=q['queryId'])
        if query_result['status'] not in ('Scheduled', 'Running'):
            break
        time.sleep(4)
    # Any terminal status other than Complete means the results cannot be trusted.
    if query_result['status'] != 'Complete':
        raise RuntimeError(
            'Lambda log query {} ended with status {}'.format(q['queryId'], query_result['status'])
        )
    query_lambda_results.append(query_result)

In [37]:
# Persist the raw query responses to disk.
with open('query_lambda_results.pickle', 'wb') as pickle_file:
    pickle.dump(query_lambda_results, pickle_file)

In [38]:
# Parse the REPORT rows of every function and tag each parsed record with the
# short function name ('f1', 'f2', ...). The i-th query result belongs to
# function f<i>.
Parallel_lambda_logs_dict = {}
for fn_index in range(1, function_count + 1):
    fn_key = 'f' + str(fn_index)
    parsed_records = []
    for raw_row in query_lambda_results[fn_index - 1]['results']:
        record = lambda_report_log_to_dict(raw_row)
        record['Function'] = fn_key
        parsed_records.append(record)
    Parallel_lambda_logs_dict[fn_key] = parsed_records

In [39]:
# Number of REPORT records parsed for f1.
len(Parallel_lambda_logs_dict['f1'])

5000

#### Convert Logs into DataFrame and Save as CSV

In [40]:
# Build the combined frame with a single pd.concat. DataFrame.append was
# deprecated in pandas 1.4 and removed in pandas 2.0, and appending inside a
# loop copies the accumulated frame again on every iteration.
# ignore_index=True gives the same 0..n-1 index the old code assigned by hand.
Parallel_lambda_logs = pd.concat(
    [pd.DataFrame(Parallel_lambda_logs_dict['f'+str(i)]) for i in range(1, function_count+1)],
    ignore_index=True,
)
# Fix the column order before writing the CSV.
Parallel_lambda_logs=Parallel_lambda_logs[['Function', 'Memory_Size', 'Max_Memory_Used', 'Duration', 'Billed_Duration', 'UTC_Timestamp', 'RequestId']]
Parallel_lambda_logs.to_csv('Parallel_lambda_logs.csv',index=False)

In [41]:
# Reload the Lambda logs from CSV, replacing the header row with the column
# names used in the rest of the notebook (UTC_Timestamp becomes UTCTimestamp).
Parallel_lambda_logs = pd.read_csv(
    'Parallel_lambda_logs.csv',
    low_memory=False,
    header=0,
    names=['Function', 'Memory_Size', 'Max_Memory_Used', 'Duration', 'Billed_Duration', 'UTCTimestamp', 'RequestId'],
)

In [42]:
# Preview the first rows of the reloaded Lambda logs.
Parallel_lambda_logs.head()

Unnamed: 0,Function,Memory_Size,Max_Memory_Used,Duration,Billed_Duration,UTCTimestamp,RequestId
0,f1,1280,36,277.08,278,1641152000.0,53d01e01-35eb-48e6-ab72-c91c2b48ecdb
1,f1,1280,37,292.21,293,1641152000.0,1027d9cb-dcc1-4670-8477-a022e3c4fc6a
2,f1,1280,37,282.31,283,1641152000.0,5d69404e-eb5b-4856-aa5e-9151949d9491
3,f1,1280,37,276.79,277,1641152000.0,31f3fd57-17f5-4b7f-9b9f-2a6d11dfb065
4,f1,1280,37,277.89,278,1641152000.0,89692b7c-8075-4fa7-9541-ff4979dab4c0


In [43]:
# Print, for each function, its short name, record count and mean Duration.
for fn_index in range(1, function_count + 1):
    fn_key = f"f{fn_index}"
    fn_rows = Parallel_lambda_logs[Parallel_lambda_logs['Function'] == fn_key]
    print(fn_key, fn_rows.shape[0], fn_rows['Duration'].mean())

f1 5000 278.18332599999997
f2 5000 208.44621400000003
f3 5000 669.9683
f4 5000 308.199706


In [44]:
def calculate_cost(rt: float, mem: float, pmms: float = 0.0000166667/1024/1000, ppi: float = 0.0000002) -> float:
    """Return the cost of a single invocation.

    ``rt`` is rounded up to a whole number, multiplied by ``mem`` and by the
    unit price ``pmms``; the flat per-invocation price ``ppi`` is then added.
    (Presumably ``rt`` is in milliseconds and ``mem`` in MB — confirm.)
    """
    billed_units = math.ceil(rt)
    usage_charge = billed_units * mem * pmms
    return usage_charge + ppi

def adjacent_values(vals, q1, q3):
    """Return the (lower, upper) adjacent values for the quartiles q1 and q3.

    The fences lie 1.5 times the interquartile range beyond each quartile.
    The upper one is clipped to [q3, vals[-1]] and the lower one to
    [vals[0], q1]. (Assumes vals is sorted ascending so that vals[0] and
    vals[-1] are its extremes — confirm with callers.)
    """
    whisker_reach = (q3 - q1) * 1.5
    upper = np.clip(q3 + whisker_reach, q3, vals[-1])
    lower = np.clip(q1 - whisker_reach, vals[0], q1)
    return lower, upper

# End-to-end RT Reported by AWS

In [109]:
# End-to-end duration of each execution: End minus Start, scaled by 1000
# (seconds to milliseconds), written to CSV and read back.
elapsed_ms = (Parallel_sfn_logs['End'] - Parallel_sfn_logs['Start'])*1000
Parallel_duration = elapsed_ms.to_frame('Duration')
Parallel_duration.to_csv('Parallel_duration_aws.csv', index=False)
Parallel_duration = pd.read_csv('Parallel_duration_aws.csv', low_memory=False)

In [107]:
# Summary statistics of the measured end-to-end durations.
durations_aws = Parallel_duration['Duration']
print('Number of Executions: ', len(durations_aws))

Parallel_avg_duration_aws = np.mean(durations_aws)
Parallel_mid_duration_aws = np.median(durations_aws)
Parallel_percentile10_aws = np.percentile(durations_aws, 10)
Parallel_percentile90_aws = np.percentile(durations_aws, 90)

print('Average Duration Reported by AWS: ', Parallel_avg_duration_aws, 'ms')
print('Median Duration Reported by AWS: ', Parallel_mid_duration_aws, 'ms')
print('10-th percentile of Duration Reported by AWS: ', Parallel_percentile10_aws, 'ms')
print('90-th percentile Duration Reported by AWS: ', Parallel_percentile90_aws, 'ms')
print('Standard Deviation of Duration Reported by AWS: ', np.std(durations_aws), 'ms')

Number of Executions:  5000
Average Duration Reported by AWS:  1006.2184027194977 ms
Median Duration Reported by AWS:  993.000030517578 ms
10-th percentile of Duration Reported by AWS:  939.000129699707 ms
90-th percentile Duration Reported by AWS:  1079.9999237060547 ms
Standard Deviation of Duration Reported by AWS:  67.00714180341731 ms


# Cost Reported by AWS

In [72]:
# Preview the Start/End timestamps used to bound each execution below.
Parallel_sfn_logs.head()

Unnamed: 0,Start,End
0,1641152000.0,1641152000.0
1,1641152000.0,1641152000.0
2,1641152000.0,1641152000.0
3,1641152000.0,1641152000.0
4,1641152000.0,1641152000.0


In [48]:
# Preview the Lambda records whose Memory_Size and Duration feed the cost calculation below.
Parallel_lambda_logs.head()

Unnamed: 0,Function,Memory_Size,Max_Memory_Used,Duration,Billed_Duration,UTCTimestamp,RequestId
0,f1,1280,36,277.08,278,1641152000.0,53d01e01-35eb-48e6-ab72-c91c2b48ecdb
1,f1,1280,37,292.21,293,1641152000.0,1027d9cb-dcc1-4670-8477-a022e3c4fc6a
2,f1,1280,37,282.31,283,1641152000.0,5d69404e-eb5b-4856-aa5e-9151949d9491
3,f1,1280,37,276.79,277,1641152000.0,31f3fd57-17f5-4b7f-9b9f-2a6d11dfb065
4,f1,1280,37,277.89,278,1641152000.0,89692b7c-8075-4fa7-9541-ff4979dab4c0


In [91]:
# For every execution, add up the cost of all Lambda records whose timestamp
# lies strictly inside the execution's [Start, End] interval widened by
# 2 + 4 seconds on each side. Each invocation cost is scaled by 1e6.
invocation_times = Parallel_lambda_logs['UTCTimestamp']
cost_list = []
for start_ts, end_ts in zip(Parallel_sfn_logs['Start'], Parallel_sfn_logs['End']):
    # The two-step arithmetic mirrors the padding applied before and inside the filter.
    window_open = (start_ts - 2) - 4
    window_close = (end_ts + 2) + 4
    in_window = Parallel_lambda_logs[(invocation_times > window_open) & (invocation_times < window_close)]
    execution_cost = 0
    for memory_size, duration in zip(in_window['Memory_Size'], in_window['Duration']):
        execution_cost += calculate_cost(rt=duration, mem=memory_size) * 1000000
    cost_list.append(execution_cost)

In [92]:
# Summary statistics of the per-execution costs.
# NOTE: the entries of cost_list were multiplied by 1000000 when they were
# built, so the figures printed with the 'USD' label are scaled by that factor.
costs_aws = np.asarray(cost_list)
Parallel_avg_cost_aws = np.mean(costs_aws)
Parallel_mid_cost_aws = np.median(costs_aws)
Parallel_std_cost_aws = np.std(costs_aws)
Parallel_percentile10_cost_aws = np.percentile(costs_aws, 10)
Parallel_percentile90_cost_aws = np.percentile(costs_aws, 90)

print('Average Cost Reported by AWS: ', Parallel_avg_cost_aws, 'USD')
print('Median Cost Reported by AWS: ', Parallel_mid_cost_aws, 'USD')
print('10-th percentile of Cost Reported by AWS: ', Parallel_percentile10_cost_aws, 'USD')
print('90-th percentile Cost Reported by AWS: ', Parallel_percentile90_cost_aws, 'USD')
print('Standard Deviation of Cost Reported by AWS: ', Parallel_std_cost_aws, 'USD')

Average Cost Reported by AWS:  31.88067445289125 USD
Median Cost Reported by AWS:  31.64068668125 USD
10-th percentile of Cost Reported by AWS:  30.194850456250002 USD
90-th percentile Cost Reported by AWS:  33.734545035625 USD
Standard Deviation of Cost Reported by AWS:  1.7212247681952844 USD


# End-to-end RT and Cost Derived from the Modeling Algorithm

In [80]:
import sys
sys.path.append('../../')
from slappsim.Structures import *
from slappsim.Function import *
from slappsim.PetriApp import *
from slappsim.States import *

In [81]:
# Empirical inputs for the model: per-function Lambda durations from
# ../structures and the delay samples stored under ../sfn-delay.
lambda_log_columns = ['Function', 'Memory_Size', 'Max_Memory_Used', 'Duration', 'Billed_Duration',
                      'UTCTimestamp', 'RequestId']
Structures_lambda_logs = pd.read_csv('../structures/Structures_lambda_logs.csv', low_memory=False)
Structures_lambda_logs.columns = lambda_log_columns

# Each delay file is reduced to a NumPy array of its single relevant column.
scheduling_overhead = np.array(
    pd.read_csv('../sfn-delay/Scheduling_Overhead.csv')['scheduling_overhead'].to_list()
)
function_execution_delay = np.array(
    pd.read_csv('../sfn-delay/Function_Execution_Delay.csv')['Duration'].to_list()
)

In [103]:
# Seeded RandomState whose `choice` method is used for all sampling in the
# application definition below.
rs = np.random.RandomState(64)
# NOTE(review): `random` is not imported explicitly in the visible cells;
# presumably it arrives through the slappsim star imports — confirm.
random.seed(64)

## Define the application

In [104]:
# Build the model of the application: four functions arranged as two
# two-step sequences (f1 -> f3 and f2 -> f4) that form the branches of a
# Parallel structure, wired between a Start and an End place.
#
# For each function, entries 500..9500 of its logged durations are the sample
# pool, and rs.choice bound to that pool is passed to Function as pf_fun.
# NOTE(review): `partial` is not imported explicitly in the visible cells;
# presumably functools.partial arrives via the slappsim star imports — confirm.
f1_rt = np.array(Structures_lambda_logs.query(f"Function=='f1'")['Duration'].to_list()[500:9501])
f1_pp_fun = partial(rs.choice, a=f1_rt)
f1 = Function(pf_fun=f1_pp_fun, mem=mem_config_list['f1'], name='f1')
f2_rt = np.array(Structures_lambda_logs.query(f"Function=='f2'")['Duration'].to_list()[500:9501])
f2_pp_fun = partial(rs.choice, a=f2_rt)
f2 = Function(pf_fun=f2_pp_fun, mem=mem_config_list['f2'], name='f2')
f3_rt = np.array(Structures_lambda_logs.query(f"Function=='f3'")['Duration'].to_list()[500:9501])
f3_pp_fun = partial(rs.choice, a=f3_rt)
f3 = Function(pf_fun=f3_pp_fun, mem=mem_config_list['f3'], name='f3')
f4_rt = np.array(Structures_lambda_logs.query(f"Function=='f4'")['Duration'].to_list()[500:9501])
f4_pp_fun = partial(rs.choice, a=f4_rt)
f4 = Function(pf_fun=f4_pp_fun, mem=mem_config_list['f4'], name='f4')
# Delay samplers, also drawing with rs.choice from the measured arrays.
sfn_scheduling_overhead_fun = partial(rs.choice, a=scheduling_overhead)
function_execution_delay_fun = partial(rs.choice, a=function_execution_delay)
delays = {'FunctionExecution': function_execution_delay_fun, 'SchedulingOverhead': sfn_scheduling_overhead_fun}
start = Start()
end = End()
# The two branches that run in parallel.
sequence1 = Sequence(actions=[f1, f3])
sequence2 = Sequence(actions=[f2, f4])
# NOTE(review): only the two sequences are listed in `structures`; the
# Parallel object itself is not — confirm this is what PetriApp expects.
structures = [sequence1, sequence2]
parallel = Parallel(branches=[sequence1, sequence2])
# Transition t1 connects the start place to the entry of the parallel structure.
i1 = InArc(place=start)
o1 = OutArc(place=parallel.structure_start)
t1 = Transition(in_arcs=[i1], out_arcs=[o1])
# Transition t2 connects the exit of the parallel structure to the end place.
i2 = InArc(place=parallel.structure_end)
o2 = OutArc(place=end)
t2 = Transition(in_arcs=[i2], out_arcs=[o2])
# The application's transitions are t1, t2 and those of the parallel structure.
transitions = [t1, t2]
transitions += parallel.transitions
AppParallel = PetriApp(transitions=transitions,
                  functions=[f1, f2, f3, f4],
                  structures=structures,
                  delays=delays)

## Run the modeling algorithm

In [105]:
# Run the model 100000 times, recording the response time and the cost
# (scaled by 1000000) of every run and resetting the application in between.
ert, ec = [], []
for _ in range(100000):
    rt, c, s, logs = AppParallel.execute()
    ert.append(rt)
    ec.append(c * 1000000)
    AppParallel.reset()

# Duration statistics of the simulated runs.
Parallel_avg_ert_mdl = np.mean(ert)
Parallel_mid_ert_mdl = np.median(ert)
Parallel_percentile10_ert_mdl = np.percentile(ert, 10)
Parallel_percentile90_ert_mdl = np.percentile(ert, 90)
Parallel_std_ert_mdl = np.std(ert)

# Cost statistics of the simulated runs (values carry the 1000000 scaling).
Parallel_avg_cost_mdl = np.mean(ec)
Parallel_mid_cost_mdl = np.median(ec)
Parallel_percentile10_cost_mdl = np.percentile(ec, 10)
Parallel_percentile90_cost_mdl = np.percentile(ec, 90)
Parallel_std_cost_mdl = np.std(ec)

print('Average Duration Reported by Algorithm: ', Parallel_avg_ert_mdl, 'ms')
print('Median Duration Reported by Algorithm: ', Parallel_mid_ert_mdl, 'ms')
print('10-th percentile of Duration Reported by Algorithm: ', Parallel_percentile10_ert_mdl, 'ms')
print('90-th percentile Duration Reported by Algorithm: ', Parallel_percentile90_ert_mdl, 'ms')
print('Standard Deviation of Duration Reported by Algorithm: ', Parallel_std_ert_mdl, 'ms')
print('Average Cost Reported by Algorithm: ', Parallel_avg_cost_mdl, 'USD')
print('Median Cost Reported by Algorithm: ', Parallel_mid_cost_mdl, 'USD')
print('10-th percentile of Cost Reported by Algorithm: ', Parallel_percentile10_cost_mdl, 'USD')
print('90-th percentile Cost Reported by Algorithm: ', Parallel_percentile90_cost_mdl, 'USD')
print('Standard Deviation of Cost Reported by Algorithm: ', Parallel_std_cost_mdl, 'USD')

Average Duration Reported by Algorithm:  1021.202376398526 ms
Median Duration Reported by Algorithm:  1034.3400400543214 ms
10-th percentile of Duration Reported by Algorithm:  937.1900417709351 ms
90-th percentile Duration Reported by Algorithm:  1082.1610732421877 ms
Standard Deviation of Duration Reported by Algorithm:  146.49764162371636 ms
Average Cost Reported by Algorithm:  32.96236226209587 USD
Median Cost Reported by Algorithm:  33.1604813875 USD
10-th percentile of Cost Reported by Algorithm:  30.73860154375 USD
90-th percentile Cost Reported by Algorithm:  34.77090127500001 USD
Standard Deviation of Cost Reported by Algorithm:  2.9819861778848464 USD


In [108]:
# Export the measured costs and the simulated durations and costs, each as a
# single-column CSV file.
Parallel_cost_aws = pd.Series(cost_list).to_frame('Cost')
Parallel_duration_model = pd.Series(ert).to_frame('Duration')
Parallel_cost_model = pd.Series(ec).to_frame('Cost')

Parallel_cost_aws.to_csv('Parallel_cost_aws.csv', index=False)
Parallel_duration_model.to_csv('Parallel_duration_model.csv', index=False)
Parallel_cost_model.to_csv('Parallel_cost_model.csv', index=False)