In [95]:
import os
from io import StringIO, BytesIO
from urllib.parse import quote_plus

import boto3
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

In [25]:
def list_all_tables(path):
    """Return the names of all entries found directly inside ``path``.

    This is a thin wrapper over ``os.listdir``: the names are returned exactly
    as that call yields them, without filtering or sorting.
    """
    return os.listdir(path)

In [45]:
def check_meta_data(path):
    """Load the logging table stored at ``path``, or start an empty one.

    Returns the CSV contents as a DataFrame when the file exists; otherwise an
    empty DataFrame with the columns ``table`` and ``logging_id``.
    """
    if not os.path.exists(path):
        # No logging table on disk yet: hand back an empty frame with the expected columns.
        return pd.DataFrame(columns=['table', 'logging_id'])

    return pd.read_csv(path)

In [2]:
def establish_connection_to_database(user, password, host, port, db):
    """Open a SQLAlchemy connection to a PostgreSQL database.

    Parameters
    ----------
    user, password
        Credentials. Both are percent-encoded before being placed in the URL,
        so characters such as ``@``, ``:`` or ``/`` cannot corrupt it.
    host, port
        Server address, interpolated into the URL as given.
    db
        Name of the database to connect to.

    Returns
    -------
    The object returned by ``engine.connect()``. The caller is responsible for
    closing it.
    """
    safe_user = quote_plus(str(user))
    safe_password = quote_plus(str(password))
    conn_string = f'postgresql://{safe_user}:{safe_password}@{host}:{port}/{db}'

    # Bound to its own name so the ``db`` argument (the database name) is not shadowed.
    engine = create_engine(conn_string)

    return engine.connect()

In [35]:
def extracting_data(path):
    """Read the CSV file at ``path`` into a DataFrame, announcing the read on stdout."""
    print(f"Extacting Data from {path}")
    return pd.read_csv(path)

In [109]:
def dataframe_to_s3(input_datafame, bucket_name, filepath, format):
    """Serialize a DataFrame and upload it to S3 as a single object.

    Parameters
    ----------
    input_datafame
        DataFrame to upload.
    bucket_name
        Target S3 bucket.
    filepath
        Object key inside the bucket.
    format
        ``'parquet'`` or ``'csv'``.

    Raises
    ------
    ValueError
        If ``format`` is neither ``'parquet'`` nor ``'csv'``.
    """
    # Serialize first so an unsupported format fails before any S3 client is created.
    if format == 'parquet':
        out_buffer = BytesIO()
        input_datafame.to_parquet(out_buffer, index=False)
        body = out_buffer.getvalue()
    elif format == 'csv':
        # CSV must go through to_csv; the text is encoded so the upload body is always bytes.
        body = input_datafame.to_csv(index=False).encode('utf-8')
    else:
        raise ValueError(
            f"Unsupported format {format!r}; expected 'parquet' or 'csv'"
        )

    # Default S3 client: credentials and region come from the ambient boto3 configuration.
    s3 = boto3.client('s3')
    s3.put_object(Bucket=bucket_name, Key=filepath, Body=body)

In [133]:
def adding_new_columns(dataframe, logging_id):
    """Attach the batch columns to ``dataframe`` in place and return it.

    ``batch_id`` is set to ``logging_id`` on every row; ``start_date``,
    ``end_date`` and ``is_active`` are created filled with ``None``.
    """
    dataframe['batch_id'] = logging_id
    for placeholder in ('start_date', 'end_date', 'is_active'):
        dataframe[placeholder] = None

    return dataframe

In [145]:
def loading_data(dataframe, table, schemaName):
    """Write ``dataframe`` to ``schemaName.table`` in the warehouse, replacing the table.

    Connection settings are read from the environment variables ``DWH_USER``,
    ``DWH_PASSWORD``, ``DWH_HOST``, ``DWH_PORT`` and ``DWH_DB``. Each falls back
    to the value that was previously hard-coded here, so existing local setups
    keep working without any configuration.

    Parameters
    ----------
    dataframe
        Frame to write (the index is not written).
    table
        Destination table name.
    schemaName
        Destination schema.
    """
    user = os.environ.get("DWH_USER", "postgres")
    password = os.environ.get("DWH_PASSWORD", "admin")
    host = os.environ.get("DWH_HOST", "localhost")
    port = os.environ.get("DWH_PORT", "5432")
    db = os.environ.get("DWH_DB", "DWH")

    conn = establish_connection_to_database(user, password, host, port, db)
    try:
        # NOTE(review): if_exists='replace' drops and recreates the table on every load.
        dataframe.to_sql(table, conn, schema=schemaName, index=False, if_exists='replace')
    finally:
        # A fresh engine is created per call; release the connection and its pool either way.
        conn.close()
        conn.engine.dispose()

In [166]:
def ETL(base, *, meta_path="logging_table.csv", schema="bronze"):
    """Load every file in ``base`` into the database and update the logging table.

    For each file: the CSV is read, the table name is taken from the part of
    the file name before the first ``.``, the batch columns are added using the
    table's next ``logging_id`` (1 for a table not yet in the logging table),
    and the frame is written through ``loading_data``. The logging table is
    written back to ``meta_path`` once, after all files have been processed.

    Parameters
    ----------
    base
        Directory containing the raw CSV files.
    meta_path
        Keyword-only. Path of the logging-table CSV; defaults to the previously
        hard-coded ``"logging_table.csv"``.
    schema
        Keyword-only. Destination schema; defaults to the previously hard-coded
        ``"bronze"``.
    """
    print("<<<----------------------Fetching All Tables---------------------->>>")
    tables = list_all_tables(base)

    print("<<<----------------------Checking for Logging Table---------------------->>>")
    meta = check_meta_data(meta_path)

    for file_name in tables:
        # os.path.join rather than a literal backslash, so the path is valid on any OS.
        file_path = os.path.join(base, file_name)
        if not os.path.isfile(file_path):
            # Sub-directories cannot be read as CSV; leave them out of the load.
            continue

        df = extracting_data(file_path)

        table_name = file_name.split('.')[0]

        if table_name in meta['table'].values:
            print("<<<----------------------Table found---------------------->>>")
            row_mask = meta['table'] == table_name
            # Next batch id is the stored id + 1; the stored id is bumped to match.
            logging_id = meta.loc[row_mask, 'logging_id'].iloc[0] + 1
            meta.loc[row_mask, 'logging_id'] += 1

        else:
            print("<<<----------------------Table not found---------------------->>>")
            logging_id = 1
            new_row = pd.DataFrame({'table': [table_name], 'logging_id': [logging_id]})
            meta = pd.concat([meta, new_row], ignore_index=True)

        print("<<<----------------------Adding new fields---------------------->>>")
        adding_new_columns(df, logging_id)

        print("<<<----------------------Writing data into DB---------------------->>>")
        loading_data(df, table_name, schema)

    print("<<<----------------------Updating the logging table---------------------->>>")
    meta.to_csv(meta_path, index=False)

In [167]:
# Directory of raw CSV files handed to ETL() below.
# NOTE(review): absolute, user-specific Windows path; must be edited to run on another machine.
base_path = r"C:\Users\safwa\OneDrive\Documents\Codes\project\Raw_Data"

In [168]:
# Run the pipeline over everything listed in base_path; progress banners are printed as it goes.
ETL(base_path)

<<<----------------------Fetching All Tables---------------------->>>
<<<----------------------Checking for Logging Table---------------------->>>
Extacting Data from C:\Users\safwa\OneDrive\Documents\Codes\project\Raw_Data\Points_Table.csv
<<<----------------------Table not found---------------------->>>
<<<----------------------Adding new fields---------------------->>>
<<<----------------------Writing data into DB---------------------->>>
<<<----------------------Updating the logging table---------------------->>>


In [164]:
# Load the logging table from disk (an empty frame is returned if the file does not exist).
meta = check_meta_data("logging_table.csv")

In [165]:
# Display the logging table loaded above.
meta

Unnamed: 0,table,logging_id
0,Points_Table,2


In [129]:
# Table name used by the lookup experiments in the cells below.
table = 'Points_Table'

In [130]:
# Check whether `table` already has a row in the logging table.
if table in meta['table'].values:
    print("Table found")

In [137]:
# Scratch version of the per-table bookkeeping that ETL() performs.
# NOTE(review): relies on `df`, `meta` and `table` already being bound in the kernel.
if table in meta['table'].values:
    print("Table found")
    # Uses the stored logging_id as-is (ETL() adds 1 at this point), then bumps the stored value.
    logging_id = meta.loc[meta['table'] == table, 'logging_id'].iloc[0]
    adding_new_columns(df, logging_id)
    meta.loc[meta['table'] == table, 'logging_id'] += 1

else:
    print("Table not found")
    # First sighting of this table: start at 1 and append a row to the logging table.
    logging_id = 1
    new_row = pd.DataFrame({'table': [table], 'logging_id': [logging_id]})
    adding_new_columns(df, logging_id)
    meta = pd.concat([meta, new_row], ignore_index=True)

Table found


In [138]:
# Display the frame after adding_new_columns() has added the batch columns in place.
df

Unnamed: 0,Season_End_Year,Team,Rk,MP,W,D,L,GF,GA,GD,Pts,Notes,batch_id,start_date,end_date,is_active
0,1993,Arsenal,10,42,15,11,16,40,38,2,56,→ European Cup Winners' Cup via cup win 2,1,,,
1,1993,Aston Villa,2,42,21,11,10,57,40,17,74,→ UEFA Cup via league finish,1,,,
2,1993,Blackburn,4,42,20,11,11,68,46,22,71,,1,,,
3,1993,Chelsea,11,42,14,14,14,51,54,-3,56,,1,,,
4,1993,Coventry City,15,42,13,13,16,52,57,-5,52,,1,,,
5,1993,Crystal Palace,20,42,11,16,15,48,61,-13,49,Relegated,1,,,
6,1993,Everton,13,42,15,8,19,53,55,-2,53,,1,,,
7,1993,Ipswich Town,16,42,12,16,14,50,55,-5,52,,1,,,
8,1993,Leeds United,17,42,12,15,15,57,62,-5,51,,1,,,
9,1993,Liverpool,6,42,16,11,15,62,55,7,59,,1,,,


In [139]:
# List the entries of the raw-data directory.
tables = list_all_tables(base_path)

In [144]:
# Table name = file name up to the first '.', the same rule ETL() applies.
tables[0].split('.')[0]

'Points_Table'

In [135]:
# NOTE(review): stale cell. It passes two positional arguments to ETL() and keeps the result,
# but the ETL() defined earlier in this notebook accepts a single positional argument and has
# no return statement. Confirm whether this cell should be removed.
tables = list_all_tables(base_path)
for table_name in tables:
    df = ETL(base_path, table_name)

Extacting Data from C:\Users\safwa\OneDrive\Documents\Codes\project\Raw_Data\1993_season.csv


In [119]:
# Display whatever frame is currently bound to `df` in the kernel.
df

Unnamed: 0,Season_End_Year,Team,Rk,MP,W,D,L,GF,GA,GD,Pts,Notes
0,1993,Arsenal,10,42,15,11,16,40,38,2,56,→ European Cup Winners' Cup via cup win 2
1,1993,Aston Villa,2,42,21,11,10,57,40,17,74,→ UEFA Cup via league finish
2,1993,Blackburn,4,42,20,11,11,68,46,22,71,
3,1993,Chelsea,11,42,14,14,14,51,54,-3,56,
4,1993,Coventry City,15,42,13,13,16,52,57,-5,52,
5,1993,Crystal Palace,20,42,11,16,15,48,61,-13,49,Relegated
6,1993,Everton,13,42,15,8,19,53,55,-2,53,
7,1993,Ipswich Town,16,42,12,16,14,50,55,-5,52,
8,1993,Leeds United,17,42,12,15,15,57,62,-5,51,
9,1993,Liverpool,6,42,16,11,15,62,55,7,59,


In [98]:
# Read a single season file directly.
# NOTE(review): absolute, user-specific Windows path, in a different folder from base_path.
df = extracting_data(r"C:\Users\safwa\OneDrive\Documents\Codes\project\Points_Table\1993_season.csv")

Extacting Data from C:\Users\safwa\OneDrive\Documents\Codes\project\Points_Table\1993_season.csv


In [13]:
# Display the frame read by extracting_data().
df

Unnamed: 0,Season_End_Year,Team,Rk,MP,W,D,L,GF,GA,GD,Pts,Notes
0,1993,Arsenal,10,42,15,11,16,40,38,2,56,→ European Cup Winners' Cup via cup win 2
1,1993,Aston Villa,2,42,21,11,10,57,40,17,74,→ UEFA Cup via league finish
2,1993,Blackburn,4,42,20,11,11,68,46,22,71,
3,1993,Chelsea,11,42,14,14,14,51,54,-3,56,
4,1993,Coventry City,15,42,13,13,16,52,57,-5,52,
5,1993,Crystal Palace,20,42,11,16,15,48,61,-13,49,Relegated
6,1993,Everton,13,42,15,8,19,53,55,-2,53,
7,1993,Ipswich Town,16,42,12,16,14,50,55,-5,52,
8,1993,Leeds United,17,42,12,15,15,57,62,-5,51,
9,1993,Liverpool,6,42,16,11,15,62,55,7,59,


In [None]:
def adding_new_columns(dataframe, logging_id=1):
    """Attach the batch columns to ``dataframe`` in place and return it.

    ``batch_id`` is set to ``logging_id`` on every row (default 1);
    ``start_date``, ``end_date`` and ``is_active`` are created filled with
    ``None``.
    """
    dataframe['batch_id'] = logging_id
    dataframe['start_date'] = None
    dataframe['end_date'] = None
    dataframe['is_active'] = None

    return dataframe

In [6]:
# Scratch: set batch_id to 1 on every row of the current frame (mutates `df` in place).
df['batch_id'] = 1

In [8]:
# Scratch: add an is_active column filled with None.
df['is_active'] = None

In [9]:
# Scratch: add a start_date column filled with None.
df['start_date'] = None

In [10]:
# Scratch: add an end_date column filled with None.
df['end_date'] = None

In [None]:
# Connection settings are read from the environment so credentials need not live in the
# notebook; each falls back to the value previously hard-coded here.
user = os.environ.get("DWH_USER", "postgres")
password = os.environ.get("DWH_PASSWORD", "admin")
host = os.environ.get("DWH_HOST", "localhost")
port = os.environ.get("DWH_PORT", "5432")
db = os.environ.get("DWH_DB", "DWH")

# Open the connection; remember to close `conn` when finished with it.
conn = establish_connection_to_database(user, password, host, port, db)

In [None]:
# Write the frame to the database, replacing the target table if it already exists.
# NOTE(review): `dataset`, `tableName` and `schemaName` are not defined anywhere in the
# visible notebook. Confirm where they are meant to come from before running this cell.
dataset.to_sql(tableName, conn, schema = schemaName, index = False, if_exists = 'replace')

print("<<<------------------WRITTEN INTO THE DATABASE------------------>>>")