From f8a17ce01439aeec47f97c1267ee7b3c829ae6ef Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Mon, 15 Aug 2022 13:59:48 +0200 Subject: [PATCH 1/2] Include worker_index in some loggings. Consistently call them "workers" in log statements, instead of a mix of "workers" and "clients". --- locust/runners.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index bdcebb4433..1f5fa26771 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -751,7 +751,7 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None: ) logger.info( - "Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients" + "Sending spawn jobs of %d users at %.2f spawn rate to %d ready workers" % (user_count, spawn_rate, num_workers) ) @@ -795,7 +795,7 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None: ) dispatched_user_count = sum(map(sum, map(methodcaller("values"), dispatched_users.values()))) logger.debug( - "Sending spawn messages for %g total users to %i client(s)", + "Sending spawn messages for %g total users to %i worker(s)", dispatched_user_count, len(dispatch_greenlets), ) @@ -878,7 +878,7 @@ def stop(self, send_stop_to_client: bool = True) -> None: if send_stop_to_client: for client in self.clients.all: - logger.debug(f"Sending stop message to client {client.id}") + logger.debug(f"Sending stop message to worker {client.id}") self.server.send_to_client(Message("stop", None, client.id)) # Give an additional 60s for all workers to stop @@ -897,7 +897,7 @@ def quit(self) -> None: self.stop(send_stop_to_client=False) logger.debug("Quitting...") for client in self.clients.all: - logger.debug(f"Sending quit message to client {client.id}") + logger.debug(f"Sending quit message to worker {client.id}") self.server.send_to_client(Message("quit", None, client.id)) gevent.sleep(0.5) # wait for final stats report from all workers self.greenlet.kill(block=True) @@ -966,7 +966,7 @@ def heartbeat_worker(self) -> NoReturn: self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate) def reset_connection(self) -> None: - logger.info("Resetting RPC server and all client connections.") + logger.info("Resetting RPC server and all worker connections.") try: self.server.close(linger=0) self.server = rpc.Server(self.master_bind_host, self.master_bind_port) @@ -983,12 +983,12 @@ def client_listener(self) -> NoReturn: try: self.server.send_to_client(Message("reconnect", None, client_id)) except Exception as e: - logger.error(f"Error sending reconnect message to client: {e}. Will reset RPC server.") + logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") self.connection_broken = True gevent.sleep(FALLBACK_INTERVAL) continue except RPCSendError as e: - logger.error(f"Error sending reconnect message to client: {e}. Will reset RPC server.") + logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") self.connection_broken = True gevent.sleep(FALLBACK_INTERVAL) continue @@ -997,13 +997,12 @@ def client_listener(self) -> NoReturn: logger.error(f"RPCError: {e}. Will reset RPC server.") else: logger.debug( - "RPCError when receiving from client: %s (but no clients were expected to be connected anyway)" + "RPCError when receiving from worker: %s (but no workers were expected to be connected anyway)" % (e) ) self.connection_broken = True gevent.sleep(FALLBACK_INTERVAL) continue - msg.node_id = client_id if msg.type == "client_ready": if not msg.data: logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.") @@ -1018,16 +1017,15 @@ def client_listener(self) -> NoReturn: f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}" ) self.send_message("ack", client_id=client_id, data={"index": self.get_worker_index(client_id)}) - worker_node_id = msg.node_id - self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS) + + self.clients[client_id] = WorkerNode(client_id, heartbeat_liveness=HEARTBEAT_LIVENESS) if self._users_dispatcher is not None: - self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id]) + self._users_dispatcher.add_worker(worker_node=self.clients[client_id]) if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation self.start(self.target_user_count, self.spawn_rate) logger.info( - "Client %r reported as ready. Currently %i clients ready to swarm." - % (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning)) + f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected." ) if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed: self.start(self.target_user_count, self.spawn_rate) @@ -1036,7 +1034,7 @@ def client_listener(self) -> NoReturn: # warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.") elif msg.type == "client_stopped": if msg.node_id not in self.clients: - logger.warning(f"Received {msg.type} message from an unknown client: {msg.node_id}.") + logger.warning(f"Received {msg.type} message from an unknown worker: {msg.node_id}.") continue client = self.clients[msg.node_id] del self.clients[msg.node_id] @@ -1045,7 +1043,9 @@ def client_listener(self) -> NoReturn: if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation self.start(self.target_user_count, self.spawn_rate) - logger.info(f"Removing {msg.node_id} client from running clients") + logger.info( + f"Worker {msg.node_id} (index {self.get_worker_index(client_id)}) reported that it has stopped, removing from running workers" + ) elif msg.type == "heartbeat": if msg.node_id in self.clients: c = self.clients[msg.node_id] @@ -1084,7 +1084,7 @@ def client_listener(self) -> NoReturn: if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation self.start(self.target_user_count, self.spawn_rate) - logger.info(f"Client {msg.node_id!r} quit. Currently {len(self.clients.ready)} clients connected.") + logger.info(f"Worker {msg.node_id!r} quit. {len(self.clients.ready)} workers ready.") if self.worker_count - len(self.clients.missing) <= 0: logger.info("The last worker quit, stopping test.") self.stop() @@ -1122,11 +1122,11 @@ def send_message(self, msg_type: str, data: Optional[Dict[str, Any]] = None, cli If None, will send to all attached workers """ if client_id: - logger.debug(f"Sending {msg_type} message to client {client_id}") + logger.debug(f"Sending {msg_type} message to worker {client_id}") self.server.send_to_client(Message(msg_type, data, client_id)) else: for client in self.clients.all: - logger.debug(f"Sending {msg_type} message to client {client.id}") + logger.debug(f"Sending {msg_type} message to worker {client.id}") self.server.send_to_client(Message(msg_type, data, client.id)) From 8a6f4cfd2938767d8ceac9ae1b135a782b9ad3f2 Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Mon, 15 Aug 2022 14:10:47 +0200 Subject: [PATCH 2/2] Add worker index to a couple more places in the logs --- locust/runners.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 1f5fa26771..ed67bc9954 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -897,7 +897,7 @@ def quit(self) -> None: self.stop(send_stop_to_client=False) logger.debug("Quitting...") for client in self.clients.all: - logger.debug(f"Sending quit message to worker {client.id}") + logger.debug(f"Sending quit message to worker {client.id} (index {self.get_worker_index(client.id)})") self.server.send_to_client(Message("quit", None, client.id)) gevent.sleep(0.5) # wait for final stats report from all workers self.greenlet.kill(block=True) @@ -1064,7 +1064,7 @@ def client_listener(self) -> NoReturn: self.worker_cpu_warning_emitted = True # used to fail the test in the end c.cpu_warning_emitted = True # used to suppress logging for this node logger.warning( - f"Worker {msg.node_id} exceeded cpu threshold (will only log this once per worker)" + f"Worker {msg.node_id} (index {self.get_worker_index(msg.node_id)}) exceeded cpu threshold (will only log this once per worker)" ) if "current_memory_usage" in msg.data: c.memory_usage = msg.data["current_memory_usage"] @@ -1084,7 +1084,9 @@ def client_listener(self) -> NoReturn: if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation self.start(self.target_user_count, self.spawn_rate) - logger.info(f"Worker {msg.node_id!r} quit. {len(self.clients.ready)} workers ready.") + logger.info( + f"Worker {msg.node_id!r} (index {self.get_worker_index(msg.node_id)}) quit. {len(self.clients.ready)} workers ready." + ) if self.worker_count - len(self.clients.missing) <= 0: logger.info("The last worker quit, stopping test.") self.stop() @@ -1093,10 +1095,14 @@ def client_listener(self) -> NoReturn: elif msg.type == "exception": self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"]) elif msg.type in self.custom_messages: - logger.debug(f"Received {msg.type} message from worker {msg.node_id}") + logger.debug( + f"Received {msg.type} message from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)})" + ) self.custom_messages[msg.type](environment=self.environment, msg=msg) else: - logger.warning(f"Unknown message type received from worker {msg.node_id}: {msg.type}") + logger.warning( + f"Unknown message type received from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)}): {msg.type}" + ) self.check_stopped()