In [1]:
# Standard library imports
from pathlib import Path

# Import Dependencies
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session

# Local application imports
from utils import fetch_api_data, load_config, write_to_csv

In [2]:
# Load configuration from a JSON file
config = load_config('config.json')

# Retrieve the database credentials from the configuration.
# `or {}` also covers a "postgres_connection" key that is present but null.
postgres_connection = config.get('postgres_connection') or {}
postgres_user = postgres_connection.get('user')
postgres_pswd = postgres_connection.get('password')

# Stop here if either Postgres credential is missing or empty.
# SystemExit is raised directly (this is what sys.exit() does) so the cell
# does not depend on `sys` being imported.
if not postgres_user or not postgres_pswd:
    print("Postgres credentials not found in the configuration file.")
    raise SystemExit


In [3]:
# Database the engine connects to
db_name = 'postgres'

# Build the URL from its components instead of an f-string so that special
# characters in the credentials (e.g. '@', ':' or '/') are escaped rather
# than corrupting the connection URL.
postgres_url = URL.create(
    drivername="postgresql+psycopg2",
    username=postgres_user,
    password=postgres_pswd,
    host="localhost",
    port=5432,
    database=db_name,
)
postgres_engine = create_engine(postgres_url)

In [4]:
def execute_sql_script(dbname, user, password, host, port, sql_file):
    """Execute every statement of a SQL script file against a PostgreSQL database.

    A psycopg2 connection is opened in autocommit mode, the file content is
    split on ``;`` and each non-blank piece is executed in file order.
    Any ``Exception`` raised along the way is caught and reported with
    ``print``; it is not re-raised, so callers are not interrupted.

    Args:
        dbname: Name of the database to connect to.
        user: Database user name.
        password: Password for ``user``.
        host: Database server host.
        port: Database server port.
        sql_file: Path of the SQL script to run.

    Returns:
        None.
    """
    # Bind both names before the try block so the ``finally`` clause is safe
    # even when connect() or cursor() fails; otherwise an UnboundLocalError
    # would hide the real error and escape the handler below.
    conn = None
    cur = None
    try:
        # Connect to PostgreSQL server (use an administrative database for dropping/creating databases)
        conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
        # Autocommit: each statement takes effect immediately, without an explicit commit
        conn.autocommit = True
        cur = conn.cursor()

        # Read the SQL script
        with open(sql_file, 'r') as file:
            sql_script = file.read()

        # NOTE: this is a plain text split; a ';' inside a string literal or a
        # function body would be split as well.
        for command in sql_script.split(';'):
            if command.strip():  # Avoid empty commands
                cur.execute(command)

        print(f"SQL script {sql_file} executed successfully.")

    except Exception as e:
        print(f"Error executing {sql_file}: {e}")
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

In [5]:
# Step 1: Execute the setup database script
def setUpDatabase(database_info):
    """Run each SQL script in ``database_info`` against the maintenance database.

    Every script is handed to ``execute_sql_script`` in iteration order,
    connecting to the ``postgres`` database on ``localhost:5432`` with the
    module-level ``postgres_user`` / ``postgres_pswd`` credentials.

    Args:
        database_info: Iterable of SQL script paths.

    Returns:
        None.
    """
    maintenance_db = 'postgres'
    for script_path in database_info:
        execute_sql_script(
            dbname=maintenance_db,
            user=postgres_user,
            password=postgres_pswd,
            host='localhost',
            port='5432',
            sql_file=script_path,
        )

In [6]:
# Reflect Database into ORM classes
def getBase(postgres_engine):
    """Create an automap base and prepare it from the given engine.

    Args:
        postgres_engine: Engine passed to ``prepare`` as ``autoload_with``.

    Returns:
        The prepared automap base.
    """
    reflected_base = automap_base()
    reflected_base.prepare(autoload_with=postgres_engine)
    return reflected_base

In [7]:
def writeCSVDataToDB(postgres_engine,csv_file_path, table_name):
    """Append the rows of a CSV file to a database table.

    The file is read into a DataFrame and written with ``if_exists='append'``;
    the DataFrame index is not written as a column.

    Args:
        postgres_engine: Connectable handed to ``DataFrame.to_sql`` as ``con``.
        csv_file_path: Path of the CSV file to read.
        table_name: Name of the destination table.

    Returns:
        None.
    """
    pd.read_csv(csv_file_path).to_sql(
        name=table_name,
        con=postgres_engine,
        if_exists='append',
        index=False,
    )

In [8]:
def getTableCount(postgres_engine, postgres_session, table_name):
    """Return the number of rows in ``table_name``.

    The mapped class for the table is looked up on the automap base returned
    by ``getBase`` (called on every invocation) and counted through the
    supplied session.

    Args:
        postgres_engine: Engine passed to ``getBase``.
        postgres_session: Session used to run the count query.
        table_name: Name of the table whose rows are counted.

    Returns:
        The row count reported by ``Query.count()``.

    Raises:
        ValueError: If no mapped class exists for ``table_name``.
    """
    table_class = getBase(postgres_engine).classes.get(table_name)
    if table_class is None:
        # Fail with an actionable message instead of passing None to query().
        # Automap only maps tables that declare a primary key, so a table can
        # exist in the database and still be absent here.
        raise ValueError(
            f"No mapped class found for table '{table_name}'. "
            "Check that the table exists and has a primary key."
        )
    return postgres_session.query(table_class).count()

In [9]:
def populateDatabase(postgres_engine,database_info, object_name, resources_dir='Resources'):
    """Create the schema, then load one CSV file per table.

    ``setUpDatabase`` is run first with ``database_info``. Afterwards, for
    each name in ``object_name``, the file ``<resources_dir>/<name>.csv`` is
    written to the table called ``<name>``.

    Args:
        postgres_engine: Engine used for the CSV loads.
        database_info: Iterable of SQL script paths passed to ``setUpDatabase``.
        object_name: Iterable of table names; each doubles as the CSV base name.
        resources_dir: Directory holding the CSV files. Defaults to
            ``'Resources'``, the previously hard-coded location.

    Returns:
        None.
    """
    # Create DB and Table Schema
    setUpDatabase(database_info)
    # Populate the tables with data in the CSV file
    for table_name in object_name:
        writeCSVDataToDB(postgres_engine, f'{resources_dir}/{table_name}.csv', table_name)
    

In [10]:
def createDatabase(database_info, object_names):
    """Build and populate the database, then print a row count per table.

    Uses the module-level ``postgres_engine``. After ``populateDatabase``
    finishes, one line ``"<table_name> <count>"`` is printed for each name in
    ``object_names``.

    Args:
        database_info: Iterable of SQL script paths that create the schema.
        object_names: Iterable of table names to load and count.

    Returns:
        None.
    """
    postgres_session = Session(bind=postgres_engine)
    try:
        populateDatabase(postgres_engine, database_info, object_names)
        for table_name in object_names:
            print(f'{table_name} {getTableCount(postgres_engine, postgres_session, table_name)}')
    finally:
        # Close the session even when populating or counting fails, so it is
        # not leaked.
        postgres_session.close()
    