In [34]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import clickhouse_connect
from minio import Minio

In [98]:
class Init:
    """Bootstrap helper for the local ClickHouse, PostgreSQL and MinIO services.

    Instantiating the class builds the three clients, switches the ClickHouse
    session to the ``wb_orders`` database and records whether the ``wb_orders``
    table is missing.  The public methods then create that table, load the CSV
    export into it, create the PostgreSQL databases and create the MinIO bucket.
    """

    def __init__(self):
        self.__set_engine_and_client()
        # The only place where USE is issued (once per instance).
        self.__use_database()
        self.__check_empty()

    def __get_environ(self):
        """Load ``.env`` and return the connection settings as a dict.

        Hosts and the MinIO endpoint are pinned to localhost; the
        environment-driven alternatives are kept as commented-out lines.
        A variable that is not set yields ``None``.
        """
        load_dotenv()
        return {"pg_user": os.getenv("PG_USER"),
                "pg_password": os.getenv("PG_PASSWORD"),
                # "pg_host": os.getenv("PG_HOST"),
                "pg_host": "localhost",
                "pg_port": os.getenv("PG_PORT"),
                "pg_database": os.getenv("PG_NAME"),
                "click_user": os.getenv("CLICKHOUSE_USER"),
                "click_password": os.getenv("CLICKHOUSE_PASSWORD"),
                # "click_host": os.getenv("CLICKHOUSE_HOST"),
                "click_host": "localhost",
                "click_port": os.getenv("CLICKHOUSE_PORT"),
                "click_database": os.getenv("CLICKHOUSE_DB"),
                # "minio_endpoint": os.getenv("MLFLOW_S3_ENDPOINT_URL"),
                "minio_endpoint": "localhost:9099",
                "access_key": os.getenv("MINIO_ROOT_USER"),
                "secret_key": os.getenv("MINIO_ROOT_PASSWORD")
               }

    def __set_engine_and_client(self):
        """Create the SQLAlchemy engine plus the ClickHouse and MinIO clients."""
        # Imported here so the class does not depend on an edit of the
        # notebook's import cell.
        from sqlalchemy.engine import URL

        EV = self.__get_environ()
        # URL.create() escapes the credentials, so a password containing
        # characters such as '@', ':' or '/' cannot corrupt the connection URL.
        pg_url = URL.create(
            "postgresql+psycopg2",
            username=EV['pg_user'],
            password=EV['pg_password'],
            host=EV['pg_host'],
            port=int(EV['pg_port']) if EV['pg_port'] else None,
            database=EV['pg_database'],
        )
        self.__engine = create_engine(pg_url)
        self.__click_client = clickhouse_connect.get_client(
            host=EV['click_host'],
            port=EV['click_port'],
            username=EV['click_user'],
            password=EV['click_password'],
            database=EV['click_database'],
        )
        self.__minio_client = Minio(
            EV['minio_endpoint'],
            access_key=EV['access_key'],
            secret_key=EV['secret_key'],
            secure=False  # Set to False if using HTTP
        )

    def __get_list_files(self):
        """Return the entries of the ``/data/`` directory."""
        return os.listdir('/data/')

    def __use_database(self):
        """Send ``USE wb_orders`` through the ClickHouse client."""
        query = r'USE wb_orders'
        self.__click_client.command(query)

    def __check_empty(self):
        """Set ``__is_empty`` to True when ``EXISTS TABLE wb_orders`` returns 0."""
        query = "EXISTS TABLE wb_orders;"
        self.__is_empty = self.__click_client.command(query) == 0

    def create_clickhouse_table(self):
        """Create the ``wb_orders`` table if it was missing at construction time.

        After a successful creation the ClickHouse ``default`` database is
        dropped.  When the table already existed only a message is printed.
        """
        if self.__is_empty:
            query = r'''
                CREATE TABLE IF NOT EXISTS wb_orders (
                    date DateTime,
                    last_change_date DateTime,
                    total_price Float32,
                    discount_percent Int8,
                    warehouse_name String,
                    oblast Nullable(String),
                    nm_id Int64,
                    category String,
                    brand String,
                    is_cancel Bool,
                    cancel_dt Nullable(DateTime),
                    created_at Nullable(DateTime),
                    updated_at DateTime,
                    order_type Nullable(String)
                    ) ENGINE = MergeTree PARTITION BY toYYYYMM(date) ORDER BY date SETTINGS index_granularity = 2048
                '''
            self.__click_client.command(query)
            print("Table created successfully")
            query = r'DROP DATABASE IF EXISTS default;'
            self.__click_client.command(query)
        else:
            print("Table already exists. Pass.")

    def load_data(self):
        """Insert the CSV export into ``wb_orders.wb_orders``.

        Runs only when the table was missing at construction time, so an
        already populated table is not loaded a second time.
        """
        if self.__is_empty:
            # Dates in the export are day-first and decimals use a comma.
            df = pd.read_csv('../init/data/wb_orders.csv',
                             parse_dates=['date', 'last_change_date', 'created_at', 'updated_at', 'cancel_dt'],
                             dayfirst=True, decimal=',')
            self.__click_client.insert_df('wb_orders.wb_orders', df)
            print('Data loaded successfully')

    def __get_list_databases(self):
        """Return the database names listed in ``pg_database``."""
        query = "SELECT datname FROM pg_database;"
        return pd.read_sql_query(query, self.__engine)['datname'].to_list()

    def __execute_autocommit(self, statement):
        """Run one statement on a fresh connection switched to autocommit.

        CREATE/DROP DATABASE cannot run inside a transaction block, hence the
        isolation level is changed on the underlying psycopg2 connection.
        """
        with self.__engine.connect() as con:
            con.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            con.execute(statement)

    def create_postgres_database(self):
        """Create the ``mlflow`` and ``airflow`` databases and drop ``postgres``.

        Every statement is wrapped in ``text()``: SQLAlchemy 2.x refuses plain
        strings in ``Connection.execute``.  The drop uses ``IF EXISTS`` so the
        method can be re-run after ``postgres`` has already been removed.
        """
        # Creating one of the two databases cannot affect the membership test
        # for the other, so the listing is queried once.
        existing = set(self.__get_list_databases())
        for db_name in ['mlflow', 'airflow']:
            if db_name not in existing:
                self.__execute_autocommit(text(f'Create database {db_name}'))
        self.__execute_autocommit(text('DROP DATABASE IF EXISTS postgres;'))

    def create_minio_mlflow_bucket(self):
        """Create the ``mlflow`` bucket in MinIO unless it already exists."""
        bucket_name = 'mlflow'
        if not self.__minio_client.bucket_exists(bucket_name):
            self.__minio_client.make_bucket(bucket_name)
            print(f"Bucket '{bucket_name}' created successfully.")
        else:
            print(f"Bucket '{bucket_name}' already exists.")
        

def main():
    """Run the full bootstrap sequence.

    Builds a single ``Init`` instance and invokes its setup steps in a fixed
    order: ClickHouse table, CSV load, PostgreSQL databases, MinIO bucket.
    """
    bootstrap = Init()
    ordered_steps = (
        "create_clickhouse_table",
        "load_data",
        "create_postgres_database",
        "create_minio_mlflow_bucket",
    )
    for step_name in ordered_steps:
        getattr(bootstrap, step_name)()
    

# if __name__ == "__main__":
#     main()


In [97]:
# Run the complete bootstrap sequence defined in main().
main()
# Manual alternative for running individual steps while debugging:
# init = Init()
# init.create_clickhouse_table()
# init.create_minio_mlflow_bucket()
# init.load_data()

Table created successfully
Table already exists. Pass.
Data loaded successfully
['postgres', 'mlflow', 'template1', 'template0']
['postgres', 'mlflow', 'template1', 'template0']
airflow
Bucket 'mlflow' created successfully.


In [67]:
# Read the orders export: the five timestamp columns are parsed as day-first
# dates and numeric values use a decimal comma.
df = pd.read_csv(
    '../init/data/wb_orders.csv',
    decimal=',',
    dayfirst=True,
    parse_dates=[
        'date',
        'last_change_date',
        'created_at',
        'updated_at',
        'cancel_dt',
    ],
)

In [69]:
# Count missing values per column; the columns that contain gaps are the ones
# declared Nullable in the ClickHouse table definition.
df.isna().sum()

date                    0
last_change_date        0
total_price             0
discount_percent        0
warehouse_name          0
oblast               2964
nm_id                   0
category                0
brand                   0
is_cancel               0
cancel_dt           76240
created_at            536
updated_at              0
order_type          14570
dtype: int64

In [72]:
# Credentials are read from the environment (.env) rather than being written
# into the notebook; the variable names match the ones Init.__get_environ uses.
# os.environ[...] fails fast with a KeyError when a variable is missing.
load_dotenv()
client = clickhouse_connect.get_client(
    host='localhost',
    username=os.environ['CLICKHOUSE_USER'],
    password=os.environ['CLICKHOUSE_PASSWORD'],
    database='wb_orders',
)

# NOTE(review): Init.load_data already inserts the same CSV into
# wb_orders.wb_orders; running this cell as well presumably duplicates the
# rows. Confirm it is still needed.
client.insert_df('wb_orders', df)

<clickhouse_connect.driver.summary.QuerySummary at 0x7d3a983f2630>

In [76]:
# Preview the first five rows of wb_orders to confirm the insert landed.
query = r'''
    SELECT *
    FROM wb_orders
    LIMIT 5
'''
# Build a DataFrame from the named result rows so the columns keep their table names.
pd.DataFrame(client.query(query).named_results())

Unnamed: 0,date,last_change_date,total_price,discount_percent,warehouse_name,oblast,nm_id,category,brand,is_cancel,cancel_dt,created_at,updated_at,order_type
0,2022-07-31,2022-07-31,1742.0,15,МЛП-Подольск,Московская,905559214,Товары для животных,e129baf5351375dd,False,,2023-02-16 13:38:00,2024-02-05 14:30:00,
1,2022-07-31,2022-07-31,1742.0,15,МЛП-Подольск,Крым,905559214,Товары для животных,e129baf5351375dd,False,,2023-02-16 13:38:00,2024-02-05 14:30:00,
2,2022-07-31,2022-07-31,1742.0,15,МЛП-Подольск,Санкт-Петербург,905559214,Товары для животных,e129baf5351375dd,False,,2023-02-16 13:38:00,2024-02-05 14:30:00,
3,2022-07-31,2022-07-31,1742.0,15,МЛП-Подольск,Крым,905559214,Товары для животных,e129baf5351375dd,False,,2023-02-16 13:38:00,2024-02-05 14:30:00,
4,2022-07-31,2022-07-31,1742.0,15,Коледино,Москва,905559214,Товары для животных,e129baf5351375dd,False,,2023-02-16 13:38:00,2024-02-05 14:30:00,


In [78]:
# Ad-hoc run of the same EXISTS statement that Init.__check_empty sends;
# Init treats a result of 0 as "table is missing".
query = "EXISTS TABLE wb_orders;"
client.command(query)

0