From 48ecc6a3a0e3a86e8da6e9bf1d61c647c50ef5a5 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Sat, 23 Sep 2017 21:41:31 +0300 Subject: [PATCH] Tests --- .travis.yml | 29 ++++++++++ CHANGELOG.md | 7 ++- LICENSE | 4 +- README.md | 73 ++++++++++++++++++++++++- aioch/__init__.py | 8 +++ {src => aioch}/client.py | 76 +++++++++++++++----------- src/helpers.py => aioch/utils.py | 0 setup.cfg | 6 ++ setup.py | 90 ++++++++++++++++++++++++++++++ {src => tests}/__init__.py | 0 tests/log.py | 27 +++++++++ tests/test_client.py | 73 +++++++++++++++++++++++++ tests/testcase.py | 94 ++++++++++++++++++++++++++++++++ 13 files changed, 450 insertions(+), 37 deletions(-) create mode 100644 .travis.yml create mode 100644 aioch/__init__.py rename {src => aioch}/client.py (53%) rename src/helpers.py => aioch/utils.py (100%) create mode 100644 setup.cfg create mode 100644 setup.py rename {src => tests}/__init__.py (100%) create mode 100644 tests/log.py create mode 100644 tests/test_client.py create mode 100644 tests/testcase.py diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e96d359 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,29 @@ +env: + - VERSION=1.1.54276 + +language: python +python: + - "3.5" + - "3.6" +cache: pip +services: + - docker +before_install: + - docker run -d -p 127.0.0.1:9000:9000 --name test-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server:$VERSION + - docker run -d --entrypoint "/bin/sh" --name test-clickhouse-client --link test-clickhouse-server:clickhouse-server yandex/clickhouse-client:$VERSION -c 'while :; do sleep 1; done' + - docker ps -a + # Faking clickhouse-client real comminitation with container via docker exec. + - echo -e '#!/bin/bash\n\ndocker exec test-clickhouse-client clickhouse-client "$@"' | sudo tee /usr/local/bin/clickhouse-client > /dev/null + - sudo chmod +x /usr/local/bin/clickhouse-client + # Overriding setup.cfg. Set host=clickhouse-server + - echo -e '[db]\nhost=clickhouse-server\nport=9000\ndatabase=test\nuser=default\npassword=\ncompression=lz4,lz4hc,zstd' > setup.cfg + # Make host think that clickhouse-server is localhost + - echo '127.0.0.1 clickhouse-server' | sudo tee /etc/hosts > /dev/null +install: + pip install flake8 flake8-print coveralls +before_script: + flake8 . +script: + - coverage run --source=aioch setup.py test +after_success: + - coveralls diff --git a/CHANGELOG.md b/CHANGELOG.md index c5c1662..4f4ac61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog -## 0.0.1 +## [Unreleased] + +## 0.0.1 - 2017-09-27 ### Added -- Init version. +- `execute` / `execute_with_progress`wrappers. +- `loop`, `executor` client params. diff --git a/LICENSE b/LICENSE index ae34e55..69803c8 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -MIT License +This is the MIT license: http://www.opensource.org/licenses/mit-license.php -Copyright (c) 2017 Marilyn System +Copyright (c) 2017 by Konstantin Lebedev. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 60151b1..c15a7e4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,73 @@ # aioch - aioch - is a library for accessing a ClickHouse database over native interface from the asyncio +**aioch** is a library for accessing a ClickHouse database over native interface from the asyncio. +It wraps features of [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver) for asynchronous usage. + +[![Coverage Status](https://coveralls.io/repos/github/mymarilyn/aioch/badge.svg?branch=master)](https://coveralls.io/github/mymarilyn/aioch?branch=master) +[![Build Status](https://travis-ci.org/mymarilyn/aioch.svg?branch=master)](https://travis-ci.org/mymarilyn/aioch) + + +## Installation + +The package can be installed using `pip`: + +```bash +pip install aioch +``` + +To install from source: + +```bash +git clone https://github.com/mymarilyn/aioch +cd aioch +python setup.py install +``` + +## Usage +```python +from datetime import datetime + +import asyncio +from aioch import Client + + +async def exec_progress(): + client = Client('localhost') + + progress = await client.execute_with_progress('LONG AND COMPLICATED QUERY') + timeout = 20 + started_at = datetime.now() + + async for num_rows, total_rows in progress: + done = num_rows / total_rows if total_rows else total_rows + now = datetime.now() + # Cancel query if it takes more than 20 seconds to process 50% of rows. + if (now - started_at).total_seconds() > timeout and done < 0.5: + await client.cancel() + break + else: + rv = await progress.get_result() + print(rv) + + +async def exec_no_progress(): + client = Client('localhost') + rv = await client.execute('LONG AND COMPLICATED QUERY') + print(rv) + + +loop = asyncio.get_event_loop() +loop.run_until_complete(asyncio.wait([exec_progress(), exec_no_progress()])) +``` + +For more information see **clickhouse-driver** usage examples. + +## Parameters + +* `executor` - instance of custom Executor, if not supplied default executor will be used +* `loop` - asyncio compatible event loop + +Other parameters are passing to wrapped clickhouse-driver's Client. + +## License + +aioch is distributed under the [MIT license](http://www.opensource.org/licenses/mit-license.php). diff --git a/aioch/__init__.py b/aioch/__init__.py new file mode 100644 index 0000000..eb76d9b --- /dev/null +++ b/aioch/__init__.py @@ -0,0 +1,8 @@ + +from aioch.client import Client + + +VERSION = (0, 0, 1) +__version__ = '.'.join(str(x) for x in VERSION) + +__all__ = ['Client'] diff --git a/src/client.py b/aioch/client.py similarity index 53% rename from src/client.py rename to aioch/client.py index 3f46514..0eb6bb7 100644 --- a/src/client.py +++ b/aioch/client.py @@ -1,12 +1,14 @@ import asyncio -from .helpers import run_in_executor -from ..client import Client, QueryResult +from clickhouse_driver.client import Client as BlockingClient, QueryResult + +from .utils import run_in_executor class Progress(object): - def __init__(self, result, client): - self.client = client + def __init__(self, result, async_client): + self.async_client = async_client + self._client = async_client._client self.result = result self.rows_read, self.approx_rows_to_read = 0, 0 @@ -33,13 +35,17 @@ async def __anext__(self): return read_more - except Exception as e: - if not isinstance(e, StopAsyncIteration): - self.client.connection.disconnect() + except StopAsyncIteration: + raise + + except (Exception, KeyboardInterrupt): + await self.async_client.disconnect() raise async def read_packet(self): - packet = await self.client.run_in_executor(self.client.receive_packet) + packet = await self.async_client.run_in_executor( + self._client.receive_packet + ) if not packet: return @@ -55,7 +61,7 @@ async def read_packet(self): return self.rows_read, self.approx_rows_to_read else: - self.client.store_query_result(packet, self.result) + self._client.store_query_result(packet, self.result) return True @@ -67,57 +73,63 @@ async def get_result(self): return self.result.get_result() -class AsyncClient(Client): +class Client(object): def __init__(self, *args, **kwargs): - self.loop = kwargs.pop('loop', None) or asyncio.get_event_loop() - self.executor = kwargs.pop('executor', None) + self._loop = kwargs.pop('loop', None) or asyncio.get_event_loop() + self._executor = kwargs.pop('executor', None) + self._client = BlockingClient(*args, **kwargs) - super(AsyncClient, self).__init__(*args, **kwargs) + super(Client, self).__init__() def run_in_executor(self, *args, **kwargs): - return run_in_executor(self.executor, self.loop, *args, **kwargs) + return run_in_executor(self._executor, self._loop, *args, **kwargs) + + async def disconnect(self): + return await self.run_in_executor(self._client.disconnect) async def execute(self, *args, **kwargs): - execute = super(AsyncClient, self).execute - return await self.run_in_executor(execute, *args, **kwargs) + return await self.run_in_executor(self._client.execute, *args, + **kwargs) async def execute_with_progress(self, query, with_column_types=False, external_tables=None, query_id=None, settings=None): - self.connection.force_connect() + await self.run_in_executor(self._client.connection.force_connect) - try: - return await self.process_ordinary_query_with_progress( - query, with_column_types=with_column_types, - external_tables=external_tables, - query_id=query_id, settings=settings - ) - - except Exception: - self.connection.disconnect() - raise + return await self.process_ordinary_query_with_progress( + query, with_column_types=with_column_types, + external_tables=external_tables, + query_id=query_id, settings=settings + ) async def process_ordinary_query_with_progress( self, query, with_column_types=False, external_tables=None, query_id=None, settings=None): await self.run_in_executor( - self.connection.send_query, query, + self._client.connection.send_query, query, query_id=query_id, settings=settings ) await self.run_in_executor( - self.connection.send_external_tables, external_tables + self._client.connection.send_external_tables, external_tables ) - return self.receive_result( + return await self.receive_result( with_column_types=with_column_types, progress=True ) - def receive_result(self, with_column_types=False, progress=False): + async def receive_result(self, with_column_types=False, progress=False): result = QueryResult(with_column_types=with_column_types) if progress: return Progress(result, self) else: - self.receive_no_progress_result(result) + await self.run_in_executor(self._client.receive_no_progress_result, + result) return result.get_result() + + async def cancel(self, with_column_types=False): + # TODO: Add warning if already cancelled. + await self.run_in_executor(self._client.connection.send_cancel) + # Client must still read until END_OF_STREAM packet. + return await self.receive_result(with_column_types=with_column_types) diff --git a/src/helpers.py b/aioch/utils.py similarity index 100% rename from src/helpers.py rename to aioch/utils.py diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..7898d6a --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[db] +host=localhost +port=9000 +database=test +user=default +password= diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..50fe9eb --- /dev/null +++ b/setup.py @@ -0,0 +1,90 @@ +from codecs import open +import os +import re +import sys +from setuptools import setup + + +PY_VER = sys.version_info + + +if PY_VER < (3, 5): + raise RuntimeError("aioch doesn't suppport Python earlier than 3.5") + + +here = os.path.abspath(os.path.dirname(__file__)) + + +def read_version(): + regexp = re.compile('^VERSION\W*=\W*\(([^\(\)]*)\)') + init_py = os.path.join(here, 'aioch', '__init__.py') + with open(init_py) as f: + for line in f: + match = regexp.match(line) + if match is not None: + return match.group(1).replace(', ', '.') + else: + raise RuntimeError('Cannot find version in aioch/__init__.py') + + +with open(os.path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = f.read() + + +setup( + name='aioch', + version=read_version(), + + description=( + 'Library for accessing a ClickHouse database over native interface ' + 'from the asyncio' + ), + long_description=long_description, + + url='https://github.com/mymarilyn/aioch', + + author='Konstantin Lebedev', + author_email='kostyan.lebedev@gmail.com', + + license='MIT', + + classifiers=[ + 'Development Status :: 4 - Beta', + + + 'Environment :: Console', + + 'Intended Audience :: Developers', + 'Intended Audience :: Information Technology', + + + 'License :: OSI Approved :: MIT License', + + + 'Operating System :: OS Independent', + + + 'Programming Language :: SQL', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + + 'Topic :: Database', + 'Topic :: Software Development', + 'Topic :: Software Development :: Libraries', + 'Topic :: Software Development :: Libraries :: Application Frameworks', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'Topic :: Scientific/Engineering :: Information Analysis' + ], + + keywords='ClickHouse db database cloud analytics asyncio', + + packages=['aioch'], + install_requires=[ + 'clickhouse-driver>=0.0.6' + ], + test_suite='nose.collector', + tests_require=[ + 'nose' + ], +) diff --git a/src/__init__.py b/tests/__init__.py similarity index 100% rename from src/__init__.py rename to tests/__init__.py diff --git a/tests/log.py b/tests/log.py new file mode 100644 index 0000000..320aa60 --- /dev/null +++ b/tests/log.py @@ -0,0 +1,27 @@ +from logging.config import dictConfig + + +def configure(): + dictConfig({ + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'standard': { + 'format': '%(asctime)s %(levelname)-8s %(name)s: %(message)s' + }, + }, + 'handlers': { + 'default': { + 'level': 'ERROR', + 'formatter': 'standard', + 'class': 'logging.StreamHandler', + }, + }, + 'loggers': { + '': { + 'handlers': ['default'], + 'level': 'ERROR', + 'propagate': True + }, + } + }) diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..1d88859 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,73 @@ +from clickhouse_driver import errors + +from aioch import Client +from tests.testcase import BaseTestCase + + +class PacketsTestCase(BaseTestCase): + def create_client(self): + return Client( + self.host, self.port, self.database, 'wrong_user', loop=self.loop + ) + + def test_exception_on_hello_packet(self): + async def run(): + with self.assertRaises(errors.ServerException) as e: + await self.client.execute('SHOW TABLES') + + # Simple exception formatting checks + exc = e.exception + self.assertIn('Code:', str(exc)) + self.assertIn('Stack trace:', str(exc)) + + self.loop.run_until_complete(run()) + + +class SelectTestCase(BaseTestCase): + def test_simple_select(self): + async def run(): + rv = await self.client.execute('SELECT 2') + self.assertEqual(rv, [(2,)]) + + self.loop.run_until_complete(run()) + + +class ProgressTestCase(BaseTestCase): + def test_select_with_progress(self): + async def run(): + progress = await self.client.execute_with_progress('SELECT 2') + + progress_rv = [] + async for x in progress: + progress_rv.append(x) + + self.assertEqual(progress_rv, [(1, 0)]) + rv = await progress.get_result() + self.assertEqual(rv, [(2,)]) + + self.loop.run_until_complete(run()) + + def test_select_with_progress_error(self): + async def run(): + with self.assertRaises(errors.ServerException): + progress = await self.client.execute_with_progress( + 'SELECT error' + ) + await progress.get_result() + + self.loop.run_until_complete(run()) + + def test_select_with_progress_no_progress_unwind(self): + async def run(): + progress = await self.client.execute_with_progress('SELECT 2') + self.assertEqual(await progress.get_result(), [(2,)]) + + self.loop.run_until_complete(run()) + + def test_select_with_progress_cancel(self): + async def run(): + await self.client.execute_with_progress('SELECT 2') + rv = await self.client.cancel() + self.assertEqual(rv, [(2,)]) + + self.loop.run_until_complete(run()) diff --git a/tests/testcase.py b/tests/testcase.py new file mode 100644 index 0000000..6bb10bb --- /dev/null +++ b/tests/testcase.py @@ -0,0 +1,94 @@ +import asyncio +import configparser +from contextlib import contextmanager +import subprocess +from unittest import TestCase + +from aioch.client import Client +from tests import log + + +log.configure() + +file_config = configparser.ConfigParser() +file_config.read(['setup.cfg']) + + +class BaseTestCase(TestCase): + host = file_config.get('db', 'host') + port = int(file_config.get('db', 'port')) + database = file_config.get('db', 'database') + user = file_config.get('db', 'user') + password = file_config.get('db', 'password') + + client = None + loop = None + + @classmethod + def emit_cli(cls, statement, database=None): + if database is None: + database = cls.database + + args = [ + 'clickhouse-client', + '--database', database, + '--host', cls.host, + '--port', str(cls.port), + '--query', str(statement) + ] + + process = subprocess.Popen( + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + output = process.communicate() + out, err = output + + if err: + raise RuntimeError( + 'Error during communication. {}'.format(err) + ) + + return out.decode('utf-8') + + def create_client(self, **kwargs): + kwargs.setdefault('loop', self.loop) + + return Client( + self.host, self.port, self.database, self.user, self.password, + **kwargs + ) + + @classmethod + def setUpClass(cls): + cls.emit_cli( + 'DROP DATABASE IF EXISTS {}'.format(cls.database), 'default' + ) + cls.emit_cli('CREATE DATABASE {}'.format(cls.database), 'default') + super(BaseTestCase, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + cls.emit_cli('DROP DATABASE {}'.format(cls.database)) + super(BaseTestCase, cls).tearDownClass() + + def setUp(self): + self.loop = asyncio.new_event_loop() + self.client = self.create_client() + super(BaseTestCase, self).setUp() + + def tearDown(self): + self.loop.run_until_complete(self.client.disconnect()) + self.loop.stop() + super(BaseTestCase, self).setUp() + + @contextmanager + def create_table(self, columns): + self.emit_cli( + 'CREATE TABLE test ({}) ''ENGINE = Memory'.format(columns) + ) + try: + yield + except Exception: + raise + finally: + self.emit_cli('DROP TABLE test')