# Get To Know A Dataset: search.wilsonl.in Web Search Index Crawl + Text Embeddings

This notebook demonstrates how to use the search engine dataset from AWS Open Data, containing 280 million web pages indexed with 3 billion neural embeddings.

**Repository:** https://github.com/wilsonzlin/datasets/search-engine-open-data/

## What This Covers

- Representing natural language queries as vectors
- Finding similar content using approximate nearest neighbor search (HNSW)
- Working with sharded databases for scalability
- Pinpointing the most relevant section within a page

## About This Dataset

A complete web search engine built from scratch:

- **280 million web pages** crawled and parsed
- **3 billion embeddings** generated using the sentence-transformers/multi-qa-mpnet-base-dot-v1 model
- **Sharded architecture** designed to scale across hundreds of cores and terabytes of storage
- **Dual-level indexing** for both page-level and sentence-level semantic search

Read more about the original project: https://blog.wilsonl.in/search-engine

## Dataset Structure

The dataset provides multiple ways to access the data:

1. **Original RocksDB** (64 shards) - The native format with full data
2. **Parquet export** - For analytics and bulk processing
3. **Postcard binary** - Compact serialization for efficient loading
4. **HNSW indices** - Both sharded (64) and combined (1) versions

### Bucket Directory Tree

```
./rocksdb-shards/
  shard-0/ ... shard-63/
./kg-dbpedia/
  vectors.hnsw
  vectors.sqlite3
  article-abstracts.sqlite3
./kg-wikidata/
  rocksdb/
./resource-id/
  rocksdb/
./hnsw-combined/
  statement.hnsw
  block-mean.hnsw
./export/
  data.parquet
  data-postcard/
    uids.bin
    offsets.bin
    data.bin
  statement_uid_base_to_resource_uid.arrow
  statement_embeddings_msgpack.bin
  block_embeddings_msgpack.bin
  norm_doc_json_brotli.bin
  statements_json_brotli.bin
  source_brotli.bin
  statement_labels_msgpack.bin
  urls.txt
  data.sql
./hnsw-shards/
  block-mean/shard-0/ ... shard-63/index.hnsw
  statement/shard-0/ ... shard-63/index.hnsw
```

## Prerequisites and Setup

First, let's install the necessary packages. We'll need:

- **sentence-transformers** - To generate embeddings from text. This includes PyTorch and the transformer model.
- **hnswlib** - Fast approximate nearest neighbor search using hierarchical navigable small world graphs.
- **msgpack** - Efficient binary serialization format used throughout the dataset.
- **pyarrow** - Apache Arrow and Parquet file reading for columnar data access.
- **xxhash** - Fast non-cryptographic hash function, used for consistent sharding.
- **brotli** - Compression algorithm used for text content (better compression than gzip).
- **numpy** - Numerical operations on embeddings and vectors.
- **python-rocksdb** - Python bindings for RocksDB, a high-performance embedded key-value store.
- **duckdb** - In-process SQL OLAP database for Parquet analysis.
- **pandas** - (Optional) For Parquet analysis examples.

In [None]:
!pip install sentence-transformers hnswlib msgpack pyarrow xxhash brotli numpy python-rocksdb duckdb pandas

## Mounting the Dataset

The dataset is hosted on AWS S3 as part of the Open Data program. We'll mount it directly using `mount-s3`, which provides efficient read access without downloading everything locally.

**Note:** Make sure you have `mount-s3` installed. On Amazon Linux 2023 or Ubuntu, you can install it with:

```bash
# Amazon Linux 2023
sudo yum install mount-s3

# Ubuntu
wget https://s3.amazonaws.com/mountpoint-s3-release/latest/x86_64/mount-s3.deb
sudo apt install ./mount-s3.deb
```

In [None]:
import os

# Create mount point
MOUNT_POINT = "/mnt/search-engine"
!mkdir -p {MOUNT_POINT}

# Mount the S3 bucket (read-only)
!mount-s3 aws-opendata.wilsonl.in {MOUNT_POINT} --region us-east-1 --read-only

# Set the data root
DATA_ROOT = f"{MOUNT_POINT}/search-engine"

# Verify the mount succeeded
if not os.path.exists(DATA_ROOT):
    raise RuntimeError(f"Failed to mount dataset: {DATA_ROOT} does not exist. Check that mount-s3 is installed and working.")

print(f"Dataset successfully mounted at: {DATA_ROOT}")
print("Directory structure:")
for item in sorted(os.listdir(DATA_ROOT)):
    item_path = os.path.join(DATA_ROOT, item)
    if os.path.isdir(item_path):
        print(f"  {item}/")
    else:
        print(f"  {item}")

## Understanding the Data Architecture

Before we dive into the code, let's understand how the data is organized.

### Resource IDs and URLs

Each web page (called a "resource") has:
- A unique ID (`uid`) - a 64-bit integer
- A normalized URL - the primary key
- A `statement_uid_base` - the starting UID for statements (sentences) in that page

The `resource-id` service maintains bidirectional mappings:
```
URL → Resource { uid, fetch_id, statement_uid_base }
UID → URL
Statement UID Base → URL
```
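
The third mapping is what lets a statement UID be traced back to its page: a statement belongs to the resource with the largest `statement_uid_base` that does not exceed it. A minimal in-memory sketch of that lookup follows. The base values, URLs, and the helper name `url_for_statement_uid` are made up for illustration; the real mapping lives in the `resource-id` RocksDB.

```python
import bisect

# Hypothetical (statement_uid_base, url) pairs, kept sorted by base.
BASES = [0, 120, 450]
URLS = ["https://a.example/", "https://b.example/", "https://c.example/"]

def url_for_statement_uid(statement_uid: int):
    """Return the URL whose statement_uid_base is the largest one <= statement_uid."""
    pos = bisect.bisect_right(BASES, statement_uid) - 1
    if pos < 0:
        return None  # smaller than every known base
    return URLS[pos]

print(url_for_statement_uid(130))  # lands in the range that starts at 120
```

The same "floor" lookup is what a seek on an ordered key-value store provides, just without holding every base in memory.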

### RocksDB Sharding

The data is split across 64 RocksDB shards using consistent hashing (XXH3). Each key has a prefix byte indicating its type, followed by the URL:

**Key format:** `[prefix_byte][url]`

| Prefix | Key Type | Value Format | Description |
|--------|----------|--------------|-------------|
| `0x01` | `Resource` | MessagePack | Basic metadata (title, state, HTTP status, etc.) |
| `0x02` | `ResourceLinks` | MessagePack | Outbound links from this page |
| `0x03` | `ResourceMeta` | MessagePack | OpenGraph and meta tags (og:description, etc.) |
| `0x04` | `ResourceNormDoc` | JSON + Brotli | Normalized HTML structure |
| `0x05` | `ResourceSource` | Brotli | Original HTML source |
| `0x06` | `ResourceStatements` | JSON + Brotli | Sentence chunks with context |
| `0x07` | `ResourceBlockEmbeddings` | MessagePack | Block-level embeddings (768-dim vectors) |
| `0x08` | `ResourceStatementLabels` | MessagePack | Classification labels |
| `0x09` | `ResourceStatementEmbeddings` | MessagePack | Statement-level embeddings (768-dim vectors) |
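
As a quick illustration of the key layout, the sketch below builds a key from a prefix and URL and splits it back apart. The prefix names are copied from the table; the helper names `encode_key` and `decode_key` are hypothetical, and the URL is assumed to be stored as UTF-8 bytes.

```python
# Prefix byte -> key type, copied from the table above.
KEY_TYPES = {
    0x01: "Resource",
    0x02: "ResourceLinks",
    0x03: "ResourceMeta",
    0x04: "ResourceNormDoc",
    0x05: "ResourceSource",
    0x06: "ResourceStatements",
    0x07: "ResourceBlockEmbeddings",
    0x08: "ResourceStatementLabels",
    0x09: "ResourceStatementEmbeddings",
}

def encode_key(prefix: int, url: str) -> bytes:
    """Build [prefix_byte][url] (URL assumed to be UTF-8 encoded)."""
    if prefix not in KEY_TYPES:
        raise ValueError(f"unknown key prefix: {prefix:#04x}")
    return bytes([prefix]) + url.encode("utf-8")

def decode_key(key: bytes):
    """Split a raw key into (key type name, url)."""
    return KEY_TYPES[key[0]], key[1:].decode("utf-8")

key = encode_key(0x06, "https://example.com/")
print(key)              # b'\x06https://example.com/'
print(decode_key(key))  # ('ResourceStatements', 'https://example.com/')
```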

### HNSW Vector Indices

Two types of embeddings are indexed:
- **block-mean**: Average of statements in each semantic block (paragraph-sized chunks)
- **statement**: Individual sentence embeddings
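
To make "average of statements" concrete, here is a toy sketch that averages two 3-dimensional statement vectors element-wise (the real vectors have 768 dimensions). The helper name `mean_vector` is hypothetical, and whether the original pipeline applies any further normalization after averaging is not shown here.

```python
def mean_vector(vectors):
    """Element-wise mean of a list of equal-length vectors."""
    count = len(vectors)
    return [sum(component) / count for component in zip(*vectors)]

# Two toy statement embeddings belonging to the same block.
statement_vectors = [
    [1.0, 0.0, 2.0],
    [3.0, 2.0, 0.0],
]
block_vector = mean_vector(statement_vectors)
print(block_vector)  # [2.0, 1.0, 1.0]
```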

**Sharding history:** The HNSW indices were originally sharded into 64 pieces to fit within available RAM on individual machines and allow parallel querying across nodes. Each shard was built independently using consistent hashing to distribute vectors uniformly. Later, when exporting the dataset for public release, combined single-file indices were created for simplicity, though they require substantially more RAM to load.

**Sizes:**
- Sharded block-mean: 960 GB total (~15 GB per shard)
- Sharded statement: 633 GB total (~10 GB per shard)
- Combined block-mean: 1.1 TB
- Combined statement: 767 GB
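
The per-shard and total figures can be checked with a few lines of arithmetic. This sketch assumes decimal units (1 TB = 1000 GB) and, as the memory requirements later in the notebook imply, that an index needs roughly its on-disk size in RAM.

```python
SHARD_COUNT = 64

sharded_gb = {"block-mean": 960, "statement": 633}
combined_gb = {"block-mean": 1100, "statement": 767}  # 1.1 TB taken as 1100 GB

for name, total in sharded_gb.items():
    print(f"{name}: about {total / SHARD_COUNT:.1f} GB per shard")

sharded_total = sum(sharded_gb.values())
combined_total = sum(combined_gb.values())
print(f"all shards: {sharded_total} GB (~{sharded_total / 1000:.1f} TB)")
print(f"combined:   {combined_total} GB (~{combined_total / 1000:.1f} TB)")
```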

## Part 1: Loading the Embedding Model

Load the embedding model used to create all 3 billion embeddings in the dataset: `multi-qa-mpnet-base-dot-v1`.

This model:
- Produces 768-dimensional vectors
- Is optimized for semantic search and question answering
- Uses dot product (inner product) as the similarity metric
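
Because the metric is a dot product, a larger score means a closer match. hnswlib's `ip` space, which this notebook uses for the indices, reports a distance of `1 - inner product`, so smaller distances are better and the value can go negative when the inner product exceeds 1. A small sketch with toy 2-dimensional vectors (the helper names are hypothetical):

```python
def dot(a, b):
    """Inner product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def ip_distance(a, b):
    """Distance as reported by an inner-product space: 1 - dot(a, b)."""
    return 1.0 - dot(a, b)

query = [0.5, 0.5]
docs = {
    "aligned": [1.0, 1.0],
    "orthogonal": [1.0, -1.0],
    "aligned_long": [2.0, 2.0],
}

for name, vec in docs.items():
    d = ip_distance(query, vec)
    print(f"{name}: distance={d}, similarity={1.0 - d}")
```

Recovering the similarity as `1 - distance` simply undoes that transformation.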

In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np

print("Loading embedding model...")
print("This will download the model on first run (~420MB).")

model = SentenceTransformer('sentence-transformers/multi-qa-mpnet-base-dot-v1')
EMBEDDING_DIM = 768

print(f"Model loaded successfully. Embedding dimension: {EMBEDDING_DIM}")

# Test it with a simple query
test_query = "What is semantic search?"
test_embedding = model.encode(test_query, normalize_embeddings=True)
print(f"Test query: '{test_query}'")
print(f"Embedding shape: {test_embedding.shape}")
print(f"First 5 dimensions: {test_embedding[:5]}")

## Part 2: Loading Sharded HNSW Indices

Now we'll load the HNSW (Hierarchical Navigable Small World) indices. The dataset ships them as 64 shards that can be queried in parallel.

We'll load all 64 shards of the block-mean index, which represents page-level semantic embeddings. Each shard contains approximately 15 GB of vectors.

**Memory requirement:** ~960 GB total for all block-mean shards.

In [None]:
import hnswlib
from concurrent.futures import ThreadPoolExecutor

SHARD_COUNT = 64

def load_hnsw_index(path: str, dim: int = EMBEDDING_DIM) -> hnswlib.Index:
    """
    Load an HNSW index from disk.
    
    Args:
        path: Path to the .hnsw file
        dim: Embedding dimensionality (default: 768)
    
    Returns:
        Loaded HNSW index ready for queries
    """
    index = hnswlib.Index(space='ip', dim=dim)  # 'ip' = inner product
    index.load_index(path)
    return index

print("Loading 64 shards of block-mean HNSW indices...")
print("This will load approximately 960 GB into memory.")
print("Progress:")

block_mean_shards = []
for i in range(SHARD_COUNT):
    shard_path = f"{DATA_ROOT}/hnsw-shards/block-mean/shard-{i}/index.hnsw"
    index = load_hnsw_index(shard_path)
    block_mean_shards.append(index)
    
    if (i + 1) % 16 == 0:
        print(f"  Loaded shards 0-{i} ({i+1}/{SHARD_COUNT})")

total_vectors = sum(shard.get_current_count() for shard in block_mean_shards)
print(f"All block-mean shards loaded: {total_vectors:,} total vectors")

# Initialize thread pool for parallel queries
query_executor = ThreadPoolExecutor(max_workers=64)
print("Thread pool initialized for parallel queries.")

In [None]:
# Load statement shards (sentence-level embeddings)
print("Loading 64 shards of statement HNSW indices...")
print("This will load approximately 633 GB into memory.")
print("Progress:")

statement_shards = []
for i in range(SHARD_COUNT):
    shard_path = f"{DATA_ROOT}/hnsw-shards/statement/shard-{i}/index.hnsw"
    index = load_hnsw_index(shard_path)
    statement_shards.append(index)
    
    if (i + 1) % 16 == 0:
        print(f"  Loaded shards 0-{i} ({i+1}/{SHARD_COUNT})")

total_statements = sum(shard.get_current_count() for shard in statement_shards)
print(f"All statement shards loaded: {total_statements:,} total vectors")
print("Total memory used by indices: ~1.6 TB")

## Part 3: Querying Sharded HNSW Indices

With sharded indices, query all shards in parallel and merge the results.

**Important distinction:**
- Block-mean index: IDs are resource UIDs directly
- Statement index: IDs are statement UIDs, which must be mapped to resource UIDs via statement_uid_base
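
The merge step can be sketched without loading any index. Each shard returns its own top `k` as `(uid, distance)` pairs, and the global top `k` is the `k` smallest distances across all of them. The sketch below uses `heapq.nsmallest` on made-up results; the function name `merge_shard_results` is hypothetical.

```python
import heapq

def merge_shard_results(per_shard, k):
    """Merge per-shard (uid, distance) lists and keep the k smallest distances."""
    all_hits = (hit for hits in per_shard for hit in hits)
    return heapq.nsmallest(k, all_hits, key=lambda hit: hit[1])

# Made-up results from three shards, each already limited to its own top 2.
per_shard = [
    [(101, 0.12), (102, 0.40)],
    [(201, 0.05), (202, 0.33)],
    [(301, 0.20), (302, 0.90)],
]
top3 = merge_shard_results(per_shard, 3)
print(top3)  # [(201, 0.05), (101, 0.12), (301, 0.2)]
```

Every shard has to be asked for the full `k`, since in the worst case all of the best matches live in a single shard.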

In [None]:
def query_sharded_hnsw(shards: list, query_vec: np.ndarray, k: int, executor: ThreadPoolExecutor) -> list:
    """
    Query all HNSW shards in parallel and return top k results.
    
    Args:
        shards: List of HNSW index shards
        query_vec: Query embedding vector
        k: Number of results to return
        executor: ThreadPoolExecutor for parallel queries
    
    Returns:
        List of (uid, distance) tuples sorted by distance
    """
    def query_shard(shard):
        labels, distances = shard.knn_query(query_vec, k=k)
        return list(zip(labels[0], distances[0]))
    
    # Query all shards in parallel
    results = list(executor.map(query_shard, shards))
    
    # Merge results
    all_results = []
    for shard_results in results:
        all_results.extend(shard_results)
    
    # Sort by distance and take top k
    all_results.sort(key=lambda x: x[1])
    return all_results[:k]

# Test it
test_query_vec = model.encode("distributed systems", normalize_embeddings=True).astype('float32')
test_results = query_sharded_hnsw(block_mean_shards, test_query_vec, k=10, executor=query_executor)

print("Test query: 'distributed systems'")
print(f"Found {len(test_results)} results")
print("Top 3 results:")
for i, (uid, distance) in enumerate(test_results[:3], 1):
    similarity = 1 - distance
    print(f"  {i}. UID {uid}: similarity {similarity:.4f}")

## Part 4: Working with RocksDB Shards

Now let's set up access to the RocksDB shards using the raw Python bindings.

### Sharding Logic

To find which shard contains a key, hash the key with XXH3 (64-bit) and take the result modulo the shard count:
```python
shard_number = xxh3_64(key) % 64
```

In [None]:
import rocksdb
import xxhash

# Key prefixes matching RocksDbKey enum
class KeyPrefix:
    RESOURCE = 1
    RESOURCE_LINKS = 2
    RESOURCE_META = 3
    RESOURCE_NORM_DOC = 4
    RESOURCE_SOURCE = 5
    RESOURCE_STATEMENTS = 6
    RESOURCE_BLOCK_EMBEDDINGS = 7
    RESOURCE_STATEMENT_LABELS = 8
    RESOURCE_STATEMENT_EMBEDDINGS = 9

def build_key(prefix: int, url: str) -> bytes:
    """Build a RocksDB key: [prefix_byte][url]"""
    return bytes([prefix]) + url.encode('utf-8')

def get_shard_for_key(key: bytes) -> int:
    """Determine which shard a key belongs to using XXH3 hash."""
    # xxh3_64 is the 64-bit XXH3 variant; xxh64 is an older, different algorithm
    return xxhash.xxh3_64(key).intdigest() % SHARD_COUNT

print("Key building functions defined.")
print("Example key for URL 'https://example.com':")
example_key = build_key(KeyPrefix.RESOURCE, "https://example.com")
print(f"  Raw bytes: {example_key[:20]}...")
print(f"  Shard: {get_shard_for_key(example_key)}")

In [None]:
# Configure RocksDB options matching the original implementation
print("Opening RocksDB shards...")
print("This will open 64 read-only database connections.")

opts = rocksdb.Options()
opts.create_if_missing = False
opts.max_open_files = 300

# BlobDB settings for large values
opts.enable_blob_files = True
opts.min_blob_size = 0

# Cache and performance settings
block_cache = rocksdb.LRUCache(512 * 1024 * 1024)  # 512 MB cache
block_opts = rocksdb.BlockBasedTableFactory(
    block_cache=block_cache,
    filter_policy=rocksdb.BloomFilterPolicy(10),
    block_size=16 * 1024,
)
opts.table_factory = block_opts

rocksdb_shards = []
for i in range(SHARD_COUNT):
    shard_path = f"{DATA_ROOT}/rocksdb-shards/shard-{i}"
    db = rocksdb.DB(shard_path, opts, read_only=True)
    rocksdb_shards.append(db)
    
    if (i + 1) % 16 == 0:
        print(f"  Opened shards 0-{i}")

print(f"All {len(rocksdb_shards)} shards opened successfully.")

## Part 5: The Resource ID Service

The resource ID service provides mappings between URLs and numeric IDs. This is crucial for mapping statement UIDs back to their parent resources.
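
Numeric IDs in this service's keys are encoded big-endian so that byte-wise key order matches numeric order, which is what allows a range seek to find "the largest base not above this UID". RocksDB's default comparator orders keys byte-wise. The sketch below, with arbitrary sample UIDs, shows that big-endian encoding preserves numeric order while little-endian does not:

```python
uids = [1, 255, 256, 70000]  # arbitrary sample values, already in numeric order

big_endian = [uid.to_bytes(8, byteorder="big") for uid in uids]
little_endian = [uid.to_bytes(8, byteorder="little") for uid in uids]

# Byte-wise sorting keeps numeric order only for the big-endian encoding.
big_sorted = sorted(big_endian) == big_endian
little_sorted = sorted(little_endian) == little_endian

print(f"big-endian keeps numeric order:    {big_sorted}")
print(f"little-endian keeps numeric order: {little_sorted}")
```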

In [None]:
import msgpack
import bisect

# Open the resource-id RocksDB
resource_id_db_path = f"{DATA_ROOT}/resource-id/rocksdb"
resource_id_opts = rocksdb.Options()
resource_id_opts.create_if_missing = False
resource_id_db = rocksdb.DB(resource_id_db_path, resource_id_opts, read_only=True)

print(f"Resource ID database opened: {resource_id_db_path}")

# Key prefixes for resource-id service
class ResourceIdKeyPrefix:
    RESOURCE_URL = 1   # URL -> Resource
    RESOURCE_UID = 2   # UID -> URL  
    STATEMENT_UID_BASE = 3  # Statement UID Base -> URL

def k_resource_url(url: str) -> bytes:
    """Build key to lookup Resource by URL."""
    return bytes([ResourceIdKeyPrefix.RESOURCE_URL]) + url.encode('utf-8')

def k_resource_uid(uid: int) -> bytes:
    """Build key to lookup URL by resource UID (big-endian for ordering)."""
    return bytes([ResourceIdKeyPrefix.RESOURCE_UID]) + uid.to_bytes(8, byteorder='big')

def k_statement_uid_base(uid: int) -> bytes:
    """Build key to lookup URL by statement UID base (big-endian for ordering)."""
    return bytes([ResourceIdKeyPrefix.STATEMENT_UID_BASE]) + uid.to_bytes(8, byteorder='big')

def get_resource_by_url(url: str) -> dict:
    """
    Look up resource metadata by URL.
    Returns: {uid: int, fetch_id: int, statement_uid_base: Optional[int]}
    """
    key = k_resource_url(url)
    value = resource_id_db.get(key)
    if value is None:
        return None
    return msgpack.unpackb(value, raw=False)

def get_url_by_uid(uid: int) -> str:
    """Look up URL by resource UID."""
    key = k_resource_uid(uid)
    value = resource_id_db.get(key)
    if value is None:
        return None
    return value.decode('utf-8')

def get_url_by_statement_uid(statement_uid: int) -> str:
    """
    Look up URL by statement UID.
    Finds the largest statement_uid_base <= statement_uid.
    """
    seek_key = k_statement_uid_base(statement_uid)
    
    # seek() positions the iterator at the first key >= seek_key
    it = resource_id_db.iterkeys()
    it.seek(seek_key)
    
    # Walk backwards from there: the first key <= seek_key is the one we want
    key = next((candidate for candidate in reversed(it) if candidate <= seek_key), None)
    
    if key is None:
        # seek() ran past the last key, so the match (if any) is the last key
        it.seek_to_last()
        key = next(reversed(it), None)
        if key is None or key > seek_key:
            return None
    
    # Verify this is a statement_uid_base key
    if key[0] != ResourceIdKeyPrefix.STATEMENT_UID_BASE:
        return None
    
    value = resource_id_db.get(key)
    if value is None:
        return None
    return value.decode('utf-8')

print("Resource ID lookup functions defined.")

## Part 6: Data Access Functions

Helper functions to fetch different types of data from the RocksDB shards:

In [None]:
import brotli
import json

def get_from_rocksdb(url: str, key_prefix: int):
    """
    Fetch a value from the appropriate RocksDB shard.
    Returns raw bytes or None if not found.
    """
    key = build_key(key_prefix, url)
    shard_no = get_shard_for_key(key)
    return rocksdb_shards[shard_no].get(key)

def get_resource_metadata(url: str) -> dict:
    """Get resource metadata (title, state, HTTP status, etc.)."""
    data = get_from_rocksdb(url, KeyPrefix.RESOURCE)
    if data is None:
        return None
    return msgpack.unpackb(data, raw=False)

def get_resource_meta_tags(url: str) -> dict:
    """Get OpenGraph and meta tags."""
    data = get_from_rocksdb(url, KeyPrefix.RESOURCE_META)
    if data is None:
        return {}
    return msgpack.unpackb(data, raw=False)

def get_statements(url: str) -> dict:
    """Get parsed statements (sentence chunks) with context."""
    data = get_from_rocksdb(url, KeyPrefix.RESOURCE_STATEMENTS)
    if data is None:
        return None
    json_str = brotli.decompress(data).decode('utf-8')
    return json.loads(json_str)

def get_block_embeddings(url: str) -> dict:
    """Get block-level embeddings."""
    data = get_from_rocksdb(url, KeyPrefix.RESOURCE_BLOCK_EMBEDDINGS)
    if data is None:
        return None
    return msgpack.unpackb(data, raw=False)

def get_normalized_doc(url: str) -> dict:
    """Get normalized HTML structure."""
    data = get_from_rocksdb(url, KeyPrefix.RESOURCE_NORM_DOC)
    if data is None:
        return None
    json_str = brotli.decompress(data).decode('utf-8')
    return json.loads(json_str)

print("Data access functions defined.")

## Part 7: Semantic Search Pipeline

Complete semantic search implementation.

The workflow:
1. Embed the query
2. Search sharded HNSW indices for similar vectors
3. Map statement UIDs to resource UIDs (for statement results)
4. Fetch page data from RocksDB
5. Rank and return results
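
Steps 3 and 5 can be sketched in isolation: candidates from both indices are folded into one entry per resource, keeping each resource's best (smallest) distance, and the entries are then sorted. Everything below is toy data, and `rank_candidates` and the `owners` mapping are hypothetical stand-ins for the index results and the resource ID lookup.

```python
def rank_candidates(block_hits, statement_hits, statement_to_resource, k):
    """
    Combine block-level and statement-level hits into one ranking.

    block_hits:            [(resource_uid, distance), ...]
    statement_hits:        [(statement_uid, distance), ...]
    statement_to_resource: callable returning a resource UID, or None if unknown
    """
    best = {}
    for resource_uid, distance in block_hits:
        best[resource_uid] = min(distance, best.get(resource_uid, float("inf")))
    for statement_uid, distance in statement_hits:
        resource_uid = statement_to_resource(statement_uid)
        if resource_uid is None:
            continue  # statement could not be mapped to a page
        best[resource_uid] = min(distance, best.get(resource_uid, float("inf")))
    return sorted(best.items(), key=lambda item: item[1])[:k]

# Toy statement UID -> resource UID mapping.
owners = {1000: 7, 1001: 7, 2000: 9}

ranked = rank_candidates(
    block_hits=[(7, 0.30), (8, 0.25)],
    statement_hits=[(1001, 0.10), (2000, 0.40), (3000, 0.01)],
    statement_to_resource=owners.get,
    k=2,
)
print(ranked)  # [(7, 0.1), (8, 0.25)]
```

Resource 7 moves to the top because one of its statements matches better than its block does, while the unmapped statement 3000 is dropped.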

In [None]:
def semantic_search(query: str, k: int = 10):
    """
    Perform semantic search for a natural language query.
    
    Args:
        query: Natural language search query
        k: Number of results to return
    
    Returns:
        List of search results with metadata
    """
    print(f"Query: {query}")
    print("=" * 80)
    
    # Step 1: Embed the query
    print("[1/5] Generating query embedding...")
    query_vec = model.encode(query, normalize_embeddings=True).astype('float32')
    print(f"      Generated {len(query_vec)}-dimensional vector")
    
    # Step 2: Search sharded HNSW indices
    print("[2/5] Searching HNSW indices across 64 shards...")
    block_results = query_sharded_hnsw(block_mean_shards, query_vec, k=100, executor=query_executor)
    stmt_results = query_sharded_hnsw(statement_shards, query_vec, k=32, executor=query_executor)
    
    print(f"      Block-mean: {len(block_results)} candidates")
    print(f"      Statement: {len(stmt_results)} candidates")
    
    # Step 3: Map UIDs to resources
    print("[3/5] Mapping UIDs to resource URLs...")
    
    # Block results have resource UIDs directly
    results = {}
    for resource_uid, distance in block_results:
        if resource_uid not in results:
            results[resource_uid] = {
                'uid': int(resource_uid),
                'block_distance': float(distance),
                'block_similarity': float(1 - distance),
                'statement_distances': []
            }
    
    # Statement results have statement UIDs - need to map to resource UIDs
    for statement_uid, distance in stmt_results:
        url = get_url_by_statement_uid(int(statement_uid))
        if url:
            resource_info = get_resource_by_url(url)
            if resource_info:
                resource_uid = resource_info['uid']
                if resource_uid not in results:
                    results[resource_uid] = {
                        'uid': resource_uid,
                        'block_distance': float('inf'),
                        'block_similarity': 0.0,
                        'statement_distances': []
                    }
                results[resource_uid]['statement_distances'].append(float(distance))
    
    # Convert to list and sort by best distance
    results = list(results.values())
    results.sort(key=lambda r: min(r['block_distance'], 
                                   min(r['statement_distances']) if r['statement_distances'] else float('inf')))
    results = results[:k]
    
    # Map resource UIDs to URLs
    for result in results:
        url = get_url_by_uid(result['uid'])
        result['url'] = url
    
    results = [r for r in results if r['url'] is not None]
    print(f"      Resolved {len(results)} URLs")
    
    # Step 4: Fetch metadata
    print("[4/5] Fetching page metadata...")
    for result in results:
        # Default to an empty dict so 'title' and 'state' are always set
        metadata = get_resource_metadata(result['url']) or {}
        result['title'] = metadata.get('title')
        result['state'] = metadata.get('state')
        
        meta_tags = get_resource_meta_tags(result['url'])
        result['description'] = meta_tags.get('og:description', '')
    
    print("      Metadata fetched")
    
    # Step 5: Ranking complete
    print("[5/5] Ranking complete")
    print("=" * 80)
    
    return results

# Try it!
results = semantic_search("how does semantic search work with neural networks", k=5)

# Display results
for i, r in enumerate(results, 1):
    print(f"{i}. {r['title'] or '(No title)'}")
    print(f"   URL: {r['url']}")
    print(f"   Similarity: {r['block_similarity']:.4f}")
    if r['description']:
        desc = r['description'][:150]
        print(f"   {desc}...")
    print()

## Part 8: Finding the Most Relevant Section

Use block embeddings to pinpoint the exact section of a page that's most relevant to the query.
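
The idea can be sketched on toy data: score every block against the query with a dot product, pick the highest score, then print the statements that belong to that block. This sketch assumes blocks are contiguous and ordered, so a block ends where the next one starts; that is an assumption made for illustration, not something the dataset format is confirmed to guarantee. All names and vectors are hypothetical.

```python
def best_block(query_vec, block_vecs):
    """Return (index, score) of the block with the highest dot product."""
    scores = [sum(q * b for q, b in zip(query_vec, vec)) for vec in block_vecs]
    idx = max(range(len(scores)), key=scores.__getitem__)
    return idx, scores[idx]

def block_statements(statements, block_starts, idx):
    """Statements from this block's start up to the next block's start."""
    start = block_starts[idx]
    end = block_starts[idx + 1] if idx + 1 < len(block_starts) else len(statements)
    return statements[start:end]

statements = ["Intro.", "Cats purr.", "Cats nap.", "Dogs bark."]
block_starts = [0, 1, 3]                 # statement index where each block begins
block_vecs = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]]
query_vec = [0.5, 0.0]

idx, score = best_block(query_vec, block_vecs)
print(idx, score)                                       # 1 0.5
print(block_statements(statements, block_starts, idx))  # ['Cats purr.', 'Cats nap.']
```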

In [None]:
def find_relevant_section(url: str, query: str):
    """
    Find the most relevant section within a page for a query.
    """
    print(f"Analyzing page: {url}")
    print("=" * 80)
    
    # Get query embedding
    query_vec = model.encode(query, normalize_embeddings=True).astype('float32')
    
    # Get block embeddings
    print("Fetching block embeddings...")
    block_data = get_block_embeddings(url)
    if not block_data or not block_data.get('blocks'):
        print("  No block embeddings found for this page.")
        return
    
    blocks = block_data['blocks']
    print(f"  Found {len(blocks)} blocks")
    
    # Get statements (text chunks)
    print("Fetching statements...")
    statements_data = get_statements(url)
    if not statements_data:
        print("  No statements found for this page.")
        return
    
    statements = statements_data['statements']
    print(f"  Found {len(statements)} statements")
    
    # Find most similar block
    print("Computing similarity scores...")
    best_score = -float('inf')
    best_block_idx = 0
    
    for i, block in enumerate(blocks):
        embedding_bytes = block['embedding']
        embedding = np.frombuffer(embedding_bytes, dtype=np.float32)
        score = np.dot(query_vec, embedding)
        
        if score > best_score:
            best_score = score
            best_block_idx = i
    
    best_block = blocks[best_block_idx]
    start_idx = best_block['statement_start_index']
    
    print(f"  Most relevant block: #{best_block_idx}")
    print(f"  Similarity score: {best_score:.4f}")
    print(f"  Starts at statement {start_idx}")
    
    # Show the relevant text
    print("Most relevant section:")
    print("-" * 80)
    
    end_idx = min(start_idx + 5, len(statements))
    for i in range(start_idx, end_idx):
        text = statements[i]['text']
        print(f"{text}")
        print()
    
    print("-" * 80)

# Try it with the first result from our search
if results:
    find_relevant_section(results[0]['url'], "how does semantic search work with neural networks")

## Part 9: Analytics with Parquet + DuckDB

The dataset is also available in Parquet format for analytics and bulk processing. DuckDB provides a SQL interface for analyzing the data without loading it all into memory.

Analytical query examples:

In [None]:
import duckdb

# Connect to DuckDB
con = duckdb.connect(database=':memory:')

parquet_path = f"{DATA_ROOT}/export/data.parquet"
print(f"Using Parquet file: {parquet_path}")
print("DuckDB can query Parquet files directly without loading into memory.")
print()

In [None]:
# Example 1: Count pages by HTTP status
print("Pages by HTTP status code:")
print("=" * 80)
result = con.execute(f"""
    SELECT 
        http_status,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM read_parquet('{parquet_path}')
    WHERE http_status IS NOT NULL
    GROUP BY http_status
    ORDER BY count DESC
    LIMIT 10
""").fetchdf()
print(result)
print()

In [None]:
# Example 2: Top domains by page count
print("Top 20 domains by number of indexed pages:")
print("=" * 80)
result = con.execute(f"""
    SELECT 
        url_hostname,
        COUNT(*) as page_count
    FROM read_parquet('{parquet_path}')
    WHERE url_hostname IS NOT NULL
    GROUP BY url_hostname
    ORDER BY page_count DESC
    LIMIT 20
""").fetchdf()
print(result)
print()

In [None]:
# Example 3: Pages by top-level domain extension
print("Pages by TLD:")
print("=" * 80)
result = con.execute(f"""
    SELECT 
        SPLIT_PART(url_hostname, '.', -1) as tld,
        COUNT(*) as count
    FROM read_parquet('{parquet_path}')
    WHERE url_hostname IS NOT NULL
    GROUP BY tld
    ORDER BY count DESC
    LIMIT 15
""").fetchdf()
print(result)
print()

In [None]:
# Example 4: Pages by file extension
print("Pages by file extension:")
print("=" * 80)
result = con.execute(f"""
    SELECT 
        CASE 
            WHEN url_path_ext = '' THEN '(no extension)'
            ELSE url_path_ext
        END as extension,
        COUNT(*) as count
    FROM read_parquet('{parquet_path}')
    GROUP BY extension
    ORDER BY count DESC
    LIMIT 15
""").fetchdf()
print(result)
print()

In [None]:
# Example 5: Distribution of resource states
print("Resource state distribution:")
print("=" * 80)
result = con.execute(f"""
    SELECT 
        state,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM read_parquet('{parquet_path}')
    WHERE state IS NOT NULL
    GROUP BY state
    ORDER BY count DESC
""").fetchdf()
print(result)
print()

In [None]:
# Example 6: Find pages with specific keywords in title (case-insensitive)
keyword = "machine learning"
print(f"Pages with '{keyword}' in title:")
print("=" * 80)
# Bind the keyword as a parameter so quotes in it cannot break the query
result = con.execute(f"""
    SELECT 
        title,
        url_hostname,
        url
    FROM read_parquet('{parquet_path}')
    WHERE LOWER(title) LIKE LOWER(?)
    LIMIT 10
""", [f"%{keyword}%"]).fetchdf()
print(result)
print()

## Part 10: Simple Approach - Combined HNSW + Postcard

For simpler use cases or when you have sufficient RAM, you can use the combined HNSW indices with the Postcard binary format for fast direct lookups.

This approach is simpler because:
- Single index file instead of 64 shards
- No need to query in parallel and merge
- Direct UID to data mapping via Postcard

**Requirements:**
- Combined block-mean: 1.1 TB RAM
- Combined statement: 767 GB RAM
- Total: ~1.9 TB RAM

### Loading Combined HNSW Indices

Let's load the single combined indices instead of the 64 shards:

In [None]:
print("Loading combined HNSW indices...")
print("This requires approximately 1.9 TB of RAM.")
print("This is simpler than sharded approach but needs more memory.")
print()

# Load combined block-mean index
print("Loading combined block-mean index (1.1 TB)...")
combined_block_mean = load_hnsw_index(f"{DATA_ROOT}/hnsw-combined/block-mean.hnsw")
print(f"  Loaded: {combined_block_mean.get_current_count():,} vectors")

# Load combined statement index
print("Loading combined statement index (767 GB)...")
combined_statement = load_hnsw_index(f"{DATA_ROOT}/hnsw-combined/statement.hnsw")
print(f"  Loaded: {combined_statement.get_current_count():,} vectors")

print("Combined indices loaded successfully.")

### Simple Search with Combined Indices

With combined indices, searching is much simpler - just query once, no parallel execution or merging needed:

In [None]:
def simple_semantic_search(query: str, k: int = 10):
    """
    Simplified semantic search using combined indices.
    Much simpler than the sharded approach.
    """
    print(f"Query: {query}")
    print("=" * 80)
    
    # Embed query
    query_vec = model.encode(query, normalize_embeddings=True).astype('float32')
    
    # Query combined indices directly (no sharding, no parallel execution)
    print("Querying combined indices...")
    block_labels, block_distances = combined_block_mean.knn_query(query_vec, k=100)
    stmt_labels, stmt_distances = combined_statement.knn_query(query_vec, k=32)
    
    # Collect results (same logic as before)
    results = {}
    
    # Block results have resource UIDs directly
    for resource_uid, distance in zip(block_labels[0], block_distances[0]):
        if resource_uid not in results:
            results[resource_uid] = {
                'uid': int(resource_uid),
                'block_distance': float(distance),
                'block_similarity': float(1 - distance),
                'statement_distances': []
            }
    
    # Statement results need mapping
    for statement_uid, distance in zip(stmt_labels[0], stmt_distances[0]):
        url = get_url_by_statement_uid(int(statement_uid))
        if url:
            resource_info = get_resource_by_url(url)
            if resource_info:
                resource_uid = resource_info['uid']
                if resource_uid not in results:
                    results[resource_uid] = {
                        'uid': resource_uid,
                        'block_distance': float('inf'),
                        'block_similarity': 0.0,
                        'statement_distances': []
                    }
                results[resource_uid]['statement_distances'].append(float(distance))
    
    # Sort and fetch metadata
    results = list(results.values())
    results.sort(key=lambda r: min(r['block_distance'], 
                                   min(r['statement_distances']) if r['statement_distances'] else float('inf')))
    results = results[:k]
    
    # Map to URLs and fetch metadata
    for result in results:
        url = get_url_by_uid(result['uid'])
        result['url'] = url
        if url:
            # Default to an empty dict so 'title' is always set
            metadata = get_resource_metadata(url) or {}
            result['title'] = metadata.get('title')
            meta_tags = get_resource_meta_tags(url)
            result['description'] = meta_tags.get('og:description', '')
    
    results = [r for r in results if r['url'] is not None]
    print("=" * 80)
    
    return results

# Test the simple approach
simple_results = simple_semantic_search("machine learning fundamentals", k=5)

print("\nResults:")
for i, r in enumerate(simple_results, 1):
    print(f"{i}. {r['title'] or '(No title)'}")
    print(f"   Similarity: {r['block_similarity']:.4f}")
    print(f"   URL: {r['url']}")
    print()
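
The sort key in `simple_semantic_search` ranks a page by its best (smallest) distance across its block-level hit and any statement-level hits. A minimal sketch of that ranking on toy data, assuming result dicts shaped like the ones built above; the `best_distance` helper and the sample values are illustrative and not part of the dataset tooling:

```python
import math

def best_distance(result):
    """Smallest distance seen for a page, across block and statement hits."""
    statement_best = min(result["statement_distances"], default=math.inf)
    return min(result["block_distance"], statement_best)

# Toy results: the distances are made up for illustration
toy_results = [
    {"uid": 1, "block_distance": 0.40, "statement_distances": []},
    {"uid": 2, "block_distance": math.inf, "statement_distances": [0.25, 0.60]},
    {"uid": 3, "block_distance": 0.30, "statement_distances": [0.35]},
]

ranked = sorted(toy_results, key=best_distance)
ranked_uids = [r["uid"] for r in ranked]
print(ranked_uids)
```

A page found only through a statement hit (uid 2 here) can still outrank pages with a block hit, which is why statement-only results are kept with an infinite block distance rather than dropped.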

### Postcard Implementation

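Postcard is a compact binary serialization format from the Rust ecosystem, built on `serde`, so the export is easiest to read from Rust. The reader below loads `uids.bin` and `offsets.bin` as parallel `u64` arrays, uses them to find a row's byte range in `data.bin`, and decodes that range with `postcard`. You can compile it as a Python extension using PyO3: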

```rust
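// Cargo.toml:
// [lib]
// name = "postcard_reader"     # must match the #[pymodule] function name
// crate-type = ["cdylib"]      # required to build a Python extension
//
// [dependencies]
// pyo3 = { version = "0.20", features = ["extension-module"] }
// postcard = "1.0"
// serde = { version = "1.0", features = ["derive"] }
// dashmap = "5.5"
// bytemuck = "1.14"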

use pyo3::prelude::*;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use bytemuck::cast_slice;

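// `get_all` exposes every field as a read-only Python attribute, which lets
// `get()` return this struct to Python (e.g. `resource.title`).
#[pyclass(get_all)]
#[derive(Deserialize, Serialize, Clone)]
pub struct ExportedResourcePostcardRow {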
    pub url: String,
    pub uid: u64,
    pub fetch_id: u64,
    pub statement_uid_base: Option<u64>,
    pub http_status: Option<u16>,
    pub title: Option<String>,
    pub icon_url: Option<String>,
    // Offsets for external blob files
    pub norm_doc_json_brotli_offset: Option<u64>,
    pub norm_doc_json_brotli_size: u64,
    pub source_brotli_offset: Option<u64>,
    pub source_brotli_size: u64,
    pub statements_json_brotli_offset: Option<u64>,
    pub statements_json_brotli_size: u64,
    pub block_embeddings_msgpack_offset: Option<u64>,
    pub block_embeddings_msgpack_size: u64,
    pub statement_embeddings_msgpack_offset: Option<u64>,
    pub statement_embeddings_msgpack_size: u64,
}

#[pyclass]
struct PostcardData {
    uid_to_row: DashMap<u64, usize>,
    row_to_offset: DashMap<usize, u64>,
    data_path: String,
    data_len: u64,
}

#[pymethods]
impl PostcardData {
    #[new]
    fn new(folder: &str) -> PyResult<Self> {
        let folder_path = std::path::Path::new(folder);
        
        let mut uids_file = File::open(folder_path.join("uids.bin"))?;
        let mut uids_bytes = Vec::new();
        uids_file.read_to_end(&mut uids_bytes)?;
        let uids: &[u64] = cast_slice(&uids_bytes);
        
        let mut offsets_file = File::open(folder_path.join("offsets.bin"))?;
        let mut offsets_bytes = Vec::new();
        offsets_file.read_to_end(&mut offsets_bytes)?;
        let offsets: &[u64] = cast_slice(&offsets_bytes);
        
        let uid_to_row = DashMap::new();
        let row_to_offset = DashMap::new();
        
        for (row, (&uid, &offset)) in uids.iter().zip(offsets.iter()).enumerate() {
            uid_to_row.insert(uid, row);
            row_to_offset.insert(row, offset);
        }
        
        let data_path = folder_path.join("data.bin").to_string_lossy().to_string();
        let data_len = std::fs::metadata(&data_path)?.len();
        
        Ok(Self { uid_to_row, row_to_offset, data_path, data_len })
    }
    
    fn get(&self, uid: u64) -> PyResult<ExportedResourcePostcardRow> {
        let row = *self.uid_to_row.get(&uid)
            .ok_or_else(|| pyo3::exceptions::PyKeyError::new_err("UID not found"))?;
        
        let offset = *self.row_to_offset.get(&row).unwrap();
        let next_offset = self.row_to_offset.get(&(row + 1))
            .map(|r| *r)
            .unwrap_or(self.data_len);
        
        let mut file = File::open(&self.data_path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; (next_offset - offset) as usize];
        file.read_exact(&mut buffer)?;
        
        postcard::from_bytes(&buffer)
            .map_err(|e| pyo3::exceptions::PyValueError::new_err(format!("{}", e)))
    }
}

#[pymodule]
fn postcard_reader(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_class::<PostcardData>()?;
    Ok(())
}
```

Build with `maturin`:
```bash
pip install maturin
maturin develop --release
```

Use in Python:
```python
import postcard_reader
postcard = postcard_reader.PostcardData(f"{DATA_ROOT}/export/data-postcard")
resource = postcard.get(12345)  # placeholder UID; raises KeyError if it is not in the export
print(resource.title)
```
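
The Rust reader implies a simple on-disk layout: `uids.bin` and `offsets.bin` are parallel arrays of `u64` values, and row `i` occupies bytes `offsets[i]` up to `offsets[i + 1]` of `data.bin`, with the last row running to the end of the file. A minimal pure-Python sketch of that lookup, assuming the same native byte order the Rust code reads; `load_u64_array` and `read_row_bytes` are hypothetical helper names, and the sketch only returns the raw bytes without decoding the Postcard payload:

```python
import os
from array import array

def load_u64_array(path):
    """Load a file of packed u64 values in native byte order."""
    values = array("Q")  # 'Q' = unsigned 64-bit integer
    with open(path, "rb") as f:
        values.frombytes(f.read())
    return values

def read_row_bytes(folder, uid):
    """Return the raw Postcard-encoded bytes for the row with this UID."""
    uids = load_u64_array(os.path.join(folder, "uids.bin"))
    offsets = load_u64_array(os.path.join(folder, "offsets.bin"))
    try:
        row = uids.index(uid)  # linear scan: fine for a sketch, slow at full scale
    except ValueError:
        raise KeyError(uid) from None
    data_path = os.path.join(folder, "data.bin")
    start = offsets[row]
    # The last row has no successor, so it runs to the end of data.bin
    end = offsets[row + 1] if row + 1 < len(offsets) else os.path.getsize(data_path)
    with open(data_path, "rb") as f:
        f.seek(start)
        return f.read(end - start)
```

This reloads both index files on every call to keep the example short. At the full dataset size you would load them once and build a UID-to-row dictionary, as the Rust version does.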

## Summary

This notebook covered:

1. Mounting and accessing the AWS Open Data search engine dataset
2. Generating embeddings for natural language queries
3. Searching billions of vectors using sharded HNSW indices
4. Reading the sharded RocksDB data with the same schema as the original implementation
5. Mapping between resource UIDs, statement UIDs, and URLs
6. Finding the most relevant sections within pages
7. Performing analytical queries using DuckDB and Parquet
8. Understanding the Postcard format for efficient data access

### Two Approaches

**Sharded HNSW (Parts 2-8):**
- Uses ~1.6 TB RAM
- Queries 64 shards in parallel
- More complex but scales better
- Production approach

**Combined HNSW + Postcard (Part 10):**
- Uses ~1.9 TB RAM
- Single index, simpler queries
- Good for experimentation
- Requires Rust extension for Postcard
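
The sharded approach comes down to asking every shard for its own top `k` and then merging the partial lists into a global top `k`. A minimal sketch of that merge step, assuming each shard is exposed as a callable that returns `(label, distance)` pairs; `query_shards` and the fake shards are hypothetical stand-ins, not the notebook's actual shard loaders:

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

def query_shards(shard_queries, query_vec, k):
    """Query every shard in parallel and keep the k globally closest hits."""
    with ThreadPoolExecutor(max_workers=len(shard_queries)) as pool:
        per_shard = list(pool.map(lambda q: q(query_vec, k), shard_queries))
    all_hits = (hit for hits in per_shard for hit in hits)
    # Smaller distance means more similar
    return heapq.nsmallest(k, all_hits, key=lambda hit: hit[1])

# Fake shards returning fixed (label, distance) pairs, for illustration only
def make_fake_shard(hits):
    return lambda query_vec, k: sorted(hits, key=lambda h: h[1])[:k]

shards = [
    make_fake_shard([(101, 0.30), (102, 0.10)]),
    make_fake_shard([(201, 0.20), (202, 0.50)]),
]
top = query_shards(shards, query_vec=None, k=3)
print(top)
```

With real indices, each callable would wrap one shard's `knn_query` call. The combined index skips this merge entirely, which is where its simpler query path comes from.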

### Further Exploration

- **Hybrid search**: Combine semantic search with BM25 for keyword matching
- **Reranking**: Use cross-encoder models to rerank results
- **Query expansion**: Generate related queries automatically
- **Faceted search**: Add filters by domain, language, date, etc.
- **Knowledge graph**: Integrate DBpedia/Wikidata entities
- **Clustering**: Group similar pages together
- **Trend analysis**: Analyze what content is indexed over time

### Resources

- GitHub repository: https://github.com/wilsonzlin/datasets/search-engine-open-data/
- Original blog post: https://blog.wilsonl.in/search-engine