In [117]:
# Import libraries

import glob
import os
import shutil
from urllib.parse import quote

import pandas as pd
import pymssql
import sqlalchemy as sa

In [118]:
# Download and archive folder paths (relative to the notebook's working directory)

DOWNLOAD_FOLDER = '../data/'
ARCHIVE_FOLDER = '../archive/'

# Server details
#
# The connection settings can be overridden through environment variables so
# that secrets do not have to live in the notebook. The literals below are
# only fallbacks that keep existing runs working.
# FIXME: remove the password fallback (and rotate the password) once the
# NSE_DB_PASSWORD environment variable is set wherever this notebook runs.

RDBMS_NAME = 'mssql'
DRIVER_NAME = 'pymssql'
USER_NAME = os.environ.get('NSE_DB_USER', 'sa')
PASSWORD = os.environ.get('NSE_DB_PASSWORD', 'Datac3rtsql')
SERVER_NAME = os.environ.get('NSE_DB_SERVER', 'ELM-NSUNDAR-02\\S2016')
DATABASE_NAME = os.environ.get('NSE_DB_NAME', 'NSE')

In [119]:
# Create table query

# CREATE TABLE [dbo].[BHAV_COPY_TEST](
#     [ID] [int] IDENTITY(1,1) NOT NULL,
#     [FNAME] [nvarchar](255) NULL,
#     [RCOUNT] [int]
# ) ON [PRIMARY]



# CREATE TABLE [dbo].[BHAV_COPY](
#     [ID] [int] IDENTITY(1,1) NOT NULL,
#     [SYMBOL] [nvarchar](255) NULL,
#     [SERIES] [nvarchar](255) NULL,
#     [OPEN] [float] NULL,
#     [HIGH] [float] NULL,
#     [LOW] [float] NULL,
#     [CLOSE] [float] NULL,
#     [LAST] [float] NULL,
#     [PREVCLOSE] [float] NULL,
#     [TOTTRDQTY] [float] NULL,
#     [TOTTRDVAL] [float] NULL,
#     [TIMESTAMP] [date] NULL,
#     [TOTALTRADES] [float] NULL,
#     [ISIN] [nvarchar](255) NULL,
#     [FNAME] [nvarchar](255) NULL
# ) ON [PRIMARY]

In [120]:
# Target tables: the bhav copy data table and the table that stores, per
# loaded file, the file name and its row count.

BHAV_COPY_TABLE_NAME, BHAV_COPY_TEST_TABLE_NAME = 'BHAV_COPY', 'BHAV_COPY_TEST'

In [121]:
# Create the download and archive folders if they do not exist yet.
#
# exist_ok=True makes the call a no-op when the folder is already present,
# which removes the check-then-create race of a separate os.path.exists()
# test. If the path exists but is a regular file, os.makedirs raises
# FileExistsError instead of silently carrying on.

for folder in (DOWNLOAD_FOLDER, ARCHIVE_FOLDER):
    os.makedirs(folder, exist_ok=True)

In [122]:
# CSV files currently sitting in the download folder.
#
# os.path.join builds the pattern without a doubled separator whether or not
# DOWNLOAD_FOLDER ends with one; sorted() gives a stable processing order,
# since glob returns matches in arbitrary order.

downloaded_files = sorted(glob.glob(os.path.join(DOWNLOAD_FOLDER, '*.csv')))

In [123]:
# Columns kept from each bhav copy CSV; any other column in a file is ignored
# when the file is read.

cols = [
    'SYMBOL', 'SERIES',
    'OPEN', 'HIGH', 'LOW', 'CLOSE', 'LAST', 'PREVCLOSE',
    'TOTTRDQTY', 'TOTTRDVAL',
    'TIMESTAMP', 'TOTALTRADES', 'ISIN',
]

In [124]:
# Database connection
#
# The user name and password are percent-encoded before being placed in the
# URL so that characters such as '@', ':' or '/' in a credential cannot be
# misread as URL delimiters. safe='' forces '/' to be encoded as well.

engine = sa.create_engine(
    f'{RDBMS_NAME}+{DRIVER_NAME}://'
    f"{quote(USER_NAME, safe='')}:{quote(PASSWORD, safe='')}"
    f'@{SERVER_NAME}/{DATABASE_NAME}'
)

In [125]:
# Loading to database
#
# For every downloaded CSV:
#   1. read only the columns listed in `cols`, parsing TIMESTAMP as a date;
#   2. tag each row with the source file name (FNAME);
#   3. append the rows to the bhav copy table and append one
#      (FNAME, RCOUNT) record to the check table.
#
# Both inserts run inside one engine.begin() transaction, so a file is either
# recorded in both tables or in neither. A failure between the two inserts can
# no longer leave data rows without their row-count record.

if not downloaded_files:
    print('No downloaded files to load.')
else:
    print(f'No. of files to load {len(downloaded_files)}')
    for i, f in enumerate(downloaded_files):
        file_name = os.path.basename(f)
        bhav_df = pd.read_csv(f, usecols=lambda c: c in cols, parse_dates=['TIMESTAMP'])
        bhav_df['FNAME'] = file_name

        # One-row frame holding the number of rows read from this file.
        data = [{'FNAME': file_name, 'RCOUNT': len(bhav_df)}]
        bhav_check_df = pd.DataFrame(data=data)

        # begin() commits on normal exit and rolls back if either insert raises.
        with engine.begin() as con:
            bhav_df.to_sql(BHAV_COPY_TABLE_NAME, con, if_exists='append', index=False)
            bhav_check_df.to_sql(BHAV_COPY_TEST_TABLE_NAME, con, if_exists='append', index=False)

        print(f'Out of {len(downloaded_files)}, {i+1} loaded into DB - {file_name}.')

No downloaded files to load.


In [126]:
# SQL that deletes duplicate rows from the bhav copy table, if any.
#
# Rows are grouped by ("SYMBOL", "SERIES", "TIMESTAMP") and numbered with
# ROW_NUMBER() in "FNAME" order; every row numbered above 1 is deleted by its
# "ID", so one row per group remains.
# NOTE(review): rows of a group that share the same "FNAME" have no further
# ordering, so which of them is kept is left to the database.
# The statement is only built here; it is executed in a later cell.

del_bhav_copy = f'''   DELETE 
FROM 
    {BHAV_COPY_TABLE_NAME} 
WHERE 
    "ID" IN (
SELECT "ID" 
FROM(
SELECT 
    *, ROW_NUMBER() OVER (PARTITION BY "SYMBOL", "SERIES", "TIMESTAMP" ORDER BY "FNAME") AS "RNo" 
FROM
    {BHAV_COPY_TABLE_NAME}) A
WHERE A."RNo" >1)  '''

In [127]:
# SQL that deletes duplicate rows from the bhav copy check table, if any.
#
# Rows are grouped by ("FNAME", "RCOUNT") and numbered with ROW_NUMBER();
# every row numbered above 1 is deleted by its "ID", so one row per
# (file name, row count) pair remains.
# NOTE(review): the ORDER BY column ("FNAME") is part of the partition key,
# so it does not order rows within a group; which duplicate is kept is left
# to the database.
# The statement is only built here; it is executed in a later cell.

del_bhav_copy_test = f'''  DELETE 
FROM 
    {BHAV_COPY_TEST_TABLE_NAME} 
WHERE 
    "ID" IN (
SELECT "ID" 
FROM(
SELECT 
    *, ROW_NUMBER() OVER (PARTITION BY "FNAME", "RCOUNT" ORDER BY "FNAME") AS "RNo" 
FROM
    {BHAV_COPY_TEST_TABLE_NAME}) A
WHERE A."RNo" >1) '''


In [128]:
# Delete duplicate rows from both tables.
#
# engine.begin() wraps the two deletes in one transaction: both are committed
# together on success, and both are rolled back if either statement raises.
# sa.text() wraps the raw SQL strings; passing a plain str to
# Connection.execute() is deprecated in SQLAlchemy 1.4 and rejected in 2.0.

with engine.begin() as con:
    con.execute(sa.text(del_bhav_copy))
    con.execute(sa.text(del_bhav_copy_test))

In [129]:
# Validating data load
#
# Compares, per file name, the number of rows present in the bhav copy table
# with the row count recorded in the check table at load time. When every
# file name matches, the downloaded files are moved to the archive folder
# (or removed if a file of the same name is already archived); otherwise the
# mismatching rows are printed and the files are left in place.

bhav_copy_val_sql = f''' SELECT "FNAME", COUNT(*) AS "RCOUNT" FROM {BHAV_COPY_TABLE_NAME} GROUP BY "FNAME"'''

# Only the two reads need the database; the connection is released before
# any file-system work starts.
with engine.connect() as con:
    bhav_count_df = pd.read_sql(bhav_copy_val_sql, con)
    bhav_test_count_df = pd.read_sql(BHAV_COPY_TEST_TABLE_NAME, con, columns=['FNAME', 'RCOUNT'])

# Outer join keeps file names present in only one table; their missing count
# is NaN, which never compares equal, so they are reported as failures.
# RCOUNT_x is the actual row count, RCOUNT_y the recorded one.
val_df = pd.merge(bhav_count_df, bhav_test_count_df, on='FNAME', how='outer')
val_df['RESULT'] = (val_df['RCOUNT_x'] == val_df['RCOUNT_y'])

if val_df['RESULT'].all():
    print('Validation Success')
    for f in downloaded_files:
        file_name = os.path.basename(f)
        # os.path.join works whether or not ARCHIVE_FOLDER ends with a separator.
        file_name_in_archive = os.path.join(ARCHIVE_FOLDER, file_name)
        if not os.path.isfile(file_name_in_archive):
            shutil.move(f, file_name_in_archive)
        else:
            # Already archived under the same name: drop the downloaded copy.
            os.remove(f)
    print('Bhav files are moved to Archive')
else:
    print('Validation failed.')
    print(val_df[~val_df.RESULT])


print('Program ends...')

Validation Success
Bhav files are moved to Archive
Program ends...
