## Создание базы данных в ClickHouse и загрузка данных

In [30]:
import numpy as np
import pandas as pd
import math
from clickhouse_driver.client import Client

In [23]:
# Подключение к серверу
# У нас на Амазоне развернут кластер из двух серверов
client = Client('3.23.248.232', user='default', password='qwerty') # наш первый сервер
#client = Client('3.23.221.144', user='default', password='qwerty') # наш второй сервер

## Создание базы данных и структуры таблиц

Наша база данных имеет название `realty_db` содержит три основные таблицы:
* realty_db.infr - с информацией об объектах инфраструктуры
* realty_db.offers - с данными о предложениях по продаже квартир в новостройках
* realty_db.complexes_info - с информацией о новых жилых комплексах

In [16]:
# Создание базы данных realty_db
client.execute("CREATE DATABASE realty_db")

[]

In [17]:
# Создание таблицы, в которой будет храниться информация об инфраструктуре 
# Запустить необходимо на всех серверах в кластере (у нас их 2)
client.execute('''
    CREATE TABLE realty_db.infr
    (
      com_id Int64,
      longitude Float32,
      latitude Float32,
      name String,
      address String,
      main_class String,
      main_category String,
      our_category String,
      extra_categories Array(String)
    ) 
    ENGINE = ReplacingMergeTree()
    PARTITION BY main_category
    ORDER BY com_id
    SETTINGS index_granularity=256
    ''')

[]

In [25]:
# Делаем таблицу инфраструктуры распределенной
client.execute('''
    CREATE TABLE realty_db.distr_infr as realty_db.infr
    ENGINE = Distributed(bd_real_estate_cluster, realty_db, infr, xxHash64(com_id))
    ''')

[]

In [19]:
# Создание таблицы, в которой будет храниться информация о жилых комплексах
# Запустить необходимо на всех серверах в кластере (у нас их 2)
client.execute('''
    CREATE TABLE realty_db.complexes_info
    (
    complex_id Int64,
    builder_id Int64,
    longitude Float32,
    latitude Float32,
    complex_name String,
    name_yandex String
    ) 
    ENGINE = ReplacingMergeTree()
    PARTITION BY builder_id
    ORDER BY complex_id
    SETTINGS index_granularity=16
    ''')

[]

In [26]:
# Делаем таблицу с информацией о комплексах распределенной
client.execute('''
    CREATE TABLE realty_db.distr_complexes_info as realty_db.complexes_info
    ENGINE = Distributed(bd_real_estate_cluster, realty_db, complexes_info, xxHash64(complex_id))
    ''')

[]

In [21]:
# Создание таблицы, в которой будет храниться информация о предложениях по продаже квартир
# Запустить необходимо на всех серверах в кластере (у нас их 2)
client.execute('''
    CREATE TABLE realty_db.offers
    (
    offer_id Int64,
    complex_id Int64,
    builder_id Int64,
    rooms_cnt Int32,
    square Float32,
    floor Int32,
    total_floors Int32,
    total_price Int64,
    price_for_meter Int64,
    nearest_subway Nullable(String),
    minutes_to_subway Int64,
    is_walk Int32
    ) 
    ENGINE = ReplacingMergeTree()
    PARTITION BY complex_id
    ORDER BY offer_id
    SETTINGS index_granularity=256
    ''')

[]

In [27]:
# Делаем таблицу с информацией о предложениях распределенной
client.execute('''
    CREATE TABLE realty_db.distr_offers as realty_db.offers
    ENGINE = Distributed(bd_real_estate_cluster, realty_db, offers, xxHash64(offer_id))
    ''')

[]

In [28]:
client.execute('SHOW tables from realty_db')

[('complexes_info',),
 ('distr_complexes_info',),
 ('distr_infr',),
 ('distr_offers',),
 ('infr',),
 ('offers',)]

## Загрузка данных в таблицы

In [34]:
# Подготовка данных об объектах инфраструктуры, полученных в результате парсинга, к загрузке в ClickHouse
infr_for_loading = pd.read_csv('./data_gathering/infrastructure_processed.csv')
infr_for_loading = infr_for_loading.astype({"com_id": np.int64})

infr_for_loading = infr_for_loading.fillna('')
rows = infr_for_loading.values.tolist()

for i in range(len(rows)):
    for j in range(len(rows[i])):
        if j == len(rows[i]) - 1:
            if rows[i][j] == '':
                rows[i][j] = []
            else:
                rows[i][j] = rows[i][j].split('[\'')[1].split('\']')[0].replace("'", "").split(',')

In [35]:
# Загрузка данных об инфраструктуре в таблицу realty_db.distr_infr
client.execute("INSERT INTO realty_db.distr_infr VALUES", rows)

21069

In [41]:
# Подготовка данных о предложениях по продаже квартир, полученных в результате парсинга, к загрузке в ClickHouse
df_offers = pd.read_csv('./data_gathering/all_offers_processed.csv', sep=';', encoding='utf-8')
df_offers = df_offers.astype({"rooms_cnt": int, "price_for_meter": int})

offers_matrix = df_offers.values

In [42]:
# Загрузка данных о предложениях в таблицу realty_db.distr_offers
# Загрузка данных производится порциями, чтобы в одной порции было не более 100 разных значений ключа партиционирования
# Размер одной порции - 3000 строк
for i in range(math.ceil(len(offers_matrix) / 3000)):
    start_ptr = i * 3000
    if (i + 1) * 3000 < len(offers_matrix):
        end_ptr = (i + 1) * 3000
    else:
        end_ptr = len(offers_matrix)
        
    offers_matrix_portion = offers_matrix[start_ptr:end_ptr]
    
    client.execute('INSERT INTO realty_db.distr_offers VALUES', 
               (list(row) for row in offers_matrix_portion), 
               types_check=True)

In [43]:
# Подготовка данных о жилых комплексах, полученных в результате парсинга, к загрузке в ClickHouse
df_complex_info = pd.read_csv('./data_gathering/complexes_info.csv', sep=',', encoding='utf-8')
df_complex_info = df_complex_info[['complex_id', 'builder_id', 'longitude', 'latitude', 'complex_name', 'name_yandex']]

complex_info_matrix = df_complex_info.values

In [44]:
# Загрузка данных о новых жилых комплексах в таблицу realty_db.distr_complexes_info
# Загрузка данных производится порциями, чтобы в одной порции было не более 100 разных значений ключа партиционирования
# Размер одной порции - 200 строк
for i in range(math.ceil(len(complex_info_matrix) / 200)):
    start_ptr = i * 200
    if (i + 1) * 200 < len(complex_info_matrix):
        end_ptr = (i + 1) * 200
    else:
        end_ptr = len(complex_info_matrix)

    complex_info_matrix_portion = complex_info_matrix[start_ptr:end_ptr]
    
    client.execute('INSERT INTO realty_db.distr_complexes_info VALUES', 
                   (list(row) for row in complex_info_matrix_portion), 
                   types_check=True)

In [16]:
# Разрываем соединение с сервером
client.disconnect()