Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Sep 24, 2017
1 parent 9b88271 commit 6c1c971
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 32 deletions.
29 changes: 29 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
env:
- VERSION=1.1.54276

language: python
python:
- "3.5"
- "3.6"
cache: pip
services:
- docker
before_install:
- docker run -d -p 127.0.0.1:9000:9000 --name test-clickhouse-server --ulimit nofile=262144:262144 yandex/clickhouse-server:$VERSION
- docker run -d --entrypoint "/bin/sh" --name test-clickhouse-client --link test-clickhouse-server:clickhouse-server yandex/clickhouse-client:$VERSION -c 'while :; do sleep 1; done'
- docker ps -a
# Faking clickhouse-client real comminitation with container via docker exec.
- echo -e '#!/bin/bash\n\ndocker exec test-clickhouse-client clickhouse-client "$@"' | sudo tee /usr/local/bin/clickhouse-client > /dev/null
- sudo chmod +x /usr/local/bin/clickhouse-client
# Overriding setup.cfg. Set host=clickhouse-server
- echo -e '[db]\nhost=clickhouse-server\nport=9000\ndatabase=test\nuser=default\npassword=\ncompression=lz4,lz4hc,zstd' > setup.cfg
# Make host think that clickhouse-server is localhost
- echo '127.0.0.1 clickhouse-server' | sudo tee /etc/hosts > /dev/null
install:
pip install flake8 flake8-print coveralls
before_script:
flake8 .
script:
- coverage run --source=aioch setup.py test
after_success:
- coveralls
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License
This is the MIT license: http://www.opensource.org/licenses/mit-license.php

Copyright (c) 2017 Marilyn System
Copyright (c) 2017 by Konstantin Lebedev.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,20 @@
# aioch
aioch - is a library for accessing a ClickHouse database over native interface from the asyncio
**aioch** is a library for accessing a ClickHouse database over native interface from the asyncio.
It wraps features of [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver) for asynchronous usage.

[![Coverage Status](https://coveralls.io/repos/github/mymarilyn/aioch/badge.svg?branch=master)](https://coveralls.io/github/mymarilyn/aioch?branch=master)
[![Build Status](https://travis-ci.org/mymarilyn/aioch.svg?branch=master)](https://travis-ci.org/mymarilyn/aioch)


## Installation

The package can be installed using `pip`:

```bash
pip install aioch
```

## Usage


For more detailed information see **clickhouse-driver** usage examples.
2 changes: 2 additions & 0 deletions aioch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
VERSION = (0, 0, 1)
__version__ = '.'.join(str(x) for x in VERSION)
67 changes: 38 additions & 29 deletions src/client.py → aioch/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio

from .helpers import run_in_executor
from ..client import Client, QueryResult
from clickhouse_driver.client import Client as BlockingClient, QueryResult


class Progress(object):
def __init__(self, result, client):
self.client = client
def __init__(self, result, async_client):
self.async_client = async_client
self._client = async_client._client
self.result = result
self.rows_read, self.approx_rows_to_read = 0, 0

Expand Down Expand Up @@ -35,11 +36,13 @@ async def __anext__(self):

except Exception as e:
if not isinstance(e, StopAsyncIteration):
self.client.connection.disconnect()
await self.async_client.disconnect()
raise

async def read_packet(self):
packet = await self.client.run_in_executor(self.client.receive_packet)
packet = await self.async_client.run_in_executor(
self._client.receive_packet
)
if not packet:
return

Expand All @@ -55,7 +58,7 @@ async def read_packet(self):

return self.rows_read, self.approx_rows_to_read
else:
self.client.store_query_result(packet, self.result)
self._client.store_query_result(packet, self.result)

return True

Expand All @@ -67,57 +70,63 @@ async def get_result(self):
return self.result.get_result()


class AsyncClient(Client):
class Client(object):
def __init__(self, *args, **kwargs):
self.loop = kwargs.pop('loop', None) or asyncio.get_event_loop()
self.executor = kwargs.pop('executor', None)
self._loop = kwargs.pop('loop', None) or asyncio.get_event_loop()
self._executor = kwargs.pop('executor', None)
self._client = BlockingClient(*args, **kwargs)

super(AsyncClient, self).__init__(*args, **kwargs)
super(Client, self).__init__()

def run_in_executor(self, *args, **kwargs):
return run_in_executor(self.executor, self.loop, *args, **kwargs)
return run_in_executor(self._executor, self._loop, *args, **kwargs)

async def disconnect(self):
return await self.run_in_executor(self._client.disconnect)

async def execute(self, *args, **kwargs):
execute = super(AsyncClient, self).execute
return await self.run_in_executor(execute, *args, **kwargs)
return await self.run_in_executor(self._client.execute, *args,
**kwargs)

async def execute_with_progress(self, query, with_column_types=False,
external_tables=None, query_id=None,
settings=None):
self.connection.force_connect()
await self.run_in_executor(self._client.connection.force_connect)

try:
return await self.process_ordinary_query_with_progress(
query, with_column_types=with_column_types,
external_tables=external_tables,
query_id=query_id, settings=settings
)

except Exception:
self.connection.disconnect()
raise
return await self.process_ordinary_query_with_progress(
query, with_column_types=with_column_types,
external_tables=external_tables,
query_id=query_id, settings=settings
)

async def process_ordinary_query_with_progress(
self, query, with_column_types=False, external_tables=None,
query_id=None, settings=None):
await self.run_in_executor(
self.connection.send_query, query,
self._client.connection.send_query, query,
query_id=query_id, settings=settings
)
await self.run_in_executor(
self.connection.send_external_tables, external_tables
self._client.connection.send_external_tables, external_tables
)

return self.receive_result(
return await self.receive_result(
with_column_types=with_column_types, progress=True
)

def receive_result(self, with_column_types=False, progress=False):
async def receive_result(self, with_column_types=False, progress=False):
result = QueryResult(with_column_types=with_column_types)

if progress:
return Progress(result, self)

else:
self.receive_no_progress_result(result)
await self.run_in_executor(self._client.receive_no_progress_result,
result)
return result.get_result()

async def cancel(self, with_column_types=False):
# TODO: Add warning if already cancelled.
await self.run_in_executor(self._client.connection.send_cancel)
# Client must still read until END_OF_STREAM packet.
return await self.receive_result(with_column_types=with_column_types)
File renamed without changes.
7 changes: 7 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[db]
host=localhost
port=9000
database=test
user=default
password=
compression=lz4,lz4hc,zstd
84 changes: 84 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import os
import re
from codecs import open

from setuptools import setup

here = os.path.abspath(os.path.dirname(__file__))


def read_version():
regexp = re.compile('^VERSION\W*=\W*\(([^\(\)]*)\)')
init_py = os.path.join(here, 'aioch', '__init__.py')
with open(init_py) as f:
for line in f:
match = regexp.match(line)
if match is not None:
return match.group(1).replace(', ', '.')
else:
raise RuntimeError('Cannot find version in aioch/__init__.py')


with open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()

# TODO: unsupported versions

setup(
name='aioch',
version=read_version(),

description=(
'Library for accessing a ClickHouse database over native interface '
'from the asyncio'
),
long_description=long_description,

url='https://github.com/mymarilyn/aioch',

author='Konstantin Lebedev',
author_email='kostyan.lebedev@gmail.com',

license='MIT',

classifiers=[
'Development Status :: 4 - Beta',


'Environment :: Console',

'Intended Audience :: Developers',
'Intended Audience :: Information Technology',


'License :: OSI Approved :: MIT License',


'Operating System :: OS Independent',


'Programming Language :: SQL',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',

'Topic :: Database',
'Topic :: Software Development',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Application Frameworks',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: Scientific/Engineering :: Information Analysis'
],

keywords='ClickHouse db database cloud analytics asyncio',

packages=['aioch'],
install_requires=[
'clickhouse-driver>=0.0.6'
],
test_suite='nose.collector',
tests_require=[
'nose',
'mock',
],
)
File renamed without changes.
27 changes: 27 additions & 0 deletions tests/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from logging.config import dictConfig


def configure():
dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': '%(asctime)s %(levelname)-8s %(name)s: %(message)s'
},
},
'handlers': {
'default': {
'level': 'ERROR',
'formatter': 'standard',
'class': 'logging.StreamHandler',
},
},
'loggers': {
'': {
'handlers': ['default'],
'level': 'ERROR',
'propagate': True
},
}
})
73 changes: 73 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from clickhouse_driver import errors

from aioch.client import Client
from tests.testcase import BaseTestCase


class PacketsTestCase(BaseTestCase):
def create_client(self):
return Client(
self.host, self.port, self.database, 'wrong_user', loop=self.loop
)

def test_exception_on_hello_packet(self):
async def run():
with self.assertRaises(errors.ServerException) as e:
await self.client.execute('SHOW TABLES')

# Simple exception formatting checks
exc = e.exception
self.assertIn('Code:', str(exc))
self.assertIn('Stack trace:', str(exc))

self.loop.run_until_complete(run())


class SelectTestCase(BaseTestCase):
def test_simple_select(self):
async def run():
rv = await self.client.execute('SELECT 2')
self.assertEqual(rv, [(2,)])

self.loop.run_until_complete(run())


class ProgressTestCase(BaseTestCase):
def test_select_with_progress(self):
async def run():
progress = await self.client.execute_with_progress('SELECT 2')

progress_rv = []
async for x in progress:
progress_rv.append(x)

self.assertEqual(progress_rv, [(1, 0)])
rv = await progress.get_result()
self.assertEqual(rv, [(2,)])

self.loop.run_until_complete(run())

def test_select_with_progress_error(self):
async def run():
with self.assertRaises(errors.ServerException):
progress = await self.client.execute_with_progress(
'SELECT error'
)
await progress.get_result()

self.loop.run_until_complete(run())

def test_select_with_progress_no_progress_unwind(self):
async def run():
progress = await self.client.execute_with_progress('SELECT 2')
self.assertEqual(await progress.get_result(), [(2,)])

self.loop.run_until_complete(run())

def test_select_with_progress_cancel(self):
async def run():
await self.client.execute_with_progress('SELECT 2')
rv = await self.client.cancel()
self.assertEqual(rv, [(2,)])

self.loop.run_until_complete(run())
Loading

0 comments on commit 6c1c971

Please sign in to comment.