# Самостоятельная функция clickhouse_multiquery

In [None]:
from clickhttp import clickhouse_multiquery


# Данная функция предназначена только для Apache Airflow и не предусматривает возврата DataFrame
multiquery: str = """{множественный_запрос_к_БД_Clickhouse}"""
connection_id: str = "{airflow_connection_id}"
clickhouse_multiquery(multiquery=multiquery, connection=connection_id,)

# Класс ClickHttpSession

## Начало работы

### Логирование

In [1]:
import logging


# Логирование событий в консоль
logging.basicConfig(level=logging.INFO)

### Создание объекта UserConn для передачи параметров соединения

In [2]:
from clickhttp import UserConn


# Создание объекта вручную для локального тестирования
conn = UserConn('user',       # Логин
                'password',   # Пароль
                'localhost',  # Адрес сервера
                8123,         # Порт
                'default',)   # Схема в БД

In [None]:
from clickhttp import get_conn, UserConn


# Создание объекта из Airflow Connection ID
conn: UserConn = get_conn('connection_id')

### Инициализация класса ClickHttpSession

In [4]:
from clickhttp import ClickHttpSession, FrameType


sess = ClickHttpSession(connection=conn,              # Объект UserConn. Единственный обязательный параметр для инициализации класса
                        frame_type=FrameType.pandas,  # Предпочитаемый тип датафрейм, по умолчанию pandas
                        chunk_size=52_428_800,        # Максимальный размер передаваемого пакета в байтах для метода insert_frame. По умолчанию 50 МБ
                        is_compressed=True,           # Работа сервера в режиме сжатия. По умолчанию True
                        proxy=None,                   # Адрес прокси-сервер. По умолчанию отсутствует
                        timeout=None,)                # Время ожидания ответа от сервера. По умолчанию отключен

print(sess)  # Проверка состояния сессии

INFO:root:
---------------------------------------------------------------------------------------------
| Clickhouse Multi-Query session started. Session ID: 4a9adf30-3a26-48fd-8b36-8d955c0f3c95. |
---------------------------------------------------------------------------------------------


ClickHttpSession object.
Status:      Open
Session ID:  4a9adf30-3a26-48fd-8b36-8d955c0f3c95
Server Mode: Compressed


### В случае необходимости можно отключить или включить сжатие пакетов после инициализации класса

In [8]:
sess.change_mode  # Данный статический метод меняет состояние режима сжатие/без сжатия

INFO:root:
--------------------------------------------------------------
| Clickhouse Multi-Query session mode changed to Compressed. |
--------------------------------------------------------------


### Статический метод output_format выведет в консоль выбранный тип Датафрейм для чтения данных с сервера

In [6]:
sess.output_format  # текущий тип pandas

'pandas'

In [5]:

sess.frame_type = FrameType.polars  # изменить тип на polars.DataFrame
sess.output_format  # текущий тип polars

'polars'

### Открытие и закрытие сессии

#### При работе с контекстным менеджером with при завершении работы сессия будет открывать и закрываться автоматически.

#### Закрытие сессии вручную осучествляется через метод close(), открытие новой сессии осуществляется через метод reopen()

In [8]:
sess.close()  # Закрытие сессии

sess.reopen()  # Открытие новой сессии

INFO:root:
------------------------------------------
| Clickhouse Multi-Query session closed. |
------------------------------------------
INFO:root:
-------------------------------------------------------------------------------------------------
| New Clickhouse Multi-Query session started. Session ID: 774a247c-493d-4a42-bdba-84ade0df27a9. |
-------------------------------------------------------------------------------------------------


### Установка прокси-сервера после инициализации класса

In [9]:
# Для установки соединения через прокси необходимо передать строкой адрес прокси-сервера
# в метод set_proxy в формате <адрес прокси-сервера>:<порт>
# проксирование применится для http и https протоколов
sess.set_proxy('http://localhost:431')

# Для удаления проксирования необходимо вызвать метод set_proxy без передачи аргумента
sess.set_proxy()

INFO:root:
-----------------------------------------------------------------------------
| ClickHttpSession change proxy settings with proxy 'http://localhost:431'. |
-----------------------------------------------------------------------------
INFO:root:
------------------------------------------
| ClickHttpSession proxy settings clear. |
------------------------------------------


### Отправка команды на сервер

In [None]:
# Для отправки команды на сервер, не предполагающей возврата DataFrame добавлен метод exetute
command: str = "TRUNCATE TABLE {table_name}"  # Указано в качестве примера
sess.execute(command)  # Данный метод ничего не возвращает

### Чтение DataFrame

In [6]:
# В качестве примера получим из базы Clickhouse вывод простых данных, не требующих запроса к таблицам в виде polars.DataFrame
query: str = """select
    today()                     as test_date
  , now()                       as test_datetime
  , toDateTime(now()
             , 'Europe/Moscow') as test_timestamp
  , toDateTime(now()
             , 'UTC')           as test_tsutc
  , 1                           as test_int
  , 1.0                         as test_float
  , false                       as test_bool
  , array(1
        , 2
        , 3)                    as test_arrayint
  , array('one'
        , 'two'
        , 'three')              as test_arraystr
"""

df = sess.read_frame(query)

print(df.data)

shape: (1, 9)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ test_date ┆ test_date ┆ test_time ┆ test_tsut ┆ … ┆ test_floa ┆ test_bool ┆ test_arra ┆ test_arr │
│ ---       ┆ time      ┆ stamp     ┆ c         ┆   ┆ t         ┆ ---       ┆ yint      ┆ aystr    │
│ date      ┆ ---       ┆ ---       ┆ ---       ┆   ┆ ---       ┆ bool      ┆ ---       ┆ ---      │
│           ┆ datetime[ ┆ datetime[ ┆ datetime[ ┆   ┆ i64       ┆           ┆ list[i64] ┆ list[str │
│           ┆ μs]       ┆ μs]       ┆ μs]       ┆   ┆           ┆           ┆           ┆ ]        │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 2024-08-2 ┆ 2024-08-2 ┆ 2024-08-2 ┆ 2024-08-2 ┆ … ┆ 1         ┆ false     ┆ [1, 2, 3] ┆ ["one",  │
│ 6         ┆ 6         ┆ 6         ┆ 6         ┆   ┆           ┆           ┆           ┆ "two",   │
│           ┆ 06:44:58  ┆ 06:44:58  ┆ 06:44:58  ┆   ┆           ┆           ┆

### Структура объекта Frame

In [7]:
# Ранее полученный нами объект df кроме самого датафрейм содержит дополнительную полезную информацию
print(df.columns)     # Список колонок
print(df.types)       # Список оригинальных типов данных
print(df.time_read)   # Время, затраченное сервером на отправку данных
print(df.bytes_read)  # Количество байт, переданных сервером (данные передает сам сервер, сложно сказать как именно он их считает)
df.data               # сам датафрейм

['test_date', 'test_datetime', 'test_timestamp', 'test_tsutc', 'test_int', 'test_float', 'test_bool', 'test_arrayint', 'test_arraystr']
['Date', 'DateTime', "DateTime('Europe/Moscow')", "DateTime('UTC')", 'UInt8', 'Float64', 'Bool', 'Array(UInt8)', 'Array(String)']
0.001652217
1


test_date,test_datetime,test_timestamp,test_tsutc,test_int,test_float,test_bool,test_arrayint,test_arraystr
date,datetime[μs],datetime[μs],datetime[μs],i64,i64,bool,list[i64],list[str]
2024-08-26,2024-08-26 06:44:58,2024-08-26 06:44:58,2024-08-26 06:44:58,1,1,False,"[1, 2, 3]","[""one"", ""two"", ""three""]"


### Метод temp_query для автоматического создания временной таблицы из запроса

In [8]:
# В качестве примера используем предыдущий запрос

temp_table: str = sess.temp_query(query)  # автоматическое создание временной таблицы с данными из запроса и возврат ее названия

temp_data = sess.read_frame(f"select * from {temp_table}").data  # обращение к ранее созданной временной таблице для получения датафрейма

print(temp_data)

INFO:root:
---------------------------------------
| Get names and data types from query |
---------------------------------------
INFO:root:
----------------------------------------------------------
| Generate DDL for temporary table temp_141ac3cf966d8116 |
----------------------------------------------------------
INFO:root:
--------------------------------------------------
| Temporary table temp_141ac3cf966d8116 created. |
--------------------------------------------------


shape: (1, 9)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ test_date ┆ test_date ┆ test_time ┆ test_tsut ┆ … ┆ test_floa ┆ test_bool ┆ test_arra ┆ test_arr │
│ ---       ┆ time      ┆ stamp     ┆ c         ┆   ┆ t         ┆ ---       ┆ yint      ┆ aystr    │
│ date      ┆ ---       ┆ ---       ┆ ---       ┆   ┆ ---       ┆ bool      ┆ ---       ┆ ---      │
│           ┆ datetime[ ┆ datetime[ ┆ datetime[ ┆   ┆ i64       ┆           ┆ list[i64] ┆ list[str │
│           ┆ μs]       ┆ μs]       ┆ μs]       ┆   ┆           ┆           ┆           ┆ ]        │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 2024-08-2 ┆ 2024-08-2 ┆ 2024-08-2 ┆ 2024-08-2 ┆ … ┆ 1         ┆ false     ┆ [1, 2, 3] ┆ ["one",  │
│ 6         ┆ 6         ┆ 6         ┆ 6         ┆   ┆           ┆           ┆           ┆ "two",   │
│           ┆ 06:45:15  ┆ 06:45:15  ┆ 06:45:15  ┆   ┆           ┆           ┆

### Метод send_multiquery для выполнения множественного запроса к базе

#### Данный метод является основной причиной создания данного класса. В качестве параметра передается строка с множественными запросами, после выполнения всех шагов метод вернет результат последнего запроса как DataFrame

In [9]:
# В качестве примера создадим множественный запрос с созданием временной таблицы на основе query и вернем результирующий датафрейм
multiquery: str = f"""
select 1; -- для проверки что запрос не будет возвращать результат из этого действия
CREATE TEMPORARY TABLE IF NOT EXISTS test_temp_table
(
test_date Date,
test_datetime DateTime,
test_timestamp DateTime('Europe/Moscow'),
test_tsutc DateTime('UTC'),
test_int UInt8,
test_float Float64,
test_bool Bool,
test_arrayint Array(UInt8),
test_arraystr Array(String)
)
ENGINE = MergeTree
ORDER BY test_date
AS
{query};
select * from test_temp_table;
"""
# print(multiquery.rstrip().rstrip(";").split(";"))
multiframe = sess.send_multiquery(multiquery).data

multiframe

INFO:root:
-------------------
| Part 1 started. |
-------------------
INFO:root:
-------------------
| Part 1 success. |
-------------------
INFO:root:
-------------------
| Part 2 started. |
-------------------
INFO:root:
-------------------
| Part 2 success. |
-------------------
INFO:root:
-------------------
| Part 3 started. |
-------------------
INFO:root:
-----------------------------
| Part 3 success. All done. |
-----------------------------


test_date,test_datetime,test_timestamp,test_tsutc,test_int,test_float,test_bool,test_arrayint,test_arraystr
date,datetime[μs],datetime[μs],datetime[μs],i64,i64,bool,list[i64],list[str]
2024-08-26,2024-08-26 06:45:19,2024-08-26 06:45:19,2024-08-26 06:45:19,1,1,False,"[1, 2, 3]","[""one"", ""two"", ""three""]"


### Метод insert_table для записи данных из DataFrame в таблицу

In [10]:
# Данный метод работает с любым поддерживаемым типом DataFrame.
# Метод принимает два обязательных аргумента table - название целевой таблицы, data_frame - датафрейм с данными в одном из поддерживаемых форматов.
# Так же в методе присутствует один не обязательный аргумент use_columns - булево, по умолчанию True.
# При True в таблицу будет передан порядок и название колонок из DataFrame. Напоминаю, что фреймы в форматах python(вложенный список) и numpy не содержат имена колонок.
# При записи данных из фрейма будут формироваться строки, размер одного пакета для отправки не превышает sess.chunk_size, заданный в атрибуте класса.
# При включенной компрессии каждый пакет после формирования будет сжат в gzip и передан на сервер в более компактном размере.

# В качестве примера создадим таблицу базе и запишем в нее данные из multiframe
table: str = "default.test_table"
ddl: str = f"""CREATE TABLE IF NOT EXISTS {table}
(
test_date Date,
test_datetime DateTime,
test_timestamp DateTime('Europe/Moscow'),
test_tsutc DateTime('UTC'),
test_int UInt8,
test_float Float64,
test_bool Bool,
test_arrayint Array(UInt8),
test_arraystr Array(String)
)
ENGINE = MergeTree
ORDER BY test_date"""

sess.read_frame(ddl)  # создадим таблицу

sess.insert_table(table=table, data_frame=multiframe,)  # выполним вставку данных

print(sess.read_frame(f"select * from {table}").data)  # заберем назад фрейм с данными

sess.execute(f"drop table {table}")  # удалим таблицу из базы

INFO:root:
--------------------------------------------
| Sending chunk with 1—1 rows from 1 rows. |
--------------------------------------------
INFO:root:
-------------------------
| Insert chunk success. |
-------------------------
INFO:root:
-----------------------------
| Insert operation success. |
-----------------------------
INFO:root:
-----------------
| Command send. |
-----------------


shape: (1, 9)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ test_date ┆ test_date ┆ test_time ┆ test_tsut ┆ … ┆ test_floa ┆ test_bool ┆ test_arra ┆ test_arr │
│ ---       ┆ time      ┆ stamp     ┆ c         ┆   ┆ t         ┆ ---       ┆ yint      ┆ aystr    │
│ date      ┆ ---       ┆ ---       ┆ ---       ┆   ┆ ---       ┆ bool      ┆ ---       ┆ ---      │
│           ┆ datetime[ ┆ datetime[ ┆ datetime[ ┆   ┆ i64       ┆           ┆ list[i64] ┆ list[str │
│           ┆ μs]       ┆ μs]       ┆ μs]       ┆   ┆           ┆           ┆           ┆ ]        │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 2024-08-2 ┆ 2024-08-2 ┆ 2024-08-2 ┆ 2024-08-2 ┆ … ┆ 1         ┆ false     ┆ [1, 2, 3] ┆ ["one",  │
│ 6         ┆ 5         ┆ 5         ┆ 5         ┆   ┆           ┆           ┆           ┆ "two",   │
│           ┆ 20:45:19  ┆ 17:45:19  ┆ 20:45:19  ┆   ┆           ┆           ┆