/
faasmeasure.py
151 lines (136 loc) · 5 KB
/
faasmeasure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import boto3
import json
import threading
import queue
import random
import os
import sys
import time
def process(lambdainvocation, numpar, simulate, threshold, baseline, bulk, q):
    """Run one (real or simulated) invocation and report its outcome.

    Args:
        lambdainvocation: Object exposing ``invoke(FunctionName=..., Payload=...)``
            that returns a mapping whose ``"Payload"`` entry is readable;
            not used when ``simulate`` is true.
        numpar: Accepted for signature compatibility; not used here.
        simulate: When true, fabricate a response instead of invoking anything.
        threshold: Forwarded to the invoked function in the JSON payload.
        baseline: Forwarded to the invoked function in the JSON payload.
        bulk: Forwarded to the invoked function in the JSON payload.
        q: Optional queue. When given, the result tuple is put on it and the
            function returns ``None``; otherwise the tuple is returned.

    Returns:
        ``(cost, duration, full, rem)`` when ``q`` is ``None``. ``full`` is
        true while ``rem`` is positive. An unusable response yields
        ``(-1, -1, False, -1)``.
    """
    if simulate:
        # The simulated response needs a duration as well, because it is read
        # unconditionally further down; without it simulation raised KeyError.
        response = {"cost": 5.0, "remaining": random.randrange(9), "duration": 0}
    else:
        # this is faasproxy.py, which in turn invokes the 'queueeater' function, i.e. faasconsumer.py
        pl = {"threshold": threshold, "baseline": baseline, "bulk": bulk}
        nsleep = 1
        ssleep = 0
        while True:
            try:
                fullresponse = lambdainvocation.invoke(FunctionName="proxycollectiveshipment", Payload=json.dumps(pl))
            except Exception:
                # Retry with exponential back-off. Catching Exception rather
                # than everything keeps Ctrl-C / SystemExit working.
                time.sleep(nsleep)
                ssleep += nsleep
                nsleep *= 2
            else:
                break
        if ssleep > 0:
            print("warning: delayed invoke {} s due to network issues".format(ssleep))
        response = json.loads(fullresponse["Payload"].read().decode("utf-8"))
        # A payload that is not a JSON object, or lacks any of the fields read
        # below, is treated as invalid instead of crashing the caller/thread.
        if not isinstance(response, dict):
            response = {}
        if not all(key in response for key in ("cost", "remaining", "duration")):
            if "timed out" in str(response.get("errorMessage", "")):
                print("warning: timeout occurred, setting cost/remaining/duration to -1")
            else:
                print("warning: invalid response received, setting cost/remaining/duration to -1")
            response["cost"] = -1
            response["remaining"] = -1
            response["duration"] = -1

    cost = response["cost"]
    duration = response["duration"]
    rem = response["remaining"]
    full = rem > 0
    print("{:4d} remaining, local duration {:5.2f} s, local cost {:.6f} USD".format(rem, duration / 1000, cost))

    if q is not None:
        q.put((cost, duration, full, rem))
    else:
        return (cost, duration, full, rem)
def measure(f, numpar, simulate, threshold, baseline, bulk):
    """Run rounds of ``numpar`` invocations and record the totals.

    While measuring, ``sys.stdout`` is redirected to ``f`` so that everything
    printed by ``process`` lands in the log; a ``+`` progress bar is written
    to the original stdout. Rounds repeat until any result carries a false
    ``full`` flag or a cost of -1. Afterwards a summary line is appended to
    ``results.csv`` in the current directory.

    Args:
        f: Writable text stream that receives stdout during the measurement.
        numpar: Invocations per round; with 1 the call is made inline,
            otherwise one thread per invocation is used.
        simulate: Passed through to ``process``; no Lambda client is created
            when true.
        threshold: Passed through to ``process``; written to the CSV as int.
        baseline: Passed through to ``process``; written to the CSV as str.
        bulk: Passed through to ``process``.

    Returns:
        True if any invocation reported a cost of -1 or a worker thread ended
        without delivering a result, False otherwise.

    Raises:
        ValueError: If ``numpar`` is smaller than 1.
    """
    if numpar < 1:
        # With no invocation per round nothing could ever end the loop below.
        raise ValueError("numpar must be at least 1, got {}".format(numpar))

    mrem = 0   # smallest "remaining" value seen so far (0 = none yet)
    crem = 0   # value the progress bar has advanced to (-1 = bar finished)
    cfull = True
    ccost = 0
    cduration = 0
    invocations = 0
    error = False

    q = queue.Queue() if numpar > 1 else None

    forig = sys.stdout
    sys.stdout = f
    try:
        t_start = time.time()
        while cfull:
            results = []
            if numpar == 1:
                lambdainvocation = None if simulate else boto3.client("lambda")
                results.append(process(lambdainvocation, numpar, simulate, threshold, baseline, bulk, q))
            else:
                threads = []
                for _ in range(numpar):
                    # One client per thread; none is needed when simulating.
                    lambdainvocation = None if simulate else boto3.client("lambda")
                    t = threading.Thread(target=process, args=(lambdainvocation, numpar, simulate, threshold, baseline, bulk, q))
                    threads.append(t)
                    t.start()
                for t in threads:
                    t.join()
                # All workers are done, so whatever is queued now is final.
                # Draining without blocking avoids hanging forever when a
                # worker died before it could queue its result.
                while True:
                    try:
                        results.append(q.get_nowait())
                    except queue.Empty:
                        break
                if len(results) < numpar:
                    print("warning: {} invocation(s) ended without a result".format(numpar - len(results)))
                    cfull = False
                    error = True

            for cost, duration, full, rem in results:
                if cost == -1:
                    cfull = False
                    error = True
                invocations += 1
                # NOTE(review): failed invocations contribute their -1
                # sentinel values to both totals, as before.
                ccost += cost
                cduration += duration
                if not full:
                    cfull = False
                if crem == -1:
                    continue
                if mrem == 0 or rem < mrem:
                    mrem = rem
                if crem == 0 and rem > 0:
                    crem = mrem
                # One "+" per unit by which the minimum "remaining" dropped.
                for _ in range(crem - mrem):
                    print("+", end="", flush=True, file=forig)
                crem = mrem
                if crem == 0:
                    crem = -1

        print("", file=forig)
        t_end = time.time()
        t_diff = t_end - t_start
        print("overall processing time: {:.2f} s, net duration: {:.2f} s, invocations {:2d}, cost: {:.6f} USD".format(t_diff, cduration / 1000, invocations, ccost))
        print("configuration was: numpar={} simulate={} | threshold={} baseline={} bulk={}".format(numpar, simulate, threshold, baseline, bulk))
    finally:
        # Always hand stdout back, even when an invocation raised.
        sys.stdout = forig

    print("spent {:.2f}s for experiment numpar={} simulate={}".format(t_diff, numpar, simulate))
    with open("results.csv", "a") as csvfile:
        print("{:s},{:02d},{:02d},{:s},{:s},{:07.3f},{:02d},{:.6f}".format(str(simulate), numpar, threshold, baseline, str(bulk), cduration / 1000, invocations, ccost), file=csvfile)
    return error
if __name__ == "__main__":
    # Script entry point: parse the positional CLI arguments, run one
    # measurement with its log written below data/, and exit non-zero when
    # the measurement reported errors.

    # Parameters configurable via the CLI
    numpar = 1
    simulate = False
    threshold = 90
    baseline = "none"
    bulk = False

    syntax = "Syntax: {} <numpar:1> <simulate:false*/true> [<threshold:90(%)>] [baseline:none*/individual/all] [bulk:false*/true]".format(sys.argv[0])
    argc = len(sys.argv)
    if argc < 3:
        print(syntax, file=sys.stderr)
        sys.exit(-1)

    try:
        numpar = int(sys.argv[1])
        if argc >= 4:
            threshold = int(sys.argv[3])
    except ValueError:
        # Non-numeric numpar/threshold: show the usage line, not a traceback.
        print(syntax, file=sys.stderr)
        sys.exit(-1)
    if numpar < 1:
        # measure() needs at least one invocation per round to make progress.
        print(syntax, file=sys.stderr)
        sys.exit(-1)

    simulate = sys.argv[2] == "true"
    if argc >= 5:
        baseline = sys.argv[4]
    if argc >= 6:
        bulk = sys.argv[5] == "true"

    os.makedirs("data", exist_ok=True)
    logname = "data/faasmeasure.{:02d}.{}.log".format(numpar, simulate)
    # The context manager closes the log even if measure() raises.
    with open(logname, mode="w", buffering=1) as f:
        error = measure(f, numpar, simulate, threshold, baseline, bulk)

    if error:
        print("Bailing out due to errors.", file=sys.stderr)
        sys.exit(-1)