# Summary

Goal: work out how to generate Parquet files for Spark
from the SQLite databases.

In [6]:
from datetime import datetime
import os
import logging
from urllib.parse import urljoin

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from okra.models import DataAccessLayer
from okra.logging_utils import enable_cloud_log

In [7]:
# Run the project's logging helper before creating the notebook logger.
# NOTE(review): presumably this installs the handlers/format seen in the
# timestamped log output of later cells — confirm in okra.logging_utils.
enable_cloud_log()

# Module-level logger used by the functions defined in this notebook.
logger = logging.getLogger(__name__)

In [8]:
# Directory that holds the SQLite database files (machine-specific path).
db_dir = '/Users/tylerbrown/code/'

# Keep only the directory entries whose name ends in ".db".
dbs = [fname for fname in os.listdir(db_dir) if fname.endswith(".db")]
dbs

['torvalds__REPODB__linux.db',
 'apache__REPODB__attic-wink.db',
 'apache__REPODB__attic-lucy.db',
 'docker__REPODB__docker-ce.db',
 'apache__REPODB__lucene-solr.db',
 'apache__REPODB__spark.db']

In [15]:
def _write_parquet_batch(df, db_dir, file_name):
    """Write ``df`` to ``db_dir/file_name`` as a parquet file and log the path."""
    table_path = os.path.join(db_dir, file_name)
    # The pandas index here is only a row counter left over from
    # concatenation/slicing. Dropping it keeps the schema identical across
    # all output files (no stray ``__index_level_0__`` column in some of them).
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_table(table, table_path)
    logger.info("Wrote parquet file: {}".format(table_path))


def write_parquet_table(table_name: str, query: str, db_dir: str, dbs: list, batch_size):
    """Run ``query`` against each SQLite database and write the rows to parquet.

    Rows from all databases are pooled and written to ``db_dir`` in files
    named ``<table_name>_<YYYY-MM-DD>_<n>.parquet``, where ``n`` counts up
    from 0. Every file holds at most ``batch_size`` rows; only the last file
    may hold fewer.

    Args:
        table_name: Prefix used for the output file names.
        query: SQL statement executed against every database.
        db_dir: Directory containing the databases; output is written here too.
        dbs: File names (relative to ``db_dir``) of the SQLite databases.
        batch_size: Maximum number of rows per parquet file.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    logger.info("Writing parquet files for {}".format(table_name))
    if batch_size < 1:
        # A non-positive batch size could never drain the pending rows.
        raise ValueError(
            "batch_size must be a positive integer, got {!r}".format(batch_size)
        )

    datenow = datetime.now().strftime('%Y-%m-%d')
    file_format = table_name + "_{}_{}.parquet"

    pending = None  # rows read so far that have not been written yet
    subnum = 0
    for db in dbs:
        # os.path.join works whether or not db_dir ends with a separator.
        conn = 'sqlite:///' + os.path.join(db_dir, db)
        frame = pd.read_sql(query, conn)
        if frame.empty:
            continue

        if pending is None:
            pending = frame
        else:
            # DataFrame.append was removed in pandas 2.0; concat replaces it.
            pending = pd.concat([pending, frame], ignore_index=True)

        # A single database can contribute several batches' worth of rows,
        # so keep flushing until the remainder fits in one batch.
        while pending.shape[0] > batch_size:
            _write_parquet_batch(
                pending.iloc[:batch_size],
                db_dir,
                file_format.format(datenow, subnum),
            )
            subnum += 1
            pending = pending.iloc[batch_size:]

    if pending is None:
        # No databases were given, or none of them returned any rows.
        logger.warning(
            "No rows found for {}; no parquet files written".format(table_name)
        )
        return

    if pending.shape[0] > 0:
        _write_parquet_batch(pending, db_dir, file_format.format(datenow, subnum))
        
            
    

In [16]:
# Export only the commit_file table from every database.
db_dir = '/Users/tylerbrown/code/'
table_name = 'commit_file'
query = 'SELECT * FROM commit_file;'

# `dbs` is the list of .db file names built in an earlier cell.
write_parquet_table(table_name, query, db_dir, dbs, batch_size=2 * 10 ** 6)

2019-04-20 14:17:58,870 INFO Writing parquet files for commit_file
  result = infer_dtype(pandas_collection)
2019-04-20 14:18:07,736 INFO Wrote parquet file: /Users/tylerbrown/code/commit_file_2019-04-20_0.parquet
2019-04-20 14:18:08,513 INFO Wrote parquet file: /Users/tylerbrown/code/commit_file_2019-04-20_1.parquet


In [17]:
def sqlite_to_parquet(tables: list, db_dir, batch_size):
    """Export each table in ``tables`` from the ``.db`` files in ``db_dir``.

    The directory is listed once; for every table name a
    ``SELECT * FROM <table>;`` query is built and handed to
    ``write_parquet_table`` together with the database file names,
    ``db_dir`` and ``batch_size``.
    """
    logger.info("STARTED converting all sqlite databases to parquet")

    # Collect the database files: directory entries whose name ends in ".db".
    db_files = []
    for entry in os.listdir(db_dir):
        if entry.endswith(".db"):
            db_files.append(entry)

    for table_name in tables:
        query = "SELECT * FROM {};".format(table_name)
        logger.info("Created query: {}".format(query))
        write_parquet_table(table_name, query, db_dir, db_files, batch_size)

    logger.info("FINISHED converting all sqlite databases to parquet")

In [18]:
# Export these tables from every SQLite database found in db_dir.
db_dir = '/Users/tylerbrown/code/'
batch_size = 2 * 10 ** 6  # row batch size passed through to write_parquet_table
tables = ['meta', 'author', 'contrib', 'commit_file', 'info']

sqlite_to_parquet(tables, db_dir, batch_size)

2019-04-20 14:18:08,709 INFO STARTED converting all sqlite databases to parquet
2019-04-20 14:18:08,710 INFO Created query: SELECT * FROM meta;
2019-04-20 14:18:08,711 INFO Writing parquet files for meta
2019-04-20 14:18:11,951 INFO Wrote parquet file: /Users/tylerbrown/code/meta_2019-04-20_0.parquet
2019-04-20 14:18:12,016 INFO Created query: SELECT * FROM author;
2019-04-20 14:18:12,016 INFO Writing parquet files for author
2019-04-20 14:18:23,694 INFO Wrote parquet file: /Users/tylerbrown/code/author_2019-04-20_0.parquet
2019-04-20 14:18:23,817 INFO Created query: SELECT * FROM contrib;
2019-04-20 14:18:23,818 INFO Writing parquet files for contrib
2019-04-20 14:18:37,129 INFO Wrote parquet file: /Users/tylerbrown/code/contrib_2019-04-20_0.parquet
2019-04-20 14:18:37,248 INFO Created query: SELECT * FROM commit_file;
2019-04-20 14:18:37,249 INFO Writing parquet files for commit_file
2019-04-20 14:18:46,426 INFO Wrote parquet file: /Users/tylerbrown/code/commit_file_2019-04-20_0.parq