# Data Engineering in Python with databolt - Fast Loading to SQL with pandas (d6tlib/d6tstack)

Pandas and SQL are great, but combining them has some problems:
* Loading data from pandas to SQL is very slow, so you can't preprocess data with Python and then quickly store it in a database
* Loading CSV files into SQL is cumbersome and quickly breaks when input files are not consistent

With `d6tstack` you can:
* load pandas dataframes to postgres or mysql much faster than with `pd.to_sql()` and with minimal memory consumption
* preprocess CSV files with pandas before writing to the database
* solve data schema problems (e.g. new or renamed columns) before writing to the database
* process large files out of core, in chunks

In this workbook we will demonstrate the usage of the d6tstack library for quickly loading data into SQL from CSV files and pandas.

# pd.to_sql() is slow
Let's see how slow `pd.to_sql()` (that is, `DataFrame.to_sql()`) is when storing 100k rows of random data.

In [34]:
import pandas as pd
import numpy as np
import uuid
import sqlalchemy
import glob
import time

cfg_uri_psql = 'postgresql+psycopg2://psqlusr:psqlpwdpsqlpwd@localhost/psqltest'
cfg_uri_mysql = 'mysql+mysqlconnector://testusr:testpwd@localhost/testdb'

cfg_nobs = int(1e5)
np.random.seed(0)
df = pd.DataFrame({'id':range(cfg_nobs)})
df['uuid']=[uuid.uuid4().hex.upper()[0:10] for _ in range(cfg_nobs)]
df['date']=pd.date_range('1/1/2010',periods=cfg_nobs, freq='min')  # 'min' = 1 minute; the 'T' alias is deprecated in recent pandas
for i in range(20):
    df['d'+str(i)]=np.random.normal(size=int(cfg_nobs))

print(df.shape)

(100000, 23)


In [35]:
sqlengine = sqlalchemy.create_engine(cfg_uri_psql)

start_time = time.time()
df.to_sql('benchmark',sqlengine,if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))

--- 28.010449647903442 seconds ---
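Before reaching for another library, it is worth knowing that `DataFrame.to_sql()` itself has two knobs, `chunksize` and `method='multi'` (several rows per `INSERT` statement). The sketch below is not part of the original benchmark: it uses an in-memory SQLite database and a small hypothetical dataframe (`small_df`) purely so that it runs without a database server, and the helper name `time_to_sql` is made up for illustration. Whether `method='multi'` actually helps depends on the database and driver, so treat the timings as something to measure on your own setup rather than a result.

```python
import sqlite3
import time

import numpy as np
import pandas as pd


def time_to_sql(df, con, table, **to_sql_kwargs):
    """Write df to `table` and return the elapsed time in seconds."""
    start = time.perf_counter()
    df.to_sql(table, con, if_exists='replace', index=False, **to_sql_kwargs)
    return time.perf_counter() - start


# Small hypothetical dataset; the notebook's benchmark uses 100k rows x 23 columns.
rng = np.random.default_rng(0)
small_df = pd.DataFrame(rng.normal(size=(1000, 3)), columns=['d0', 'd1', 'd2'])

# In-memory SQLite stands in for postgres/mysql so the sketch is self-contained.
con = sqlite3.connect(':memory:')

elapsed_default = time_to_sql(small_df, con, 'bench_default')
# chunksize keeps each multi-row INSERT below SQLite's bound-parameter limit.
elapsed_multi = time_to_sql(small_df, con, 'bench_multi', method='multi', chunksize=100)

n_default = con.execute('SELECT COUNT(*) FROM bench_default').fetchone()[0]
n_multi = con.execute('SELECT COUNT(*) FROM bench_multi').fetchone()[0]
print('default: %.4fs, multi: %.4fs' % (elapsed_default, elapsed_multi))
```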


# Speeding up pd.to_sql() in postgres and mysql with d6tstack
Let's see how we can make this faster. In this simple example the postgres load drops from about 28 seconds to under 5 seconds, roughly a 6x speedup, and the mysql load takes about 7 seconds. The gap widens as datasets get larger.

In [36]:
import d6tstack.utils

# psql
start_time = time.time()
d6tstack.utils.pd_to_psql(df, cfg_uri_psql, 'benchmark', if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))

# mysql
start_time = time.time()
d6tstack.utils.pd_to_mysql(df, cfg_uri_mysql, 'benchmark', if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))


--- 4.6529316902160645 seconds ---
creating mysql.csv ok
loading mysql.csv ok
--- 7.102342367172241 seconds ---
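The `creating mysql.csv` / `loading mysql.csv` messages hint at the general technique: serialize the dataframe to CSV and hand it to the database's bulk loader instead of issuing `INSERT` statements. The sketch below illustrates that idea for postgres with psycopg2's `cursor.copy_expert()` and `COPY ... FROM STDIN`. It is not d6tstack's implementation; the function name `copy_df_to_postgres` is hypothetical, the target table is assumed to exist already with matching columns, and the identifier quoting is deliberately naive.

```python
import io

import pandas as pd


def copy_df_to_postgres(df, cursor, table_name):
    """Stream df as CSV into an existing table via COPY ... FROM STDIN.

    `cursor` is expected to behave like a psycopg2 cursor, i.e. to provide
    copy_expert(sql, file). The table must already exist.
    """
    buf = io.StringIO()
    # No index and no header: COPY receives data rows only.
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)
    columns = ', '.join('"{}"'.format(col) for col in df.columns)
    sql = 'COPY "{}" ({}) FROM STDIN WITH (FORMAT csv)'.format(table_name, columns)
    cursor.copy_expert(sql, buf)


# Usage sketch (assumes a reachable postgres server and an existing table):
# import psycopg2
# conn = psycopg2.connect(host='localhost', dbname='psqltest',
#                         user='psqlusr', password='...')
# with conn, conn.cursor() as cur:
#     copy_df_to_postgres(df, cur, 'benchmark')
```

The design choice is to keep everything in a single round trip: the database parses the CSV stream itself, which avoids per-row statement overhead.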


# Using Pandas for preprocessing CSVs before storing to database
Pandas is great for preprocessing data. For example, let's say we want to process dates before importing them into a database. `d6tstack` makes this easy: you pass the filename or list of files along with the preprocessing function, and the data is loaded into SQL quickly without reading everything into memory.

In [46]:
cfg_fname = 'test-data/input/test-data-input-csv-colmismatch-feb.csv'
print(pd.read_csv(cfg_fname).head())

         date  sales  cost  profit
0  2011-02-01    200   -90     110
1  2011-02-02    200   -90     110
2  2011-02-03    200   -90     110
3  2011-02-04    200   -90     110
4  2011-02-05    200   -90     110


In [47]:
def apply(dfg):
    dfg['date'] = pd.to_datetime(dfg['date'], format='%Y-%m-%d')
    dfg['date_year_quarter'] = (dfg['date'].dt.year).astype(str).str[-2:]+'Q'+(dfg['date'].dt.quarter).astype(str)
    # MonthEnd(0) rolls forward to the month end but leaves dates already on a month end unchanged
    dfg['date_monthend'] = dfg['date'] + pd.tseries.offsets.MonthEnd(0)
    return dfg

d6tstack.combine_csv.CombinerCSV([cfg_fname], apply_after_read=apply,add_filename=False).to_psql_combine(cfg_uri_psql, 'benchmark', if_exists='replace')
print(pd.read_sql_table('benchmark',sqlengine).head())

sniffing columns ok
        date  sales  cost  profit date_year_quarter date_monthend
0 2011-02-01    200   -90     110              11Q1    2011-02-28
1 2011-02-02    200   -90     110              11Q1    2011-02-28
2 2011-02-03    200   -90     110              11Q1    2011-02-28
3 2011-02-04    200   -90     110              11Q1    2011-02-28
4 2011-02-05    200   -90     110              11Q1    2011-02-28
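To make the "without loading everything into memory" idea concrete, here is a minimal sketch using plain pandas: read the CSV in chunks with `pd.read_csv(chunksize=...)`, apply the preprocessing function to each chunk, and append it to the table. This is an illustration of the out-of-core pattern, not how d6tstack implements it. The names `add_date_columns` and `load_csv_in_chunks` are hypothetical, the CSV content is made-up sample data in the same shape as the file above, and an in-memory SQLite database stands in for postgres so the example runs anywhere.

```python
import io
import sqlite3

import pandas as pd


def add_date_columns(chunk):
    """Parse the date column and derive a year-quarter label such as '11Q1'."""
    chunk = chunk.copy()
    chunk['date'] = pd.to_datetime(chunk['date'], format='%Y-%m-%d')
    chunk['date_year_quarter'] = (
        chunk['date'].dt.year.astype(str).str[-2:]
        + 'Q'
        + chunk['date'].dt.quarter.astype(str)
    )
    return chunk


def load_csv_in_chunks(source, con, table, preprocess, chunksize=2):
    """Load a CSV into `table` chunk by chunk; return the number of rows written."""
    total = 0
    for i, chunk in enumerate(pd.read_csv(source, chunksize=chunksize)):
        chunk = preprocess(chunk)
        # The first chunk (re)creates the table, later chunks append to it.
        mode = 'replace' if i == 0 else 'append'
        chunk.to_sql(table, con, if_exists=mode, index=False)
        total += len(chunk)
    return total


# Made-up sample data; a real run would pass a file path instead of StringIO.
csv_text = (
    "date,sales,cost,profit\n"
    "2011-02-01,200,-90,110\n"
    "2011-02-02,200,-90,110\n"
    "2011-02-03,200,-90,110\n"
)
con = sqlite3.connect(':memory:')
rows_loaded = load_csv_in_chunks(io.StringIO(csv_text), con, 'benchmark', add_date_columns)
result = pd.read_sql_query('SELECT * FROM benchmark', con)
print(result)
```

Only one chunk is held in memory at a time, so peak memory is governed by `chunksize` rather than by file size.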


# Loading multiple CSV to SQL with data schema changes
Native database import commands typically load one file at a time. You can write a script to process multiple files, but that is tedious and, worse, it often breaks, e.g. if there are schema changes. With `d6tstack` you can quickly import multiple files and deal with data schema changes in just a couple of lines of Python. Below is a quick example; to explore the full functionality see https://github.com/d6t/d6tstack/blob/master/examples-csv.ipynb

In [48]:
import glob
import d6tstack.combine_csv

cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-colmismatch-*.csv'))
c = d6tstack.combine_csv.CombinerCSV(cfg_fnames)

# check columns
print('all equal',c.is_all_equal())
print('')
c.is_column_present()

sniffing columns ok
all equal False



                                                         date  sales  cost  profit  profit2
file_path
test-data/input/test-data-input-csv-colmismatch-feb.csv  True   True  True    True    False
test-data/input/test-data-input-csv-colmismatch-jan.csv  True   True  True    True    False
test-data/input/test-data-input-csv-colmismatch-mar.csv  True   True  True    True     True


The additional `profit2` column in the third file (March) would break a naive data load. `d6tstack` fixes the situation and loads everything correctly, and you can run any additional preprocessing logic as in the example above. All of this is done out of core, so even large files can be processed in chunks rather than loaded into memory at once.
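The underlying idea can be sketched with plain pandas: read only the header row of each file, build the union of all column names, and reindex every file to that union so that missing columns become `NaN`. This is a simplified illustration under assumptions, not d6tstack's code. The helpers `sniff_columns` and `union_columns` are hypothetical, and the two in-memory "files" hold made-up rows.

```python
import io

import pandas as pd


def sniff_columns(sources):
    """Read only the header row of each CSV; return {name: [column names]}."""
    headers = {}
    for name, src in sources.items():
        headers[name] = list(pd.read_csv(src, nrows=0).columns)
        src.seek(0)  # rewind the in-memory buffer so it can be read again
    return headers


def union_columns(headers):
    """Union of all column names, in order of first appearance."""
    combined = []
    for cols in headers.values():
        for col in cols:
            if col not in combined:
                combined.append(col)
    return combined


# Made-up sample files; the second one has an extra `profit2` column.
files = {
    'jan.csv': io.StringIO("date,sales,cost,profit\n2011-01-01,100,-80,20\n"),
    'mar.csv': io.StringIO("date,sales,cost,profit,profit2\n2011-03-01,300,-100,200,400\n"),
}

headers = sniff_columns(files)
all_columns = union_columns(headers)

# Reindex each file to the full column set; absent columns are filled with NaN.
aligned = [pd.read_csv(src).reindex(columns=all_columns) for src in files.values()]
combined = pd.concat(aligned, ignore_index=True)
print(combined)
```

Knowing the full column set before loading any data is what makes a chunked load possible: the table can be created once with every column, and each chunk then fits it regardless of which file it came from.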

In [49]:
cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-colmismatch-*.csv'))
d6tstack.combine_csv.CombinerCSV(cfg_fnames, apply_after_read=apply,add_filename=False).to_psql_combine(cfg_uri_psql, 'benchmark', if_exists='replace')
print(pd.read_sql_table('benchmark',sqlengine).tail())

sniffing columns ok
         date  sales  cost  profit  profit2 date_year_quarter date_monthend
25 2011-03-06    300  -100     200    400.0              11Q1    2011-03-31
26 2011-03-07    300  -100     200    400.0              11Q1    2011-03-31
27 2011-03-08    300  -100     200    400.0              11Q1    2011-03-31
28 2011-03-09    300  -100     200    400.0              11Q1    2011-03-31
29 2011-03-10    300  -100     200    400.0              11Q1    2011-03-31
