From 89ac0194424049d5e55668c0617a874b04653458 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 09:38:39 +0000 Subject: [PATCH 01/11] Initial plan From 2fc623aba37cfb29859a9aae1adf651b5e5d565f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 09:51:56 +0000 Subject: [PATCH 02/11] Optimize column_encryption_policy checks in recv_results_rows - Check column_encryption_policy once per recv_results_rows call instead of per value - Create two separate decode paths: one with encryption, one without - Pre-compute encryption info per column to avoid repeated lookups - Add comprehensive unit tests to verify optimization Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 73 ++++++--- .../unit/test_protocol_decode_optimization.py | 152 ++++++++++++++++++ 2 files changed, 205 insertions(+), 20 deletions(-) create mode 100644 tests/unit/test_protocol_decode_optimization.py diff --git a/cassandra/protocol.py b/cassandra/protocol.py index d8716f4eeb..0f57d19ec2 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -720,26 +720,59 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, self.column_types = [c[3] for c in column_metadata] col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] - def decode_val(val, col_md, col_desc): - uses_ce = column_encryption_policy and column_encryption_policy.contains_column(col_desc) - col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] - raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val - return col_type.from_binary(raw_bytes, protocol_version) - - def decode_row(row): - return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs)) - - try: - self.parsed_rows = [decode_row(row) for row in rows] - except Exception: - for row in rows: - for val, col_md, col_desc in zip(row, column_metadata, col_descs): - try: - decode_val(val, col_md, col_desc) - except Exception as e: - raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], - col_md[3].cql_parameterized_type(), - str(e))) + # Optimize by checking column_encryption_policy once and creating appropriate decode path + if column_encryption_policy: + # Pre-compute encryption info for each column to avoid repeated lookups + column_encryption_info = [ + (column_encryption_policy.contains_column(col_desc), col_desc) + for col_desc in col_descs + ] + + def decode_val_with_encryption(val, col_md, uses_ce, col_desc): + if uses_ce: + col_type = column_encryption_policy.column_type(col_desc) + raw_bytes = column_encryption_policy.decrypt(col_desc, val) + else: + col_type = col_md[3] + raw_bytes = val + return col_type.from_binary(raw_bytes, protocol_version) + + def decode_row(row): + return tuple( + decode_val_with_encryption(val, col_md, uses_ce, col_desc) + for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info) + ) + + try: + self.parsed_rows = [decode_row(row) for row in rows] + except Exception: + for row in rows: + for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info): + try: + decode_val_with_encryption(val, col_md, uses_ce, col_desc) + except Exception as e: + raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], + col_md[3].cql_parameterized_type(), + str(e))) + else: + # Simple path without encryption - just decode raw bytes directly + def decode_val_simple(val, col_type): + return col_type.from_binary(val, protocol_version) + + def decode_row(row): + return tuple(decode_val_simple(val, col_md[3]) for val, col_md in zip(row, column_metadata)) + + try: + self.parsed_rows = [decode_row(row) for row in rows] + except Exception: + for row in rows: + for val, col_md in zip(row, column_metadata): + try: + decode_val_simple(val, col_md[3]) + except Exception as e: + raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], + col_md[3].cql_parameterized_type(), + str(e))) def recv_results_prepared(self, f, protocol_version, user_type_map): self.query_id = read_binary_string(f) diff --git a/tests/unit/test_protocol_decode_optimization.py b/tests/unit/test_protocol_decode_optimization.py new file mode 100644 index 0000000000..0c51803aa2 --- /dev/null +++ b/tests/unit/test_protocol_decode_optimization.py @@ -0,0 +1,152 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import Mock, MagicMock +import io + +from cassandra import ProtocolVersion +from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS +from cassandra.cqltypes import Int32Type, UTF8Type +from cassandra.policies import ColDesc +from cassandra.marshal import int32_pack + + +class DecodeOptimizationTest(unittest.TestCase): + """ + Tests to verify the optimization of column_encryption_policy checks + in recv_results_rows. The optimization should avoid checking the policy + for every value and instead check once per recv_results_rows call. + """ + + def _create_mock_result_metadata(self): + """Create mock result metadata for testing""" + return [ + ('keyspace1', 'table1', 'col1', Int32Type), + ('keyspace1', 'table1', 'col2', UTF8Type), + ] + + def _create_mock_result_message(self): + """Create a mock result message with data""" + msg = ResultMessage(kind=RESULT_KIND_ROWS) + msg.column_metadata = self._create_mock_result_metadata() + msg.recv_results_metadata = Mock() + msg.recv_row = Mock(side_effect=[ + [int32_pack(42), b'hello'], + [int32_pack(100), b'world'], + ]) + return msg + + def _create_mock_stream(self): + """Create a mock stream for reading rows""" + # Pack rowcount (2 rows) + data = int32_pack(2) + return io.BytesIO(data) + + def test_decode_without_encryption_policy(self): + """ + Test that decoding works correctly without column encryption policy. + This should use the optimized simple path. + """ + msg = self._create_mock_result_message() + f = self._create_mock_stream() + + msg.recv_results_rows(f, ProtocolVersion.V4, {}, None, None) + + # Verify results + self.assertEqual(len(msg.parsed_rows), 2) + self.assertEqual(msg.parsed_rows[0][0], 42) + self.assertEqual(msg.parsed_rows[0][1], 'hello') + self.assertEqual(msg.parsed_rows[1][0], 100) + self.assertEqual(msg.parsed_rows[1][1], 'world') + + def test_decode_with_encryption_policy_no_encrypted_columns(self): + """ + Test that decoding works with encryption policy when no columns are encrypted. + """ + msg = self._create_mock_result_message() + f = self._create_mock_stream() + + # Create mock encryption policy that has no encrypted columns + mock_policy = Mock() + mock_policy.contains_column = Mock(return_value=False) + + msg.recv_results_rows(f, ProtocolVersion.V4, {}, None, mock_policy) + + # Verify results + self.assertEqual(len(msg.parsed_rows), 2) + self.assertEqual(msg.parsed_rows[0][0], 42) + self.assertEqual(msg.parsed_rows[0][1], 'hello') + + # Verify contains_column was called only once per column (optimization check) + # Should be called 2 times total (once per column, not per value per row) + self.assertEqual(mock_policy.contains_column.call_count, 2) + + def test_decode_with_encryption_policy_with_encrypted_column(self): + """ + Test that decoding works with encryption policy when one column is encrypted. + """ + msg = self._create_mock_result_message() + f = self._create_mock_stream() + + # Create mock encryption policy where first column is encrypted + mock_policy = Mock() + def contains_column_side_effect(col_desc): + return col_desc.col == 'col1' + mock_policy.contains_column = Mock(side_effect=contains_column_side_effect) + mock_policy.column_type = Mock(return_value=Int32Type) + mock_policy.decrypt = Mock(side_effect=lambda col_desc, val: val) + + msg.recv_results_rows(f, ProtocolVersion.V4, {}, None, mock_policy) + + # Verify results + self.assertEqual(len(msg.parsed_rows), 2) + self.assertEqual(msg.parsed_rows[0][0], 42) + self.assertEqual(msg.parsed_rows[0][1], 'hello') + + # Verify contains_column was called only once per column (optimization) + self.assertEqual(mock_policy.contains_column.call_count, 2) + + # Verify decrypt was called for each encrypted value (2 rows * 1 encrypted column) + self.assertEqual(mock_policy.decrypt.call_count, 2) + + def test_optimization_efficiency(self): + """ + Verify that the optimization reduces the number of policy checks. + With the old code, contains_column would be called for every value. + With the new code, it's called once per column. + """ + msg = self._create_mock_result_message() + + # Create more rows to make the optimization more apparent + msg.recv_row = Mock(side_effect=[ + [int32_pack(i), f'text{i}'.encode()] for i in range(100) + ]) + + # Create mock stream with 100 rows + f = io.BytesIO(int32_pack(100)) + + mock_policy = Mock() + mock_policy.contains_column = Mock(return_value=False) + + msg.recv_results_rows(f, ProtocolVersion.V4, {}, None, mock_policy) + + # With optimization: contains_column called once per column = 2 calls + # Without optimization: would be called per value = 100 rows * 2 columns = 200 calls + self.assertEqual(mock_policy.contains_column.call_count, 2, + "Optimization failed: contains_column should be called once per column, not per value") + + +if __name__ == '__main__': + unittest.main() From b192ca5ed74f7a843b02447b2d67e2d0470f7c3b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 09:59:26 +0000 Subject: [PATCH 03/11] Add detailed comments explaining the optimization benefits Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 0f57d19ec2..16acb3bffa 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -720,9 +720,11 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, self.column_types = [c[3] for c in column_metadata] col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] - # Optimize by checking column_encryption_policy once and creating appropriate decode path + # Optimize by checking column_encryption_policy once and creating appropriate decode path. + # This avoids checking the policy for every single value decoded (rows × columns times). if column_encryption_policy: - # Pre-compute encryption info for each column to avoid repeated lookups + # Pre-compute encryption info for each column to avoid repeated lookups. + # For N rows with M columns, this reduces contains_column() calls from N×M to just M. column_encryption_info = [ (column_encryption_policy.contains_column(col_desc), col_desc) for col_desc in col_descs From 12eac3896e25f8a66b8e0e6f8cfa473d600e832e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 10:12:37 +0000 Subject: [PATCH 04/11] Simplify optimization - only define decode_row differently Per review feedback, removed code duplication: - Only define decode_row() function differently based on encryption policy - Removed separate decode_val helper functions - Keep single try/except block instead of duplicating it - Reduced from 75 lines to 57 lines (-18 lines) Still maintains the same optimization: contains_column() called once per column instead of per value. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 60 +++++++++++++++---------------------------- 1 file changed, 21 insertions(+), 39 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 16acb3bffa..6b8839222d 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -720,7 +720,7 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, self.column_types = [c[3] for c in column_metadata] col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] - # Optimize by checking column_encryption_policy once and creating appropriate decode path. + # Optimize by checking column_encryption_policy once and defining appropriate decode_row function. # This avoids checking the policy for every single value decoded (rows × columns times). if column_encryption_policy: # Pre-compute encryption info for each column to avoid repeated lookups. @@ -730,51 +730,33 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, for col_desc in col_descs ] - def decode_val_with_encryption(val, col_md, uses_ce, col_desc): - if uses_ce: - col_type = column_encryption_policy.column_type(col_desc) - raw_bytes = column_encryption_policy.decrypt(col_desc, val) - else: - col_type = col_md[3] - raw_bytes = val - return col_type.from_binary(raw_bytes, protocol_version) - def decode_row(row): return tuple( - decode_val_with_encryption(val, col_md, uses_ce, col_desc) + column_encryption_policy.column_type(col_desc).from_binary( + column_encryption_policy.decrypt(col_desc, val), protocol_version + ) if uses_ce else col_md[3].from_binary(val, protocol_version) for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info) ) - - try: - self.parsed_rows = [decode_row(row) for row in rows] - except Exception: - for row in rows: - for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info): - try: - decode_val_with_encryption(val, col_md, uses_ce, col_desc) - except Exception as e: - raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], - col_md[3].cql_parameterized_type(), - str(e))) else: # Simple path without encryption - just decode raw bytes directly - def decode_val_simple(val, col_type): - return col_type.from_binary(val, protocol_version) - def decode_row(row): - return tuple(decode_val_simple(val, col_md[3]) for val, col_md in zip(row, column_metadata)) - - try: - self.parsed_rows = [decode_row(row) for row in rows] - except Exception: - for row in rows: - for val, col_md in zip(row, column_metadata): - try: - decode_val_simple(val, col_md[3]) - except Exception as e: - raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], - col_md[3].cql_parameterized_type(), - str(e))) + return tuple(col_md[3].from_binary(val, protocol_version) for val, col_md in zip(row, column_metadata)) + + try: + self.parsed_rows = [decode_row(row) for row in rows] + except Exception: + for row in rows: + for val, col_md, col_desc in zip(row, column_metadata, col_descs): + try: + # Fallback to original decode_val logic for error reporting + uses_ce = column_encryption_policy and column_encryption_policy.contains_column(col_desc) + col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] + raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val + col_type.from_binary(raw_bytes, protocol_version) + except Exception as e: + raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], + col_md[3].cql_parameterized_type(), + str(e))) def recv_results_prepared(self, f, protocol_version, user_type_map): self.query_id = read_binary_string(f) From 89a321161d1785eeaf339dcb3fc5892a1f068ab8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 10:33:09 +0000 Subject: [PATCH 05/11] Restore decode_val function for readability and remove unused import Per review feedback: - Restored decode_val function in encryption path for better readability - Keeps the structure similar to original code but without redundant policy check - Removed unused MagicMock import from test file The optimization remains the same: contains_column() called once per column. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 12 ++++++------ tests/unit/test_protocol_decode_optimization.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 6b8839222d..b6b9ba3f25 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -730,13 +730,13 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, for col_desc in col_descs ] + def decode_val(val, col_md, uses_ce, col_desc): + col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] + raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val + return col_type.from_binary(raw_bytes, protocol_version) + def decode_row(row): - return tuple( - column_encryption_policy.column_type(col_desc).from_binary( - column_encryption_policy.decrypt(col_desc, val), protocol_version - ) if uses_ce else col_md[3].from_binary(val, protocol_version) - for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info) - ) + return tuple(decode_val(val, col_md, uses_ce, col_desc) for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info)) else: # Simple path without encryption - just decode raw bytes directly def decode_row(row): diff --git a/tests/unit/test_protocol_decode_optimization.py b/tests/unit/test_protocol_decode_optimization.py index 0c51803aa2..4b8af3b073 100644 --- a/tests/unit/test_protocol_decode_optimization.py +++ b/tests/unit/test_protocol_decode_optimization.py @@ -13,7 +13,7 @@ # limitations under the License. import unittest -from unittest.mock import Mock, MagicMock +from unittest.mock import Mock import io from cassandra import ProtocolVersion From 8e82b03cd724605b18fad134c3fbe519cc691a0c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 10:56:18 +0000 Subject: [PATCH 06/11] Check column_encryption_policy once per result message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback, simplified the optimization to check the policy existence only once per result message, not per column. Changes: - Check 'if column_encryption_policy:' once at function entry - Within the encryption path, decode_val checks contains_column() without redundant policy existence check - Updated tests to reflect this optimization approach - Updated comments and documentation The key optimization is avoiding the repeated 'column_encryption_policy and ...' check for every value (N×M times), checking policy existence just once instead. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 27 +++++++-------- .../unit/test_protocol_decode_optimization.py | 33 ++++++++++--------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index b6b9ba3f25..a2bc171af8 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -720,23 +720,17 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, self.column_types = [c[3] for c in column_metadata] col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] - # Optimize by checking column_encryption_policy once and defining appropriate decode_row function. - # This avoids checking the policy for every single value decoded (rows × columns times). + # Optimize by checking column_encryption_policy once per result message. + # This avoids checking if the policy exists for every single value decoded. if column_encryption_policy: - # Pre-compute encryption info for each column to avoid repeated lookups. - # For N rows with M columns, this reduces contains_column() calls from N×M to just M. - column_encryption_info = [ - (column_encryption_policy.contains_column(col_desc), col_desc) - for col_desc in col_descs - ] - - def decode_val(val, col_md, uses_ce, col_desc): + def decode_val(val, col_md, col_desc): + uses_ce = column_encryption_policy.contains_column(col_desc) col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val return col_type.from_binary(raw_bytes, protocol_version) def decode_row(row): - return tuple(decode_val(val, col_md, uses_ce, col_desc) for val, col_md, (uses_ce, col_desc) in zip(row, column_metadata, column_encryption_info)) + return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs)) else: # Simple path without encryption - just decode raw bytes directly def decode_row(row): @@ -749,10 +743,13 @@ def decode_row(row): for val, col_md, col_desc in zip(row, column_metadata, col_descs): try: # Fallback to original decode_val logic for error reporting - uses_ce = column_encryption_policy and column_encryption_policy.contains_column(col_desc) - col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] - raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val - col_type.from_binary(raw_bytes, protocol_version) + if column_encryption_policy: + uses_ce = column_encryption_policy.contains_column(col_desc) + col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] + raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val + col_type.from_binary(raw_bytes, protocol_version) + else: + col_md[3].from_binary(val, protocol_version) except Exception as e: raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], col_md[3].cql_parameterized_type(), diff --git a/tests/unit/test_protocol_decode_optimization.py b/tests/unit/test_protocol_decode_optimization.py index 4b8af3b073..73f2968ef5 100644 --- a/tests/unit/test_protocol_decode_optimization.py +++ b/tests/unit/test_protocol_decode_optimization.py @@ -26,8 +26,9 @@ class DecodeOptimizationTest(unittest.TestCase): """ Tests to verify the optimization of column_encryption_policy checks - in recv_results_rows. The optimization should avoid checking the policy - for every value and instead check once per recv_results_rows call. + in recv_results_rows. The optimization checks if the policy exists once + per result message, avoiding the redundant 'column_encryption_policy and ...' + check for every value. """ def _create_mock_result_metadata(self): @@ -89,9 +90,9 @@ def test_decode_with_encryption_policy_no_encrypted_columns(self): self.assertEqual(msg.parsed_rows[0][0], 42) self.assertEqual(msg.parsed_rows[0][1], 'hello') - # Verify contains_column was called only once per column (optimization check) - # Should be called 2 times total (once per column, not per value per row) - self.assertEqual(mock_policy.contains_column.call_count, 2) + # Verify contains_column was called for each value (but policy existence check happens once) + # Should be called 4 times (2 rows × 2 columns) + self.assertEqual(mock_policy.contains_column.call_count, 4) def test_decode_with_encryption_policy_with_encrypted_column(self): """ @@ -115,21 +116,22 @@ def contains_column_side_effect(col_desc): self.assertEqual(msg.parsed_rows[0][0], 42) self.assertEqual(msg.parsed_rows[0][1], 'hello') - # Verify contains_column was called only once per column (optimization) - self.assertEqual(mock_policy.contains_column.call_count, 2) + # Verify contains_column was called for each value (but policy existence check happens once) + # Should be called 4 times (2 rows × 2 columns) + self.assertEqual(mock_policy.contains_column.call_count, 4) # Verify decrypt was called for each encrypted value (2 rows * 1 encrypted column) self.assertEqual(mock_policy.decrypt.call_count, 2) def test_optimization_efficiency(self): """ - Verify that the optimization reduces the number of policy checks. - With the old code, contains_column would be called for every value. - With the new code, it's called once per column. + Verify that the optimization checks policy existence once per result message. + The key optimization is checking 'if column_encryption_policy:' once, + rather than 'column_encryption_policy and ...' for every value. """ msg = self._create_mock_result_message() - # Create more rows to make the optimization more apparent + # Create more rows to make the check pattern clear msg.recv_row = Mock(side_effect=[ [int32_pack(i), f'text{i}'.encode()] for i in range(100) ]) @@ -142,10 +144,11 @@ def test_optimization_efficiency(self): msg.recv_results_rows(f, ProtocolVersion.V4, {}, None, mock_policy) - # With optimization: contains_column called once per column = 2 calls - # Without optimization: would be called per value = 100 rows * 2 columns = 200 calls - self.assertEqual(mock_policy.contains_column.call_count, 2, - "Optimization failed: contains_column should be called once per column, not per value") + # With optimization: policy existence checked once, contains_column called per value + # = 100 rows * 2 columns = 200 calls to contains_column + # The key is we avoid checking 'column_encryption_policy and ...' 200 times + self.assertEqual(mock_policy.contains_column.call_count, 200, + "contains_column should be called for each value when policy exists") if __name__ == '__main__': From 92a5963913b6850f5177a0fe33d96230c6fa15b7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 11:18:39 +0000 Subject: [PATCH 07/11] Only create col_descs when needed Per review feedback, optimized to only create col_descs when necessary: - Created inside 'if column_encryption_policy:' block when policy is enabled - Created in exception handler when needed for error reporting - NOT created when no policy and no exceptions (the common fast path) This avoids unnecessary list comprehension when column_encryption_policy is None. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index a2bc171af8..bc4fcc9bfd 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -718,11 +718,12 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, rows = [self.recv_row(f, len(column_metadata)) for _ in range(rowcount)] self.column_names = [c[2] for c in column_metadata] self.column_types = [c[3] for c in column_metadata] - col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] # Optimize by checking column_encryption_policy once per result message. # This avoids checking if the policy exists for every single value decoded. if column_encryption_policy: + col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] + def decode_val(val, col_md, col_desc): uses_ce = column_encryption_policy.contains_column(col_desc) col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] @@ -739,6 +740,8 @@ def decode_row(row): try: self.parsed_rows = [decode_row(row) for row in rows] except Exception: + # Create col_descs only if needed for error reporting + col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] for row in rows: for val, col_md, col_desc in zip(row, column_metadata, col_descs): try: From 8910184440cac7f7725dc32cfbeb693f4dc17f65 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 14:20:17 +0000 Subject: [PATCH 08/11] Reuse decode_val function in exception handler Per review feedback, refactored to reuse decode_val() in exception handler: - Moved try/except blocks inside each branch (encryption enabled/disabled) - Exception handler now calls decode_val() when column_encryption_policy exists - Eliminates code duplication by reusing the decode_val function - Maintains same error reporting behavior This makes the code cleaner and more maintainable. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index bc4fcc9bfd..6bebcdaf4e 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -732,31 +732,36 @@ def decode_val(val, col_md, col_desc): def decode_row(row): return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs)) + + try: + self.parsed_rows = [decode_row(row) for row in rows] + except Exception: + for row in rows: + for val, col_md, col_desc in zip(row, column_metadata, col_descs): + try: + decode_val(val, col_md, col_desc) + except Exception as e: + raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], + col_md[3].cql_parameterized_type(), + str(e))) else: # Simple path without encryption - just decode raw bytes directly def decode_row(row): return tuple(col_md[3].from_binary(val, protocol_version) for val, col_md in zip(row, column_metadata)) - try: - self.parsed_rows = [decode_row(row) for row in rows] - except Exception: - # Create col_descs only if needed for error reporting - col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] - for row in rows: - for val, col_md, col_desc in zip(row, column_metadata, col_descs): - try: - # Fallback to original decode_val logic for error reporting - if column_encryption_policy: - uses_ce = column_encryption_policy.contains_column(col_desc) - col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] - raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val - col_type.from_binary(raw_bytes, protocol_version) - else: + try: + self.parsed_rows = [decode_row(row) for row in rows] + except Exception: + # Create col_descs only if needed for error reporting + col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] + for row in rows: + for val, col_md, col_desc in zip(row, column_metadata, col_descs): + try: col_md[3].from_binary(val, protocol_version) - except Exception as e: - raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], - col_md[3].cql_parameterized_type(), - str(e))) + except Exception as e: + raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], + col_md[3].cql_parameterized_type(), + str(e))) def recv_results_prepared(self, f, protocol_version, user_type_map): self.query_id = read_binary_string(f) From f5161802502c64bf2863e87ebcca6958e99ebae1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 15:18:58 +0000 Subject: [PATCH 09/11] Remove duplicate try-except blocks (DRY principle) Per review feedback, eliminated code duplication: - Combined two separate try-except blocks into one - Single exception handler now checks column_encryption_policy to decide path - Maintains same error reporting behavior - Follows Python's DRY (Don't Repeat Yourself) principle This makes the code cleaner and more maintainable. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 38 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 6bebcdaf4e..86fbc5afad 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -732,36 +732,28 @@ def decode_val(val, col_md, col_desc): def decode_row(row): return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs)) - - try: - self.parsed_rows = [decode_row(row) for row in rows] - except Exception: - for row in rows: - for val, col_md, col_desc in zip(row, column_metadata, col_descs): - try: - decode_val(val, col_md, col_desc) - except Exception as e: - raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], - col_md[3].cql_parameterized_type(), - str(e))) else: # Simple path without encryption - just decode raw bytes directly def decode_row(row): return tuple(col_md[3].from_binary(val, protocol_version) for val, col_md in zip(row, column_metadata)) - try: - self.parsed_rows = [decode_row(row) for row in rows] - except Exception: - # Create col_descs only if needed for error reporting + try: + self.parsed_rows = [decode_row(row) for row in rows] + except Exception: + # Create col_descs only if needed for error reporting + if not column_encryption_policy: col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata] - for row in rows: - for val, col_md, col_desc in zip(row, column_metadata, col_descs): - try: + for row in rows: + for val, col_md, col_desc in zip(row, column_metadata, col_descs): + try: + if column_encryption_policy: + decode_val(val, col_md, col_desc) + else: col_md[3].from_binary(val, protocol_version) - except Exception as e: - raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], - col_md[3].cql_parameterized_type(), - str(e))) + except Exception as e: + raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2], + col_md[3].cql_parameterized_type(), + str(e))) def recv_results_prepared(self, f, protocol_version, user_type_map): self.query_id = read_binary_string(f) From 8cda794f2b4489769feb7f450057298486f7abf6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 8 Nov 2025 15:50:32 +0000 Subject: [PATCH 10/11] Sort imports alphabetically in test file Per review feedback, reorganized imports following project conventions: - Standard library imports (io, unittest) grouped together and sorted - unittest.mock import kept with standard library - Blank line separator - Project imports (cassandra.*) grouped and sorted alphabetically This matches the import style used in other test files in the repository. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- tests/unit/test_protocol_decode_optimization.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_protocol_decode_optimization.py b/tests/unit/test_protocol_decode_optimization.py index 73f2968ef5..e0fd81fe3e 100644 --- a/tests/unit/test_protocol_decode_optimization.py +++ b/tests/unit/test_protocol_decode_optimization.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import io import unittest from unittest.mock import Mock -import io from cassandra import ProtocolVersion -from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS from cassandra.cqltypes import Int32Type, UTF8Type -from cassandra.policies import ColDesc from cassandra.marshal import int32_pack +from cassandra.policies import ColDesc +from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS class DecodeOptimizationTest(unittest.TestCase): From e9e4459a9564c475b5ccde1f2984e713383b6a3e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:59:11 +0000 Subject: [PATCH 11/11] Refactor decode_val to use if/else instead of ternary operators Per review feedback, simplified decode_val function structure: - Use single 'if uses_ce:' clause for encrypted column path - Else clause simply returns col_md[3].from_binary(val, protocol_version) - More readable than chained ternary operators Maintains same functionality while improving code clarity. Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com> --- cassandra/protocol.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 86fbc5afad..b5cdfb764b 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -726,9 +726,12 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, def decode_val(val, col_md, col_desc): uses_ce = column_encryption_policy.contains_column(col_desc) - col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3] - raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val - return col_type.from_binary(raw_bytes, protocol_version) + if uses_ce: + col_type = column_encryption_policy.column_type(col_desc) + raw_bytes = column_encryption_policy.decrypt(col_desc, val) + return col_type.from_binary(raw_bytes, protocol_version) + else: + return col_md[3].from_binary(val, protocol_version) def decode_row(row): return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs))