In [6]:
import os
import re
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text

class PostgresDatabase:
    """Thin wrapper around a SQLAlchemy engine for reading PostgreSQL tables.

    Typical use: construct, call ``connect()``, read with ``fetch_all()`` or
    ``fetch_as_dataframe()``, then ``close()``. Failures are reported with
    ``print`` and turned into empty results instead of being raised.

    Attributes:
        db_url: Connection URL string (user and password percent-encoded).
        engine: SQLAlchemy engine created from ``db_url``.
        connection: The open connection, or ``None`` when not connected.
    """

    # A table name is one or more dot-separated parts; each part is either a
    # plain identifier or a double-quoted identifier. Anything else (spaces,
    # semicolons, extra SQL) is rejected before it can reach the database.
    _TABLE_NAME_PATTERN = re.compile(
        r'(?:[^\W\d][\w$]*|"[^"]+")'
        r'(?:\.(?:[^\W\d][\w$]*|"[^"]+"))*'
    )

    def __init__(self, host: str, port: str, dbname: str, user: str, password: str):
        """
        Store the connection URL and create the SQLAlchemy engine.

        Args:
            host: Database server host name.
            port: Database server port.
            dbname: Name of the database to use.
            user: Login role.
            password: Password for ``user``; special characters are allowed.
        """
        self.db_url = self._build_url(host, port, dbname, user, password)
        self.engine = create_engine(self.db_url)
        self.connection = None

    @staticmethod
    def _build_url(host: str, port: str, dbname: str, user: str, password: str) -> str:
        """Build the psycopg2 URL, percent-encoding the user and password."""
        # Characters such as '@', '/', ':' or '#' in the credentials would
        # otherwise be read as URL delimiters.
        safe_user = quote(user, safe="")
        safe_password = quote(password, safe="")
        return f"postgresql+psycopg2://{safe_user}:{safe_password}@{host}:{port}/{dbname}"

    @classmethod
    def _is_valid_table_name(cls, table_name) -> bool:
        """Return True when ``table_name`` is a plain or dotted SQL identifier."""
        return (
            isinstance(table_name, str)
            and cls._TABLE_NAME_PATTERN.fullmatch(table_name) is not None
        )

    def _rollback_quietly(self):
        """Roll back after a failed statement so later queries can still run."""
        # Not every connection object exposes rollback(); skip it when absent.
        rollback = getattr(self.connection, "rollback", None)
        if callable(rollback):
            try:
                rollback()
            except Exception as e:
                print(f"Error rolling back transaction: {e}")

    def connect(self):
        """Open a connection from the engine and store it on ``self.connection``.

        On failure the error is printed and ``self.connection`` is set to None.
        """
        try:
            self.connection = self.engine.connect()
            print("Database connection established successfully.")
        except Exception as e:
            print(f"Error connecting to database: {e}")
            self.connection = None

    def fetch_all(self, table_name: str):
        """
        Fetch all rows from a specified table.

        Args:
            table_name: Table to read, optionally schema-qualified
                (``schema.table``); parts may be double-quoted.

        Returns:
            The list produced by ``fetchall()``, or an empty list when there
            is no connection, the name is rejected, or the query fails.
        """
        if self.connection is None:
            print("No database connection. Call connect() first.")
            return []
        if not self._is_valid_table_name(table_name):
            print(f"Invalid table name: {table_name!r}")
            return []

        try:
            query = f"SELECT * FROM {table_name};"
            # text() is required for string SQL on SQLAlchemy 2.x; colons are
            # escaped so a quoted identifier is never parsed as a bind parameter.
            result = self.connection.execute(text(query.replace(":", r"\:")))
            return result.fetchall()
        except Exception as e:
            print(f"Error fetching data from {table_name}: {e}")
            self._rollback_quietly()
            return []

    def fetch_as_dataframe(self, table_name: str, **kwargs) -> pd.DataFrame:
        """
        Fetch data from a specified table and return it as a Pandas DataFrame.

        Args:
            table_name: Table to read, optionally schema-qualified
                (``schema.table``); parts may be double-quoted.
            **kwargs: Passed through to ``pd.read_sql``.

        Returns:
            The table contents, or an empty DataFrame when there is no
            connection, the name is rejected, or the query fails.
        """
        if self.connection is None:
            print("No database connection. Call connect() first.")
            return pd.DataFrame()
        if not self._is_valid_table_name(table_name):
            print(f"Invalid table name: {table_name!r}")
            return pd.DataFrame()

        try:
            query = f"SELECT * FROM {table_name};"
            # The query is handed to pandas with the engine, not with
            # self.connection; the check above only enforces connect() first.
            return pd.read_sql(query, self.engine, **kwargs)
        except Exception as e:
            print(f"Error fetching data as DataFrame from {table_name}: {e}")
            return pd.DataFrame()

    def close(self):
        """Close the connection, if any, and release the engine's pool.

        Does nothing when no connection is open. Afterwards
        ``self.connection`` is None, so the fetch methods report
        "no connection" until ``connect()`` is called again.
        """
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                # Clear the reference even if close() raised, so a closed
                # connection is never mistaken for a usable one.
                self.connection = None
                self.engine.dispose()
            print("Database connection closed.")

In [7]:
# Connection settings passed to PostgresDatabase(**DB_CONFIG).
# Each value is read from the standard libpq environment variable when it is
# set, so real credentials never have to be written into the notebook; the
# fallbacks are the local development values.
DB_CONFIG = {
    "host": os.environ.get("PGHOST", "localhost"),
    "port": os.environ.get("PGPORT", "5432"),
    "dbname": os.environ.get("PGDATABASE", "dev"),
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "postgres"),
}

In [None]:
# Read the "users" table and show a preview.
db = PostgresDatabase(**DB_CONFIG)
db.connect()
try:
    df = db.fetch_as_dataframe("users")
    # Preview only: avoids rendering the whole table, which holds user emails.
    display(df.head())
finally:
    # Release the connection even if the fetch or the display fails.
    db.close()

Database connection established successfully.


Unnamed: 0,user_resource_id,email_id,created_ts
0,7e8dce8d-aa31-4e08-960a-ce33ca3e9833,nikhilkamlesh.soni@gmail.com,2023-11-18 13:12:55.167622


Database connection closed.
