# PostgreSQL Connection and Analysis Notebook

This notebook connects to PostgreSQL with the settings in `../config/config_boomer_load_local.yml`, runs SQL queries, and returns the results as pandas DataFrames.

## Environment Setup

First, let's ensure all required packages are installed in the current kernel environment.

In [10]:
# Install required packages if not already installed
import subprocess
import sys

def install_package(package):
    """Install a package using pip in the current kernel environment."""
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"✅ {package} installed successfully")
    except subprocess.CalledProcessError as e:
        print(f"❌ Error installing {package}: {e}")

# Required packages
required_packages = [
    "psycopg2-binary==2.9.7",
    "pandas",
    "PyYAML==6.0.1"
]

print("Installing required packages...")
for package in required_packages:
    install_package(package)

print("\n" + "="*50)
print("Package installation completed!")
print("="*50)

Installing required packages...
✅ psycopg2-binary==2.9.7 installed successfully
✅ pandas installed successfully
✅ PyYAML==6.0.1 installed successfully

Package installation completed!
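The cell above always calls `pip install`, and pip itself skips requirements that are already satisfied. If you would rather not launch pip at all on re-runs, a minimal sketch can check installed versions first with the standard-library `importlib.metadata`. It assumes Python 3.8+ and compares only exact `==` pins; `needs_install` and `install_missing` are hypothetical helpers, not part of this project.

```python
import subprocess
import sys
from importlib import metadata
from typing import List


def needs_install(requirement: str) -> bool:
    """Return True if `requirement` ("name" or "name==version") is not satisfied."""
    name, _, pinned = requirement.partition("==")
    try:
        installed = metadata.version(name)
    except metadata.PackageNotFoundError:
        return True  # not installed in this kernel's environment
    # Only exact "==" pins are compared; other specifiers are out of scope here
    return bool(pinned) and installed != pinned


def install_missing(requirements: List[str]) -> List[str]:
    """pip-install only the unsatisfied requirements and return them."""
    to_install = [req for req in requirements if needs_install(req)]
    if to_install:
        subprocess.check_call([sys.executable, "-m", "pip", "install", *to_install])
    return to_install
```

With the notebook's list this would be `install_missing(["psycopg2-binary==2.9.7", "pandas", "PyYAML==6.0.1"])`; an unpinned name such as `pandas` is only installed when it is missing entirely.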


In [11]:
# Import required libraries with error handling
import os
import sys
from typing import Optional, Dict, Any

failed_imports = []  # names of libraries that could not be imported

try:
    import pandas as pd
    print("✅ pandas imported successfully")
except ImportError as e:
    failed_imports.append("pandas")
    print(f"❌ Error importing pandas: {e}")

try:
    import psycopg2
    print(f"✅ psycopg2 imported successfully (version: {psycopg2.__version__})")
except ImportError as e:
    failed_imports.append("psycopg2")
    print(f"❌ Error importing psycopg2: {e}")
    print("Please run the installation cell above first!")

try:
    import yaml
    print("✅ yaml imported successfully")
except ImportError as e:
    failed_imports.append("yaml")
    print(f"❌ Error importing yaml: {e}")

# Add the project's src directory (a sibling of the current working directory) to the import path
sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'src'))

print("\n" + "="*50)
if failed_imports:
    print(f"Failed to import: {', '.join(failed_imports)}")
else:
    print("All libraries imported successfully!")
print("="*50)

✅ pandas imported successfully
✅ psycopg2 imported successfully (version: 2.9.7 (dt dec pq3 ext lo64))
✅ yaml imported successfully

All libraries imported successfully!
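The import cell appends the `src` directory to `sys.path` every time it runs, so re-running it adds duplicate entries. They do not break imports, but they clutter the path. A small sketch that appends only once, assuming (like the cell) that `src` lives in the parent of the working directory; `add_src_to_path` is a hypothetical name:

```python
import sys
from pathlib import Path
from typing import Optional


def add_src_to_path(start: Optional[Path] = None) -> str:
    """Append <parent of start>/src to sys.path unless it is already there."""
    start = Path.cwd() if start is None else start
    src_dir = str(start.resolve().parent / "src")
    if src_dir not in sys.path:  # idempotent: safe to call on every re-run
        sys.path.append(src_dir)
    return src_dir


print(add_src_to_path())
```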


In [13]:
def load_config(config_path: str = '../config/config_boomer_load_local.yml') -> Dict[str, Any]:
    """
    Load configuration from YAML file.
    
    Args:
        config_path: Path to the configuration file
        
    Returns:
        Dictionary containing configuration data
    """
    try:
        with open(config_path, 'r') as file:
            config = yaml.safe_load(file)
        print(f"Configuration loaded successfully from {config_path}")
        return config
    except FileNotFoundError:
        print(f"Configuration file not found: {config_path}")
        raise
    except yaml.YAMLError as e:
        print(f"Error parsing YAML file: {e}")
        raise

# Load configuration
config = load_config()
print("Available configuration sections:", list(config.keys()))

Configuration loaded successfully from ../config/config_boomer_load_local.yml
Available configuration sections: ['database', 'queries', 'neo4j_load_config']


In [14]:
def parse_postgres_config(config_string: str) -> Dict[str, str]:
    """
    Parse PostgreSQL configuration string into connection parameters.
    
    Args:
        config_string: Multi-line string containing PostgreSQL configuration
        
    Returns:
        Dictionary with connection parameters
    """
    db_config = {}
    
    for line in config_string.strip().split('\n'):
        if ':' in line:
            key, value = line.split(':', 1)
            db_config[key.strip()] = value.strip()
    
    return db_config

# Parse PostgreSQL configuration
postgres_config_str = config['database']['postgres']
postgres_config = parse_postgres_config(postgres_config_str)
print("PostgreSQL connection parameters:")
for key, value in postgres_config.items():
    if key == 'password':
        print(f"  {key}: {'*' * len(value)}")
    else:
        print(f"  {key}: {value}")

PostgreSQL connection parameters:
  host: 4.150.184.135
  port: 5432
  database: SERP_TREND_DEV_DB
  user: serptrend_user
  password: ***********************
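These parameters come straight from the YAML file, including the plaintext password, and a missing key only surfaces later as a `KeyError` when the query function looks it up. One option is to let the standard libpq environment variables (`PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`) override the file and to validate the required keys up front. The variable names are libpq's; the override policy and the `resolve_postgres_config` helper are assumptions for illustration.

```python
import os
from typing import Dict, Mapping, Optional

REQUIRED_KEYS = ("host", "port", "database", "user", "password")
# libpq environment variable -> key used by parse_postgres_config
ENV_OVERRIDES = {
    "PGHOST": "host",
    "PGPORT": "port",
    "PGDATABASE": "database",
    "PGUSER": "user",
    "PGPASSWORD": "password",
}


def resolve_postgres_config(
    file_config: Mapping[str, str], environ: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Merge non-empty environment overrides into the file config and validate it."""
    environ = os.environ if environ is None else environ
    resolved = dict(file_config)  # never mutate the caller's dict
    for env_var, key in ENV_OVERRIDES.items():
        if environ.get(env_var):
            resolved[key] = environ[env_var]
    missing = [key for key in REQUIRED_KEYS if not resolved.get(key)]
    if missing:
        raise ValueError(f"Missing PostgreSQL settings: {', '.join(missing)}")
    return resolved
```

In the notebook this would wrap the parser, e.g. `postgres_config = resolve_postgres_config(parse_postgres_config(postgres_config_str))`, which lets the password stay out of the YAML file.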


In [None]:
# Improved PostgreSQL query function that handles both SELECT and UPDATE/INSERT/DELETE queries
def execute_postgres_query_improved(query: str, config: Dict[str, str]) -> pd.DataFrame:
    """
    Execute a PostgreSQL query and return the results as a pandas DataFrame.
    
    Args:
        query: SQL query to execute
        config: PostgreSQL connection parameters (host, port, database, user, password)
        
    Returns:
        pandas DataFrame with the query results; an empty DataFrame for statements
        that return no rows (UPDATE/INSERT/DELETE)
    """
    connection = None
    try:
        # Establish connection
        connection = psycopg2.connect(
            host=config['host'],
            port=config['port'],
            database=config['database'],
            user=config['user'],
            password=config['password']
        )
        
        print(f"Connected to PostgreSQL database: {config['database']}")
        
        with connection.cursor() as cursor:
            cursor.execute(query)
            
            # cursor.description is None when the statement returns no result set.
            # Checking it (rather than whether the text starts with SELECT) also
            # handles WITH ... SELECT queries and ... RETURNING clauses.
            if cursor.description is not None:
                columns = [col[0] for col in cursor.description]
                # Building the DataFrame from the cursor avoids the pandas 2.x warning
                # that read_sql_query emits for raw psycopg2 connections;
                # coerce_float converts Decimal values as read_sql_query would
                df = pd.DataFrame.from_records(cursor.fetchall(), columns=columns, coerce_float=True)
                connection.commit()  # persist changes made by e.g. UPDATE ... RETURNING
                print(f"Query executed successfully. Retrieved {len(df)} rows.")
                return df
            
            # UPDATE/INSERT/DELETE: record the row count and persist the change
            rows_affected = cursor.rowcount
            connection.commit()
            print(f"Query executed successfully. {rows_affected} rows affected.")
            return pd.DataFrame()  # Empty DataFrame for statements without a result set
        
    except psycopg2.Error as e:
        print(f"PostgreSQL error: {e}")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
    finally:
        if connection:
            connection.close()
            print("Database connection closed.")
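`execute_postgres_query_improved` takes a finished SQL string, so any values have to be formatted into the query text. DB-API 2.0 drivers (PEP 249) can instead bind values separately, and they report through `cursor.description` whether a statement produced a result set (`None` means it did not). The sketch below shows both ideas with the standard-library `sqlite3` module as a stand-in, so it runs without a server; `run_query` and the `articles` table are hypothetical. psycopg2 follows the same pattern but uses `%s` placeholders, e.g. `cursor.execute("SELECT * FROM structured_content WHERE id = %s", (article_id,))`.

```python
import sqlite3
from typing import Any, List, Optional, Sequence, Tuple, Union


def run_query(
    conn: sqlite3.Connection, query: str, params: Sequence[Any] = ()
) -> Tuple[Optional[List[str]], Union[List[tuple], int]]:
    """Run one statement: (columns, rows) for a result set, else (None, rowcount)."""
    cur = conn.cursor()
    try:
        # The driver binds `params` separately from the SQL text, so values are
        # never pasted into the query string (no injection, no quoting bugs)
        cur.execute(query, params)
        if cur.description is None:
            # No result set (INSERT/UPDATE/DELETE/DDL): read rowcount, then commit
            affected = cur.rowcount
            conn.commit()
            return None, affected
        columns = [col[0] for col in cur.description]
        return columns, cur.fetchall()
    finally:
        cur.close()


# Usage with an in-memory database and a hypothetical two-column table
conn = sqlite3.connect(":memory:")
run_query(conn, "CREATE TABLE articles (id INTEGER, status_neo4j INTEGER)")
for article_id in range(3):
    run_query(conn, "INSERT INTO articles VALUES (?, ?)", (article_id, 1))
print(run_query(conn, "UPDATE articles SET status_neo4j = ? WHERE id < ?", (0, 2)))  # (None, 2)
print(run_query(conn, "WITH pending AS (SELECT id FROM articles WHERE status_neo4j = 0) "
                      "SELECT id FROM pending ORDER BY id"))  # (['id'], [(0,), (1,)])
```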

In [None]:
# Test the connection and function with the sample query
sample_query = "SELECT * FROM structured_content LIMIT 2;"
# sample_query = "SELECT distinct site_name FROM structured_content;"

print(f"Executing sample query: {sample_query}")
print("-" * 50)

try:
    result_df = execute_postgres_query_improved(sample_query, postgres_config)
    
    print("\nQuery Results:")
    print(f"Shape: {result_df.shape}")
    print(f"Columns: {list(result_df.columns)}")
    print("\nFirst few rows:")
    display(result_df.head())
    
except Exception as e:
    print(f"Error executing query: {e}")

Executing sample query: SELECT * FROM structured_content LIMIT 2;
--------------------------------------------------
Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 2 rows.
Database connection closed.

Query Results:
Shape: (2, 18)
Columns: ['id', 'url', 'raw_html_id', 'domain', 'site_name', 'title', 'author', 'publish_date', 'content', 'summary', 'tags', 'language', 'word_count', 'meta_description', 'extracted_at', 'is_latest', 'run_id', 'status_neo4j']

First few rows:


| | id | url | raw_html_id | domain | site_name | title | author | publish_date | content | summary | tags | language | word_count | meta_description | extracted_at | is_latest | run_id | status_neo4j |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 156 | https://global.morningstar.com/en-ca/investmen... | 157 | morningstar.com | | BMO Retirement Income Portfolio F Fund Summary... | | | BMO Retirement Income Portfolio F Fund Summary... | Get a detailed summary of BMO Retirement Incom... | [Retirement income] | en | 219 | Get a detailed summary of BMO Retirement Incom... | 2025-08-11 11:45:22.996649+00:00 | True | fd77e405-fdb5-48b1-acb6-6e178f468e44 | True |
| 1 | 171 | https://act.alz.org/site/TR?team_id=977496&pg=... | 172 | alz.org | | Delicate Hands Home Health's Walk to End Alzhe... | | | 2025 Walk to End Alzheimer's Spartanburg, SC... | Welcome to our team page for the Alzheimer's A... | [Alzheimer's] | en | 657 | Support Delicate Hands Home Health in the figh... | 2025-08-11 11:59:14.246271+00:00 | True | 376e84db-fa4c-4239-9588-be36d2f2fd4d | True |


In [17]:
result_df.columns

Index(['id', 'url', 'raw_html_id', 'domain', 'site_name', 'title', 'author',
       'publish_date', 'content', 'summary', 'tags', 'language', 'word_count',
       'meta_description', 'extracted_at', 'is_latest', 'run_id',
       'status_neo4j'],
      dtype='object')

In [47]:
result_df = execute_postgres_query_improved("SELECT count(*) FROM structured_content;", postgres_config)
result_df

Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.


| | count |
|---|---|
| 0 | 129 |


In [50]:
result_df = execute_postgres_query_improved("SELECT count(*) FROM structured_content WHERE status_neo4j = false;", postgres_config)
result_df

Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.


| | count |
|---|---|
| 0 | 19 |



In [37]:
# Reset status_neo4j to false for all articles
# WARNING: the UPDATE has no WHERE clause, so it changes every row in structured_content
reset_query = "UPDATE structured_content SET status_neo4j = false;"
print(f"Executing reset query: {reset_query}")
print("-" * 50)

try:
    result_df = execute_postgres_query_improved(reset_query, postgres_config)
    print("\nSuccessfully reset status_neo4j to false for all articles")
    
    # Verify the update by checking count of articles with status_neo4j = false
    verify_query = "SELECT count(*) FROM structured_content WHERE status_neo4j = false;"
    verify_df = execute_postgres_query_improved(verify_query, postgres_config)
    print(f"\nNumber of articles with status_neo4j = false: {verify_df.iloc[0,0]}")
    
except Exception as e:
    print(f"Error executing reset query: {e}")



Executing reset query: UPDATE structured_content SET status_neo4j = false;
--------------------------------------------------
Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. 112 rows affected.
Database connection closed.

Successfully reset status_neo4j to false for all articles
Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.

Number of articles with status_neo4j = false: 112
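The reset cell runs the UPDATE and the verification SELECT through two separate calls, and each call opens its own connection, so the count can include changes committed by others in between. A sketch of doing both in one transaction, with `sqlite3` standing in for psycopg2 (psycopg2's `with connection:` block likewise commits on success and rolls back on an exception, but does not close the connection). `reset_neo4j_status` is a hypothetical helper and the table is a two-column stand-in:

```python
import sqlite3
from typing import Tuple


def reset_neo4j_status(conn: sqlite3.Connection) -> Tuple[int, int]:
    """Reset status_neo4j for every row; return (rows_updated, rows_not_reset)."""
    with conn:  # commits when the block exits normally, rolls back on an exception
        cur = conn.cursor()
        # No WHERE clause: every row is affected (PostgreSQL would use `false`, not 0)
        cur.execute("UPDATE structured_content SET status_neo4j = 0")
        updated = cur.rowcount
        # Verify inside the same transaction, so a failed check can still undo the UPDATE
        cur.execute("SELECT COUNT(*) FROM structured_content WHERE status_neo4j <> 0")
        not_reset = cur.fetchone()[0]
        if not_reset:
            # Raising inside `with conn:` rolls the UPDATE back
            raise RuntimeError(f"{not_reset} rows still flagged; rolling back")
    return updated, not_reset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE structured_content (id INTEGER, status_neo4j INTEGER)")
conn.executemany("INSERT INTO structured_content VALUES (?, ?)", [(1, 1), (2, 0), (3, 1)])
conn.commit()
print(reset_neo4j_status(conn))  # (3, 0)
```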



In [None]:
result_df = execute_postgres_query_improved("SELECT distinct author FROM structured_content;", postgres_config)
result_df

Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 27 rows.
Database connection closed.


| | author |
|---|---|
| 0 | |
| 1 | Cyrus Bamji |
| 2 | Ryan K. Snover, Investment Adviser Representative |
| 3 | Aisha Malik |
| 4 | Afshan Musani,Meredith Mutter |
| 5 | Rae Bennett |
| 6 | Rebecca Bellan |
| 7 | Reece Rogers |
| 8 | mshepard |
| 9 | Julie Bort |


In [None]:
# Count distinct article ids
result_df = execute_postgres_query_improved("SELECT count(distinct id) FROM structured_content;", postgres_config)
result_df

Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.


| | count |
|---|---|
| 0 | 112 |


In [None]:
# Count distinct URLs
result_df = execute_postgres_query_improved("SELECT count(distinct url) FROM structured_content;", postgres_config)
result_df


Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.


| | count |
|---|---|
| 0 | 112 |


In [None]:
result_df = execute_postgres_query_improved("SELECT count(*) FROM structured_content WHERE author IS NULL;", postgres_config)
result_df

Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.


| | count |
|---|---|
| 0 | 81 |
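`author IS NULL` does not match empty or whitespace-only strings, and the blank first row of the distinct-author output above could be either a NULL or an empty string. The query below counts both cases. It uses the portable `SUM(CASE ...)` form (PostgreSQL also accepts `COUNT(*) FILTER (WHERE ...)`) and is exercised here on a few hypothetical rows in an in-memory `sqlite3` table; against the real table you could pass `AUTHOR_QUALITY_SQL` to `execute_postgres_query_improved`.

```python
import sqlite3

AUTHOR_QUALITY_SQL = """
SELECT
    SUM(CASE WHEN author IS NULL THEN 1 ELSE 0 END) AS null_authors,
    SUM(CASE WHEN TRIM(author) = '' THEN 1 ELSE 0 END) AS blank_authors,
    COUNT(*) AS total_rows
FROM structured_content
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE structured_content (id INTEGER, author TEXT)")
conn.executemany(
    "INSERT INTO structured_content VALUES (?, ?)",
    [(1, None), (2, ""), (3, "  "), (4, "Jane Doe")],  # hypothetical sample rows
)
# TRIM(NULL) is NULL, so NULL authors are counted only in null_authors
null_authors, blank_authors, total_rows = conn.execute(AUTHOR_QUALITY_SQL).fetchone()
print(null_authors, blank_authors, total_rows)  # 1 2 4
```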


## Custom Query Execution

Use the cell below to execute your own custom queries. Simply modify the `custom_query` variable and run the cell.

In [None]:
# Define your custom query here
custom_query = """
SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT url) as unique_ids
FROM structured_content;
"""

print(f"Executing custom query: {custom_query.strip()}")
print("-" * 50)

try:
    custom_result = execute_postgres_query_improved(custom_query, postgres_config)
    
    print("\nCustom Query Results:")
    display(custom_result)
    
except Exception as e:
    print(f"Error executing custom query: {e}")

Executing custom query: SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT url) as unique_ids
FROM structured_content;
--------------------------------------------------
Connected to PostgreSQL database: SERP_TREND_DEV_DB
Query executed successfully. Retrieved 1 rows.
Database connection closed.

Custom Query Results:


| | total_records | unique_ids |
|---|---|---|
| 0 | 112 | 112 |


## Data Analysis Examples

Here are some example analyses you can perform with the data:

In [None]:
# Example: Get table schema information
schema_query = """
SELECT 
    column_name,
    data_type,
    is_nullable,
    column_default
FROM information_schema.columns 
WHERE table_name = 'structured_content'
ORDER BY ordinal_position;
"""

try:
    schema_df = execute_postgres_query_improved(schema_query, postgres_config)
    print("\nTable Schema:")
    display(schema_df)
except Exception as e:
    print(f"Error getting schema: {e}")

In [None]:
# Example: Basic data exploration
exploration_query = """
SELECT 
    COUNT(*) as total_rows,
    COUNT(DISTINCT id) as unique_ids,
    MIN(extracted_at) as earliest_date,
    MAX(extracted_at) as latest_date
FROM structured_content;
"""

try:
    exploration_df = execute_postgres_query_improved(exploration_query, postgres_config)
    print("\nData Exploration:")
    display(exploration_df)
except Exception as e:
    print(f"Error in data exploration: {e}")
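The exploration cell summarizes the table as a whole. A per-domain breakdown is a natural next step; the query below uses `domain`, `word_count` and `extracted_at`, which appear in the sample query output earlier. It runs here against a hypothetical in-memory `sqlite3` table so it can be tried without a server; the same SQL should also work in PostgreSQL, where `AVG` of an integer column returns `numeric` and `ROUND(numeric, 1)` applies.

```python
import sqlite3

DOMAIN_SUMMARY_SQL = """
SELECT
    domain,
    COUNT(*) AS articles,
    ROUND(AVG(word_count), 1) AS avg_word_count,
    MAX(extracted_at) AS last_extracted
FROM structured_content
GROUP BY domain
ORDER BY articles DESC, domain
"""

# Hypothetical sample rows, only so the query can run locally
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE structured_content (domain TEXT, word_count INTEGER, extracted_at TEXT)")
conn.executemany(
    "INSERT INTO structured_content VALUES (?, ?, ?)",
    [
        ("example.com", 200, "2025-01-01 10:00:00"),
        ("example.org", 650, "2025-01-02 09:30:00"),
        ("example.com", 301, "2025-01-03 08:15:00"),
    ],
)
for row in conn.execute(DOMAIN_SUMMARY_SQL):
    print(row)
# ('example.com', 2, 250.5, '2025-01-03 08:15:00')
# ('example.org', 1, 650.0, '2025-01-02 09:30:00')
```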

## Alternative Installation Methods

If the automatic installation above doesn't work, try these alternative methods:

In [None]:
# Alternative 1: Install from requirements.txt
# Run this cell if you want to install all project dependencies
import subprocess
import sys

try:
    result = subprocess.run([sys.executable, "-m", "pip", "install", "-r", "../requirements.txt"], 
                          capture_output=True, text=True, check=True)
    print("✅ All requirements installed successfully")
    print(result.stdout)
except subprocess.CalledProcessError as e:
    print(f"❌ Error installing requirements: {e}")
    print(f"Error output: {e.stderr}")

In [None]:
# Alternative 2: Manual conda installation (if using conda environment)
# Uncomment and run if you're using conda

# import subprocess
# import sys

# try:
#     result = subprocess.run(["conda", "install", "-c", "conda-forge", "psycopg2", "-y"], 
#                           capture_output=True, text=True, check=True)
#     print("✅ psycopg2 installed via conda successfully")
#     print(result.stdout)
# except subprocess.CalledProcessError as e:
#     print(f"❌ Error installing via conda: {e}")
#     print(f"Error output: {e.stderr}")
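Alternative 2 only makes sense when the kernel runs inside a conda environment. A common heuristic (not an official conda API) is to look for a `conda-meta` directory under `sys.prefix`; `running_in_conda` is a hypothetical helper built on that assumption, so treat its answer as a hint.

```python
import os
import sys


def running_in_conda(prefix: str = sys.prefix) -> bool:
    """Heuristic: conda environments contain a `conda-meta` directory."""
    return os.path.isdir(os.path.join(prefix, "conda-meta"))


installer = "conda" if running_in_conda() else "pip"
print(f"Suggested installer for this kernel: {installer}")
```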

## Quick Reference

### Main Function
```python
# Execute any PostgreSQL query and get results as DataFrame
df = execute_postgres_query_improved("YOUR_SQL_QUERY_HERE", postgres_config)
```

### Configuration
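- Configuration is loaded from `../config/config_boomer_load_local.yml` (the default `config_path` of `load_config`)
- PostgreSQL connection parameters are parsed from the `database` → `postgres` entry of that file
- A new connection is opened for each query and closed afterwards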

### Error Handling
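- Database errors are printed and then re-raised, so the calling cell still sees the exception
- The connection is closed in a `finally` block, even when the query fails
- The printed psycopg2 message is the first place to look when troubleshooting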

### Installation Troubleshooting
If you encounter issues with psycopg2 installation:

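1. **Use psycopg2-binary** (as the setup cell does): it ships prebuilt wheels for common platforms, so no compiler or PostgreSQL headers are needed
2. **Check your Python environment**: confirm that `sys.executable` in the notebook points at the environment you installed into
3. **System dependencies**: building plain `psycopg2` from source needs a C compiler, the Python and libpq development headers (e.g. `libpq-dev` on Debian/Ubuntu), and `pg_config` on your `PATH`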
4. **Use conda**: If using Anaconda/Miniconda, try installing via conda instead of pip
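Most of the checklist above can be answered from inside the notebook. The sketch below reports the kernel's interpreter, whether it runs in a venv-style virtual environment (`sys.prefix != sys.base_prefix`, per PEP 405; conda environments are not detected this way), and whether `pg_config`, which a source build of `psycopg2` needs, is on `PATH`. `diagnose_environment` is a hypothetical helper.

```python
import shutil
import sys


def diagnose_environment() -> dict:
    """Report the interpreter, venv status and pg_config location."""
    return {
        "python_executable": sys.executable,
        "in_virtualenv": sys.prefix != sys.base_prefix,
        "pg_config": shutil.which("pg_config"),  # None if not found on PATH
    }


for key, value in diagnose_environment().items():
    print(f"{key}: {value}")
```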

### Command Line Installation
If the notebook installation fails, try these commands in your terminal:

```bash
# Using pip
pip install psycopg2-binary==2.9.7

# Using conda
conda install -c conda-forge psycopg2

# Install all project requirements
pip install -r requirements.txt
```