diff --git a/mssql_python/connection.py b/mssql_python/connection.py index 857496d8..f3a81f09 100644 --- a/mssql_python/connection.py +++ b/mssql_python/connection.py @@ -59,8 +59,7 @@ def __init__(self, connection_str: str = "", autocommit: bool = False, attrs_bef ) self._attrs_before = attrs_before or {} self._pooling = PoolingManager.is_enabled() - self._conn = ddbc_bindings.Connection(self.connection_str, autocommit, self._pooling) - self._conn.connect(self._attrs_before) + self._conn = ddbc_bindings.Connection(self.connection_str, self._pooling, self._attrs_before) self.setautocommit(autocommit) def _construct_connection_string(self, connection_str: str = "", **kwargs) -> str: diff --git a/mssql_python/pybind/CMakeLists.txt b/mssql_python/pybind/CMakeLists.txt index dceb2efc..aea9a323 100644 --- a/mssql_python/pybind/CMakeLists.txt +++ b/mssql_python/pybind/CMakeLists.txt @@ -90,7 +90,7 @@ execute_process( ) # Add module library -add_library(ddbc_bindings MODULE ddbc_bindings.cpp connection/connection.cpp) +add_library(ddbc_bindings MODULE ddbc_bindings.cpp connection/connection.cpp connection/connection_pool.cpp) # Add include directories for your project target_include_directories(ddbc_bindings PRIVATE diff --git a/mssql_python/pybind/connection/connection.cpp b/mssql_python/pybind/connection/connection.cpp index 8073c249..58f35ae4 100644 --- a/mssql_python/pybind/connection/connection.cpp +++ b/mssql_python/pybind/connection/connection.cpp @@ -5,34 +5,41 @@ // taken up in future #include "connection.h" +#include "connection_pool.h" #include #include #define SQL_COPT_SS_ACCESS_TOKEN 1256 // Custom attribute ID for access token -SqlHandlePtr Connection::_envHandle = nullptr; -//------------------------------------------------------------------------------------------------- -// Implements the Connection class declared in connection.h. -// This class wraps low-level ODBC operations like connect/disconnect, -// transaction control, and autocommit configuration. -//------------------------------------------------------------------------------------------------- -Connection::Connection(const std::wstring& conn_str, bool autocommit, bool use_pooling) - : _connStr(conn_str) , _autocommit(autocommit), _usePool(use_pooling) { - if (!_envHandle) { - LOG("Allocating environment handle"); - SQLHANDLE env = nullptr; +static SqlHandlePtr getEnvHandle() { + static SqlHandlePtr envHandle = []() -> SqlHandlePtr { + LOG("Allocating ODBC environment handle"); if (!SQLAllocHandle_ptr) { LOG("Function pointers not initialized, loading driver"); DriverLoader::getInstance().loadDriver(); } + SQLHANDLE env = nullptr; SQLRETURN ret = SQLAllocHandle_ptr(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env); - checkError(ret); - _envHandle = std::make_shared(SQL_HANDLE_ENV, env); + if (!SQL_SUCCEEDED(ret)) { + ThrowStdException("Failed to allocate environment handle"); + } + ret = SQLSetEnvAttr_ptr(env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3_80, 0); + if (!SQL_SUCCEEDED(ret)) { + ThrowStdException("Failed to set environment attributes"); + } + return std::make_shared(SQL_HANDLE_ENV, env); + }(); - LOG("Setting environment attributes"); - ret = SQLSetEnvAttr_ptr(_envHandle->get(), SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3_80, 0); - checkError(ret); - } + return envHandle; +} + +//------------------------------------------------------------------------------------------------- +// Implements the Connection class declared in connection.h. +// This class wraps low-level ODBC operations like connect/disconnect, +// transaction control, and autocommit configuration. +//------------------------------------------------------------------------------------------------- +Connection::Connection(const std::wstring& conn_str, bool use_pool) + : _connStr(conn_str), _autocommit(false), _fromPool(use_pool) { allocateDbcHandle(); } @@ -42,6 +49,7 @@ Connection::~Connection() { // Allocates connection handle void Connection::allocateDbcHandle() { + auto _envHandle = getEnvHandle(); SQLHANDLE dbc = nullptr; LOG("Allocate SQL Connection Handle"); SQLRETURN ret = SQLAllocHandle_ptr(SQL_HANDLE_DBC, _envHandle->get(), &dbc); @@ -64,6 +72,7 @@ void Connection::connect(const py::dict& attrs_before) { (SQLWCHAR*)_connStr.c_str(), SQL_NTS, nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT); checkError(ret); + updateLastUsed(); } void Connection::disconnect() { @@ -91,6 +100,7 @@ void Connection::commit() { if (!_dbcHandle) { ThrowStdException("Connection handle not allocated"); } + updateLastUsed(); LOG("Committing transaction"); SQLRETURN ret = SQLEndTran_ptr(SQL_HANDLE_DBC, _dbcHandle->get(), SQL_COMMIT); checkError(ret); @@ -100,6 +110,7 @@ void Connection::rollback() { if (!_dbcHandle) { ThrowStdException("Connection handle not allocated"); } + updateLastUsed(); LOG("Rolling back transaction"); SQLRETURN ret = SQLEndTran_ptr(SQL_HANDLE_DBC, _dbcHandle->get(), SQL_ROLLBACK); checkError(ret); @@ -132,6 +143,7 @@ SqlHandlePtr Connection::allocStatementHandle() { if (!_dbcHandle) { ThrowStdException("Connection handle not allocated"); } + updateLastUsed(); LOG("Allocating statement handle"); SQLHANDLE stmt = nullptr; SQLRETURN ret = SQLAllocHandle_ptr(SQL_HANDLE_STMT, _dbcHandle->get(), &stmt); @@ -185,4 +197,105 @@ void Connection::applyAttrsBefore(const py::dict& attrs) { } } } +} + +bool Connection::isAlive() const { + if (!_dbcHandle) { + ThrowStdException("Connection handle not allocated"); + } + SQLUINTEGER status; + SQLRETURN ret = SQLGetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_CONNECTION_DEAD, + &status, 0, nullptr); + return SQL_SUCCEEDED(ret) && status == SQL_CD_FALSE; +} + +bool Connection::reset() { + if (!_dbcHandle) { + ThrowStdException("Connection handle not allocated"); + } + LOG("Resetting connection via SQL_ATTR_RESET_CONNECTION"); + SQLULEN reset = SQL_TRUE; + SQLRETURN ret = SQLSetConnectAttr_ptr( + _dbcHandle->get(), + SQL_ATTR_RESET_CONNECTION, + (SQLPOINTER)SQL_RESET_CONNECTION_YES, + SQL_IS_INTEGER); + if (!SQL_SUCCEEDED(ret)) { + LOG("Failed to reset connection. Marking as dead."); + disconnect(); + return false; + } + updateLastUsed(); + return true; +} + +void Connection::updateLastUsed() { + _lastUsed = std::chrono::steady_clock::now(); +} + +std::chrono::steady_clock::time_point Connection::lastUsed() const { + return _lastUsed; +} + +ConnectionHandle::ConnectionHandle(const std::wstring& connStr, bool usePool, const py::dict& attrsBefore) + : _connStr(connStr), _usePool(usePool) { + if (_usePool) { + _conn = ConnectionPoolManager::getInstance().acquireConnection(connStr, attrsBefore); + } else { + _conn = std::make_shared(connStr, false); + _conn->connect(attrsBefore); + } +} + +ConnectionHandle::~ConnectionHandle() { + if (_conn) { + close(); + } +} + +void ConnectionHandle::close() { + if (!_conn) { + ThrowStdException("Connection object is not initialized"); + } + if (_usePool) { + ConnectionPoolManager::getInstance().returnConnection(_connStr, _conn); + } else { + _conn->disconnect(); + } + _conn = nullptr; +} + +void ConnectionHandle::commit() { + if (!_conn) { + ThrowStdException("Connection object is not initialized"); + } + _conn->commit(); +} + +void ConnectionHandle::rollback() { + if (!_conn) { + ThrowStdException("Connection object is not initialized"); + } + _conn->rollback(); +} + +void ConnectionHandle::setAutocommit(bool enabled) { + if (!_conn) { + ThrowStdException("Connection object is not initialized"); + } + _conn->setAutocommit(enabled); +} + +bool ConnectionHandle::getAutocommit() const { + if (!_conn) { + ThrowStdException("Connection object is not initialized"); + } + return _conn->getAutocommit(); +} + +SqlHandlePtr ConnectionHandle::allocStatementHandle() { + if (!_conn) { + ThrowStdException("Connection object is not initialized"); + } + return _conn->allocStatementHandle(); } \ No newline at end of file diff --git a/mssql_python/pybind/connection/connection.h b/mssql_python/pybind/connection/connection.h index 24e07e0a..b9cc50b6 100644 --- a/mssql_python/pybind/connection/connection.h +++ b/mssql_python/pybind/connection/connection.h @@ -13,7 +13,8 @@ class Connection { public: - Connection(const std::wstring& conn_str, bool autocommit = false, bool use_pooling = false); + Connection(const std::wstring& connStr, bool fromPool); + ~Connection(); // Establish the connection using the stored connection string. @@ -33,6 +34,10 @@ class Connection { // Check whether autocommit is enabled. bool getAutocommit() const; + bool isAlive() const; + bool reset(); + void updateLastUsed(); + std::chrono::steady_clock::time_point lastUsed() const; // Allocate a new statement handle on this connection. SqlHandlePtr allocStatementHandle(); @@ -44,9 +49,26 @@ class Connection { void applyAttrsBefore(const py::dict& attrs_before); std::wstring _connStr; - bool _usePool = false; + bool _fromPool = false; bool _autocommit = true; SqlHandlePtr _dbcHandle; - - static SqlHandlePtr _envHandle; + std::chrono::steady_clock::time_point _lastUsed; }; + +class ConnectionHandle { +public: + ConnectionHandle(const std::wstring& connStr, bool usePool, const py::dict& attrsBefore = py::dict()); + ~ConnectionHandle(); + + void close(); + void commit(); + void rollback(); + void setAutocommit(bool enabled); + bool getAutocommit() const; + SqlHandlePtr allocStatementHandle(); + +private: + std::shared_ptr _conn; + bool _usePool; + std::wstring _connStr; +}; \ No newline at end of file diff --git a/mssql_python/pybind/connection/connection_pool.cpp b/mssql_python/pybind/connection/connection_pool.cpp new file mode 100644 index 00000000..dbe2765e --- /dev/null +++ b/mssql_python/pybind/connection/connection_pool.cpp @@ -0,0 +1,114 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +// INFO|TODO - Note that is file is Windows specific right now. Making it arch agnostic will be +// taken up in future. + +#include "connection_pool.h" +#include +#include + +ConnectionPool::ConnectionPool(size_t max_size, int idle_timeout_secs) + : _max_size(max_size), _idle_timeout_secs(idle_timeout_secs), _current_size(0) {} + +std::shared_ptr ConnectionPool::acquire(const std::wstring& connStr, const py::dict& attrs_before) { + std::vector> to_disconnect; + std::shared_ptr valid_conn = nullptr; + { + std::lock_guard lock(_mutex); + auto now = std::chrono::steady_clock::now(); + size_t before = _pool.size(); + + // Phase 1: Remove stale connections, collect for later disconnect + _pool.erase(std::remove_if(_pool.begin(), _pool.end(), + [&](const std::shared_ptr& conn) { + auto idle_time = std::chrono::duration_cast(now - conn->lastUsed()).count(); + if (idle_time > _idle_timeout_secs) { + to_disconnect.push_back(conn); + return true; + } + return false; + }), _pool.end()); + + size_t pruned = before - _pool.size(); + _current_size = (_current_size >= pruned) ? (_current_size - pruned) : 0; + + // Phase 2: Attempt to reuse healthy connections + while (!_pool.empty()) { + auto conn = _pool.front(); + _pool.pop_front(); + if (conn->isAlive()) { + if (!conn->reset()) { + to_disconnect.push_back(conn); + --_current_size; + continue; + } + valid_conn = conn; + break; + } else { + to_disconnect.push_back(conn); + --_current_size; + } + } + + // Create new connection if none reusable + if (!valid_conn && _current_size < _max_size) { + valid_conn = std::make_shared(connStr, true); + valid_conn->connect(attrs_before); + ++_current_size; + } else if (!valid_conn) { + throw std::runtime_error("ConnectionPool::acquire: pool size limit reached"); + } + } + + // Phase 3: Disconnect expired/bad connections outside lock + for (auto& conn : to_disconnect) { + try { + conn->disconnect(); + } catch (const std::exception& ex) { + std::cout << "disconnect() failed: " << ex.what() << std::endl; + } + } + return valid_conn; +} + +void ConnectionPool::release(std::shared_ptr conn) { + std::lock_guard lock(_mutex); + if (_pool.size() < _max_size) { + conn->updateLastUsed(); + _pool.push_back(conn); + } + else { + conn->disconnect(); + if (_current_size > 0) --_current_size; + } +} + +ConnectionPoolManager& ConnectionPoolManager::getInstance() { + static ConnectionPoolManager manager; + return manager; +} + +std::shared_ptr ConnectionPoolManager::acquireConnection(const std::wstring& connStr, const py::dict& attrs_before) { + std::lock_guard lock(_manager_mutex); + + auto& pool = _pools[connStr]; + if (!pool) { + LOG("Creating new connection pool"); + pool = std::make_shared(_default_max_size, _default_idle_secs); + } + return pool->acquire(connStr, attrs_before); +} + +void ConnectionPoolManager::returnConnection(const std::wstring& conn_str, const std::shared_ptr conn) { + std::lock_guard lock(_manager_mutex); + if (_pools.find(conn_str) != _pools.end()) { + _pools[conn_str]->release((conn)); + } +} + +void ConnectionPoolManager::configure(int max_size, int idle_timeout_secs) { + std::lock_guard lock(_manager_mutex); + _default_max_size = max_size; + _default_idle_secs = idle_timeout_secs; +} \ No newline at end of file diff --git a/mssql_python/pybind/connection/connection_pool.h b/mssql_python/pybind/connection/connection_pool.h new file mode 100644 index 00000000..f4a5dbfd --- /dev/null +++ b/mssql_python/pybind/connection/connection_pool.h @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +// INFO|TODO - Note that is file is Windows specific right now. Making it arch agnostic will be +// taken up in future. + +#pragma once +#include +#include +#include +#include +#include +#include +#include "connection.h" + +// Manages a fixed-size pool of reusable database connections for a single connection string +class ConnectionPool { +public: + ConnectionPool(size_t max_size, int idle_timeout_secs); + + // Acquires a connection from the pool or creates a new one if under limit + std::shared_ptr acquire(const std::wstring& connStr, const py::dict& attrs_before = py::dict()); + + // Returns a connection to the pool for reuse + void release(std::shared_ptr conn); + +private: + size_t _max_size; // Maximum number of connections allowed + int _idle_timeout_secs; // Idle time before connections are considered stale + size_t _current_size = 0; + std::deque> _pool; // Available connections + std::mutex _mutex; // Mutex for thread-safe access +}; + +// Singleton manager that handles multiple pools keyed by connection string +class ConnectionPoolManager { +public: + // Returns the singleton instance of the manager + static ConnectionPoolManager& getInstance(); + + void configure(int max_size, int idle_timeout); + + // Gets a connection from the appropriate pool (creates one if none exists) + std::shared_ptr acquireConnection(const std::wstring& conn_str, const py::dict& attrs_before = py::dict()); + + // Returns a connection to its original pool + void returnConnection(const std::wstring& conn_str, std::shared_ptr conn); + +private: + ConnectionPoolManager() = default; + ~ConnectionPoolManager() = default; + + // Map from connection string to connection pool + std::unordered_map> _pools; + + // Protects access to the _pools map + std::mutex _manager_mutex; + size_t _default_max_size = 10; + int _default_idle_secs = 300; + + // Prevent copying + ConnectionPoolManager(const ConnectionPoolManager&) = delete; + ConnectionPoolManager& operator=(const ConnectionPoolManager&) = delete; +}; diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 06fb8cfe..f480fe2a 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -5,6 +5,7 @@ // taken up in beta release #include "ddbc_bindings.h" #include "connection/connection.h" +#include "connection/connection_pool.h" #include #include // std::setw, std::setfill @@ -1876,6 +1877,13 @@ SQLLEN SQLRowCount_wrap(SqlHandlePtr StatementHandle) { return rowCount; } +static std::once_flag pooling_init_flag; +void enable_pooling(int maxSize, int idleTimeout) { + std::call_once(pooling_init_flag, [&]() { + ConnectionPoolManager::getInstance().configure(maxSize, idleTimeout); + }); +} + // Architecture-specific defines #ifndef ARCHITECTURE #define ARCHITECTURE "win64" // Default to win64 if not defined during compilation @@ -1919,17 +1927,16 @@ PYBIND11_MODULE(ddbc_bindings, m) { py::class_(m, "SqlHandle") .def("free", &SqlHandle::free, "Free the handle"); - py::class_(m, "Connection") - .def(py::init(), py::arg("conn_str"), py::arg("autocommit") = false, py::arg("use_pooling") = false, - "Create a new connection with the given connection string, " - "autocommit mode, and pooling option") - .def("connect", &Connection::connect) - .def("close", &Connection::disconnect, "Close the connection") - .def("commit", &Connection::commit, "Commit the current transaction") - .def("rollback", &Connection::rollback, "Rollback the current transaction") - .def("set_autocommit", &Connection::setAutocommit) - .def("get_autocommit", &Connection::getAutocommit) - .def("alloc_statement_handle", &Connection::allocStatementHandle); + + py::class_(m, "Connection") + .def(py::init(), py::arg("conn_str"), py::arg("use_pool"), py::arg("attrs_before") = py::dict()) + .def("close", &ConnectionHandle::close, "Close the connection") + .def("commit", &ConnectionHandle::commit, "Commit the current transaction") + .def("rollback", &ConnectionHandle::rollback, "Rollback the current transaction") + .def("set_autocommit", &ConnectionHandle::setAutocommit) + .def("get_autocommit", &ConnectionHandle::getAutocommit) + .def("alloc_statement_handle", &ConnectionHandle::allocStatementHandle); + m.def("enable_pooling", &enable_pooling, "Enable global connection pooling"); m.def("DDBCSQLExecDirect", &SQLExecDirect_wrap, "Execute a SQL query directly"); m.def("DDBCSQLExecute", &SQLExecute_wrap, "Prepare and execute T-SQL statements"); m.def("DDBCSQLRowCount", &SQLRowCount_wrap,