Skip to content

Commit

Permalink
Merge pull request #2159 from locustio/Include-worker-index-in-worker…
Browse files Browse the repository at this point in the history
…-connection-logging-and-clean-up-code

Include worker_index in worker connection logging, and stop using the word "client" for what is actually a worker
  • Loading branch information
cyberw committed Aug 15, 2022
2 parents cb86f54 + 8a6f4cf commit b25ba33
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None:
)

logger.info(
"Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients"
"Sending spawn jobs of %d users at %.2f spawn rate to %d ready workers"
% (user_count, spawn_rate, num_workers)
)

Expand Down Expand Up @@ -795,7 +795,7 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None:
)
dispatched_user_count = sum(map(sum, map(methodcaller("values"), dispatched_users.values())))
logger.debug(
"Sending spawn messages for %g total users to %i client(s)",
"Sending spawn messages for %g total users to %i worker(s)",
dispatched_user_count,
len(dispatch_greenlets),
)
Expand Down Expand Up @@ -878,7 +878,7 @@ def stop(self, send_stop_to_client: bool = True) -> None:

if send_stop_to_client:
for client in self.clients.all:
logger.debug(f"Sending stop message to client {client.id}")
logger.debug(f"Sending stop message to worker {client.id}")
self.server.send_to_client(Message("stop", None, client.id))

# Give an additional 60s for all workers to stop
Expand All @@ -897,7 +897,7 @@ def quit(self) -> None:
self.stop(send_stop_to_client=False)
logger.debug("Quitting...")
for client in self.clients.all:
logger.debug(f"Sending quit message to client {client.id}")
logger.debug(f"Sending quit message to worker {client.id} (index {self.get_worker_index(client.id)})")
self.server.send_to_client(Message("quit", None, client.id))
gevent.sleep(0.5) # wait for final stats report from all workers
self.greenlet.kill(block=True)
Expand Down Expand Up @@ -966,7 +966,7 @@ def heartbeat_worker(self) -> NoReturn:
self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate)

def reset_connection(self) -> None:
logger.info("Resetting RPC server and all client connections.")
logger.info("Resetting RPC server and all worker connections.")
try:
self.server.close(linger=0)
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
Expand All @@ -983,12 +983,12 @@ def client_listener(self) -> NoReturn:
try:
self.server.send_to_client(Message("reconnect", None, client_id))
except Exception as e:
logger.error(f"Error sending reconnect message to client: {e}. Will reset RPC server.")
logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.")
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
except RPCSendError as e:
logger.error(f"Error sending reconnect message to client: {e}. Will reset RPC server.")
logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.")
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
Expand All @@ -997,13 +997,12 @@ def client_listener(self) -> NoReturn:
logger.error(f"RPCError: {e}. Will reset RPC server.")
else:
logger.debug(
"RPCError when receiving from client: %s (but no clients were expected to be connected anyway)"
"RPCError when receiving from worker: %s (but no workers were expected to be connected anyway)"
% (e)
)
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
msg.node_id = client_id
if msg.type == "client_ready":
if not msg.data:
logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.")
Expand All @@ -1018,16 +1017,15 @@ def client_listener(self) -> NoReturn:
f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}"
)
self.send_message("ack", client_id=client_id, data={"index": self.get_worker_index(client_id)})
worker_node_id = msg.node_id
self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)

self.clients[client_id] = WorkerNode(client_id, heartbeat_liveness=HEARTBEAT_LIVENESS)
if self._users_dispatcher is not None:
self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id])
self._users_dispatcher.add_worker(worker_node=self.clients[client_id])
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(
"Client %r reported as ready. Currently %i clients ready to swarm."
% (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))
f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected."
)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
Expand All @@ -1036,7 +1034,7 @@ def client_listener(self) -> NoReturn:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
elif msg.type == "client_stopped":
if msg.node_id not in self.clients:
logger.warning(f"Received {msg.type} message from an unknown client: {msg.node_id}.")
logger.warning(f"Received {msg.type} message from an unknown worker: {msg.node_id}.")
continue
client = self.clients[msg.node_id]
del self.clients[msg.node_id]
Expand All @@ -1045,7 +1043,9 @@ def client_listener(self) -> NoReturn:
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(f"Removing {msg.node_id} client from running clients")
logger.info(
f"Worker {msg.node_id} (index {self.get_worker_index(client_id)}) reported that it has stopped, removing from running workers"
)
elif msg.type == "heartbeat":
if msg.node_id in self.clients:
c = self.clients[msg.node_id]
Expand All @@ -1064,7 +1064,7 @@ def client_listener(self) -> NoReturn:
self.worker_cpu_warning_emitted = True # used to fail the test in the end
c.cpu_warning_emitted = True # used to suppress logging for this node
logger.warning(
f"Worker {msg.node_id} exceeded cpu threshold (will only log this once per worker)"
f"Worker {msg.node_id} (index {self.get_worker_index(msg.node_id)}) exceeded cpu threshold (will only log this once per worker)"
)
if "current_memory_usage" in msg.data:
c.memory_usage = msg.data["current_memory_usage"]
Expand All @@ -1084,7 +1084,9 @@ def client_listener(self) -> NoReturn:
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(f"Client {msg.node_id!r} quit. Currently {len(self.clients.ready)} clients connected.")
logger.info(
f"Worker {msg.node_id!r} (index {self.get_worker_index(msg.node_id)}) quit. {len(self.clients.ready)} workers ready."
)
if self.worker_count - len(self.clients.missing) <= 0:
logger.info("The last worker quit, stopping test.")
self.stop()
Expand All @@ -1093,10 +1095,14 @@ def client_listener(self) -> NoReturn:
elif msg.type == "exception":
self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
elif msg.type in self.custom_messages:
logger.debug(f"Received {msg.type} message from worker {msg.node_id}")
logger.debug(
f"Received {msg.type} message from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)})"
)
self.custom_messages[msg.type](environment=self.environment, msg=msg)
else:
logger.warning(f"Unknown message type received from worker {msg.node_id}: {msg.type}")
logger.warning(
f"Unknown message type received from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)}): {msg.type}"
)

self.check_stopped()

Expand All @@ -1122,11 +1128,11 @@ def send_message(self, msg_type: str, data: Optional[Dict[str, Any]] = None, cli
If None, will send to all attached workers
"""
if client_id:
logger.debug(f"Sending {msg_type} message to client {client_id}")
logger.debug(f"Sending {msg_type} message to worker {client_id}")
self.server.send_to_client(Message(msg_type, data, client_id))
else:
for client in self.clients.all:
logger.debug(f"Sending {msg_type} message to client {client.id}")
logger.debug(f"Sending {msg_type} message to worker {client.id}")
self.server.send_to_client(Message(msg_type, data, client.id))


Expand Down

0 comments on commit b25ba33

Please sign in to comment.