# Semantic Search with Weaviate's Query Agent, LlamaIndex and dltHub

##Pre-requisites
For this workshop you will need:

* A (free) Weaviate Cloud (WCD) account
* A cluster set up in WCD
* The REST endpoint for your cluster
* Your cluster Admin API key
* An OpenAI API key

In this workshop we will create a Retrieval Augmented Generation system leveraging Weaviate's Query Agent, LlamaIndex and dltHub.


We are utilizing LlamaIndex to transform full-text PDF research articles into manageable, structured text chunks enhanced with metadata and section detection logic. As an added feature, dlt can be used in the chunking process to create a simple pipeline that automatically tracks the processing history of each PDF (success/failure, timestamps, chunk counts) in a local database, giving you a queryable audit trail and the ability to resume failed runs - essentially adding data pipeline monitoring and reliability features that would otherwise require manual implementation. These chunks are then uploaded into a Weaviate vector database to support semantic search over our collection of space medicine literature using natural languge leveraging Weaviate's query agent.

### Link to slide deck
https://docs.google.com/presentation/d/1QIYfhHk3OguKGOs2GH7f2yjvneHXo1ObtXqptkV0ur8/edit?usp=sharing

### Link to repo and instructions for copllecting REST endpoint and API keys
https://github.com/saskinosie/Hack-Night-at-GitHub

### Link to G-Drive folder with research articles
 https://drive.google.com/drive/folders/18iu8lGJ0SEZcISkUqc20pecGrb61Mo7s?usp=drive_link

In [1]:
# Installs
!pip install llama-index pymupdf weaviate-client weaviate-agents dlt -q


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.1/24.1 MB[0m [31m72.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m437.0/437.0 kB[0m [31m24.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m926.0/926.0 kB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m223.8/223.8 kB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m42.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.7/7.7 MB[0m [31m36.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.8/40.8 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
# Importing major libraries
import fitz  # PyMuPDF
import json
import re
import requests
from llama_index.core import Document
from llama_index.core.node_parser import HierarchicalNodeParser
import weaviate

In [3]:
# Creating a variable that houses the links for our Space Medicine articles from Google Drive
gdrive_links = [
    "https://drive.google.com/file/d/1bNX5nZTif8roMK1bFaJmHF6wxapi5YDg/view?usp=sharing",
    "https://drive.google.com/file/d/1FZkvMOyTP_-kSIyx9VewaV9tP_XwpZXK/view?usp=drive_link",
    "https://drive.google.com/file/d/1jfcCLHmAazvs7DnAhd3jS0LOb7qOctMc/view?usp=drive_link",
    "https://drive.google.com/file/d/1K8D6VOe2aAX6tIfJWzF2-9zWbqH0C_wp/view?usp=drive_link",
    "https://drive.google.com/file/d/12ee59tcUcxotC1NFfz0EaLNWAAqakKDk/view?usp=drive_link",
    "https://drive.google.com/file/d/115LBMKIobYRdqKWqL2uR1VWoPS5zqoZq/view?usp=drive_link",
    "https://drive.google.com/file/d/1CcjaMYUIQNJ2S4nFGHIh0hpkeG158-Ag/view?usp=drive_link",
    "https://drive.google.com/file/d/1eR6rTQcYw_q4Lob2JFB2_cnND9U2VivA/view?usp=drive_link",
    "https://drive.google.com/file/d/1Gw9UQGNIcDTLpCaamYm4WoeYqUVlKsAG/view?usp=drive_link",
    "https://drive.google.com/file/d/1E961JtImN2eis_IxK5EZS3JaqhXkyhK8/view?usp=drive_link",
    "https://drive.google.com/file/d/1G5xQ10Ijjhrnm_Uq_hl2OgXNaKERfB3G/view?usp=drive_link",
    "https://drive.google.com/file/d/1u-nmLQIvBdcRomCo__yvoCCVNHV3AiKg/view?usp=drive_link",
    "https://drive.google.com/file/d/1cfF_cRkvfaTw5BTpiMxalw0Bc-IdBBnO/view?usp=drive_link",
    "https://drive.google.com/file/d/1WSMqabWY4pElGrQjVVkkaJ8NNEuP6T10/view?usp=drive_link",
    "https://drive.google.com/file/d/11bCbFObW-51XE0lMS3sz5-VYvWsxctRm/view?usp=drive_link",
    "https://drive.google.com/file/d/13AFfg8doORRytR3IXfTOmL1vpp6hsCuG/view?usp=drive_link",
    "https://drive.google.com/file/d/1k8QYuAsyzkMTJA-KzPrOX2BToKmzSr3l/view?usp=drive_link",
    "https://drive.google.com/file/d/1HQJ-WPOXE_CT4bVmuZ4LaZ5GKK8MIgXB/view?usp=sharing",
    "https://drive.google.com/file/d/1sqeJmEmP-3eTTFgTbxTSmS69kiH-K46H/view?usp=sharing"


]

In [4]:
# PDF wrangling function and chunking setup. This will get our pdfs in text format, split text into scetions based on article
# organization and chunk using LlamaIndex

def download_google_drive_pdf(share_url, output_folder="downloads"):
    os.makedirs(output_folder, exist_ok=True)
    file_id_match = re.search(r"/d/([^/]+)", share_url)
    if not file_id_match:
        raise ValueError(f"Invalid Google Drive URL: {share_url}")
    file_id = file_id_match.group(1)
    download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
    response = requests.get(download_url)
    pdf_path = os.path.join(output_folder, f"{file_id}.pdf")
    with open(pdf_path, "wb") as f:
        f.write(response.content)
    return pdf_path

def extract_text(filepath):
    doc = fitz.open(filepath)
    return "\n".join(page.get_text("text") for page in doc)

def extract_title(text):
    candidate_block = text[:1000]
    lines = [line.strip() for line in candidate_block.split("\n") if line.strip()]
    for i, line in enumerate(lines):
        if line.lower() != line and len(line.split()) > 5 and not line.endswith(":") and i < 5:
            return line
    return "Unknown Title"

def slugify(text):
    text = text.lower()
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"\s+", "-", text)
    return text.strip("-")

# Search for section headings, if no Intro will assume first section is Intro
def detect_section(text_chunk, chunk_index=0):
    lowered = text_chunk.lower()
    if "introduction" in lowered[:150] or chunk_index == 0:
        return "Introduction"
    elif "methods" in lowered[:150] or "materials and methods" in lowered[:150]:
        return "Methods"
    elif "results" in lowered[:150]:
        return "Results"
    elif "discussion" in lowered[:150]:
        return "Discussion"
    elif "conclusion" in lowered[:150]:
        return "Conclusion"
    else:
        return "Unknown"

def chunk_for_weaviate(text, title=None):
    from llama_index.core.node_parser import HierarchicalNodeParser
    from llama_index.core.text_splitter import SentenceSplitter

    if not title:
        title = extract_title(text)
    slug = slugify(title)
    document = Document(text=text, metadata={"title": title, "slug": slug})

    # Create hierarchical parser directly without parameters
    parser = HierarchicalNodeParser.from_defaults()

    # Configure it after creation if needed
    nodes = parser.get_nodes_from_documents([document])

    # Further chunk the nodes if they're too large using SentenceSplitter
    text_splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
    # Will create chunk size of 512 characters with 50 character overlap
    smaller_nodes = []
    for node in nodes:
        split_texts = text_splitter.split_text(node.text)
        for i, split_text in enumerate(split_texts):
          smaller_nodes.append({
              "text": split_text,
              "metadata": {
                  "title": title,
                  "slug": slug,
                  "section": detect_section(split_text, chunk_index=i)
                  }
              })


    return smaller_nodes



##Option 1 chunking without DLT pipeline

In [6]:
# Option 1 - Chunking Function, chunks documents directly without dlt

def process_gdrive_links(gdrive_links, output_path="demo_chunks.json"):
    all_chunks = []
    for link in gdrive_links:
        print(f"Processing: {link}")
        try:
            pdf_path = download_google_drive_pdf(link)
            text = extract_text(pdf_path)
            chunks = chunk_for_weaviate(text)
            all_chunks.extend(chunks)
        except Exception as e:
            print(f"❌ Failed to process {link}: {e}")

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(all_chunks, f, ensure_ascii=False, indent=2)
    print(f"\n✅ Saved {len(all_chunks)} chunks to {output_path}")

# Chunk PDFs
import os
process_gdrive_links(gdrive_links, output_path="demo_chunks.json")

Processing: https://drive.google.com/file/d/1bNX5nZTif8roMK1bFaJmHF6wxapi5YDg/view?usp=sharing
Processing: https://drive.google.com/file/d/1FZkvMOyTP_-kSIyx9VewaV9tP_XwpZXK/view?usp=drive_link
Processing: https://drive.google.com/file/d/1jfcCLHmAazvs7DnAhd3jS0LOb7qOctMc/view?usp=drive_link
Processing: https://drive.google.com/file/d/1K8D6VOe2aAX6tIfJWzF2-9zWbqH0C_wp/view?usp=drive_link
Processing: https://drive.google.com/file/d/12ee59tcUcxotC1NFfz0EaLNWAAqakKDk/view?usp=drive_link
Processing: https://drive.google.com/file/d/115LBMKIobYRdqKWqL2uR1VWoPS5zqoZq/view?usp=drive_link
Processing: https://drive.google.com/file/d/1CcjaMYUIQNJ2S4nFGHIh0hpkeG158-Ag/view?usp=drive_link
Processing: https://drive.google.com/file/d/1eR6rTQcYw_q4Lob2JFB2_cnND9U2VivA/view?usp=drive_link
Processing: https://drive.google.com/file/d/1Gw9UQGNIcDTLpCaamYm4WoeYqUVlKsAG/view?usp=drive_link
Processing: https://drive.google.com/file/d/1E961JtImN2eis_IxK5EZS3JaqhXkyhK8/view?usp=drive_link
Processing: https://dri

##Option 2 chunking with DLT pipeline

In [25]:
# Option 2 - Use dlt for better pipeling management including tracking and monitoring
# The output will be more verbose than you will need for this exercise, but is
# very useful for error handling in production!

import dlt
from datetime import datetime

def process_gdrive_links_with_dlt(gdrive_links, output_path="demo_chunks.json"):
    # Create a simple dlt pipeline for tracking
    pipeline = dlt.pipeline(
        pipeline_name="space_med_tracker",
        destination="duckdb"
    )

    @dlt.resource
    def track_processing():
        all_chunks = []
        for link in gdrive_links:
            print(f"Processing: {link}")
            try:
                pdf_path = download_google_drive_pdf(link)
                text = extract_text(pdf_path)
                chunks = chunk_for_weaviate(text)
                all_chunks.extend(chunks)

                # Yield tracking info
                yield {
                    "link": link,
                    "status": "success",
                    "chunks_created": len(chunks),
                    "timestamp": datetime.now().isoformat()
                }
            except Exception as e:
                print(f"❌ Failed to process {link}: {e}")
                yield {
                    "link": link,
                    "status": "failed",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                }

        # Save chunks as before
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(all_chunks, f, ensure_ascii=False, indent=2)
        print(f"\n✅ Saved {len(all_chunks)} chunks to {output_path}")

    # Run pipeline
    info = pipeline.run(track_processing())

    # Just print the info object - it has what we need
    print(f"\n🔍 DLT Pipeline completed!")
    print(f"✅ Load info: {info}")

    return info

# Chunk PDFs
import os
process_gdrive_links_with_dlt(gdrive_links, output_path="demo_chunks.json")

Processing: https://drive.google.com/file/d/1bNX5nZTif8roMK1bFaJmHF6wxapi5YDg/view?usp=sharing
Processing: https://drive.google.com/file/d/1FZkvMOyTP_-kSIyx9VewaV9tP_XwpZXK/view?usp=drive_link
Processing: https://drive.google.com/file/d/1jfcCLHmAazvs7DnAhd3jS0LOb7qOctMc/view?usp=drive_link
Processing: https://drive.google.com/file/d/1K8D6VOe2aAX6tIfJWzF2-9zWbqH0C_wp/view?usp=drive_link
Processing: https://drive.google.com/file/d/12ee59tcUcxotC1NFfz0EaLNWAAqakKDk/view?usp=drive_link
Processing: https://drive.google.com/file/d/115LBMKIobYRdqKWqL2uR1VWoPS5zqoZq/view?usp=drive_link
Processing: https://drive.google.com/file/d/1CcjaMYUIQNJ2S4nFGHIh0hpkeG158-Ag/view?usp=drive_link
Processing: https://drive.google.com/file/d/1eR6rTQcYw_q4Lob2JFB2_cnND9U2VivA/view?usp=drive_link
Processing: https://drive.google.com/file/d/1Gw9UQGNIcDTLpCaamYm4WoeYqUVlKsAG/view?usp=drive_link
Processing: https://drive.google.com/file/d/1E961JtImN2eis_IxK5EZS3JaqhXkyhK8/view?usp=drive_link
Processing: https://dri

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x7cd4c8115f10>, metrics={'1748279691.7295551': [{'started_at': DateTime(2025, 5, 26, 17, 15, 59, 752929, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 5, 26, 17, 15, 59, 884001, tzinfo=Timezone('UTC')), 'job_metrics': {'track_processing.d09cae1e83.insert_values': LoadJobMetrics(job_id='track_processing.d09cae1e83.insert_values', file_path='/var/dlt/pipelines/space_med_tracker/load/normalized/1748279691.7295551/started_jobs/track_processing.d09cae1e83.0.insert_values', table_name='track_processing', started_at=DateTime(2025, 5, 26, 17, 15, 59, 812949, tzinfo=Timezone('UTC')), finished_at=DateTime(2025, 5, 26, 17, 15, 59, 833779, tzinfo=Timezone('UTC')), state='completed', remote_url=None)}}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:////content/space_med_tracker.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayabl

In [7]:
# Setting up Weaviate keys. Be sure keys are stored in Colab secrets (<--- key icon in tray on left hand side)
from google.colab import userdata

WEAVIATE_URL = userdata.get("WEAVIATE_URL")
WEAVIATE_API_KEY = userdata.get("WEAVIATE_API_KEY")
OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")

print("Weaviate URL:", WEAVIATE_URL)
print("Weaviate API Key:", WEAVIATE_API_KEY)


Weaviate URL: https://rwxzavyuspepzg2fkhjag.c0.us-west3.gcp.weaviate.cloud
Weaviate API Key: IwgDlvGkKqzCylsFoLh6wAuPgYqY9bvyp7yR


In [8]:
#Establish connection to Weaviate Client
client = weaviate.connect_to_weaviate_cloud(
    WEAVIATE_URL,
    auth_credentials=weaviate.AuthApiKey(WEAVIATE_API_KEY),
    headers={
        "X-OpenAI-Api-Key": OPENAI_API_KEY
    }
)

In [9]:
# This will inform us if keys are not present
assert client.is_ready(), "Weaviate client is not ready. Check credentials and endpoint."


In [10]:
# This will confirm our client is ready
client.is_ready()

True

In [26]:
# Creating our collection in our Weaviate cluster
from weaviate.classes.config import Configure

if client.collections.exists("SpaceMedResearch"):
    client.collections.delete("SpaceMedResearch")

client.collections.create(
    name = "SpaceMedResearch",
    vectorizer_config= [
            Configure.NamedVectors.text2vec_weaviate(
                name="main_vector",
                model="Snowflake/snowflake-arctic-embed-l-v2.0",
                source_properties=["title", "content"],
            )
        ],
    )


<weaviate.collections.collection.sync.Collection at 0x7cd4c82ca310>

In [27]:
# Function to batch upload to Weaviate (including UUIDs)
def bulk_upload_space_chunks_to_weaviate(json_file_path, collection_name="SpaceMedResearch"):
    import weaviate
    from google.colab import userdata
    from weaviate.util import generate_uuid5

    docs_collection = client.collections.get(collection_name)

    with open(json_file_path, "r", encoding="utf-8") as f:
        chunks = json.load(f)

    successful_uploads = 0

    with docs_collection.batch.fixed_size(batch_size=100, concurrent_requests=2) as batch:
        for i, chunk in enumerate(chunks):
            text = chunk.get("text", "")
            metadata = chunk.get("metadata", {})

            # Create a unique ID by combining title with chunk index and first 20 chars of text
            unique_id = f"{metadata.get('title', 'unknown')}-chunk-{i}-{text[:20]}"
            uid = generate_uuid5(unique_id)

            batch.add_object(
                properties={
                    "content": text,
                    "title": metadata.get("title", "unknown"),
                    "slug": metadata.get("slug", "unknown"),
                    "section": metadata.get("section", "unknown")
                },
                uuid=uid
            )
            successful_uploads += 1

            # Progress indicator
            if i % 500 == 0 and i > 0:
                print(f"Progress: {i}/{len(chunks)} chunks processed")

            if batch.number_errors > 10:
                print("❌ Too many errors during batch import — stopping early.")
                break

    # Verify the actual count in the collection
    collection_count = docs_collection.aggregate.over_all().total_count
    print(f"✅ Uploaded {successful_uploads} chunks to Weaviate from {json_file_path}")
    print(f"✅ Collection now contains {collection_count} objects")

In [28]:
# Batch upload
bulk_upload_space_chunks_to_weaviate("/content/demo_chunks.json")

Progress: 500/5070 chunks processed
Progress: 1000/5070 chunks processed
Progress: 1500/5070 chunks processed
Progress: 2000/5070 chunks processed
Progress: 2500/5070 chunks processed
Progress: 3000/5070 chunks processed
Progress: 3500/5070 chunks processed
Progress: 4000/5070 chunks processed
Progress: 4500/5070 chunks processed
Progress: 5000/5070 chunks processed
✅ Uploaded 5070 chunks to Weaviate from /content/demo_chunks.json
✅ Collection now contains 5070 objects


In [29]:
from weaviate.classes.init import Auth
# Importing from weaviate-agents
from weaviate_agents.query import QueryAgent

# Instantiate agent object, and specify the collections to query
qa = QueryAgent(
    client=client, collections=["SpaceMedResearch"]
)

In [30]:
# Perform a query
response = qa.run(
    "What are the greatest health concerns facing astronauts during their time in space and upon their return to earth?"
)
# Print the response
response.display()





In [20]:
# Perform a query
response = qa.run(
    "What are the greatest health concerns facing astronauts during their time in space and upon their return to earth?"
)
# Print the response
response.display()





In [None]:
# Perform a query
response = qa.run(
    "What is tyeh best cookie recipe in the world?"
)
# Print the response
response.display()





In [None]:
# Perform a query
response = qa.run(
    "What stress is common in pilots?"
)
# Print the response
response.display()





In [None]:
# Perform a query for articles with contradictory information
response = qa.run(
    "Do all astronauts who engage in long-duration spaceflight develop structural changes in the eyes (Spaceflight-Associated Neuro-ocular Syndrome, or SANS)?"
)
# Print the response
response.display()





In [None]:
# Perform a query for articles with contradictory information
response = qa.run(
    "What are the proposed mechanisms behind SANS, and do researchers agree on whether fluid shifts and intracranial pressure are the primary causes?"
)
# Print the response
response.display()



