In [1]:
from psycopg2 import OperationalError
from dotenv import load_dotenv
from loguru import logger
import pandas as pd
import psycopg2
import os

# Load variables from a .env file into os.environ so that get_postgres_conn()
# can read the DB_* connection settings. The cell displays load_dotenv()'s
# return value (True below).
load_dotenv()

True

In [2]:
def get_postgres_conn():
    """
    Establishes a connection to a PostgreSQL database using credentials from environment variables.

    Reads ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD`` and ``DB_NAME`` via ``os.getenv``
    (populated from ``.env`` by ``load_dotenv()``).

    Returns:
        connection (psycopg2.extensions.connection or None): A connection object to the PostgreSQL database
        if the connection is successful, otherwise None. A failed connection
        (``psycopg2.OperationalError``) is logged rather than raised, so callers
        must check for None before using the result.
    """
    connection = None

    try:
        connection = psycopg2.connect(
            host=os.getenv("DB_HOST"),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            dbname=os.getenv("DB_NAME"),
        )
        logger.info("Connection to PostgreSQL DB successful")
    except OperationalError as e:
        # Report through the same logger as the success path (previously this
        # used print), so failures appear in the notebook's log stream.
        logger.error(f"The error '{e}' occurred")

    return connection


def upload_df_to_postgres(conn, table_df_pairs, close_conn=True):
    """
    Uploads one or more DataFrames to PostgreSQL tables using psycopg2.

    Each DataFrame is inserted with one ``executemany`` call and committed on its
    own, so if a later table fails, the tables before it stay committed.
    Missing values (NaN / NaT / None) are sent as SQL NULL, and values are passed
    as native Python scalars, which psycopg2 can adapt (numpy scalars it cannot).

    Args:
    - conn: database connection object (e.g. from ``get_postgres_conn``); if it is
      None, an error is logged and nothing is uploaded
    - table_df_pairs: list of tuples where each tuple contains the table name and the
      DataFrame to upload; DataFrame column names must match the target table's columns
    - close_conn: whether to close ``conn`` when done (default True, the original behaviour)

    Returns:
    None

    Raises:
    Errors other than ``psycopg2.OperationalError`` propagate after the current
    transaction is rolled back; ``OperationalError`` is logged and swallowed.
    """
    if conn is None:
        # get_postgres_conn() returns None on failure; report it instead of
        # crashing with an AttributeError on conn.cursor() / conn.close().
        logger.error("Error uploading data to PostgreSQL: no database connection")
        return

    try:
        for table_name, df in table_df_pairs:
            if df.empty:
                # No rows (or no columns, which would produce invalid SQL).
                logger.warning(f"Skipping {table_name}: DataFrame is empty.")
                continue

            # itertuples yields native Python scalars rather than numpy ones, and
            # pd.isna maps NaN/NaT/None to None so they are stored as NULL
            # instead of the float 'NaN'. Assumes cells are scalars (true for CSV data).
            values = [
                tuple(None if pd.isna(value) else value for value in row)
                for row in df.itertuples(index=False, name=None)
            ]

            # NOTE: table and column names are interpolated into the SQL text
            # unquoted, so only pass trusted identifiers; row values are sent as
            # bound parameters. For very large frames, psycopg2.extras.execute_values
            # is considerably faster than executemany.
            columns = ','.join(str(column) for column in df.columns)
            placeholders = ','.join(['%s'] * len(df.columns))
            sql_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            with conn.cursor() as cursor:
                cursor.executemany(sql_query, values)

            conn.commit()
            logger.info(f"Data uploaded to {table_name} in PostgreSQL.")
    except Exception as e:
        # Discard the failed table's uncommitted inserts so the connection is
        # usable again (relevant when close_conn=False). Rollback can itself fail
        # if the connection was lost; the original error is the one that matters.
        try:
            conn.rollback()
        except psycopg2.Error:
            pass
        if not isinstance(e, OperationalError):
            raise
        logger.error(f"Error uploading data to PostgreSQL: {e}")
    finally:
        if close_conn:
            conn.close()

In [3]:
# Load the raw CSV extracts first and open the connection afterwards, so a
# missing or malformed file fails before a database connection is left open.
# TODO: after running the upload cell, check that the data is loaded into the database.
RAW_DATA_DIR = '../data/raw'

drug_effective_time_raw = pd.read_csv(os.path.join(RAW_DATA_DIR, 'drug_effective_time.csv'))
drug_ndc_raw = pd.read_csv(os.path.join(RAW_DATA_DIR, 'drug_ndc.csv'))

conn = get_postgres_conn()
# get_postgres_conn() logs the error and returns None on failure; stop here
# rather than failing later inside the upload.
assert conn is not None, "Could not connect to PostgreSQL; check the DB_* variables in .env"

# (target table, DataFrame) pairs consumed by upload_df_to_postgres.
table_df_pairs = [
    ('public.drug_effective_time', drug_effective_time_raw),
    ('public.drug_ndc', drug_ndc_raw),
]

[32m2024-10-20 16:07:54.932[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_postgres_conn[0m:[36m18[0m - [1mConnection to PostgreSQL DB successful[0m


In [4]:
# Insert every (table, DataFrame) pair. The helper commits after each table
# and closes `conn` once it has finished.
upload_df_to_postgres(conn=conn, table_df_pairs=table_df_pairs)

[32m2024-10-20 16:07:56.191[0m | [1mINFO    [0m | [36m__main__[0m:[36mupload_df_to_postgres[0m:[36m56[0m - [1mData uploaded to public.drug_effective_time in PostgreSQL.[0m
[32m2024-10-20 16:07:56.213[0m | [1mINFO    [0m | [36m__main__[0m:[36mupload_df_to_postgres[0m:[36m56[0m - [1mData uploaded to public.drug_ndc in PostgreSQL.[0m
