In [1]:
import getpass
import json
import os
import re
import warnings

import pandas as pd
import psycopg2
from openai import OpenAI

# Silence the UserWarning whose text begins with "pandas only supports SQLAlchemy connectable".
# NOTE(review): presumably emitted when a raw DBAPI connection (e.g. psycopg2) is handed to
# pandas' SQL readers; none of the visible cells call them, so confirm this filter is still needed.
warnings.filterwarnings("ignore", category=UserWarning, message="pandas only supports SQLAlchemy connectable")


In [2]:
class OpenAIHelper:
    """Classify militaria listings (conflict, nation, item type) with an OpenAI chat model."""

    # Keys of every result dict, in the order they are emitted.
    _RESULT_KEYS = ("conflict", "nation", "item_type")

    # Finds a "<label>: <value>" pair anywhere in a line. Tolerates list numbering
    # ("1. Conflict: WW2") and markdown emphasis ("**Nation:** Germany").
    _FIELD_PATTERN = re.compile(
        r"\b(conflict|nation|item[ _]?type)\b[\s*]*:[\s*]*(.*)",
        re.IGNORECASE,
    )

    def __init__(self, api_key, model='gpt-4o-mini'):
        """
        Create the API client.

        Args:
            api_key: OpenAI API key passed to the client.
            model: Name of the chat model used for every classification request.
        """
        self.client = OpenAI(api_key=api_key)
        self.model = model

    @classmethod
    def _parse_response(cls, response_content):
        """
        Turn the model's free-text reply into a list of result dicts.

        The reply is scanned line by line for the three known labels. A label that
        repeats marks the start of the next item, so preamble text, blank lines and
        numbering do not break parsing. Fields missing from an item are returned as
        empty strings so that the result list stays positionally aligned with the
        request as long as every item yields at least one field.
        """
        parsed_items = []
        current = {}
        for line in response_content.splitlines():
            match = cls._FIELD_PATTERN.search(line)
            if match is None:
                continue
            label = match.group(1).lower()
            key = "item_type" if label.startswith("item") else label
            if key in current:
                # Same label seen again: the previous item is complete.
                parsed_items.append(current)
                current = {}
            # Keep the whole remainder of the line so values containing ": " survive.
            current[key] = match.group(2).strip().strip("*").strip()
        if current:
            parsed_items.append(current)
        return [
            {key: item.get(key, "") for key in cls._RESULT_KEYS}
            for item in parsed_items
        ]

    def fill_missing_data_batch(self, rows):
        """
        Use OpenAI to classify militaria items and fill missing fields.

        Args:
            rows: Sequence of dict-like records; each must provide 'title' and
                'description'.

        Returns:
            A list of dicts with the keys 'conflict', 'nation' and 'item_type', in
            the order the items were found in the model's reply. A field the model
            did not provide is returned as "". An empty list is returned when
            `rows` is empty (no API call is made) or when the request fails.
        """
        if not rows:
            return []

        system_message = """
        You are a helpful assistant that classifies militaria items. 
        You infer the conflict, nation, and item type based on a title and description.

        Conflict must be one of the following:
        PRE_19TH, 19TH_CENTURY, PRE_WW1, WW1, INTER_WAR, WW2, COLD_WAR, 
        VIETNAM_WAR, KOREAN_WAR, CIVIL_WAR, MODERN, UNKNOWN.

        Item type must be one of the following:
        PAPER_ITEMS, FIELD_GEAR, UNIFORM, INSIGNIA, EDGED_WEAPONS, HELMET, 
        MEDALS_AWARDS, FLAG, HEADGEAR, ART, TINNIE, BELTS_BUCKLES, POSTCARD, 
        REPRODUCTION, FIREARMS, TOYS.
        """
        message_parts = ["Classify the following items:\n\n"]
        for i, row in enumerate(rows, 1):
            message_parts.append(
                f"{i}. Title: \"{row['title']}\"\n   Description: \"{row['description']}\"\n\n"
            )
        message_parts.append(
            "Respond in this format for each item:\nConflict: <conflict>\nNation: <nation>\nItem Type: <item_type>"
        )
        user_message = "".join(message_parts)

        try:
            completion = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": user_message}
                ]
            )
            # The message content can be None; treat that as an empty reply.
            response_content = (completion.choices[0].message.content or "").strip()
            print("Raw OpenAI Output:", response_content)
            return self._parse_response(response_content)
        except Exception as e:
            # Best effort: a failed request yields no results rather than aborting the run.
            print(f"Error during OpenAI classification: {e}")
            return []



In [None]:
def connect_to_db():
    """
    Establish connection to the PostgreSQL database with user-provided credentials.

    Host, database name, user and port are read with `input()`; the password is
    read with `getpass.getpass()` so it is not echoed into the notebook output.
    An empty port answer falls back to "5432".

    Returns:
        The open psycopg2 connection, or None when prompting or connecting fails
        (the error is printed).
    """
    try:
        print("Please enter the database connection details:")
        host = input("Host (e.g., your-database-hostname): ").strip()
        database = input("Database name: ").strip()
        user = input("User: ").strip()
        # input() would leave the typed password visible in the cell output.
        password = getpass.getpass("Password: ").strip()
        port = input("Port (default: 5432): ").strip() or "5432"

        connection = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password,
            port=port
        )
        print("Successfully connected to the database.")
        return connection
    except Exception as e:
        print(f"Error connecting to database: {e}")
        return None


In [4]:
def fetch_missing_data_rows(connection, row_limit=10):
    """
    Fetch rows that have missing data from the database.

    Selects up to `row_limit` rows of `militaria` where `conflict`, `nation` or
    `item_type` is NULL, ordered by `id` so repeated calls return a predictable
    subset.

    Args:
        connection: Open DB-API connection (psycopg2 in this notebook).
        row_limit: Maximum number of rows to return.

    Returns:
        A DataFrame with one column per table column. An empty DataFrame is
        returned when nothing matches or when the query fails (the error is printed).
    """
    cursor = connection.cursor()
    try:
        query = """
        SELECT * FROM militaria
        WHERE conflict IS NULL OR nation IS NULL OR item_type IS NULL
        ORDER BY id
        LIMIT %s
        """
        cursor.execute(query, (row_limit,))
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        return pd.DataFrame(rows, columns=columns)
    except Exception as e:
        print(f"Error fetching rows: {e}")
        # A failed statement leaves the transaction aborted; without a rollback every
        # later statement on this shared connection would be rejected as well.
        connection.rollback()
        return pd.DataFrame()
    finally:
        cursor.close()

In [5]:
def terminate_idle_connections(connection):
    """
    Ask PostgreSQL to terminate the other idle sessions on the current database.

    Runs `pg_terminate_backend` for every `pg_stat_activity` entry of the current
    database whose state is 'idle', excluding this session, then commits. On any
    error the message is printed and the transaction is rolled back. The cursor
    is always closed.

    NOTE(review): the filter matches state 'idle' only; sessions that are
    'idle in transaction' are not touched — confirm this is the intended set.
    """
    kill_idle_sql = """
        SELECT pg_terminate_backend(pg_stat_activity.pid)
        FROM pg_stat_activity
        WHERE datname = current_database()
          AND state = 'idle'
          AND pid <> pg_backend_pid();
        """
    db_cursor = connection.cursor()
    try:
        db_cursor.execute(kill_idle_sql)
        connection.commit()
        print("Terminated idle connections successfully.")
    except Exception as err:
        print(f"Error terminating idle connections: {err}")
        connection.rollback()
    finally:
        db_cursor.close()

In [6]:
def update_database_row(connection, row, max_retries=3):
    """
    Update a row in the database with the classified data.

    Writes `conflict`, `nation` and `item_type` for the record whose `id` matches
    and commits immediately. When the statement fails with LockNotAvailable the
    transaction is rolled back, idle connections are terminated and the update is
    retried, at most `max_retries` times. Any other error is printed and the
    transaction rolled back without a retry.

    Args:
        connection: Open psycopg2 connection.
        row: Mapping providing 'conflict', 'nation', 'item_type' and 'id'.
        max_retries: Number of additional attempts after a lock failure
            (values below 0 are treated as 0).
    """
    query = """
        UPDATE militaria
        SET conflict = %s, nation = %s, item_type = %s
        WHERE id = %s
        """
    params = (row['conflict'], row['nation'], row['item_type'], row['id'])
    total_attempts = max(0, max_retries) + 1

    for attempt in range(1, total_attempts + 1):
        cursor = connection.cursor()
        try:
            cursor.execute(query, params)
            connection.commit()  # Commit changes after each update
            print(f"Updated row ID: {row['id']} successfully.")
            return
        except psycopg2.errors.LockNotAvailable:
            # Roll back first: the failed statement aborted the transaction, and the
            # terminate query would be rejected if it ran inside it.
            connection.rollback()
            if attempt == total_attempts:
                print(f"Giving up on row ID: {row['id']} after {total_attempts} attempts due to lock contention.")
                return
            print(f"Lock issue detected for row ID: {row['id']}. Retrying after resolving locks...")
            terminate_idle_connections(connection)  # Terminate blocking connections
        except Exception as e:
            connection.rollback()  # Rollback only in case of an error
            print(f"Error updating row ID: {row['id']}. Error: {e}")
            return
        finally:
            cursor.close()

In [7]:
def main():
    """
    Main workflow to fetch rows, classify, and update the database.

    Steps:
      1. Load the OpenAI API key from 'ChatGPTAPIKey.json' (after switching to the
         author's project folder when it exists), falling back to the
         OPENAI_API_KEY environment variable.
      2. Prompt for database credentials and connect.
      3. Repeatedly fetch rows with missing fields, classify them in batches and
         write complete classifications back.

    Every fetched row is attempted at most once per run. Rows that could not be
    classified or updated stay incomplete in the database and keep matching the
    fetch query, so the fetch limit is widened past them and the loop stops as
    soon as a fetch yields nothing new.
    """
    project_dir = r'C:\Users\keena\Desktop\ML & AI\AWS_OpenAI_Data_Filler'
    # The folder only exists on the author's machine; elsewhere stay in the current directory.
    if os.path.isdir(project_dir):
        os.chdir(project_dir)

    key_file = 'ChatGPTAPIKey.json'
    if os.path.isfile(key_file):
        with open(key_file, 'r') as cred_file:
            api_key = json.load(cred_file)['key']
    else:
        api_key = os.environ.get('OPENAI_API_KEY')
    if not api_key:
        print(f"No OpenAI API key found: provide {key_file} or set OPENAI_API_KEY.")
        return

    openai_helper = OpenAIHelper(api_key)
    connection = connect_to_db()
    if not connection:
        return

    batch_size = 10
    attempted_ids = set()  # IDs of every row handed to the classifier in this run
    processed_rows = 0
    fetch_limit = batch_size

    try:
        while True:
            # Fetch rows that have missing data
            missing_data = fetch_missing_data_rows(connection, row_limit=fetch_limit)

            # If there are no more rows to process, exit loop
            if missing_data.empty:
                print("No more rows to process.")
                break

            fetched = missing_data.to_dict(orient="records")
            # Filter out rows that have already been attempted
            rows = [row for row in fetched if row['id'] not in attempted_ids]
            stale_count = len(fetched) - len(rows)

            # Fewer rows than requested means the query returned everything still incomplete.
            exhausted = len(fetched) < fetch_limit
            # Ask for enough rows next time to get past the ones that are stuck.
            fetch_limit = batch_size + stale_count

            if not rows:
                print("All fetched rows are already processed.")
                if exhausted:
                    break
                continue

            rows = rows[:batch_size]
            print(f"Fetched {len(rows)} rows to process...")

            # Use OpenAI to classify and fill the missing fields
            results = openai_helper.fill_missing_data_batch(rows)

            # Mark every row as attempted, including those without a usable result,
            # so a failed classification is not retried forever.
            attempted_ids.update(row['id'] for row in rows)

            # zip stops at the shorter sequence: rows without a result are skipped.
            for row, result in zip(rows, results):
                # Update row with the result if all fields are present
                if result["conflict"] and result["nation"] and result["item_type"]:
                    row.update(result)
                    update_database_row(connection, row)

            processed_rows += len(rows)
            print(f"Processed {processed_rows} rows so far.")
    finally:
        # Close the database connection even when a batch raises
        connection.close()


# Run the workflow only when this code executes as the top-level program
# (__name__ is "__main__"), not when the definitions are imported elsewhere.
if __name__ == "__main__":
    main()


No more rows to process.
