# исследование произвеодительности хранилищ данных

### 1. библиотеки

In [1]:
import pandas as pd
import numpy as np
from uuid import UUID, uuid4
from datetime import datetime, timedelta
import json
from clickhouse_driver import Client, connect
import psycopg2
import pymongo
from tqdm import tqdm_notebook, tqdm
import warnings
warnings.filterwarnings('ignore')
import seaborn as sns
import matplotlib.pyplot as plt
import time
from functools import wraps
from pydantic import BaseConfig
import os
from multiprocessing import Pool
import psycopg2
from uuid import uuid4

In [2]:
SEED=13
TOTAL_RECORDS= 100_000
CHUNK_SIZE=1_000
N_USERS=1_000
ACTIONS = [
    'start_watch',
    'stop_watch',
    'continue_watch',
    'like',
    'dislike',
    'comment',
    'add_to_favorite',
    'delete_from_favorite',  
    'user_login',
    'user_logout'
]

In [3]:
from dataclasses import dataclass

@dataclass
class Input:
    id: int
    user_id: int
    timestamp: int
    action: str

In [4]:
from abc import ABC, abstractmethod

class DatabaseBenchmarkRunnerBase(ABC):
 
    @abstractmethod
    def run_write(self):
        pass
    
    @abstractmethod
    def run_read(self, chunk_size):
        pass
    

    
class VerticaBenchmarkRunner(DatabaseBenchmarkRunnerBase): 
    
    def __init__(self, db_uri)  :
        pass
    
    def run_write(self, data):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
    def run_read(self, chunk_size):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
class PostgresBenchmarkRunner(DatabaseBenchmarkRunnerBase): 
    
    def __init__(self, db_uri)  :
        pass
    
    def run_write(self, data):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
    def run_read(self, chunk_size):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
class MongoBenchmarkRunner(DatabaseBenchmarkRunnerBase): 
    
    def __init__(self, db_uri)  :
        pass
    
    def run_write(self, data):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
    def run_read(self, chunk_size):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
class ElasticBenchmarkRunner(DatabaseBenchmarkRunnerBase): 
    
    def __init__(self, db_uri)  :
        pass
    
    def run_write(self, data):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
    def run_read(self, chunk_size):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
class RedisBenchmarkRunner(DatabaseBenchmarkRunnerBase): 
    
    def __init__(self, db_uri)  :
        pass
    
    def run_write(self, data):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
    def run_read(self, chunk_size):
        duration_seconds = np.random.uniform(.001, 1)
        return duration_seconds
    
    
class DataGenerator:
    
    def __init__(self, seed, n_users, total_records, actions):
        self.seed = seed 
        self.n_users = n_users
        self.total_records = total_records        
        self.actions=actions    
        self.user_ids = np.int32(np.rint(np.random.uniform(0, 1e6, size=self.n_users)))      
        
    def generate_chunk_data(self, chunk_size):
        np.random.seed(self.seed)        
        n_chunks = np.int32(np.ceil(self.total_records/chunk_size))
        for _ in tqdm_notebook(np.arange(n_chunks)): 
            np.random.seed(self.seed+_+1)
            ids = np.int32(np.rint(np.random.uniform(0, 1e6, size=chunk_size)))
            np.random.seed(self.seed+_+2)
            user_ids = np.random.choice(self.user_ids, size=chunk_size)
            np.random.seed(self.seed+_+3)
            timestamps = np.int32(np.rint(np.random.uniform(0, 1e6, size=chunk_size)))
            np.random.seed(self.seed+_+4)
            actions    = np.random.choice(self.actions, size=chunk_size)
            data       = np.column_stack([ids, user_ids, timestamps, actions]).tolist()
            df = pd.DataFrame(data)
            del data
            df[[0, 1, 2]]  = df[[0, 1, 2]].astype('int')
            data = list(df.itertuples(index=False, name=None))
            del df
            data = list(map(lambda row: Input(id=row[0], user_id=row[1], timestamp=row[2], action=row[3]), data))
            yield data
            del data

In [10]:

import vertica_python

clickhouse_client = Client(host='localhost', port=9000)
clickhouse_client.execute('DROP TABLE IF EXISTS test;') 
clickhouse_client.execute('CREATE TABLE IF NOT EXISTS test(id Int32, user_id Int32, timestamp Float32, payload String) ENGINE = Memory')

conn_info = {'host': '127.0.0.1',
             'port': 5433,
             'user': 'test',             
}
with vertica_python.connect(**conn_info) as conn:
    cur = conn.cursor()    
    cur.execute('DROP TABLE IF EXISTS test;') 
    cur.execute('CREATE TABLE IF NOT EXISTS test(id int, user_id int, timestamp float, payload varchar)')

In [11]:
def write_clickhouse(clickhouse_client, records_to_insert):
    records_list_template = ','.join(['%s'] * len(records_to_insert))
    insert_query = 'insert into test (id, user_id, timestamp, payload) values {}'.format(records_to_insert)   
    start = time.time()
    clickhouse_client.execute('INSERT INTO test (id, user_id, timestamp, payload) VALUES', records_to_insert)
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def read_clickhouse(clickhouse_client, chunk_size):
    select_query = f'select * from test limit {chunk_size}'
    start = time.time()
    clickhouse_client.execute(select_query) 
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def write_vertica(conn_info, records_to_insert):   
    start = time.time()
    with vertica_python.connect(**conn_info) as conn:
        cur = conn.cursor()    
        cur.executemany("INSERT INTO test(id, user_id, timestamp, payload) VALUES (?, ?, ?, ?)", records_to_insert, use_prepared_statements=True)
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def read_vertica(conn_info, chunk_size):
    select_query = f'select * from test limit {chunk_size}'
    start = time.time()
    with vertica_python.connect(**conn_info) as conn:
        cur = conn.cursor()    
        cur.execute(select_query) 
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

In [12]:
data_generator = DataGenerator(SEED, N_USERS, TOTAL_RECORDS, ACTIONS) 
chunk_sizes = [1_000, 10_000, 100_000]
L_results =[]
for db_name, client, (write, read) in tqdm_notebook(zip(['vertica', 'clickhouse',], [conn_info,clickhouse_client], [(write_vertica, read_vertica), (write_clickhouse, read_clickhouse)]), total=2):
    for chunk_size in tqdm_notebook(chunk_sizes):               
        for data in data_generator.generate_chunk_data(chunk_size):  
            data_to_write = [(row.id, row.user_id, row.timestamp, row.action) for row in data]
            results = {}            
            results['database'] = db_name
            results['chunk_size'] = chunk_size
            results['write_duration_seconds'] = write(client, data_to_write)
            results['read_duration_seconds'] = read(client, chunk_size)
            L_results.append(results)
            del results, data
            pass
        clickhouse_client.execute('DROP TABLE IF EXISTS test;') 
        clickhouse_client.execute('CREATE TABLE IF NOT EXISTS test(id Int32, user_id Int32, timestamp Float32, payload String) ENGINE = Memory')
        with vertica_python.connect(**conn_info) as conn:
            cur = conn.cursor()    
            cur.execute('DROP TABLE IF EXISTS test;') 
            cur.execute('CREATE TABLE IF NOT EXISTS test(id int, user_id int, timestamp float, payload varchar)')
    

  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [18]:
pd.DataFrame.from_records(L_results).groupby(['database','chunk_size'])['write_duration_seconds'].mean().unstack()

chunk_size,1000,10000,100000
database,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
clickhouse,0.001943,0.006313,0.050835
vertica,5.015732,6.862327,6.567055


In [16]:
pd.DataFrame.from_records(L_results).groupby(['database','chunk_size'])['read_duration_seconds'].mean().unstack()

chunk_size,1000,10000,100000
database,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
clickhouse,0.001936,0.004904,0.045079
vertica,0.013536,0.019803,0.004282


In [None]:
def generate_random_data(chunk_size):
    ids        = np.array([uuid.uuid4().hex for _ in range(chunk_size)])
    user_ids   = np.random.choice(USER_IDS, size=chunk_size) 
    timestamps = np.int32(np.rint(np.random.uniform(0, 1e6, size=chunk_size)))
    actions    = np.random.choice(ACTIONS, size=chunk_size)
    data       = np.column_stack([ids, user_ids, timestamps, actions])
    df         = pd.DataFrame(data=data, columns=['id', 'user_id', 'timestamp','action'])
    del data 
    df['id']        = df['id'].astype('str')
    df['user_id']   = df['user_id'].astype('str')
    df['timestamp'] = df['timestamp'].astype('int')
    df['action']    = df['action'].astype('str')
    data = list(df.itertuples(index=False, name=None))
    del df
    return data

### 2. генерация данных

In [2]:
ACTIONS = [
    'start_watch',
    'stop_watch',
    'continue_watch',
    'like',
    'dislike',
    'comment',
    'add_to_favorite',
    'delete_from_favorite',  
    'user_login',
    'user_logout'
]

def generate_random_data(chunk_size):     
    timestamps = np.int32(np.rint(np.random.uniform(0, 1e6, size=chunk_size)))
    actions    = np.random.choice(ACTIONS, size=chunk_size)
    data       = np.column_stack([timestamps, actions])
    df         = pd.DataFrame(data=data, columns=['timestamp','action'])
    del data    
    df['timestamp']= df['timestamp'].astype('int')
    df['action']= df['action'].astype('str')
    data = list(df.itertuples(index=False, name=None))
    del df
    return data
    

In [3]:
generate_random_data(10)

[(204026, 'dislike'),
 (776999, 'add_to_favorite'),
 (662138, 'delete_from_favorite'),
 (756579, 'start_watch'),
 (392866, 'dislike'),
 (358562, 'continue_watch'),
 (529297, 'start_watch'),
 (375801, 'stop_watch'),
 (693043, 'stop_watch'),
 (881558, 'dislike')]

In [4]:
postgres_client = psycopg2.connect(
    dbname='test', 
    user='app', 
    password='123qwe', 
    host='localhost'
).cursor()

postgres_client.execute('CREATE SCHEMA IF NOT EXISTS test;') 
postgres_client.execute('DROP TABLE IF EXISTS test.data;') 
postgres_client.execute('CREATE TABLE IF NOT EXISTS test.data(timestamp float,payload text NOT NULL);') 

clickhouse_client = Client('localhost')


clickhouse_client.execute('DROP TABLE IF EXISTS test;') 
clickhouse_client.execute('CREATE TABLE IF NOT EXISTS test(timestamp Float32, payload String) ENGINE = Memory')

mongo_client = pymongo.MongoClient("localhost", 27017)
db = mongo_client.test
collection = db.test

### 3. функции чтения и записи данных в БД

In [5]:
def write_postgres(postgres_client, records_to_insert):
    records_list_template = ','.join(['%s'] * len(records_to_insert))
    insert_query = 'insert into test.data (timestamp, payload) values {}'.format(records_list_template)   
    start = time.time()
    postgres_client.execute(insert_query, records_to_insert)   
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def read_postgres(postgres_client, chunk_size):
    select_query = f'select * from test.data limit {chunk_size}'
    start = time.time()
    postgres_client.execute(select_query)
    postgres_client.fetchall()     
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def write_clickhouse(clickhouse_client, records_to_insert):
    records_list_template = ','.join(['%s'] * len(records_to_insert))
    insert_query = 'insert into test (timestamp, payload) values {}'.format(records_to_insert)   
    start = time.time()
    clickhouse_client.execute('INSERT INTO test (timestamp, payload) VALUES', records_to_insert)
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def read_clickhouse(clickhouse_client, chunk_size):
    select_query = f'select * from test limit {chunk_size}'
    start = time.time()
    clickhouse_client.execute(select_query) 
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def write_mongo(collection, records_to_insert): 
    rows = [{'timestamp':row[0], 'payload':row[1]} for row in records_to_insert]    
    start = time.time()
    collection.insert_many(rows) 
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

def read_mongo(collection, chunk_size):       
    start = time.time()
    collection.find().limit(chunk_size).skip(0)
    end = time.time()         
    duration_seconds = end-start
    return duration_seconds

### 4. гиперпараметры эксперимента

In [5]:
# директория с данными
PATH_TO_DATA = 'data'
# число итераций
ITERATIONS = np.arange(1, 101)
# число записей
TOTAL_RECORDS = 10_000_000
# cразмер чанка для чтения/записи
CHUNK_SIZES = [1_000, 10_000, 100_000]

In [11]:
# число параллельных процессов
POOL_SIZE=8
# функция для мультипроцессинга
def generate_random_data_mp(chunk_i):
    data= generate_random_data(chunk_size)  
    path_to_dir = f'{PATH_TO_DATA}/{chunk_size}'
    try:
        if not(os.path.exists(path_to_dir)):
            os.mkdir(path_to_dir)    
    except:
        pass     
    path_to_dir = f'{path_to_dir}/{iteration}'
    try:
        if not(os.path.exists(path_to_dir)):
            os.mkdir(path_to_dir)
    except:
        pass     
    fnm = f'{chunk_i+1}.json'      
    with open(f'{path_to_dir}/{fnm}', 'w') as f:
        json.dump(data, f)
    del data

In [13]:
for chunk_size in tqdm_notebook(CHUNK_SIZES):
    for iteration in tqdm_notebook(ITERATIONS):  
        n_chunks = np.int32(np.ceil(TOTAL_RECORDS/chunk_size))  
        chunks = np.arange(n_chunks)
        with Pool(POOL_SIZE) as pool:
            pool.map(generate_random_data_mp, chunks)

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

### 5. эксперименты

In [6]:
DB_CLIENTS = {}
DB_CLIENTS['postgres'] = (postgres_client, write_postgres, read_postgres)
DB_CLIENTS['clickhouse'] = (clickhouse_client, write_clickhouse, read_clickhouse)
DB_CLIENTS['mongo'] = (collection, write_mongo, read_mongo)

In [3]:
for chunk_size in tqdm_notebook(os.listdir('data')):    
    chunk_size = int(chunk_size)   
    ls = os.listdir(f'data/{chunk_size}')
    df_results = pd.DataFrame()
    for iteration, chunk in tqdm_notebook(enumerate(ls), total = len(ls)): 
        rows=[]
        for fnm in tqdm_notebook(os.listdir(f'data/{chunk_size}/{chunk}')):
            pth = f'data/{chunk_size}/{chunk}/{fnm}'
            with open(pth, 'r') as f:
                data = json.load(f)    
            data = list(map(lambda x: tuple(x), data))        
            for client_name, (client, write_func, read_func) in DB_CLIENTS.items(): 
                row = {}
                row['client'] = client_name
                row['iteration'] = iteration+1
                row['chunk_size'] = chunk_size
                row['write_duration_seconds'] = write_func(client, data)
                row['read_duration_seconds'] = read_func(client, chunk_size)
                rows.append(row)
                del row
            del data
        subdf_results = pd.DataFrame.from_records(rows)
        del rows  
        df_results = pd.concat([df_results, subdf_results], axis=0)  
        del subdf_results
        postgres_client.execute('DROP TABLE IF EXISTS test.data;') 
        postgres_client.execute('CREATE TABLE IF NOT EXISTS test.data(timestamp float,payload text NOT NULL);') 
        clickhouse_client.execute('DROP TABLE IF EXISTS test;') 
        clickhouse_client.execute('CREATE TABLE IF NOT EXISTS test(timestamp Float32, payload String) ENGINE = Memory')
        db.test.drop()
        collection = db.test
    df_results.to_csv(f'df_results_{chunk_size}.csv')
    del df_results

### 6. результаты

In [4]:
df1 = pd.read_csv('df_results_1000.csv').iloc[:, 1:]
# df2 = pd.read_csv('df_results_10000.csv').iloc[:, 1:]
df3 = pd.read_csv('df_results_100000.csv').iloc[:, 1:]
df = pd.concat([df1, df3], axis= 0)
by_keys = ['client', 'chunk_size']
agg_keys = ['read_duration_seconds', 'write_duration_seconds']
df_report = df.groupby(by_keys)[agg_keys].median().unstack(1)
df_report_read = df_report['read_duration_seconds']
df_report_read['chunk_10000/chunk_1000'] = (df_report_read.iloc[:, 1] / df_report_read.iloc[:, 0])
df_report_write = df_report['write_duration_seconds']
df_report_write['chunk_10000/chunk_1000'] = (df_report_write.iloc[:, 1] / df_report_write.iloc[:, 0])
df_report_read_to_write = df_report_read.iloc[:, :-1] / df_report_write.iloc[:, :-1]

In [5]:
df_report_read

chunk_size,1000,100000,chunk_10000/chunk_1000
client,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
clickhouse,0.001797,0.032581,18.12888
mongo,2.8e-05,5.9e-05,2.146552
postgres,0.00082,0.054052,65.922797


In [6]:
df_report_write

chunk_size,1000,100000,chunk_10000/chunk_1000
client,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
clickhouse,0.004154,0.033146,7.978822
mongo,0.007862,0.903528,114.92908
postgres,0.005117,0.506959,99.070004


In [7]:
df_report_read_to_write

chunk_size,1000,100000
client,Unnamed: 1_level_1,Unnamed: 2_level_1
clickhouse,0.432622,0.98297
mongo,0.003518,6.6e-05
postgres,0.160229,0.106619


### 7. выводы

1. чтение на всех размерах чанка быстрее всего у монго
2. при малом размере чанка чтение постгреса быстрее чтения кликхауса примерно в 10 раз, при большом размере чанка чтение кликхауса примерно в 2 раза быстрее, чем чтение постгреса
3. запись на всех размерах чанка быстрее всего у кликхауса, причем с увеличением размера чанка кликхаус деградирует меньше всего
4. с увеличением размера чанка чтение меньше всего деградирует у монго(x2), больше всего - у постгреса(х65)
5. с увеличением размера чанка запись меньше всего деградирует у кликхауса(x8), больше всего - у монго(х114)

### в качестве хранилища аналитических данных предпочтительнее ```clickhouse```