Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions benchmarks/bench_lazy_init_callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Micro-benchmark: lazy initialization of _callbacks/_errbacks.

Measures the allocation savings from deferring list creation in
ResponseFuture.__init__() for the common case where no callbacks
are registered (synchronous execute path).

Run:
python benchmarks/bench_lazy_init_callbacks.py
"""
import timeit
import sys


def bench_lazy_init():
"""Compare allocation cost of [] vs None initialization."""
n = 1_000_000

# Simulate the __init__ allocation pattern
def init_with_lists():
callbacks = []
errbacks = []
return callbacks, errbacks

def init_with_none():
callbacks = None
errbacks = None
return callbacks, errbacks

t_lists = timeit.timeit(init_with_lists, number=n)
t_none = timeit.timeit(init_with_none, number=n)

print(f"Init with [] x2 ({n} iters): {t_lists / n * 1e9:.1f} ns/call")
print(f"Init with None x2 ({n} iters): {t_none / n * 1e9:.1f} ns/call")
print(f"Speedup: {t_lists / t_none:.1f}x")
print(f"Memory per empty list: {sys.getsizeof([])} bytes")
print(f"Saved per request (no callbacks): {sys.getsizeof([]) * 2} bytes")

# Benchmark the happy path: _set_final_result with no callbacks
# This is the hot path - iterating None vs empty list
def iter_empty_list():
callbacks = []
for fn, args, kwargs in callbacks:
pass

def iter_none_with_guard():
callbacks = None
for fn, args, kwargs in callbacks or ():
pass

t_list_iter = timeit.timeit(iter_empty_list, number=n)
t_none_iter = timeit.timeit(iter_none_with_guard, number=n)

print(f"\nHappy-path iteration (no callbacks):")
print(f" Iterate empty []: {t_list_iter / n * 1e9:.1f} ns/call")
print(f" Guard None or (): {t_none_iter / n * 1e9:.1f} ns/call")
print(f" Speedup: {t_list_iter / t_none_iter:.2f}x")


def main():
bench_lazy_init()


if __name__ == '__main__':
main()
16 changes: 10 additions & 6 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4340,7 +4340,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4343 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.11)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down Expand Up @@ -4463,8 +4463,8 @@
self._make_query_plan()
self._event = Event()
self._errors = {}
self._callbacks = []
self._errbacks = []
self._callbacks = None
self._errbacks = None
self.attempted_hosts = []
self._start_timer()
self._continuous_paging_state = continuous_paging_state
Expand Down Expand Up @@ -4969,7 +4969,7 @@
# registered callback
to_call = tuple(
partial(fn, response, *args, **kwargs)
for (fn, args, kwargs) in self._callbacks
for (fn, args, kwargs) in self._callbacks or ()
)

self._event.set()
Expand All @@ -4991,7 +4991,7 @@
# registered errback
to_call = tuple(
partial(fn, response, *args, **kwargs)
for (fn, args, kwargs) in self._errbacks
for (fn, args, kwargs) in self._errbacks or ()
)
self._event.set()

Expand Down Expand Up @@ -5167,6 +5167,8 @@
# Always add fn to self._callbacks, even when we're about to
# execute it, to prevent races with functions like
# start_fetching_next_page that reset _final_result
if self._callbacks is None:
self._callbacks = []
self._callbacks.append((fn, args, kwargs))
if self._final_result is not _NOT_SET:
run_now = True
Expand All @@ -5185,6 +5187,8 @@
# Always add fn to self._errbacks, even when we're about to execute
# it, to prevent races with functions like start_fetching_next_page
# that reset _final_exception
if self._errbacks is None:
self._errbacks = []
self._errbacks.append((fn, args, kwargs))
if self._final_exception:
run_now = True
Expand Down Expand Up @@ -5222,8 +5226,8 @@

def clear_callbacks(self):
with self._callback_lock:
self._callbacks = []
self._errbacks = []
self._callbacks = None
self._errbacks = None

def __str__(self):
result = "(no result yet)" if self._final_result is _NOT_SET else self._final_result
Expand Down
128 changes: 128 additions & 0 deletions tests/unit/test_lazy_init_callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
Unit tests for lazy initialization of _callbacks/_errbacks in ResponseFuture.
"""
import unittest
from unittest.mock import Mock, patch, PropertyMock
from threading import Lock, Event

from cassandra.cluster import ResponseFuture, _NOT_SET
from cassandra.query import SimpleStatement
from cassandra.policies import RetryPolicy


def make_response_future():
"""Create a minimal ResponseFuture for testing."""
session = Mock()
session.cluster._default_load_balancing_policy = Mock()
session.cluster._default_load_balancing_policy.make_query_plan.return_value = iter([])
session.row_factory = Mock()
session.cluster.connect_timeout = 5
session._create_clock.return_value = None

message = Mock()
query = SimpleStatement("SELECT 1")
return ResponseFuture(session, message, query, timeout=10.0,
retry_policy=RetryPolicy())


class TestLazyInitCallbacks(unittest.TestCase):

def test_callbacks_initially_none(self):
"""_callbacks and _errbacks should be None after __init__."""
rf = make_response_future()
self.assertIsNone(rf._callbacks)
self.assertIsNone(rf._errbacks)

def test_add_callback_lazy_inits(self):
"""add_callback should create the list on first use."""
rf = make_response_future()
self.assertIsNone(rf._callbacks)
rf.add_callback(lambda result: None)
self.assertIsNotNone(rf._callbacks)
self.assertEqual(len(rf._callbacks), 1)

def test_add_errback_lazy_inits(self):
"""add_errback should create the list on first use."""
rf = make_response_future()
self.assertIsNone(rf._errbacks)
rf.add_errback(lambda exc: None)
self.assertIsNotNone(rf._errbacks)
self.assertEqual(len(rf._errbacks), 1)

def test_set_final_result_no_callbacks(self):
"""_set_final_result should work when _callbacks is None."""
rf = make_response_future()
self.assertIsNone(rf._callbacks)
# Should not raise
rf._set_final_result("some result")
self.assertEqual(rf._final_result, "some result")
self.assertTrue(rf._event.is_set())

def test_set_final_exception_no_errbacks(self):
"""_set_final_exception should work when _errbacks is None."""
rf = make_response_future()
self.assertIsNone(rf._errbacks)
exc = Exception("test error")
# Should not raise
rf._set_final_exception(exc)
self.assertIs(rf._final_exception, exc)
self.assertTrue(rf._event.is_set())

def test_set_final_result_with_callbacks(self):
"""_set_final_result should invoke registered callbacks."""
rf = make_response_future()
results = []
rf.add_callback(lambda result: results.append(result))
rf._set_final_result("data")
self.assertEqual(results, ["data"])

def test_set_final_exception_with_errbacks(self):
"""_set_final_exception should invoke registered errbacks."""
rf = make_response_future()
errors = []
rf.add_errback(lambda exc: errors.append(exc))
exc = Exception("fail")
rf._set_final_exception(exc)
self.assertEqual(errors, [exc])

def test_multiple_callbacks(self):
"""Multiple callbacks should all be invoked."""
rf = make_response_future()
r1, r2 = [], []
rf.add_callback(lambda result: r1.append(result))
rf.add_callback(lambda result: r2.append(result))
rf._set_final_result("ok")
self.assertEqual(r1, ["ok"])
self.assertEqual(r2, ["ok"])

def test_clear_callbacks_resets_to_none(self):
"""clear_callbacks should set both back to None."""
rf = make_response_future()
rf.add_callback(lambda r: None)
rf.add_errback(lambda e: None)
self.assertIsNotNone(rf._callbacks)
self.assertIsNotNone(rf._errbacks)
rf.clear_callbacks()
self.assertIsNone(rf._callbacks)
self.assertIsNone(rf._errbacks)

def test_add_callback_after_result(self):
"""add_callback after _set_final_result should run immediately."""
rf = make_response_future()
rf._set_final_result("data")
results = []
rf.add_callback(lambda result: results.append(result))
self.assertEqual(results, ["data"])

def test_add_errback_after_exception(self):
"""add_errback after _set_final_exception should run immediately."""
rf = make_response_future()
exc = Exception("fail")
rf._set_final_exception(exc)
errors = []
rf.add_errback(lambda e: errors.append(e))
self.assertEqual(errors, [exc])


if __name__ == '__main__':
unittest.main()
Loading