## SQLiteのテスト その2

大量の軌道要素データをSQLiteのデータベースに格納するテスト。

軌道要素データは json2parquet.py で parquet に変換済みであることを前提とする。

In [1]:
from datetime import datetime
import pandas as pd
import json
import os
import glob
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import sqlite3

In [2]:
pd.set_option('display.max_columns', 50)
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_colwidth", 80)

In [3]:
# Parquetファイル
files = sorted(glob.glob('download/19[89]*.parquet'))

In [4]:
# DBファイル
dbfile = 'db/dbtest2.sqlite3'

In [5]:
# DBに保存するcolumn
columns_out = ['CREATION_DATE', 'EPOCH', 'OBJECT_ID', 'MEAN_MOTION', 'ECCENTRICITY', 'INCLINATION', 'RA_OF_ASC_NODE',
    'ARG_OF_PERICENTER', 'MEAN_ANOMALY', 'NORAD_CAT_ID', 'REV_AT_EPOCH', 'BSTAR', 'SEMIMAJOR_AXIS',
    'PERIOD', 'APOAPSIS', 'PERIAPSIS', 'GP_ID', 'TLE_LINE0', 'TLE_LINE1', 'TLE_LINE2']

In [6]:
# テーブルとindexを作成する
# GP_ID を primary key とする
def create_table(cur, tablename):
    cur.execute('DROP TABLE IF EXISTS {}'.format(tablename))
    # cur.execute('VACUUM')
    cur.execute('''CREATE TABLE IF NOT EXISTS {} (
        CREATION_DATE timestamp, EPOCH timestamp, OBJECT_ID text,
        MEAN_MOTION real, ECCENTRICITY real, INCLINATION real, RA_OF_ASC_NODE real, ARG_OF_PERICENTER real, MEAN_ANOMALY real,
        NORAD_CAT_ID integer, REV_AT_EPOCH integer, BSTAR real, SEMIMAJOR_AXIS real, PERIOD real, APOAPSIS real, PERIAPSIS real,
        GP_ID integer primary key, TLE_LINE0 text, TLE_LINE1 text, TLE_LINE2 text)'''.format(tablename))
    cur.execute('CREATE INDEX IF NOT EXISTS index_{0}_epoch ON {0} (EPOCH)'.format(tablename))
    cur.execute('CREATE INDEX IF NOT EXISTS index_{0}_norad_cat_id ON {0} (NORAD_CAT_ID)'.format(tablename))

In [7]:
# SQLite3 のバージョンの確認 (3.24.0 でUPSERTサポート)
sqlite3.sqlite_version

'3.33.0'

In [8]:
# DBに接続
con = sqlite3.connect(dbfile)
cur = con.cursor()

In [9]:
# テスト用データ
df = pd.read_parquet(files[0], columns = columns_out)
print(len(df))

624982


In [10]:
create_table(cur, 'elset')

In [11]:
%%time
# 普通に to_sql で追加
df.to_sql('elset', con, if_exists='append', index=None)

CPU times: user 9.42 s, sys: 5.98 s, total: 15.4 s
Wall time: 15.3 s


In [12]:
create_table(cur, 'elset2')

In [13]:
%%time
# execute は datetime64 を受け付けないので、日時を str に変換する (to_pydatetime() で datetime に変換してもよい)
df2 = df.copy()
df2['CREATION_DATE'] = df2['CREATION_DATE'].astype(str)
df2['EPOCH'] = df2['EPOCH'].astype(str)

CPU times: user 1.82 s, sys: 93.1 ms, total: 1.92 s
Wall time: 1.91 s


In [14]:
# tupleのlist (または listのlist) に変換する時間
%timeit a=list(df2.itertuples(index=False, name=None))
%timeit a=df2.to_records(index=False).tolist()
%timeit a=list(df2.to_records(index=False))
%timeit a=df2.values.tolist()

1.09 s ± 2.91 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.78 s ± 3.38 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.05 s ± 4.15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.06 s ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [15]:
%%time
# listのlistに変換して、executemany で追加
cur.executemany('INSERT INTO elset2 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', df2.values.tolist())
con.commit()

CPU times: user 7.74 s, sys: 5.52 s, total: 13.3 s
Wall time: 13.3 s


In [16]:
%%time
# 同じデータを再度追加する
# GP_ID を PRIMARY KEY としているので、GP_ID が重複している可能性のあるデータを追加するには REPLACE が必要だが効率が悪い
cur.executemany('REPLACE INTO elset2 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', df2.values.tolist())
con.commit()

CPU times: user 8.73 s, sys: 3.83 s, total: 12.6 s
Wall time: 12.7 s


In [17]:
%%time
# 同じデータを再度追加する
# 最近の SQLite では ON COFLICT 句を用いることで、重複データが大量にある場合には余分なDELETE/INSERTを行わずに済むので速い
cur.executemany('''INSERT INTO elset2 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    ON CONFLICT(GP_ID) DO NOTHING''', df2.values.tolist())
con.commit()

CPU times: user 4.35 s, sys: 1.07 s, total: 5.42 s
Wall time: 5.4 s


In [18]:
# 以下、大量のデータをDBに格納する

In [19]:
create_table(cur, 'elset3')

In [20]:
%%time
# REPLACEの場合
l = 0
for file in files:
    df = pd.read_parquet(file, columns = columns_out)
    l += len(df)
    df['CREATION_DATE'] = df['CREATION_DATE'].astype(str)
    df['EPOCH'] = df['EPOCH'].astype(str)
    cur.executemany('REPLACE INTO elset3 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', df.values.tolist())
con.commit()
print(l)

32688590
CPU times: user 10min 5s, sys: 6min 37s, total: 16min 42s
Wall time: 16min 32s


In [21]:
cur.execute('SELECT COUNT(*) from elset3')
cur.fetchone()

(32688585,)

In [22]:
create_table(cur, 'elset4')

In [23]:
%%time
# INSERT ON CONFLICT DO NOTHING の場合 (重複データ数が少ないと所要時間はREPLACEとほとんど変わらない)
l = 0
for file in files:
    df = pd.read_parquet(file, columns = columns_out)
    l += len(df)
    df['CREATION_DATE'] = df['CREATION_DATE'].astype(str)
    df['EPOCH'] = df['EPOCH'].astype(str)
    cur.executemany('''INSERT INTO elset4 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    ON CONFLICT(GP_ID) DO NOTHING''', df.values.tolist())
con.commit()
print(l)

32688590
CPU times: user 10min 4s, sys: 6min 41s, total: 16min 46s
Wall time: 16min 8s


In [24]:
cur.execute('SELECT COUNT(*) from elset4')
cur.fetchone()

(32688585,)

In [25]:
create_table(cur, 'elset5')
cur.execute('DROP INDEX IF EXISTS index_elset5_epoch')
cur.execute('DROP INDEX IF EXISTS index_elset5_norad_cat_id')

<sqlite3.Cursor at 0x2b4dd253bb90>

In [26]:
%%time
# INDEXなしで追加し、最後にINDEXを追加する
l = 0
for file in files:
    df = pd.read_parquet(file, columns = columns_out)
    l += len(df)
    df['CREATION_DATE'] = df['CREATION_DATE'].astype(str)
    df['EPOCH'] = df['EPOCH'].astype(str)
    cur.executemany('''INSERT INTO elset4 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    ON CONFLICT(GP_ID) DO NOTHING''', df.values.tolist())
con.commit()
cur.execute('CREATE INDEX IF NOT EXISTS index_elset5_epoch ON elset5 (EPOCH)')
cur.execute('CREATE INDEX IF NOT EXISTS index_elset5_norad_cat_id ON elset5 (NORAD_CAT_ID)')
print(l)

32688590
CPU times: user 6min 14s, sys: 1min 49s, total: 8min 3s
Wall time: 6min 59s


In [27]:
con.close()