# Collect Search Times

Run the search benchmark for every dataset, algorithm, batch size and count combination, and record how long each search takes.

How to run

```bash
papermill --log-output collect_search_times.ipynb collect_search_times.run1.ipynb

# Dry run
papermill --log-output collect_search_times.ipynb collect_search_times.run1.ipynb -p DRY_RUN 1

# To override parameters with yaml:
papermill --log-output collect_search_times.ipynb collect_search_times.run1.ipynb -y "
SEARCH_MODE: throughput
DATASETS:
    - 'sift-128-euclidean'
"
```

In [None]:
import pathlib
import subprocess
import time
import shutil

import common

In [None]:
# Notebook parameters
# The "How to run" notes show these being overridden through papermill
# (`-p NAME VALUE` or `-y "<yaml>"`).

# When truthy, commands are only printed (not executed) and result files are
# not copied.
DRY_RUN = False
# Datasets to search; each one also gets its own entry in the error report.
DATASETS = [common.SIFT_128_EUCLIDEAN]
# Algorithms to search with, one search command per algorithm.
ALGORITHMS = [
    common.FAISS_CPU_FLAT,
    common.FAISS_CPU_IVF_FLAT,
    common.FAISS_CPU_IVF_PQ,
    common.FAISS_GPU_FLAT,
    common.FAISS_GPU_IVF_FLAT,
    common.FAISS_GPU_IVF_PQ,
    common.HNSWLIB,
    common.RAFT_BRUTE_FORCE,
    common.RAFT_CAGRA,
    common.RAFT_IVF_FLAT,
    common.RAFT_IVF_PQ,
    common.RAFT_CAGRA_HNSWLIB,
]
# Values substituted as COUNT in the search command; they also appear as
# `k<count>` in the result file names.
COUNTS = [10, 100]
SEARCH_MODE = "latency"  # one of ("latency", "throughput").
# Batch sizes to run, keyed by search mode; only the entry for SEARCH_MODE is
# used.
BATCH_SIZES = {
    "latency": [1],
    "throughput": [1, 10, 100, 10000],
}
# Value passed as `--search-threads`, keyed by algorithm. Algorithms without an
# entry get no flag unless SEARCH_THREADS_BY_BATCH_SIZE supplies one.
SEARCH_THREADS = {
    common.FAISS_GPU_FLAT: "1",
    common.FAISS_GPU_IVF_FLAT: "1:4",
    common.FAISS_GPU_IVF_PQ: "1:4",
    common.FAISS_CPU_FLAT: "1",
    common.FAISS_CPU_IVF_FLAT: "1:4",
    common.FAISS_CPU_IVF_PQ: "1:4",
    common.RAFT_BRUTE_FORCE: "1",
    common.RAFT_CAGRA: "1:16",
    common.RAFT_IVF_FLAT: "1:4",
    common.RAFT_IVF_PQ: "1:4",
}
# Per-batch-size `--search-threads` override; get_threads() checks this before
# SEARCH_THREADS, so an entry here wins over the per-algorithm value.
SEARCH_THREADS_BY_BATCH_SIZE = {
    10000: "1",
}
# Root directory for datasets and results (made absolute in a later cell).
DATASET_PATH = common.DATASET_PATH
# Passed as `--configuration` for the algorithms that need an explicit config.
ALGO_CONFIG_DIR = common.ALGO_CONFIG_DIR
# Command templates from `common`; later cells fill them with safe_substitute().
# NOTE(review): CPU_DOCKER_CMD_TEMPLATE, DOWNLOAD_CMD_TEMPLATE and
# BUILD_CMD_TEMPLATE are not referenced by the cells visible here.
GPU_DOCKER_CMD_TEMPLATE = common.GPU_DOCKER_CMD_TEMPLATE
CPU_DOCKER_CMD_TEMPLATE = common.CPU_DOCKER_CMD_TEMPLATE
DOWNLOAD_CMD_TEMPLATE = common.DOWNLOAD_CMD_TEMPLATE
BUILD_CMD_TEMPLATE = common.BUILD_CMD_TEMPLATE
SEARCH_CMD_TEMPLATE = common.SEARCH_CMD_TEMPLATE
EXPORT_DATA_CMD_TEMPLATE = common.EXPORT_DATA_CMD_TEMPLATE
PLOT_CMD_TEMPLATE = common.PLOT_CMD_TEMPLATE

In [None]:
# Turn the configured dataset location into an absolute pathlib.Path and make
# sure the directory exists before later cells build paths and commands from it.
DATASET_PATH = pathlib.Path(DATASET_PATH).resolve()
DATASET_PATH.mkdir(parents=True, exist_ok=True)

In [None]:
def get_docker_image(algo: str = ""):
    """Pick the docker image to run *algo* in.

    Algorithm names starting with "faiss" get the CUDA 11 image, because FAISS
    requires CUDA 11. Any other name, including the default empty string, gets
    the CUDA 12 image.
    """
    is_faiss = algo.startswith("faiss")
    return common.CUDA11_DOCKER_IMAGE if is_faiss else common.CUDA12_DOCKER_IMAGE

In [None]:
def iter_params():
    """Yield every (dataset, algorithm, batch size, count) combination.

    Datasets vary slowest and counts fastest. Batch sizes are the ones
    configured for the current SEARCH_MODE.
    """
    yield from (
        (ds, algo, bs, cnt)
        for ds in DATASETS
        for algo in ALGORITHMS
        for bs in BATCH_SIZES[SEARCH_MODE]
        for cnt in COUNTS
    )


# One timing record per parameter combination. Both stamps start as None and
# are filled in by search_all() around the search command.
search_time_collector = {
    params: {"tick": None, "tock": None} for params in iter_params()
}
# Descriptions of the steps that raised an error, grouped by dataset.
error_collector = {ds: [] for ds in DATASETS}

## Search

* Searching takes a while, so a search is skipped when its result file already exists.

In [None]:
def get_search_result_path(ds: str, algo: str, bs: int, cnt: int) -> pathlib.Path:
    """Return the path of the search result file for one parameter combination.

    The file is ``DATASET_PATH/datasets/<ds>/result/search/`` followed by
    ``<algo>,base,k<cnt>,bs<bs>.json``.

    Args:
        ds: Dataset name.
        algo: Algorithm name.
        bs: Batch size.
        cnt: Count value (an int from ``COUNTS``); appears as ``k<cnt>`` in the
            file name.

    Returns:
        The result file path. The file is not required to exist.
    """
    result_dir = DATASET_PATH / "datasets" / ds / "result" / "search"
    return result_dir / f"{algo},base,k{cnt},bs{bs}.json"


def get_threads(algo: str | None = None, bs: int | None = None) -> str | None:
    """Return the search-threads setting for an algorithm and batch size.

    A non-empty entry in SEARCH_THREADS_BY_BATCH_SIZE for *bs* wins. Otherwise
    the non-empty entry in SEARCH_THREADS for *algo* is used. Returns None when
    neither mapping has a usable value.
    """
    by_batch_size = SEARCH_THREADS_BY_BATCH_SIZE.get(bs)
    by_algorithm = SEARCH_THREADS.get(algo)
    return by_batch_size or by_algorithm or None


def search_all():
    """Run the search command for every parameter combination.

    A combination is skipped when its result file already exists. Otherwise
    the search command is wrapped in the GPU docker command, printed, and
    (unless DRY_RUN is set) executed through bash. Start and end times are
    stored in search_time_collector. Any exception is printed and recorded in
    error_collector, and the loop moves on to the next combination.
    """
    for ds, algo, bs, cnt in iter_params():
        result_path = get_search_result_path(ds, algo, bs, cnt)
        if result_path.exists():
            print(f"Skipping {result_path} as it already exists.")
            continue

        # FIXME: the docker image is missing configurations for FAISS_CPU_IVF_FLAT and FAISS_CPU_IVF_PQ
        flags = (
            [f"--configuration {ALGO_CONFIG_DIR}"]
            if algo in (common.FAISS_CPU_IVF_FLAT, common.FAISS_CPU_IVF_PQ)
            else []
        )

        # Use the first suitable thread setting, if there is one.
        threads = get_threads(algo=algo, bs=bs)
        if threads:
            flags.append(f"--search-threads {threads}")

        container_cmd = SEARCH_CMD_TEMPLATE.safe_substitute(
            SEARCH_MODE=SEARCH_MODE,
            DATASET=ds,
            ALGORITHMS=algo,
            BATCH_SIZE=bs,
            COUNT=cnt,
            EXTRA_ARGS=" ".join(flags),
        )
        docker_cmd = GPU_DOCKER_CMD_TEMPLATE.safe_substitute(
            DATASET_PATH=DATASET_PATH,
            DOCKER_IMAGE=get_docker_image(algo),
            CONTAINER_CMD=container_cmd,
        )
        print(docker_cmd)
        if DRY_RUN:
            print(f"Would search {ds} with {algo}")
            continue

        key = (ds, algo, bs, cnt)
        try:
            # "tock" is only written when the command succeeds, so a failed
            # search leaves it as None.
            search_time_collector[key]["tick"] = time.time()
            subprocess.run(
                docker_cmd,
                shell=True,
                executable="/bin/bash",
                check=True,
                text=True,
            )
            search_time_collector[key]["tock"] = time.time()
        except Exception as exc:
            print(f"Error searching {ds} with {algo}: {exc}")
            error_collector[ds].append(f"search {algo} {SEARCH_MODE} bs{bs} k{cnt}")


# Run the searches as soon as this cell executes.
search_all()

## Export data

In [None]:
def export_all():
    """Run the data export command for each dataset.

    The export command is wrapped in the GPU docker command (using the default
    docker image) and printed. With DRY_RUN set it is not executed. An
    exception from the command is printed and recorded as "export" in
    error_collector, after which the next dataset is processed.
    """
    for ds in DATASETS:
        container_cmd = EXPORT_DATA_CMD_TEMPLATE.safe_substitute(DATASET=ds)
        docker_cmd = GPU_DOCKER_CMD_TEMPLATE.safe_substitute(
            DATASET_PATH=DATASET_PATH,
            DOCKER_IMAGE=get_docker_image(),
            CONTAINER_CMD=container_cmd,
        )
        print(docker_cmd)
        if DRY_RUN:
            print(f"Would export {ds} from {DATASET_PATH}")
        else:
            try:
                subprocess.run(
                    docker_cmd,
                    shell=True,
                    executable="/bin/bash",
                    check=True,
                    text=True,
                )
            except Exception as exc:
                print(f"Error exporting {ds}: {exc}")
                error_collector[ds].append("export")


# Export the data for every dataset as soon as this cell executes.
export_all()

## Archive results

We need to archive the search results as they can be overwritten when we switch search mode.

In [None]:
def copy_search_results():
    """Copy each dataset's search results into a per-search-mode directory.

    Every ``*.json`` and ``*.csv`` file in ``<dataset>/result/search`` is copied
    to ``<dataset>/result/search_<SEARCH_MODE>``. With DRY_RUN set, the planned
    copies are only printed and nothing is created or copied.
    """
    # iter_params() yields one tuple per (dataset, algorithm, batch size, count)
    # combination. Collapse those to the distinct datasets, keeping their
    # order, so each directory is copied once rather than once per combination.
    datasets = dict.fromkeys(ds for ds, _, _, _ in iter_params())
    for ds in datasets:
        result_dir = DATASET_PATH / "datasets" / ds / "result"
        src_dir = result_dir / "search"
        dest_dir = result_dir / f"search_{SEARCH_MODE}"
        if not DRY_RUN:
            # A dry run must not touch the filesystem.
            dest_dir.mkdir(parents=True, exist_ok=True)
        for pattern in ("*.json", "*.csv"):
            for src in src_dir.glob(pattern):
                dest = dest_dir / src.name
                if DRY_RUN:
                    print(f"Would copy {src} to {dest}")
                    continue
                shutil.copy(src, dest)


# Archive the current search results as soon as this cell executes.
copy_search_results()

## Plot

In [None]:
def plot_all():
    """Run the plot command for each dataset.

    Only the first batch size configured for SEARCH_MODE and the first entry of
    COUNTS are plotted. The plot command is wrapped in the GPU docker command
    (using the default docker image) and printed. With DRY_RUN set it is not
    executed. An exception from the command is printed and recorded as "plot"
    in error_collector, after which the next dataset is processed.
    """
    for ds in DATASETS:
        plot_cmd = PLOT_CMD_TEMPLATE.safe_substitute(
            # Use the configured mode so it matches the batch size picked
            # below; it was previously fixed to "latency" whatever the mode.
            SEARCH_MODE=SEARCH_MODE,
            DATASET=ds,
            BATCH_SIZE=BATCH_SIZES[SEARCH_MODE][0],
            COUNT=COUNTS[0],
            EXTRA_ARGS="",
        )
        cmd = GPU_DOCKER_CMD_TEMPLATE.safe_substitute(
            DATASET_PATH=DATASET_PATH,
            DOCKER_IMAGE=get_docker_image(),
            CONTAINER_CMD=plot_cmd,
        )
        print(cmd)
        if DRY_RUN:
            print(f"Would plot {ds} from {DATASET_PATH}")
            continue
        try:
            subprocess.run(
                cmd, shell=True, executable="/bin/bash", check=True, text=True
            )
        except Exception as e:
            print(f"Error plotting {ds}: {e}")
            error_collector[ds].append("plot")


# FIXME: plot does not always work.
# plot_all()

In [None]:
# Show which steps raised an error, grouped by dataset.
print(error_collector)

In [None]:
# Report the elapsed time of every search. A combination whose search never
# started (its result file already existed, or this was a dry run) is reported
# as "not run" rather than "failed".
for k, v in search_time_collector.items():
    if v["tick"] is not None and v["tock"] is not None:
        search_time = round(v["tock"] - v["tick"], 2)
    elif v["tick"] is not None:
        # Started but never finished: the search command raised.
        search_time = "failed"
    else:
        search_time = "not run"
    print(f"Search time for {k}: {search_time}")