In [1]:
import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm
import xml.etree.ElementTree as elemTree

In [2]:
# (stock/index code, minute-bar CSV path) pairs to load into data_in_minute.
# Forward slashes are accepted by both Windows and POSIX, whereas backslash-separated
# relative paths such as '..\data\x.csv' only resolve on Windows.
DATA_DIR = '../data'
csv_paths = [
  ('069500', f'{DATA_DIR}/kodex_200.csv'),
  ('114800', f'{DATA_DIR}/kodex_inverse.csv'),
  ('226490', f'{DATA_DIR}/kodex_kospi.csv'),
  ('001', f'{DATA_DIR}/kospi.csv'),
  ('201', f'{DATA_DIR}/kospi200.csv'),
]

In [3]:
# Korean CSV header -> column name used in data_in_minute, in output column order.
CSV_COLUMNS = {
  '체결시간': 'dt',     # trade time
  '시가': 'open',
  '고가': 'high',
  '저가': 'low',
  '현재가': 'close',    # current (last) price
  '거래량': 'volume',
}
PRICE_VOLUME_COLUMNS = ['open', 'high', 'low', 'close', 'volume']

dfs = []
for st_code, csv_path in tqdm(csv_paths):
  df = (
    pd.read_csv(csv_path, dtype={'체결시간': str}, usecols=list(CSV_COLUMNS))
    [list(CSV_COLUMNS)]          # usecols keeps file order; reorder explicitly
    .rename(columns=CSV_COLUMNS)
  )
  # sqlite3 has no native datetime type, so dt stays a 'YYYYMMDDHHMMSS' string
  # (rather than pd.to_datetime(df['dt'], format='%Y%m%d%H%M%S')).
  df['st_code'] = st_code
  # Prices/volume may carry a +/- sign prefix (presumably a direction marker); keep magnitudes.
  df[PRICE_VOLUME_COLUMNS] = df[PRICE_VOLUME_COLUMNS].abs()
  dfs.append(df)
whole_df = pd.concat(dfs, ignore_index=True)

  0%|          | 0/5 [00:00<?, ?it/s]

In [5]:
# Read the SQLite database path from <DBMS><sqlite3><database> under the XML config root.
tree = elemTree.parse(r'../config/.config.xml')
root = tree.getroot()
node_sqlite3 = root.find('./DBMS/sqlite3')
if node_sqlite3 is None:
  raise ValueError("'./DBMS/sqlite3' element not found in ../config/.config.xml")
config_db = {}
for tag in ['database']:
  node = node_sqlite3.find(tag)
  # Fail with a clear message instead of an AttributeError on None.
  if node is None or not node.text:
    raise ValueError(f"<{tag}> is missing or empty under DBMS/sqlite3 in ../config/.config.xml")
  config_db[tag] = node.text

In [6]:
# SQLite file path comes from the XML config; echo=False suppresses SQL statement logging.
db_engine = create_engine(
  'sqlite:///{}'.format(config_db['database']),
  echo=False,
)

In [7]:
# Start from an empty table so the bulk load below can be re-run without primary-key conflicts.
# Engine.execute() is deprecated in SQLAlchemy 1.4 (removed in 2.0); begin() also commits the DDL.
with db_engine.begin() as connection:
  connection.exec_driver_sql('DROP TABLE IF EXISTS data_in_minute')

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7ce2dd8>

In [8]:
# Schema for the minute-bar table. dt is TEXT ('YYYYMMDDHHMMSS') because sqlite3 has no
# native datetime type; prices and volume are stored as INTEGER.
# The (st_code, dt) primary key rejects duplicate bars, so appending the same CSV data
# again without dropping the table first should fail with an integrity error.
table_query = '''
CREATE TABLE data_in_minute (
  st_code TEXT not NULL,
  dt TEXT not NULL,
  open INTEGER,
  high INTEGER,
  low INTEGER,
  close INTEGER,
  volume INTEGER,
  PRIMARY KEY (st_code, dt)
)
'''

In [9]:
# Create data_in_minute from table_query. Engine.execute() is deprecated in SQLAlchemy 1.4
# (removed in 2.0); begin() runs the DDL in a transaction that commits on exit.
with db_engine.begin() as connection:
  connection.exec_driver_sql(table_query)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x10a3ee38>

## SQLAlchemy + sqlite3 + `pd.DataFrame.to_sql()`

Elapsed time for the `to_sql()` bulk load below: ~5.2s (measured runs: 4.8s, 5.1s, 5.4s)

In [10]:
# Bulk-load every instrument's minute bars in a single call; the DataFrame index is not stored.
whole_df.to_sql(
  name='data_in_minute',
  con=db_engine,
  if_exists='append',
  index=False,
)

467847

In [None]:
# Minute bars for 069500 from 2022-06-01 onward ('YYYYMMDDHHMMSS' strings compare chronologically).
# Values are bound as '?' parameters instead of being spliced into the SQL text, and
# Engine.execute() (deprecated in SQLAlchemy 1.4) is replaced by an explicit connection.
with db_engine.connect() as connection:
  rows = connection.exec_driver_sql(
    'SELECT * FROM data_in_minute WHERE st_code = ? AND dt >= ?',
    ('069500', '20220601000000'),
  ).fetchall()
# Show the row count and a small preview rather than dumping every row.
len(rows), rows[:5]

# 주식체결 데이터 저장

In [None]:
import pandas as pd
from sqlalchemy import create_engine

class RealTimeTickDataPrivder:
  """Store real-time trade-execution ticks in the SQLite table ``today_in_ticks``.

  Two insert paths are provided so they can be benchmarked against each other:
  ``insert1`` executes a parameterized INSERT directly; ``insert2`` goes through
  ``pandas.DataFrame.to_sql``.

  NOTE(review): the class name misspells "Provider"; it is kept as-is so existing
  callers keep working.
  """

  # Tick table; there is no primary key, so duplicate (st_code, dt) rows are allowed.
  table_query = '''
  CREATE TABLE IF NOT EXISTS today_in_ticks (
    st_code TEXT not NULL,
    dt TEXT not NULL,
    open INTEGER,
    high INTEGER,
    low INTEGER,
    close INTEGER,
    volume INTEGER
  )
  '''
  # Optional composite index; only created when the provider is built with with_index=True.
  index_query = '''
  CREATE INDEX IF NOT EXISTS idx_today_in_ticks ON today_in_ticks (st_code, dt)
  '''

  # '?' placeholders follow the sqlite3 DBAPI qmark paramstyle; values come from __build_data.
  insert_query = '''
  INSERT INTO today_in_ticks (st_code, dt, open, high, low, close, volume)
  VALUES (?, ?, ?, ?, ?, ?, ?)
  '''

  def __init__(self, db_path, in_memory_db = False, with_index=False):
    """Create the SQLAlchemy engine and make sure the tick table exists.

    Args:
      db_path: SQLite file path; ignored when ``in_memory_db`` is True.
      in_memory_db: use an in-memory SQLite database (``sqlite://``) instead of a file.
      with_index: create ``idx_today_in_ticks`` in ``create_table`` and drop it in ``clear_table``.
    """
    url = "sqlite://" if in_memory_db else f"sqlite:///{db_path}"
    self.engine = create_engine(url)
    self.with_index = with_index
    self.create_table()

  def clear_table(self):
    """Drop the tick table (and, when ``with_index`` is set, its index)."""
    # begin() commits on exit, so the DDL also takes effect on SQLAlchemy 2.0.
    with self.engine.begin() as connection:
      connection.exec_driver_sql('DROP TABLE IF EXISTS today_in_ticks')
      if self.with_index:
        connection.exec_driver_sql('DROP INDEX IF EXISTS idx_today_in_ticks')

  def create_table(self):
    """Create the tick table if missing, plus the (st_code, dt) index when ``with_index`` is set."""
    with self.engine.begin() as connection:
      connection.exec_driver_sql(self.table_query)
      # Honor with_index so the index / no-index benchmarks really differ.
      if self.with_index:
        connection.exec_driver_sql(self.index_query)

  def __build_data(self, real_data):
    """Convert one tick dict into a row tuple in ``insert_query`` column order.

    Numeric fields may arrive as signed strings (e.g. '+31345', '-31215', ' 31275');
    ``int`` accepts the sign and surrounding spaces, and ``abs`` keeps the magnitude.
    """
    return (
      real_data['code'],
      real_data['20'],             # trade time (HHMMSS)
      abs(int(real_data['16'])),   # open, signed
      abs(int(real_data['17'])),   # high, signed
      abs(int(real_data['18'])),   # low, signed
      abs(int(real_data['10'])),   # current price, signed
      abs(int(real_data['15'])),   # volume, signed
    )

  def build_dataframe(self, real_data):
    """Return a one-row DataFrame for ``real_data`` with the tick table's column names."""
    return pd.DataFrame(
      [self.__build_data(real_data)],
      columns=['st_code', 'dt', 'open', 'high', 'low', 'close', 'volume']
    )

  def insert1(self, real_data):
    """Insert one tick with a raw parameterized INSERT in its own committed transaction."""
    with self.engine.begin() as connection:
      connection.exec_driver_sql(self.insert_query, self.__build_data(real_data))

  def insert2(self, real_data):
    """Insert one tick by appending a one-row DataFrame via ``DataFrame.to_sql``."""
    self.build_dataframe(real_data).to_sql('today_in_ticks', self.engine, if_exists='append', index=False)

In [None]:
# Sample real-time tick payload used by the insert benchmarks below.
# Numeric fields arrive as signed strings; the provider converts them with abs(int(...)).
real_data = {
  'code': '069500',
  'type': '주식체결',
  '20': '100645',    # trade time (HHMMSS)
  '16': '+31345',    # open
  '17': '+31370',    # high
  '18': '-31215',    # low
  '10': ' 31275',    # current price
  '15': '+50',       # volume
  '11': ' 0',        # not read by the provider; meaning unverified
  '12': '0.00',      # not read by the provider; meaning unverified
  '13': '1613182',   # not read by the provider; meaning unverified
}

In [None]:
# db_path is ignored while in_memory_db=True; it is kept so switching back to an on-disk run
# only needs the flag flipped.
provider = RealTimeTickDataPrivder(
  db_path="kiwoom_db.sqlite3",
  in_memory_db=True,
  with_index=True,
)

In [None]:
# Display the sample real-time tick payload used by the insert benchmarks.
real_data

`insert1()` — `connection.execute()`로 한 건씩 INSERT 수행 시
- 파일 DB, 인덱스 있을 때: 10,000건 추가에 70초
- 파일 DB, 인덱스 없을 때: 10,000건 추가에 65초
- 인메모리 DB: 10,000건 추가에 0.7초

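`insert2()` — `DataFrame.to_sql()`로 한 건씩 수행 시
- 파일 DB, 인덱스 있을 때: 10,000건 추가에 140초
- 파일 DB, 인덱스 없을 때: 10,000건 추가에 141초
- 인메모리 DB: 10,000건 추가에 33초

※ 인덱스 유무에 따른 차이가 거의 없으므로, 측정 당시 인덱스가 실제로 생성/미생성되었는지 확인이 필요하다.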

`DataFrame.to_sql()`로 10,000건을 한 번에(벌크) 수행 시
- 0.7초

결론
- 틱마다 바로 인서트하지 말고, 일정 주기로 모아서 한 번에 인서트하는 것이 낫다.
- 인메모리 DB를 사용하고, `to_sql()` 대신 표준 SQL 문을 직접 실행(`insert1()`)하는 것이 낫다.

In [None]:
# Benchmark: insert 10,000 ticks one at a time with insert1() (one transaction per row).
# Each tick is a fresh copy with a synthetic '20' (trade time) value, so the shared
# real_data sample is not mutated for the other cells.
for i in range(10000):
  tick = {**real_data, '20': f"{i:06d}"}
  provider.insert1(tick)

In [None]:
# Benchmark: insert 10,000 ticks one at a time through DataFrame.to_sql() via insert2().
# Each tick is a fresh copy with a synthetic '20' (trade time) value, so the shared
# real_data sample is not mutated for the other cells.
for i in range(10000):
  tick = {**real_data, '20': f"{i:06d}"}
  provider.insert2(tick)

In [None]:
# Collect 10,000 one-row frames for the bulk to_sql() benchmark. Each is built from a copy of
# real_data with a synthetic '20' (trade time), so the shared sample itself is not mutated.
ll = [provider.build_dataframe({**real_data, '20': f"{i:06d}"}) for i in range(10000)]

In [None]:
# Bulk variant: stack the collected one-row frames and write them with a single to_sql() call.
pd.concat(
  ll,
  ignore_index=True,
).to_sql(
  name='today_in_ticks',
  con=provider.engine,
  if_exists='append',
  index=False,
)

In [None]:
# Drop today_in_ticks (and idx_today_in_ticks when the provider was built with with_index=True).
provider.clear_table()

In [None]:
# Recreate today_in_ticks via the provider's create_table() so further benchmarks start empty.
provider.create_table()

In [None]:
# Read back every tick currently stored in today_in_ticks.
# exec_driver_sql replaces raw-string execute(), which is deprecated in SQLAlchemy 1.4.
with provider.engine.connect() as connection:
  rr = connection.exec_driver_sql('select * from today_in_ticks').fetchall()