In [None]:
import psycopg2 as db
import os
import json

In [None]:
# create the sql engine for postgres db
def sql_build_connection(dbname):
    '''Open a psycopg2 connection to a local PostgreSQL database.

    Summary
    -------
    Read the credentials from the JSON file ``.sqluser`` in the user's home
    folder and connect to ``dbname`` on ``localhost:5432``. The file must
    contain the keys ``uid`` (user name) and ``pwd`` (password).

    Parameters
    ----------
    dbname: string
        The name of the database to connect to

    Returns
    -------
    engine: psycopg2 connection
        An open database connection; the caller is responsible for closing it

    Raises
    ------
    FileNotFoundError
        If ``.sqluser`` does not exist in the home folder
    KeyError
        If the credentials file lacks the ``uid`` or ``pwd`` key
    '''
    creds_path = os.path.join(os.path.expanduser('~'), '.sqluser')
    with open(creds_path, 'r') as f:
        creds = json.load(f)

    # hand the parts over as keywords instead of interpolating them into a
    # URI, so credentials containing characters such as '@', ':' or '/' are
    # not misparsed as URI delimiters
    engine = db.connect(
        dbname=dbname,
        user=creds['uid'],
        password=creds['pwd'],
        host='localhost',
        port=5432,
    )
    return engine


In [None]:
def sql_table_create_parts(file_, prefix=''):
    '''Create the table name and column definitions for a table create statement.

    Summary
    -------
    Read the metadata json in file_ and derive the sql table name and the
    column definition string from it. The metadata must contain the keys
    ``source`` (path of the data file), ``data_type``, ``na_count`` and
    ``char_max_len``; the last three are mappings keyed by column name.

    Parameters
    ----------
    file_: string
        A file path to a meta json object
    prefix: string
        Optional: the prefix to be used for the table naming.

    Returns
    -------
    source_path: string
        The path for the source data
    sql_table: string
        The name of the sql table (prefix + source file name without extension)
    sql_columns: string
        The sql syntax for defining the columns for the table; an empty
        string when the metadata lists no columns

    Raises
    ------
    ValueError
        If a column has a data type with no known sql equivalent
    KeyError
        If the metadata lacks a required key or column entry
    '''
    # get metadata
    with open(file_, 'r') as f:
        meta = json.load(f)

    # table name is the source file name without directory and extension
    source_path = meta['source']
    table_name = os.path.splitext(source_path.split('/')[-1])[0]
    sql_table = f'{prefix}{table_name}'

    # correct data types to sql types
    replacements = {'int64': 'integer', 'float64': 'double precision', 'object': 'character varying'}

    definitions = []
    for column, data_type in meta['data_type'].items():
        dtype = replacements.get(data_type)
        if dtype is None:
            raise ValueError(
                f'Unsupported data type {data_type!r} for column {column!r}'
            )

        # character columns get a length: the longest value plus some
        # headroom, rounded to a multiple of ten. Only looked up for
        # character columns so other columns may carry a null length.
        if dtype.startswith('char'):
            size = int(round(meta['char_max_len'][column] + 15, -1))
            dtype = f'{dtype}({size})'

        # columns without missing values are declared NOT NULL
        if not meta['na_count'][column]:
            dtype = f'{dtype} NOT NULL'

        # double any embedded quote so the identifier stays valid sql
        quoted = column.replace('"', '""')
        definitions.append(f'"{quoted}" {dtype}')

    sql_columns = ', '.join(definitions)
    return source_path, sql_table, sql_columns

In [None]:
def sql_create_table(table, columns, connect, index_cols=False):
    '''Build a sql table based on name and column specs provided.

    Summary
    -------
    Create a table in the sql database. If the table exists it is dropped and
    replaced with the new definition. All statements run in one transaction:
    it is committed on success and rolled back if any statement fails.

    Parameters
    ----------
    table: string
        The name of the table to be created
    columns: string
        The string containing the column definitions for the table
    connect: database connection
        The open database connection to be used
    index_cols: string
        Optional parameter that should be a comma separated list of column
        names for construction of the index; no index is built when falsy
    '''
    # sql to drop table if exists and create table based on definition provided
    statements = [
        f'''DROP TABLE IF EXISTS "{table}";''',
        f'''CREATE TABLE "{table}" ({columns});''',
    ]
    if index_cols:
        # quote the names exactly like CREATE TABLE does, otherwise a table
        # name that starts with a digit or contains upper case is not found
        statements.append(
            f'''CREATE INDEX "idx_{table}" ON "{table}" ({index_cols})'''
        )

    # build the cursor
    cursor = connect.cursor()
    try:
        for statement in statements:
            cursor.execute(statement)
        connect.commit()
    except Exception:
        # leave the connection usable for the next statement
        connect.rollback()
        raise
    finally:
        cursor.close()



In [None]:
def sql_csv_loader(csv_path, target_table, connect):
    '''Load csv data into an existing database table using COPY.

    Summary
    -------
    Empty the target table, then stream the csv file into it with a
    ``COPY ... FROM STDIN`` statement. The truncate and the load run in one
    transaction: it is committed on success and rolled back on failure, so a
    failed load leaves the previous table content in place.

    The csv must have a header row and use ',' as delimiter. The copy
    statement applies FORCE_NULL to a column named "VALUE", so the target
    table must contain that column.

    Parameters
    ----------
    csv_path: string
        The fully qualified path to the csv data to be loaded into sql
    target_table: string
        The name of the target table in the database
    connect: database connection
        The open database connection to be used
    '''
    # quote the table name the same way the table create statement does so
    # that both always refer to the same table
    copy_cmd = f'''COPY "{target_table}" FROM STDIN WITH (FORMAT CSV, HEADER TRUE, DELIMITER ',', FORCE_NULL("VALUE"))'''
    truncate = f'''TRUNCATE TABLE "{target_table}"'''

    # build the cursor object
    cursor = connect.cursor()
    try:
        # run the data load
        with open(csv_path, 'r') as file_:
            cursor.execute(truncate)
            cursor.copy_expert(sql=copy_cmd, file=file_)
        connect.commit()
    except Exception:
        # undo the truncate and leave the connection usable
        connect.rollback()
        raise
    finally:
        cursor.close()
    print(f'The data from {csv_path} has been written to {target_table}')


In [None]:
# open the connection to the 'analytics' database; the same connection is
# used for every table build and data load below
connection = sql_build_connection('analytics')

In [None]:
# get the metadata to be used for building the tables
meta_dir = os.path.realpath('../data')
# sorted so the files are always processed in the same order
files = sorted(os.listdir(meta_dir))
match = 'meta.json'
meta_json = [f'{meta_dir}/{keep}' for keep in files if keep.endswith(match)]

# mappings to make sure that things get labelled correctly; the keys are
# built from meta_dir exactly like the entries of meta_json so the lookups
# work wherever the project is checked out

# table name prefix per metadata file
prefixes = {
    f'{meta_dir}/14100090-meta.json': 'labourforce_',
    f'{meta_dir}/14100092-meta.json': 'emp_naics_',
    f'{meta_dir}/14100312-meta.json': 'emp_nocs_',
}

# dataset specific column to add to the index of each table
for_index = {
    f'{meta_dir}/14100090-meta.json': 'Labour force characteristics',
    f'{meta_dir}/14100092-meta.json': 'North American Industry Classification System (NAICS)',
    f'{meta_dir}/14100312-meta.json': 'National Occupational Classification (NOC)',
}

In [None]:
for path in meta_json:
    # derive the csv location, the table name and the column definitions
    # from the metadata file
    source, table, columns = sql_table_create_parts(file_=path, prefix=prefixes[path])

    # index on the date, the geography and the dataset specific column
    index = f'"REF_DATE", "GEO", "{for_index[path]}"'

    # (re)create the table together with its index
    sql_create_table(table=table, columns=columns, connect=connection, index_cols=index)

    # fill the fresh table from the csv file
    sql_csv_loader(csv_path=source, target_table=table, connect=connection)

In [None]:
# all tables have been built and loaded; release the database connection
connection.close()