-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.py
104 lines (91 loc) · 3.64 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import zmq
from collections import deque, defaultdict
import time
# Seconds since a worker was last marked active after which the broker pings
# it instead of handing it a job directly.
WORKER_TIMEOUT = 60
def serve(host="tcp://127.0.0.1:5000"):
    """Run the broker loop that routes client calls to registered workers.

    Binds a ZeroMQ ROUTER socket to *host* and handles multipart messages
    until an exception ends the loop.  Every incoming message is read as
    ``[peer_addr, b'', cmd, ...]`` where ``cmd`` is one of:

    * ``b'rg'`` -- a worker registers; frame 3 is a comma-separated list of
      the function names it serves.  Workers are grouped by the hash of
      that list.
    * ``b'do'`` -- a client calls a function; frame 3 is the function name
      and frame 4 the payload.  The call goes to the first idle worker of
      the owning group that is not stale, or is queued if there is none.
      Unknown names are answered with ``[client, b'', b'', b'nm']``.
    * ``b'rs'`` -- a worker returns a result (frame 3), which is forwarded
      to the client paired with that worker.
    * ``b'pr'`` -- a worker answers a ``b'$ping'`` probe.

    Args:
        host: ZeroMQ endpoint to bind the socket to.

    Raises:
        Exception: ``'Wrong command'`` when a message carries any other
            command.  The socket and context are released before the
            exception propagates.
    """
    context = zmq.Context()
    # ROUTER is the canonical name of the socket type formerly called XREP.
    socket = context.socket(zmq.ROUTER)

    group_by_fn = {}                        # function name -> group id
    group_by_worker = {}                    # worker address -> group id
    workers_by_group = defaultdict(deque)   # group id -> idle worker addresses
    tasks_by_group = defaultdict(deque)     # group id -> queued (client, name, data)
    pair = {}                               # worker address -> client being served
    workers_time = {}                       # worker address -> last time it was active

    def log(*argv, **kargs):
        """Debug hook; replace the body with ``print(*argv, **kargs)`` to trace traffic."""

    def run_task(worker_addr, client_addr, name, data):
        """Remember which client the worker serves and send it the call."""
        pair[worker_addr] = client_addr
        log('send worker', worker_addr, name, data)
        socket.send_multipart([worker_addr, b'', name, data])  # fn + arg

    def release_worker(worker_addr):
        """Give a free worker the next queued task of its group, or park it as idle."""
        group_id = group_by_worker[worker_addr]
        pending = tasks_by_group[group_id]
        if pending:
            run_task(worker_addr, *pending.popleft())
        else:
            workers_by_group[group_id].append(worker_addr)
        workers_time[worker_addr] = time.time()

    try:
        socket.bind(host)

        while True:
            log('recv')
            # frames: peer address, b'', cmd, then command-specific frames
            raw = socket.recv_multipart()
            log(raw)
            peer_addr = raw[0]
            cmd = raw[2]

            if cmd == b'do':
                name = raw[3]
                data = raw[4]
                try:
                    group_id = group_by_fn[name]
                except KeyError:
                    socket.send_multipart([peer_addr, b'', b'', b'nm'])  # no method
                    continue

                idle = workers_by_group[group_id]
                while idle:
                    worker_addr = idle.popleft()
                    if workers_time[worker_addr] + WORKER_TIMEOUT < time.time():
                        # Stale worker: probe it and leave it out of the idle
                        # queue; it is put back when its b'pr' reply arrives.
                        print('ping')
                        pair[worker_addr] = None
                        socket.send_multipart([worker_addr, b'', b'$ping'])
                        continue  # try the next idle worker
                    run_task(worker_addr, peer_addr, name, data)
                    break
                else:
                    # No usable idle worker: hold the call until one frees up.
                    tasks_by_group[group_id].append((peer_addr, name, data))

            elif cmd == b'rs':  # response
                result = raw[3]
                log('worker', peer_addr)
                client_addr = pair[peer_addr]
                log('send client', client_addr)
                socket.send_multipart([client_addr, b'', result])
                pair[peer_addr] = None
                release_worker(peer_addr)

            elif cmd == b'rg':  # worker registration
                fn_list = raw[3]
                group_id = hash(fn_list)
                print('worker', peer_addr, fn_list, group_id)
                group_by_worker[peer_addr] = group_id
                workers_by_group[group_id].append(peer_addr)
                workers_time[peer_addr] = time.time()
                for name in fn_list.split(b','):
                    if group_by_fn.get(name, group_id) != group_id:
                        log('Warning: function collision')
                    group_by_fn[name] = group_id  # latest registration wins

            elif cmd == b'pr':  # ping response
                release_worker(peer_addr)

            else:
                # NOTE(review): one unknown command from any peer stops the
                # whole broker; kept because callers may rely on it.
                raise Exception('Wrong command')
    finally:
        # linger=0 discards unsent frames so that term() cannot block.
        socket.close(linger=0)
        context.term()