# Ingest Data with Retry

This recipe demonstrates how to ingest data into Weaviate with retry logic.

## Weaviate Setup

The sample code is for a local Weaviate deployment with Docker and Ollama running on localhost. However, the retry logic is agnostic to the deployment method and the underlying vector embedding service and model.

### Steps to deploy Weaviate locally with Ollama

We will start Weaviate with `docker-compose.yaml` and verify that Ollama is running. Note that we set `ASYNC_INDEXING: 'true'` in the Weaviate environment variables to speed up the ingestion process.

Run:

docker compose -f docker-compose.yaml up -d
ollama serve
ollama pull mxbai-embed-large:latest
ollama pull llama3.2:latest
curl http://localhost:11434

### Dependencies

In [None]:
!pip install -r requirements.txt

## Configuration

In [34]:
import weaviate
import weaviate.classes.config as wc
import weaviate.classes.query as wq
from weaviate.classes.init import AdditionalConfig, Timeout
from weaviate.util import generate_uuid5
import os
import json
import ijson
import sys
import argparse

# Address of the local Weaviate instance.
# NOTE(review): not referenced by the code visible in this notebook;
# connect_to_local() below is called without it.
WEAVIATE_URL = "http://localhost:8080"

# Ollama models and endpoint that are passed to the collection's
# text2vec-ollama vectorizer and Ollama generative configuration.
OLLAMA_EMBEDDING_MODEL_ID = "mxbai-embed-large:latest"
OLLAMA_GENERATIVE_MODEL_ID = "llama3.2:latest"
# NOTE(review): presumably this URL is resolved from inside the Weaviate
# container, hence host.docker.internal rather than localhost - confirm
# against docker-compose.yaml.
OLLAMA_URL = "http://host.docker.internal:11434"

# Name of the collection that holds the product objects.
PRODUCT_COLLECTION_NAME = "product"
# Connect to the local Weaviate instance with explicit init/query/insert
# timeouts (NOTE(review): presumably in seconds - confirm with the client docs).
client = weaviate.connect_to_local(
    headers={},
    additional_config=AdditionalConfig(
        timeout=Timeout(init=30, query=60, insert=120)
    )
)
# Stop right away if the server does not report itself as live.
assert client.is_live()

### Create `Product` collection

The collection has the following key characteristics:
1. Name: `"Product"`
2. Vectorizer: `text2vec-ollama`

In [None]:
from weaviate.classes.config import Configure, Multi2VecField, Property, DataType

# Start from a clean slate: drop any previous copy of the collection.
if client.collections.exists(PRODUCT_COLLECTION_NAME):
    client.collections.delete(PRODUCT_COLLECTION_NAME)

# Every property uses one of three option profiles, spelled out once here:
#   _INDEXED            filterable + searchable, skip_vectorization left at its default
#   _INDEXED_NO_VECTOR  filterable + searchable, skip_vectorization=True
#   _STORED_ONLY        not filterable, not searchable, skip_vectorization=True
_TEXT = wc.DataType.TEXT
_TEXT_ARRAY = wc.DataType.TEXT_ARRAY
_INDEXED = {"index_filterable": True, "index_searchable": True}
_INDEXED_NO_VECTOR = {"skip_vectorization": True, "index_filterable": True, "index_searchable": True}
_STORED_ONLY = {"skip_vectorization": True, "index_filterable": False, "index_searchable": False}

# (property name, data type, option profile), in schema order.
_PRODUCT_SCHEMA = [
    ("category", _TEXT_ARRAY, _INDEXED),
    ("tech1", _TEXT, _STORED_ONLY),
    ("tech2", _TEXT, _STORED_ONLY),
    ("description", _TEXT_ARRAY, _INDEXED),
    ("fit", _TEXT, _STORED_ONLY),
    ("title", _TEXT, _INDEXED),
    ("also_buy", _TEXT_ARRAY, _STORED_ONLY),
    ("image", _TEXT_ARRAY, _STORED_ONLY),
    ("brand", _TEXT, _INDEXED),
    ("feature", _TEXT_ARRAY, _STORED_ONLY),
    ("rank", _TEXT_ARRAY, _STORED_ONLY),
    ("also_view", _TEXT_ARRAY, _STORED_ONLY),
    ("main_cat", _TEXT, _INDEXED),
    ("date", _TEXT, _INDEXED_NO_VECTOR),
    ("price", _TEXT, _INDEXED_NO_VECTOR),
    ("asin", _TEXT, _INDEXED),
]

client.collections.create(
    name=PRODUCT_COLLECTION_NAME,
    properties=[
        wc.Property(name=prop_name, data_type=prop_type, **prop_options)
        for prop_name, prop_type, prop_options in _PRODUCT_SCHEMA
    ],
    # Embeddings and generation are both served by the Ollama endpoint.
    vectorizer_config=wc.Configure.Vectorizer.text2vec_ollama(
        api_endpoint=OLLAMA_URL,
        model=OLLAMA_EMBEDDING_MODEL_ID,
    ),
    generative_config=wc.Configure.Generative.ollama(
        api_endpoint=OLLAMA_URL,
        model=OLLAMA_GENERATIVE_MODEL_ID,
    ),
)

# Handle used by the import and query cells further down.
products = client.collections.get(PRODUCT_COLLECTION_NAME)
print("Successfully created Product collection.")

### Import Product Logic
Some of the data in the Amazon product dataset is stored inconsistently, so we need to normalize it before importing it into Weaviate.

Note that `import_products` first imports the data and then enters an infinite loop that retries the objects that failed to import, exiting only once no failed objects remain.

In [36]:
# Some fields are stored in the data sometimes as single items and sometimes
# as lists. Make sure these fields are always lists to match what the
# Weaviate collection expects.
def normalize_field(obj, field):
    """Ensure ``obj[field]`` is a list, wrapping a non-list value in one.

    The mapping is updated in place and nothing is returned. A value that is
    already a list is left untouched.
    """
    current = obj[field]
    if isinstance(current, list):
        return
    obj[field] = [current]

def process_product(obj):
    """Build the property dict for one raw product record.

    Copies the sixteen known fields out of ``obj`` (a missing key raises
    ``KeyError``), wraps the list-typed fields in a list when the record holds
    a single item, and coerces ``rank`` to a list: a string becomes a
    one-element list, a list is kept, anything else becomes an empty list.
    """
    copied_fields = (
        "category", "tech1", "tech2", "description", "fit", "title",
        "also_buy", "image", "brand", "feature", "rank", "also_view",
        "main_cat", "date", "price", "asin",
    )
    list_fields = ("category", "description", "also_buy", "image", "feature", "also_view")

    product_obj = {name: obj[name] for name in copied_fields}

    for name in list_fields:
        normalize_field(product_obj, name)

    # rank shows up either as a plain string or as an array in the data
    raw_rank = obj["rank"]
    if isinstance(raw_rank, list):
        product_obj["rank"] = raw_rank
    elif isinstance(raw_rank, str):
        product_obj["rank"] = [raw_rank]
    else:
        product_obj["rank"] = []

    return product_obj

def import_products(local_json_path, max_retries=None):
    """Import products from a JSON file and retry the objects that failed.

    Streams the top-level JSON values of ``local_json_path`` with ``ijson``,
    converts each one with ``process_product`` and adds it to a dynamic batch
    on the module-level ``products`` collection, using a UUID derived from the
    record's ``asin``. Objects the batch reports as failed are then
    re-submitted, round after round, until none remain.

    Args:
        local_json_path: Path of the JSON file to ingest.
        max_retries: Maximum number of retry rounds. ``None`` (the default)
            retries without limit, so an object that can never be imported
            keeps the loop running.

    Progress and errors are reported with ``print``; nothing is returned.
    """
    INTERVAL = 100  # emit a progress line every INTERVAL objects

    with products.batch.dynamic() as batch:
        print(f"Opening {local_json_path}")
        with open(local_json_path, "rb") as f:
            # An empty prefix with multiple_values=True makes ijson yield each
            # top-level JSON value of the file in turn.
            objects = ijson.items(f, '', multiple_values=True)
            for counter, obj in enumerate(objects, start=1):
                batch.add_object(
                    properties=process_product(obj),
                    uuid=generate_uuid5(obj["asin"]),
                )
                if counter % INTERVAL == 0:
                    print(f"{local_json_path}: Imported {counter} products...")
        print(f"{local_json_path}: Flushing batch")
        batch.flush()
        print(f"{local_json_path}: Batch flushed")

    # The failed_objects are not available until after flush is called
    old_failed_obj_count = len(products.batch.failed_objects)
    new_failed_obj_count = 0
    dropped_count = 0  # failed objects that could not even be re-queued
    attempts = 0
    while True:
        # Grab the list before a new batch context is opened below.
        # NOTE(review): presumably the next batch replaces it - confirm.
        failed_objects = products.batch.failed_objects
        if not failed_objects:
            if dropped_count:
                # Do not claim success when some objects were never retried.
                print(f"{local_json_path}: Finished, but {dropped_count} failed objects could not be re-queued")
            else:
                print(f"{local_json_path}: All products imported successfully")
            break

        if max_retries is not None and attempts >= max_retries:
            print(f"{local_json_path}: Giving up after {attempts} retries with {len(failed_objects)} objects still failing")
            break
        attempts += 1

        print(f"{local_json_path}: Retrying {len(failed_objects)} failed objects...")
        retry_counter = 0

        current_failed_object_count = len(failed_objects)
        # Same number of failures before and after the previous round: dump
        # the offending objects to help debugging.
        stuck = new_failed_obj_count == old_failed_obj_count
        with products.batch.dynamic() as batch:
            print(f"{local_json_path}: Inside retry loop are {len(failed_objects)} failed objects...")

            for failed in failed_objects:
                try:
                    print(f"{local_json_path}: Failed with error \"{failed.message}\": {failed.object_.uuid}")
                    if stuck:
                        # default=str keeps this diagnostic dump from raising
                        # on values json cannot serialise natively.
                        print(f"{local_json_path}: Debugging stuck object: "
                                + json.dumps(failed.object_.properties, indent=2, default=str))
                    batch.add_object(
                        properties=failed.object_.properties,
                        uuid=failed.object_.uuid
                    )
                except Exception as e:
                    print(f"{local_json_path}: Exception while retrying: {e}")
                    print(f"{local_json_path}: Failed Object: {failed}")
                    # Skip only this object; the remaining failed objects
                    # must still be re-queued or they would be lost.
                    dropped_count += 1
                    continue

                retry_counter += 1
                if retry_counter % INTERVAL == 0:
                    print(f"{local_json_path}: Retried {retry_counter} products...")
            batch.flush()
        old_failed_obj_count = current_failed_object_count
        new_failed_obj_count = len(products.batch.failed_objects)

### Import Products

The code below assumes you have downloaded the Amazon product dataset and split the JSON data into multiple files named `Amazon_Meta_CDs_Vinyl_00.json`, `Amazon_Meta_CDs_Vinyl_01.json`, etc.

See the README for more details.

In [None]:
# Ingest the first split file; the call returns once import_products() has
# left its retry loop.
import_products('Amazon_Meta_CDs_Vinyl_00.json')

## Verify the number of objects in the Product collection

There should be 10,000 objects in the Product collection after ingestion.

In [None]:
# Ask for a collection-wide aggregate and show its total object count
aggregate_result = products.aggregate.over_all(total_count=True)
print(aggregate_result.total_count)

## Run a vector search query

In [None]:
def print_product(response_object):
    """Print a readable summary of one query result.

    Reads ``title``, ``brand``, ``asin``, ``category``, ``price`` and
    ``description`` from ``response_object.properties``. A scalar property
    that is missing or ``None`` prints as an empty value, and a missing or
    ``None`` array prints no entries, instead of raising.

    Args:
        response_object: Object exposing a ``properties`` mapping.
    """
    props = response_object.properties

    def text(name):
        # str() keeps the concatenation below safe for non-string values.
        value = props.get(name)
        return "" if value is None else str(value)

    def entries(name):
        # Treat a missing or None array like an empty one.
        return props.get(name) or []

    print("Product Title: " + text("title"))
    print("  Artist: " + text("brand"))
    print("  ASIN: " + text("asin"))
    print("  Categories: ")
    for c in entries("category"):
        print("    " + str(c))
    print("  Price: " + text("price"))
    print("  Description: ")
    for d in entries("description"):
        print("    " + str(d))

# Semantic search: fetch up to five nearest products together with their
# vector distance.
response = products.query.near_text(
    query="background music for falling asleep",
    return_metadata=wq.MetadataQuery(distance=True),
    limit=5,
)

# Print every hit once, skipping any ASIN that has already been shown.
seen_asin = []
for hit in response.objects:
    hit_asin = hit.properties["asin"]
    if hit_asin not in seen_asin:
        seen_asin.append(hit_asin)
        print_product(hit)