asynch

Introduction

asynch is an asynchronous ClickHouse Python driver with native TCP interface support. It reuses most of the features of clickhouse-driver and complies with PEP 249.

Installation

> pip install asynch

To enable transport compression, install the compression extra, which adds clickhouse-cityhash:

> pip install asynch[compression]

Usage

A connection to a ClickHouse server can be established in two ways:

  1. with a DSN string, e.g., clickhouse://[user:password]@host:port/database;

    from asynch import connect
    
    # connecting with a DSN string
    async def connect_database():
        conn = await connect(
            dsn="clickhouse://ch_user:P@55w0rD@127.0.0.1:9000/chdb",
        )
        return conn

  2. with separately given connection/DSN parameters: user (optional), password (optional), host, port, database.

    from asynch import connect
    
    # connecting with DSN parameters
    async def connect_database():
        conn = await connect(
            user="ch_user",
            password="P@55w0rD",
            host="127.0.0.1",
            port=9000,
            database="chdb",
        )
        return conn

If a DSN string is given, it takes priority over any specified connection parameter.
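To see how the two forms relate, the sketch below splits a DSN string into the same five parameters using only the standard library. The helper name split_dsn is hypothetical, and this is not how asynch parses DSNs internally; it only illustrates which part of the string corresponds to which parameter.

```python
from urllib.parse import urlparse


def split_dsn(dsn: str) -> dict:
    """Split a clickhouse:// DSN into separate connection parameters.

    Illustration only: asynch has its own DSN handling, which may differ.
    """
    parts = urlparse(dsn)
    if parts.scheme != "clickhouse":
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    return {
        "user": parts.username,            # text before the first ":" in the user info
        "password": parts.password,        # text between that ":" and the last "@"
        "host": parts.hostname,
        "port": parts.port,                # already converted to int by urlparse
        "database": parts.path.lstrip("/") or None,
    }


params = split_dsn("clickhouse://ch_user:P@55w0rD@127.0.0.1:9000/chdb")
print(params)
```

The resulting dictionary has the same keys as the keyword arguments used in the second connection form.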

Create a database and a table by executing SQL statements via an instance of the Cursor class (here its child DictCursor class) acquired from an instance of the Connection class.

from asynch.connection import Connection
from asynch.cursors import DictCursor

async def create_table(conn: Connection):
    async with conn.cursor(cursor=DictCursor) as cursor:
        await cursor.execute("CREATE DATABASE IF NOT EXISTS test")
        await cursor.execute("""
        CREATE TABLE IF NOT EXISTS test.asynch
        (
            `id`       Int32,
            `decimal`  Decimal(10, 2),
            `date`     Date,
            `datetime` DateTime,
            `float`    Float32,
            `uuid`     UUID,
            `string`   String,
            `ipv4`     IPv4,
            `ipv6`     IPv6
        )
        ENGINE = MergeTree
        ORDER BY id
        """
        )

Fetching one row from an executed SQL statement:

async def fetchone(conn: Connection):
    # by default, an instance of the `Cursor` class
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchone()
        assert ret == (1,)

Fetching all the rows from an executed SQL statement:

async def fetchall(conn: Connection):
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchall()
        assert ret == [(1,)]

Using an instance of the DictCursor class to get results as a sequence of dictionaries representing the rows of an executed SQL query:

async def dict_cursor(conn: Connection):
    async with conn.cursor(cursor=DictCursor) as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchall()
        assert ret == [{"1": 1}]
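The relation between the tuple rows of a plain Cursor and the dictionary rows of a DictCursor can be pictured as pairing each value with its column name. The following is a minimal sketch of that idea, not the DictCursor implementation; the helper name rows_as_dicts is hypothetical.

```python
from typing import Sequence


def rows_as_dicts(columns: Sequence[str], rows: Sequence[tuple]) -> list:
    """Pair every value in each row with the column name at the same position."""
    return [dict(zip(columns, row)) for row in rows]


# For "SELECT 1" the single result column is named "1".
columns = ["1"]
tuple_rows = [(1,)]                      # what a plain Cursor returned above
dict_rows = rows_as_dicts(columns, tuple_rows)
print(dict_rows)
```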

Inserting data with dicts via a DictCursor instance:

from asynch.cursors import DictCursor

async def insert_dict(conn: Connection):
    async with conn.cursor(cursor=DictCursor) as cursor:
        ret = await cursor.execute(
            """INSERT INTO test.asynch(id,decimal,date,datetime,float,uuid,string,ipv4,ipv6) VALUES""",
            [
                {
                    "id": 1,
                    "decimal": 1,
                    "date": "2020-08-08",
                    "datetime": "2020-08-08 00:00:00",
                    "float": 1,
                    "uuid": "59e182c4-545d-4f30-8b32-cefea2d0d5ba",
                    "string": "1",
                    "ipv4": "0.0.0.0",
                    "ipv6": "::",
                }
            ],
        )
        assert ret == 1

Inserting data with tuples:

async def insert_tuple(conn: Connection):
    async with conn.cursor(cursor=DictCursor) as cursor:
        ret = await cursor.execute(
            """INSERT INTO test.asynch(id,decimal,date,datetime,float,uuid,string,ipv4,ipv6) VALUES""",
            [
                (
                    1,
                    1,
                    "2020-08-08",
                    "2020-08-08 00:00:00",
                    1,
                    "59e182c4-545d-4f30-8b32-cefea2d0d5ba",
                    "1",
                    "0.0.0.0",
                    "::",
                )
            ],
        )
        assert ret == 1
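The dict and tuple forms carry the same data: a tuple row is a dict row with its values laid out in the column order of the INSERT statement. If the data starts out as dicts and the tuple form is wanted, a conversion along these lines may help. This is a sketch; COLUMNS and dict_rows_to_tuples are hypothetical names, not part of asynch.

```python
# Column order taken from the INSERT statement used above.
COLUMNS = ("id", "decimal", "date", "datetime", "float", "uuid", "string", "ipv4", "ipv6")


def dict_rows_to_tuples(rows, columns=COLUMNS):
    """Reorder each dict row into a tuple that follows the given column order."""
    return [tuple(row[name] for name in columns) for row in rows]


dict_rows = [
    {
        "id": 1,
        "decimal": 1,
        "date": "2020-08-08",
        "datetime": "2020-08-08 00:00:00",
        "float": 1,
        "uuid": "59e182c4-545d-4f30-8b32-cefea2d0d5ba",
        "string": "1",
        "ipv4": "0.0.0.0",
        "ipv6": "::",
    }
]
tuple_rows = dict_rows_to_tuples(dict_rows)
print(tuple_rows)
```

A missing key raises KeyError, which surfaces incomplete rows before they reach the server.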

Connection Pool

Up to v0.2.4:

import asynch

async def use_pool():
    pool = await asynch.create_pool()
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT 1")
            ret = await cursor.fetchone()
            assert ret == (1,)
    pool.close()
    await pool.wait_closed()

Since v0.2.5:

from asynch.pool import Pool

async def use_pool():
    # init a Pool and fill it with `minsize` opened connections
    async with Pool(minsize=1, maxsize=2) as pool:
        # acquire a connection from the pool
        async with pool.connection() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT 1")
                ret = await cursor.fetchone()
                assert ret == (1,)

ThanksTo

License

This project is licensed under the Apache-2.0 License.