From 51fbb7768f642903d4f9d45dda9437743576bd03 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Mon, 17 Nov 2025 23:54:54 -0400 Subject: [PATCH] Fix dict handling in pool and metrics There are certain rules you need to follow when dicts are modified and read in parallel. Otherwise you end up with `RuntimeError: dictionary changed size during iteration`. The rules are as follows: 1. Any iteration over items or keys needs to be done over a snapshot, i.e. `list()` or `set()` 2. Avoid unnecessary iterations like `len(d.keys())`; you can replace them with `len(d)` This commit fixes code to match these rules in `pool.py` and `metrics.py`. --- cassandra/metrics.py | 4 ++-- cassandra/pool.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cassandra/metrics.py b/cassandra/metrics.py index 223b0c7c6e..abfc863b55 100644 --- a/cassandra/metrics.py +++ b/cassandra/metrics.py @@ -134,9 +134,9 @@ def __init__(self, cluster_proxy): scales.Stat('known_hosts', lambda: len(cluster_proxy.metadata.all_hosts())), scales.Stat('connected_to', - lambda: len(set(chain.from_iterable(s._pools.keys() for s in cluster_proxy.sessions)))), + lambda: len(set(chain.from_iterable(list(s._pools.keys()) for s in cluster_proxy.sessions)))), scales.Stat('open_connections', - lambda: sum(sum(p.open_count for p in s._pools.values()) for s in cluster_proxy.sessions))) + lambda: sum(sum(p.open_count for p in list(s._pools.values())) for s in cluster_proxy.sessions))) # TODO, to be removed in 4.0 # /cassandra contains the metrics of the first cluster registered diff --git a/cassandra/pool.py b/cassandra/pool.py index 70abb6eccc..b8a8ef7493 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -492,7 +492,7 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table "Connection to shard_id=%i reached orphaned stream limit, replacing on host %s (%s/%i)", shard_id, self.host, - len(self._connections.keys()), + 
len(self._connections), self.host.sharding_info.shards_count ) elif shard_id not in self._connecting: @@ -503,7 +503,7 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table "Trying to connect to missing shard_id=%i on host %s (%s/%i)", shard_id, self.host, - len(self._connections.keys()), + len(self._connections), self.host.sharding_info.shards_count ) @@ -607,7 +607,7 @@ def _replace(self, connection): log.debug("Replacing connection (%s) to %s", id(connection), self.host) try: - if connection.features.shard_id in self._connections.keys(): + if connection.features.shard_id in self._connections: del self._connections[connection.features.shard_id] if self.host.sharding_info and not self._session.cluster.shard_aware_options.disable: self._connecting.add(connection.features.shard_id) @@ -759,7 +759,7 @@ def _open_connection_to_missing_shard(self, shard_id): with self._lock: is_shutdown = self.is_shutdown if not is_shutdown: - if conn.features.shard_id in self._connections.keys(): + if conn.features.shard_id in self._connections: # Move the current connection to the trash and use the new one from now on old_conn = self._connections[conn.features.shard_id] log.debug( @@ -804,7 +804,7 @@ def _open_connection_to_missing_shard(self, shard_id): num_missing_or_needing_replacement = self.num_missing_or_needing_replacement log.debug( "Connected to %s/%i shards on host %s (%i missing or needs replacement)", - len(self._connections.keys()), + len(self._connections), self.host.sharding_info.shards_count, self.host, num_missing_or_needing_replacement @@ -816,7 +816,7 @@ def _open_connection_to_missing_shard(self, shard_id): len(self._excess_connections) ) self._close_excess_connections() - elif self.host.sharding_info.shards_count == len(self._connections.keys()) and self.num_missing_or_needing_replacement == 0: + elif self.host.sharding_info.shards_count == len(self._connections) and self.num_missing_or_needing_replacement == 0: log.debug( "All 
shards are already covered, closing newly opened excess connection %s for host %s", id(self), @@ -917,7 +917,7 @@ def get_state(self): @property def num_missing_or_needing_replacement(self): return self.host.sharding_info.shards_count \ - - sum(1 for c in self._connections.values() if not c.orphaned_threshold_reached) + - sum(1 for c in list(self._connections.values()) if not c.orphaned_threshold_reached) @property def open_count(self):