## Ingest Wikipedia in a few mins!


In this guide, we'll see how to embed the Wikipedia dataset and ingest it into a LanceDB table, using Geneva to autoscale the embedding and ingestion jobs.
You can explore the end result of a similar experiment in the "Wiki fact check" app. In this notebook, we'll only cover building the embedding, ingestion, and retrieval pipeline. Let's get started!

In [None]:
# Install required libraries (datasets + gcsfs are needed to load the dataset from a gs:// path)
!pip install geneva datasets gcsfs transformers sentence-transformers

### [optional] Check GCP permissions
In this notebook we'll use GCP for compute. You can verify you have the right cluster access using these commands:

```
!sudo apt-get install -y google-cloud-sdk-gke-gcloud-auth-plugin

!gcloud container clusters get-credentials geneva-integ \
  --region us-central1 \
  --project lancedb-dev-us-central1
  
!kubectl config get-contexts

!kubectl get namespaces
```

### Set up the LanceDB table
We'll ingest the data into a LanceDB table after splitting each article's text into chunks of 512 characters. The code to download the dataset and upload it to a GCS bucket is provided in the `upload_gcs.py` utility file.
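To see what fixed-size character chunking produces, here is a minimal standard-library sketch. `chunk_text` is a hypothetical helper that mirrors the slicing used in `process_batch` below. It is not part of Geneva or LanceDB.

```python
def chunk_text(text: str, chunk_size: int = 512) -> list:
    """Split text into consecutive, non-overlapping chunks of at most chunk_size characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # Same stepping as process_batch: start offsets 0, chunk_size, 2 * chunk_size, ...
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]


# A 1,200-character article yields ceil(1200 / 512) = 3 chunks; the last one holds the remainder.
demo_chunks = chunk_text("x" * 1200)
print([len(c) for c in demo_chunks])  # [512, 512, 176]
```

The split is purely by character count, so chunks can start or end mid-word. You can see this in the retrieval results at the end of this notebook, for example a chunk that starts with "e of tensor.".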

In [1]:
from datasets import load_from_disk, disable_caching

disable_caching()
DATASET_PATH = "gs://wikipedia-geneva/train"
CHUNK_SIZE = 512
ds = load_from_disk(DATASET_PATH)




In [2]:
def process_batch(num_chunks=100_000):
    """Split each article into CHUNK_SIZE-character chunks, stopping after num_chunks chunks."""
    chunks = []

    for text, doc_id, url, title in zip(ds["text"], ds["id"],
                                        ds["url"], ds["title"]):
        # chunk_index records each chunk's position within its source article
        for idx, i in enumerate(range(0, len(text), CHUNK_SIZE)):
            chunks.append({
                "content": text[i:i + CHUNK_SIZE],
                "identifier": doc_id,
                "url": url,
                "title": title,
                "chunk_index": idx,
            })
            if len(chunks) >= num_chunks:
                return chunks
    return chunks

In [17]:
num_rows = 10_000_000
chunks = process_batch(num_rows)

In [18]:
len(chunks)

10000000

In [19]:
# We'll ingest the data in batches of 100,000 chunks. This is completely optional; batching is done here
# only to make ingestion progress easier to follow, for demonstration purposes.

import geneva
import pyarrow as pa

LANCEDB_URI = "gs://wikipedia-geneva/db"

db = geneva.connect(LANCEDB_URI)

# Define schema for the table
schema = pa.schema([
    ("content", pa.string()),
    ("identifier", pa.string()),
    ("url", pa.string()),
    ("title", pa.string()),
    ("chunk_index", pa.int32())
])

tbl = db.create_table("wiki-10m", schema=schema, mode="overwrite")
for i in range(0, num_rows, 100_000):
    tbl.add(chunks[i: i + 100_000])

[2025-06-17T13:43:54Z WARN lance::dataset::write::insert] No existing dataset at gs://wikipedia-geneva/db/wiki-10m.lance?, it will be created


In [2]:
db = geneva.connect(LANCEDB_URI)
tbl = db["wiki-10m"]

In [21]:
del chunks  # free the in-memory list of 10M chunk dicts now that they are in the table
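Building the full `chunks` list before ingesting holds all 10M chunk dicts in memory at once. If memory is tight, one alternative is to group chunks lazily and hand them to `tbl.add` one batch at a time. The sketch below uses a hypothetical `iter_batches` helper built only on the standard library. It is not part of Geneva or LanceDB.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Lazily yield consecutive lists of up to batch_size items from any iterable."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(items)
    # islice pulls at most batch_size items; an empty list means the input is exhausted
    while batch := list(islice(it, batch_size)):
        yield batch


print(list(iter_batches(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

As a usage example, suppose you had a generator version of `process_batch` (not shown). Then `for batch in iter_batches(chunk_iter, 100_000): tbl.add(batch)` would avoid building the full 10M-item list up front. On Python 3.12+, `itertools.batched` behaves similarly, but it yields tuples rather than lists.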

In [22]:
tbl.schema

content: string
identifier: string
url: string
title: string
chunk_index: int32

In [23]:
tbl.count_rows()

10000000

### [Optional] Set logging level
Setting the log level to DEBUG can help you diagnose problems if you run into any.

In [None]:
import logging
import sys
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr, force=True) 

### Defining a Geneva UDF

Geneva improves the productivity of AI engineers by streamlining feature engineering tasks. It is designed to reduce the time required to prototype, perform experiments, scale up, and move to production.

Geneva uses Python **User Defined Functions (UDFs)** to define features as columns in a Lance dataset. Adding a feature is straightforward:

1. Prototype your Python function in your favorite environment.
2. Wrap the function with the small `@udf` decorator.
3. Register the UDF as a virtual column using `Table.add_columns()`.
4. Trigger a backfill operation.

There are various kinds of UDFs you can use, depending on the task type:
1. Row-level, stateless UDFs: use these when your task doesn't benefit from batch processing and needs no expensive setup.
2. Row-level, stateful UDFs: use these when your task doesn't benefit from batch processing but needs expensive setup (like loading a model) that should happen once and be reused across calls.
3. Batched, stateless UDFs: use these when batch processing is faster but no expensive setup is needed.
4. Batched, stateful UDFs: use these when batch processing is faster AND the task needs expensive setup (like loading a model) that happens once and is then reused across batches.
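Here is a plain-Python sketch that makes the stateless/stateful and row-level/batched distinctions concrete. All names in it are hypothetical, and it is not Geneva's API. In Geneva you'd wrap such a callable with the `@udf` decorator, as `TextEmbedding` does below. The sketch only shows the lazy, run-once setup that makes a UDF "stateful".

```python
class BatchedStatefulUpper:
    """Toy batched, stateful callable (hypothetical; not the Geneva API)."""

    def __init__(self):
        self.setup_calls = 0
        self.table = None  # expensive state, built lazily on first use

    def setup(self):
        # Stand-in for an expensive one-time step such as loading a model.
        self.setup_calls += 1
        self.table = str.maketrans("abcdefghijklmnopqrstuvwxyz", "ABCDEFGHIJKLMNOPQRSTUVWXYZ")

    def __call__(self, batch):
        # Batched: takes a list of values and returns one result per value.
        if self.table is None:
            self.setup()
        return [text.translate(self.table) for text in batch]


def row_stateless_length(text):
    # Row-level, stateless: one value in, one value out, no setup at all.
    return len(text)


upper = BatchedStatefulUpper()
for batch in (["abc", "de"], ["f"], ["xyz!"]):
    print(upper(batch))   # ['ABC', 'DE'], then ['F'], then ['XYZ!']
print(upper.setup_calls)  # 1: setup ran once and was reused for all three batches
print([row_stateless_length(t) for t in ["abc", "de"]])  # [3, 2]
```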

Read more about Geneva UDFs here: https://lancedb.github.io/geneva/features/UDFs/

In this example, we'll use a batched, stateful UDF.

NOTE: **cuda=True** means this UDF is meant to run on GPU nodes.

In [24]:
from typing import Callable
import pyarrow as pa
from sentence_transformers import SentenceTransformer
import torch
from geneva import udf


# Output column type: a fixed-size list of 384 float32 values (bge-small's embedding size)
@udf(data_type=pa.list_(pa.float32(), 384), cuda=True)
class TextEmbedding(Callable):
    def __init__(self, model: str = "BAAI/bge-small-en-v1.5"):
        # Only store the model name here; the model itself is loaded lazily on first call
        self.model_name = model
        self.is_loaded = False

    def setup(self):
        # Expensive one-time setup: load the model on the worker
        self.model = SentenceTransformer(self.model_name)
        self.is_loaded = True

    def __call__(self, batch: pa.RecordBatch) -> pa.Array:
        if not self.is_loaded:
            self.setup()

        content = batch["content"].to_pylist()
        with torch.no_grad():
            # normalize_embeddings=True returns unit-length vectors
            embeddings = self.model.encode(content, convert_to_tensor=True, normalize_embeddings=True)
            embeddings = embeddings.cpu().tolist()
        return pa.array(embeddings, pa.list_(pa.float32(), 384))

### Defining our compute cluster

Geneva supports multiple backends. Here we'll use Kubernetes on GCP. Let's set up the Ray cluster config. It consists of a head node, a CPU worker group, and a GPU worker group.
You can scale them to your liking.

In [25]:
from geneva.runners.ray._mgr import ray_cluster
from geneva.runners.ray.raycluster import RayCluster, _HeadGroupSpec, _WorkerGroupSpec
from geneva.config import override_config
from geneva.config.loader import from_kv


k8s_name = "ayush-k8"  # name of the Ray cluster to create; pick your own
k8s_namespace = "geneva"

# Bucket where the zipped local environment is uploaded for the cluster nodes
override_config(from_kv({"uploader.upload_dir": "gs://wikipedia-geneva/zips"}))

cluster = ray_cluster(
    name=k8s_name,
    namespace=k8s_namespace,
    head_group=_HeadGroupSpec(
        service_account="geneva-integ-test",
        num_cpus=4,
        memory="8G",
    ),
    worker_groups=[
        _WorkerGroupSpec(
            name="cpu",
            num_cpus=16,
            memory="32G",
            service_account="geneva-integ-test",
        ),
        _WorkerGroupSpec(
            name="gpu",
            num_cpus=8,
            memory="32G",
            num_gpus=1,
            service_account="geneva-integ-test",
        ),
    ],
)

In [None]:
# Start the cluster and zip your local environment so it can be reproduced on every node in the cluster
cluster.__enter__()

### Register the embedding column using the UDF
Registering a feature is done by passing `Table.add_columns()` a dictionary that maps the new column name to a Geneva UDF instance.

In [27]:
tbl.add_columns({"embedding": TextEmbedding()})

### Run Backfill
Triggering backfill creates a distributed job to run the UDF and populate the column values in your LanceDB table. The Geneva framework simplifies several aspects of distributed execution.

**Environment management**: Geneva automatically packages and deploys your Python execution environment to worker nodes. This ensures that distributed execution uses the same environment and dependencies as your prototype.

**Checkpoints**: Each batch of UDF execution is checkpointed so that partial results are not lost in case of job failures. Jobs can resume and avoid most of the expense of having to recalculate values.
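The resume idea can be shown with a toy, in-process model. This is not how Geneva implements checkpoints. The function name, the JSON checkpoint file, and the simulated failure are all hypothetical. The sketch applies a function batch by batch, saves each finished batch, and on a rerun skips whatever was already saved.

```python
import json
import os
import tempfile


def backfill_with_checkpoints(batches, fn, checkpoint_path):
    """Apply fn to every value, one batch at a time, checkpointing each finished batch.

    Results of batches finished by an earlier (possibly crashed) run are loaded from
    checkpoint_path instead of being recomputed.
    """
    state = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)  # maps batch index (as a string) -> computed values
    for batch_id, values in enumerate(batches):
        key = str(batch_id)
        if key in state:
            continue  # already checkpointed: skip the expensive work
        state[key] = [fn(v) for v in values]
        with open(checkpoint_path, "w") as f:
            json.dump(state, f)  # persist after every completed batch
    return [v for i in range(len(batches)) for v in state[str(i)]]


batches = [[0, 1], [2, 3], [4, 5], [6, 7]]
checkpoint = os.path.join(tempfile.mkdtemp(), "checkpoint.json")


def flaky_square(x):
    if x == 5:
        raise RuntimeError("simulated worker failure")
    return x * x


try:
    backfill_with_checkpoints(batches, flaky_square, checkpoint)
except RuntimeError:
    pass  # the first run dies inside batch 2; batches 0 and 1 are already checkpointed

calls = []


def square(x):
    calls.append(x)  # record which values are actually recomputed
    return x * x


print(backfill_with_checkpoints(batches, square, checkpoint))  # [0, 1, 4, 9, 16, 25, 36, 49]
print(calls)  # [4, 5, 6, 7]: only batches 2 and 3 ran again
```

Work inside the batch that was running when the failure happened (the value 4 in batch 2) is redone. That is why checkpoints avoid *most*, not all, of the recomputation.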

`backfill` accepts various parameters to customise the scale of your workload. Here we'll use:
* **batch_size** of 5000, which determines the inference batch size (the number of rows passed to each UDF call)
* **concurrency** of 16, which determines how many GPU workers run the UDF in parallel
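Here is a rough back-of-the-envelope calculation of what these settings mean for the 10M-row table. It assumes the work is split evenly across workers, and Geneva's actual scheduling may differ.

```python
import math

num_rows = 10_000_000  # rows in the wiki-10m table
batch_size = 5_000     # value used in the backfill call
concurrency = 16       # value used in the backfill call

num_batches = math.ceil(num_rows / batch_size)
batches_per_worker = math.ceil(num_batches / concurrency)  # assumes an even split
print(num_batches)         # 2000
print(batches_per_worker)  # 125
```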

In [28]:
tbl.backfill("embedding", batch_size=5000, concurrency=16)

[2025-06-17T14:05:27Z WARN lance::dataset::transaction] Building manifest with DataReplacement operation. This operation is not stable yet, please use with caution.
[2025-06-17T14:11:49Z WARN lance::dataset::transaction] Building manifest with DataReplacement operation. This operation is not stable yet, please use with caution.
[2025-06-17T14:11:49Z WARN lance::dataset::transaction] Building manifest with DataReplacement operation. This operation is not stable yet, please use with caution.


### [optional] Clear the HF cache
You might want to clear the Hugging Face (HF) cache by running the `clear_cache.py` utility file. It clears the dataset cache so that Geneva doesn't automatically pick it up when packaging your environment!
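`clear_cache.py` isn't reproduced here. The following is a hypothetical stand-in that deletes the HF datasets cache directory. It resolves that directory from the `HF_DATASETS_CACHE`, `HF_HOME`, and `XDG_CACHE_HOME` environment variables. That precedence is our understanding of the `datasets` library defaults, so verify it against your installed version before deleting anything.

```python
import os
import shutil
import tempfile
from pathlib import Path


def hf_datasets_cache_dir() -> Path:
    """Resolve the HF datasets cache directory (assumed precedence, see lead-in).

    HF_DATASETS_CACHE, then $HF_HOME/datasets, then
    $XDG_CACHE_HOME/huggingface/datasets, then ~/.cache/huggingface/datasets.
    """
    if os.environ.get("HF_DATASETS_CACHE"):
        return Path(os.environ["HF_DATASETS_CACHE"]).expanduser()
    xdg_cache = os.environ.get("XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
    hf_home = os.environ.get("HF_HOME", os.path.join(xdg_cache, "huggingface"))
    return Path(hf_home).expanduser() / "datasets"


def clear_cache_dir(path: Path) -> bool:
    """Recursively delete path if it is a directory; return True if anything was removed."""
    if path.is_dir():
        shutil.rmtree(path)
        return True
    return False


# Demo on a throwaway directory instead of your real cache.
demo_cache = Path(tempfile.mkdtemp()) / "datasets"
(demo_cache / "wikipedia").mkdir(parents=True)
print(clear_cache_dir(demo_cache))  # True
print(demo_cache.exists())          # False

# To clear the real cache (irreversible): clear_cache_dir(hf_datasets_cache_dir())
```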

In [29]:
# Exit the cluster and release resources
cluster.__exit__(None, None, None)

False

### Let's optimise the table and create an index for searching

In [4]:
tbl.optimize()
tbl.create_index(vector_column_name="embedding")

### Perform retrieval

In [7]:
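# `_ltbl` is a private attribute that appears to expose the underlying LanceDB table used for search
tbl = tbl._ltbl
query = "what is a vector"
model = SentenceTransformer("BAAI/bge-small-en-v1.5")
# Embed the query with the same model and normalization as the stored embeddings;
# a single string input returns a 1-D NumPy array
vector = model.encode(query, normalize_embeddings=True)
vector.shape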

(384,)

In [8]:
tbl.search(vector).to_pandas()

|   | content | identifier | url | title | chunk_index | embedding | _distance |
|---|---|---|---|---|---|---|---|
| 0 | e of tensor.\n\nIn pure mathematics, a vector ... | 32533 | https://en.wikipedia.org/wiki/Euclidean%20vector | Euclidean vector | 28 | [-0.04118163, -0.031828538, -0.013418324, 0.01... | 0.338192 |
| 1 | Vector may refer to:\n\nBiology\nVector (epide... | 243816 | https://en.wikipedia.org/wiki/Vector | Vector | 0 | [0.026708547, -0.045680013, 0.0049091275, 0.03... | 0.352812 |
| 2 | ype.\n\nVectors \nVectors are physical quantit... | 23204 | https://en.wikipedia.org/wiki/Physical%20quantity | Physical quantity | 5 | [-0.052353792, -0.07202609, 0.015442002, -0.01... | 0.375776 |
| 3 | In mathematics, physics and engineering, a Euc... | 32533 | https://en.wikipedia.org/wiki/Euclidean%20vector | Euclidean vector | 0 | [-0.048410792, -0.057634044, 0.046941835, -0.0... | 0.386303 |
| 4 | l vector, the interval content of a given set ... | 243816 | https://en.wikipedia.org/wiki/Vector | Vector | 11 | [-0.020939205, -0.061921444, 0.014615878, 0.00... | 0.400136 |
| 5 | In mathematics and statistics, a probability v... | 217133 | https://en.wikipedia.org/wiki/Probability%20ve... | Probability vector | 0 | [-0.048714407, -0.06333104, -0.0123118125, 0.0... | 0.409078 |
| 6 | In linear algebra, a coordinate vector is a re... | 879358 | https://en.wikipedia.org/wiki/Coordinate%20vector | Coordinate vector | 0 | [-0.044985604, -0.049643297, -0.013043938, -0.... | 0.410997 |
| 7 | In physics, a wave vector (also spelled waveve... | 686036 | https://en.wikipedia.org/wiki/Wave%20vector | Wave vector | 0 | [-0.0241238, -0.0714645, -0.022887865, 0.00534... | 0.415834 |
| 8 | protocols\nDope vector, a data structure used... | 243816 | https://en.wikipedia.org/wiki/Vector | Vector | 3 | [-0.04756716, -0.03191846, 0.031988315, 0.0149... | 0.421834 |
| 9 | f certain email headers, \nthe email structure... | 1299404 | https://en.wikipedia.org/wiki/Feature%20%28mac... | Feature (machine learning) | 3 | [-0.012206657, -0.041650396, -0.01602055, 0.00... | 0.42648 |
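Smaller `_distance` values mean closer matches. Both the stored embeddings and the query were normalized to unit length (`normalize_embeddings=True`). For unit vectors, squared Euclidean distance and cosine similarity are related by ||q - d||² = 2 - 2·cos(q, d). So, assuming the index uses L2 distance (LanceDB's default metric), ranking by distance gives the same order as ranking by cosine similarity. The brute-force sketch below checks this with toy 3-dimensional vectors. The data is hypothetical, not the 384-dimensional BGE embeddings.

```python
import math


def normalize(v):
    """Scale v to unit L2 norm, as normalize_embeddings=True does for each embedding."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a, b):
    return sum(x * y for x, y in zip(a, b))  # equals cosine similarity only for unit vectors


query = normalize([1.0, 2.0, 0.5])
docs = {
    "doc_a": normalize([1.0, 1.9, 0.7]),
    "doc_b": normalize([0.2, 1.0, 3.0]),
    "doc_c": normalize([-1.0, 0.5, 0.0]),
}

# Brute-force search: rank every document by squared L2 distance (smallest first)...
by_l2 = sorted(docs, key=lambda k: squared_l2(query, docs[k]))
# ...and by cosine similarity (largest first).
by_cos = sorted(docs, key=lambda k: -cosine(query, docs[k]))
print(by_l2 == by_cos)  # True

# For unit vectors, ||q - d||^2 == 2 - 2 * cos(q, d)
for d in docs.values():
    assert math.isclose(squared_l2(query, d), 2 - 2 * cosine(query, d))
```

The sketch does not depend on whether `_distance` reports squared or plain L2 distance. Both are monotonic in cosine similarity for unit vectors, so the ranking is the same either way.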
