## Weaviate 1.31 Enablement session

### Prep

In [None]:
# Session bootstrap: environment variables first, then the embedding model.
import dotenv

from helpers import get_model_and_processor

# Read variables from a local .env file into the process environment.
dotenv.load_dotenv(verbose=True)

# Project helper returning the (model, processor) pair used for query embeddings later on.
model, processor = get_model_and_processor()


### Key features

- MUVERA encoding algorithm for multi-vector embeddings
    - Efficiently use state-of-the-art multi-vector models
- Vectorizer changes
    - Adapt to changing needs by adding new object vectors as needed
- Shard movement between nodes (experimental)
    - Easier scaling & adaptability in multi-node clusters
- HNSW snapshotting
    - Faster bootup - big bonus for big datasets


In [None]:
import os

import weaviate

# Request header name -> environment variable that holds the matching API key.
_API_KEY_ENV_VARS = {
    "X-Cohere-Api-Key": "COHERE_API_KEY",
    "X-JinaAI-Api-Key": "JINAAI_API_KEY",
}

# os.getenv returns None for an unset variable; forward only keys that are
# actually present instead of passing None as a header value.
api_key_headers = {}
for header_name, env_var in _API_KEY_ENV_VARS.items():
    api_key = os.getenv(env_var)
    if api_key:
        api_key_headers[header_name] = api_key
    else:
        print(f"Warning: {env_var} is not set; calls that need it will fail.")

# Connect to the locally running Weaviate instance.
client = weaviate.connect_to_local(headers=api_key_headers)

In [None]:
# Show which server version the client is connected to.
server_meta = client.get_meta()
server_meta["version"]

Check the cluster setup:

In [None]:
# Display the nodes the cluster reports.
cluster_nodes = client.cluster.nodes()
cluster_nodes

## MUVERA

### Reminder re: multi-vector embeddings

In [None]:
from pathlib import Path

from helpers import show_local_imgs

# Collect the slide images in sorted order.
slide_glob = Path("data/slides/imgs").glob("202504_roadshow_mm_talk_*.png")
paths = sorted(slide_glob)

# Preview a handful of slides, picked by their position in the sorted list.
show_local_imgs([paths[idx] for idx in (12, 22, 32, 55)])

> ### 🤔 Can I search these as images? ⬆️

In [None]:
from weaviate.classes.config import Configure, DataType, Property, Tokenization

collection_name = "TempCollection"

# Remove any previous copy before creating the collection again.
client.collections.delete(collection_name)

# Object schema: optional text, the raw image blob, and the image's file path.
slide_properties = [
    Property(name="text", data_type=DataType.TEXT),
    Property(name="image", data_type=DataType.BLOB),
    Property(name="filepath", data_type=DataType.TEXT, tokenization=Tokenization.FIELD),
]

# HNSW index with multi-vector support switched on.
colpali_index = Configure.VectorIndex.hnsw(
    multi_vector=Configure.VectorIndex.MultiVector.multi_vector()
)

# No vectorizer module: vectors are supplied by us at import time.
colpali_vector = Configure.NamedVectors.none(
    name="colpali",  # colpali_v1_3 vector
    vector_index_config=colpali_index,
)

client.collections.create(
    collection_name,
    properties=slide_properties,
    vectorizer_config=[colpali_vector],
    replication_config=Configure.replication(factor=3),
)

In [None]:
import base64
from pathlib import Path

import numpy as np

# Slide images and their pre-computed embeddings live side by side.
slides_root = Path("data/slides")
imgs_dir = slides_root / "imgs"
embs_dir = slides_root / "embeddings"

# Open the .npz archive holding the pre-computed slide embeddings.
embeddings_file = embs_dir.joinpath("202504_roadshow_mm_talk_embeddings.npz")
embeddings = np.load(embeddings_file)

In [None]:
from tqdm import tqdm

collection = client.collections.get(collection_name)

# Pull both arrays out of the archive once. Indexing the loaded .npz by key
# reads the array again on every lookup, so doing it inside the loop repeats
# that work for each object.
slide_embeddings = embeddings["embeddings"]
slide_filepaths = embeddings["filepaths"]

if len(slide_filepaths) != len(slide_embeddings):
    raise ValueError(
        f"Mismatched archive: {len(slide_filepaths)} file paths vs "
        f"{len(slide_embeddings)} embeddings"
    )

with collection.batch.fixed_size(20) as batch:
    # total= lets tqdm render a real progress bar instead of a bare counter.
    for source_path, embedding in tqdm(
        zip(slide_filepaths, slide_embeddings), total=len(slide_embeddings)
    ):
        # Only the file name is reused; the image is read from the local imgs_dir.
        filename = source_path.split("/")[-1]
        img_path = imgs_dir / filename
        batch.add_object(
            properties={
                "filepath": str(img_path),
                # BLOB properties are sent as base64-encoded text.
                "image": base64.b64encode(img_path.read_bytes()).decode("utf-8"),
            },
            vector={
                "colpali": embedding
            }
        )

# Report import failures: how many, plus the first error message.
failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"{len(failed_objects)} object(s) failed to import. First error:")
    print(failed_objects[0].message)

print(len(collection))

In [None]:
from helpers import show_img_results, text_to_colpali

# Project helper that turns query text into ColPali query embeddings.
query_texts = ["how rag works"]
query_embeddings = text_to_colpali(query_texts, model, processor)

# Search with the embedding of the first (and only) query, keeping the top 2 hits.
r = collection.query.near_vector(
    limit=2,
    near_vector=query_embeddings[0],
)

show_img_results(r)

### How does this work?

In [None]:
from IPython.display import Image, display

# First explainer diagram, rendered 800px wide.
img_w = 800
mv_explained_1 = Image('./assets/mv_explained1.png', width=img_w)
display(mv_explained_1)

In [None]:
from IPython.display import Image, display

# Second explainer diagram, rendered 800px wide.
img_w = 800
mv_explained_2 = Image('./assets/mv_explained2.png', width=img_w)
display(mv_explained_2)

In [None]:
from IPython.display import Image, display

# Slide 55 of the talk deck, rendered 800px wide.
img_w = 800
slide_55 = Image('data/slides/imgs/202504_roadshow_mm_talk_55_of_60.png', width=img_w)
display(slide_55)

### One small (big) challenge

In [None]:
# Look at the first three stored embeddings: shape, total number of values,
# and a peek at the first two rows.
for emb in embeddings["embeddings"][:3]:
    n_rows, n_cols = emb.shape[0], emb.shape[1]
    print("\nembedding shape", emb.shape)
    print("embedding dimensions:", n_rows * n_cols)
    for row in emb[:2]:
        head, tail = row[:2], row[-2:]
        print(head, "...", tail)


> #### These vectors are very big! (each one with ~130k dimensions - vs typical ~1-2k dimensions)

In [None]:
# Embed a short query and inspect the size of what comes back.
short_queries = ["multimodal embeddings"]
query_embeddings = text_to_colpali(short_queries, model, processor)

for emb in query_embeddings:
    n_rows, n_cols = emb.shape[0], emb.shape[1]
    print("\nembedding shape", emb.shape)
    print("embedding dimensions:", n_rows * n_cols)
    for row in emb[:2]:
        head, tail = row[:2], row[-2:]
        print(head, "...", tail)


In [None]:
# Same inspection with a much longer query, for comparison.
long_queries = [
    "a detailed explanation of how multi-modal models like ColPali works in vector retrieval",
]
query_embeddings = text_to_colpali(long_queries, model, processor)

for emb in query_embeddings:
    n_rows, n_cols = emb.shape[0], emb.shape[1]
    print("\nembedding shape", emb.shape)
    print("embedding dimensions:", n_rows * n_cols)
    for row in emb[:2]:
        head, tail = row[:2], row[-2:]
        print(head, "...", tail)


> #### Even the text vectors are quite large

### Solution: Use MUVERA

In [None]:
# MUVERA algorithm diagrams.
muvera_img_paths = ["assets/muvera_algo1.png", "assets/muvera_algo3.png"]

show_local_imgs(muvera_img_paths)

In [None]:
# MUVERA test charts: heap profile and import time.
muvera_img_paths = ["assets/muvera_test_heap_profile.png", "assets/muvera_test_import_time.png"]

show_local_imgs(muvera_img_paths)

In [None]:
# MUVERA test charts: query results.
muvera_img_paths = ["assets/muvera_test_query1.png", "assets/muvera_test_query2.png"]

show_local_imgs(muvera_img_paths)

In [None]:
from weaviate.classes.config import Configure, DataType, Property, Tokenization

collection_name = "TempCollection"

# Remove any previous copy before creating the collection again.
client.collections.delete(collection_name)

# Object schema: optional text, the raw image blob, and the image's file path.
slide_properties = [
    Property(name="text", data_type=DataType.TEXT),
    Property(name="image", data_type=DataType.BLOB),
    Property(name="filepath", data_type=DataType.TEXT, tokenization=Tokenization.FIELD),
]

# 💡 Enable MUVERA encoding ⬇️
muvera_multi_vector = Configure.VectorIndex.MultiVector.multi_vector(
    encoding=Configure.VectorIndex.MultiVector.Encoding.muvera()
)

# No vectorizer module: vectors are supplied by us at import time.
colpali_vector = Configure.NamedVectors.none(
    name="colpali",  # colpali_v1_3 vector
    vector_index_config=Configure.VectorIndex.hnsw(multi_vector=muvera_multi_vector),
)

client.collections.create(
    collection_name,
    properties=slide_properties,
    vectorizer_config=[colpali_vector],
    replication_config=Configure.replication(factor=3),
)

## Vectorizer changes

In [None]:
from weaviate.classes.config import Configure, DataType, Property

collection_name = "DemoVecChanges"

# Remove any previous copy before creating the collection again.
client.collections.delete(collection_name)

book_properties = [
    Property(name="title", data_type=DataType.TEXT),
    Property(name="body", data_type=DataType.TEXT),
]

# Two named Cohere vectors: one over title + body, one over the title only.
initial_vectors = [
    Configure.NamedVectors.text2vec_cohere(
        name="default",
        source_properties=["title", "body"],
    ),
    Configure.NamedVectors.text2vec_cohere(
        name="new_title",
        source_properties=["title"],
    ),
]

client.collections.create(
    collection_name,
    properties=book_properties,
    vectorizer_config=initial_vectors,
    # Alternative kept for reference:
    # vectorizer_config=Configure.Vectorizer.text2vec_cohere(),
)

In [None]:
# List the names of the vectors currently configured on the collection.
c = client.collections.get(collection_name)

collection_config = c.config.get()
cc = collection_config.vector_config.keys()

print(cc)

In [None]:
# Add a third named vector, built from the body property only, to the existing collection.
body_only_vector = Configure.NamedVectors.text2vec_cohere(
    name="body_only",
    source_properties=["body"],
)

c.config.add_vector(vector_config=body_only_vector)

In [None]:
# Fetch the config again and list the configured vector names.
c = client.collections.get(collection_name)

collection_config = c.config.get()
cc = collection_config.vector_config.keys()

print(cc)

In [None]:
c = client.collections.get(collection_name)

# Sample data for the demo. It is defined here, at first use, so the notebook
# runs top to bottom on a fresh kernel.
objects = [
    {"title": "Howl's Moving Castle", "body": "A fantasy novel by Diana Wynne Jones."},
    {"title": "The Hobbit", "body": "A fantasy novel by J.R.R. Tolkien."},
    {"title": "The Hitchhiker's Guide to the Galaxy", "body": "A science fiction novel by Douglas Adams."},
    {"title": "The Great Gatsby", "body": "A novel by F. Scott Fitzgerald."},
    {"title": "1984", "body": "A dystopian novel by George Orwell."},
    {"title": "To Kill a Mockingbird", "body": "A novel by Harper Lee."},
    {"title": "Pride and Prejudice", "body": "A novel by Jane Austen."},
    {"title": "The Catcher in the Rye", "body": "A novel by J.D. Salinger."},
    {"title": "The Lord of the Rings", "body": "A fantasy novel by J.R.R. Tolkien."},
    {"title": "Brave New World", "body": "A dystopian novel by Aldous Huxley."},
    {"title": "Fahrenheit 451", "body": "A dystopian novel by Ray Bradbury."},
    {"title": "The Picture of Dorian Gray", "body": "A novel by Oscar Wilde."},
]

# Left as the last expression so the insert result is displayed.
c.data.insert_many(objects)

In [None]:
# Fetch one object with its vectors and print each vector's name and first three values.
r = c.query.fetch_objects(limit=1, include_vector=True)

first_object = r.objects[0]
for vector_name, vector_values in first_object.vector.items():
    print(vector_name)
    print(vector_values[:3])

### The order matters

In [None]:
from weaviate.classes.config import Configure, DataType, Property

collection_name = "TempCollection"

# Remove any previous copy before creating the collection again.
client.collections.delete(collection_name)

book_properties = [
    Property(name="title", data_type=DataType.TEXT),
    Property(name="body", data_type=DataType.TEXT),
]

# Two named Cohere vectors: one over title + body, one over the title only.
initial_vectors = [
    Configure.NamedVectors.text2vec_cohere(
        name="default",
        source_properties=["title", "body"],
    ),
    Configure.NamedVectors.text2vec_cohere(
        name="new_title",
        source_properties=["title"],
    ),
]

client.collections.create(
    collection_name,
    properties=book_properties,
    vectorizer_config=initial_vectors,
)

In [None]:
# Insert the sample objects into the freshly created collection.
c = client.collections.get(collection_name)

insert_result = c.data.insert_many(objects)
insert_result

In [None]:
# Add the body-only named vector to the collection.
body_only_vector = Configure.NamedVectors.text2vec_cohere(
    name="body_only",
    source_properties=["body"],
)

c.config.add_vector(vector_config=body_only_vector)

In [None]:
# Fetch one object with its vectors and print each vector's name and first three values.
r = c.query.fetch_objects(limit=1, include_vector=True)

first_object = r.objects[0]
for vector_name, vector_values in first_object.vector.items():
    print(vector_name)
    print(vector_values[:3])

## Move shards

Pre-load some data:

In [None]:
from weaviate.classes.config import Configure, DataType, Property

collection_name = "TempCollection"

# Remove any previous copy before creating the collection again.
client.collections.delete(collection_name)

book_properties = [
    Property(name="title", data_type=DataType.TEXT),
    Property(name="body", data_type=DataType.TEXT),
]

default_vector = Configure.NamedVectors.text2vec_cohere(
    name="default",
    source_properties=["title", "body"],
)

# Note - this is just a demo. Do NOT use a replication factor of 2 in production;
# use 3 or a higher odd number.
demo_replication = Configure.replication(factor=2)

# To demonstrate sharding, the shard count is set to an arbitrarily high number
# (for our dataset size, anyway).
demo_sharding = Configure.sharding(desired_count=5)

client.collections.create(
    collection_name,
    properties=book_properties,
    vectorizer_config=[default_vector],
    replication_config=demo_replication,
    sharding_config=demo_sharding,
)

c = client.collections.get(collection_name)

# (title, body) pairs for the sample books.
book_rows = [
    ("Howl's Moving Castle", "A fantasy novel by Diana Wynne Jones."),
    ("The Hobbit", "A fantasy novel by J.R.R. Tolkien."),
    ("The Hitchhiker's Guide to the Galaxy", "A science fiction novel by Douglas Adams."),
    ("The Great Gatsby", "A novel by F. Scott Fitzgerald."),
    ("1984", "A dystopian novel by George Orwell."),
    ("To Kill a Mockingbird", "A novel by Harper Lee."),
    ("Pride and Prejudice", "A novel by Jane Austen."),
    ("The Catcher in the Rye", "A novel by J.D. Salinger."),
    ("The Lord of the Rings", "A fantasy novel by J.R.R. Tolkien."),
    ("Brave New World", "A dystopian novel by Aldous Huxley."),
    ("Fahrenheit 451", "A dystopian novel by Ray Bradbury."),
    ("The Picture of Dorian Gray", "A novel by Oscar Wilde."),
]

# One dict per book, keyed by the collection's property names.
objects = [{"title": title, "body": body} for title, body in book_rows]

c.data.insert_many(objects)

import time

# Upper bound on how long to poll, and the pause between polls (both in seconds).
MAX_WAIT_SECONDS = 300
POLL_INTERVAL_SECONDS = 10

# monotonic() is unaffected by system clock adjustments, so it is safe for durations.
start_time = time.monotonic()
print("Waiting for object count to update...")

while True:
    nodes = client.cluster.nodes(collection=collection_name, output="verbose")
    # Look at every shard on every node. With only a few objects spread over
    # several shards, any single shard may legitimately stay empty, so polling
    # just the first shard of the first node could wait forever.
    populated = next(
        ((node, shard) for node in nodes for shard in node.shards if shard.object_count != 0),
        None,
    )
    if populated is not None:
        n, s = populated
        print(f"On node {n.name} and shard {s.name} - obj count: {s.object_count}")
        break

    elapsed_time = time.monotonic() - start_time
    if elapsed_time >= MAX_WAIT_SECONDS:
        raise TimeoutError(
            f"Object count for {collection_name} did not update within {MAX_WAIT_SECONDS}s"
        )
    print(f"Elapsed time: {elapsed_time:.1f}s")
    time.sleep(POLL_INTERVAL_SECONDS)

finish_time = time.monotonic()
print(f"Time taken for obj count to update: {finish_time - start_time:.1f} seconds")

In [None]:
# Number of objects in the collection.
object_total = len(c)
object_total

In [None]:
from IPython.display import Image, display

# Width used for this diagram; following cells read img_w as well.
img_w = 600
shards_diagram_1 = Image('./assets/shards-1.png', width=img_w)
display(shards_diagram_1)

In [None]:
# Shard diagram 2.
shards_diagram_2 = Image('./assets/shards-2.png', width=img_w)
display(shards_diagram_2)

In [None]:
# Shard diagram 3.
shards_diagram_3 = Image('./assets/shards-3.png', width=img_w)
display(shards_diagram_3)

In [None]:
# Shard diagram 4.
shards_diagram_4 = Image('./assets/shards-4.png', width=img_w)
display(shards_diagram_4)

In [None]:
# Verbose node info for this collection; printed as a per-node shard listing.
nodes_response = client.cluster.nodes(collection=collection_name, output="verbose")

for node in nodes_response:
    shard_total = len(node.shards)
    print(f"\nNode {node.name} has {shard_total} shards")
    for shard in node.shards:
        print(f"Shard {shard.name} has {shard.object_count} objects from {collection_name}")

In [None]:
# Pick a shard that lives on the first node but not on the second, so that
# moving it to the second node is possible.
if len(nodes_response) < 2:
    raise RuntimeError("At least two nodes are required to demonstrate a shard move.")

n = nodes_response[0]

# Built once, as a set, instead of rebuilding a list for every source shard.
node2_shard_names = {n2_shard.name for n2_shard in nodes_response[1].shards}

candidate_shard = next(
    (src_shard for src_shard in n.shards if src_shard.name not in node2_shard_names),
    None,
)

# Fail with a clear message rather than an AttributeError on None below.
if candidate_shard is None:
    raise RuntimeError(
        f"Every shard on {n.name} already has a replica on {nodes_response[1].name}; "
        "nothing to move."
    )

print(f"Candidate shard to move: {candidate_shard.name}")

SHARD_ID = candidate_shard.name

In [None]:
import json

import requests

# Shard move parameters
SOURCE_NODE = "node1"
DESTINATION_NODE = "node2"

# Replication endpoint of the local Weaviate instance.
REPLICATION_URL = "http://localhost:8080/v1/replication/replicate"
REQUEST_TIMEOUT_SECONDS = 30  # requests waits indefinitely unless a timeout is given

# Create the request payload
payload = {
    "sourceNodeName": SOURCE_NODE,  # use the declared constant, not a second hard-coded copy
    "destinationNodeName": DESTINATION_NODE,
    "collectionId": collection_name,
    "shardId": SHARD_ID,
    "transferType": "MOVE"  # Use "MOVE" to relocate the shard, or "COPY" to replicate it
}

# Set up the headers
headers = {
    "Content-Type": "application/json"
}

# Make the API request
response = requests.post(
    REPLICATION_URL,
    headers=headers,
    data=json.dumps(payload),
    timeout=REQUEST_TIMEOUT_SECONDS,
)
# Surface HTTP errors here instead of failing later on an unexpected body.
response.raise_for_status()

operation_id = response.json().get("id")
if operation_id is None:
    # Without an id the status URL below would end in "/None".
    raise RuntimeError(f"Replication request returned no operation id: {response.text}")

# Check for the status of the operation
response = requests.get(
    f"{REPLICATION_URL}/{operation_id}",
    headers=headers,
    timeout=REQUEST_TIMEOUT_SECONDS,
)
response.raise_for_status()
status_body = response.json()

print(status_body)
print("Shard move operation status:")
print(f'Status: {status_body["status"]["state"]}')

In [None]:
# Shard diagram 5.
shards_diagram_5 = Image('./assets/shards-5.png', width=img_w)
display(shards_diagram_5)

Check the status to see if it's finished:

In [None]:
# Ask the replication endpoint for the current state of the operation.
status_url = f"http://localhost:8080/v1/replication/replicate/{operation_id}"
response = requests.get(status_url, headers=headers)

# Decode the body once and reuse it for both prints.
status_body = response.json()

print(status_body)
print("Shard move operation status:")
print(f'Status: {status_body["status"]["state"]}')

In [None]:
# Print the per-node shard listing again.
current_nodes = client.cluster.nodes(collection=collection_name, output="verbose")

for node in current_nodes:
    shard_total = len(node.shards)
    print(f"\nNode {node.name} has {shard_total} shards")
    for shard in node.shards:
        print(f"Shard {shard.name} has {shard.object_count} objects from {collection_name}")