# Database Connection Testing

This notebook is designed to test and demonstrate the connection to the PostgreSQL database using the centralized client module.

## Setup Environment Variables

First, we need to set up our environment variables for database authentication.

In [None]:
import os
import sys
from pathlib import Path
from dotenv import load_dotenv

# Add backend directory to path to import our pg_client
notebook_dir = Path(os.getcwd())
backend_dir = notebook_dir / 'backend'
sys.path.append(str(backend_dir))

# Load environment variables from .env file
load_dotenv(dotenv_path=".env")

# Define database connection parameters
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = os.environ.get('DB_PORT', '5433')
DB_NAME = os.environ.get('DB_NAME', 'postgres')
DB_USER = os.environ.get('DB_USER', 'postgres')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'postgres')

# Print connection info (masking the password)
print("Environment variables loaded successfully!")
print(f"Database connection parameters:")
print(f"  Host: {DB_HOST}")
print(f"  Port: {DB_PORT}")
print(f"  Name: {DB_NAME}")
print(f"  User: {DB_USER}")
print(f"  Password: {'*' * len(DB_PASSWORD) if DB_PASSWORD else 'Not set!'}")

Environment variables loaded successfully!
Supabase URL: http://localhost:8000
Supabase Key: eyJh...n_I0


## Install Required Libraries

Let's make sure we have all the necessary libraries installed.

In [None]:
# Install required packages if not already installed
import subprocess

def install_package(package):
    try:
        __import__(package)
        print(f"{package} is already installed.")
    except ImportError:
        print(f"Installing {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"{package} has been installed.")

# Install required packages
install_package("psycopg2-binary")
install_package("pandas")  # For data display

supabase is already installed.
pandas is already installed.


## Import our PostgreSQL Client

Now we'll import the centralized PostgreSQL client we've created.

In [None]:
# First, let's see if we can import the module directly
try:
    from app.db.pg_client import get_connection, execute_query, test_connection
    print("Successfully imported the pg_client module from the application!")
except ImportError as e:
    print(f"Could not import pg_client directly: {e}")
    
    # If direct import fails, let's define the client here as a backup
    print("Defining a local version of the PostgreSQL client...")
    
    import psycopg2
    import psycopg2.extras
    import time
    import logging
    from typing import Dict, Any, List, Optional, Tuple, Union
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    def get_connection():
        """Get a connection to the PostgreSQL database"""
        try:
            conn = psycopg2.connect(
                host=DB_HOST,
                port=DB_PORT,
                dbname=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD
            )
            conn.autocommit = True
            return conn
        except Exception as e:
            print(f"Error connecting to database: {e}")
            return None
            
    def execute_query(query: str, params=None, fetchall=True) -> Union[List[Dict[str, Any]], Dict[str, Any], Dict[str, int]]:
        """Execute a SQL query and return the results"""
        conn = None
        try:
            conn = get_connection()
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(query, params)
                
                if query.strip().upper().startswith(('SELECT', 'SHOW', 'WITH')):
                    if fetchall:
                        results = cur.fetchall()
                    else:
                        results = cur.fetchone()
                    return results
                else:
                    conn.commit()
                    return {'affected_rows': cur.rowcount}
        except Exception as e:
            print(f"Query execution failed: {e}")
            print(f"Query: {query}")
            print(f"Params: {params}")
            if conn:
                conn.rollback()
            raise
        finally:
            if conn:
                conn.close()
    
    def test_connection() -> bool:
        """Test database connection"""
        try:
            conn = get_connection()
            if not conn:
                return False
                
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                result = cur.fetchone()
            conn.close()
            return True
        except Exception as e:
            print(f"Connection test failed: {e}")
            return False
    
    print("Local PostgreSQL client defined successfully!")

## Test Database Connection

Let's test if we can successfully connect to the database.

In [None]:
# Test the database connection
print("Testing database connection...")

if test_connection():
    print("✅ Database connection successful!")
    
    # Check the PostgreSQL version
    try:
        result = execute_query("SELECT version();", fetchall=False)
        print(f"PostgreSQL version: {result['version']}")
    except Exception as e:
        print(f"Could not fetch PostgreSQL version: {e}")
else:
    print("❌ Database connection failed!")
    print("Please check your database connection parameters and ensure the database is running.")

Failed to execute script database/migrations/schema.sql: [Errno 61] Connection refused
Failed to execute script database/seed.sql: [Errno 61] Connection refused


## Check Tables

Let's check which tables exist in the database.

In [None]:
import pandas as pd

# Get a list of tables in the database
try:
    tables_query = """
    SELECT 
        table_name 
    FROM 
        information_schema.tables 
    WHERE 
        table_schema = 'public' 
        AND table_type = 'BASE TABLE'
    ORDER BY 
        table_name;
    """
    
    tables = execute_query(tables_query)
    
    if not tables:
        print("No tables found in the database.")
    else:
        print(f"Found {len(tables)} tables:")
        df_tables = pd.DataFrame(tables)
        display(df_tables)
        
        # Store the table names for later use
        table_names = [table['table_name'] for table in tables]
except Exception as e:
    print(f"Error fetching tables: {e}")
    table_names = []

Attempting to connect to Supabase...

Trying to query table: users
Could not query users: [Errno 61] Connection refused

Trying to query table: players
Could not query players: [Errno 61] Connection refused

Trying to query table: auth.users
Could not query auth.users: [Errno 61] Connection refused

Trying to query table: public.users
Could not query public.users: [Errno 61] Connection refused

Trying system health check...

Connection failed: [Errno 61] Connection refused

Possible issues:
1. Supabase URL or API key is incorrect
2. Supabase service is not running
3. Network connectivity issues
4. The required tables don't exist yet


False

## Explore Table Schemas

Let's explore the schema of our tables.

In [None]:
def get_table_schema(table_name):
    """Get schema information for a given table"""
    try:
        schema_query = """
        SELECT 
            column_name, 
            data_type, 
            is_nullable, 
            column_default
        FROM 
            information_schema.columns
        WHERE 
            table_schema = 'public'
            AND table_name = %s
        ORDER BY 
            ordinal_position;
        """
        
        columns = execute_query(schema_query, (table_name,))
        
        if not columns:
            print(f"No columns found for table {table_name}.")
            return None
        
        df_columns = pd.DataFrame(columns)
        return df_columns
    except Exception as e:
        print(f"Error fetching schema for {table_name}: {e}")
        return None

# Display schema for each table
for table_name in table_names:
    print(f"\nSchema for table: {table_name}")
    schema = get_table_schema(table_name)
    if schema is not None:
        display(schema)
        
        # Get row count
        try:
            count_result = execute_query(f"SELECT COUNT(*) as count FROM {table_name}", fetchall=False)
            print(f"Row count: {count_result['count']}")
        except Exception as e:
            print(f"Error getting row count: {e}")

## Perform Basic CRUD Operations

Let's demonstrate basic CRUD operations using the centralized PostgreSQL client.

In [None]:
import uuid
from datetime import datetime

# Select a table to work with
# Assuming 'players' table exists, if not, replace with an appropriate table
TARGET_TABLE = 'players' if 'players' in table_names else (table_names[0] if table_names else None)

if not TARGET_TABLE:
    print("No tables available for CRUD operations. Please create tables first.")
else:
    print(f"Performing CRUD operations on table: {TARGET_TABLE}")

In [None]:
# 1. CREATE - Insert a new record
def insert_record(table_name):
    """Insert a test record into the specified table"""
    if table_name == 'players':
        record = {
            "id": str(uuid.uuid4()),
            "handle": f"TestPlayer_{datetime.now().strftime('%Y%m%d%H%M%S')}",
            "elo_rating": 1000,
            "avatar_url": "https://api.dicebear.com/7.x/avataaars/svg?seed=test",
            "created_at": datetime.now().isoformat()
        }
    elif table_name == 'users':
        record = {
            "id": str(uuid.uuid4()),
            "email": f"test_{datetime.now().strftime('%Y%m%d%H%M%S')}@example.com",
            "password_hash": "dummy_hash",  # In production, use proper hashing
            "role": "user",
            "created_at": datetime.now().isoformat()
        }
    elif table_name == 'matches':
        record = {
            "id": str(uuid.uuid4()),
            "team1": "Team A",
            "team2": "Team B",
            "team1_score": 0,
            "team2_score": 0,
            "status": "scheduled",
            "match_date": datetime.now().isoformat(),
            "created_at": datetime.now().isoformat()
        }
    else:
        print(f"Insert template not defined for table {table_name}. Skipping insert.")
        return None
    
    try:
        # Get fields and values for SQL
        fields = list(record.keys())
        placeholders = ["%s"] * len(fields)
        values = [record[field] for field in fields]
        
        # Construct the query
        query = f"""
        INSERT INTO {table_name} ({', '.join(fields)})
        VALUES ({', '.join(placeholders)})
        RETURNING *
        """
        
        # Execute the query
        result = execute_query(query, tuple(values), fetchall=False)
        print("✅ Record inserted successfully:")
        return result
    except Exception as e:
        print(f"❌ Error inserting record: {e}")
        return None

# Insert a test record
if TARGET_TABLE:
    new_record = insert_record(TARGET_TABLE)
    if new_record:
        # Convert to DataFrame for better display
        pd.DataFrame([new_record]).T  # Transpose for better viewing

In [None]:
# 2. READ - Query records
def query_records(table_name, limit=5):
    """Query records from the specified table"""
    try:
        query = f"SELECT * FROM {table_name} ORDER BY created_at DESC LIMIT {limit}"
        results = execute_query(query)
        
        if not results:
            print(f"No records found in {table_name}.")
            return None
            
        print(f"✅ Retrieved {len(results)} records from {table_name}:")
        return pd.DataFrame(results)
    except Exception as e:
        print(f"❌ Error querying records: {e}")
        return None

# Query records
if TARGET_TABLE:
    df_records = query_records(TARGET_TABLE)
    if df_records is not None:
        display(df_records)
        
        # Store the ID of the first record for update/delete operations
        if not df_records.empty and 'id' in df_records.columns:
            record_id = df_records['id'].iloc[0]
            print(f"Selected record ID for update/delete operations: {record_id}")
        else:
            record_id = None
            print("No record ID available for update/delete operations.")

In [None]:
# 3. UPDATE - Update a record
def update_record(table_name, record_id, update_data):
    """Update a record in the specified table"""
    try:
        # Check if record exists
        check_query = f"SELECT id FROM {table_name} WHERE id = %s"
        existing = execute_query(check_query, (record_id,), fetchall=False)
        
        if not existing:
            print(f"Record with ID {record_id} not found in {table_name}.")
            return None
        
        # Add updated_at timestamp
        update_data["updated_at"] = datetime.now().isoformat()
        
        # Build SET clause
        set_clauses = [f"{key} = %s" for key in update_data.keys()]
        values = list(update_data.values())
        
        # Add record ID to values list for WHERE clause
        values.append(record_id)
        
        # Construct query
        query = f"""
        UPDATE {table_name} 
        SET {', '.join(set_clauses)} 
        WHERE id = %s
        RETURNING *
        """
        
        # Execute the query
        result = execute_query(query, tuple(values), fetchall=False)
        print("✅ Record updated successfully:")
        return result
    except Exception as e:
        print(f"❌ Error updating record: {e}")
        return None

# Update the record if we have a valid record ID
if TARGET_TABLE and 'record_id' in locals() and record_id:
    # Define update data based on table
    if TARGET_TABLE == 'players':
        update_data = {"elo_rating": 1100, "handle": f"Updated_{datetime.now().strftime('%H%M%S')}"}
    elif TARGET_TABLE == 'users':
        update_data = {"role": "admin"}
    elif TARGET_TABLE == 'matches':
        update_data = {"status": "completed", "team1_score": 3, "team2_score": 2}
    else:
        update_data = {"updated_at": datetime.now().isoformat()}
    
    updated_record = update_record(TARGET_TABLE, record_id, update_data)
    if updated_record:
        # Convert to DataFrame for better display
        updated_df = pd.DataFrame([updated_record]).T  # Transpose for better viewing
        display(updated_df)

## Database Reset or Setup

This section demonstrates how to set up or reset database tables using the Supabase client. 
**Use with caution as these operations can delete data!**

## Database Setup and Management

Here are utilities for setting up and managing the database schema.

In [None]:
def run_sql_file(filepath):
    """Execute an SQL file"""
    try:
        # Read the SQL script
        with open(filepath, 'r') as f:
            sql_script = f.read()
            
        # Split into commands
        commands = [cmd.strip() for cmd in sql_script.split(';') if cmd.strip()]
        
        success_count = 0
        error_count = 0
        
        # Execute each command
        conn = get_connection()
        with conn.cursor() as cur:
            for i, command in enumerate(commands, 1):
                try:
                    print(f"Executing command {i}/{len(commands)}...")
                    cur.execute(command)
                    conn.commit()
                    success_count += 1
                except Exception as e:
                    conn.rollback()
                    print(f"Error executing command {i}: {e}")
                    error_count += 1
        conn.close()
        
        print(f"SQL file execution completed: {success_count} commands succeeded, {error_count} commands failed")
        return success_count, error_count
    except Exception as e:
        print(f"Error processing SQL file: {e}")
        return 0, 0

# Example usage - commented out for safety
'''
# Run schema migrations
schema_path = Path("database/migrations/schema.sql")
if schema_path.exists():
    print(f"Running schema migrations from {schema_path}...")
    run_sql_file(schema_path)
else:
    print(f"Schema file not found at {schema_path}")

# Run seed data
seed_path = Path("database/seed.sql")
if seed_path.exists():
    print(f"Running seed data from {seed_path}...")
    run_sql_file(seed_path)
else:
    print(f"Seed file not found at {seed_path}")
'''

## Conclusion

In this notebook, we've successfully connected to the PostgreSQL database using our centralized client module. We've demonstrated how to:

1. Set up database connection parameters
2. Connect to the database
3. Explore table schemas
4. Perform basic CRUD operations
5. Execute SQL files for schema migrations and seeding

This notebook provides a convenient way to interact with the database for development and testing purposes.