In [38]:
import json
from pathlib import Path

import pandas as pd
import zstandard as zstd  # You need the zstandard library for compression.

# Constants
OUTPUT_DIR = "./output-data/"
FINAL_DIR = OUTPUT_DIR + "final/"

# Vespa constants
VESPA_DOCTYPE = "product"
VESPA_NAMESPACE = "product"

# ES constants
ES_INDEX = "product"

# Ensure the output directory exists
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)


In [39]:
# Find all parquet files in the output directory
parquet_files = list(Path(OUTPUT_DIR).rglob("*.parquet"))
parquet_files


[PosixPath('output-data/CDs_and_Vinyl_processed.parquet'),
 PosixPath('output-data/Baby_Products_processed.parquet'),
 PosixPath('output-data/Movies_and_TV_processed.parquet'),
 PosixPath('output-data/Office_Products_processed.parquet'),
 PosixPath('output-data/Electronics_processed.parquet'),
 PosixPath('output-data/Toys_and_Games_processed.parquet'),
 PosixPath('output-data/Health_and_Household_processed.parquet'),
 PosixPath('output-data/All_Beauty_processed.parquet'),
 PosixPath('output-data/Pet_Supplies_processed.parquet'),
 PosixPath('output-data/Cell_Phones_and_Accessories_processed.parquet'),
 PosixPath('output-data/Subscription_Boxes_processed.parquet'),
 PosixPath('output-data/Grocery_and_Gourmet_Food_processed.parquet'),
 PosixPath('output-data/Video_Games_processed.parquet'),
 PosixPath('output-data/Arts_Crafts_and_Sewing_processed.parquet'),
 PosixPath('output-data/Amazon_Fashion_processed.parquet'),
 PosixPath('output-data/Sports_and_Outdoors_processed.parquet'),
 PosixPa

In [40]:
# Read and join all parquet files
df = pd.concat([pd.read_parquet(file) for file in parquet_files], ignore_index=True)


In [41]:
len(df)

382791

In [42]:
# Drop duplicates on title and description.
df = df.drop_duplicates(subset=["title", "description"])
len(df)


381739

In [43]:
# Reset id-column to integer index
df["id"] = df.index


In [44]:
# Use only 1M first rows
# df = df.iloc[:1_000_000]


In [45]:
len(df)

381739

In [49]:
# Check if any of the descriptions are empty strings (stripped for whitespace)
assert df["description"].str.strip().eq("").sum() == 0
# Check that no ids are duplicated
assert df["id"].duplicated().sum() == 0
# Check that no NaN values are present
assert df.isnull().sum().values.sum() == 0
# Check for no duplicates on title and description
assert df[["title", "description"]].duplicated().sum() == 0
# Assert length is 1M
# assert len(df) == 1_000_000


In [47]:
def save_df_to_vespa_format(
    df: pd.DataFrame,
    file_name: Path,
    compression: str = "zstd",
) -> None:
    """
    Transform the DataFrame to a Vespa-compatible format.
    Save transformed data to a newline-separated JSON file.
    """
    df["docid"] = (
        f"id:{VESPA_DOCTYPE}:{VESPA_NAMESPACE}::"
        + df["category"]
        + df["id"].astype(str)
    )
    df = df.apply(
        lambda row: {
            "put": row["docid"],
            "fields": {
                "id": row["id"],
                "title": row["title"],
                "category": row["category"],
                "description": row["description"],
                "price": row["price"],
                "average_rating": row["average_rating"],
                "embedding": row["embedding"].tolist(),
            },
        },
        axis=1,
    )
    if compression == "zstd":
        file_name = file_name.with_suffix(".json.zst")
    df.to_json(file_name, orient="records", lines=True, compression=compression)
    return df


def save_df_to_es_format(
    df: pd.DataFrame,
    file_name: str,
    compression: str = "zstd",
) -> None:
    """
    Transform the DataFrame to an Elasticsearch-compatible format.
    Save transformed data to a newline-separated JSON file.
    """
    if compression == "zstd":
        file_name = file_name.with_suffix(".json.zst")
    data = ""
    for index, row in df.iterrows():
        action = {"index": {"_index": ES_INDEX, "_id": str(row["id"])}}
        data += json.dumps(action, ensure_ascii=True) + "\n"
        doc_data = {
            "title": row["title"],
            "category": row["category"],
            "description": row["description"],
            "price": row["price"],
            "average_rating": row["average_rating"],
            "embedding": row["embedding"].tolist(),
        }
        data += json.dumps(doc_data, ensure_ascii=True) + "\n"
    cctx = zstd.ZstdCompressor()
    compressed_data = cctx.compress(data.encode("utf-8"))
    with open(file_name, "wb") as f:
        f.write(compressed_data)
    return df


In [15]:
# Write a sample to both vespa and es
num_samples = 100_000
vespa_save_path = Path(OUTPUT_DIR) / f"vespa_sample-{num_samples}.jsonl"
es_save_path = Path(OUTPUT_DIR) / f"es_sample-{num_samples}.jsonl"
df = df.sample(num_samples, random_state=42)
save_df_to_vespa_format(df, vespa_save_path)
# save_df_to_es_format(df, es_save_path)


ValueError: Cannot take a larger sample than population when 'replace=False'

In [None]:
def title_to_query(title: str) -> str:
    """
    Convert a title to a query.
    Split the title by whitespace, and use up to 4 words as the query.
    If the title has less than 4 words, use the whole title as the query.
    """
    return " ".join(title.split()[:4])


def generate_vespa_query_file(df, num_queries: int, seed=int):
    """
    Generate a query file for Vespa.
    Sample format:
    ```json
    /search/
    {"yql": "select * from sources * where userQuery()", "ranking.profile": "bm25", "query": "small money clip leather wallet"}
    /search/
    {"yql": "select * from sources * where userQuery()", "ranking.profile": "bm25", "query": "blue nike shoes"}
    ```
    """
    df_sample = df.sample(num_queries, random_state=seed)
    df_sample["query"] = df_sample["title"].apply(title_to_query)
    query_file = Path(OUTPUT_DIR) / f"vespa_query_sample-{num_queries}.jsonl"
    search_string = "/search/\n"
    with open(query_file, "w") as f:
        for index, row in df_sample.iterrows():
            f.write(search_string)
            query = {
                "yql": "select * from sources * where userQuery();",
                "ranking.profile": "default",
                "query": row["query"],
            }
            f.write(json.dumps(query) + "\n")
    return query_file


### weakAnd + bm25 ranking:

vespa query "yql=select \* from product where userQuery()" ranking.profile=bm25 "query=Small MONEY CLIP Leather Wallet"

### nearestNeighbor + closeness ranking:

vespa query "yql=select \* from product where ({targetHits:100}nearestNeighbor(embedding,q_embedding))" ranking.profile=closeness "input.query(q_embedding)=[]"

Note: Use embedding from id:product:product::4, title:"Small MONEY CLIP Leather Wallet ID Bag Cash Holder Credit Card Cover Case Pouch"

### Hybrid:

vespa query "yql=select \* from product where ({targetHits:10}nearestNeighbor(embedding,q_embedding)) or userQuery()" ranking.profile=hybrid "query=Small MONEY CLIP Leather Wallet" "input.query(q_embedding)=[]"
