# Import all csv files to PSQL

In [1]:
from pathlib import Path

from sqlalchemy import create_engine
import psycopg2
import pandas as pd
from tqdm import tqdm

## Prepare the sql connection and a list of the csv files

In [2]:
conn = psycopg2.connect(
    database='kkbox_churn',
    user='mariosk',
    password='pass',
    host='localhost')

engine = create_engine('postgresql://mariosk:pass@localhost:5432/kkbox_churn')

In [3]:
files_to_ingest = list(Path('/home/mariosk/data/kkbox-churn').glob('**/*.csv'))
files_to_ingest

[PosixPath('/home/mariosk/data/kkbox-churn/user_logs.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/train.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/members_v3.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/sample_submission_zero.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/transactions.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/sample_submission_v2/data/churn_comp_refresh/sample_submission_v2.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/user_logs_v2/data/churn_comp_refresh/user_logs_v2.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/train_v2/data/churn_comp_refresh/train_v2.csv'),
 PosixPath('/home/mariosk/data/kkbox-churn/transactions_v2/data/churn_comp_refresh/transactions_v2.csv')]

## Ingest the files
**Note:** Use the chunksize argument to stream the large files

In [6]:
def execute_on_transaction(cursor, command):
    try:
        cursor.execute(command)
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()

def create_table_from_csv(path, engine, conn):
    single_line_table = next(pd.read_csv(path, chunksize=1000))
    table_name = path.stem
    
    with conn.cursor() as cursor:
        execute_on_transaction(cursor, f'DROP TABLE IF EXISTS {table_name}')
    
    single_line_table.to_sql(table_name, engine, index=False, if_exists='append')
    with conn.cursor() as cursor:
        execute_on_transaction(cursor, f'DELETE FROM {table_name};')

def stream_from_file_to_psql(path, conn):
    table_name = path.stem
    with open(path, 'r') as f:
        next(f) # skip the header
        with conn.cursor() as cursor:
            try:
                cursor.copy_from(f, table_name, sep=",")
                conn.commit()
            except Exception as e:
                print(f'Failed to ingest {path.stem}: {e}')
                conn.rollback()

In [9]:
def create_index_on_msno(path, conn):
    table_name = path.stem
    with conn.cursor() as cursor:
        execute_on_transaction(cursor, f'CREATE INDEX msno_idx_{table_name} ON {table_name} (msno);')

### Create all tables
Unfortunately `sqlalchemy` is too slow for insertions, but still convenient to get the schemas defined.

In [6]:
for path in tqdm(files_to_ingest):
    create_table_from_csv(path, engine, conn)

100%|██████████| 9/9 [00:00<00:00, 10.59it/s]


In [7]:
%%time
# Leave the larger 'user_logs' its own cell
for path in tqdm(files_to_ingest[1:]):
    stream_from_file_to_psql(path, conn)

100%|██████████| 8/8 [00:56<00:00,  7.11s/it]

CPU times: user 2.1 s, sys: 1.19 s, total: 3.29 s
Wall time: 56.9 s





In [9]:
%%time
# Stream user_logs
stream_from_file_to_psql(files_to_ingest[0], conn)

CPU times: user 17.3 s, sys: 12.5 s, total: 29.8 s
Wall time: 9min 22s


In [10]:
%%time
for path in tqdm(files_to_ingest[1:]):
    create_index_on_msno(path, conn)

100%|██████████| 8/8 [02:03<00:00, 15.42s/it]

CPU times: user 17.3 ms, sys: 1.34 ms, total: 18.7 ms
Wall time: 2min 3s



