The tutorial is divided into two parts:

*   **Part 1: Feature Engineering with LanceDB and Geneva**: In this part, we'll focus on the crucial process of feature engineering. We'll use LanceDB and its Geneva feature engineering framework to enrich our data with meaningful features that will power our search engine.

*   **Part 2: Inference and Retrieval with LanceDB**: In this part, we'll build the inference and retrieval pipeline that uses the features we engineered in Part 1 to provide a powerful and intuitive search experience. We'll cover query routing, hybrid search, and reranking to build a state-of-the-art search engine. 

## Part 1: Feature Engineering with LanceDB and Geneva

This notebook is the first part of our tutorial on building an advanced product search engine. In this part, we will focus on the crucial process of feature engineering. We'll start with a raw dataset of fashion products and ingest it into LanceDB. We'll then use Geneva to enrich our data with meaningful features that will power our search engine.

We will cover the following steps:
1. **Data Ingestion**: Downloading a fashion dataset and loading it into a LanceDB table.
2. **Declarative Feature Engineering**: Using Geneva to define and compute features on-the-fly.
3. **Embedding Generation**: Creating vector embeddings for both images and text to enable semantic search.
4. **Indexing**: Creating indexes to speed up our queries.

In [None]:
!pip install --upgrade geneva lancedb kubernetes "ray[default]" rerankers pandas pillow transformers torch torchvision open-clip-torch

## 1. Data Ingestion

First, let's download our dataset. We're using a small version of the Fashion Product Images dataset from Kaggle. This dataset contains images and metadata for a variety of fashion products.

In [None]:
!rm -rf db fashion-dataset # Remove outputs from a previous run, if any

# ### HIGH RES DATASET (Slow to download) #
#!curl -L -o fashion-product-images-dataset.zip\
#  https://www.kaggle.com/api/v1/datasets/download/paramaggarwal/fashion-product-images-dataset

#!unzip -q fashion-product-images-dataset.zip 

# SAME DATASET WITH LOW RES IMAGES #

!curl -L -o fashion-product-images-small.zip\
  https://www.kaggle.com/api/v1/datasets/download/paramaggarwal/fashion-product-images-small
!unzip -q fashion-product-images-small.zip -d fashion-dataset/

## Set Scale based on your environment

This example runs Geneva locally by default, so the number of concurrent jobs is limited by the machine you're working on. Set these parameters based on your CPU/GPU and memory configuration.

In [None]:
## Update this based on your env

# Start with 100-1000 images for testing. With only a few images, you might not get great query
# results, but it will run faster. Try up to 44200 for the full dataset.
DATASET_SIZE = 1000 
# Increase this if you have more CPUs available and want it to run faster.
CONCURRENCY = 4

# Number of rows processed before a checkpoint is written during a backfill.
CHECKPOINT_SIZE = 300

In [None]:
import io
import geneva
from geneva import udf
import lancedb
import pandas as pd
import pyarrow as pa
import numpy as np
from pathlib import Path
from PIL import Image
import torch
import open_clip
from typing import Callable

IMG_DIR = Path("fashion-dataset/images")
STYLE_CSV = Path("fashion-dataset/styles.csv")
DB_PATH = "./db"
TABLE_NAME = "products"
# Integer batch size for inserts (floor division keeps it an int, and at least 1)
INSERT_FRAG_SIZE = max(1, min(1000, DATASET_SIZE // 10))

Now, let's load the data into a LanceDB table. We'll read the CSV file with the product metadata, and for each product, we'll also load the corresponding image from the `images` directory. We'll then create a LanceDB table and add the data to it in batches. LanceDB can store objects (images in this case) alongside vector embeddings and metadata.

In [None]:
df = pd.read_csv(STYLE_CSV, on_bad_lines='skip')
df = df.dropna(subset=["id", "productDisplayName"])    
df = df.drop_duplicates(subset=["id"], keep="first")    
df = df.sample(min(DATASET_SIZE, len(df)))  # cap at the number of available rows

def generate_rows(df, img_dir):
    for _, row in df.iterrows():
        img_path = img_dir / f"{row['id']}.jpg"
        if not img_path.exists():
            continue
        with open(img_path, "rb") as f:
            yield {
                "id": int(row["id"]),
                "description": row["productDisplayName"],
                "image_bytes": f.read()
            }

db = lancedb.connect(DB_PATH)

# Drop the table if it already exists so we can recreate it
try:
    db.drop_table(TABLE_NAME)
except ValueError:
    pass  # table did not exist yet
    
data_stream = generate_rows(df, IMG_DIR)
table = None

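rows = []
for row in data_stream:
    rows.append(row)
    if len(rows) >= INSERT_FRAG_SIZE:
        if table is not None:
            table.add(rows)
        else:
            table = db.create_table(TABLE_NAME, data=rows)
        rows = []
# Flush the final partial batch; create the table if no full batch was written
if rows:
    if table is not None:
        table.add(rows)
    else:
        table = db.create_table(TABLE_NAME, data=rows)

len(table)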
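
The create-then-append pattern in the cell above can be prototyped without a database. The following is a minimal sketch in plain Python: `batched` and `fake_table` are hypothetical names used only for illustration, and a list stands in for the LanceDB table.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# A list stands in for the table: the first batch "creates" it,
# and every later batch (including a short final one) is appended.
fake_table = None
for batch in batched(range(7), 3):
    if fake_table is None:
        fake_table = list(batch)   # analogous to db.create_table(..., data=batch)
    else:
        fake_table.extend(batch)   # analogous to table.add(batch)
```

Because the generator always yields the trailing partial batch, no separate flush step is needed after the loop.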

## 2. Feature Engineering with Geneva

Now that we have our data in a LanceDB table, we can start engineering features. We'll use Geneva to create new features for our products. 

### Defining a Geneva UDF

Geneva uses Python User Defined Functions (UDFs) to define features as columns in a Lance dataset. Adding a feature is straightforward:

1. Prototype your Python function in your favorite environment.
2. Wrap the function with the `@udf` decorator.
3. Register the UDF as a virtual column using `Table.add_columns()`.
4. Trigger a backfill operation.

There are several kinds of UDFs, and the right one depends on the task:

* **Row-level, stateless UDFs** - Use these when your tasks don't benefit from batch processing and don't require expensive setup.
* **Row-level, stateful UDFs** - Use these when your tasks don't benefit from batch processing but do require expensive setup that should be done once and reused.
* **Batched, stateless UDFs** - Use these when batch processing is faster and no expensive setup is needed.
* **Batched, stateful UDFs** - Use these when batch processing is faster AND you require expensive setup (like loading a model) that should be reused across batches.

Read more about Geneva UDFs here - TODO: Add new docs link

The color-tag feature below is a row-level, stateless UDF; the embedding and captioning features later in this notebook use batched, stateful UDFs.

NOTE: `num_gpus > 0` means the UDF is meant to run on GPU nodes.
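
The difference between stateless and stateful UDFs can be sketched in plain Python, independent of Geneva. In this illustrative example, `shout` and `PrefixTagger` are hypothetical names, and the "expensive setup" is a placeholder for something like loading model weights.

```python
from typing import List


def shout(text: str) -> str:
    """Stateless, row-level: no setup, one value in, one value out."""
    return text.upper()


class PrefixTagger:
    """Stateful, batched: setup runs once and is reused for every batch."""

    def __init__(self) -> None:
        self.is_loaded = False
        self.setup_calls = 0

    def setup(self) -> None:
        # Placeholder for costly work such as loading model weights.
        self.prefix = "item:"
        self.setup_calls += 1
        self.is_loaded = True

    def __call__(self, batch: List[str]) -> List[str]:
        if not self.is_loaded:
            self.setup()
        return [self.prefix + value for value in batch]


tagger = PrefixTagger()
first = tagger(["a", "b"])   # triggers setup
second = tagger(["c"])       # reuses the already-loaded state
```

The lazy `setup()` call mirrors the `is_loaded` guard used by the class-based UDFs later in this notebook.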

### Simple Feature Extraction

Let's start with a simple feature: extracting color tags from the product description. We'll define a User-Defined Function (UDF) that takes the product description as input and returns a comma-separated string of colors found in the description.

In [None]:
db = geneva.connect(DB_PATH)
table = db.open_table(TABLE_NAME)

In [None]:
@udf
def color_tags(description: str) -> str:
    # Known color names to look for (case-insensitive substring match)
    colors = ["black", "white", "red", "blue", "green", "yellow", "pink", "brown", "grey", "silver"]
    text = description.lower()
    return ", ".join(c for c in colors if c in text)
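
Before wrapping a function with `@udf`, it helps to prototype it as plain Python. The sketch below compares the substring logic used above with a hypothetical whole-word variant (`color_tags_words` is not part of the original notebook), since substring matching can pick up colors hidden inside other words.

```python
import re

COLORS = ["black", "white", "red", "blue", "green", "yellow", "pink", "brown", "grey", "silver"]


def color_tags_substring(description: str) -> str:
    """Same logic as the UDF above: case-insensitive substring containment."""
    text = description.lower()
    return ", ".join(c for c in COLORS if c in text)


def color_tags_words(description: str) -> str:
    """Hypothetical variant: only match colors that appear as whole words."""
    words = set(re.findall(r"[a-z]+", description.lower()))
    return ", ".join(c for c in COLORS if c in words)


sample = "Blue Coloured Jeans"
substring_result = color_tags_substring(sample)  # also finds "red" inside "coloured"
word_result = color_tags_words(sample)
```

Whether the stricter variant is preferable depends on the data; it would miss compound spellings such as "bluegreen" that the substring version catches.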

### Adding a Computed Column

Now that we've defined our feature-generating UDF, we can add it to our table as a computed column. Adding the column only registers the UDF; the values are computed when you run a backfill operation.

In [None]:
table.add_columns({
    "color_tags": color_tags,
})

Let's inspect the table schema to see our newly registered UDF.

In [None]:
table.schema

### Backfilling Features

Triggering backfill creates a distributed job to run the UDF and populate the column values in your LanceDB table. The Geneva framework simplifies several aspects of distributed execution.

**Environment management**: Geneva automatically packages and deploys your Python execution environment to worker nodes. This ensures that distributed execution occurs with the same environment and dependencies as your prototype.

**Checkpoints**: Each batch of UDF execution is checkpointed so that partial results are not lost if a job fails. Jobs can resume and avoid most of the expense of recalculating values.

`backfill` accepts several parameters that control the scale of your workload. Here we'll use:

* **checkpoint_size** - The number of rows processed before a checkpoint is written.
* **concurrency** - How many nodes are used for parallelization.

Here, we're using Geneva locally, so we won't set up a Ray cluster, but the same setup can run distributed jobs remotely on Ray clusters.

In [None]:
table.backfill("color_tags", checkpoint_size=CHECKPOINT_SIZE, concurrency=CONCURRENCY)
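
To make the checkpointing idea concrete, here is a toy sketch in plain Python. It is not Geneva's implementation: `backfill_with_checkpoints`, `flaky_upper`, and the in-memory `store` are hypothetical stand-ins that only illustrate how completed chunks can be skipped when a failed job is resumed.

```python
from typing import Callable, Dict, List


def backfill_with_checkpoints(
    rows: List[str],
    fn: Callable[[str], str],
    checkpoint_size: int,
    checkpoints: Dict[int, List[str]],
) -> List[str]:
    """Apply `fn` chunk by chunk, skipping chunks already in `checkpoints`."""
    for start in range(0, len(rows), checkpoint_size):
        if start in checkpoints:
            continue  # finished in an earlier run; nothing to recompute
        chunk = rows[start:start + checkpoint_size]
        # The checkpoint is only recorded if the whole chunk succeeds.
        checkpoints[start] = [fn(row) for row in chunk]
    return [value for start in sorted(checkpoints) for value in checkpoints[start]]


calls: List[str] = []


def flaky_upper(row: str) -> str:
    """Toy UDF that fails the first time it sees "e" to simulate a crash."""
    calls.append(row)
    if row == "e" and calls.count("e") == 1:
        raise RuntimeError("simulated worker failure")
    return row.upper()


rows = ["a", "b", "c", "d", "e", "f"]
store: Dict[int, List[str]] = {}
try:
    backfill_with_checkpoints(rows, flaky_upper, checkpoint_size=2, checkpoints=store)
except RuntimeError:
    pass  # the first two chunks are already checkpointed

# Resuming only recomputes the chunk that failed.
result = backfill_with_checkpoints(rows, flaky_upper, checkpoint_size=2, checkpoints=store)
```

A smaller `checkpoint_size` loses less work on failure but writes checkpoints more often, which is the trade-off behind the parameter above.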

Let's take a look at our enriched data.

In [None]:
table.search().limit(3).to_pandas()

## 3. Embedding Generation

Now that we have our text-based features, let's create some vector embeddings. Embeddings are numerical representations of data that capture its semantic meaning. We'll create embeddings for our product images and also generate a text caption for each image.

### Image Embeddings

We'll use a pretrained CLIP model to generate embeddings for our product images. We'll define a UDF that takes a batch of image bytes as input, preprocesses them, and then uses the CLIP model to generate embeddings.

In [None]:

@udf(version="0.1", num_gpus=1 if torch.cuda.is_available() else 0, data_type=pa.list_(pa.float32(), 512))
class GenEmbeddings(Callable):

    def __init__(self):
        self.is_loaded=False


    def setup(self):
        self.model, _, self.preprocess = open_clip.create_model_and_transforms("ViT-B-32", pretrained="laion2b_s34b_b79k")
        self.tokenizer = open_clip.get_tokenizer("ViT-B-32")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = self.model.to(self.device).eval()

        self.is_loaded=True

    def __call__(self, image_bytes:pa.Array) -> pa.Array:
        if not self.is_loaded:
            self.setup()

        embeddings = []
        for b in image_bytes:
            this_image_bytes = b.as_buffer().to_pybytes()

            image_stream = io.BytesIO(this_image_bytes)
            img = Image.open(image_stream).convert("RGB")
            img_tensor = self.preprocess(img).unsqueeze(0).to(self.device)
            with torch.no_grad():
                emb_tensor = self.model.encode_image(img_tensor)
                emb_tensor /= emb_tensor.norm(dim=-1, keepdim=True)
            np_emb = emb_tensor.squeeze().cpu().numpy().astype(np.float32)

            flat = pa.array(np_emb) # 1D float32 vector of shape (512,)
            embeddings.append(flat)

        stacked = pa.FixedSizeListArray.from_arrays(pa.concat_arrays(embeddings), 512)
        return stacked
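
The UDF above divides each embedding by its norm. A small, dependency-free sketch of why that matters: for unit-length vectors the dot product equals cosine similarity, and squared L2 distance is a simple function of it. The helper names here are illustrative only.

```python
import math
from typing import List


def l2_normalize(vector: List[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vector]


def dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def cosine(a: List[float], b: List[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
unit_a, unit_b = l2_normalize(a), l2_normalize(b)

similarity = dot(unit_a, unit_b)  # equals cosine(a, b) for unit vectors
squared_l2 = sum((x - y) ** 2 for x, y in zip(unit_a, unit_b))  # equals 2 - 2 * similarity
```

As a consequence, ranking normalized embeddings by L2 distance gives the same order as ranking them by cosine similarity.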

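### Image Captions

We'll also use a pretrained BLIP image-captioning model to generate a short text caption for each product image. The UDF follows the same batched, stateful pattern as the embedding UDF above.

In [None]: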
from transformers import BlipProcessor, BlipForConditionalGeneration

@udf(version="0.1", num_gpus=1 if torch.cuda.is_available() else 0, data_type=pa.string())
class GenCaptions(Callable):

    def __init__(self):
        self.is_loaded=False

    def setup(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        self.model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
        self.model = self.model.to(self.device).eval()
        self.is_loaded=True

    def __call__(self, image_bytes:pa.Array) -> pa.Array:
        if not self.is_loaded:
            self.setup()

        captions = []
        for b in image_bytes:
            this_image_bytes = b.as_buffer().to_pybytes()
            
            image_stream = io.BytesIO(this_image_bytes)
            img = Image.open(image_stream).convert("RGB")
            
            inputs = self.processor(img, return_tensors="pt").to(self.device)
            # Use greedy decoding (num_beams=1) and short max_length for speed in this demo
            with torch.no_grad():
                out = self.model.generate(**inputs, max_length=30, num_beams=1, do_sample=False)
            
            caption = self.processor.decode(out[0], skip_special_tokens=True)
            captions.append(caption)

        return pa.array(captions)


### Adding and Backfilling Embedding Columns

Now, let's add our new embedding generators as virtual columns and then backfill them.

In [None]:
# Drop the columns if they already exist so the cell can be re-run
for column in ["image_embedding", "caption"]:
    if column in table.schema.names:
        table.drop_columns([column])
        
table.add_columns({
    "image_embedding": GenEmbeddings(),
    "caption": GenCaptions(),
})

In [None]:
import logging
import sys
logging.basicConfig(level=logging.INFO, stream=sys.stderr, force=True)

In [None]:
table.backfill("image_embedding", checkpoint_size=CHECKPOINT_SIZE, concurrency=CONCURRENCY)

In [None]:
# This may take a few minutes if you're running on a CPU.
table.backfill("caption", checkpoint_size=CHECKPOINT_SIZE, concurrency=CONCURRENCY)

In [None]:
table.search().limit(20).to_pandas()

## 4. Indexing

To speed up our queries, we need to create indexes on our new features. We'll create two types of indexes:

*   **Full-Text Search (FTS) Index**: This will allow us to quickly search for keywords in our generated `caption` column.
*   **Vector Index**: This will allow us to perform fast similarity searches on our `image_embedding` column.

In [None]:
table.create_fts_index("caption")

In [None]:
table.create_index(vector_column_name="image_embedding", num_sub_vectors=128)
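
The `num_sub_vectors` value relates to product quantization, where each vector is split into equal-sized sub-vectors that are encoded separately. The sketch below works through the generic arithmetic for our 512-dimensional embeddings. It assumes 8-bit codes per sub-vector, which is an assumption for illustration and not a statement about LanceDB's internals; `pq_layout` is a hypothetical helper.

```python
def pq_layout(dim: int, num_sub_vectors: int, bits_per_code: int = 8) -> dict:
    """Return the per-vector layout implied by a product-quantization setting."""
    if dim % num_sub_vectors != 0:
        raise ValueError("dim must be divisible by num_sub_vectors")
    return {
        "dims_per_sub_vector": dim // num_sub_vectors,
        "raw_bytes": dim * 4,  # float32 storage
        "compressed_bytes": num_sub_vectors * bits_per_code // 8,  # assumes 8-bit codes by default
    }


layout = pq_layout(dim=512, num_sub_vectors=128)
```

Under these assumptions each embedding is split into 128 sub-vectors of 4 dimensions. Fewer sub-vectors would compress more aggressively at the cost of recall.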

In [None]:
table.list_indices()

That's it for the feature engineering part! We now have a LanceDB table enriched with a variety of features that will power our search engine. In the next part of this tutorial, we'll build the inference and retrieval pipeline that uses these features to provide a powerful and intuitive search experience.