Importing the required Libraries

In [None]:
import sqlalchemy as sqa
import glob
import os
import pandas as pd
import win32com.client
import getpass
import requests
import zipfile,shutil
import logging
import pyodbc
import sys
import time
import numpy as np
from pandas.io.json import json_normalize
from threading import Thread
from datetime import datetime

Get the current date to track the data loads

In [None]:
# Capture the moment this run started; `date` is the same instant rendered
# as DD-MM-YYYY.
x = datetime.now()
date = x.strftime("%d-%m-%Y")

Configure logging (messages go to debug.log and to the console)

In [None]:
# Send INFO-and-above records both to debug.log and to the console.
# `handlers` must be passed as a keyword argument (the original was missing
# the `=`, which is a syntax error).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler(),
    ],
)

Connect to Database and create functions to load data to SQL

In [None]:
# ODBC connection string (placeholders: fill in driver, server and database).
ConnStr = "DRIVER = {Provide your driver name };TrustServerCertificate = yes;Authentication=ActiveDirectoryIntegrated;SERVER = GiveyourServernamewithport;DATABASE = DBNAME"
# Other parts of this script refer to the connection string as `ConnString`;
# keep both names bound to the same value.
ConnString = ConnStr


def ConnectToDB(ConnStr):
    """Open a new SQLAlchemy connection to SQL Server.

    Args:
        ConnStr: Raw ODBC connection string; it is URL-quoted and passed to
            the ``mssql+pyodbc`` dialect through ``odbc_connect``.

    Returns:
        A connected ``sqlalchemy`` Connection. The caller is responsible for
        closing it.
    """
    # Function-scope import: the file's import cell does not import urllib.
    from urllib.parse import quote_plus

    params = quote_plus(ConnStr)
    # fast_executemany is an engine option of the mssql+pyodbc dialect;
    # assigning it as an attribute on the Connection has no effect.
    engine = sqa.create_engine(
        "mssql+pyodbc:///?odbc_connect=%s" % params,
        fast_executemany=True,
    )
    return engine.connect()

def LoadToSQL(tbl,cnxn,chunk):
    """Append the rows of ``chunk`` to table ``tbl`` in the ``dbo`` schema.

    Args:
        tbl: Name of the target table.
        cnxn: Database connection handed to ``DataFrame.to_sql``.
        chunk: pandas DataFrame holding the rows to insert.

    Returns:
        None.
    """
    # The keyword is `index_label` (lower case); `index_Label` raised TypeError.
    chunk.to_sql(
        name=tbl,
        con=cnxn,
        schema='dbo',
        if_exists='append',
        index=False,
        index_label=None,
        chunksize=100000,
    )
    
def CreateTargetTable(tbl,cnxn,chunk):
    """(Re)create table ``tbl`` in the ``dbo`` schema from ``chunk``.

    An existing table of the same name is replaced; column types are left for
    pandas to infer (``dtype=None``).

    Args:
        tbl: Name of the target table.
        cnxn: Database connection handed to ``DataFrame.to_sql``.
        chunk: pandas DataFrame whose columns and rows define the new table.

    Returns:
        None.
    """
    # Fixes: `;` inside the argument list, `if_exist` -> `if_exists`,
    # `index_Label` -> `index_label`.
    chunk.to_sql(
        name=tbl,
        con=cnxn,
        schema='dbo',
        if_exists='replace',
        index=False,
        index_label=None,
        chunksize=100000,
        dtype=None,
    )

Function to Load huge data to SQL table

In [None]:
def ParallelLoad(df,TargetTable,ConnString,func,ThreadLimit = 16):
    """Load ``df`` into ``TargetTable`` using ``ThreadLimit`` worker threads.

    The frame is cut into ``ThreadLimit`` equally sized row slices and each
    slice is passed to ``func(TargetTable, connection, slice)`` on its own
    thread, with its own connection obtained from ``ConnectToDB``. Rows left
    over after the even split are loaded afterwards on the first connection.

    Args:
        df: DataFrame to load.
        TargetTable: Name of the destination table, forwarded to ``func``.
        ConnString: Connection string forwarded to ``ConnectToDB``.
        func: Callable invoked as ``func(TargetTable, connection, frame)``.
        ThreadLimit: Number of connections / worker threads to use.

    Returns:
        None. An exception raised inside a worker thread is not re-raised
        here, so callers should verify the load (e.g. by row count).
    """
    total_rows = df.shape[0]
    StepSize = total_rows // ThreadLimit
    connections = []
    try:
        for _ in range(ThreadLimit):
            connections.append(ConnectToDB(ConnString))

        # With fewer rows than threads every even slice would be empty; skip
        # straight to the leftover load in that case.
        if StepSize > 0:
            threads = []
            for i in range(ThreadLimit):
                start = i * StepSize
                worker = Thread(
                    target=func,
                    args=[TargetTable, connections[i], df[start:start + StepSize]],
                )
                worker.start()
                threads.append(worker)
            for worker in threads:
                worker.join()

        # Rows that did not fit into the even split.
        leftover_start = StepSize * ThreadLimit
        if leftover_start != total_rows:
            worker = Thread(
                target=func,
                args=[TargetTable, connections[0], df[leftover_start:total_rows]],
            )
            worker.start()
            worker.join()
    finally:
        # Release every connection that was opened, even on failure.
        for cnxn in connections:
            cnxn.close()
    return

targetdir - Directory where your zip files are present.
ProcessedDir - Directory where the processed zip files will be moved.
zippedFiles - List of Zip files in the directory.
binaryFiles - List of Unzipped files in the directory

In [None]:
# Directory that holds the zip archives (placeholder: set to your own path).
targetdir = 'Provideyourfilepath/whichneedstobe/loadedtoSQL'
# os.path.join supplies the path separator, so the values below are correct
# whether or not `targetdir` ends with a slash (plain string concatenation
# produced e.g. '.../loadedtoSQL*zip', which never looks inside the folder).
ProcessedDir = os.path.join(targetdir,"Processed")
zippedFiles = glob.glob(os.path.join(targetdir,'*zip'),recursive = True)
binaryFiles = glob.glob(os.path.join(targetdir,'**','*xlsb'),recursive=True)

Printing the zipped files in the directory

In [None]:
# Record the list of zip archives that was discovered.
logging.info("%s", zippedFiles)

Printing the unzipped files in the directory

In [None]:
# Record the list of .xlsb workbooks that was discovered.
logging.info("%s", binaryFiles)

Existing Excel files in the directory

In [None]:
# For every workbook path, take the first backslash-separated component that
# mentions 'xlsb' and store it with the ".xlsb" text removed.
existingfilesList = []
for files in binaryFiles:
    firstMatch = next(
        (part for part in files.split('\\') if 'xlsb' in part),
        None,
    )
    if firstMatch is not None:
        existingfilesList.append(firstMatch.replace(".xlsb",""))
logging.info(existingfilesList)

Unzip the zip files and move the zip files to processed directory

In [None]:
# Make sure the destination folder exists: shutil.move() onto a missing path
# renames the archive to that path instead of moving it into a folder.
os.makedirs(ProcessedDir, exist_ok=True)

for fl in zippedFiles:
    # Archive name without directory or extension; basename copes with both
    # '/' and '\\' separators on Windows.
    f = os.path.splitext(os.path.basename(fl))[0]
    # Extract only archives whose workbook is not already present.
    if f not in existingfilesList:
        logging.info(f)
        if zipfile.is_zipfile(fl) and fl.endswith(".zip"):
            with zipfile.ZipFile(fl,'r') as m:
                m.extractall(path = os.path.join(targetdir, f))
    # Every archive is moved to the processed folder, extracted or not.
    try:
        shutil.move(src = fl,dst = ProcessedDir)
        logging.info('Moved: '+fl)
    except OSError as err:
        # shutil.Error is an OSError subclass. Moving stays best-effort, but
        # the failure is now reported instead of silently swallowed.
        logging.warning("Could not move %s to %s: %s", fl, ProcessedDir, err)

Get all the excel files in the directory

In [None]:
# Re-scan after unzipping. os.path.join supplies the separator between the
# directory and the recursive '**' component, so the search really descends
# into `targetdir` even when it has no trailing slash.
updatedFiles = glob.glob(os.path.join(targetdir,'**','*.xlsb'),recursive=True)
logging.info(updatedFiles)

Function to create table

In [None]:
def CreateTable(df,tableName):
    """Drop and recreate ``tableName`` with one NVARCHAR(max) column per df column.

    Args:
        df: DataFrame whose column labels become the table's column names.
        tableName: Name of the table to (re)create.

    Returns:
        None.

    Raises:
        pyodbc.Error: if connecting or executing either statement fails.
    """
    # str() tolerates non-string column labels; a ']' inside a name is doubled
    # so that it stays inside the bracket-quoted identifier.
    columnDefs = ",".join(
        "\n[" + str(column).replace("]", "]]") + "] NVARCHAR(max) NULL"
        for column in df.columns
    )
    createTableStatement = "CREATE TABLE " + tableName + " (" + columnDefs + ")"
    print("createTableStatement ",createTableStatement)
    # T-SQL spells this IF EXISTS (the original `IF EXIST` is a syntax error).
    dropStatement = 'DROP TABLE IF EXISTS dbo.'+tableName
    conn = pyodbc.connect('DRIVER = {Your Driver Name};'
                         'TrustServerCertificate = yes;'
                         'Authentication=ActiveDirectoryIntegrated;'
                          'SERVER = GiveyourServername:port;'
                          'DATABASE = DBNAME')
    try:
        cursor = conn.cursor()
        cursor.execute(dropStatement)
        cursor.execute(createTableStatement)
        # Commit the changes to the database.
        conn.commit()
        logging.info(tableName+" Table Created Successfully.")
    finally:
        # Close the connection even when a statement fails.
        conn.close()

Create a sequence in the database and get the next value from the sequence.

In [None]:
# Fetch the next value of the database sequence for this run.
# `ConnStr` is the connection string defined earlier in this script.
conn = ConnectToDB(ConnStr)
sql = '''select next value for [dbo].[YourSequenceName]'''
try:
    data = pd.read_sql(sql,conn)
finally:
    conn.close()
if data.empty:
    raise RuntimeError("The sequence query returned no rows.")
# Keep the value in a one-element list so that `sequence[0]` lookups keep
# working without relying on positional indexing of a pandas Series.
sequence = [int(data.iloc[-1, 0])]
print(sequence[0])

Check the current row count of the table.

In [None]:
def checkRowcount(tableName):
    """Return the number of rows in ``tableName``, or -1 if the query fails.

    Args:
        tableName: Name of the table to count.

    Returns:
        The row count, or -1 when the count query raises a SQLAlchemy error
        (for example because the table does not exist).
    """
    conn = ConnectToDB(ConnStr)
    try:
        # Wrap the SQL in text(): SQLAlchemy 2.x rejects plain strings, and
        # that error used to be swallowed and reported as "table missing".
        query = sqa.text(
            "select count(*) from [" + tableName.replace("]", "]]") + "]"
        )
        rowcount = conn.execute(query).scalar()
    except sqa.exc.SQLAlchemyError as err:
        logging.warning("Row count query on %s failed: %s", tableName, err)
        rowcount = -1
    finally:
        conn.close()
    return rowcount

Check the current row count in the table for the specific Excel file that is being loaded.

In [None]:
def checkSpecificRowcount(parameter,tableName):
    """Return how many rows of ``tableName`` have ``SourceFile == parameter``.

    Args:
        parameter: Value to match against the ``SourceFile`` column.
        tableName: Name of the table to query.

    Returns:
        The matching row count, or -1 when the query raises a SQLAlchemy error.
    """
    conn = ConnectToDB(ConnStr)
    try:
        # The value is sent as a bound parameter, so quotes inside a file name
        # cannot break (or inject into) the statement.
        query = sqa.text(
            "select count(*) from [" + tableName.replace("]", "]]")
            + "] where SourceFile = :source_file"
        )
        rowcount = conn.execute(query, {"source_file": parameter}).scalar()
    except sqa.exc.SQLAlchemyError as err:
        logging.warning(
            "Row count query on %s for %s failed: %s", tableName, parameter, err
        )
        rowcount = -1
    finally:
        conn.close()
    return rowcount

Delete the data from table

In [None]:
def deleteStatement(parameter,tableName):
    """Delete the rows of ``tableName`` whose ``SourceFile`` equals ``parameter``.

    Args:
        parameter: Value to match against the ``SourceFile`` column.
        tableName: Name of the table to delete from.

    Returns:
        None.
    """
    conn = ConnectToDB(ConnStr)
    try:
        query = sqa.text(
            "DELETE FROM [" + tableName.replace("]", "]]")
            + "] where SourceFile = :source_file"
        )
        # Run inside an explicit transaction so the delete is committed when
        # the block exits normally.
        with conn.begin():
            conn.execute(query, {"source_file": parameter})
    finally:
        conn.close()

Provide the table name

In [None]:
# Name of the target SQL table (placeholder: replace with your own table).
YourTableName = "YourTableName"

Create the RunLogTable in the database to track the data loading.
The pattern name is a value that is common to the names of all the Excel files in the directory.
For example, if the files you want to load into the table all have 'Google' in their name, then 'Google' can be your pattern name.

In [None]:
def logInformation(fileName,message,status):
    """Insert one row describing a load event into RunLogTable.

    The row holds the current run id (``sequence[0]``, a module-level value),
    the literal source 'PatternName', the file name, the message, the status
    and the current timestamp.

    Args:
        fileName: Value stored in ``source_file_name``.
        message: Free-text message stored in ``message``.
        status: Status label stored in ``status``.

    Returns:
        None.

    Raises:
        pyodbc.Error: if connecting or inserting fails.
    """
    conn = pyodbc.connect('DRIVER = {Provide your driver name };'
                          'TrustServerCertificate = yes;'
                          'Authentication=ActiveDirectoryIntegrated;'
                          'SERVER = GiveyourServernamewithport;'
                          'DATABASE = DBNAME')
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO RunLogTable(run_id,source,source_file_name,message,status,created_date) VALUES (?,?,?,?,?,?)",(int(sequence[0]),'PatternName',fileName,message,status,datetime.now()))
        # Commit the changes to the database.
        conn.commit()
        # Use this function's own argument; the original read a global
        # `tableName` that is not defined inside this function.
        logging.info(fileName+" Log Table Updated Successfully!")
    finally:
        # Close the connection even when the insert fails.
        conn.close()

Load the data to SQL table

In [None]:
# Maximum number of times a failed load of one file is retried.
MAX_RETRIES = 5

for files in updatedFiles:
    # Skip workbooks that were already in the directory before unzipping.
    if files in binaryFiles:
        continue

    # Derive the label for this file: the space-separated token of the file
    # name that contains 'PatternName', or the whole name when no token does.
    files = files.replace('\\','/')
    fileName = os.path.basename(files)
    tableName = fileName.replace(".xlsb","")
    for item in fileName.split(" "):
        if 'PatternName' in item:
            tableName = item.replace(".xlsb","")
            break
    logging.info("Table Name: "+tableName)

    df = pd.read_excel(files,engine = 'pyxlsb',sheet_name ='my_sheet',skiprows=0)
    # Drop the header-less columns pandas labels 'Unnamed: N'; str() keeps the
    # check safe for non-string column labels.
    unnamedColumns = [col for col in df.columns if str(col).startswith('Unnamed')]
    df = df.drop(columns=unnamedColumns)
    df['SourceFile']=tableName
    df['UploadDate']=date

    # -1 means the row count query failed, which is treated as "table missing".
    MyRowCount = checkRowcount(YourTableName)
    if MyRowCount == -1:
        CreateTable(df,YourTableName)

    failedCount = 0
    while True:
        try:
            logInformation(tableName,"Python: Data Upload Operation Started","Start")
            ParallelLoad(df,YourTableName,ConnStr,LoadToSQL,16)
            # Worker-thread errors are not raised by ParallelLoad, so the load
            # is verified by counting the rows tagged with this file.
            rowcount = checkSpecificRowcount(tableName,YourTableName)
            if rowcount != df.shape[0]:
                logInformation(tableName,"Python: All rows have not been uploaded.","Failure")
                raise RuntimeError("All the rows have not been uploaded")
            logInformation(tableName,"Python: File has been uploaded successfully","Success")
            break
        except Exception as err:
            failedCount = failedCount+1
            logInformation(tableName,"Python Error: "+str(err),"Error")
            logging.exception(err)
            if failedCount > MAX_RETRIES:
                logInformation(tableName,"Python: Operation has failed.","Failure")
                break
            logInformation(tableName,"Python: Retrying Loading data","Retry")
            logging.info("Retrying..")
            # Remove partially loaded rows before the next attempt.
            deleteStatement(tableName,YourTableName)