diff --git a/eng/pipelines/pr-validation-pipeline.yml b/eng/pipelines/pr-validation-pipeline.yml index 2f09b48f..401b4cf1 100644 --- a/eng/pipelines/pr-validation-pipeline.yml +++ b/eng/pipelines/pr-validation-pipeline.yml @@ -313,11 +313,12 @@ jobs: distroName: 'Ubuntu-SQL2025' sqlServerImage: 'mcr.microsoft.com/mssql/server:2025-latest' useAzureSQL: 'false' - Ubuntu_AzureSQL: - dockerImage: 'ubuntu:22.04' - distroName: 'Ubuntu-AzureSQL' - sqlServerImage: '' - useAzureSQL: 'true' + ${{ if ne(variables['AZURE_CONNECTION_STRING'], '') }}: + Ubuntu_AzureSQL: + dockerImage: 'ubuntu:22.04' + distroName: 'Ubuntu-AzureSQL' + sqlServerImage: '' + useAzureSQL: 'true' Debian: dockerImage: 'debian:12' distroName: 'Debian' diff --git a/mssql_python/__init__.py b/mssql_python/__init__.py index cf510ca2..b1bd7e3b 100644 --- a/mssql_python/__init__.py +++ b/mssql_python/__init__.py @@ -3,7 +3,6 @@ Licensed under the MIT license. This module initializes the mssql_python package. """ - import sys import types from typing import Dict @@ -174,7 +173,6 @@ def pooling(max_size: int = 100, idle_timeout: int = 600, enabled: bool = True) else: PoolingManager.enable(max_size, idle_timeout) - _original_module_setattr = sys.modules[__name__].__setattr__ # Export SQL constants at module level @@ -251,22 +249,8 @@ def get_info_constants() -> Dict[str, int]: """ return {name: member.value for name, member in GetInfoConstants.__members__.items()} - # Create a custom module class that uses properties instead of __setattr__ class _MSSQLModule(types.ModuleType): - @property - def native_uuid(self) -> bool: - """Get the native UUID setting.""" - return _settings.native_uuid - - @native_uuid.setter - def native_uuid(self, value: bool) -> None: - """Set the native UUID setting.""" - if not isinstance(value, bool): - raise ValueError("native_uuid must be a boolean value") - with _settings_lock: - _settings.native_uuid = value - @property def lowercase(self) -> bool: """Get the lowercase setting.""" @@ -297,5 +281,4 @@ def lowercase(self, value: bool) -> None: sys.modules[__name__] = new_module # Initialize property values -lowercase: bool = _settings.lowercase -native_uuid: bool = _settings.native_uuid +lowercase: bool = _settings.lowercase \ No newline at end of file diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 446a2dfb..08e3fdd2 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -1125,8 +1125,6 @@ def execute( # pylint: disable=too-many-locals,too-many-branches,too-many-state # After successful execution, initialize description if there are results column_metadata = [] try: - # ODBC specification guarantees that column metadata is available immediately after - # a successful SQLExecute/SQLExecDirect for the first result set ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata) self._initialize_description(column_metadata) except Exception as e: # pylint: disable=broad-exception-caught @@ -1135,30 +1133,21 @@ def execute( # pylint: disable=too-many-locals,too-many-branches,too-many-state # Reset rownumber for new result set (only for SELECT statements) if self.description: # If we have column descriptions, it's likely a SELECT - # Capture settings snapshot for this result set - settings = get_settings() - self._settings_snapshot = { # pylint: disable=attribute-defined-outside-init - "lowercase": settings.lowercase, - "native_uuid": settings.native_uuid, - } - # Identify UUID columns based on Python type in description[1] - # This relies on _map_data_type correctly mapping SQL_GUID to uuid.UUID - self._uuid_indices = [] # pylint: disable=attribute-defined-outside-init - for i, desc in enumerate(self.description): - if desc and desc[1] == uuid.UUID: # Column type code at index 1 - self._uuid_indices.append(i) - # Verify we have complete description tuples (7 items per PEP-249) - elif desc and len(desc) != 7: - log( - "warning", - f"Column description at index {i} has incorrect tuple length: {len(desc)}", - ) self.rowcount = -1 self._reset_rownumber() else: self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) self._clear_rownumber() + # After successful execution, initialize description if there are results + column_metadata = [] + try: + ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata) + self._initialize_description(column_metadata) + except Exception as e: + # If describe fails, it's likely there are no results (e.g., for INSERT) + self.description = None + self._reset_inputsizes() # Reset input sizes after execution # Return self for method chaining return self @@ -1971,8 +1960,7 @@ def fetchone(self) -> Union[None, Row]: # Create and return a Row object, passing column name map if available column_map = getattr(self, "_column_name_map", None) - settings_snapshot = getattr(self, "_settings_snapshot", None) - return Row(self, self.description, row_data, column_map, settings_snapshot) + return Row(self, self.description, row_data, column_map) except Exception as e: # pylint: disable=broad-exception-caught # On error, don't increment rownumber - rethrow the error raise e @@ -2019,9 +2007,8 @@ def fetchmany(self, size: Optional[int] = None) -> List[Row]: # Convert raw data to Row objects column_map = getattr(self, "_column_name_map", None) - settings_snapshot = getattr(self, "_settings_snapshot", None) return [ - Row(self, self.description, row_data, column_map, settings_snapshot) + Row(self, self.description, row_data, column_map) for row_data in rows_data ] except Exception as e: # pylint: disable=broad-exception-caught @@ -2060,9 +2047,8 @@ def fetchall(self) -> List[Row]: # Convert raw data to Row objects column_map = getattr(self, "_column_name_map", None) - settings_snapshot = getattr(self, "_settings_snapshot", None) return [ - Row(self, self.description, row_data, column_map, settings_snapshot) + Row(self, self.description, row_data, column_map) for row_data in rows_data ] except Exception as e: # pylint: disable=broad-exception-caught @@ -2471,4 +2457,4 @@ def setoutputsize(self, size: int, column: Optional[int] = None) -> None: This method is a no-op in this implementation as buffer sizes are managed automatically by the underlying driver. """ - # This is a no-op - buffer sizes are managed automatically + # This is a no-op - buffer sizes are managed automatically \ No newline at end of file diff --git a/mssql_python/helpers.py b/mssql_python/helpers.py index 1be730ee..25011308 100644 --- a/mssql_python/helpers.py +++ b/mssql_python/helpers.py @@ -328,14 +328,12 @@ class Settings: Settings class for mssql_python package configuration. This class holds global settings that affect the behavior of the package, - including lowercase column names, decimal separator, and native UUID handling. + including lowercase column names, decimal separator. """ def __init__(self) -> None: self.lowercase: bool = False # Use the pre-determined separator - no locale access here self.decimal_separator: str = _default_decimal_separator - self.native_uuid: bool = False # Default to False for backwards compatibility - # Global settings instance _settings: Settings = Settings() @@ -345,4 +343,4 @@ def __init__(self) -> None: def get_settings() -> Settings: """Return the global settings object""" with _settings_lock: - return _settings + return _settings \ No newline at end of file diff --git a/mssql_python/row.py b/mssql_python/row.py index 8ffcb6e0..d1f1c85b 100644 --- a/mssql_python/row.py +++ b/mssql_python/row.py @@ -5,39 +5,25 @@ from a cursor fetch operation. """ import decimal -import uuid -from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING - -from mssql_python.constants import ConstantsDDBC +from typing import Any from mssql_python.helpers import get_settings -if TYPE_CHECKING: - from mssql_python.cursor import Cursor - - class Row: """ - A row of data from a cursor fetch operation. - """ + A row of data from a cursor fetch operation. Provides both tuple-like indexing + and attribute access to column values. + + Column attribute access behavior depends on the global 'lowercase' setting: + - When enabled: Case-insensitive attribute access + - When disabled (default): Case-sensitive attribute access matching original column names - def __init__( - self, - cursor: "Cursor", - description: List[ - Tuple[ - str, - Any, - Optional[int], - Optional[int], - Optional[int], - Optional[int], - Optional[bool], - ] - ], - values: List[Any], - column_map: Optional[Dict[str, int]] = None, - settings_snapshot: Optional[Dict[str, Any]] = None, - ) -> None: + Example: + row = cursor.fetchone() + print(row[0]) # Access by index + print(row.column_name) # Access by column name (case sensitivity varies) + """ + + def __init__(self, cursor, description, values, column_map=None): """ Initialize a Row object with values and description. @@ -46,115 +32,33 @@ def __init__( description: The cursor description containing column metadata values: List of values for this row column_map: Optional pre-built column map (for optimization) - settings_snapshot: Settings snapshot from cursor to ensure consistency """ self._cursor = cursor self._description = description - - # Use settings snapshot if provided, otherwise fallback to global settings - if settings_snapshot is not None: - self._settings = settings_snapshot + + # Apply output converters if available + if hasattr(cursor.connection, '_output_converters') and cursor.connection._output_converters: + self._values = self._apply_output_converters(values) else: - settings = get_settings() - self._settings = { - "lowercase": settings.lowercase, - "native_uuid": settings.native_uuid, - } - # Create mapping of column names to indices first + self._values = values + + # TODO: ADO task - Optimize memory usage by sharing column map across rows + # Instead of storing the full cursor_description in each Row object: + # 1. Build the column map once at the cursor level after setting description + # 2. Pass only this map to each Row instance + # 3. Remove cursor_description from Row objects entirely + + # Create mapping of column names to indices # If column_map is not provided, build it from description if column_map is None: - self._column_map = {} + column_map = {} for i, col_desc in enumerate(description): - if col_desc: # Ensure column description exists - col_name = col_desc[0] # Name is first item in description tuple - if self._settings.get("lowercase"): - col_name = col_name.lower() - self._column_map[col_name] = i - else: - self._column_map = column_map - - # First make a mutable copy of values - processed_values = list(values) - - # Apply output converters if available - if ( - hasattr(cursor.connection, "_output_converters") - and cursor.connection._output_converters - ): - processed_values = self._apply_output_converters(processed_values) - - # Process UUID values using the snapshotted setting - self._values = self._process_uuid_values(processed_values, description) - - def _process_uuid_values( - self, - values: List[Any], - description: List[ - Tuple[ - str, - Any, - Optional[int], - Optional[int], - Optional[int], - Optional[int], - Optional[bool], - ] - ], - ) -> List[Any]: - """ - Convert string UUIDs to uuid.UUID objects if native_uuid setting is True, - or ensure UUIDs are returned as strings if False. - """ - - # Use the snapshot setting for native_uuid - native_uuid = self._settings.get("native_uuid") - - # Early return if no conversion needed - if not native_uuid and not any(isinstance(v, uuid.UUID) for v in values): - return values - - # Get pre-identified UUID indices from cursor if available - uuid_indices = getattr(self._cursor, "_uuid_indices", None) - processed_values = list(values) # Create a copy to modify - - # Process only UUID columns when native_uuid is True - if native_uuid: - # If we have pre-identified UUID columns - if uuid_indices is not None: - for i in uuid_indices: - if i < len(processed_values) and processed_values[i] is not None: - value = processed_values[i] - if isinstance(value, str): - try: - # Remove braces if present - clean_value = value.strip("{}") - processed_values[i] = uuid.UUID(clean_value) - except (ValueError, AttributeError): - pass # Keep original if conversion fails - # Fallback to scanning all columns if indices weren't pre-identified - else: - for i, value in enumerate(processed_values): - if value is None: - continue - - if i < len(description) and description[i]: - # Check SQL type for UNIQUEIDENTIFIER (-11) - sql_type = description[i][1] - if sql_type == -11: # SQL_GUID - if isinstance(value, str): - try: - processed_values[i] = uuid.UUID(value.strip("{}")) - except (ValueError, AttributeError): - pass - # When native_uuid is False, convert UUID objects to strings - else: - for i, value in enumerate(processed_values): - if isinstance(value, uuid.UUID): - processed_values[i] = str(value) - - return processed_values - - def _apply_output_converters(self, values: List[Any]) -> List[Any]: + col_name = col_desc[0] # Name is first item in description tuple + column_map[col_name] = i + + self._column_map = column_map + + def _apply_output_converters(self, values): """ Apply output converters to raw values. @@ -168,19 +72,6 @@ def _apply_output_converters(self, values: List[Any]) -> List[Any]: return values converted_values = list(values) - - # Map SQL type codes to appropriate byte sizes - int_size_map = { - # SQL_TINYINT - ConstantsDDBC.SQL_TINYINT.value: 1, - # SQL_SMALLINT - ConstantsDDBC.SQL_SMALLINT.value: 2, - # SQL_INTEGER - ConstantsDDBC.SQL_INTEGER.value: 4, - # SQL_BIGINT - ConstantsDDBC.SQL_BIGINT.value: 8, - } - for i, (value, desc) in enumerate(zip(values, self._description)): if desc is None or value is None: continue @@ -194,50 +85,27 @@ def _apply_output_converters(self, values: List[Any]) -> List[Any]: # If no converter found for the SQL type but the value is a string or bytes, # try the WVARCHAR converter as a fallback if converter is None and isinstance(value, (str, bytes)): - converter = self._cursor.connection.get_output_converter( - ConstantsDDBC.SQL_WVARCHAR.value - ) - + from mssql_python.constants import ConstantsDDBC + converter = self._cursor.connection.get_output_converter(ConstantsDDBC.SQL_WVARCHAR.value) + # If we found a converter, apply it if converter: try: - # If value is already a Python type (str, int, etc.), - # we need to handle it appropriately + # If value is already a Python type (str, int, etc.), + # we need to convert it to bytes for our converters if isinstance(value, str): # Encode as UTF-16LE for string values (SQL_WVARCHAR format) value_bytes = value.encode("utf-16-le") converted_values[i] = converter(value_bytes) - elif isinstance(value, int): - # Get appropriate byte size for this integer type - byte_size = int_size_map.get(sql_type, 8) - try: - # Use signed=True to properly handle negative values - value_bytes = value.to_bytes( - byte_size, byteorder="little", signed=True - ) - converted_values[i] = converter(value_bytes) - except OverflowError: - # Log specific overflow error with details to help diagnose the issue - if hasattr(self._cursor, "log"): - self._cursor.log( - "warning", - f"Integer overflow: value {value} does not fit in " - f"{byte_size} bytes for SQL type {sql_type}", - ) - # Keep the original value in this case else: - # Pass the value directly for other types converted_values[i] = converter(value) - except Exception as e: + except Exception: # Log the exception for debugging without leaking sensitive data - if hasattr(self._cursor, "log"): - self._cursor.log( - "warning", - f"Exception in output converter: {type(e).__name__} " - f"for SQL type {sql_type}", - ) + if hasattr(self._cursor, 'log'): + self._cursor.log('debug', 'Exception occurred in output converter', exc_info=True) # If conversion fails, keep the original value - + pass + return converted_values def __getitem__(self, index: int) -> Any: @@ -247,22 +115,25 @@ def __getitem__(self, index: int) -> Any: def __getattr__(self, name: str) -> Any: """ Allow accessing by column name as attribute: row.column_name + + Note: Case sensitivity depends on the global 'lowercase' setting: + - When lowercase=True: Column names are stored in lowercase, enabling + case-insensitive attribute access (e.g., row.NAME, row.name, row.Name all work). + - When lowercase=False (default): Column names preserve original casing, + requiring exact case matching for attribute access. """ - # _column_map should already be set in __init__, but check to be safe - if not hasattr(self, "_column_map"): - self._column_map = {} - - # Try direct lookup first + # Handle lowercase attribute access - if lowercase is enabled, + # try to match attribute names case-insensitively if name in self._column_map: return self._values[self._column_map[name]] - - # Use the snapshot lowercase setting instead of global - if self._settings.get("lowercase"): - # If lowercase is enabled, try case-insensitive lookup + + # If lowercase is enabled on the cursor, try case-insensitive lookup + if hasattr(self._cursor, 'lowercase') and self._cursor.lowercase: name_lower = name.lower() - if name_lower in self._column_map: - return self._values[self._column_map[name_lower]] - + for col_name in self._column_map: + if col_name.lower() == name_lower: + return self._values[self._column_map[col_name]] + raise AttributeError(f"Row has no attribute '{name}'") def __eq__(self, other: Any) -> bool: diff --git a/tests/test_001_globals.py b/tests/test_001_globals.py index 308f882c..d90d2acc 100644 --- a/tests/test_001_globals.py +++ b/tests/test_001_globals.py @@ -21,10 +21,8 @@ lowercase, getDecimalSeparator, setDecimalSeparator, - native_uuid, ) - def test_apilevel(): # Check if apilevel has the expected value assert apilevel == "2.0", "apilevel should be '2.0'" @@ -771,52 +769,3 @@ def separator_reader_worker(): # Always make sure to clean up stop_event.set() setDecimalSeparator(original_separator) - - -def test_native_uuid_type_validation(): - """Test that native_uuid only accepts boolean values""" - # Save original value - original = mssql_python.native_uuid - - try: - # Test valid values - mssql_python.native_uuid = True - assert mssql_python.native_uuid is True - - mssql_python.native_uuid = False - assert mssql_python.native_uuid is False - - # Test invalid types - invalid_values = [1, 0, "True", "False", None, [], {}, "yes", "no", "t", "f"] - - for value in invalid_values: - with pytest.raises(ValueError, match="native_uuid must be a boolean value"): - mssql_python.native_uuid = value - - finally: - # Restore original value - mssql_python.native_uuid = original - - -def test_lowercase_type_validation(): - """Test that lowercase only accepts boolean values""" - # Save original value - original = mssql_python.lowercase - - try: - # Test valid values - mssql_python.lowercase = True - assert mssql_python.lowercase is True - - mssql_python.lowercase = False - assert mssql_python.lowercase is False - - # Test invalid types - invalid_values = [1, 0, "True", "False", None, [], {}, "yes", "no", "t", "f"] - - for value in invalid_values: - with pytest.raises(ValueError, match="lowercase must be a boolean value"): - mssql_python.lowercase = value - finally: - # Restore original value - mssql_python.lowercase = original diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index f096bf7c..e475c68e 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -19,6 +19,7 @@ from conftest import is_azure_sql_connection + # Setup test table TEST_TABLE = """ CREATE TABLE #pytest_all_data_types ( @@ -8369,16 +8370,8 @@ def test_uuid_insert_and_select_none(cursor, db_connection): def test_insert_multiple_uuids(cursor, db_connection): """Test inserting multiple UUIDs and verifying retrieval.""" - import uuid - - # Save original setting - original_value = mssql_python.native_uuid - + table_name = "#pytest_uuid_multiple" try: - # Set native_uuid to True for this test - mssql_python.native_uuid = True - - table_name = "#pytest_uuid_multiple" cursor.execute(f"DROP TABLE IF EXISTS {table_name}") cursor.execute( f""" @@ -8408,28 +8401,18 @@ def test_insert_multiple_uuids(cursor, db_connection): assert len(rows) == len(uuids_to_insert), "Fetched row count mismatch" for retrieved_uuid, retrieved_desc in rows: - assert isinstance( - retrieved_uuid, uuid.UUID - ), f"Expected uuid.UUID, got {type(retrieved_uuid)}" + assert isinstance(retrieved_uuid, uuid.UUID), f"Expected uuid.UUID, got {type(retrieved_uuid)}" + expected_uuid = uuids_to_insert[retrieved_desc] + assert retrieved_uuid == expected_uuid, f"UUID mismatch for '{retrieved_desc}': expected {expected_uuid}, got {retrieved_uuid}" finally: - # Reset to original value - mssql_python.native_uuid = original_value cursor.execute(f"DROP TABLE IF EXISTS {table_name}") db_connection.commit() def test_fetchmany_uuids(cursor, db_connection): """Test fetching multiple UUID rows with fetchmany().""" - import uuid - - # Save original setting - original_value = mssql_python.native_uuid - + table_name = "#pytest_uuid_fetchmany" try: - # Set native_uuid to True for this test - mssql_python.native_uuid = True - - table_name = "#pytest_uuid_fetchmany" cursor.execute(f"DROP TABLE IF EXISTS {table_name}") cursor.execute( f""" @@ -8464,9 +8447,9 @@ def test_fetchmany_uuids(cursor, db_connection): assert len(fetched_rows) == len(uuids_to_insert), "Fetched row count mismatch" for retrieved_uuid, retrieved_desc in fetched_rows: assert isinstance(retrieved_uuid, uuid.UUID) + expected_uuid = uuids_to_insert[retrieved_desc] + assert retrieved_uuid == expected_uuid finally: - # Reset to original value - mssql_python.native_uuid = original_value cursor.execute(f"DROP TABLE IF EXISTS {table_name}") db_connection.commit() @@ -8548,16 +8531,8 @@ def test_duplicate_uuid_inserts(cursor, db_connection): def test_extreme_uuids(cursor, db_connection): """Test inserting extreme but valid UUIDs.""" - import uuid - - # Save original setting - original_value = mssql_python.native_uuid - + table_name = "#pytest_uuid_extreme" try: - # Set native_uuid to True for this test - mssql_python.native_uuid = True - - table_name = "#pytest_uuid_extreme" cursor.execute(f"DROP TABLE IF EXISTS {table_name}") cursor.execute(f"CREATE TABLE {table_name} (id UNIQUEIDENTIFIER)") db_connection.commit() @@ -8578,8 +8553,6 @@ def test_extreme_uuids(cursor, db_connection): for uid in extreme_uuids: assert uid in fetched_uuids, f"Extreme UUID {uid} not retrieved correctly" finally: - # Reset to original value - mssql_python.native_uuid = original_value cursor.execute(f"DROP TABLE IF EXISTS {table_name}") db_connection.commit() @@ -13518,132 +13491,6 @@ def test_datetime_string_parameter_binding(cursor, db_connection): drop_table_if_exists(cursor, table_name) db_connection.commit() - -def test_native_uuid_setting(db_connection): - """Test that the native_uuid setting affects how UUID values are returned.""" - import uuid - - cursor = db_connection.cursor() - - # Create a temporary table with a UUID column - drop_table_if_exists(cursor, "#test_uuid") - cursor.execute("CREATE TABLE #test_uuid (id int, uuid_col uniqueidentifier)") - - # Generate a test UUID and insert it - test_uuid = uuid.uuid4() - cursor.execute("INSERT INTO #test_uuid VALUES (1, ?)", (test_uuid,)) - - # Save original setting - original_value = mssql_python.native_uuid - - try: - # Test with native_uuid = False - mssql_python.native_uuid = False - - cursor.execute("SELECT uuid_col FROM #test_uuid") - row = cursor.fetchone() - assert isinstance( - row[0], str - ), "With native_uuid=False, UUIDs should be returned as strings" - assert row[0] == str( - test_uuid - ), "UUID string value should match the original UUID" - - # Test with native_uuid = True - mssql_python.native_uuid = True - - cursor.execute("SELECT uuid_col FROM #test_uuid") - row = cursor.fetchone() - assert isinstance( - row[0], uuid.UUID - ), "With native_uuid=True, UUIDs should be returned as uuid.UUID objects" - assert row[0] == test_uuid, "UUID object should match the original UUID" - - finally: - # Reset to original value and clean up - mssql_python.native_uuid = original_value - drop_table_if_exists(cursor, "#test_uuid") - - -def test_wide_result_set_with_uuid(db_connection): - """Test UUID handling in wide result sets (performance test)""" - import uuid - import time - - # Store original setting - original_value = mssql_python.native_uuid - - cursor = db_connection.cursor() - try: - # Create a wide table with one UUID column - cursor.execute("DROP TABLE IF EXISTS #wide_uuid_test") - create_stmt = "CREATE TABLE #wide_uuid_test (id UNIQUEIDENTIFIER" - for i in range(1, 31): - create_stmt += f", col{i} VARCHAR(50)" - create_stmt += ")" - cursor.execute(create_stmt) - - # Insert test data - test_uuid = uuid.uuid4() - values = [test_uuid] - for i in range(1, 31): - values.append(f"Value {i}") - - placeholders = ", ".join(["?"] * 31) - cursor.execute(f"INSERT INTO #wide_uuid_test VALUES ({placeholders})", values) - - # Test with native_uuid = True - mssql_python.native_uuid = True - - # Check if _uuid_indices is populated - cursor.execute("SELECT * FROM #wide_uuid_test") - assert hasattr(cursor, "_uuid_indices"), "UUID indices not identified" - assert cursor._uuid_indices == [0], "Expected UUID at index 0" - - # Verify correct conversion - row = cursor.fetchone() - assert isinstance(row[0], uuid.UUID), "UUID not converted to uuid.UUID object" - assert row[0] == test_uuid, "UUID value mismatch" - - # Verify all other columns remain strings - for i in range(1, 31): - assert isinstance(row[i], str), f"Column {i} should be a string" - - finally: - mssql_python.native_uuid = original_value - - -def test_null_uuid_column(db_connection): - """Test handling NULL values in UUID columns""" - import uuid - - # Store original setting - original_value = mssql_python.native_uuid - - cursor = db_connection.cursor() - try: - # Create test table - cursor.execute("DROP TABLE IF EXISTS #null_uuid_test") - cursor.execute( - "CREATE TABLE #null_uuid_test (id INT, uuid_col UNIQUEIDENTIFIER)" - ) - - # Insert NULL UUID - cursor.execute("INSERT INTO #null_uuid_test VALUES (1, NULL)") - - # Test with native_uuid = True - mssql_python.native_uuid = True - - cursor.execute("SELECT * FROM #null_uuid_test") - row = cursor.fetchone() - - # NULL should remain None - assert row[1] is None, "NULL UUID should remain None" - - finally: - mssql_python.native_uuid = original_value - - # --------------------------------------------------------- # Test 1: Basic numeric insertion and fetch roundtrip # ---------------------------------------------------------