Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Sep 25, 2017
1 parent 9b88271 commit bb714e6
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 32 deletions.
29 changes: 29 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
env:
- VERSION=1.1.54276

language: python
python:
- "3.5"
- "3.6"
cache: pip
services:
- docker
before_install:
- docker run -d -p 127.0.0.1:9000:9000 --name test-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server:$VERSION
- docker run -d --entrypoint "/bin/sh" --name test-clickhouse-client --link test-clickhouse-server:clickhouse-server yandex/clickhouse-client:$VERSION -c 'while :; do sleep 1; done'
- docker ps -a
# Faking clickhouse-client real comminitation with container via docker exec.
- echo -e '#!/bin/bash\n\ndocker exec test-clickhouse-client clickhouse-client "$@"' | sudo tee /usr/local/bin/clickhouse-client > /dev/null
- sudo chmod +x /usr/local/bin/clickhouse-client
# Overriding setup.cfg. Set host=clickhouse-server
- echo -e '[db]\nhost=clickhouse-server\nport=9000\ndatabase=test\nuser=default\npassword=\ncompression=lz4,lz4hc,zstd' > setup.cfg
# Make host think that clickhouse-server is localhost
- echo '127.0.0.1 clickhouse-server' | sudo tee /etc/hosts > /dev/null
install:
pip install flake8 flake8-print coveralls
before_script:
flake8 .
script:
- coverage run --source=aioch setup.py test
after_success:
- coveralls
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License
This is the MIT license: http://www.opensource.org/licenses/mit-license.php

Copyright (c) 2017 Marilyn System
Copyright (c) 2017 by Konstantin Lebedev.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
65 changes: 64 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,65 @@
# aioch
aioch - is a library for accessing a ClickHouse database over native interface from the asyncio
**aioch** is a library for accessing a ClickHouse database over native interface from the asyncio.
It wraps features of [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver) for asynchronous usage.

[![Coverage Status](https://coveralls.io/repos/github/mymarilyn/aioch/badge.svg?branch=master)](https://coveralls.io/github/mymarilyn/aioch?branch=master)
[![Build Status](https://travis-ci.org/mymarilyn/aioch.svg?branch=master)](https://travis-ci.org/mymarilyn/aioch)


## Installation

The package can be installed using `pip`:

```bash
pip install aioch
```

## Usage
```python
from datetime import datetime

import asyncio
from aioch import Client


async def exec_progress():
client = Client('localhost')

progress = await client.execute_with_progress('LONG AND COMPLICATED QUERY')
timeout = 20
started_at = datetime.now()

async for num_rows, total_rows in progress:
done = num_rows / total_rows if total_rows else total_rows
now = datetime.now()
# Cancel query if it takes more than 20 seconds to process 50% of rows.
if (now - started_at).total_seconds() > timeout and done < 0.5:
await client.cancel()
break
else:
rv = await progress.get_result()
print(rv)


async def exec_no_progress():
client = Client('localhost')
rv = await client.execute('LONG AND COMPLICATED QUERY')
print(rv)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([exec_progress(), exec_no_progress()]))
```

For more information see **clickhouse-driver** usage examples.

## Parameters

* `executor` - instance of custom Executor, if not supplied default executor will be used
* `loop` - asyncio compatible event loop

Other parameters are passing to wrapped clickhouse-driver's Client.

## License

aioch is distributed under the [MIT license](http://www.opensource.org/licenses/mit-license.php).
8 changes: 8 additions & 0 deletions aioch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

from aioch.client import Client


VERSION = (0, 0, 1)
__version__ = '.'.join(str(x) for x in VERSION)

__all__ = ['Client']
67 changes: 38 additions & 29 deletions src/client.py → aioch/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio

from .helpers import run_in_executor
from ..client import Client, QueryResult
from clickhouse_driver.client import Client as BlockingClient, QueryResult


class Progress(object):
def __init__(self, result, client):
self.client = client
def __init__(self, result, async_client):
self.async_client = async_client
self._client = async_client._client
self.result = result
self.rows_read, self.approx_rows_to_read = 0, 0

Expand Down Expand Up @@ -35,11 +36,13 @@ async def __anext__(self):

except Exception as e:
if not isinstance(e, StopAsyncIteration):
self.client.connection.disconnect()
await self.async_client.disconnect()
raise

async def read_packet(self):
packet = await self.client.run_in_executor(self.client.receive_packet)
packet = await self.async_client.run_in_executor(
self._client.receive_packet
)
if not packet:
return

Expand All @@ -55,7 +58,7 @@ async def read_packet(self):

return self.rows_read, self.approx_rows_to_read
else:
self.client.store_query_result(packet, self.result)
self._client.store_query_result(packet, self.result)

return True

Expand All @@ -67,57 +70,63 @@ async def get_result(self):
return self.result.get_result()


class AsyncClient(Client):
class Client(object):
def __init__(self, *args, **kwargs):
self.loop = kwargs.pop('loop', None) or asyncio.get_event_loop()
self.executor = kwargs.pop('executor', None)
self._loop = kwargs.pop('loop', None) or asyncio.get_event_loop()
self._executor = kwargs.pop('executor', None)
self._client = BlockingClient(*args, **kwargs)

super(AsyncClient, self).__init__(*args, **kwargs)
super(Client, self).__init__()

def run_in_executor(self, *args, **kwargs):
return run_in_executor(self.executor, self.loop, *args, **kwargs)
return run_in_executor(self._executor, self._loop, *args, **kwargs)

async def disconnect(self):
return await self.run_in_executor(self._client.disconnect)

async def execute(self, *args, **kwargs):
execute = super(AsyncClient, self).execute
return await self.run_in_executor(execute, *args, **kwargs)
return await self.run_in_executor(self._client.execute, *args,
**kwargs)

async def execute_with_progress(self, query, with_column_types=False,
external_tables=None, query_id=None,
settings=None):
self.connection.force_connect()
await self.run_in_executor(self._client.connection.force_connect)

try:
return await self.process_ordinary_query_with_progress(
query, with_column_types=with_column_types,
external_tables=external_tables,
query_id=query_id, settings=settings
)

except Exception:
self.connection.disconnect()
raise
return await self.process_ordinary_query_with_progress(
query, with_column_types=with_column_types,
external_tables=external_tables,
query_id=query_id, settings=settings
)

async def process_ordinary_query_with_progress(
self, query, with_column_types=False, external_tables=None,
query_id=None, settings=None):
await self.run_in_executor(
self.connection.send_query, query,
self._client.connection.send_query, query,
query_id=query_id, settings=settings
)
await self.run_in_executor(
self.connection.send_external_tables, external_tables
self._client.connection.send_external_tables, external_tables
)

return self.receive_result(
return await self.receive_result(
with_column_types=with_column_types, progress=True
)

def receive_result(self, with_column_types=False, progress=False):
async def receive_result(self, with_column_types=False, progress=False):
result = QueryResult(with_column_types=with_column_types)

if progress:
return Progress(result, self)

else:
self.receive_no_progress_result(result)
await self.run_in_executor(self._client.receive_no_progress_result,
result)
return result.get_result()

async def cancel(self, with_column_types=False):
# TODO: Add warning if already cancelled.
await self.run_in_executor(self._client.connection.send_cancel)
# Client must still read until END_OF_STREAM packet.
return await self.receive_result(with_column_types=with_column_types)
File renamed without changes.
7 changes: 7 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[db]
host=localhost
port=9000
database=test
user=default
password=
compression=lz4,lz4hc,zstd
84 changes: 84 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import os
import re
from codecs import open

from setuptools import setup

here = os.path.abspath(os.path.dirname(__file__))


def read_version():
regexp = re.compile('^VERSION\W*=\W*\(([^\(\)]*)\)')
init_py = os.path.join(here, 'aioch', '__init__.py')
with open(init_py) as f:
for line in f:
match = regexp.match(line)
if match is not None:
return match.group(1).replace(', ', '.')
else:
raise RuntimeError('Cannot find version in aioch/__init__.py')


with open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()

# TODO: unsupported versions

setup(
name='aioch',
version=read_version(),

description=(
'Library for accessing a ClickHouse database over native interface '
'from the asyncio'
),
long_description=long_description,

url='https://github.com/mymarilyn/aioch',

author='Konstantin Lebedev',
author_email='kostyan.lebedev@gmail.com',

license='MIT',

classifiers=[
'Development Status :: 4 - Beta',


'Environment :: Console',

'Intended Audience :: Developers',
'Intended Audience :: Information Technology',


'License :: OSI Approved :: MIT License',


'Operating System :: OS Independent',


'Programming Language :: SQL',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',

'Topic :: Database',
'Topic :: Software Development',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Application Frameworks',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: Scientific/Engineering :: Information Analysis'
],

keywords='ClickHouse db database cloud analytics asyncio',

packages=['aioch'],
install_requires=[
'clickhouse-driver>=0.0.6'
],
test_suite='nose.collector',
tests_require=[
'nose',
'mock',
],
)
File renamed without changes.
27 changes: 27 additions & 0 deletions tests/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from logging.config import dictConfig


def configure():
dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': '%(asctime)s %(levelname)-8s %(name)s: %(message)s'
},
},
'handlers': {
'default': {
'level': 'ERROR',
'formatter': 'standard',
'class': 'logging.StreamHandler',
},
},
'loggers': {
'': {
'handlers': ['default'],
'level': 'ERROR',
'propagate': True
},
}
})
Loading

0 comments on commit bb714e6

Please sign in to comment.