In [5]:
import os
from sqlalchemy import create_engine

hostname = os.environ.get('DB_HOSTNAME')
database = os.environ.get('DB_DATABASE')
username = os.environ.get('DB_USERNAME')
pwd = os.environ.get('DB_PWD')
port = int(os.environ.get('DB_PORT'))


db_params = {
    'host': hostname,
    'user': username,
    'password': pwd,
    'database': database,
    'port': port,
}

db_url = f'postgresql+psycopg2://{db_params["user"]}:{db_params["password"]}@{db_params["host"]}:{db_params["port"]}/{db_params["database"]}'


engine = create_engine(db_url)


In [8]:
import pandas as pd

symbol = 'BTCUSDT'
input_path = f'../outputs/{symbol}_upload.csv'

df = pd.read_csv(input_path)
df.head()

Unnamed: 0,open_time,open_price,high_price,low_price,close_price,volume,close_time,quote_asset_volume,number_of_trades,symbol,pk,should_be_updated
0,2024-01-17 04:00:00,42849.0,42921.1,42781.49,42854.01,791.4,2024-01-17 04:59:59.999,33911017.91,32838,BTCUSDT,2024-01-17 04:00:00_btcusdt_2024-01-17 04:59:5...,False
1,2024-01-17 05:00:00,42854.01,42925.49,42765.41,42783.39,661.25,2024-01-17 05:59:59.999,28333635.53,30696,BTCUSDT,2024-01-17 05:00:00_btcusdt_2024-01-17 05:59:5...,False
2,2024-01-17 06:00:00,42783.39,42876.34,42766.0,42806.05,802.28,2024-01-17 06:59:59.999,34360257.7,28881,BTCUSDT,2024-01-17 06:00:00_btcusdt_2024-01-17 06:59:5...,False
3,2024-01-17 07:00:00,42806.05,42851.1,42619.6,42655.94,1357.31,2024-01-17 07:59:59.999,58019581.23,46737,BTCUSDT,2024-01-17 07:00:00_btcusdt_2024-01-17 07:59:5...,True
4,2024-01-17 08:00:00,42655.95,42747.65,42564.0,42728.76,1188.19,2024-01-17 08:59:59.999,50705137.24,52602,BTCUSDT,2024-01-17 08:00:00_btcusdt_2024-01-17 08:59:5...,True


In [82]:
def format_df(df, time_unit=None):
    new_df = df
    float64_columns = df.select_dtypes(include='float64').columns.tolist()
    datetime_columns = ['open_time', 'close_time']
    new_df.loc[:, float64_columns] = new_df[float64_columns].round(2)
    new_df.loc[:, 'symbol'] = new_df['symbol'].str.lower()
    for colum in datetime_columns:
        new_df.loc[:, colum] = pd.to_datetime(new_df[colum], unit=time_unit) if time_unit else pd.to_datetime(new_df[colum]) 
    return new_df

In [89]:
new_data_df = df[df['should_be_updated'] == False]
new_data_df = format_df(df)
new_data_df = new_data_df[new_data_df.columns[:-2]]

In [91]:
new_data_df.head()

Unnamed: 0,open_time,open_price,high_price,low_price,close_price,volume,close_time,quote_asset_volume,number_of_trades,symbol
0,2024-01-17 04:00:00,42849.0,42921.1,42781.49,42854.01,791.4,2024-01-17 04:59:59.999000,33911017.91,32838,btcusdt
1,2024-01-17 05:00:00,42854.01,42925.49,42765.41,42783.39,661.25,2024-01-17 05:59:59.999000,28333635.53,30696,btcusdt
2,2024-01-17 06:00:00,42783.39,42876.34,42766.0,42806.05,802.28,2024-01-17 06:59:59.999000,34360257.7,28881,btcusdt
3,2024-01-17 07:00:00,42806.05,42851.1,42619.6,42655.94,1357.31,2024-01-17 07:59:59.999000,58019581.23,46737,btcusdt
4,2024-01-17 08:00:00,42655.95,42747.65,42564.0,42728.76,1188.19,2024-01-17 08:59:59.999000,50705137.24,52602,btcusdt


In [90]:
new_data_df.to_sql(name='klines', con=engine, if_exists='append', index=False, method='multi')

24

In [83]:

update_data_df = df[df['should_be_updated'] == True]
update_data_df = format_df(update_data_df)
update_data_df.head()



Unnamed: 0,open_time,open_price,high_price,low_price,close_price,volume,close_time,quote_asset_volume,number_of_trades,symbol,pk,should_be_updated
3,2024-01-17 07:00:00,42806.05,42851.1,42619.6,42655.94,1357.31,2024-01-17 07:59:59.999000,58019581.23,46737,btcusdt,2024-01-17 07:00:00_btcusdt_2024-01-17 07:59:5...,True
4,2024-01-17 08:00:00,42655.95,42747.65,42564.0,42728.76,1188.19,2024-01-17 08:59:59.999000,50705137.24,52602,btcusdt,2024-01-17 08:00:00_btcusdt_2024-01-17 08:59:5...,True
5,2024-01-17 09:00:00,42728.76,42830.0,42674.0,42678.57,938.27,2024-01-17 09:59:59.999000,40133807.74,35711,btcusdt,2024-01-17 09:00:00_btcusdt_2024-01-17 09:59:5...,True
6,2024-01-17 10:00:00,42678.56,42850.55,42622.36,42850.04,1193.04,2024-01-17 10:59:59.999000,50992942.63,45910,btcusdt,2024-01-17 10:00:00_btcusdt_2024-01-17 10:59:5...,True
7,2024-01-17 11:00:00,42850.05,42850.55,42672.95,42680.16,1093.25,2024-01-17 11:59:59.999000,46744078.12,60902,btcusdt,2024-01-17 11:00:00_btcusdt_2024-01-17 11:59:5...,True


In [84]:
update_data_df.to_sql(name='klines_tmp', con=engine, if_exists='replace')

21

In [87]:
from sqlalchemy import text

def update_klines(engine):
    fields_to_update = ['updated_at', 'open_time', 'open_price', 'high_price', 'low_price', 'close_price', 'volume', 'close_time', 'quote_asset_volume', 'symbol']
    query = """UPDATE klines as k
               SET updated_at = now(),
                    open_time = temp.open_time,
                    open_price = temp.open_price,
                    high_price = temp.high_price,
                    low_price = temp.low_price,
                    close_price = temp.close_price,
                    volume = temp.volume,
                    close_time = temp.close_time,
                    quote_asset_volume = temp.quote_asset_volume,
                    symbol = temp.symbol
                FROM klines_tmp as temp
                WHERE concat(k.open_time, '_', lower(k.symbol), '_', k.close_time) = temp.pk;
            """
    with engine.begin() as conn:
        conn.execute(text(query))

In [88]:
update_klines(engine)