In [1]:
# Cell 1: Setup and Dependencies
import os
from dotenv import load_dotenv
import weaviate
from weaviate.classes.config import Configure, Property, DataType
from weaviate.classes.query import MetadataQuery
import pymupdf  # PyMuPDF
from PIL import Image
import base64
from io import BytesIO
from pathlib import Path
import re
from typing import List, Dict, Optional
from weaviate_agents.query import QueryAgent
from openai import OpenAI

load_dotenv()

print("Environment loaded")
print(f"Weaviate URL: {os.getenv('WEAVIATE_URL')}")
print(f"OpenAI Key: {'✓' if os.getenv('OPENAI_API_KEY') else '✗'}")
print(f"Cohere Key: {'✓' if os.getenv('COHERE_KEY') else '✗'}")



Environment loaded
Weaviate URL: abqxreqq0yaic9hlwfgfg.c0.us-west3.gcp.weaviate.cloud
OpenAI Key: ✓
Cohere Key: ✓


In [2]:
# Cell 2: Connect to Weaviate
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.getenv("WEAVIATE_URL"),
    auth_credentials=weaviate.auth.Auth.api_key(os.getenv("WEAVIATE_API_KEY")),
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_API_KEY"),
        "X-Cohere-Api-Key": os.getenv("COHERE_KEY")
    }
)

assert client.is_ready(), "Weaviate not ready"
print("✓ Connected to Weaviate")

✓ Connected to Weaviate


In [3]:
# Cell 3: Create Collection with Named Vectors
from weaviate.classes.config import Configure, Property, DataType

def create_engineering_collection(collection_name="Cook_Engineering_Manual"):
    """
    Create collection with ONLY text vectorization.
    Images stored as metadata, not vectorized.
    """
    # Delete if exists
    if client.collections.exists(collection_name):
        client.collections.delete(collection_name)
        print(f"Deleted existing collection: {collection_name}")
    
    # Create with text-only vectorization using Weaviate Embeddings
    collection = client.collections.create(
        name=collection_name,
        vector_config=[
            Configure.Vectors.text2vec_weaviate(
                name="text_vector",
                source_properties=["content", "section", "visual_description"],
                model="Snowflake/snowflake-arctic-embed-l-v2.0"
            )
        ],
        properties=[
            Property(name="content", data_type=DataType.TEXT,
                    description="Text content of the chunk"),
            Property(name="section", data_type=DataType.TEXT,
                    description="Section title (e.g., 'Wind Forces', 'Fan Laws')"),
            Property(name="page", data_type=DataType.INT,
                    description="Source page number"),
            Property(name="content_type", data_type=DataType.TEXT,
                    description="Type: 'text', 'table', 'chart', 'map', 'formula', 'mixed'"),
            Property(name="has_critical_visual", data_type=DataType.BOOL,
                    description="True if contains important visual information"),
            Property(name="visual_content", data_type=DataType.BLOB,
                    description="Base64 encoded image - NOT VECTORIZED, just stored"),
            Property(name="visual_description", data_type=DataType.TEXT,
                    description="Text description of visual - IS VECTORIZED for search"),
            Property(name="source_file", data_type=DataType.TEXT,
                    description="Source PDF filename")
        ]
    )
    
    print(f"✓ Created collection: {collection_name} (text vectorization only)")
    return collection

# Create the collection
create_engineering_collection()

✓ Created collection: Cook_Engineering_Manual (text vectorization only)


<weaviate.collections.collection.sync.Collection at 0x11214e080>

In [4]:
# Cell 4: PDF Extraction Functions
def detect_content_type(text: str) -> str:
    """Detect if chunk contains table, chart, formula, etc."""
    text_lower = text.lower()
    
    # Check for maps
    if any(word in text_lower for word in ['map', 'zone', 'seismic', 'wind forces']):
        return "map"
    
    # Check for tables
    if 'table' in text_lower or len(re.findall(r'\n\s*[-+|]', text)) > 3:
        return "table"
    
    # Check for charts/diagrams
    if any(word in text_lower for word in ['figure', 'chart', 'diagram', 'graph']):
        return "chart"
    
    # Check for formulas
    if '=' in text and any(c.isdigit() for c in text) and len(text) < 500:
        return "formula"
    
    # Check for mixed content
    if len(text) > 1000 and ('table' in text_lower or 'figure' in text_lower):
        return "mixed"
    
    return "text"

def should_include_visual(text: str, content_type: str) -> bool:
    """Determine if this chunk needs its visual content preserved."""
    # Always include for maps, charts, tables
    if content_type in ["map", "chart", "table", "diagram"]:
        return True
    
    # Include for formulas with complex layout
    if content_type == "formula" and len(text) > 200:
        return True
    
    # Include for mixed content
    if content_type == "mixed":
        return True
    
    return False

def generate_visual_description(text: str, content_type: str) -> str:
    """Generate searchable description of visual content."""
    text_lower = text.lower()
    
    descriptions = []
    
    # Add content type
    descriptions.append(f"{content_type}")
    
    # Extract key terms
    if "wind" in text_lower and "zone" in text_lower:
        descriptions.append("wind zone map United States")
    if "seismic" in text_lower:
        descriptions.append("seismic zone map earthquake risk")
    if "friction" in text_lower and "loss" in text_lower:
        descriptions.append("friction loss chart duct")
    if "motor" in text_lower and ("efficiency" in text_lower or "torque" in text_lower):
        descriptions.append("motor performance specifications")
    if "fan" in text_lower and ("curve" in text_lower or "performance" in text_lower):
        descriptions.append("fan performance curve")
    
    return " ".join(descriptions) if descriptions else content_type

In [5]:
# Cell 5: Extract Chunks with Images
def extract_chunks_with_images(pdf_path: str, output_dir="extracted_images"):
    """
    Extract semantic chunks from PDF with associated images.
    Chunks by section, preserving visual content where critical.
    """
    Path(output_dir).mkdir(exist_ok=True)
    
    doc = pymupdf.open(pdf_path)
    source_file = Path(pdf_path).name
    chunks = []
    
    print(f"Processing {len(doc)} pages from {source_file}...")
    
    for page_num, page in enumerate(doc):
        print(f"  Page {page_num + 1}/{len(doc)}", end='\r')
        
        # Extract text
        text = page.get_text()
        
        # Skip if page is mostly empty
        if len(text.strip()) < 50:
            continue
        
        # Detect content type
        content_type = detect_content_type(text)
        
        # Extract section title (first substantial line)
        lines = [l.strip() for l in text.split('\n') if l.strip()]
        section = lines[0] if lines else f"Page {page_num + 1}"
        
        # Check if we need to preserve visual
        needs_visual = should_include_visual(text, content_type)
        
        visual_content = None
        visual_description = None
        
        if needs_visual:
            # Render page as image
            mat = pymupdf.Matrix(2.0, 2.0)  # 2x scale for better quality
            pix = page.get_pixmap(matrix=mat)
            img_bytes = pix.tobytes("png")
            visual_content = base64.b64encode(img_bytes).decode('utf-8')
            visual_description = generate_visual_description(text, content_type)
            
            # Optionally save to disk for debugging
            img_path = Path(output_dir) / f"page_{page_num + 1}.png"
            with open(img_path, 'wb') as f:
                f.write(img_bytes)
        
        # Create chunk
        chunk = {
            "content": text,
            "section": section[:200],  # Limit section title length
            "page": page_num + 1,
            "page_context": f"From {source_file}, page {page_num + 1}",
            "content_type": content_type,
            "has_critical_visual": needs_visual,
            "visual_content": visual_content,
            "visual_description": visual_description,
            "source_file": source_file
        }
        
        chunks.append(chunk)
    
    doc.close()
    print(f"\n✓ Extracted {len(chunks)} chunks from {source_file}")
    return chunks

# Test extraction
pdf_path = "Cookbook_Catalog.pdf"
chunks = extract_chunks_with_images(pdf_path)

# Show sample
print("\nSample chunk:")
sample = chunks[50]  # Wind forces page
print(f"Section: {sample['section']}")
print(f"Content type: {sample['content_type']}")
print(f"Has visual: {sample['has_critical_visual']}")
if sample['has_critical_visual']:
    print(f"Visual description: {sample['visual_description']}")

Processing 150 pages from Cookbook_Catalog.pdf...
  Page 150/150
✓ Extracted 142 chunks from Cookbook_Catalog.pdf

Sample chunk:
Section: 46
Content type: text
Has visual: False


In [6]:
# Cell 6: Upload to Weaviate (Simplified)
def upload_chunks_to_weaviate(chunks: List[Dict], collection_name="Cook_Engineering_Manual"):
    """Upload chunks to Weaviate - simple batch upload."""
    collection = client.collections.get(collection_name)
    
    chunks_with_images = sum(1 for c in chunks if c.get("has_critical_visual"))
    print(f"Uploading {len(chunks)} chunks...")
    print(f"  - {chunks_with_images} with images")
    print(f"  - {len(chunks) - chunks_with_images} text-only")
    
    with collection.batch.dynamic() as batch:
        for chunk in chunks:
            batch.add_object(properties=chunk)
    
    print(f"✓ Uploaded {len(chunks)} chunks to {collection_name}")

# Upload
upload_chunks_to_weaviate(chunks)

# Verify
collection = client.collections.get("Cook_Engineering_Manual")
result = collection.aggregate.over_all(total_count=True)
print(f"\nTotal objects in collection: {result.total_count}")

Uploading 142 chunks...
  - 54 with images
  - 88 text-only
✓ Uploaded 142 chunks to Cook_Engineering_Manual

Total objects in collection: 142


In [7]:
# Cell 7: QueryAgent Retrieval Function
def query_with_agent(question: str, collection_name="Cook_Engineering_Manual"):
    """
    Use QueryAgent for intelligent retrieval.
    Now using single text vector only.
    """
    qa = QueryAgent(
        client=client,
        collections=[collection_name]
    )
    
    print(f"Question: {question}\n")
    response = qa.run(question)
    
    print(f"\n✓ QueryAgent found {len(response.sources)} relevant sources")
    print(f"✓ Executed {len(response.searches)} searches")
    
    return response

# Test query
test_response = query_with_agent("What are the wind force considerations for Missouri?")
print(f"\nQueryAgent answer preview: {test_response.final_answer[:200]}...")

Question: What are the wind force considerations for Missouri?



  response = qa.run(question)



✓ QueryAgent found 1 relevant sources
✓ Executed 1 searches

QueryAgent answer preview: For Missouri, the main wind force considerations revolve around the wind speeds and their effects on external equipment and building components. Missouri falls within a defined wind zone depicted in t...


In [8]:
# Cell 8: Fetch Full Objects with Images
def fetch_full_objects_with_images(sources: List, collection_name="Cook_Engineering_Manual"):
    """
    Fetch complete objects including images based on QueryAgent sources.
    """
    collection = client.collections.get(collection_name)
    
    full_objects = []
    
    print(f"Fetching {len(sources)} full objects...")
    
    for source in sources:
        try:
            obj = collection.query.fetch_object_by_id(
                source.object_id,
                return_properties=[
                    "content", "section", "page", "content_type",
                    "has_critical_visual", "visual_content", "visual_description"
                ]
            )
            full_objects.append(obj)
        except Exception as e:
            print(f"  Warning: Could not fetch {source.object_id}: {e}")
    
    print(f"✓ Retrieved {len(full_objects)} complete objects")
    
    # Count how many have images
    with_images = sum(1 for obj in full_objects if obj.properties.get("has_critical_visual"))
    print(f"  ({with_images} contain critical visual content)")
    
    return full_objects

# Test fetch
full_objects = fetch_full_objects_with_images(test_response.sources)

Fetching 1 full objects...
✓ Retrieved 1 complete objects
  (1 contain critical visual content)


In [9]:
# Cell 9: Vision Model Integration
def send_to_vision_model(
    question: str,
    agent_answer: str,
    full_objects: List,
    model: str = "gpt-4o"
):
    """
    Send QueryAgent results + images to GPT-4V for visual interpretation.
    """
    openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    
    # Build context with text from all objects
    text_context = []
    for obj in full_objects:
        props = obj.properties
        text_context.append(
            f"[{props['section']} - Page {props['page']}]\n{props['content'][:500]}..."
        )
    
    # Start building message
    message_content = [
        {
            "type": "text",
            "text": f"""You are a technical assistant helping with engineering specifications.

Question: {question}

Initial analysis from search system:
{agent_answer}

Additional context from relevant sections:
{chr(10).join(text_context)}

Please provide a comprehensive answer. If images are provided, carefully examine them for specific information like maps, charts, or diagrams that may contain data not in the text."""
        }
    ]
    
    # Add images if present
    images_added = 0
    for obj in full_objects:
        if obj.properties.get("has_critical_visual"):
            img_base64 = obj.properties["visual_content"]
            message_content.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{img_base64}",
                    "detail": "high"  # Use high detail for technical diagrams
                }
            })
            images_added += 1
    
    print(f"Sending to {model}:")
    print(f"  - Text context from {len(full_objects)} sources")
    print(f"  - {images_added} images attached")
    
    # Call vision model
    response = openai_client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "You are a technical assistant. When images are provided, examine them carefully for specific information like locations on maps, values in charts, or specifications in tables."
            },
            {
                "role": "user",
                "content": message_content
            }
        ],
        max_tokens=1500
    )
    
    return response.choices[0].message.content

# Test vision model integration
vision_answer = send_to_vision_model(
    question="What are the wind force considerations for Missouri?",
    agent_answer=test_response.final_answer,
    full_objects=full_objects
)

print("\n" + "="*80)
print("FINAL ANSWER FROM VISION MODEL:")
print("="*80)
print(vision_answer)

Sending to gpt-4o:
  - Text context from 1 sources
  - 1 images attached

FINAL ANSWER FROM VISION MODEL:
To address wind force considerations in Missouri, the following elements should be taken into account:

1. **Wind Speed Consideration:**
   - Missouri is located within a wind zone that requires careful attention to wind speeds. The map indicates that the state falls into zones with higher maximum wind speeds, particularly in the central and western regions.

2. **Building and Equipment Design:**
   - Structures in Missouri need to be designed to handle high wind forces. This includes ensuring that ventilation and other critical systems can function during wind events. Equipment mounted on exteriors must be rated according to wind suitability and duty.

3. **Anchoring and Security Measures:**
   - Local building codes often require tie downs, guy wires, and other anchoring techniques to secure external equipment and ensure structures are resistant to the stress of wind forces.

4. 

In [10]:
# Cell 10: Complete End-to-End Pipeline
def ask_question(question: str, use_vision: bool = True):
    """
    Complete pipeline: QueryAgent → Fetch Objects → Vision Model
    
    Args:
        question: User's question
        use_vision: If True, sends images to vision model. If False, returns QueryAgent answer.
    """
    print("="*80)
    print(f"QUERY: {question}")
    print("="*80 + "\n")
    
    # Step 1: QueryAgent retrieval
    print("Step 1: QueryAgent retrieval...")
    qa_response = query_with_agent(question)
    
    # Step 2: Fetch full objects
    print("\nStep 2: Fetching full objects with images...")
    full_objects = fetch_full_objects_with_images(qa_response.sources)
    
    # Step 3: Check if images present
    has_images = any(obj.properties.get("has_critical_visual") for obj in full_objects)
    
    if use_vision and has_images:
        print("\nStep 3: Sending to vision model (images detected)...")
        final_answer = send_to_vision_model(
            question=question,
            agent_answer=qa_response.final_answer,
            full_objects=full_objects
        )
    else:
        if use_vision and not has_images:
            print("\nStep 3: No images found, using QueryAgent answer...")
        else:
            print("\nStep 3: Vision disabled, using QueryAgent answer...")
        final_answer = qa_response.final_answer
    
    print("\n" + "="*80)
    print("FINAL ANSWER:")
    print("="*80)
    print(final_answer)
    print("="*80 + "\n")
    
    return {
        "question": question,
        "agent_answer": qa_response.final_answer,
        "final_answer": final_answer,
        "sources_count": len(qa_response.sources),
        "used_vision": use_vision and has_images
    }


In [11]:
# Cell 11: Test Queries

# Test 1: Visual query (needs map)
result1 = ask_question(
    "Is Springfield Missouri in a high wind zone according to HVAC standards?"
)

# Test 2: Text-only query (specifications)
result2 = ask_question(
    "What is the formula for calculating fan horsepower?"
)

# Test 3: Complex query needing both text and visuals
result3 = ask_question(
    "What are the friction loss values for a 2-inch round elbow with R/D ratio of 1, and can you show me the chart?"
)

# Summary
print("\n" + "="*80)
print("QUERY SUMMARY")
print("="*80)
for i, result in enumerate([result1, result2, result3], 1):
    print(f"\nQuery {i}: {result['question'][:60]}...")
    print(f"  Sources: {result['sources_count']}")
    print(f"  Used vision model: {result['used_vision']}")

QUERY: Is Springfield Missouri in a high wind zone according to HVAC standards?

Step 1: QueryAgent retrieval...
Question: Is Springfield Missouri in a high wind zone according to HVAC standards?



  response = qa.run(question)



✓ QueryAgent found 1 relevant sources
✓ Executed 1 searches

Step 2: Fetching full objects with images...
Fetching 1 full objects...
✓ Retrieved 1 complete objects
  (1 contain critical visual content)

Step 3: Sending to vision model (images detected)...
Sending to gpt-4o:
  - Text context from 1 sources
  - 1 images attached

FINAL ANSWER:
To determine if Springfield, Missouri is in a high wind zone according to HVAC standards, we can refer to the wind zone map provided.

The map categorizes regions into different wind zones based on their maximum wind speeds. Springfield, Missouri is located in the central part of the state, which, according to the map, appears to fall within the red zone.

The red zone indicates a region with higher wind speeds, typically requiring special considerations for HVAC equipment installation, such as enhanced anchoring, tie downs, or guy wires. Therefore, Springfield, Missouri is likely considered a high wind zone per HVAC standards.

For precise specif

In [12]:
# Cell 12: Cleanup
client.close()
# print("✓ Connection closed")