# NYC Landmarks Vector Database - Processing Status Analysis

This notebook analyzes the processing status of NYC landmark records in the Pinecone vector database. It determines which landmarks have already been processed (have vectors in Pinecone) and which landmarks still need processing.

## Objectives

1. Connect to CoreDataStore API and Pinecone database
2. Fetch all available landmark IDs from the CoreDataStore API
3. Check which landmarks already have vectors in Pinecone
4. Generate statistics and visualizations of processing status
5. Export a list of unprocessed landmarks for batch processing
6. Generate a GitHub Actions workflow configuration for processing the remaining landmarks

## 1. Setup & Imports

First, we'll import the necessary libraries and set up the environment.

In [None]:
# Standard libraries
import json
import sys
import time
from datetime import datetime
from pathlib import Path

# Visualization libraries
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm.notebook import tqdm

# Add project directory to path
sys.path.append("..")

# Set visualization style
plt.style.use("seaborn-v0_8-whitegrid")
sns.set(style="whitegrid")
plt.rcParams["figure.figsize"] = (12, 8)

# Set random seed for reproducibility
np.random.seed(42)

In [None]:
# Configure logging
import logging

# Import project modules
from nyc_landmarks.config.settings import settings
from nyc_landmarks.db.db_client import get_db_client
from nyc_landmarks.vectordb.pinecone_db import PineconeDB

# Set up logger
logger = logging.getLogger()
logging.basicConfig(
    level=settings.LOG_LEVEL.value,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

## 2. Connect to Databases

Next, we'll establish connections to both the CoreDataStore API and the Pinecone vector database.

In [None]:
# Initialize the database client for CoreDataStore API
db_client = get_db_client()
print("✅ Initialized CoreDataStore API client")

In [None]:
# Initialize the Pinecone database client
try:
    # Create PineconeDB instance
    pinecone_db = PineconeDB()

    # Check if the connection was successful
    if pinecone_db.index:
        print(f"✅ Successfully connected to Pinecone index: {pinecone_db.index_name}")
        print(f"Namespace: {pinecone_db.namespace}")
        print(f"Dimensions: {pinecone_db.dimensions}")
        print(f"Metric: {pinecone_db.metric}")
    else:
        print(
            "❌ Failed to connect to Pinecone. Check your credentials and network connection."
        )
except Exception as e:
    print(f"❌ Error initializing Pinecone: {e}")

In [None]:
# Get index statistics from Pinecone
try:
    stats = pinecone_db.get_index_stats()

    # Check for errors
    if "error" in stats:
        print(f"❌ Error retrieving index stats: {stats['error']}")
        # Fall back to empty values so the cells below still run
        total_vector_count = 0
        namespaces = {}
    else:
        print("✅ Successfully retrieved index stats")
        total_vector_count = stats.get("total_vector_count", 0)
        namespaces = stats.get("namespaces", {})
except Exception as e:
    print(f"❌ Error retrieving index stats: {e}")
    # Fall back to empty values so the cells below still run
    total_vector_count = 0
    namespaces = {}
    stats = {}

print("\n📊 Index Statistics:")
print(f"Total Vector Count: {total_vector_count:,}")
print(f"Dimension: {stats.get('dimension')}")
print(f"Index Fullness: {stats.get('index_fullness')}")
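
The `namespaces` value retrieved above is never used later in the notebook. The following sketch prints per-namespace vector counts. It assumes `stats` is a plain dictionary shaped like Pinecone's `describe_index_stats` response, meaning a `namespaces` mapping whose entries carry a `vector_count`. If the project's `get_index_stats()` returns a different structure, adjust the keys. The helper name `summarize_namespaces` is illustrative.

```python
def summarize_namespaces(stats: dict) -> dict:
    """Return {namespace: vector_count} from an index-stats dictionary.

    Assumes the Pinecone describe_index_stats layout:
    {"namespaces": {"<name>": {"vector_count": <int>}}, ...}
    """
    summary = {}
    for name, info in (stats.get("namespaces") or {}).items():
        # Pinecone reports the default namespace under an empty-string key
        summary[name or "(default)"] = int(info.get("vector_count", 0))
    return summary


# Hand-written example dictionary, not real index data
example_stats = {
    "namespaces": {"": {"vector_count": 12}, "landmarks": {"vector_count": 340}},
    "total_vector_count": 352,
}
for ns, count in summarize_namespaces(example_stats).items():
    print(f"{ns}: {count:,} vectors")
```

In the notebook you would call `summarize_namespaces(stats)` after the cell above. It returns an empty dictionary when the stats fallback was used.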

## 3. Fetch All Landmark IDs

Now we'll fetch all landmark IDs from the CoreDataStore API to determine the total universe of landmarks.

In [None]:
def fetch_all_landmark_ids(start_page=1, end_page=None, page_size=100, max_pages=500):
    """Fetch all landmark IDs from CoreDataStore API.

    Args:
        start_page: Starting page number (default: 1)
        end_page: Ending page number (default: None, fetch until no more results)
        page_size: Number of landmarks per page (default: 100)
        max_pages: Maximum number of pages to fetch (safety limit)

    Returns:
        Set of landmark IDs
    """
    all_landmark_ids = set()
    current_page = start_page
    total_pages_fetched = 0

    try:
        with tqdm(desc="Fetching landmark IDs", unit="page") as pbar:
            while True:
                # Check if we've reached the end page or max pages
                if (
                    end_page is not None and current_page > end_page
                ) or total_pages_fetched >= max_pages:
                    break

                # Fetch landmarks for the current page
                try:
                    landmarks = db_client.get_landmarks_page(page_size, current_page)
                except Exception as e:
                    print(f"Error fetching page {current_page}: {e}")
                    # Try to continue with next page
                    current_page += 1
                    total_pages_fetched += 1
                    pbar.update(1)
                    continue

                # If no landmarks found, we've reached the end
                if not landmarks:
                    print(f"No landmarks found on page {current_page}, ending fetch")
                    break

                # Process the landmarks
                for landmark in landmarks:
                    landmark_id = landmark.get("id", "") or landmark.get("lpNumber", "")
                    if landmark_id:
                        all_landmark_ids.add(landmark_id)

                # Update progress
                pbar.set_postfix(
                    {
                        "page": current_page,
                        "landmarks": len(landmarks),
                        "total": len(all_landmark_ids),
                    }
                )
                pbar.update(1)

                # Move to next page
                current_page += 1
                total_pages_fetched += 1

                # Small delay to avoid rate limiting
                time.sleep(0.5)
    except Exception as e:
        print(f"Error fetching landmark IDs: {e}")

    print(
        f"Completed fetching {len(all_landmark_ids)} landmark IDs from {total_pages_fetched} pages"
    )
    return all_landmark_ids


# Configure fetch parameters
page_size = 100
start_page = 1
# Set end_page to cap how many pages are fetched (adjust to the expected data volume),
# or leave it as None to fetch all available landmarks
end_page = 10  # Testing limit: at most 10 pages (up to 1,000 landmarks at page_size=100)
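
The fetch loop reads IDs with `landmark.get(...)`, which only works if `get_landmarks_page` returns dictionaries. It might return model objects instead, which is worth checking against `nyc_landmarks.db`. In that case, a tolerant extractor handles both shapes. The sketch below is a hypothetical `extract_landmark_id` helper that applies the same `id`, then `lpNumber`, fallback to either shape.

```python
from types import SimpleNamespace
from typing import Any, Optional


def extract_landmark_id(landmark: Any) -> Optional[str]:
    """Return the landmark's `id`, falling back to `lpNumber`; None if neither is set."""
    for field in ("id", "lpNumber"):
        if isinstance(landmark, dict):
            value = landmark.get(field)
        else:
            # Attribute access for objects such as Pydantic models
            value = getattr(landmark, field, None)
        if value:
            return str(value)
    return None


print(extract_landmark_id({"lpNumber": "LP-00001"}))
print(extract_landmark_id(SimpleNamespace(id=None, lpNumber="LP-00002")))
```

Inside the fetch loop, `landmark_id = extract_landmark_id(landmark)` would replace the chained `.get()` calls.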

In [None]:
# Fetch all landmark IDs
start_time = time.time()
all_landmark_ids = fetch_all_landmark_ids(
    start_page=start_page, end_page=end_page, page_size=page_size
)
elapsed_time = time.time() - start_time

print(
    f"Fetched {len(all_landmark_ids)} unique landmark IDs in {elapsed_time:.2f} seconds"
)
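
`fetch_all_landmark_ids` stops in three cases: an explicit `end_page`, the `max_pages` safety cap, or an empty page. These stopping rules are easier to test when separated from the API client. The sketch below is a hypothetical generator, `iter_pages`, that accepts any `fetch_page(page_number)` callable. It is exercised with an in-memory fake rather than CoreDataStore, and it leaves per-page error handling to the caller.

```python
from typing import Callable, Iterator, List, Optional


def iter_pages(
    fetch_page: Callable[[int], List[dict]],
    start_page: int = 1,
    end_page: Optional[int] = None,
    max_pages: int = 500,
) -> Iterator[List[dict]]:
    """Yield non-empty pages until end_page, max_pages, or an empty page is reached."""
    page = start_page
    fetched = 0
    while fetched < max_pages and (end_page is None or page <= end_page):
        items = fetch_page(page)
        if not items:
            break  # An empty page means the data is exhausted
        yield items
        page += 1
        fetched += 1


# Fake data source: 250 records served 100 per page (pages 1-3)
records = [{"id": f"LP-{i:05d}"} for i in range(250)]


def fake_fetch(page: int, page_size: int = 100) -> List[dict]:
    start = (page - 1) * page_size
    return records[start : start + page_size]


ids = {item["id"] for items in iter_pages(fake_fetch) for item in items}
print(len(ids))
```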

## 4. Check Processing Status in Pinecone

Now we'll check which landmarks already have vectors in Pinecone.

In [None]:
def check_landmark_processing_status(pinecone_db, landmark_ids, batch_size=10, top_k=1):
    """Check which landmarks have vectors in Pinecone.

    Args:
        pinecone_db: PineconeDB instance
        landmark_ids: Set of landmark IDs to check
        batch_size: Number of landmarks checked between progress updates and rate-limit
            pauses (queries are sent one at a time, not in parallel)
        top_k: Number of vectors to retrieve per landmark (1 is sufficient to check existence)

    Returns:
        processed_landmarks: Set of landmark IDs that have vectors
        unprocessed_landmarks: Set of landmark IDs that don't have vectors
    """
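    # The query vector's values do not matter: the metadata filter restricts matches
    # to one landmark, so a random vector is enough to test whether any vectors exist
    random_vector = np.random.rand(pinecone_db.dimensions).tolist()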

    processed_landmarks = set()
    unprocessed_landmarks = set()

    # Convert set to list for iteration with tqdm
    landmark_ids_list = list(landmark_ids)

    with tqdm(total=len(landmark_ids_list), desc="Checking processing status") as pbar:
        for i in range(0, len(landmark_ids_list), batch_size):
            # Get the current batch
            batch = landmark_ids_list[i : i + batch_size]

            # Check each landmark in the batch
            for landmark_id in batch:
                # Query Pinecone for vectors with this landmark_id
                filter_dict = {"landmark_id": landmark_id}
                try:
                    # We only need to know if vectors exist, so top_k=1 is sufficient
                    vectors = pinecone_db.query_vectors(
                        query_vector=random_vector, top_k=top_k, filter_dict=filter_dict
                    )

                    # If vectors found, mark as processed, otherwise unprocessed
                    if vectors:
                        processed_landmarks.add(landmark_id)
                    else:
                        unprocessed_landmarks.add(landmark_id)
                except Exception as e:
                    print(f"Error checking landmark {landmark_id}: {e}")
                    # If we can't check, assume unprocessed to be safe
                    unprocessed_landmarks.add(landmark_id)

            # Update progress
            pbar.update(len(batch))
            pbar.set_postfix(
                {
                    "processed": len(processed_landmarks),
                    "unprocessed": len(unprocessed_landmarks),
                }
            )

            # Small delay to avoid rate limiting
            time.sleep(0.2)

    return processed_landmarks, unprocessed_landmarks
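
`check_landmark_processing_status` issues one query at a time. If Pinecone rate limits allow it, the per-landmark checks could run concurrently with `concurrent.futures.ThreadPoolExecutor`. In this sketch, `has_vectors` is a hypothetical callable that returns whether a landmark has any vectors. Failed checks count as unprocessed, matching the notebook's conservative choice.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Set, Tuple


def check_status_concurrently(
    landmark_ids: Iterable[str],
    has_vectors: Callable[[str], bool],
    max_workers: int = 4,
) -> Tuple[Set[str], Set[str]]:
    """Split IDs into (processed, unprocessed) using concurrent existence checks."""

    def safe_check(landmark_id: str) -> bool:
        try:
            return bool(has_vectors(landmark_id))
        except Exception as exc:
            # Same policy as the sequential version: a failed check counts as unprocessed
            print(f"Error checking landmark {landmark_id}: {exc}")
            return False

    ids = sorted(set(landmark_ids))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() returns results in input order, so they line up with ids
        results = list(executor.map(safe_check, ids))

    processed = {lid for lid, found in zip(ids, results) if found}
    unprocessed = set(ids) - processed
    return processed, unprocessed


# Fake check for demonstration: even-numbered IDs "have vectors", LP-00008 fails
def fake_has_vectors(landmark_id: str) -> bool:
    if landmark_id == "LP-00008":
        raise RuntimeError("simulated query failure")
    return int(landmark_id.split("-")[1]) % 2 == 0


done, todo = check_status_concurrently(
    [f"LP-{i:05d}" for i in range(10)], fake_has_vectors
)
print(sorted(done))
```

In the notebook, `has_vectors` could wrap the same `pinecone_db.query_vectors(query_vector=..., top_k=1, filter_dict={"landmark_id": lid})` call used above. Two things are assumptions to verify before relying on this: that the `PineconeDB` client is safe to share across threads, and that a small `max_workers` stays within rate limits.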

In [None]:
# Check processing status for all landmarks
start_time = time.time()
processed_landmarks, unprocessed_landmarks = check_landmark_processing_status(
    pinecone_db=pinecone_db,
    landmark_ids=all_landmark_ids,
    batch_size=10,  # Adjust based on API rate limits
    top_k=1,
)
elapsed_time = time.time() - start_time

total_checked = len(all_landmark_ids)
print(f"\nProcessing status check completed in {elapsed_time:.2f} seconds")
print(f"Total landmarks: {total_checked}")
if total_checked:
    print(
        f"Processed landmarks: {len(processed_landmarks)} "
        f"({len(processed_landmarks) / total_checked * 100:.2f}%)"
    )
    print(
        f"Unprocessed landmarks: {len(unprocessed_landmarks)} "
        f"({len(unprocessed_landmarks) / total_checked * 100:.2f}%)"
    )
else:
    print("No landmark IDs were fetched, so there is nothing to report.")

## 5. Analysis and Visualizations

Now we'll analyze the processing status and create visualizations.

In [None]:
# Skip DataFrame creation entirely and use plain Python data structures
# (see the Troubleshooting Notes at the end of this notebook)
landmark_ids_list = sorted(all_landmark_ids)
processed_landmarks_set = processed_landmarks
unprocessed_landmarks_set = unprocessed_landmarks

# Count the number of processed and unprocessed landmarks
total_landmarks = len(landmark_ids_list)
processed_count = len(processed_landmarks_set)
unprocessed_count = len(unprocessed_landmarks_set)

# Calculate percentages, guarding against an empty fetch
if total_landmarks:
    processed_percentage = processed_count / total_landmarks * 100
    unprocessed_percentage = unprocessed_count / total_landmarks * 100
else:
    processed_percentage = unprocessed_percentage = 0.0

# Collect the statistics in a simple dictionary for reuse
stats_dict = {
    "Status": ["Processed", "Unprocessed"],
    "Count": [processed_count, unprocessed_count],
    "Percentage": [processed_percentage, unprocessed_percentage],
}

# Display the statistics
print("Processing Statistics:")
print(f"Total landmarks: {total_landmarks}")
print(f"Processed landmarks: {processed_count} ({processed_percentage:.2f}%)")
print(f"Unprocessed landmarks: {unprocessed_count} ({unprocessed_percentage:.2f}%)")

# Display a sorted sample so repeated runs show the same IDs
print("\nSample of processed landmarks:")
for lid in sorted(processed_landmarks_set)[:5]:
    print(f"  - {lid}")

print("\nSample of unprocessed landmarks:")
for lid in sorted(unprocessed_landmarks_set)[:5]:
    print(f"  - {lid}")

In [None]:
# Create a bar chart of processing status from native Python data structures
# (matplotlib and NumPy are already imported in the setup cell)

# Data for plotting
status_labels = ["Processed", "Unprocessed"]
status_counts = [len(processed_landmarks), len(unprocessed_landmarks)]
total_for_plot = max(len(all_landmark_ids), 1)  # avoid dividing by zero on an empty fetch
status_percentages = [
    len(processed_landmarks) / total_for_plot * 100,
    len(unprocessed_landmarks) / total_for_plot * 100,
]

# Create a bar chart
plt.figure(figsize=(10, 6))
bars = plt.bar(status_labels, status_counts, color=["#4CAF50", "#F44336"])

# Add count labels on bars
for i, bar in enumerate(bars):
    height = bar.get_height()
    plt.text(
        bar.get_x() + bar.get_width() / 2.0,
        height / 2,
        f"{status_counts[i]} ({status_percentages[i]:.1f}%)",
        color="white",
        ha="center",
        va="center",
        fontweight="bold",
        fontsize=12,
    )

# Customize plot
plt.title("NYC Landmarks Processing Status", fontsize=16)
plt.xlabel("Status", fontsize=14)
plt.ylabel("Number of Landmarks", fontsize=14)
plt.ylim(0, len(all_landmark_ids) * 1.1)  # Add some space above bars
plt.grid(axis="y", alpha=0.3)
plt.tight_layout()
plt.show()

In [None]:
# Create a pie chart of processing status using native Python data structures
plt.figure(figsize=(8, 8))
plt.pie(
    status_counts,
    labels=status_labels,
    autopct="%1.1f%%",
    colors=["#4CAF50", "#F44336"],
    explode=[0.05, 0],  # Slightly explode the 'Processed' slice
    startangle=90,
    shadow=True,
    textprops={"fontsize": 14},
)
plt.title("NYC Landmarks Processing Status", fontsize=16)
plt.axis("equal")  # Equal aspect ratio ensures that pie is circular
plt.tight_layout()
plt.show()

## 6. Export Results for Batch Processing

Now we'll export the list of unprocessed landmarks for batch processing.

In [None]:
# Create and configure output directory
output_dir = Path("../data/processing_status")
output_dir.mkdir(exist_ok=True, parents=True)

# Generate timestamp for filenames
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

In [None]:
# Prepare report data
report = {
    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "total_landmarks": len(all_landmark_ids),
    "processed_count": len(processed_landmarks),
    "unprocessed_count": len(unprocessed_landmarks),
    # Guard the percentages against an empty fetch
    "processed_percentage": (
        len(processed_landmarks) / len(all_landmark_ids) * 100 if all_landmark_ids else 0.0
    ),
    "unprocessed_percentage": (
        len(unprocessed_landmarks) / len(all_landmark_ids) * 100 if all_landmark_ids else 0.0
    ),
    "processed_landmarks": sorted(processed_landmarks),
    "unprocessed_landmarks": sorted(unprocessed_landmarks),
}

# Save full report
report_file = output_dir / f"landmark_processing_report_{timestamp}.json"
with open(report_file, "w") as f:
    json.dump(report, f, indent=2)
print(f"Saved processing report to {report_file}")

# Save just the unprocessed landmarks list for easier use
unprocessed_file = output_dir / f"unprocessed_landmarks_{timestamp}.json"
with open(unprocessed_file, "w") as f:
    json.dump(
        {"unprocessed_landmarks": sorted(list(unprocessed_landmarks))}, f, indent=2
    )
print(f"Saved unprocessed landmarks list to {unprocessed_file}")
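
For batch processing, it may be easier to hand the unprocessed list to jobs in fixed-size chunks. The sketch below uses hypothetical `chunk_ids` and `write_chunks` helpers with an illustrative chunk size. It writes one JSON file per chunk, using the same `{"unprocessed_landmarks": [...]}` layout as the file saved above.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable, List


def chunk_ids(ids: Iterable[str], chunk_size: int) -> List[List[str]]:
    """Split IDs (sorted for reproducibility) into lists of at most chunk_size."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    ordered = sorted(ids)
    return [ordered[i : i + chunk_size] for i in range(0, len(ordered), chunk_size)]


def write_chunks(ids: Iterable[str], out_dir: Path, chunk_size: int = 50) -> List[Path]:
    """Write each chunk to its own JSON file and return the file paths."""
    paths = []
    for n, chunk in enumerate(chunk_ids(ids, chunk_size), start=1):
        path = out_dir / f"unprocessed_chunk_{n:03d}.json"
        path.write_text(json.dumps({"unprocessed_landmarks": chunk}, indent=2))
        paths.append(path)
    return paths


# Demonstration in a temporary directory with made-up IDs
demo_ids = {f"LP-{i:05d}" for i in range(120)}
with tempfile.TemporaryDirectory() as tmp:
    written = write_chunks(demo_ids, Path(tmp), chunk_size=50)
    print([p.name for p in written])
```

In the notebook, `write_chunks(unprocessed_landmarks, output_dir)` would place the chunk files next to the full report.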

In [None]:
# Generate a GitHub Actions workflow configuration (a JSON parameter file)
# for processing the remaining landmarks

# Scale batch size and worker count with the number of remaining landmarks
unprocessed_count = len(unprocessed_landmarks)
recommended_batch_size = min(10, max(1, unprocessed_count // 100))
recommended_workers = min(8, max(2, unprocessed_count // 500))

workflow_config = {
    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "total_unprocessed_landmarks": unprocessed_count,
    "api_page_size": 100,  # Standard page size for API
    "job_batch_size": recommended_batch_size,
    "parallel_workers": recommended_workers,
    "recreate_index": False,  # Don't recreate index since we already have processed landmarks
    "specific_landmarks": sorted(list(unprocessed_landmarks)),
}

# Save the workflow configuration
workflow_file = output_dir / f"workflow_config_{timestamp}.json"
with open(workflow_file, "w") as f:
    json.dump(workflow_config, f, indent=2)
print(f"Saved GitHub Actions workflow configuration to {workflow_file}")
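
The sizing heuristic above is easiest to read with a few worked values. This sketch restates the notebook's two formulas as a function and tabulates them for sample counts. The function name `recommend_settings` is illustrative.

```python
def recommend_settings(unprocessed_count: int) -> tuple:
    """Return (job_batch_size, parallel_workers) using the notebook's formulas."""
    batch_size = min(10, max(1, unprocessed_count // 100))  # 1..10, +1 per 100 landmarks
    workers = min(8, max(2, unprocessed_count // 500))  # 2..8, +1 per 500 landmarks
    return batch_size, workers


for count in (0, 250, 1200, 5000):
    batch_size, workers = recommend_settings(count)
    print(f"{count:>5} unprocessed -> batch size {batch_size}, workers {workers}")
```

Batch size reaches its cap of 10 at 1,000 unprocessed landmarks. Worker count reaches its cap of 8 at 4,000.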

## 7. Summary and Next Steps

This notebook has analyzed the processing status of NYC landmark records in the Pinecone vector database. Here's a summary of the findings:

1. **Total Landmarks**: We found a total of [TOTAL_COUNT] landmarks in the CoreDataStore API.
2. **Processing Status**:
   - [PROCESSED_COUNT] landmarks have already been processed ([PROCESSED_PERCENTAGE]%)
   - [UNPROCESSED_COUNT] landmarks still need processing ([UNPROCESSED_PERCENTAGE]%)
3. **Exported Files**:
   - Full processing report: [REPORT_FILE]
   - Unprocessed landmarks list: [UNPROCESSED_FILE]
   - GitHub Actions workflow configuration: [WORKFLOW_FILE]
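
The bracketed placeholders above are meant to be filled in from a run's results. One way to do that is to format them from the `report` dictionary and the three output paths saved in Section 6. The sketch below does this with a hypothetical `render_summary` helper. The example values are made up, not actual results.

```python
from pathlib import Path


def render_summary(report: dict, report_file, unprocessed_file, workflow_file) -> str:
    """Fill the Section 7 summary placeholders from the saved report dictionary."""
    lines = [
        f"1. **Total Landmarks**: We found a total of {report['total_landmarks']:,} "
        "landmarks in the CoreDataStore API.",
        "2. **Processing Status**:",
        f"   - {report['processed_count']:,} landmarks have already been processed "
        f"({report['processed_percentage']:.2f}%)",
        f"   - {report['unprocessed_count']:,} landmarks still need processing "
        f"({report['unprocessed_percentage']:.2f}%)",
        "3. **Exported Files**:",
        f"   - Full processing report: {report_file}",
        f"   - Unprocessed landmarks list: {unprocessed_file}",
        f"   - GitHub Actions workflow configuration: {workflow_file}",
    ]
    return "\n".join(lines)


# Made-up example values, not results from a real run
example_report = {
    "total_landmarks": 1000,
    "processed_count": 600,
    "unprocessed_count": 400,
    "processed_percentage": 60.0,
    "unprocessed_percentage": 40.0,
}
summary = render_summary(
    example_report, Path("report.json"), Path("unprocessed.json"), Path("workflow.json")
)
print(summary)
```

In the notebook, pass `report`, `report_file`, `unprocessed_file`, and `workflow_file` from Section 6. Wrapping the result in `IPython.display.Markdown` renders it as formatted text.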

### Next Steps

To process all remaining landmarks:

1. Run the GitHub Actions workflow using the generated configuration:
   - Upload the workflow config file to the GitHub repository
   - Trigger the workflow with the batch size, worker count, and landmark list from that file

2. Monitor the processing progress:
   - Track GitHub Actions execution
   - Run this notebook again after processing to verify completion

3. Verify results:
   - Run verification tests to confirm all landmarks were processed correctly
   - Check metadata consistency across the database
   - Verify vector counts match expected totals
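
Step 1 can also be scripted. GitHub's REST API starts a run through `POST /repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches`, provided the workflow declares a `workflow_dispatch` trigger. The sketch below only builds the request with the standard library. The owner, repository, workflow file name, and input names are placeholders that must match your workflow's actual inputs. GitHub limits `workflow_dispatch` inputs, so passing the long `specific_landmarks` list directly may not be practical. Having the workflow read the uploaded config file is likely the safer route. Read the token from an environment variable rather than hard-coding it.

```python
import json
import urllib.request


def build_dispatch_request(owner, repo, workflow_file, ref, inputs, token):
    """Build, but do not send, a workflow_dispatch request for the GitHub REST API."""
    url = (
        f"https://api.github.com/repos/{owner}/{repo}"
        f"/actions/workflows/{workflow_file}/dispatches"
    )
    # Send every input as a string; booleans become "true"/"false"
    string_inputs = {
        key: str(value).lower() if isinstance(value, bool) else str(value)
        for key, value in inputs.items()
    }
    body = {"ref": ref, "inputs": string_inputs}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {token}",
            "X-GitHub-Api-Version": "2022-11-28",
        },
        method="POST",
    )


# Placeholder repository, workflow file, and input names (replace with real ones)
request = build_dispatch_request(
    owner="your-org",
    repo="your-repo",
    workflow_file="process_landmarks.yml",
    ref="main",
    inputs={
        "api_page_size": 100,
        "job_batch_size": 2,
        "parallel_workers": 2,
        "recreate_index": False,
    },
    token="<token>",
)
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it; GitHub answers 204 No Content on success
```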

## Troubleshooting Notes

If you reintroduce a pandas DataFrame in the analysis section (it currently uses plain Python structures) and run into type-conversion errors, make sure that:

1. The landmark sets are converted to lists, ideally sorted lists of plain `str`, before building the DataFrame
2. Columns are not built with list comprehensions that can mix Python and NumPy types in one column
3. The DataFrame is created in steps (IDs first, then derived columns) so the failing step is easy to isolate
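
If you do want a DataFrame, you can build it in steps from plain Python strings so that each column holds a single type. The sketch below assumes the Section 4 sets hold string IDs. Small made-up sets stand in for them here.

```python
import numpy as np
import pandas as pd

# Stand-ins for the sets produced in Section 4
all_ids = {"LP-00003", "LP-00001", "LP-00002"}
processed_ids = {"LP-00001"}

# Step 1: one column of plain str, in a stable sorted order
df = pd.DataFrame({"landmark_id": sorted(map(str, all_ids))})

# Step 2: derive a boolean column with a vectorized membership test
df["processed"] = df["landmark_id"].isin(processed_ids)

# Step 3: map the boolean to a readable status label
df["status"] = np.where(df["processed"], "Processed", "Unprocessed")

print(df["status"].value_counts())
```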