In [1]:
import cx_Oracle
import pandas as pd
import configparser
import sys
import boto3
import psycopg2
import traceback
import datetime
import gzip
import time

In [2]:
def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        print('%r (%r, %r) %2.2f sec' %(method.__name__, args, kw, te-ts))
        return result
    return timed

In [3]:
def config_unpacker(config_file):
    time.sleep(5)
    try:
        config = configparser.RawConfigParser()
        config.read(config_file)
        credentials = {key: value for key, value in config._sections['Credentials'].items() if not key.startswith("__")}
        return(credentials)
    except Exception as e:
        print("Error reading Config file :{}".format(e))
        

In [4]:
def sql_generator():
    db_info_loc = r"./Inputs/db_info.csv"
    df = pd.read_csv(db_info_loc)

    def sql(row):
        string = f"select * from {row['table_name']} where to_char({row['Column_name']}, 'yyyy') like {row['Year']}"
        return(string)

    df['sql'] = df.apply(lambda x: sql(x), axis  =1)
    return(df)

In [5]:
from concurrent.futures import ThreadPoolExecutor

In [11]:
@timeit
def oracle_retrieve(table,config_file,config_unpacker):
    #Asserting db credentials
    def df_csv(reader):
        print('In the loop')
        i+=1
        gzip_file = r'./Output/{}_{}.csv.gz'.format(table,i)
        print("Writing to gzip file_{}".format(i))
        reader.to_csv(gzip_file,header=True,index=False,compression = 'gzip')
    conn_dict = config_unpacker(config_file)
    try:
        connection = cx_Oracle.connect(user=conn_dict['user'], password=conn_dict['pwd'], dsn=cx_Oracle.makedsn(conn_dict['host'],conn_dict['port'],conn_dict['sid']))
    except cx_Oracle.DatabaseError as e:
        print('Database connection error: {}'.format(e))    
        sys.exit("Failed establishing connection to given database")
    else:
        with connection:     
            print("Connection established with database")
            sql_query = "select * from {} where ROWNUM <= 40000".format(table)
            try:
                print("Loading to datafarame")
                reader = pd.read_sql(sql = sql_query,con = connection, chunksize = 4)
                i = 0
                try:    
                    concurrent.futures.ProcessPoolExecutor.map(df_csv,reader)
                except Exception as exec:
                    print(f'Error occured in thereading ----{exec}')

            except Exception as e:
                print(e)
        print("Closing database Connection")

In [7]:
#Copy to S3 bucket,
def s3_copy(table_name):
    bucket = '{}'.format(table_name)
    s3_file = '.{} {}.csv'.format(table_name,time.strftime("_%d-%m-%Y-%H:%M:%S"))

    s3 = boto3.resource('s3')
    s3.meta.client.upload_file(csv_file, bucket , s3_file)
    return(list(bucket , s3_file))

In [8]:
#Copy to redshift
def redshift_upload(aws_config,bucket,s3_file,config_unpacker):

    file_path = 's3://{}/{}'.format(bucket,s3_file)

    conn_string = "dbname='{dbname}' port='{port}' user='{user}' password='{password}' host='{host_url}'".format(**config_unpacker(aws_config))
    copy_cmd ="""copy {schema}.{table} from '{file_path}' credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' \
            commit;""".format(**config_unpacker(aws_config))

    with psycopg2.connect(conn_string) as conn:
        with conn.cursor() as curs:
            curs.execute(copy_cmd)

In [9]:
def main():
    config_file = "./config_file.cfg"
    aws_config = './aws_config'
    table = "case_f"
    
    oracle_retrieve(table,config_file,config_unpacker) 
#     s3_args = s3_copy(table)
#     redshift_upload(aws_config,*s3_args,config_unpacker())

In [10]:
if __name__ == '__main__':
    try:
        main()
    except:
        print(traceback.format_exc())

Traceback (most recent call last):
  File "<ipython-input-10-6640bd7335f2>", line 3, in <module>
    main()
  File "<ipython-input-9-d742a141a8f9>", line 6, in main
    oracle_retrieve(table,config_file,config_unpacker)
  File "<ipython-input-2-f29f7f7cf3ac>", line 4, in timed
    result = method(*args, **kw)
  File "<ipython-input-6-d3d850701108>", line 12, in oracle_retrieve
    connection = cx_Oracle.connect(user=conn_dict['user'], password=conn_dict['pwd'], dsn=cx_Oracle.makedsn(conn_dict['host'],conn_dict['port'],conn_dict['sid']))
NameError: name 'conn_dict' is not defined



In [26]:
config_file = "./config_file.cfg"
aws_config = './aws_config'
table = "case_f"
conn_dict = config_unpacker(config_file)
i = 0
def df_csv(df):
    global i
    i+=1
    print('In the function')
    gzip_file = r'./Output/{}_{}.csv.gz'.format(table,i)
    print("Writing to gzip file_{}".format(i))
    df.to_csv(gzip_file,header=True,index=False,compression = 'gzip')
try:
    connection = cx_Oracle.connect(user=conn_dict['user'], password=conn_dict['pwd'], dsn=cx_Oracle.makedsn(conn_dict['host'],conn_dict['port'],conn_dict['sid']))
except cx_Oracle.DatabaseError as e:
    print('Database connection error: {}'.format(e))    
    sys.exit("Failed establishing connection to given database")
else:
    with connection:     
        print("Connection established with database")
        sql_query = "select * from {} where ROWNUM <= 4000".format(table)
        try:
            print("Loading to datafarame")
            reader = pd.read_sql(sql = sql_query,con = connection, chunksize = 1000)
            with ThreadPoolExecutor as executor:
                futures = executor.map(df_csv,reader)
                for future in concurrent.futures.as_completed(futures):
                    print(future)
        except Exception as e:
            print(e)
print("Closing database Connection")

Connection established with database
Loading to datafarame
__enter__
Closing database Connection
