Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions mssql_python/pooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class PoolingManager:
_enabled = False
_initialized = False
_pools_closed = False # Track if pools have been closed
_lock = threading.Lock()
_config = {
"max_size": 100,
Expand All @@ -30,6 +31,9 @@ def enable(cls, max_size=100, idle_timeout=600):
@classmethod
def disable(cls):
with cls._lock:
if cls._enabled and not cls._pools_closed: # Only cleanup if enabled and not already closed
ddbc_bindings.close_pooling()
cls._pools_closed = True
cls._enabled = False
cls._initialized = True

Expand All @@ -40,8 +44,18 @@ def is_enabled(cls):
@classmethod
def is_initialized(cls):
return cls._initialized

@classmethod
def _reset_for_testing(cls):
"""Reset pooling state - for testing purposes only"""
with cls._lock:
cls._enabled = False
cls._initialized = False
cls._pools_closed = False

@atexit.register
def shutdown_pooling():
if PoolingManager.is_enabled():
ddbc_bindings.close_pooling()
with PoolingManager._lock:
if PoolingManager._enabled and not PoolingManager._pools_closed:
ddbc_bindings.close_pooling()
PoolingManager._pools_closed = True
228 changes: 1 addition & 227 deletions tests/test_003_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
- test_commit: Make a transaction and commit.
- test_rollback: Make a transaction and rollback.
- test_invalid_connection_string: Check if initializing with an invalid connection string raises an exception.
- test_connection_pooling_speed: Test connection pooling speed.
- test_connection_pooling_basic: Test basic connection pooling functionality.
- test_autocommit_default: Check if autocommit is False by default.
- test_autocommit_setter: Test setting autocommit mode and its effect on transactions.
- test_set_autocommit: Test the setautocommit method.
Expand Down Expand Up @@ -288,131 +286,6 @@ def test_connection_close(conn_str):
# Check if the database connection can be closed
temp_conn.close()

def test_connection_pooling_speed(conn_str):
"""Test that connection pooling provides performance benefits over multiple iterations."""
import statistics

# Warm up to eliminate cold start effects
for _ in range(3):
conn = connect(conn_str)
conn.close()

# Disable pooling first
pooling(enabled=False)

# Test without pooling (multiple times)
no_pool_times = []
for _ in range(10):
start = time.perf_counter()
conn = connect(conn_str)
conn.close()
end = time.perf_counter()
no_pool_times.append(end - start)

# Enable pooling
pooling(max_size=5, idle_timeout=30)

# Test with pooling (multiple times)
pool_times = []
for _ in range(10):
start = time.perf_counter()
conn = connect(conn_str)
conn.close()
end = time.perf_counter()
pool_times.append(end - start)

# Use median times to reduce impact of outliers
median_no_pool = statistics.median(no_pool_times)
median_pool = statistics.median(pool_times)

# Allow for some variance - pooling should be at least 30% faster on average
improvement_threshold = 0.7 # Pool should be <= 70% of no-pool time

print(f"No pool median: {median_no_pool:.6f}s")
print(f"Pool median: {median_pool:.6f}s")
print(f"Improvement ratio: {median_pool/median_no_pool:.2f}")

# Clean up - disable pooling for other tests
pooling(enabled=False)

assert median_pool <= median_no_pool * improvement_threshold, \
f"Expected pooling to be at least 30% faster. No-pool: {median_no_pool:.6f}s, Pool: {median_pool:.6f}s"

def test_connection_pooling_reuse_spid(conn_str):
"""Test that connections are actually reused from the pool"""
# Enable pooling
pooling(max_size=1, idle_timeout=30)

# Create and close a connection
conn1 = connect(conn_str)
cursor1 = conn1.cursor()
cursor1.execute("SELECT @@SPID") # Get SQL Server process ID
spid1 = cursor1.fetchone()[0]
conn1.close()

# Get another connection - should be the same one from pool
conn2 = connect(conn_str)
cursor2 = conn2.cursor()
cursor2.execute("SELECT @@SPID")
spid2 = cursor2.fetchone()[0]
conn2.close()

# The SPID should be the same, indicating connection reuse
assert spid1 == spid2, "Connections not reused - different SPIDs"

# Clean up

def test_pool_exhaustion_max_size_1(conn_str):
"""Test pool exhaustion when max_size=1 and multiple concurrent connections are requested."""
pooling(max_size=1, idle_timeout=30)
conn1 = connect(conn_str)
results = []

def try_connect():
try:
conn2 = connect(conn_str)
results.append("success")
conn2.close()
except Exception as e:
results.append(str(e))

# Start a thread that will attempt to get a second connection while the first is open
t = threading.Thread(target=try_connect)
t.start()
t.join(timeout=2)
conn1.close()

# Depending on implementation, either blocks, raises, or times out
assert results, "Second connection attempt did not complete"
# If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes
assert results[0] == "success" or "pool" in results[0].lower() or "timeout" in results[0].lower(), \
f"Unexpected pool exhaustion result: {results[0]}"
pooling(enabled=False)

def test_pool_idle_timeout_removes_connections(conn_str):
"""Test that idle_timeout removes connections from the pool after the timeout."""
pooling(max_size=2, idle_timeout=2)
conn1 = connect(conn_str)
spid_list = []
cursor1 = conn1.cursor()
cursor1.execute("SELECT @@SPID")
spid1 = cursor1.fetchone()[0]
spid_list.append(spid1)
conn1.close()

# Wait for longer than idle_timeout
time.sleep(3)

# Get a new connection, which should not reuse the previous SPID
conn2 = connect(conn_str)
cursor2 = conn2.cursor()
cursor2.execute("SELECT @@SPID")
spid2 = cursor2.fetchone()[0]
spid_list.append(spid2)
conn2.close()

assert spid1 != spid2, "Idle timeout did not remove connection from pool"

def test_connection_timeout_invalid_password(conn_str):
"""Test that connecting with an invalid password raises an exception quickly (timeout)."""
# Modify the connection string to use an invalid password
Expand All @@ -429,6 +302,7 @@ def test_connection_timeout_invalid_password(conn_str):
# Should fail quickly (within 10 seconds)
assert elapsed < 10, f"Connection with invalid password took too long: {elapsed:.2f}s"


def test_connection_timeout_invalid_host(conn_str):
"""Test that connecting to an invalid host fails with a timeout."""
# Replace server/host with an invalid one
Expand All @@ -449,106 +323,6 @@ def test_connection_timeout_invalid_host(conn_str):
# If it takes too long, it may indicate a misconfiguration or network issue.
assert elapsed < 30, f"Connection to invalid host took too long: {elapsed:.2f}s"

def test_pool_removes_invalid_connections(conn_str):
"""Test that the pool removes connections that become invalid (simulate by closing underlying connection)."""
pooling(max_size=1, idle_timeout=30)
conn = connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT 1")
# Simulate invalidation by forcibly closing the connection at the driver level
try:
# Try to access a private attribute or method to forcibly close the underlying connection
# This is implementation-specific; if not possible, skip
if hasattr(conn, "_conn") and hasattr(conn._conn, "close"):
conn._conn.close()
else:
pytest.skip("Cannot forcibly close underlying connection for this driver")
except Exception:
pass
# Safely close the connection, ignoring errors due to forced invalidation
try:
conn.close()
except RuntimeError as e:
if "not initialized" not in str(e):
raise
# Now, get a new connection from the pool and ensure it works
new_conn = connect(conn_str)
new_cursor = new_conn.cursor()
try:
new_cursor.execute("SELECT 1")
result = new_cursor.fetchone()
assert result is not None and result[0] == 1, "Pool did not remove invalid connection"
finally:
new_conn.close()
pooling(enabled=False)

def test_pool_recovery_after_failed_connection(conn_str):
"""Test that the pool recovers after a failed connection attempt."""
pooling(max_size=1, idle_timeout=30)
# First, try to connect with a bad password (should fail)
if "Pwd=" in conn_str:
bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword")
elif "Password=" in conn_str:
bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword")
else:
pytest.skip("No password found in connection string to modify")
with pytest.raises(Exception):
connect(bad_conn_str)
# Now, connect with the correct string and ensure it works
conn = connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
assert result is not None and result[0] == 1, "Pool did not recover after failed connection"
conn.close()
pooling(enabled=False)

def test_pool_capacity_limit_and_overflow(conn_str):
"""Test that pool does not grow beyond max_size and handles overflow gracefully."""
pooling(max_size=2, idle_timeout=30)
conns = []
try:
# Open up to max_size connections
conns.append(connect(conn_str))
conns.append(connect(conn_str))
# Try to open a third connection, which should fail or block
overflow_result = []
def try_overflow():
try:
c = connect(conn_str)
overflow_result.append("success")
c.close()
except Exception as e:
overflow_result.append(str(e))
t = threading.Thread(target=try_overflow)
t.start()
t.join(timeout=2)
assert overflow_result, "Overflow connection attempt did not complete"
# Accept either block, error, or success if pool implementation allows overflow
assert overflow_result[0] == "success" or "pool" in overflow_result[0].lower() or "timeout" in overflow_result[0].lower(), \
f"Unexpected pool overflow result: {overflow_result[0]}"
finally:
for c in conns:
c.close()
pooling(enabled=False)

def test_connection_pooling_basic(conn_str):
# Enable pooling with small pool size
pooling(max_size=2, idle_timeout=5)
conn1 = connect(conn_str)
conn2 = connect(conn_str)
assert conn1 is not None
assert conn2 is not None
try:
conn3 = connect(conn_str)
assert conn3 is not None, "Third connection failed — pooling is not working or limit is too strict"
conn3.close()
except Exception as e:
print(f"Expected: Could not open third connection due to max_size=2: {e}")

conn1.close()
conn2.close()

def test_context_manager_commit(conn_str):
"""Test that context manager closes connection on normal exit"""
# Create a permanent table for testing across connections
Expand Down
Loading