Skip to content

Commit

Permalink
Async driver support (#214)
Browse files Browse the repository at this point in the history
* feature: added async driver support

via asynch

* added tests for asynch driver

* added asynch to docs

* fixed sqlalchemy version

* fixed docs

* changed min python version

* updated actions

* returned way to import connector to drivers

* removed redundant tests

* removed unused imports
  • Loading branch information
randomowo committed Jan 12, 2023
1 parent 76af10a commit e2fe139
Show file tree
Hide file tree
Showing 26 changed files with 752 additions and 49 deletions.
1 change: 0 additions & 1 deletion .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/sa-versions.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
on: [push, pull_request]
name: "SQLAlchemy 1.4.x versions"
name: "SQLAlchemy >=1.4.24 versions"
jobs:
tests:
runs-on: ubuntu-20.04
Expand All @@ -9,7 +9,7 @@ jobs:
- "3.8"
clickhouse-version:
- 19.16.2.2
sa-version: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44]
sa-version: [24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44]

name: ${{ matrix.python-version }} SA=1.4.${{ matrix.sa-version }}
steps:
Expand Down
3 changes: 2 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ Usage

Supported interfaces:

- **native** [recommended] (TCP) via `clickhouse-driver <https://github.com/mymarilyn/clickhouse-driver>`_
- **native** [recommended] (TCP) via `clickhouse-driver <https://github.com/mymarilyn/clickhouse-driver>`
- **async native** (TCP) via `asynch <https://github.com/long2ice/asynch>`
- **http** via requests

Define table
Expand Down
Empty file.
42 changes: 42 additions & 0 deletions clickhouse_sqlalchemy/drivers/asynch/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asynch

from sqlalchemy.sql.elements import TextClause
from sqlalchemy.pool import AsyncAdaptedQueuePool

from .connector import AsyncAdapt_asynch_dbapi
from ..native.base import ClickHouseDialect_native

# Export connector version
VERSION = (0, 0, 1, None)


class ClickHouseDialect_asynch(ClickHouseDialect_native):
driver = 'asynch'

is_async = True
supports_statement_cache = True

@classmethod
def dbapi(cls):
return AsyncAdapt_asynch_dbapi(asynch)

@classmethod
def get_pool_class(cls, url):
return AsyncAdaptedQueuePool

def _execute(self, connection, sql, scalar=False, **kwargs):
if isinstance(sql, str):
# Makes sure the query will go through the
# `ClickHouseExecutionContext` logic.
sql = TextClause(sql)
f = connection.scalar if scalar else connection.execute
return f(sql, parameters=kwargs)

def do_execute(self, cursor, statement, parameters, context=None):
cursor.execute(statement, parameters, context)

def do_executemany(self, cursor, statement, parameters, context=None):
cursor.executemany(statement, parameters, context)


dialect = ClickHouseDialect_asynch
185 changes: 185 additions & 0 deletions clickhouse_sqlalchemy/drivers/asynch/connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import asyncio

from sqlalchemy.engine.interfaces import AdaptedConnection
from sqlalchemy.util.concurrency import await_only


class AsyncAdapt_asynch_cursor:
__slots__ = (
'_adapt_connection',
'_connection',
'await_',
'_cursor',
'_rows'
)

def __init__(self, adapt_connection):
self._adapt_connection = adapt_connection
self._connection = adapt_connection._connection # noqa
self.await_ = adapt_connection.await_

cursor = self._connection.cursor()

self._cursor = self.await_(cursor.__aenter__())
self._rows = []

@property
def _execute_mutex(self):
return self._adapt_connection._execute_mutex # noqa

@property
def description(self):
return self._cursor.description

@property
def rowcount(self):
return self._cursor.rowcount

@property
def arraysize(self):
return self._cursor.arraysize

@arraysize.setter
def arraysize(self, value):
self._cursor.arraysize = value

@property
def lastrowid(self):
return self._cursor.lastrowid

def close(self):
# note we aren't actually closing the cursor here,
# we are just letting GC do it. to allow this to be async
# we would need the Result to change how it does "Safe close cursor".
self._rows[:] = [] # noqa

def execute(self, operation, params=None, context=None):
return self.await_(self._execute_async(operation, params, context))

async def _execute_async(self, operation, params, context):
async with self._execute_mutex:
result = await self._cursor.execute(
operation,
args=params,
context=context
)

self._rows = list(await self._cursor.fetchall())
return result

def executemany(self, operation, params=None, context=None):
return self.await_(self._executemany_async(operation, params, context))

async def _executemany_async(self, operation, params, context):
async with self._execute_mutex:
return await self._cursor.executemany(
operation,
args=params,
context=context
)

def setinputsizes(self, *args):
pass

def setoutputsizes(self, *args):
pass

def __iter__(self):
while self._rows:
yield self._rows.pop(0)

def fetchone(self):
if self._rows:
return self._rows.pop(0)
else:
return None

def fetchmany(self, size=None):
if size is None:
size = self.arraysize

retval = self._rows[0:size]
self._rows[:] = self._rows[size:]
return retval

def fetchall(self):
retval = self._rows[:]
self._rows[:] = []
return retval


class AsyncAdapt_asynch_dbapi:
def __init__(self, asynch):
self.asynch = asynch
self.paramstyle = 'pyformat'
self._init_dbapi_attributes()

class Error(Exception):
pass

def _init_dbapi_attributes(self):
for name in (
'ServerException',
'UnexpectedPacketFromServerError',
'LogicalError',
'UnknownTypeError',
'ChecksumDoesntMatchError',
'TypeMismatchError',
'UnknownCompressionMethod',
'TooLargeStringSize',
'NetworkError',
'SocketTimeoutError',
'UnknownPacketFromServerError',
'CannotParseUuidError',
'CannotParseDomainError',
'PartiallyConsumedQueryError',
'ColumnException',
'ColumnTypeMismatchException',
'StructPackException',
'InterfaceError',
'DatabaseError',
'ProgrammingError',
'NotSupportedError',
):
setattr(self, name, getattr(self.asynch.errors, name))

def connect(self, *args, **kwargs) -> 'AsyncAdapt_asynch_connection':
return AsyncAdapt_asynch_connection(
self,
await_only(self.asynch.connect(*args, **kwargs))
)


class AsyncAdapt_asynch_connection(AdaptedConnection):
await_ = staticmethod(await_only)
__slots__ = ('dbapi', '_execute_mutex')

def __init__(self, dbapi, connection):
self.dbapi = dbapi
self._connection = connection
self._execute_mutex = asyncio.Lock()

def ping(self, reconnect):
return self.await_(self._ping_async())

async def _ping_async(self):
async with self._execute_mutex:
return await self._connection.ping()

def character_set_name(self):
return self._connection.character_set_name()

def autocommit(self, value):
self.await_(self._connection.autocommit(value))

def cursor(self, server_side=False):
return AsyncAdapt_asynch_cursor(self)

def rollback(self):
self.await_(self._connection.rollback())

def commit(self):
self.await_(self._connection.commit())

def close(self):
self.await_(self._connection.close())
13 changes: 9 additions & 4 deletions clickhouse_sqlalchemy/orm/session.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import AsyncSession

from .query import Query


def make_session(engine):
Session = sessionmaker(bind=engine)
def make_session(engine, is_async=False):
session_class = Session
if is_async:
session_class = AsyncSession

return Session(query_cls=Query)
factory = sessionmaker(bind=engine, class_=session_class)

return factory(query_cls=Query)
35 changes: 25 additions & 10 deletions docs/connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ ClickHouse SQLAlchemy uses the following syntax for the connection string:
Where:

- **driver** is driver to use. Possible choices: ``http``, ``native``. ``http``
is default. When you omit driver http is used.
- **driver** is driver to use. Possible choices: ``http``, ``native``, ``asynch``.
``http`` is default. When you omit driver http is used.
- **database** is database connect to. Default is ``default``.
- **user** is database user. Defaults to ``'default'``.
- **password** of the user. Defaults to ``''`` (no password).
Expand Down Expand Up @@ -49,20 +49,20 @@ HTTP

Simple DSN example:

.. code-block::
.. code-block:: RST
clickhouse+http://host/db
DSN example for ClickHouse https port:

.. code-block::
.. code-block:: RST
clickhouse+http://user:password@host:8443/db?protocol=https
When you are using `nginx` as proxy server for ClickHouse server connection
string might look like:

.. code-block::
.. code-block:: RST
clickhouse+http://user:password@host:8124/test?protocol=https
Expand Down Expand Up @@ -92,12 +92,12 @@ SSH or VPN (for example) while communicating over untrusted network.

Simple DSN example:

.. code-block::
.. code-block:: RST
clickhouse+http://host/db
clickhouse+native://host/db
All connection string parameters are proxied to ``clickhouse-driver``.
See it's `parameters <https://clickhouse-driver.readthedocs.io/en/latest/api.html#clickhouse_driver.connection.Connection>`_.
See it's `parameters <https://clickhouse-driver.readthedocs.io/en/latest/api.html#clickhouse_driver.connection.Connection>`__.

Example DSN with LZ4 compression secured with Let's Encrypt certificate on server side:

Expand All @@ -112,6 +112,21 @@ Example DSN with LZ4 compression secured with Let's Encrypt certificate on serve
Example with multiple hosts

.. code-block::
.. code-block:: RST
clickhouse+native://wronghost/default?alt_hosts=localhost:9000
Asynch
~~~~~~

Same as Native.

Simple DSN example:

.. code-block:: RST
clickhouse+asynch://host/db
clickhouse+native://wronghost/default?alt_hosts=localhost:9000
All connection string parameters are proxied to ``asynch``.
See it's `parameters <https://github.com/long2ice/asynch/blob/dev/asynch/connection.py>`__.
2 changes: 1 addition & 1 deletion docs/contents.rst.inc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ClickHouse server provides a lot of interfaces. This dialect supports:
Each interface has it's own support by corresponding "driver":

- **http** via ``requests``
- **native** via ``clickhouse-driver``
- **native** via ``clickhouse-driver`` or via ``asynch`` for async support

Native driver is recommended due to rich ``clickhouse-driver`` support. HTTP
driver has poor development support compare to native driver.
Expand Down
2 changes: 1 addition & 1 deletion docs/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ To avoid this side effect you should create another session
Execution options
+++++++++++++++++

.. attention:: This supported only in native driver.
.. attention:: This supported only in native and asynch drivers.

You can override default ClickHouse server settings and pass desired settings
with ``execution_options``. Set lower priority to query and limit max number
Expand Down
2 changes: 2 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ clickhouse-sqlalchemy:
* `clickhouse-driver`_ ClickHouse Python Driver with native (TCP) interface support.
* `requests`_ a simple and elegant HTTP library.
* `ipaddress`_ backport ipaddress module.
* `asynch`_ An asyncio ClickHouse Python Driver with native (TCP) interface support.

.. _clickhouse-driver: https://pypi.org/project/clickhouse-driver/
.. _requests: https://pypi.org/project/requests/
.. _ipaddress: https://pypi.org/project/ipaddress/
.. _asynch: https://pypi.org/project/asynch/

If you are planning to use ``clickhouse-driver`` with compression you should
also install compression extras as well. See clickhouse-driver `documentation <https://clickhouse-driver.readthedocs.io>`_.
Expand Down
2 changes: 1 addition & 1 deletion docs/license.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
License
=======

clickHouse-sqlalchemy is distributed under the `MIT license
clickhouse-sqlalchemy is distributed under the `MIT license
<http://www.opensource.org/licenses/mit-license.php>`_.

0 comments on commit e2fe139

Please sign in to comment.