Consolidate data across multiple SQL servers and databases into a single table

Contributors: Orvin Bellamy (https://github.com/orvinbellamy)

In [1]:
# Import libraries (some may be redundant)

import pyodbc
import pandas as pd
import numpy as np
import math
import requests
import os
import json
import re
from pathlib import Path
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth
from itertools import repeat
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus
from IPython.display import clear_output

In [2]:
# Location of the .env file (replace the placeholder path below).
# The .env file must define these names: production_server, development_server, userid, password
# e.g. production_server = 'xxxx.database.xxxxx.net'
env_path = Path('INSERT PATH')
load_dotenv(dotenv_path=env_path)

# Read both server addresses from the environment populated above
production_db, dev_db = (os.getenv(var) for var in ('production_server', 'development_server'))

In [3]:
# Preprocessing functions referenced by name from operations.json. You can add your own.

def composite_key(dframe, new_key, key1, key2):
    """Add column *new_key* holding ``<key1>-<key2>`` for every row.

    ``key1`` is concatenated as-is (it is expected to already hold strings);
    ``key2`` is cast with ``astype(str)`` first. The frame is modified in
    place and also returned.
    """
    prefix = dframe[key1] + "-"
    dframe[new_key] = prefix + dframe[key2].astype(str)
    return dframe
    
def drop_col(dframe, colnames):
    """Return *dframe* without duplicate rows, comparing only *colnames*.

    Despite the name, no column is removed: for each distinct combination of
    ``colnames`` values the first row is kept and later ones are dropped.
    """
    return dframe.drop_duplicates(subset=colnames, keep='first')

def split_column(dframe, arg: list):
    """Split comma-separated column ``arg[1]`` into the columns listed in ``arg[0]``.

    The first ``len(arg[0])`` comma-separated pieces are assigned, in order,
    to the target column names. Modifies the frame in place and returns it.
    """
    targets, source = arg[0], arg[1]
    pieces = dframe[source].str.split(',', n=-1, expand=True)
    last_piece = len(targets) - 1  # label slice below is inclusive
    dframe[targets] = pieces.loc[:, :last_piece]
    return dframe

def drop_na(dframe, colname):
    """Return only the rows of *dframe* whose *colname* value is not missing."""
    has_value = dframe[colname].notna()
    return dframe.loc[has_value]

def datetime_to_date(dframe, colnames: list):
    """Replace each column in *colnames* with the date part of its parsed datetime.

    Modifies the frame in place and returns it.
    """
    for name in colnames:
        parsed = pd.to_datetime(dframe[name])
        dframe[name] = parsed.dt.date
    return dframe

def to_int(dframe, colnames: list):
    """Cast every column in *colnames* to ``float``; modifies in place and returns the frame.

    NOTE(review): despite the name this yields floats, not ints — presumably
    so missing values (NaN) survive the cast; confirm before switching to an
    integer dtype.
    """
    for name in colnames:
        as_number = dframe[name].astype(float)
        dframe[name] = as_number
    return dframe

def to_str(dframe, colnames: list):
    """Cast every column in *colnames* to ``str``; modifies in place and returns the frame."""
    for name in colnames:
        as_text = dframe[name].astype(str)
        dframe[name] = as_text
    return dframe

def to_none(dframe, colnames: list, null_value):
    """Normalise null markers in *colnames*; modifies in place and returns the frame.

    - ``null_value == ''``: every ``pd.NA`` becomes the string ``'None'``.
    - otherwise: every occurrence of *null_value* becomes ``''``.
    """
    if null_value == '':
        target, replacement = pd.NA, 'None'
    else:
        target, replacement = null_value, ''

    for name in colnames:
        dframe[name] = dframe[name].replace(target, replacement)
    return dframe

def to_datetime(dframe, colnames: list):
    """Parse every column in *colnames* with ``pd.to_datetime``; modifies in place and returns the frame."""
    for name in colnames:
        parsed = pd.to_datetime(dframe[name])
        dframe[name] = parsed
    return dframe

In [68]:
# Preprocessing dispatch table. JSON cannot hold Python functions, so
# operations.json names each function and main() looks it up here:
# pp_functions[name_from_json](dframe, *arguments)
pp_functions = {
    func.__name__: func
    for func in (
        composite_key,
        drop_col,
        split_column,
        drop_na,
        datetime_to_date,
        to_int,
        to_str,
        to_none,
        to_datetime,
    )
}

# Server addresses come from .env, not from the JSON. Each operation's
# "query_db" value in operations.json is a key into this mapping, which
# main() resolves to the actual server address.
query_db = dict(production_db=production_db, dev_db=dev_db)

In [5]:
# Connect to one database on one server. Only the server address and database
# name are needed, so callers can loop over a list of database names.
def _odbc_braced(value):
    """Wrap *value* in ODBC braces so ';', '=' or '{' inside it cannot break the connection string."""
    return '{' + str(value).replace('}', '}}') + '}'


def connect(server, database):
    """Open a pyodbc connection and a SQLAlchemy engine for *database* on *server*.

    Credentials are read from the ``userid`` and ``password`` environment
    variables. Results are published through module-level globals:
    ``cnxn_str``, ``cnxn`` (pyodbc connection), ``cur`` (its cursor, with
    ``fast_executemany`` enabled), ``quoted`` (URL-quoted connection string)
    and ``engine`` (SQLAlchemy engine used by ``DataFrame.to_sql``).

    Raises:
        ValueError: if *server*, *database* or either credential is missing.
        pyodbc.Error: if the driver cannot connect.
    """
    global cnxn_str, cnxn, cur, quoted, engine

    userid = os.getenv('userid')
    password = os.getenv('password')

    # Fail with a clear message instead of an opaque "str + None" TypeError.
    missing = [
        label
        for label, value in (
            ('server', server),
            ('database', database),
            ('userid (env var)', userid),
            ('password (env var)', password),
        )
        if value is None
    ]
    if missing:
        raise ValueError('connect(): missing value for ' + ', '.join(missing))

    # May need a different driver depending on your server.
    # UID/PWD are brace-quoted so special characters in them are taken literally.
    cnxn_str = ("Driver={SQL Server Native Client 11.0};"
                "Server=" + server + ";"
                "Database=" + database + ";"
                "UID=" + _odbc_braced(userid) + ";"
                "PWD=" + _odbc_braced(password) + ";")

    cnxn = pyodbc.connect(cnxn_str)

    cur = cnxn.cursor()
    cur.fast_executemany = True

    quoted = quote_plus(cnxn_str)

    # The engine is what pandas' to_sql uses for INSERT INTO.
    engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(quoted), fast_executemany=True)

In [1]:
# For privacy/security the database names are themselves stored in a table on
# the SQL server; this function reads them. If you already know which
# databases to loop through, skip this and set db_name_list by hand.
def get_db_name():
    """Connect to 'database_name' on dev_db and return the list of stored database names."""
    connect(dev_db, 'database_name')
    cur.execute("SELECT [db_name] FROM table_where_i_store_db_names")
    return [row[0] for row in cur.fetchall()]

In [69]:
# Database names to loop over when querying production (one query per database).
db_name_list = get_db_name()

# Load the operation definitions that main() executes.
with open('operations.json', 'r') as json_file:
    operations = json.load(json_file)
    print("type: ", type(operations))

In [None]:
def left_outer_merge(df_left, df_right, primary_key):
    """Return the rows of *df_left* whose *primary_key* value is absent from *df_right*.

    This is an anti-join: only ``df_left``'s columns are returned, rows keep
    ``df_left``'s order, and the result has a fresh 0..n-1 based index
    (filtered). Duplicate keys in ``df_right`` are ignored.
    """
    # Only the key column of df_right matters; de-duplicate it so matched rows
    # are not multiplied before being discarded.
    right_keys = df_right[[primary_key]].drop_duplicates()

    # A left join is enough to tag rows with no match; an outer join would
    # also build right-only rows just to throw them away.
    merged = pd.merge(df_left, right_keys, on=primary_key, how='left', indicator=True)
    new_rows = merged[merged['_merge'] == 'left_only']
    return new_rows.drop('_merge', axis=1)

def update_dataframe(df_new, df_old, primary_key):
    """Split *df_new* into changed rows and brand-new rows relative to *df_old*.

    A row of ``df_new`` is "changed or new" when no row of ``df_old`` matches it
    on every one of ``df_old``'s columns.

    Returns:
        df_update: changed rows whose *primary_key* already exists in ``df_old``.
        df_append: changed rows whose *primary_key* does not exist in ``df_old``.
        df_merge: all changed or new rows (``df_update`` + ``df_append``).
        update_rows: list of ``df_update``'s primary-key values.
    """
    # Anti-join on all of df_old's columns: keep df_new rows with no identical
    # row in df_old. A left join suffices; right-only rows are never needed.
    merged = pd.merge(df_new, df_old, on=df_old.columns.to_list(), how='left', indicator=True)
    df_merge = merged[merged['_merge'] == 'left_only'].drop('_merge', axis=1)

    # Compute key membership once, then split on it.
    key_exists = df_merge[primary_key].isin(df_old[primary_key])
    df_update = df_merge[key_exists]     # key already in df_old -> values changed
    df_append = df_merge[~key_exists]    # key not in df_old -> brand-new row

    update_rows = df_update[primary_key].tolist()

    return df_update, df_append, df_merge, update_rows

In [7]:
# the main() function executes all the steps
# 1. connect to the server
# 2. loop through each databases and query the same table
# 3. combine all query results into one dataframe
# 4. perform all preprocessing as defined in the JSON file
# 5. update (delete and insert) the consolidated data into a single table on SQL server

from math import fabs


# SQL Server accepts at most 2100 bound parameters per statement; stay below it.
_MAX_SQL_PARAMS = 2000


def _apply_steps(frame, steps, label):
    """Apply optional preprocessing *steps* to *frame* and return the result.

    *steps* maps a pp_functions key to a list of argument lists, the same shape
    as the "preprocess" entries in operations.json. Best-effort: if a step
    raises, a warning is printed and the frame as it stood after the last
    successful step is returned, so the run can continue.
    """
    try:
        for key, argument_sets in steps.items():
            for arguments in argument_sets:
                frame = pp_functions[key](frame, *arguments)
    except Exception as err:  # deliberate best-effort, but never silent
        print('Warning: ' + label + ' failed, remaining steps skipped: ' + repr(err))
    return frame


def _delete_by_key(table_name, primary_key, keys):
    """Delete rows of [dbo].[table_name] whose *primary_key* is in *keys*, then commit.

    Key values are sent as bound parameters (never spliced into the SQL text),
    in batches that respect SQL Server's parameter limit. Uses the global
    cursor ``cur`` set by connect().
    """
    unique_keys = list(dict.fromkeys(keys))
    for start in range(0, len(unique_keys), _MAX_SQL_PARAMS):
        batch = unique_keys[start:start + _MAX_SQL_PARAMS]
        placeholders = ', '.join(['?'] * len(batch))
        cur.execute("DELETE FROM [dbo].[" + table_name + "] WHERE "
                    + primary_key + " IN (" + placeholders + ")", batch)
    cur.commit()


def main(op_key):
    """Run the consolidation described by ``operations[op_key]``.

    1. Run ``op['query']``: on production, once per database in db_name_list
       (adding a 'db' column with the source database name); on dev, once
       against Eproval-Company.
    2. Build a DataFrame with ``op['header']`` as column names, apply the
       ``op['preprocess']`` steps and cast with ``op['convert_dic']``.
    3. Read the current contents of [dbo].[op['table_name']] on Eproval-Company.
    4. If ``op['need_update']``: delete and re-insert rows whose
       ``op['primary_key']`` already exists but whose values changed, then
       append rows with new keys. Otherwise only append rows with new keys.
    5. Run ``op['alter_query']`` statements (best-effort) and close the
       connection.

    Rebinds the globals df, tb, results, df_append and df_old (plus df_update
    and df_merge when need_update is set), left available for interactive
    inspection, and, via connect(), the connection globals.
    """
    clear_output(wait=True)
    print('Operation: ' + op_key)

    global df, tb, results, df_append, df_old, df_update, df_merge

    op = operations[op_key]

    # tb is a 2-D object array: row 0 holds the column names and every later
    # row is one record returned by the query.
    tb = np.array([op['header']], dtype="object")

    # row_tracker[i] names the database that row i of tb came from; its first
    # item 'db' becomes the header of the extra column added after the loop.
    row_tracker = ['db']

    print('Querying Data...')
    source_server = query_db[op["query_db"]]

    if source_server == production_db:
        # Query the same table in every database and stack the results.
        for db_name in db_name_list:
            try:
                connect(production_db, db_name)
            except Exception:
                try:
                    connect(dev_db, db_name)
                except Exception as err:
                    # Skip this database: querying now would reuse the previous
                    # database's cursor and label its rows with db_name.
                    print(db_name + ' skipped, could not connect: ' + repr(err))
                    continue

            cur.rollback()
            cur.execute(op['query'])
            results = cur.fetchall()

            print(db_name + " " + str(len(results)))

            if not results:  # np.insert cannot insert an empty result set
                continue
            try:
                tb = np.insert(tb, tb.shape[0], results, axis=0)
            except (ValueError, TypeError) as err:
                print(db_name + ' rows skipped, shape mismatch: ' + repr(err))
                continue
            # Tag only rows that were actually added so 'db' stays aligned with tb.
            row_tracker.extend([db_name] * len(results))

        tb = np.insert(tb, tb.shape[1], row_tracker, axis=1)

    elif source_server == dev_db:
        connect(dev_db, 'Eproval-Company')
        cur.rollback()
        cur.execute(op['query'])
        results = cur.fetchall()
        if results:  # np.insert cannot insert an empty result set
            tb = np.insert(tb, tb.shape[0], results, axis=0)

    print('Querying done!')
    print('Data processing...')

    df = pd.DataFrame(tb[1:, ], columns=tb[0])

    # Mandatory preprocessing (mostly composite keys); errors propagate.
    for key, argument_sets in op['preprocess'].items():
        for arguments in argument_sets:
            df = pp_functions[key](df, *arguments)

    df = df.astype(op['convert_dic'])

    # to_sql(method='multi') binds rows x columns parameters per INSERT and SQL
    # Server allows at most 2100, so keep rows x columns under 2000. The
    # 1000-row cap presumably matches SQL Server's VALUES-list row limit.
    chunknum = min(1000, math.floor(2000 / len(df.columns)))

    print('Connecting to Eproval-Company')
    connect(dev_db, 'Eproval-Company')
    cur.rollback()

    need_update = op['need_update']
    table_name = op['table_name']
    primary_key = op['primary_key']

    # Load the target table's current contents so only changed/new rows are written.
    print('Filtering out old data')
    cur.execute("SELECT * FROM [dbo].[" + table_name + "]")
    results = cur.fetchall()
    list_columns = [column[0] for column in cur.description]
    tb = np.array([list_columns], dtype="object")
    if results:  # an empty target table would otherwise make np.insert raise
        tb = np.insert(tb, tb.shape[0], results, axis=0)
    df_old = pd.DataFrame(tb[1:, ], columns=tb[0])

    if need_update:
        df_old = _apply_steps(df_old, op.get('df_old_preprocess', {}), 'df_old_preprocess')

        # Split queried rows into changed rows (key already in table) and new rows.
        df_update, df_append, df_merge, update_rows = update_dataframe(df, df_old, primary_key)

        df_update = _apply_steps(df_update, op.get('df_update_preprocess', {}), 'df_update_preprocess')

        if df_update.shape[0] > 0:
            # Replace changed rows: delete the stale versions, then insert the new ones.
            print('Deleting old data in table... ' + str(len(update_rows)) + ' keys')
            _delete_by_key(table_name, primary_key, update_rows)

            print('Updating the old deleted data... ' + str(df_update.shape[0]) + ' rows')
            df_update.to_sql(table_name, engine, index=False, if_exists='append',
                             schema='dbo', chunksize=chunknum, method='multi')
        else:
            print('No rows to update')
    else:
        df_old = _apply_steps(df_old, op.get('update_preprocess', {}), 'update_preprocess')

        # Keep only queried rows whose key is not in the table yet.
        df_append = left_outer_merge(df, df_old, primary_key)

    if df_append.shape[0] > 0:
        # INSERT INTO via pandas; chunking keeps each statement under the parameter limit.
        print('Uploading new data (INSERT INTO)... ' + str(df_append.shape[0]) + ' rows')
        df_append.to_sql(table_name, engine, index=False, if_exists='append',
                         schema='dbo', chunksize=chunknum, method='multi')
    else:
        print('No new rows to append')

    # Post-upload fix-ups (mostly correcting DATE column types). Best-effort:
    # a failing statement is rolled back and reported; the rest still run.
    for query in op['alter_query']:
        try:
            cur.execute(query)
            cur.commit()
        except Exception as err:
            cur.rollback()
            print('alter_query failed and was rolled back: ' + repr(err))
    cnxn.commit()

    # Release the pyodbc connection and the engine's pooled connections.
    cur.close()
    cnxn.close()
    engine.dispose()
    print('Done!')

In [None]:
# Run main() once for every operation defined in operations.json,
# i.e. refresh every consolidated table in turn.
for op_name in operations:
    main(op_name)