Iterate through the JSONs in the Postgres and extract valuable information. This is the main knowledge transformation. 

In [1]:
# Cell 1: Setup and Configuration
import os
import json
import psycopg2
import requests
import time
import cursor
from tqdm import tqdm
from dotenv import load_dotenv
from queue import Queue

# Load environment variables
load_dotenv()

# Ensure required environment variables are loaded
REQUIRED_ENV_VARS = ["MISTRAL_API_KEY", "MISTRAL_API_KEY_2", "POSTGRES_PASSWORD", "POSTGRES_DB"]
for var in REQUIRED_ENV_VARS:
    if not os.getenv(var):
        raise EnvironmentError(f"❌ Missing required environment variable: {var}")

MISTRAL_API_KEY_1 = os.getenv("MISTRAL_API_KEY")
MISTRAL_API_KEY_2 = os.getenv("MISTRAL_API_KEY_2")
MISTRAL_API_KEY_3 = os.getenv("MISTRAL_API_KEY_3")
BATCH_SIZE = 5

# Toggle to start from last uploaded entry
START_FROM_LAST_PROCESSED = False
LAST_PROCESSED_ID = 3859

print("🔑 API Keys Loaded:")
print(f"MISTRAL_API_KEY_1: {MISTRAL_API_KEY_1[-6:]}")
print(f"MISTRAL_API_KEY_2: {MISTRAL_API_KEY_2[-6:]}")
print(f"MISTRAL_API_KEY_3: {MISTRAL_API_KEY_3[-6:]}")

# PostgreSQL Connection
try:
    conn = psycopg2.connect(
        dbname=os.getenv("POSTGRES_DB"),
        user="rohansharma",
        password=os.getenv("POSTGRES_PASSWORD"),
        host="localhost",
        port="5432"
    )
    cursor = conn.cursor()
    print("✅ Successfully connected to PostgreSQL!")
except psycopg2.Error as e:
    print(f"❌ Failed to connect to PostgreSQL: {e}")
    exit(1)

# Count total unprocessed entries
try:
    if START_FROM_LAST_PROCESSED:
        cursor.execute(f"SELECT COUNT(*) FROM sep_embeddings WHERE id >= {LAST_PROCESSED_ID} AND mistral_output IS NULL;")
    else:
        cursor.execute("SELECT COUNT(*) FROM sep_embeddings WHERE mistral_output IS NULL;")

    total_entries = cursor.fetchone()[0]
    print(f"📊 Total unprocessed entries: {total_entries}")
except psycopg2.Error as e:
    print(f"❌ Error executing query: {e}")
    exit(1)

# Fetch unprocessed rows
if START_FROM_LAST_PROCESSED:
    cursor.execute(f"""
        SELECT id, content FROM sep_embeddings 
        WHERE id >= {LAST_PROCESSED_ID} AND mistral_output IS NULL 
        ORDER BY id;
    """)
else:
    cursor.execute("""
        SELECT id, content FROM sep_embeddings 
        WHERE mistral_output IS NULL 
        ORDER BY id;
    """)

rows = cursor.fetchall()
batch_updates = []

# Initialize progress bar
progress_bar = tqdm(total=total_entries, desc="Processing Entries", unit="entry")

# Initialize queues
queue_1 = Queue()
queue_2 = Queue()
queue_3 = Queue()
response_queue = Queue()

# Distribute requests between the three API keys
for i, (entry_id, content) in enumerate(rows, start=1):
    if i % 3 == 0:
        queue_1.put((entry_id, content))
    elif i % 3 == 1:
        queue_2.put((entry_id, content))
    else:
        queue_3.put((entry_id, content))

🔑 API Keys Loaded:
MISTRAL_API_KEY_1: ptbzbx
MISTRAL_API_KEY_2: pGxUxC
MISTRAL_API_KEY_3: m9WQW8
✅ Successfully connected to PostgreSQL!
📊 Total unprocessed entries: 38


Processing Entries:   0%|          | 0/38 [00:00<?, ?entry/s]

In [2]:
import json
import re

import os
import json
import psycopg2
import requests
import time
import cursor
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import threading
from dotenv import load_dotenv
from queue import Queue, Empty

import json
import re

def extract_first_valid_json(api_content):
    """Extracts only the first valid JSON object from the response content and ignores any extra text."""
    
    # Regex pattern to match a valid JSON object (starting and ending with {})
    json_matches = re.findall(r"\{(?:[^{}]*|(?R))*\}", api_content, re.DOTALL)

    if not json_matches:
        return None  # No JSON found

    for match in json_matches:
        try:
            parsed_json = json.loads(match)  # Try parsing the first JSON object
            return parsed_json  # Return the first successfully parsed JSON object
        except json.JSONDecodeError:
            continue  # If this one fails, try the next one

    return None  # If all JSON objects fail to parse, return None

def call_mistral_api(api_key, lock, entry_id, content, attempt=0):
    """Sends an API request with exponential backoff for 429 errors."""
    url = "https://api.mistral.ai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    messages = [
        {"role": "system", "content": "Extract structured philosophical knowledge. Your response should always be a single valid JSON object."},
        {"role": "user", "content": f"""
        You are an AI tasked with extracting structured philosophical knowledge from a given text.
        Your goal is to **categorize the entry and extract key information in a structured format.**  
        Be **as detailed as possible while remaining concise.**  
        **Always return a single JSON object, never multiple JSON objects.**

        ## **Input:**
        {content}

        ## **Output Format (JSON):**
        {{
            "category": "thinker" | "concept" | "era",
            "metadata": {{
                "name": "...", 
                "description": "...",
                "time_period": "..."  
            }},
            "key_beliefs": [  
                {{ "belief": "...", "justification": "...", "related_concepts": ["...", "..."] }}
            ],
            "key_concepts": [  
                {{ "name": "...", "definition": "...", "related_fields": ["...", "..."] }}
            ],
            "associated_thinkers": ["...", "..."],
            "associated_eras": ["...", "..."]
        }}
        **NEVER return multiple JSON objects.**
        **NO output except for the JSON.**
        """}
    ]

    data = {
        "model": "mistral-medium-2312",
        "messages": messages,
        "temperature": 0.4
    }

    with lock:
        time.sleep(1.1)  # Strict rate limit enforcement

    response = requests.post(url, headers=headers, json=data)

    if response.status_code == 200:
        try:
            parsed_response = response.json()
            api_content = parsed_response["choices"][0]["message"]["content"]

            structured_output = extract_first_valid_json(api_content)

            if structured_output:
                return entry_id, structured_output
            else:
                print(f"❌ No valid JSON found for entry {entry_id}. Full response:\n{api_content}")
                return None

        except json.JSONDecodeError:
            print(f"❌ Failed to parse API response for entry ID {entry_id}. Full Response:\n{response.text}")
            return None

    elif response.status_code == 429:
        if attempt < 6:
            wait_time = 5 * attempt
            print(f"\n⏳ Rate limited (429) for entry {entry_id}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
            return call_mistral_api(api_key, lock, entry_id, content, attempt + 1)
        else:
            print(f"\n❌ Entry {entry_id} failed after 5 retries. Skipping... Response text:\n{response.text}")
            return None

    else:
        print(f"\n❌ API request failed for entry ID {entry_id}: {response.status_code}")
        print(f"❌ Full API response: {response.text}")
        return None


In [None]:
def api_worker(queue, api_key, lock):
    """Worker that fetches requests from the queue and processes them."""
    while not queue.empty():
        try:
            entry_id, content = queue.get(timeout=10)
            print(f"🔄 Processing entry {entry_id} with API Key {api_key[-4:]}")

            response = call_mistral_api(api_key, lock, entry_id, content)

            if response:
                print(f"✅ Adding entry {entry_id} to response queue...")
                response_queue.put(response)
                print(f"✅ Successfully added entry {entry_id} to response queue!")
            else:
                print(f"⚠️ Skipping entry {entry_id} due to failed response.")

            queue.task_done()

        except Empty:
            print("📭 API worker queue empty. Exiting thread.")
            break

def process_responses():
    # At the start of the function
    if conn.closed:
        print("❌ Database connection is closed!")
        return
        
    # Before each batch update
    try:
        conn.isolation_level
    except psycopg2.OperationalError:
        print("❌ Lost database connection!")
        return
    batch_updates = []
    batch_count = 0
    
    print("🚀 Response processor started.")
    
    while True:
        try:
            print(f"⏳ Current batch size: {len(batch_updates)}")  # Add this debug line
            entry = response_queue.get(timeout=10)
            
            if entry is None:
                continue
                
            entry_id, mistral_output_json = entry
            print(f"💾 Processing database update for entry {entry_id}")  # Add this debug line
            
            if entry_id and mistral_output_json:
                batch_updates.append((json.dumps(mistral_output_json), entry_id))
                print(f"📦 Added entry {entry_id} to batch (Batch size: {len(batch_updates)})")

            print(f"🔄 Current batch size: {len(batch_updates)} / {BATCH_SIZE}")

            if len(batch_updates) >= BATCH_SIZE:
                try:
                    print(f"🚀 Attempting to upload batch #{batch_count + 1} ({len(batch_updates)} entries)...")

                    cursor.executemany("""
                        UPDATE sep_embeddings 
                        SET mistral_output = %s 
                        WHERE id = %s;
                    """, batch_updates)
                    conn.commit()

                    rows_updated = cursor.rowcount
                    batch_count += 1

                    if rows_updated > 0:
                        print(f"✅ Uploaded batch #{batch_count} ({len(batch_updates)} entries, Row count: {rows_updated})")
                    else:
                        print(f"⚠️ WARNING: Batch #{batch_count} committed but no rows updated!")

                    batch_updates = []

                except Exception as e:
                    conn.rollback()
                    print(f"❌ Error uploading batch to database: {e}")

        except Empty:
            if queue_1.empty() and queue_2.empty() and queue_3.empty() and response_queue.empty():
                print("📭 No more responses to process. Exiting response thread.")
                break
            else:
                print("⏳ Still waiting for more responses...")

    if batch_updates:
        try:
            print(f"🚀 Attempting final batch upload #{batch_count + 1} ({len(batch_updates)} entries)...")

            cursor.executemany("""
                UPDATE sep_embeddings 
                SET mistral_output = %s 
                WHERE id = %s;
            """, batch_updates)
            conn.commit()

            rows_updated = cursor.rowcount
            batch_count += 1
            print(f"✅ Final batch uploaded successfully (Batch #{batch_count}, {len(batch_updates)} entries, Row count: {rows_updated})")

        except Exception as e:
            conn.rollback()
            print(f"❌ Error uploading final batch: {e}")

    print(f"\n🎉 Processing complete! Total batches uploaded: {batch_count}")
    cursor.close()
    conn.close()

def run_api_workers():
    """Starts API workers for each API key using multiple threads."""
    
    # Ensure locks are defined before starting threads
    global api_lock_1, api_lock_2, api_lock_3
    api_lock_1 = threading.Lock()
    api_lock_2 = threading.Lock()
    api_lock_3 = threading.Lock()

    with ThreadPoolExecutor(max_workers=15) as executor:  # Adjusted from 10 to 15 for more parallelism
        futures = []
        # Create multiple workers per API key
        for _ in range(5):  # 5 workers per key
            futures.append(executor.submit(api_worker, queue_1, MISTRAL_API_KEY_1, api_lock_1))
            futures.append(executor.submit(api_worker, queue_2, MISTRAL_API_KEY_2, api_lock_2))
            futures.append(executor.submit(api_worker, queue_3, MISTRAL_API_KEY_3, api_lock_3))

# Cell 3: Execution
if __name__ == "__main__":
    # Start response processing thread
    response_thread = threading.Thread(target=process_responses)
    response_thread.start()
    
    # Run API workers
    run_api_workers()
    
    # Wait for response processing to complete
    response_thread.join()

🚀 Response processor started.
⏳ Current batch size: 0


NameError: name 'api_lock_1' is not defined

⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still waiting for more responses...
⏳ Current batch size: 0
⏳ Still 

Run some sanity checks on the Postgres

Compute and upload the vector embeddings. 