### Dump Pandas DataFrame in PSQL
Step 1: Create Connection
<br>
Step 2: Create Cursor
<br>
Step 3: Actual SQL
<br>
Step 4: Commit
<br>
Step 5: Close Connection

# Psycopg: PostgreSQL Adapter for Python

Psycopg is the most popular PostgreSQL adapter used in Python. It works on the principle of the whole implementation of Python DB API 2.0 along with thread safety (the same connection is shared by multiple threads). It is designed to perform heavily multi-threaded applications that usually create and destroy lots of cursors and make a large number of simultaneous `INSERT` or `UPDATE` operations.

## Key Features

- **Thread Safety:** The same connection can be shared by multiple threads.
- **High Performance:** Suitable for heavily multi-threaded applications.
- **Client-side and Server-side Cursors:** Efficient data fetching and manipulation.
- **Asynchronous Communication and Notification:** Allows for non-blocking database interactions.
- **Unicode and Python 3 Friendly:** Fully supports Unicode and is compatible with Python 3.

Psycopg is an excellent choice for applications that require robust and efficient interaction with PostgreSQL databases.


In [1]:
# If pyscopg2 is not installed, install using !pip install psycopg2
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

In [2]:
# Create a connection to default db
pgconn = psycopg2.connect(
    host = 'localhost',
    user = 'postgres',  # default username
    password = "'",    
    database = 'postgres'   # default database
)

In [3]:
# Lets create a cursor that allows us to execute SQL
# commands and interact with the database: querying
# data, inserting data, updating records etc.
pgcursor = pgconn.cursor()

# Lets make sure 'CREATE DATABASE' statement is
# executed immediately and cannot be rolled back
pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [4]:
# Drop database if it exists, use execute() to run
# SQL queries
pgcursor.execute('DROP DATABASE IF EXISTS adhoc')

# Create database
pgcursor.execute('CREATE OR REPLACE DATABASE adhoc')

ObjectInUse: database "adhoc" is being accessed by other users
DETAIL:  There are 2 other sessions using the database.


In [None]:
# Commit the changes
pgconn.commit()

# Finally close the connection
pgconn.close()

In [None]:
import json
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError
from dataclasses import dataclass, field
from typing import Any

@dataclass
class DatabaseManager:
    credentials_file: str
    table_name: str
    credentials: dict = field(init=False)
    engine: Any = field(init=False)

    def __post_init__(self):
        self.credentials = self._load_credentials(self.credentials_file)
        self.engine = self._create_engine()

    def _load_credentials(self, credentials_file):
        with open(credentials_file, 'r') as file:
            return json.load(file)

    def _create_engine(self):
        try:
            credentials = self.credentials
            engine = create_engine(
                f"postgresql+psycopg2://{credentials['username']}:{credentials['password']}@"
                f"{credentials['host']}:{credentials['port']}/{credentials['database']}"
            )
            return engine
        except OperationalError as e:
            print(f"Error connecting to the database: {e}")
            raise

    def create_database(self):
        with self.engine.connect() as connection:
            connection.execute(text(f"CREATE DATABASE {self.credentials['database']}"))

    def create_table(self, df: pd.DataFrame):
        df.to_sql(self.table_name, self.engine, if_exists='replace', index=False)
        print(f"Table '{self.table_name}' created successfully.")

    def insert_data(self, df: pd.DataFrame):
        df.to_sql(self.table_name, self.engine, if_exists='append', index=False)
        print(f"Data inserted into table '{self.table_name}' successfully.")

# Usage example:
if __name__ == "__main__":
    # Path to the credentials JSON file
    credentials_file = 'credentials.json'
    
    # Example table name and DataFrame
    table_name = 'DoE327'
  
    df = pd.read_csv(r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\DoE327.csv")
    
    # Create DatabaseManager instance and perform operations
    db_manager = DatabaseManager(credentials_file, table_name)
    
    # Create a new table and insert data
    db_manager.create_table(df)
    # db_manager.insert_data(df)


OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed: FATAL:  database "adhoc" does not exist

(Background on this error at: https://sqlalche.me/e/20/e3q8)

ValueError: Excel file format cannot be determined, you must specify an engine manually.

In [None]:
from dataclasses import dataclass
import json
import pandas as pd
from sqlalchemy import create_engine, Table, MetaData

@dataclass
class DatabaseManager:
    credentials_file: str
    table_name: str

    def __post_init__(self):
        self.load_credentials()
        self.engine = create_engine(f"postgresql+psycopg2://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}")

    def load_credentials(self):
        with open(self.credentials_file, 'r') as file:
            credentials = json.load(file)
            self.username = credentials['username']
            self.password = credentials['password']
            self.host = credentials['host']
            self.port = credentials['port']
            self.database = credentials['database']

    def create_table(self, df: pd.DataFrame):
        metadata = MetaData(bind=self.engine)
        table = Table(self.table_name, metadata, autoload_with=self.engine)
        if not table.exists():
            df.to_sql(self.table_name, self.engine, index=False)
        else:
            print(f"Table {self.table_name} already exists.")

    def insert_data(self, df: pd.DataFrame):
        df.to_sql(self.table_name, self.engine, if_exists='append', index=False)

# Ensure the db_credentials.json file is in the same directory as your script
credentials_file = 'db_credentials.json'
table_name = 'example_table'

# Example DataFrame
data = {
    'column1': [1, 2, 3],
    'column2': ['a', 'b', 'c']
}
df = pd.DataFrame(data)

# Create DatabaseManager instance
db_manager = DatabaseManager(credentials_file, table_name)

# Create a new table and insert data
db_manager.create_table(df)
db_manager.insert_data(df)


TypeError: MetaData.__init__() got an unexpected keyword argument 'bind'

In [None]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine
import pandas as pd
import json
from dataclasses import dataclass
import os

# Define a dataclass for the database credentials
@dataclass
class DBCredentials:
    user: str
    password: str
    host: str
    port: int
    dbname: str

# Load database credentials from a JSON file
def load_credentials(json_file: str) -> DBCredentials:
    with open(json_file, 'r') as file:
        creds_dict = json.load(file)
    return DBCredentials(**creds_dict)

# Function to create a database and dump data into a table
def create_db_and_dump_data(creds: DBCredentials, table_name: str, data_file: str):
    try:
        # Connect to the default database to create the new database
        with psycopg2.connect(
            host=creds.host,
            user=creds.user,
            password=creds.password,
            database='postgres'
        ) as pgconn:
            pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with pgconn.cursor() as pgcursor:
                pgcursor.execute(f"CREATE DATABASE {creds.dbname};")
                print(f"Database '{creds.dbname}' created successfully.")
        
        # Connect to the newly created database using SQLAlchemy
        engine = create_engine(f'postgresql+psycopg2://{creds.user}:{creds.password}@{creds.host}/{creds.dbname}')
        
        # Determine the file extension and load the data accordingly
        file_extension = os.path.splitext(data_file)[1].lower()
        
        if file_extension == '.xlsx':
            df = pd.read_excel(data_file)
        elif file_extension == '.csv':
            df = pd.read_csv(data_file)
        else:
            raise ValueError("Unsupported file format. Please use an Excel (.xlsx) or CSV (.csv) file.")
        
        # Dump data into the table
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print(f"Data dumped into table '{table_name}' successfully.")
        
    except Exception as e:
        print(f"Error: {e}")

# Main script
if __name__ == "__main__":
    # Load credentials from JSON file
    creds = load_credentials(r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\db_credentials.json")
    
    # Create the database and dump data into the table
    create_db_and_dump_data(creds, 'your_table_name', r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\DoE327.csv")


Error: CREATE DATABASE cannot run inside a transaction block



In [None]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine
import pandas as pd
import json
import os

# Define a dataclass for the database credentials
@dataclass
class DBCredentials:
    user: str
    password: str
    host: str
    port: int
    dbname: str

# Load database credentials from a JSON file
def load_credentials(json_file: str) -> DBCredentials:
    with open(json_file, 'r') as file:
        creds_dict = json.load(file)
    return DBCredentials(**creds_dict)

# Function to create a database and dump data into a table
def create_db_and_dump_data(creds: DBCredentials, table_name: str, data_file: str):
    try:
        # Connect to the default database to create the new database
        with psycopg2.connect(
            host=creds.host,
            user=creds.user,
            password=creds.password,
            database='postgres'
        ) as pgconn:
            pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with pgconn.cursor() as pgcursor:
                pgcursor.execute(f"CREATE DATABASE {creds.dbname};")
                print(f"Database '{creds.dbname}' created successfully.")
        
        # Reconnect to the newly created database using SQLAlchemy
        engine = create_engine(f'postgresql+psycopg2://{creds.user}:{creds.password}@{creds.host}/{creds.dbname}')
        
        # Determine the file extension and load the data accordingly
        file_extension = os.path.splitext(data_file)[1].lower()
        
        if file_extension == '.xlsx':
            df = pd.read_excel(data_file)
        elif file_extension == '.csv':
            df = pd.read_csv(data_file)
        else:
            raise ValueError("Unsupported file format. Please use an Excel (.xlsx) or CSV (.csv) file.")
        
        # Dump data into the table
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print(f"Data dumped into table '{table_name}' successfully.")
        
    except Exception as e:
        print(f"Error: {e}")

# Main script
if __name__ == "__main__":
    # Load credentials from JSON file
    creds = load_credentials(r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\db_credentials.json")
    
    # Create the database and dump data into the table
    create_db_and_dump_data(creds, 'your_table_name', r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\DoE327.csv")


Error: CREATE DATABASE cannot run inside a transaction block



In [None]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine
import pandas as pd
import json
import os

# Define a dataclass for the database credentials
@dataclass
class DBCredentials:
    user: str
    password: str
    host: str
    port: int
    dbname: str

# Load database credentials from a JSON file
def load_credentials(json_file: str) -> DBCredentials:
    with open(json_file, 'r') as file:
        creds_dict = json.load(file)
    return DBCredentials(**creds_dict)

# Function to create a database and dump data into a table
def create_db_and_dump_data(creds: DBCredentials, table_name: str, data_file: str):
    try:
        # Connect to the default database to create the new database
        default_conn = psycopg2.connect(
            host=creds.host,
            user=creds.user,
            password=creds.password,
            database='postgres'
        )
        default_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        default_cursor = default_conn.cursor()
        default_cursor.execute(f"CREATE DATABASE {creds.dbname};")
        print(f"Database '{creds.dbname}' created successfully.")
        default_cursor.close()
        default_conn.close()
        
        # Reconnect to the newly created database using SQLAlchemy
        engine = create_engine(f'postgresql+psycopg2://{creds.user}:{creds.password}@{creds.host}/{creds.dbname}')
        
        # Determine the file extension and load the data accordingly
        file_extension = os.path.splitext(data_file)[1].lower()
        
        if file_extension == '.xlsx':
            df = pd.read_excel(data_file)
        elif file_extension == '.csv':
            df = pd.read_csv(data_file)
        else:
            raise ValueError("Unsupported file format. Please use an Excel (.xlsx) or CSV (.csv) file.")
        
        # Dump data into the table
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print(f"Data dumped into table '{table_name}' successfully.")
        
    except Exception as e:
        print(f"Error: {e}")

# Main script
if __name__ == "__main__":
    # User specify these parameters
    your_table_name = 'DoE327'
    data_to_dump_file_path = r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\DoE327.csv"

    # Load credentials from JSON file
    creds = load_credentials(r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\db_credentials.json")
    
    # Create the database and dump data into the table
    create_db_and_dump_data(creds, your_table_name, data_to_dump_file_path)


Error: database "adhoc" already exists



In [None]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine
import pandas as pd
import json
import os
from dataclasses import dataclass

@dataclass
class DBCredentials:
    """
    A dataclass to store database credentials.
    
    Attributes:
        user (str): Username for the database.
        password (str): Password for the database.
        host (str): Host address of the database.
        port (int): Port number to connect to the database.
        dbname (str): Name of the database.
    """
    user: str
    password: str
    host: str
    port: int
    dbname: str

def load_credentials(json_file: str) -> DBCredentials:
    """
    Load database credentials from a JSON file.
    
    Args:
        json_file (str): Path to the JSON file containing the credentials.
    
    Returns:
        DBCredentials: An instance of the DBCredentials dataclass.
    """
    with open(json_file, 'r') as file:
        creds_dict = json.load(file)
    return DBCredentials(**creds_dict)

def check_database_exists(creds: DBCredentials) -> bool:
    """
    Check if a database already exists.
    
    Args:
        creds (DBCredentials): The database credentials.
    
    Returns:
        bool: True if the database exists, False otherwise.
    """
    conn = psycopg2.connect(
        host=creds.host,
        user=creds.user,
        password=creds.password,
        database='postgres'  # Connect to the default 'postgres' database to check
    )
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    cursor.execute(f"SELECT 1 FROM pg_database WHERE datname='{creds.dbname}';")
    exists = cursor.fetchone() is not None
    cursor.close()
    conn.close()
    return exists

def create_db_and_dump_data(creds: DBCredentials, table_name: str, data_file: str, if_exists: str = 'replace'):
    """
    Create a database (if it doesn't already exist) and dump data into a table.
    
    Args:
        creds (DBCredentials): The database credentials.
        table_name (str): The name of the table to create or append data to.
        data_file (str): Path to the data file (Excel or CSV) to be dumped into the table.
        if_exists (str): What to do if the table already exists. Options are 'replace' or 'append'.
    """
    try:
        # Check if the database already exists
        if not check_database_exists(creds):
            # Connect to the default 'postgres' database to create the new database
            default_conn = psycopg2.connect(
                host=creds.host,
                user=creds.user,
                password=creds.password,
                database='postgres'
            )
            default_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            default_cursor = default_conn.cursor()
            default_cursor.execute(f"CREATE DATABASE {creds.dbname};")
            print(f"Database '{creds.dbname}' created successfully.")
            default_cursor.close()
            default_conn.close()
        else:
            print(f"Database '{creds.dbname}' already exists. Skipping creation.")

        # Connect to the target database using SQLAlchemy
        engine = create_engine(f'postgresql+psycopg2://{creds.user}:{creds.password}@{creds.host}/{creds.dbname}')

        # Determine the file extension and load the data accordingly
        file_extension = os.path.splitext(data_file)[1].lower()
        
        if file_extension == '.xlsx':
            df = pd.read_excel(data_file)
        elif file_extension == '.csv':
            df = pd.read_csv(data_file)
        else:
            raise ValueError("Unsupported file format. Please use an Excel (.xlsx) or CSV (.csv) file.")

        # Dump data into the table
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
        print(f"Data dumped into table '{table_name}' successfully.")

    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    # Specify the table name and path to the data file
    your_table_name = 'testCell'
    data_to_dump_file_path = r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\testCell.csv"

    # Load credentials from the JSON file
    creds = load_credentials(r"C:\Users\anita\OneDrive\Desktop\Ad-Hocs\db_credentials.json")

    # Create the database (if it doesn't exist) and dump data into the table
    create_db_and_dump_data(creds, your_table_name, data_to_dump_file_path, if_exists = 'replace')  # Change 'replace' to 'append' if needed


Database 'adhoc' already exists. Skipping creation.
Data dumped into table 'testCell' successfully.
