PostgreSQL Database Management with Python and Psycopg2

**Introduction**

In this project, we will explore various functionalities related to PostgreSQL database management using Python and Psycopg2. The objective is to understand how to interact with a PostgreSQL database using Python scripts and Psycopg2 library.

**Tools and Libraries**
- PostgreSQL: A powerful, open-source object-relational database system.
- Psycopg2: A PostgreSQL adapter for the Python programming language.

**Prerequisites**
- Install PostgreSQL: PostgreSQL Installation Guide
- Install Psycopg2: pip install psycopg2

**Project Structure**

- Create a PostgreSQL Database
- Connect to the Database
- Create a Table
- Insert Data
- Query Data
- Execute Stored Procedures and Functions
- Delete Data
- Remove Table
- Remove Database

# Create a PostgreSQL Database

In [1]:
import psycopg2
import pandas as pd

In [2]:
# password in separate text file
# Read the password from the text file
with open('login.txt', 'r') as file:
    pw = file.readline().strip()

In [3]:
def create_database():
    try:
        conn = psycopg2.connect(user="postgres",
                                password= pw,
                                host="localhost",
                                port="5432")
        conn.autocommit = True
        cursor = conn.cursor()
        
        cursor.execute("CREATE DATABASE schdbm;")
        print("Database created successfully!")
        
    except Exception as e:
        print("Error:", e)
        
    finally:
        if conn:
            conn.close()

create_database()

Database created successfully!


# Connect to the Database

In [4]:
def connect_to_database(database_name):
    try:
        conn = psycopg2.connect(user="postgres",
                                password=pw,
                                host="localhost",
                                port="5432",
                                database=database_name)
        print(f"Connected to {database_name} successfully!")
        return conn
        
    except Exception as e:
        print("Error:", e)
        return None

conn = connect_to_database("schdbm")

Connected to schdbm successfully!


# Create a Table

In [5]:
def create_table(conn):
    try:
        cursor = conn.cursor()
        
        create_table_query = '''
        CREATE TABLE IF NOT EXISTS students (
           id SERIAL PRIMARY KEY,
           first_name VARCHAR(50) NOT NULL,
           last_name VARCHAR(50) NOT NULL,
           email VARCHAR(100) UNIQUE NOT NULL,
           date_of_birth DATE,
           gender VARCHAR(10),
           address VARCHAR(200),
           city VARCHAR(50),
           state VARCHAR(50),
           zipcode VARCHAR(10),
           phone_number VARCHAR(15)
        );
        '''
        
        cursor.execute(create_table_query)
        print("Table created successfully!")
        
    except Exception as e:
        print("Error:", e)
        
    finally:
        if cursor:
            cursor.close()

create_table(conn)

Table created successfully!


# Insert Data

In [6]:
def insert_student_data(conn, first_name, last_name, email, date_of_birth, gender, address, city, state, zipcode, phone_number):
    try:
        cursor = conn.cursor()

        insert_query = '''
            INSERT INTO students (first_name, last_name, email, date_of_birth, gender, address, city, state, zipcode, phone_number)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        '''
        cursor.execute(insert_query, (first_name, last_name, email, date_of_birth, gender, address, city, state, zipcode, phone_number))

        conn.commit()
        print("Data inserted successfully!")

    except Exception as e:
        print("Error:", e)

    finally:
        if cursor:
            cursor.close()

# Sample data for 10 records 
student_data = [
    ("John", "Doe", "john.doe@example.com", "1990-01-01", "Male", "123 Main St", "City1", "State1", "12345", "123-456-7890"),
    ("Jane", "Smith", "jane.smith@example.com", "1991-02-02", "Female", "456 Elm St", "City2", "State2", "23456", "234-567-8901"),
    ("Emily", "Johnson", "emily.johnson@example.com", "1992-03-03", "Female", "789 Oak St", "City3", "State3", "34567", "345-678-9012"),
    ("Michael", "Williams", "michael.williams@example.com", "1993-04-04", "Male", "101 Pine St", "City4", "State4", "45678", "456-789-0123"),
    ("Jessica", "Brown", "jessica.brown@example.com", "1994-05-05", "Female", "112 Maple St", "City5", "State5", "56789", "567-890-1234"),
    ("Matthew", "Jones", "matthew.jones@example.com", "1995-06-06", "Male", "213 Cedar St", "City6", "State6", "67890", "678-901-2345"),
    ("Sarah", "Miller", "sarah.miller@example.com", "1996-07-07", "Female", "314 Birch St", "City7", "State7", "78901", "789-012-3456"),
    ("Daniel", "Davis", "daniel.davis@example.com", "1997-08-08", "Male", "415 Walnut St", "City8", "State8", "89012", "890-123-4567"),
    ("Lauren", "Wilson", "lauren.wilson@example.com", "1998-09-09", "Female", "516 Spruce St", "City9", "State9", "90123", "901-234-5678"),
    ("James", "Taylor", "james.taylor@example.com", "1999-10-10", "Male", "617 Pineapple St", "City10", "State10", "01234", "012-345-6789"),
]

# Insert the sample data into the database
for data in student_data:
    insert_student_data(conn, *data)


Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!
Data inserted successfully!


# Query Data

In [None]:
def query_data(conn):
    cursor = None
    try:
        cursor = conn.cursor()

        # Check if there was an error in the previous operations
        if conn.notices:
            print("Previous operation had warnings or errors:", conn.notices)

        query = "SELECT * FROM students;"
        cursor.execute(query)

        rows = cursor.fetchall()
        for row in rows:
            print(row)

    except Exception as e:
        print("Error:", e)

        # Rollback the transaction in case of an error
        if cursor:
            conn.rollback()

    finally:
        if cursor:
            cursor.close()

query_data(conn)


# Execute Stored Procedures and Functions
- To execute stored procedures and functions, first, we need to create them in the PostgreSQL database. Once they are created, we can call them using Psycopg2.

- Create a simple function in PostgreSQL

CREATE OR REPLACE FUNCTION get_students_count()
RETURNS INTEGER AS $$
BEGIN
    RETURN (SELECT COUNT(*) FROM students);
END;
$$ LANGUAGE plpgsql;

In [10]:
def execute_function(conn):
    cursor = None
    try:
        cursor = conn.cursor()

        # Check if there was an error in the previous operations
        if conn.notices:
            print("Previous operation had warnings or errors:", conn.notices)

        cursor.callproc('get_students_count')
        count = cursor.fetchone()[0]

        print(f"Total students: {count}")

    except Exception as e:
        print("Error:", e)

        # Rollback the transaction in case of an error
        if cursor:
            conn.rollback()

    finally:
        if cursor:
            cursor.close()

# Example usage
execute_function(conn)

Total students: 10


# Delete Data

In [11]:
def delete_data(conn, email):
    cursor = None
    try:
        cursor = conn.cursor()

        # Check if there was an error in the previous operations
        if conn.notices:
            print("Previous operation had warnings or errors:", conn.notices)

        delete_query = "DELETE FROM students WHERE email = %s;"
        cursor.execute(delete_query, (email,))

        conn.commit()
        print("Data deleted successfully!")

    except Exception as e:
        print("Error:", e)

        # Rollback the transaction in case of an error
        if cursor:
            conn.rollback()

    finally:
        if cursor:
            cursor.close()

# Example usage
delete_data(conn, "john.doe@example.com")

Data deleted successfully!


# Remove Table

In [12]:
def remove_table(conn):
    try:
        cursor = conn.cursor()
        
        remove_query = "DROP TABLE IF EXISTS students;"
        cursor.execute(remove_query)
        
        print("Table removed successfully!")
        
    except Exception as e:
        print("Error:", e)
        
    finally:
        if cursor:
            cursor.close()

remove_table(conn)

Table removed successfully!


# Remove Database

In [13]:
import psycopg2

def remove_database(database_name):
    conn = None
    try:
        conn = psycopg2.connect(user="postgres",
                                password=pw,
                                host="localhost",
                                port="5432")
        conn.autocommit = True
        cursor = conn.cursor()

        # Forcefully terminate all connections to the database
        cursor.execute(f"SELECT pg_terminate_backend(pg_stat_activity.pid) "
                       f"FROM pg_stat_activity "
                       f"WHERE pg_stat_activity.datname = '{database_name}';")

        # Drop the database
        cursor.execute(f"DROP DATABASE IF EXISTS {database_name};")
        print(f"Database {database_name} removed successfully!")

    except Exception as e:
        print("Error:", e)

    finally:
        if conn:
            conn.close()


remove_database("schdbm")

Database schdbm removed successfully!
