Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to insert CSV data using clickhouse-driver? #68

Closed
hodgesrm opened this issue Jan 4, 2019 · 8 comments
Closed

How to insert CSV data using clickhouse-driver? #68

hodgesrm opened this issue Jan 4, 2019 · 8 comments

Comments

@hodgesrm
Copy link

hodgesrm commented Jan 4, 2019

Is it possible to insert CSV file data using clickhouse-driver? I've been unable to figure out how to do this. A naive approach like the following fails because the Client class mistakes it for a SELECT and hangs waiting for a response from the server:

from clickhouse_driver import Client
client = Client('localhost')
with open('iris.csv', 'r') as data:
    csv = data.read()
sql = "INSERT INTO iris FORMAT CSV \n" + str(csv) + "\n;"
client.execute(sql)

It seems that INSERT needs to include a list or dictionary on the client.execute() call, which is not compatible with file data. It would be nice if clickhouse-driver supported something like the following:

from clickhouse_driver import Client
client = Client('localhost')
with open('iris.csv', 'r') as data:
    client.execute("INSERT INTO iris FORMAT CSV", data)

Properly implemented this would allow Python to stream the CSV contents up to ClickHouse, which would be helpful for loading very large files.

Let me know if you want a pull request on this and I'll try to put one together this month. It does not look too hard to implement.

@xzkostyan
Copy link
Member

Hi.

You can write preprocessor that transforms each row to python dict. See #60

Please note, that data parameter can be generator.

I don't know how exactly clickhouse-client deal with CSV files. But I assume it does the same. CSV file is parsed on the local machine and then send to the remote server in native format.

You can try to make PR but the chain "CSV -> python data types -> native format" will produce the same overhead as CSV preprocessor (csv row -> dict).

@hodgesrm
Copy link
Author

hodgesrm commented Jan 4, 2019

@xzkostyan, thanks for the quick response. It was hard to tell how a generator would behave but I had the same thought in the proposed syntax shown above. I'll give it a try.

@hodgesrm
Copy link
Author

There seem to be a couple of approaches to load CSV. Here's code that works by reading the CSV rows into a list of tuples. CSV goes into memory in this case.

from clickhouse_driver import Client
import csv
client = Client('localhost')
client.execute('DROP TABLE IF EXISTS iris_from_csv')
client.execute('CREATE TABLE iris_from_csv ('
               'sepal_length Decimal32(2), sepal_width Decimal32(2), '
               'petal_length Decimal32(2), petal_width Decimal32(2), '
               'species String) ENGINE = MergeTree '
               ' PARTITION BY species ORDER BY (species)')

with open('iris.csv') as iris_csv:
    rows=[line for line in csv.reader(iris_csv)]

# List of rows is materialized. 
client.execute("INSERT INTO iris_from_csv VALUES", rows)
print(client.execute('SELECT COUNT(*), species FROM iris_from_csv GROUP BY species ORDER BY species'))

Here's how to use a list comprehension from a generator expression. You can't really tell the difference for small files but this should stream the file which would be more efficient for large uploads.

from clickhouse_driver import Client
import csv
client = Client('localhost')
client.execute('DROP TABLE IF EXISTS iris_from_csv')
client.execute('CREATE TABLE iris_from_csv ('
               'sepal_length Decimal32(2), sepal_width Decimal32(2), '
               'petal_length Decimal32(2), petal_width Decimal32(2), '
               'species String) ENGINE = MergeTree '
               ' PARTITION BY species ORDER BY (species)')

def row_reader():
    with open('iris.csv') as iris_csv:
        for line in csv.reader(iris_csv):
            yield line 

# List comprehension creates generator expression. 
client.execute("INSERT INTO iris_from_csv VALUES", 
               (line for line in row_reader()))
print(client.execute('SELECT COUNT(*), species FROM iris_from_csv GROUP BY species ORDER BY species'))

Both examples use with...open...as to ensure the file descriptor is freed in a timely manner. They still feel a bit awkward.

@xzkostyan
Copy link
Member

Hi!

Sorry for late response.

Yep, that's it. This is how simple CSV import wrapper will look like. But also you need to cast each value to python's data type.

So minimal working example will look like:

import csv
from datetime import datetime

from clickhouse_driver import Client

client = Client('localhost')

client.execute('DROP TABLE IF EXISTS test')
client.execute(
    'CREATE TABLE test ('
    'd Date, a Int32, b Float32, c String'
    ') ENGINE = Log'
)

schema = {
    'd': lambda x: datetime.strptime(x, '%Y-%m-%d').date(),
    'a': int,
    'b': float
}
bypass = lambda x: x

with open('/tmp/test.csv') as f:
    gen = ({k: schema.get(k, bypass)(v) for k, v in row.items()} for row in csv.DictReader(f))
    client.execute('INSERT INTO test VALUES', gen)

print(client.execute('SELECT * FROM test'))

Contents of '/tmp/test.csv':

d,a,b,c
2019-01-01,10,10.123,first
2019-01-02,20,20.567,second
2019-01-03,30,30.891,third

Do you want this code become a part of this package?

schema = {
    'd': lambda x: datetime.strptime(x, '%Y-%m-%d').date(),
    'a': int,
    'b': float
}
bypass = lambda x: x

with open('/tmp/test.csv') as f:
    gen = ({k: schema.get(k, bypass)(v) for k, v in row.items()} for row in csv.DictReader(f))

Well, schema can be generated automatically by examining DESCRIBE TABLE test query and mapping each type to python's lambda.

You can make PR and place this logic in clickhouse_driver.util.%module%.py.

After hiding the schema CSV import will be look like:

from clickhouse_driver import Client

client = Client('localhost')

client.execute('DROP TABLE IF EXISTS test')
client.execute(
    'CREATE TABLE test ('
    'd Date, a Int32, b Float32, c String'
    ') ENGINE = Log'
)

clickhouse_driver.util.insert_csv(client, 'test', '/tmp/test.csv')

print(client.execute('SELECT * FROM test'))

But really bulk CSV import will still be still slow due to this row-like processing.

@hodgesrm
Copy link
Author

Thanks, that's good to know I was on the right track with the generator expression. I'm not very familiar with the csv module, which seems handy.

One further question: does the ClickHouse TCP/IP wire protocol require that you send up tuples? In theHTTP wire protocol I can do something like this, which does not require parsing of CSV on the client.

$ cat data.csv | curl 'http://localhost:8123/?query=INSERT%20INTO%20iris%20FORMAT%20CSV'  --data-binary @-

In addition to pipelining the uploaded data it removes the problem of parsing data entirely from the client and lets ClickHouse sort things out on the server side where the metadata already live. I'm guessing this is supported in the TCP/IP protocol because you can do the following using clickhouse-client:

cat data.csv | clickhouse-client --database=default \
 --query='INSERT INTO iris FORMAT CSV';

It seems to me it would make sense to add this capability to the driver. If users need to transform CSV into a format that CH can parse that's a separate step outside the driver.

@xzkostyan
Copy link
Member

One further question: does the ClickHouse TCP/IP wire protocol require that you send up tuples? In theHTTP wire protocol I can do something like this, which does not require parsing of CSV on the client.

Short answer is yes. Data in native protocol is sequence of blocks. Each block contains N rows of data represented in columnar form and in binary format.

clickhouse-client and clickhouse-server is the same binary:

$ ls -lh /usr/bin/clickhouse*
-rwxr-xr-x 1 root root 236M Dec 20 19:47 /usr/bin/clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-benchmark -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-clang -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-client -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-compressor -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-copier -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-extract-from-config -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-format -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-lld -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-local -> clickhouse
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-odbc-bridge -> clickhouse
-rwxr-xr-x 1 root root 2.0K Dec 20 19:47 /usr/bin/clickhouse-report
lrwxrwxrwx 1 root root   10 Dec 20 19:47 /usr/bin/clickhouse-server -> clickhouse

Client and server share the same codebase and both know how to parse CSV. CSV parsing is performed on client's side before sending to server in native format.

This can be seen in wireshark for example: ad

@hodgesrm
Copy link
Author

Thank you @xzkostyan. That's a great response. At this point I need to play around with the Python driver a little more before proposing features to handle CSV automatically. Since parsing requires application schema knowledge to work correctly it seems at first glance better to let users code that themselves since many of them will be able to do it efficiently without extra help.

I would like submit a PR to add a CSV example to docs/quickstart.rst at which point this issue can be closed. I'll get it over to you within the next week or so. (p.s., Nice Sphinx docs.)

@xzkostyan
Copy link
Member

Ask if you have any questions. I'd let user to define schema explicitly.

If no schema is passed it will be automatically generated by analyzing DESCRIBE TABLE test query response. In this response column has ClickHouse type which can be matched with python's type:

:) describe table test;

DESCRIBE TABLE test

┌─name─┬─type────┬─default_type─┬─default_expression─┬─comment_expression─┐
│ d    │ Date    │              │                    │                    │
│ a    │ Int32   │              │                    │                    │
│ b    │ Float32 │              │                    │                    │
│ c    │ String  │              │                    │                    │
└──────┴─────────┴──────────────┴────────────────────┴────────────────────┘

4 rows in set. Elapsed: 0.013 sec. 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants