In [None]:
!pip install numpy psycopg2 sqlalchemy clickhouse-sqlalchemy

In [1]:
import datetime
import os

import sqlalchemy


# Connection URLs are taken from the environment so that credentials are not
# stored in the notebook. Expected formats:
#   POSTGRES_URL=postgresql+psycopg2://<user>:<password>@<host>/<database>
#   CLICKHOUSE_URL=clickhouse://<user>:<password>@<host>/<database>
# A missing variable raises KeyError naming the variable.
postgres_engine = sqlalchemy.create_engine(os.environ['POSTGRES_URL'])
clickhouse_engine = sqlalchemy.create_engine(os.environ['CLICKHOUSE_URL'])


In [2]:
# Maps PostgreSQL type names (the `data_type` column of
# `information_schema.columns`) to ClickHouse column types.
# PostgreSQL types with no entry here fall back to 'String' in
# `map_pg_to_ch_type`.
TYPE_MAPPER = {
    'boolean': 'UInt8',
    'smallint': 'Int16',
    'integer': 'Int32',
    'bigint': 'Int64',
    'real': 'Float32',
    'double precision': 'Float64',
    'numeric': 'Decimal(38, 18)',
    'character varying': 'String',
    # Use the canonical ClickHouse type name, like every other textual type
    # below, instead of relying on a compatibility alias.
    'text': 'String',
    'date': 'Date',
    'timestamp without time zone': 'DateTime',
    'bytea': 'String',
    'json': 'String',
    'jsonb': 'String',
    'uuid': 'String',
    'inet': 'String',
}


def map_pg_to_ch_type(pg_data_type):
    """Translate a PostgreSQL data type name into a ClickHouse column type.

    The name is looked up in ``TYPE_MAPPER``; a name that has no entry
    there is mapped to ``'String'``.
    """
    if pg_data_type in TYPE_MAPPER:
        return TYPE_MAPPER[pg_data_type]
    return 'String'


def create_table_in_clickhouse(source_table_name, dest_table_name,
                               postgres_engine, clickhouse_engine,
                               order_columns_names):
    """Create a ClickHouse MergeTree table with the columns of a PostgreSQL table.

    Column names and types are read from ``information_schema.columns`` and
    each PostgreSQL type is converted with ``map_pg_to_ch_type``.

    Args:
        source_table_name: Name of the PostgreSQL table to mirror. The lookup
            filters on ``table_name`` only (no schema filter), so same-named
            tables in several schemas all contribute columns.
        dest_table_name: Name of the ClickHouse table to create; inserted
            into the statement as-is.
        postgres_engine: Object whose ``execute(sql)`` returns a result with
            ``fetchall()``.
        clickhouse_engine: Object whose ``execute(sql)`` runs the
            ``CREATE TABLE`` statement.
        order_columns_names: Iterable of column names forming the MergeTree
            sorting key. When empty, ``ORDER BY tuple()`` is emitted.

    Raises:
        ValueError: If no columns are found for ``source_table_name``.
    """
    # Double any single quote so the name cannot terminate the SQL literal.
    table_literal = str(source_table_name).replace("'", "''")
    query_result = postgres_engine.execute(
        "SELECT column_name, data_type "
        "FROM information_schema.columns "
        f"WHERE table_name = '{table_literal}' "
        # Without an explicit ordering the row order is unspecified.
        "ORDER BY ordinal_position")
    pg_schema = query_result.fetchall()
    if not pg_schema:
        # An empty column list would produce an invalid `CREATE TABLE x ()`.
        raise ValueError(
            f"no columns found for PostgreSQL table {source_table_name!r}")

    column_definitions = ','.join(
        f'`{col_name}` {map_pg_to_ch_type(pg_data_type)}'
        for col_name, pg_data_type in pg_schema)

    order_columns = list(order_columns_names)
    if order_columns:
        sorting_key = '(' + ','.join(f'`{name}`' for name in order_columns) + ')'
    else:
        # ClickHouse spells "no sorting key" as tuple().
        sorting_key = 'tuple()'

    create_table_query = (
        f"CREATE TABLE {dest_table_name} ({column_definitions}) "
        f"ENGINE = MergeTree() ORDER BY {sorting_key}")
    clickhouse_engine.execute(create_table_query)


In [3]:
def get_correct_value(value):
    """Render one PostgreSQL cell as a literal for a ClickHouse ``VALUES`` list.

    Args:
        value: A single column value from a fetched row.

    Returns:
        str: ``'Null'`` for ``None`` and for the empty string; ``'1'``/``'0'``
        for booleans (``TYPE_MAPPER`` maps ``boolean`` to ``UInt8``); the
        plain ``str()`` form for ``int`` and ``float``; otherwise ``str(value)``
        wrapped in single quotes with backslashes and single quotes escaped.
    """
    if value is None:
        # str(None) would otherwise be inserted as the text 'None'.
        return 'Null'
    if isinstance(value, bool):
        # bool is checked before the numeric branch; it is not matched by the
        # exact-type test below and would otherwise be quoted as 'True'/'False'.
        return '1' if value else '0'
    if type(value) in (int, float):
        return str(value)
    if value == '':
        # Empty strings are loaded as NULL.
        return 'Null'
    # Escape backslashes first so the backslash added for quotes is not doubled;
    # an unescaped trailing backslash would swallow the closing quote.
    escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
    return "'" + escaped + "'"


def get_string_for_column_names(source_table_name, postgres_engine):
    """Return the source table's column names as a backtick-quoted, comma-separated string.

    Args:
        source_table_name: Table name, inserted into the query as-is.
        postgres_engine: Object whose ``execute(sql)`` returns a result that
            exposes ``cursor.description`` and ``close()``.

    Returns:
        str: For example ``'`Year`,`Month`'``, in the order reported by the
        cursor description.
    """
    # `WHERE 1<>1` matches no rows; the query is run only for its metadata.
    query_result = postgres_engine.execute(f"SELECT * FROM {source_table_name} WHERE 1<>1")
    try:
        return ','.join(f'`{desc.name}`' for desc in query_result.cursor.description)
    finally:
        # Nothing is fetched from this result, so release it explicitly
        # instead of leaving that to garbage collection.
        query_result.close()


def insert_increment_from_postgres_into_clickhouse(source_table_name, dest_table_name,
                                                   postgres_engine, clickhouse_engine,
                                                   start_value_of_increment, end_value_of_increment,
                                                   increment_column_name, string_for_column_names):
    """Copy one window of rows from PostgreSQL into ClickHouse with a single INSERT.

    Rows are selected where ``increment_column_name`` lies between
    ``start_value_of_increment`` and ``end_value_of_increment`` (both
    inclusive), rendered with ``get_correct_value`` and inserted into
    ``dest_table_name``. When the window holds no rows, nothing is sent to
    ClickHouse.

    Args:
        source_table_name: PostgreSQL table name, inserted into the query as-is.
        dest_table_name: ClickHouse table name, inserted into the query as-is.
        postgres_engine: Object whose ``execute(sql)`` returns a result with
            ``fetchall()``.
        clickhouse_engine: Object whose ``execute(sql)`` runs the INSERT.
        start_value_of_increment: Lower bound, embedded as a quoted literal.
        end_value_of_increment: Upper bound, embedded as a quoted literal.
        increment_column_name: Column used for the range filter
            (double-quoted in the query).
        string_for_column_names: Column list for the INSERT; must match the
            column order of ``SELECT *`` on the source table.
    """
    query_result = postgres_engine.execute(
        f"SELECT * FROM {source_table_name} "
        f"WHERE \"{increment_column_name}\" >= '{start_value_of_increment}' "
        f"AND \"{increment_column_name}\" <= '{end_value_of_increment}'")
    pg_data = query_result.fetchall()
    if not pg_data:
        # An INSERT with an empty VALUES list is not a valid statement.
        return

    string_for_rows = ','.join(
        '(' + ','.join(map(get_correct_value, row)) + ')' for row in pg_data)
    insert_query = f"INSERT INTO {dest_table_name} ({string_for_column_names}) VALUES {string_for_rows}"
    clickhouse_engine.execute(insert_query)


In [4]:
def insert_from_postgres_into_clickhouse(source_table_name, dest_table_name,
                                         postgres_engine, clickhouse_engine,
                                         start_value, end_value,
                                         increment_column_name, increment_size, data_unit):
    """Copy rows from PostgreSQL into ClickHouse in consecutive windows.

    The range ``[start_value, end_value]`` of ``increment_column_name`` is
    walked in steps of ``increment_size``. Each window spans
    ``current`` to ``current + increment_size - data_unit`` (inclusive),
    and is never allowed to extend past ``end_value``.

    Args:
        source_table_name: PostgreSQL table to read from.
        dest_table_name: ClickHouse table to insert into.
        postgres_engine: Engine used for the PostgreSQL queries.
        clickhouse_engine: Engine used for the ClickHouse inserts.
        start_value: First value of the range (inclusive).
        end_value: Last value of the range (inclusive).
        increment_column_name: Column that the range filter is applied to.
        increment_size: Step between window starts; must move values forward.
        data_unit: Smallest step of the column's values, subtracted from the
            next window start so that adjacent windows do not overlap.

    Raises:
        ValueError: If the range is non-empty and adding ``increment_size``
            does not advance ``start_value``.
    """
    if start_value <= end_value and not (start_value + increment_size > start_value):
        # A zero or negative step would never leave the loop below.
        raise ValueError('increment_size must advance the increment value')

    string_for_column_names = get_string_for_column_names(source_table_name, postgres_engine)
    current_value = start_value
    while current_value <= end_value:
        # Clamp the final window so that a step which does not evenly divide
        # the range cannot copy rows beyond end_value.
        window_end = min(current_value + increment_size - data_unit, end_value)
        insert_increment_from_postgres_into_clickhouse(source_table_name, dest_table_name,
                                                       postgres_engine, clickhouse_engine,
                                                       str(current_value), str(window_end),
                                                       increment_column_name, string_for_column_names)
        current_value += increment_size


In [6]:
# Create the ClickHouse table `adm_ontime` from the PostgreSQL table `ontime`,
# sorted by (Year, Month, DayofMonth).
create_table_in_clickhouse(
    source_table_name='ontime',
    dest_table_name='adm_ontime',
    postgres_engine=postgres_engine,
    clickhouse_engine=clickhouse_engine,
    order_columns_names=['Year', 'Month', 'DayofMonth'],
)

# Copy the rows one day of `FlightDate` at a time, from 2017-05-01 through
# 2017-06-01 (the end date is included).
insert_from_postgres_into_clickhouse(
    source_table_name='ontime',
    dest_table_name='adm_ontime',
    postgres_engine=postgres_engine,
    clickhouse_engine=clickhouse_engine,
    start_value=datetime.date(2017, 5, 1),
    end_value=datetime.date(2017, 6, 1),
    increment_column_name='FlightDate',
    increment_size=datetime.timedelta(days=1),
    data_unit=datetime.timedelta(days=1),
)
