# benchmarking_server.py — cloud-side server for split-DNN benchmarking.
# (Header reconstructed from a web scrape: the original page chrome and
# line-number gutter were removed; file reported as 418 lines / 13.4 KB.)
import numpy as np
import time
import tensorflow as tf
# Import networks
import keras.applications.vgg16 as vgg16
import keras.applications.vgg19 as vgg19
import keras.applications.resnet50 as resnet50
import keras.applications.resnet_v2 as resnet50v2
import keras.applications.mobilenet as mobilenet
import keras.applications.densenet as densenet
import non_keras_networks.alexnet as alexnet
import non_keras_networks.lenet as lenet
# Import keras features
import keras
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.models import model_from_json
from keras.models import Model
from keras import models
from keras import layers
# Networking features
import socket
import tempfile
import random
import socketserver
import os
import gc
import signal
import subprocess
from subprocess import Popen, PIPE
import sys
import json
import psutil
# Import common functions, shared between the Cloud and the Edge
from benchmarking_common_functions import *
if len(sys.argv) < 2:
print('Run like this: sudo python3 benchmarking_server.py <network_name>')
exit()
model_name = sys.argv[1]
model = set_model(model_name)
images = ['dog', 'cat', 'tree', 'car', 'banana']
weights = [0,0,0,0,0]
# Reset 'tc' rules, in case of previous runs. 'tc' is used to slow the network connection.
command = 'tc qdisc del dev ens5 root'
p = Popen(command, stdin=PIPE, stderr=PIPE, shell=True, universal_newlines=True)
valid_cut_points = get_valid_cut_points(model, model_name)
valid_cut_points_iter = iter(valid_cut_points)
HOST = '0.0.0.0' # The server's hostname or IP address
PORT = 1234 # The port used by the server
cp = 0
cut_point = 0
# this is the split point, i.e. the starting layer in our sub-model
starting_layer_name = model.layers[cut_point].name
# create a new input layer for our sub-model we want to construct
layer_input_shape = model.get_layer(starting_layer_name).get_input_shape_at(0)
if isinstance(layer_input_shape, list):
layer_input = [layers.Input(shape=layer_input_shape[0][1:]) for x in range(len(layer_input_shape))]
else:
layer_input = layers.Input(shape=layer_input_shape[1:])
new_input = layer_input
layer_outputs = {}
def get_output_of_layer(layer):
# if we have already applied this layer on its input(s) tensors,
# just return its already computed output
if layer.name in layer_outputs:
return layer_outputs[layer.name]
# if this is the starting layer, then apply it on the input tensor
if layer.name == starting_layer_name:
out = layer(new_input)
layer_outputs[layer.name] = out
return out
# find all the connected layers which this layer
# consumes their output
prev_layers = []
for node in layer._inbound_nodes:
prev_layers.extend(node.inbound_layers)
# get the output of connected layers
pl_outs = []
for pl in prev_layers:
pl_outs.extend([get_output_of_layer(pl)])
# apply this layer on the collected outputs
out = layer(pl_outs[0] if len(pl_outs) == 1 else pl_outs)
layer_outputs[layer.name] = out
return out
# Functional models
def cut_model_functional(model, cut_point):
global layer_outputs, starting_layer_name, new_input, valid_cut_points
try:
if cut_point >= len(model.layers):
cut_point = len(model.layers)-1
elif cut_point <= 0:
cut_point = 0
starting_layer_name = model.layers[cut_point+1].name
print(starting_layer_name)
# create a new input layer for our sub-model we want to construct
new_input = layers.Input(batch_shape=model.get_layer(starting_layer_name).get_input_shape_at(0))
layer_input_shape = model.get_layer(starting_layer_name).get_input_shape_at(0)
if isinstance(layer_input_shape, list):
layer_input = [layers.Input(shape=layer_input_shape[0][1:]) for x in range(len(layer_input_shape))]
else:
layer_input = layers.Input(shape=layer_input_shape[1:])
new_input = layer_input
layer_outputs = {}
in_l = model.get_layer(index=0)
out_l = model.get_layer(index=cut_point)
modelA = Model(inputs=in_l.get_input_at(0), outputs=out_l.get_output_at(0))
new_output = get_output_of_layer(model.layers[-1])
# create the sub-model
modelB = keras.Model(new_input, new_output)
except Exception as e:
print(e)
for layer in model.layers:
if len(layer._inbound_nodes) > 1:
layer._inbound_nodes.pop()
raise Exception('e')
return None, None
for layer in model.layers:
if len(layer._inbound_nodes) > 1:
layer._inbound_nodes.pop()
return modelA, modelB
def cut_model(model, cut_point):
modelA, modelB = cut_model_functional(model, cut_point)
return modelA, modelB
models = []
for c in valid_cut_points:
try:
modelA, modelB = cut_model(model, c)
models.append(modelB)
except Exception as e:
models.append(None)
continue
warmup_start = time.time()
if model_name == 'lenet':
image = load_img('images/'+images[0]+'.jpg', color_mode='grayscale', target_size=(28, 28))
else:
image = load_img('images/'+images[0]+'.jpg', target_size=(224, 224))
# convert the image pixels to a numpy array
image = img_to_array(image)
image = np.array([image for x in range(1)])
input_data = modelA.predict(image)
try:
temp = modelB.predict(input_data)
except Exception as e:
pass
a = time.time()
print('time spent on model(s) warmup:', a - warmup_start)
print('done warmup prediction')
max_layer = 999
class Handler(socketserver.BaseRequestHandler):
tmp = tempfile.NamedTemporaryFile(mode='w+b')
previous_cut_point = 0
requests_answered = 0
def receive_file(self):
data = b''
while True:
d = self.request.recv(10240000)
if not d:
break
data += d
if d[-3:] == b'eof':
break
return data
def receive_message(self):
raw = self.request.recv(1024)
try:
data = json.loads(raw.decode('utf-8'))
except Exception as e:
print(e, raw)
data = json.loads('{"edge_stats": [], "cloud_edge_time": {"total": -1, "edge": -1} }')
return data
def send_message(self, message, s):
data = json.dumps(message, sort_keys=False, indent=2)
data = data.ljust(1024 - len(data), ' ')
s.sendall(data.encode())
return True
def handle(self):
global cut_point, weights, pro, valid_cut_points, valid_cut_points_iter, models
t1, t2 = 0, 0
t3, t4 = 0, 0
addr = self.client_address
print('Got connection from', addr[0], ':', addr[1])
cut_point_averages = []
edge_only_averages = []
cloud_only_averages = []
cut_frequency = 3
averages = []
edge_averages = []
cloud_averages = []
avgs = []
cps = []
q = -1
while True:
q += 1
# After every X requests check if cut_point should be updated
if self.requests_answered % cut_frequency == 0:
print('waiting for stats')
msg = self.receive_message()
edge_stats = msg['edge_stats']
if self.requests_answered != 0 and q % cut_frequency == 0:
averages.append(sum(cut_point_averages[1:]) / (cut_frequency - 1))
cut_point_averages = []
edge_averages.append(sum(edge_only_averages[1:]) / (cut_frequency - 1))
edge_only_averages = []
cloud_averages.append(sum(cloud_only_averages[1:]) / (cut_frequency - 1))
cloud_only_averages = []
cps.append(cut_point)
print('cps', cps)
print('cut_point', cut_point)
t3 = time.time()
cut_point = next(valid_cut_points_iter)
print('new cut point', cut_point)
modelB = models[valid_cut_points.index(cut_point)]
t4 = time.time()
# Server sends cut_point to client
self.send_message({'cut_point': cut_point}, self.request)
if cut_point != -1:
data = self.receive_file()
if not data:
print(addr[0], ':', addr[1], "disconnected")
break
with open(self.tmp.name, 'wb') as w:
w.write(data)
data_from_client = np.load(self.tmp.name, allow_pickle=True)['arr_0']
try:
print('Doing inference')
choice = 0
try:
a = time.time()
output = modelB.predict(data_from_client)
b = time.time()
# print(len(modelB.layers), '/', len(model.layers))
self.send_message({'prediction': 'test'}, self.request)
times = self.receive_message()
cloud_edge_time = times['cloud_edge_time']['total']
edge_only_time = times['cloud_edge_time']['edge']
if int(cloud_edge_time) == -1 or int(edge_only_time) == -1:
q -= 1
cut_point = valid_cut_points[valid_cut_points.index(cut_point) - 1]
self.send_message({'redo': 'yes'}, self.request)
print('redoing cut-point ... ('+ str(cut_point) +')')
continue
else:
self.send_message({'redo': 'no'}, self.request)
cloud_only_time = b - a
print('time taken:', cloud_edge_time, ', edge:', edge_only_time, ', cloud:', cloud_only_time)
cut_point_averages.append(cloud_edge_time)
edge_only_averages.append(edge_only_time)
cloud_only_averages.append(cloud_only_time)
gc.collect()
print(cut_point_averages, 'avg:', sum(cut_point_averages[1:]) / (cut_frequency - 1))
except ValueError as e:
print(e)
self.send_message({'prediction': 'err'}, self.request)
self.requests_answered += 1
except Exception as e:
print("error", e)
self.send_message({'error': e}, self.request)
if cut_point == -1:
q -= 1
valid_cut_points_iter = iter(valid_cut_points)
t1 = time.time()
with open('benchmarking_outputs/outputs_'+str(model_name)+ '/weights_'+"".join([str(x) for x in weights])+'.txt', 'w') as f:
for i in range(len(averages)):
# if i >= 0 and cps[i] >= 0:
if str(averages[i]) != '0.0':
f.write(str(averages[i]) + ' ' + str(edge_averages[i]) + ' ' + str(cloud_averages[i]) + ' ' + str(cps[i]) + '\n')
averages = []
edge_averages = []
cloud_averages = []
cps = []
cut_point = 0
try:
for process in psutil.process_iter():
if 'stress-ng' in process.name():
process.kill()
except:
pass
add(weights, [0,0,0,0,1], 3)
print('new weights', weights)
if weights[0] == 0:
network_speed = 50
elif weights[0] == 1:
network_speed = 25
else:
network_speed = 10
if weights[1] == 0:
server_cpu_stress = 0
elif weights[1] == 1:
server_cpu_stress = 50
else:
server_cpu_stress = 100
if weights[2] == 0:
memory_stress = 0
elif weights[2] == 1:
memory_stress = 50
else:
memory_stress = 100
if weights[3] == 0:
server_memory_stress = 0
elif weights[3] == 1:
server_memory_stress = 50
else:
server_memory_stress = 100
if weights[4] == 0:
cpu_stress = 0
elif weights[4] == 1:
cpu_stress = 50
else:
cpu_stress = 100
try:
# Try to update stress levels, if it fails because of too much memory usage run the garbage collector and try again.
set_cpu_memory_stress(server_cpu_stress, server_memory_stress)
except MemoryError:
gc.collect()
set_cpu_memory_stress(server_cpu_stress, server_memory_stress)
t2 = time.time()
socketserver.TCPServer.allow_reuse_address = True
class ThreadedTCPServer(socketserver.TCPServer): pass
with ThreadedTCPServer((HOST, PORT), Handler) as server:
print('Server running and waiting for connections ...')
server.serve_forever()