In [None]:
import datetime
import logging
import os
import time

import clickhouse_connect
import pandas as pd
import psycopg2

# Logging configuration

In [None]:
# Send INFO-and-above records to batch_logs.log; filemode "w" starts the file
# afresh each time this configuration takes effect.
logging.basicConfig(
    filename="batch_logs.log",
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

# Connections

In [None]:
# PostgreSQL connection used by the migration.
# Settings are read from the standard libpq environment variables so credentials
# do not have to live in the notebook; the literal fallbacks keep the previous
# behaviour when the variables are not set.
# NOTE(review): drop the password fallback once the environment is configured.
conn = psycopg2.connect(dbname=os.environ.get("PGDATABASE", "postgres"),
                        user=os.environ.get("PGUSER", "postgres"),
                        password=os.environ.get("PGPASSWORD", "5555"),
                        host=os.environ.get("PGHOST", "db.mpkazantsev.ru"),
                        port=os.environ.get("PGPORT", "5432"))

In [None]:
# ClickHouse HTTP client used for DDL and inserts.
# Settings are read from environment variables so credentials do not have to
# live in the notebook; the literal fallbacks keep the previous behaviour when
# the variables are not set.
# NOTE(review): drop the password fallback once the environment is configured.
client = clickhouse_connect.get_client(host=os.environ.get('CLICKHOUSE_HOST', 'db.mpkazantsev.ru'),
                                       port=int(os.environ.get('CLICKHOUSE_PORT', '8123')),
                                       username=os.environ.get('CLICKHOUSE_USER', 'sergey'),
                                       password=os.environ.get('CLICKHOUSE_PASSWORD', 'sergey'))

# Helpers to clear or drop the sl_from_pg table

In [None]:
def clear_table(ch_client, table):
    """Remove all rows from ``maindb.<table>`` and log that it was cleared.

    Args:
        ch_client: object exposing ``command(sql)``; receives the statement.
        table: table name inside the ``maindb`` database.
    """
    statement = f'ALTER TABLE maindb.{table} DELETE WHERE 1=1'
    ch_client.command(statement)
    logging.info(f"Clear table {table}")

In [None]:
def drop_table(ch_client, table):
    """Drop ``maindb.<table>`` and log that it was dropped.

    Args:
        ch_client: object exposing ``command(sql)``; receives the statement.
        table: table name inside the ``maindb`` database.
    """
    statement = f'DROP TABLE maindb.{table}'
    ch_client.command(statement)
    logging.info(f"Drop table {table}")

In [None]:
#drop_table(client, 'sl_from_pg')

In [None]:
#clear_table(client, 'sl_from_pg')

# Get PG types

In [None]:
# Read the column names and PostgreSQL data types of public.ontime.
# - table_schema is pinned to 'public' (the schema the migration reads from) so a
#   same-named table in another schema cannot add rows.
# - ORDER BY ordinal_position makes the row order match the table's column order,
#   which the CREATE TABLE built later and the SELECT * in the load step rely on.
# NOTE(review): the connection URL repeats the credentials used for `conn`.
types = pd.read_sql("""SELECT column_name, data_type 
                       FROM information_schema.columns
                       WHERE table_schema = 'public'
                         AND table_name = 'ontime'
                       ORDER BY ordinal_position
                    """, 'postgresql://postgres:5555@db.mpkazantsev.ru:5432/postgres')
logging.info("PG types ready")
# Last expression of the cell so the distinct PostgreSQL types are displayed.
types.data_type.unique()

# Mapping PostgreSQL column types to ClickHouse types

In [None]:
# PostgreSQL `information_schema` data_type -> ClickHouse column type.
# Only types whose Python values serialise safely through the plain-text INSERT
# used later are listed; timestamps, numerics and booleans are deliberately absent.
mapping_dict = {'smallint': 'Int16',
                'integer': 'Int32',
                'bigint': 'Int64',
                'real': 'Float32',
                'double precision': 'Float64',
                'date': 'Date',
                'text': 'String',
                'character varying': 'String',
                'character': 'String'}

In [None]:
def mapping_types(pg_type):
    """Return the ClickHouse type name for a PostgreSQL ``data_type`` string.

    Args:
        pg_type: data type name as reported by ``information_schema.columns``.

    Returns:
        The value stored for ``pg_type`` in ``mapping_dict``.

    Raises:
        KeyError: if ``pg_type`` has no entry in ``mapping_dict``; the message
            names the offending type and lists the supported ones.
    """
    try:
        return mapping_dict[pg_type]
    except KeyError:
        # Same exception type as a plain lookup, but with an actionable message.
        raise KeyError(
            "No ClickHouse type mapped for PostgreSQL type {!r}; supported: {}".format(
                pg_type, sorted(mapping_dict))
        ) from None

In [None]:
# Add a column holding the ClickHouse type that corresponds to each PostgreSQL type.
types['clickhouse_type'] = types['data_type'].map(mapping_types)

In [None]:
# Keep only the column name and its ClickHouse type; the PostgreSQL type is no longer needed.
types = types.loc[:, ['column_name', 'clickhouse_type']]
logging.info("CH types ready")

# Create table query for sl_from_pg table

In [None]:
# Build the CREATE TABLE statement for maindb.sl_from_pg.
# The first row of `types` is skipped and replaced by a fixed, non-nullable
# `index` Int64 column; every other column is declared nullable.
# Columns are read by name rather than by position (positional row[0]/row[1] on a
# label-indexed Series is deprecated in recent pandas), and identifiers are
# backtick-quoted so names that clash with keywords still parse.
column_defs = ["`index` Int64"]
column_defs.extend(
    "`{}` {} NULL".format(column_name, clickhouse_type)
    for column_name, clickhouse_type in zip(types['column_name'].iloc[1:],
                                            types['clickhouse_type'].iloc[1:])
)
query = "CREATE TABLE maindb.sl_from_pg ( {}) ENGINE = MergeTree ORDER BY tuple()".format(
    ", ".join(column_defs))

In [None]:
# Run the CREATE TABLE statement held in `query` on ClickHouse, then log success.
# NOTE(review): the statement has no IF NOT EXISTS, so re-running this cell while
# the table exists presumably fails — confirm, or drop the table first.
client.command(query)
logging.info("Table created")

# Migration (batch) from PG to CH (data for 1 month)

In [None]:
def correct_values(values):
    """Normalise fetched rows before they are rendered into an INSERT statement.

    Each cell is converted as follows:
      * ``datetime.date`` instances (including ``datetime.datetime``) become
        ``'YYYY-MM-DD'`` strings;
      * the empty string becomes the string ``'NULL'``;
      * in plain ``str`` values every single quote is replaced by a space;
      * anything else is passed through unchanged.

    Args:
        values: iterable of rows, each row an iterable of cell values.

    Returns:
        A list of tuples, one per input row, in the original order.
    """
    def _clean(cell):
        if isinstance(cell, datetime.date):
            return cell.strftime('%Y-%m-%d')
        if cell == '':
            return 'NULL'
        if type(cell) is str:
            # No-op when the string contains no single quote.
            return cell.replace("'", " ")
        return cell

    return [tuple(_clean(cell) for cell in record) for record in values]

In [None]:
# Module-level PostgreSQL cursor; load_click() executes its SELECT through this name.
cursor = conn.cursor()

In [None]:
def load_click(dfrom, dto):
    """Copy the rows of ``public.ontime`` for a date range into ClickHouse.

    Rows whose ``"FlightDate"`` lies between ``dfrom`` and ``dto`` (``BETWEEN``,
    so both ends are included) are fetched through the module-level ``cursor``,
    normalised with ``correct_values`` and inserted into ``maindb.sl_from_pg``
    through the module-level ``client``.

    Args:
        dfrom: first date of the range, in a form PostgreSQL can cast to ``date``.
        dto: last date of the range, same form.
    """
    # Bound parameters instead of str.format: psycopg2 quotes the values itself.
    cursor.execute("""SELECT *
                  FROM public.ontime 
                  WHERE ontime."FlightDate" BETWEEN %s::date AND %s::date""",
                   (dfrom, dto))
    values = correct_values(cursor.fetchall())
    if not values:
        # An empty batch would otherwise produce "INSERT ... VALUES " with no rows.
        logging.info("No rows between {} and {}, nothing inserted".format(dfrom, dto))
        return
    # str(list_of_tuples) without the surrounding brackets yields "(..), (..)".
    # NOTE(review): a None cell is rendered as `None` here — confirm the source
    # table has no SQL NULLs or render them explicitly.
    query = 'INSERT INTO maindb.sl_from_pg VALUES ' + str(values)[1:-1]
    client.command(query)

## TODO
### Wrap the batch migration below in a function

In [None]:
# Batches to migrate: (first date, last date, label used in the log message).
# load_click() filters with BETWEEN, so both dates of a batch are included.
# NOTE(review): the last batch runs to 2017-06-01, one day past May — confirm intended.
migration_batches = [
    ('2017-05-01', '2017-05-04', '01-04'),
    ('2017-05-05', '2017-05-09', '05-09'),
    ('2017-05-10', '2017-05-14', '10-14'),
    ('2017-05-15', '2017-05-19', '15-19'),
    ('2017-05-20', '2017-05-24', '20-24'),
    ('2017-05-25', '2017-05-29', '25-29'),
    ('2017-05-30', '2017-06-01', '30-01'),
]

start = time.time()
logging.info("Start migration")
for batch_from, batch_to, batch_label in migration_batches:
    load_click(batch_from, batch_to)
    logging.info("{} inserted".format(batch_label))
# Elapsed seconds: end minus start (the operands were previously reversed,
# which logged a negative duration).
logging.info("End migration, time = {}".format(time.time() - start))

In [None]:
# Close the ClickHouse client and the PostgreSQL connection once the migration is done.
client.close()
conn.close()