Skip to content

Commit

Permalink
Eliminate TcpPortDispatcher and port ranges
Browse files Browse the repository at this point in the history
Now this functionality is not needed anymore due to free port auto
resolving mechanism.
  • Loading branch information
ylobankov committed Sep 7, 2022
1 parent 5903d94 commit 67c900a
Showing 1 changed file with 4 additions and 41 deletions.
45 changes: 4 additions & 41 deletions dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,6 @@
from listeners import StatisticsWatcher


class TcpPortDispatcher:
""" Helper class holds available and occupied TCP port ranges. This ranges
intended to distributes between workers.
"""
def __init__(self, range_count):
lowest_port = 3000
highest_port = 59999
port_count = highest_port - lowest_port + 1
range_size = port_count // range_count

self.available_ranges = set()
for i in range(range_count):
start_port = lowest_port + i * range_size
end_port = start_port + range_size - 1
tcp_port_range = (start_port, end_port)
self.available_ranges.add(tcp_port_range)

self.acquired_ranges = dict()

def acquire_range(self, _id):
tcp_port_range = self.available_ranges.pop()
self.acquired_ranges[_id] = tcp_port_range
return tcp_port_range

def release_range(self, _id):
tcp_port_range = self.acquired_ranges.pop(_id)
self.available_ranges.add(tcp_port_range)


class Dispatcher:
"""Run specified count of worker processes ('max_workers_cnt' arg), pass
task IDs (via 'task_queue'), receive results and output (via
Expand Down Expand Up @@ -136,8 +107,6 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
self.worker_id_to_pid = dict()

self.randomize = randomize
self.tcp_port_dispatcher = TcpPortDispatcher(
range_count=max_workers_cnt)

def terminate_all_workers(self):
for process in self.processes:
Expand Down Expand Up @@ -235,10 +204,7 @@ def add_worker(self):
# find_nonempty_task_queue_disp()
if self.workers_cnt >= self.max_workers_cnt:
return False
tcp_port_range = self.tcp_port_dispatcher.acquire_range(
self.worker_next_id)
process = task_queue_disp.add_worker(self.worker_next_id,
tcp_port_range)
process = task_queue_disp.add_worker(self.worker_next_id)
self.processes.append(process)
self.pids.append(process.pid)
self.pid_to_worker_id[process.pid] = self.worker_next_id
Expand All @@ -255,7 +221,6 @@ def del_worker(self, worker_id):
task_queue_disp = self.get_task_queue_disp(worker_id)
task_queue_disp.del_worker(worker_id)
self.workers_cnt -= 1
self.tcp_port_dispatcher.release_range(worker_id)

self.pids.remove(pid)
del self.worker_id_to_pid[worker_id]
Expand Down Expand Up @@ -412,24 +377,22 @@ def __init__(self, key, task_group, randomize):
self.done = False
self.done_task_ids = set()

def _run_worker(self, worker_id, tcp_port_range):
def _run_worker(self, worker_id):
"""Entry function for worker processes."""
os.environ['TEST_RUN_WORKER_ID'] = str(worker_id)
os.environ['TEST_RUN_TCP_PORT_START'] = str(tcp_port_range[0])
os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
color_stdout.queue = self.result_queue
worker = self.gen_worker(worker_id)
sampler.set_queue(self.result_queue, worker_id, worker.name)
worker.run_all(self.task_queue, self.result_queue)

def add_worker(self, worker_id, tcp_port_range):
def add_worker(self, worker_id):
# Note: each of our workers should consume only one None, but for the
# case of abnormal circumstances we listen for processes termination
# (method 'check_for_dead_processes') and for time w/o output from
# workers (class 'HangWatcher').
self.task_queue.put(None) # 'stop worker' marker

entry = functools.partial(self._run_worker, worker_id, tcp_port_range)
entry = functools.partial(self._run_worker, worker_id)

self.worker_ids.add(worker_id)
process = multiprocessing.Process(target=entry)
Expand Down

0 comments on commit 67c900a

Please sign in to comment.