Merge pull request #42 from prrao87/bugfix-meilisearch
Fixed issue with batching and file handling logic
prrao87 authored Jul 30, 2023
2 parents 03879dc + 5aaf191 commit d36089f
Showing 2 changed files with 90 additions and 46 deletions.
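For orientation, the sketch below reproduces the `chunk_files` helper that this commit introduces and shows how a list of `.jsonl.gz` paths is split into fixed-size groups before each group is handed off for indexing. It is an illustrative, standalone snippet rather than part of the commit; the `__main__` demo and its file names are hypothetical.

from typing import Any, Iterator


def chunk_files(item_list: list[Any], file_chunksize: int) -> Iterator[tuple[Any, ...]]:
    """Yield successive tuples of `file_chunksize` items from `item_list`."""
    for i in range(0, len(item_list), file_chunksize):
        yield tuple(item_list[i : i + file_chunksize])


if __name__ == "__main__":
    # Hypothetical file names; the real scripts glob DATA_DIR for "*.jsonl.gz"
    files = [f"winemag-part-{n}.jsonl.gz" for n in range(1, 8)]
    for chunk in chunk_files(files, file_chunksize=3):
        # bulk_index_async.py indexes each such chunk concurrently via asyncio.gather
        print(chunk)
    # Output: two tuples of three paths each, then one tuple with the remaining path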
88 changes: 60 additions & 28 deletions dbs/meilisearch/scripts/bulk_index_async.py
@@ -6,19 +6,20 @@
import sys
from functools import lru_cache
from pathlib import Path
from typing import Any
from typing import Any, Iterator

import srsly
from codetiming import Timer
from dotenv import load_dotenv
from meilisearch_python_async import Client
from meilisearch_python_async.index import Index
from meilisearch_python_async.models.settings import MeilisearchSettings
from schemas.wine import Wine
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1]))
from api.config import Settings
from schemas.wine import Wine

load_dotenv()
# Custom types
@@ -38,15 +39,24 @@ def get_settings():
return Settings()


def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:
def chunk_files(item_list: list[Any], file_chunksize: int) -> Iterator[tuple[JsonBlob, ...]]:
"""
Break a large list of files into a list of lists of files, where each inner list is of size `file_chunksize`
"""
for i in range(0, len(item_list), file_chunksize):
yield tuple(item_list[i : i + file_chunksize])


def get_json_data(file_path: Path) -> list[JsonBlob]:
"""Get all line-delimited json files (.jsonl) from a directory with a given prefix"""
file_path = data_dir / filename
if not file_path.is_file():
# File may not have been uncompressed yet so try to do that first
data = srsly.read_gzip_jsonl(file_path)
# This time if it isn't there it really doesn't exist
if not file_path.is_file():
raise FileNotFoundError(f"No valid .jsonl file found in `{data_dir}`")
raise FileNotFoundError(
f"`{file_path}` doesn't contain a valid `.jsonl.gz` file - check and try again."
)
else:
data = srsly.read_gzip_jsonl(file_path)
return data
@@ -70,7 +80,19 @@ def get_meili_settings(filename: str) -> MeilisearchSettings:
# --- Async functions ---


async def main() -> None:
async def update_documents(filepath: Path, index: Index, primary_key: str, batch_size: int):
data = list(get_json_data(filepath))
if LIMIT > 0:
data = data[:LIMIT]
validated_data = validate(data)
await index.update_documents_in_batches(
validated_data,
batch_size=batch_size,
primary_key=primary_key,
)


async def main(data_files: list[Path]) -> None:
meili_settings = get_meili_settings(filename="settings/settings.json")
config = Settings()
URI = f"http://{config.meili_url}:{config.meili_port}"
@@ -84,43 +106,53 @@ async def main() -> None:
# Update settings
await client.index(index_name).update_settings(meili_settings)
print("Finished updating database index settings")
# Process data
validated_data = validate(data)
try:
tasks = [
# Update index
index.update_documents_in_batches(
validated_data, batch_size=CHUNKSIZE, primary_key=primary_key
)
for _ in range(BENCHMARK_NUM)
]
await tqdm_asyncio.gather(*tasks)
print(f"Finished running benchmarks")
except Exception as e:
print(f"{e}: Error while indexing to db")
file_chunks = chunk_files(data_files, file_chunksize=FILE_CHUNKSIZE)
for chunk in tqdm(
file_chunks, desc="Handling file chunks", total=len(data_files) // FILE_CHUNKSIZE
):
try:
tasks = [
# Update index
update_documents(
filepath,
index,
primary_key=primary_key,
batch_size=BATCHSIZE,
)
# In a real case we'd be iterating through a list of files
# For this example, it's just looping through the same file N times
for filepath in chunk
]
await tqdm_asyncio.gather(*tasks)
except Exception as e:
print(f"{e}: Error while indexing to db")
print(f"Finished running benchmarks")


if __name__ == "__main__":
# fmt: off
parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data")
parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes")
parser.add_argument("--chunksize", type=int, default=5_000, help="Size of each chunk to break the dataset into before processing")
parser.add_argument("--batchsize", "-b", type=int, default=10_000, help="Size of each batch to break the dataset into before ingesting")
parser.add_argument("--file_chunksize", "-c", type=int, default=5, help="Size of file chunk that will be concurrently processed and passed to the client in batches")
parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use")
parser.add_argument("--benchmark", "-b", type=int, default=1, help="Run a benchmark of the script N times")
parser.add_argument("--benchmark_num", "-n", type=int, default=1, help="Run a benchmark of the script N times")
args = vars(parser.parse_args())
# fmt: on

LIMIT = args["limit"]
DATA_DIR = Path(__file__).parents[3] / "data"
FILENAME = args["filename"]
CHUNKSIZE = args["chunksize"]
BENCHMARK_NUM = args["benchmark"]
BATCHSIZE = args["batchsize"]
BENCHMARK_NUM = args["benchmark_num"]
FILE_CHUNKSIZE = args["file_chunksize"]

data = list(get_json_data(DATA_DIR, FILENAME))
if LIMIT > 0:
data = data[:LIMIT]
# Get a list of all files in the data directory
data_files = [f for f in DATA_DIR.glob("*.jsonl.gz") if f.is_file()]
# For benchmarking, we want to run on the same data multiple times (in the real world this would be many different files)
benchmark_data_files = data_files * BENCHMARK_NUM

meili_settings = get_meili_settings(filename="settings/settings.json")

# Run main async event loop
asyncio.run(main())
asyncio.run(main(benchmark_data_files))
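As a usage note (the values below are illustrative, not taken from the commit), the updated async script is driven by the renamed flags shown in the argparse block above:

$ python bulk_index_async.py --batchsize 10000 --file_chunksize 5 --benchmark_num 3

Here `--batchsize` sets how many validated documents go into each `update_documents_in_batches` call, `--file_chunksize` caps how many files are processed concurrently per `asyncio.gather` round, and `--benchmark_num` repeats the globbed file list to simulate a larger workload.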
48 changes: 30 additions & 18 deletions dbs/meilisearch/scripts/bulk_index_sync.py
@@ -36,15 +36,16 @@ def get_settings():
return Settings()


def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:
def get_json_data(file_path: Path) -> list[JsonBlob]:
"""Get all line-delimited json files (.jsonl) from a directory with a given prefix"""
file_path = data_dir / filename
if not file_path.is_file():
# File may not have been uncompressed yet so try to do that first
data = srsly.read_gzip_jsonl(file_path)
# This time if it isn't there it really doesn't exist
if not file_path.is_file():
raise FileNotFoundError(f"No valid .jsonl file found in `{data_dir}`")
raise FileNotFoundError(
f"`{file_path}` doesn't contain a valid `.jsonl.gz` file - check and try again."
)
else:
data = srsly.read_gzip_jsonl(file_path)
return data
@@ -63,7 +64,19 @@ def get_meili_settings(filename: str) -> dict[str, Any]:
return settings


def main() -> None:
def update_documents(filepath: Path, index: Index, primary_key: str, batch_size: int):
data = list(get_json_data(filepath))
if LIMIT > 0:
data = data[:LIMIT]
validated_data = validate(data)
index.update_documents_in_batches(
validated_data,
batch_size=batch_size,
primary_key=primary_key,
)


def main(data_files: list[Path]) -> None:
meili_settings = get_meili_settings(filename="settings/settings.json")
config = Settings()
URI = f"http://{config.meili_url}:{config.meili_port}"
@@ -78,14 +91,12 @@ def main() -> None:
# Update settings
client.index(index_name).update_settings(meili_settings)
print("Finished updating database index settings")
# Process data
validated_data = validate(data)
try:
for i in tqdm(range(BENCHMARK_NUM)):
# In a real case we'd be iterating through a list of files
# For this example, it's just looping through the same file N times
for filepath in tqdm(data_files):
# Update index
index.update_documents_in_batches(
validated_data, batch_size=CHUNKSIZE, primary_key=primary_key
)
update_documents(filepath, index, primary_key=primary_key, batch_size=BATCHSIZE)
except Exception as e:
print(f"{e}: Error while indexing to db")

@@ -94,23 +105,24 @@
# fmt: off
parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data")
parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes")
parser.add_argument("--chunksize", type=int, default=10_000, help="Size of each chunk to break the dataset into before processing")
parser.add_argument("--batchsize", "-b", type=int, default=10_000, help="Size of each chunk to break the dataset into before processing")
parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use")
parser.add_argument("--benchmark", "-b", type=int, default=1, help="Run a benchmark of the script N times")
parser.add_argument("--benchmark_num", "-n", type=int, default=1, help="Run a benchmark of the script N times")
args = vars(parser.parse_args())
# fmt: on

LIMIT = args["limit"]
DATA_DIR = Path(__file__).parents[3] / "data"
FILENAME = args["filename"]
CHUNKSIZE = args["chunksize"]
BENCHMARK_NUM = args["benchmark"]
BATCHSIZE = args["batchsize"]
BENCHMARK_NUM = args["benchmark_num"]

data = list(get_json_data(DATA_DIR, FILENAME))
if LIMIT > 0:
data = data[:LIMIT]
# Get a list of all files in the data directory
data_files = [f for f in DATA_DIR.glob("*.jsonl.gz") if f.is_file()]
# For benchmarking, we want to run on the same data multiple times (in the real world this would be many different files)
benchmark_data_files = data_files * BENCHMARK_NUM

meili_settings = get_meili_settings(filename="settings/settings.json")

# Run main function
main()
main(benchmark_data_files)
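Similarly, a hypothetical invocation of the synchronous variant, which walks the globbed files one at a time instead of gathering them concurrently (again, the values are illustrative):

$ python bulk_index_sync.py --batchsize 10000 --benchmark_num 3 --limit 1000

With `--limit` set, each file's data is truncated inside `update_documents` before validation, which keeps test runs short.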
