Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions mssql_python/pybind/ddbc_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2805,7 +2805,6 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
microseconds,
tzinfo
);
py_dt = py_dt.attr("astimezone")(datetime.attr("timezone").attr("utc"));
row.append(py_dt);
} else {
LOG("Error fetching DATETIMEOFFSET for column {}, ret={}", i, ret);
Expand Down Expand Up @@ -3318,7 +3317,6 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum
dtoValue.fraction / 1000, // ns → µs
tzinfo
);
py_dt = py_dt.attr("astimezone")(datetime.attr("timezone").attr("utc"));
row.append(py_dt);
} else {
row.append(py::none());
Expand Down
72 changes: 39 additions & 33 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7887,12 +7887,7 @@ def test_datetimeoffset_read_write(cursor, db_connection):
assert row is not None
fetched_id, fetched_dt = row
assert fetched_dt.tzinfo is not None
expected_utc = dt.astimezone(timezone.utc)
fetched_utc = fetched_dt.astimezone(timezone.utc)
# Ignore sub-microsecond differences
expected_utc = expected_utc.replace(microsecond=int(expected_utc.microsecond / 1000) * 1000)
fetched_utc = fetched_utc.replace(microsecond=int(fetched_utc.microsecond / 1000) * 1000)
assert fetched_utc == expected_utc
assert fetched_dt == dt
finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;")
db_connection.commit()
Expand Down Expand Up @@ -7926,12 +7921,7 @@ def test_datetimeoffset_max_min_offsets(cursor, db_connection):
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}"
assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"

# Compare in UTC to avoid offset differences
expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None)
fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None)
assert fetched_utc == expected_utc, (
f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}"
)
assert fetched_dt == expected_dt, f"Value mismatch for id {expected_id}: expected {expected_dt}, got {fetched_dt}"

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;")
Expand Down Expand Up @@ -7986,12 +7976,7 @@ def test_datetimeoffset_dst_transitions(cursor, db_connection):
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}"
assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"

# Compare UTC time to avoid issues due to offsets changing in DST
expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None)
fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None)
assert fetched_utc == expected_utc, (
f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}"
)
assert fetched_dt == expected_dt, f"Value mismatch for id {expected_id}: expected {expected_dt}, got {fetched_dt}"

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_dst_transitions;")
Expand Down Expand Up @@ -8068,17 +8053,7 @@ def test_datetimeoffset_executemany(cursor, db_connection):
fetched_id, fetched_dto = rows[i]
assert fetched_dto.tzinfo is not None, "Fetched datetime object is naive."

expected_utc = python_dt.astimezone(timezone.utc).replace(tzinfo=None)
fetched_utc = fetched_dto.astimezone(timezone.utc).replace(tzinfo=None)

# Round microseconds to nearest millisecond for comparison
expected_utc = expected_utc.replace(microsecond=int(expected_utc.microsecond / 1000) * 1000)
fetched_utc = fetched_utc.replace(microsecond=int(fetched_utc.microsecond / 1000) * 1000)

assert fetched_utc == expected_utc, (
f"Value mismatch for test case {i}. "
f"Expected UTC: {expected_utc}, Got UTC: {fetched_utc}"
)
assert fetched_dto == python_dt, f"Value mismatch for id {fetched_id}: expected {python_dt}, got {fetched_dto}"
finally:
cursor.execute("IF OBJECT_ID('tempdb..#pytest_dto', 'U') IS NOT NULL DROP TABLE #pytest_dto;")
db_connection.commit()
Expand Down Expand Up @@ -8144,13 +8119,44 @@ def test_datetimeoffset_extreme_offsets(cursor, db_connection):
for i, dt in enumerate(extreme_offsets):
_, fetched = rows[i]
assert fetched.tzinfo is not None
# Round-trip comparison via UTC
expected_utc = dt.astimezone(timezone.utc).replace(tzinfo=None)
fetched_utc = fetched.astimezone(timezone.utc).replace(tzinfo=None)
assert expected_utc == fetched_utc, f"Extreme offset round-trip failed for {dt.tzinfo}"
assert fetched == dt, f"Value mismatch for id {i}: expected {dt}, got {fetched}"
finally:
cursor.execute("IF OBJECT_ID('tempdb..#pytest_dto', 'U') IS NOT NULL DROP TABLE #pytest_dto;")
db_connection.commit()

def test_datetimeoffset_native_vs_string_simple(cursor, db_connection):
"""
Replicates the user's testing scenario: fetch DATETIMEOFFSET as native datetime
and as string using CONVERT(nvarchar(35), ..., 121).
"""
try:
cursor.execute("CREATE TABLE #pytest_dto_user_test (id INT PRIMARY KEY, Systime DATETIMEOFFSET);")
db_connection.commit()

# Insert rows similar to user's example
test_rows = [
(1, datetime(2025, 5, 14, 12, 35, 52, 501000, tzinfo=timezone(timedelta(hours=1)))),
(2, datetime(2025, 5, 14, 15, 20, 30, 123000, tzinfo=timezone(timedelta(hours=-5))))
]

for i, dt in test_rows:
cursor.execute("INSERT INTO #pytest_dto_user_test (id, Systime) VALUES (?, ?);", i, dt)
db_connection.commit()

# Native fetch (like the user's first execute)
cursor.execute("SELECT Systime FROM #pytest_dto_user_test WHERE id=1;")
dt_native = cursor.fetchone()[0]
assert dt_native.tzinfo is not None
assert dt_native == test_rows[0][1]

# String fetch (like the user's convert to nvarchar)
cursor.execute("SELECT CONVERT(nvarchar(35), Systime, 121) FROM #pytest_dto_user_test WHERE id=1;")
dt_str = cursor.fetchone()[0]
assert dt_str.endswith("+01:00") # original offset preserved

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_dto_user_test;")
db_connection.commit()

def test_lowercase_attribute(cursor, db_connection):
"""Test that the lowercase attribute properly converts column names to lowercase"""
Expand Down
Loading