##### Вспомогательные функции для обработки строк

In [1]:
#helper functions

def get_type_of_string_value(str_value, is_nullable):
    if str_value == '':
        return None
    try:
        int(str_value)
        return 'Nullable(Int)' if is_nullable else 'Int'
    except ValueError:
        pass
    try:
        float(str_value)
        return 'Nullable(Float)' if is_nullable else 'Int'
    except ValueError:
        pass
    return 'Nullable(String)' if is_nullable else 'Int'


def get_value_or_null(value, expected_type):
    if value == '':
        return 'Null'
    elif expected_type in (int, float):
        try:
            expected_type(value)
            return str(value)
        except ValueError:
            return 'Null'
    elif expected_type == str:
        return "'" + value + "'"


def remove_first_escape(str_value):
    if str_value.startswith('\n'):
        return str_value[1:]
    else:
        return str_value

    
def convert_list_to_dict(lst):
    res_dct = {lst[i]: lst[i + 1] for i in range(0, len(lst), 2)}
    return res_dct


##### Ручные корретировки на данном датасете (все строки с колонкой Seller-Type = 'Individual' внесены некорректно)

In [19]:
def apply_manual_adj_with_honda_dataset(row: dict):
    if row.get('Seller_Type', None) == 'Individual':
        row['Mileage'] = int(row['VIN'].split(' ')[0].replace(',', ''))
        row['VIN'] = row['Engine']
        row['Engine'] = row['Transmission']
        row['Transmission'] = row['Fuel_Type']
        row['Stock_#'] = ''


##### Декоратор, замеряющий время работы

In [61]:
from functools import wraps
import time


def timeit(func, args_to_print: list):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        total_time = end_time - start_time
        args_to_print_with_values = {k: v for k, v in kwargs.items() if k in args_to_print}
        print('-' * 25)
        print(f'Function {func.__name__} called with {args_to_print_with_values} took {total_time:.4f} seconds')
        print('-' * 25)
        return result
    return timeit_wrapper


##### Функция *__get_column_types_from_csv__* принимает путь до csv файла и список колонок сортировки и возвращает словарь типа {имя колонки: тип колонки, ...}, основанный на csv файле (например может вернуть словарь {'Year': 'Int', 'Make': 'Nullable(String)', ... })

In [49]:
import clickhouse_connect
import csv


def get_column_types_from_csv(path_to_csv_file: str,
                              order_column_names: list) -> dict:
    dict_of_column_types = dict()
    with open(path_to_csv_file, 'r') as file:
        for row in csv.DictReader(file):
            dict_of_column_types.update({column_name: get_type_of_string_value(value, is_nullable=(column_name not in order_column_names))
                                            for column_name, value in row.items()
                                            if column_name not in dict_of_column_types or value is not None})
            if None not in dict_of_column_types.values():
                break
    return dict_of_column_types


##### Функуция *__get_create_query_based_on_csv_file__* возвращает  sql-запрос в виде строки на создание таблицы. Принимает в качестве параметров путь до csv файла, имя таблицы, движок таблицы, список колонок сортировки, список колонок партиционирования.

In [50]:
def get_create_query_based_on_csv_file(path_to_csv_file: str,
                                       table_name: str,
                                       table_engine='MergeTree',
                                       order_column_names=[],
                                       partition_column_names=[]) -> str:
    dict_of_column_types = get_column_types_from_csv(path_to_csv_file, order_column_names)
    string_of_column_names_and_types = ", ".join(['`' + column_name + '` ' + column_type 
                                                  for column_name, column_type in dict_of_column_types.items()])
    string_of_order_by_part = '' if len(order_column_names) == 0 else 'ORDER BY ({})'.format(', '.join(order_column_names))
    string_of_partition_by_part = '' if len(partition_column_names) == 0 else 'PARTITION BY ({})'.format(', '.join(partition_column_names))
    return ('''CREATE TABLE IF NOT EXISTS {} ({}) ENGINE = {} {} {};'''
        .format(table_name,
                string_of_column_names_and_types,
                table_engine,
                string_of_partition_by_part,
                string_of_order_by_part))


##### Функция *__get_delete_query__* возвращает  sql-запрос в виде строки на удаление таблицы. В качестве параметра принимает имя таблицы.

In [51]:
def get_delete_query(table_name: str) -> str:
    return "DROP TABLE IF EXISTS {};".format(table_name)


##### Функция *__get_column_types_from_table__* возвращает словарь типа {имя колонки: тип колонки, ...}, основанный на таблице в ClickHouse (например {'Year': <class 'int'>, 'Make': <class 'str'>, 'Comfort_Rating': <class 'float'>, ...}). В качестве параметра принимает http клиент ClickHouse и имя таблицы.

In [52]:
type_mapper = {
               'Int32': int,
               'Float32': float,
               'String': str,
               'Nullable(Int32)': int,
               'Nullable(Float32)': float,
               'Nullable(String)': str
              }


def get_column_types_from_table(client: clickhouse_connect.driver.httpclient.HttpClient,
                                table_name: str) -> dict:
    query_result = client.command('DESCRIBE TABLE {};'.format(table_name))
    query_result_adapted = map(remove_first_escape, (filter(lambda x : x != '', query_result)))
    list_of_column_names_and_types = list(map(lambda x: type_mapper.get(x, x), query_result_adapted))
    return convert_list_to_dict(list_of_column_names_and_types)


##### Интерфейс *__InserterInterface__* содержит метод *__insert__*, в котором должна быть определена логика заполнения таблицы. Метод *__insert__* принмает в качестве параметров 1) http клиент ClickHouse, 2) путь до csv файла, 3) имя таблицы, в которую осуществляется вставка записей, 4) размер инкрементов, которыми осуществляется вставка записей, 5) типы колонок таблицы, 6) функцию, осущ. ручную корректировку каждой записи.

In [53]:
from string import Template
from overrides import override
from abc import ABC, abstractmethod
from typing import Callable


class InserterInterface(ABC):
    insert_query_template = Template('INSERT INTO $table_name (*) VALUES ')
    
    @staticmethod
    @abstractmethod
    def insert(client: clickhouse_connect.driver.httpclient.HttpClient,
               path_to_csv_file: str,
               table_name: str,
               batch_size: int,
               column_types: dict,
               apply_manual_adj: Callable):
        pass


##### Два класса, имплементирующие интерфейс *__InserterInterface__*. Класс *__PandasInserter__* осущ. вставку с помощью pandas, класс *__SimpleInserter__*  осущ. вставку, не использую pandas.

In [54]:
import pandas as pd


class PandasInserter(InserterInterface):
    @override
    def insert(client, path_to_csv_file, table_name, batch_size, column_types, apply_manual_adj=lambda: None):
        insert_query = InserterInterface.insert_query_template.substitute(table_name=table_name)
        df_iterator = pd.read_csv(path_to_csv_file, chunksize=batch_size, na_filter=False)
        for df in df_iterator:
            row_list = []
            for row_index in range(df.shape[0]):
                row = df.iloc[row_index].to_dict()
                apply_manual_adj(row)
                row_values = ", ".join([get_value_or_null(str(column_value), column_types[column_name]) for column_name, column_value in row.items()])
                row_list.append('(' + row_values + ')')
            client.command(insert_query + ','.join(row_list))


class SimpleInserter(InserterInterface):
    @override
    def insert(client, path_to_csv_file, table_name, batch_size, column_types, apply_manual_adj=lambda: None):
        insert_query = InserterInterface.insert_query_template.substitute(table_name=table_name)
        with open(path_to_csv_file, 'r') as file:
            row_list = []
            for i, row in enumerate(csv.DictReader(file)):
                apply_manual_adj(row) if apply_manual_adj is not None else None
                row_values = ", ".join([get_value_or_null(column_value, column_types[column_name])
                                        for column_name, column_value in row.items()])
                row_list.append('(' + row_values + ')')
                if (len(row_list) == batch_size):
                    client.command(insert_query + ','.join(row_list))
                    row_list = []
            else:
                if (len(row_list) != 0):
                    client.command(insert_query + ','.join(row_list))


##### Функция *__do_insert__* создана для удобства вызова метода *__insert__*. Также в этой функции вешаем на метод insert декоратор, замеряющий время выполнения вставки в таблицу.

In [58]:
def do_insert_timed(client: clickhouse_connect.driver.httpclient.HttpClient,
              path_to_csv_file: str,
              table_name: str,
              batch_size: int,
              inserter: InserterInterface,
              apply_manual_adj=lambda: None):
    column_types = get_column_types_from_table(client, table_name)
    insert_timed = timeit(inserter.insert, ['batch_size'])
    insert_timed(client=client,
                 path_to_csv_file=path_to_csv_file,
                 table_name=table_name,
                 batch_size=batch_size,
                 column_types=column_types,
                 apply_manual_adj=apply_manual_adj)


##### Обернём процесс создания, заполнения и удаления таблицы в функцию для удобства.

In [59]:
def do_create_insert_delete_operation(client: clickhouse_connect.driver.httpclient.HttpClient,
                                      path_to_csv_file: str,
                                      table_name: str,
                                      table_engine: str,
                                      order_column_names: list,
                                      partition_column_names: list,
                                      batch_size: int,
                                      inserter: InserterInterface,
                                      apply_manual_adj=lambda: None):
    client.command(get_create_query_based_on_csv_file(path_to_csv_file, table_name, table_engine, order_column_names, partition_column_names))
    do_insert_timed(client, path_to_csv_file, table_name, batch_size, inserter, apply_manual_adj)
    client.command(get_delete_query(table_name))


#### __Смотрим движок MergeTree без pandas__

In [62]:
client = clickhouse_connect.get_client(host='db.mpkazantsev.ru',
                                       port=8123, 
                                       database='datasets')

parametrs = {
              'client': client,
              'path_to_csv_file': '/home/alexander/honda_sell_data.csv',
              'table_name': 'maindb.ad_honda_sell_data_MergeTree',
              'table_engine': 'MergeTree',
              'order_column_names': ['Year'],
              'partition_column_names': [],
              'inserter': SimpleInserter,
              'apply_manual_adj': apply_manual_adj_with_honda_dataset,
             }

do_create_insert_delete_operation(**parametrs, batch_size=1)

do_create_insert_delete_operation(**parametrs, batch_size=1000)

do_create_insert_delete_operation(**parametrs, batch_size=2500)

do_create_insert_delete_operation(**parametrs, batch_size=5000)

print('Script completed')


-------------------------
Function insert called with {'batch_size': 1} took 18.7040 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 1000} took 0.1575 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 2500} took 0.1509 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 5000} took 0.1450 seconds
-------------------------
Script completed


#### __Смотрим движок TinyLog без pandas__

In [63]:
client = clickhouse_connect.get_client(host='db.mpkazantsev.ru',
                                       port=8123, 
                                       database='datasets')

parametrs = {
              'client': client,
              'path_to_csv_file': '/home/alexander/honda_sell_data.csv',
              'table_name': 'maindb.ad_honda_sell_data_MergeTree',
              'table_engine': 'TinyLog',
              'order_column_names': [],
              'partition_column_names': [],
              'inserter': SimpleInserter,
              'apply_manual_adj': apply_manual_adj_with_honda_dataset,
             }

do_create_insert_delete_operation(**parametrs, batch_size=1)

do_create_insert_delete_operation(**parametrs, batch_size=1000)

do_create_insert_delete_operation(**parametrs, batch_size=2500)

do_create_insert_delete_operation(**parametrs, batch_size=5000)

print('Script completed')


-------------------------
Function insert called with {'batch_size': 1} took 41.3030 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 1000} took 0.2074 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 2500} took 0.1549 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 5000} took 0.1497 seconds
-------------------------
Script completed


#### __Смотрим движок MergeTree c pandas__

In [64]:
client = clickhouse_connect.get_client(host='db.mpkazantsev.ru',
                                       port=8123, 
                                       database='datasets')

parametrs = {
              'client': client,
              'path_to_csv_file': '/home/alexander/honda_sell_data.csv',
              'table_name': 'maindb.ad_honda_sell_data_MergeTree',
              'table_engine': 'MergeTree',
              'order_column_names': ['Year'],
              'partition_column_names': [],
              'inserter': PandasInserter,
              'apply_manual_adj': apply_manual_adj_with_honda_dataset,
             }

do_create_insert_delete_operation(**parametrs, batch_size=1)

do_create_insert_delete_operation(**parametrs, batch_size=1000)

do_create_insert_delete_operation(**parametrs, batch_size=2500)

do_create_insert_delete_operation(**parametrs, batch_size=5000)


print('Script completed')


-------------------------
Function insert called with {'batch_size': 1} took 32.5142 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 1000} took 1.0786 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 2500} took 1.0926 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 5000} took 1.1266 seconds
-------------------------
Script completed


#### __Смотрим движок TinyLog c pandas__

In [65]:
client = clickhouse_connect.get_client(host='db.mpkazantsev.ru',
                                       port=8123, 
                                       database='datasets')

parametrs = {
              'client': client,
              'path_to_csv_file': '/home/alexander/honda_sell_data.csv',
              'table_name': 'maindb.ad_honda_sell_data_TinyLog',
              'table_engine': 'TinyLog',
              'order_column_names': [],
              'partition_column_names': [],
              'inserter': PandasInserter,
              'apply_manual_adj': apply_manual_adj_with_honda_dataset,
             }

do_create_insert_delete_operation(**parametrs, batch_size=1)

do_create_insert_delete_operation(**parametrs, batch_size=1000)

do_create_insert_delete_operation(**parametrs, batch_size=2500)

do_create_insert_delete_operation(**parametrs, batch_size=5000)

print('Script completed')


-------------------------
Function insert called with {'batch_size': 1} took 53.0412 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 1000} took 1.1202 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 2500} took 1.0373 seconds
-------------------------
-------------------------
Function insert called with {'batch_size': 5000} took 1.0645 seconds
-------------------------
Script completed
