Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include worker_index in worker connection logging, and stop using the word "client" for what is actually a worker #2159

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None:
)

logger.info(
"Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients"
"Sending spawn jobs of %d users at %.2f spawn rate to %d ready workers"
% (user_count, spawn_rate, num_workers)
)

Expand Down Expand Up @@ -795,7 +795,7 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None:
)
dispatched_user_count = sum(map(sum, map(methodcaller("values"), dispatched_users.values())))
logger.debug(
"Sending spawn messages for %g total users to %i client(s)",
"Sending spawn messages for %g total users to %i worker(s)",
dispatched_user_count,
len(dispatch_greenlets),
)
Expand Down Expand Up @@ -878,7 +878,7 @@ def stop(self, send_stop_to_client: bool = True) -> None:

if send_stop_to_client:
for client in self.clients.all:
logger.debug(f"Sending stop message to client {client.id}")
logger.debug(f"Sending stop message to worker {client.id}")
self.server.send_to_client(Message("stop", None, client.id))

# Give an additional 60s for all workers to stop
Expand All @@ -897,7 +897,7 @@ def quit(self) -> None:
self.stop(send_stop_to_client=False)
logger.debug("Quitting...")
for client in self.clients.all:
logger.debug(f"Sending quit message to client {client.id}")
logger.debug(f"Sending quit message to worker {client.id} (index {self.get_worker_index(client.id)})")
self.server.send_to_client(Message("quit", None, client.id))
gevent.sleep(0.5) # wait for final stats report from all workers
self.greenlet.kill(block=True)
Expand Down Expand Up @@ -966,7 +966,7 @@ def heartbeat_worker(self) -> NoReturn:
self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate)

def reset_connection(self) -> None:
logger.info("Resetting RPC server and all client connections.")
logger.info("Resetting RPC server and all worker connections.")
try:
self.server.close(linger=0)
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
Expand All @@ -983,12 +983,12 @@ def client_listener(self) -> NoReturn:
try:
self.server.send_to_client(Message("reconnect", None, client_id))
except Exception as e:
logger.error(f"Error sending reconnect message to client: {e}. Will reset RPC server.")
logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.")
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
except RPCSendError as e:
logger.error(f"Error sending reconnect message to client: {e}. Will reset RPC server.")
logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.")
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
Expand All @@ -997,13 +997,12 @@ def client_listener(self) -> NoReturn:
logger.error(f"RPCError: {e}. Will reset RPC server.")
else:
logger.debug(
"RPCError when receiving from client: %s (but no clients were expected to be connected anyway)"
"RPCError when receiving from worker: %s (but no workers were expected to be connected anyway)"
% (e)
)
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
msg.node_id = client_id
if msg.type == "client_ready":
if not msg.data:
logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.")
Expand All @@ -1018,16 +1017,15 @@ def client_listener(self) -> NoReturn:
f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}"
)
self.send_message("ack", client_id=client_id, data={"index": self.get_worker_index(client_id)})
worker_node_id = msg.node_id
self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)

self.clients[client_id] = WorkerNode(client_id, heartbeat_liveness=HEARTBEAT_LIVENESS)
if self._users_dispatcher is not None:
self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id])
self._users_dispatcher.add_worker(worker_node=self.clients[client_id])
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(
"Client %r reported as ready. Currently %i clients ready to swarm."
% (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))
f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected."
)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
Expand All @@ -1036,7 +1034,7 @@ def client_listener(self) -> NoReturn:
# warnings.warn("The worker node's clock seems to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
elif msg.type == "client_stopped":
if msg.node_id not in self.clients:
logger.warning(f"Received {msg.type} message from an unknown client: {msg.node_id}.")
logger.warning(f"Received {msg.type} message from an unknown worker: {msg.node_id}.")
continue
client = self.clients[msg.node_id]
del self.clients[msg.node_id]
Expand All @@ -1045,7 +1043,9 @@ def client_listener(self) -> NoReturn:
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(f"Removing {msg.node_id} client from running clients")
logger.info(
f"Worker {msg.node_id} (index {self.get_worker_index(client_id)}) reported that it has stopped, removing from running workers"
)
elif msg.type == "heartbeat":
if msg.node_id in self.clients:
c = self.clients[msg.node_id]
Expand All @@ -1064,7 +1064,7 @@ def client_listener(self) -> NoReturn:
self.worker_cpu_warning_emitted = True # used to fail the test in the end
c.cpu_warning_emitted = True # used to suppress logging for this node
logger.warning(
f"Worker {msg.node_id} exceeded cpu threshold (will only log this once per worker)"
f"Worker {msg.node_id} (index {self.get_worker_index(msg.node_id)}) exceeded cpu threshold (will only log this once per worker)"
)
if "current_memory_usage" in msg.data:
c.memory_usage = msg.data["current_memory_usage"]
Expand All @@ -1084,7 +1084,9 @@ def client_listener(self) -> NoReturn:
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(f"Client {msg.node_id!r} quit. Currently {len(self.clients.ready)} clients connected.")
logger.info(
f"Worker {msg.node_id!r} (index {self.get_worker_index(msg.node_id)}) quit. {len(self.clients.ready)} workers ready."
)
if self.worker_count - len(self.clients.missing) <= 0:
logger.info("The last worker quit, stopping test.")
self.stop()
Expand All @@ -1093,10 +1095,14 @@ def client_listener(self) -> NoReturn:
elif msg.type == "exception":
self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
elif msg.type in self.custom_messages:
logger.debug(f"Received {msg.type} message from worker {msg.node_id}")
logger.debug(
f"Received {msg.type} message from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)})"
)
self.custom_messages[msg.type](environment=self.environment, msg=msg)
else:
logger.warning(f"Unknown message type received from worker {msg.node_id}: {msg.type}")
logger.warning(
f"Unknown message type received from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)}): {msg.type}"
)

self.check_stopped()

Expand All @@ -1122,11 +1128,11 @@ def send_message(self, msg_type: str, data: Optional[Dict[str, Any]] = None, cli
If None, will send to all attached workers
"""
if client_id:
logger.debug(f"Sending {msg_type} message to client {client_id}")
logger.debug(f"Sending {msg_type} message to worker {client_id}")
self.server.send_to_client(Message(msg_type, data, client_id))
else:
for client in self.clients.all:
logger.debug(f"Sending {msg_type} message to client {client.id}")
logger.debug(f"Sending {msg_type} message to worker {client.id}")
self.server.send_to_client(Message(msg_type, data, client.id))


Expand Down