# Phase 3: Ingest into OpenSearch

Reads the embeddings Delta table and bulk-loads everything into the OpenSearch index running in Docker.

### The tuning exercise
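Try different `BATCH_SIZE` values (100, 500, 1000, 2000) and record the elapsed time and throughput for each. Re-run from step 2 (drop and recreate the index) before every trial so each run indexes into an empty index and the timings are comparable. This is your "Uber 12.5 hrs → 2.5 hrs" story — document the table in your README.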

| batch_size | elapsed (s) | docs/s |
|-----------|------------|--------|
| 100       | ?          | ?      |
| 500       | ?          | ?      |
| 1000      | ?          | ?      |
| 2000      | ?          | ?      |

In [35]:
import os
import time
from pyspark.sql import SparkSession
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
from openai import OpenAI

In [36]:
DELTA_IN      = "../delta_lake/embeddings/restaurants"
INDEX_NAME    = "restaurants"
# text-embedding-3-small natively returns 1536-d vectors; 1024 works only because
# a reduced size is requested via `dimensions=1024`. The sanity query in this
# notebook does that, and presumably the embedding phase did too — confirm, since
# stored and query vectors must both match this mapping dimension.
VECTOR_DIM    = 1024
BATCH_SIZE    = 500    # <-- tune this and record results

# Defaults target the local Docker container; set OPENSEARCH_HOST / OPENSEARCH_PORT
# to point the notebook at a different cluster without editing code.
OS_HOST = os.environ.get("OPENSEARCH_HOST", "localhost")
OS_PORT = int(os.environ.get("OPENSEARCH_PORT", "9200"))

## 1. Connect to OpenSearch

In [37]:
# Connect to the node. No auth/TLS options are passed, so this assumes the Docker
# container runs with the security plugin disabled — confirm for your setup.
client = OpenSearch(
    hosts=[dict(host=OS_HOST, port=OS_PORT)],
    http_compress=True,  # enable HTTP compression; bulk payloads are large
)

info = client.info()
server_version = info["version"]["number"]
print(f"OpenSearch version: {server_version}")

OpenSearch version: 2.11.0


## 2. Create index with HNSW mapping

- `m=16`: each node connects to 16 neighbours during graph construction — higher = better recall, more memory
- `ef_construction=128`: size of the candidate list at build time — higher = slower build, better graph quality
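- `ef_search=100`: size of the candidate list at query time — higher = better recall, slower queries. It is set here as the dynamic index setting `knn.algo_param.ef_search`, so it can be changed on the live index without rebuilding the graph.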

In [38]:
# Start from a clean slate: any existing index of this name (and its docs) is deleted.
if client.indices.exists(index=INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)
    print(f"Dropped existing index '{INDEX_NAME}'")

# HNSW graph for the embedding field, scored by cosine similarity.
# Original engine note: nmslib supports up to 16k dims (lucene caps at 1024) —
# verify against the k-NN docs for your OpenSearch version.
hnsw_method = {
    "name": "hnsw",
    "engine": "nmslib",
    "space_type": "cosinesimil",
    "parameters": {"m": 16, "ef_construction": 128},
}

field_mappings = {
    "restaurant_id":      {"type": "keyword"},
    "name":               {"type": "text"},
    "cuisines":           {"type": "text"},
    "location":           {"type": "text"},
    "rating":             {"type": "float"},
    "cost_for_two":       {"type": "integer"},
    "text_for_embedding": {"type": "text"},
    "embedding": {
        "type": "knn_vector",
        "dimension": VECTOR_DIM,
        "method": hnsw_method,
    },
}

index_settings = {
    "knn": True,                      # enable k-NN on this index
    "knn.algo_param.ef_search": 100,  # query-time candidate list size
    "number_of_shards": 1,
    "number_of_replicas": 0,          # no replicas (presumably a single-node local cluster)
}

index_config = {
    "settings": {"index": index_settings},
    "mappings": {"properties": field_mappings},
}

client.indices.create(index=INDEX_NAME, body=index_config)
print(f"Index '{INDEX_NAME}' created  (dim={VECTOR_DIM}, engine=nmslib)")

Dropped existing index 'restaurants'
Index 'restaurants' created  (dim=1024, engine=nmslib)


## 3. Read embeddings from Delta Lake

In [39]:
# Delta Lake support for a local Spark session: fetch the delta-spark package and
# register Delta's SQL extension and catalog implementation.
delta_conf = {
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.2.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = SparkSession.builder.appName("ZomatoSemanticSearch-Ingestion").master("local[*]")
for conf_key, conf_value in delta_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")  # keep INFO chatter out of the notebook output

df = spark.read.format("delta").load(DELTA_IN)
row_count = df.count()
print(f"Rows to ingest: {row_count:,}")

Rows to ingest: 9,542


## 4. Bulk ingest with tunable batch_size

In [40]:
def _row_to_action(row):
    """Build one bulk "index" action for a restaurant row.

    The document ``_id`` is the row's ``restaurant_id``, so re-running the ingest
    targets the same documents rather than creating new ones. Falsy ``location``,
    ``rating`` and ``cost_for_two`` values (e.g. None) are replaced with "", 0.0
    and 0 so they fit the mapped field types.
    """
    return {
        "_index": INDEX_NAME,
        "_id":    row["restaurant_id"],
        "_source": {
            "restaurant_id":      row["restaurant_id"],
            "name":               row["name"],
            "cuisines":           row["cuisines"],
            "location":           row["location"] or "",
            "rating":             float(row["rating"] or 0.0),
            "cost_for_two":       int(row["cost_for_two"] or 0),
            "text_for_embedding": row["text_for_embedding"],
            "embedding":          row["embedding"],
        },
    }


def ingest_to_opensearch(rows, batch_size: int = 500):
    """Bulk-index ``rows`` into ``INDEX_NAME`` in batches and report throughput.

    Actions are buffered and sent with ``opensearchpy.helpers.bulk`` every
    ``batch_size`` rows, plus one final call for any remainder. Per-document
    failures do not abort the run (``raise_on_error=False``). They are reported
    for every batch, including the final partial one, and totalled in the summary.

    Args:
        rows: Iterable of rows indexable by column name (e.g. Spark ``Row``s from
            ``df.collect()``) carrying the columns read by ``_row_to_action``.
        batch_size: Documents per bulk request. Values < 1 behave like 1
            (every row is flushed immediately).

    Returns:
        Total elapsed seconds spent building and sending all batches.
    """
    ingested = 0
    failed = 0
    actions = []
    # perf_counter is monotonic and high-resolution, which suits a benchmark.
    t0 = time.perf_counter()

    def _flush(batch):
        """Send one batch, update the counters, and print a sample failure."""
        nonlocal ingested, failed
        success, errors = bulk(client, batch, raise_on_error=False)
        ingested += success
        if errors:
            failed += len(errors)
            # Truncate: a raw error item can be long.
            print(f"  {len(errors)} errors in last batch (first: {str(errors[0])[:300]})")

    for row in rows:
        actions.append(_row_to_action(row))
        if len(actions) >= batch_size:
            _flush(actions)
            actions = []
            elapsed = time.perf_counter() - t0
            print(f"  {ingested:,} docs  |  {elapsed:.1f}s elapsed")

    # Flush the final partial batch; its errors are now reported like any other batch.
    if actions:
        _flush(actions)

    total = time.perf_counter() - t0
    # Defensive: never divide by a zero-length interval.
    rate = ingested / total if total > 0 else 0.0
    summary = f"\nIngestion complete: {ingested:,} docs in {total:.1f}s  ({rate:.0f} docs/s)"
    if failed:
        summary += f"  |  {failed:,} failed"
    print(summary)
    return total


# Materialize every Delta row on the driver first, so the timed call below measures
# the action-building + bulk-indexing path rather than the Spark scan.
rows = df.collect()
ingest_to_opensearch(rows=rows, batch_size=BATCH_SIZE)

                                                                                

  500 docs  |  1.5s elapsed
  1,000 docs  |  2.9s elapsed
  1,500 docs  |  4.4s elapsed
  2,000 docs  |  5.7s elapsed
  2,500 docs  |  7.1s elapsed
  3,000 docs  |  8.4s elapsed
  3,500 docs  |  9.8s elapsed
  4,000 docs  |  11.1s elapsed
  4,500 docs  |  12.4s elapsed
  5,000 docs  |  13.8s elapsed
  5,500 docs  |  15.1s elapsed
  6,000 docs  |  16.5s elapsed
  6,500 docs  |  17.9s elapsed
  7,000 docs  |  19.2s elapsed
  7,500 docs  |  20.5s elapsed
  8,000 docs  |  21.9s elapsed
  8,500 docs  |  23.2s elapsed
  9,000 docs  |  24.6s elapsed
  9,500 docs  |  26.0s elapsed

Ingestion complete: 9,542 docs in 26.1s  (365 docs/s)


26.14699697494507

## 5. Verify index stats

In [41]:
# Refresh first so freshly bulk-indexed documents are reflected in the counts.
client.indices.refresh(index=INDEX_NAME)

stat_columns = ["index", "docs.count", "store.size"]
stats = client.cat.indices(index=INDEX_NAME, h=stat_columns, v=True)
print(stats)

index       docs.count store.size
restaurants       9542    267.4mb



## 6. Quick sanity query

In [42]:
# Load variables from a local .env file (e.g. OPENAI_API_KEY) into the process
# environment, so the OpenAI client created in the next cell can read them.
from dotenv import load_dotenv
load_dotenv()

True

In [43]:
openai_client = OpenAI()   # Reads OPENAI_API_KEY from env

query = "First Date"
# Request VECTOR_DIM-sized embeddings so the query vector always matches the
# knn_vector dimension in the index mapping (a mismatch makes the kNN query fail).
query_vec = openai_client.embeddings.create(
    model="text-embedding-3-small",
    input=[query],
    dimensions=VECTOR_DIM,
).data[0].embedding

TOP_K = 5  # neighbours requested from the graph and hits returned
resp = client.search(
    index=INDEX_NAME,
    body={
        "size": TOP_K,
        "query": {
            "knn": {
                "embedding": {
                    "vector": query_vec,
                    "k": TOP_K,
                }
            }
        },
        "_source": {"excludes": ["embedding"]},  # skip shipping 1024 floats per hit
    },
)

print(f"Top results for '{query}':")
for hit in resp["hits"]["hits"]:
    src = hit["_source"]
    # Ingestion stores cuisines/name as-is, so they may be null; `:30s` on None would raise.
    name = src.get("name") or ""
    cuisines = src.get("cuisines") or ""
    print(f"  {name:40s}  |  {cuisines:30s}  |  score {hit['_score']:.4f}")

Top results for 'First Date':
  Let's Meet Up                             |  Fast Food, Bakery               |  score 0.6012
  Rendezvous Adda                           |  North Indian, Bengali           |  score 0.5938
  Hook Up                                   |  North Indian, Continental       |  score 0.5917
  Chapter 1 Cafe                            |  Cafe, Italian, Mexican, North Indian, Continental  |  score 0.5916
  The Host                                  |  Chinese                         |  score 0.5888


In [44]:
# Stop the Spark session created in step 3 now that ingestion and checks are done.
spark.stop()