Skip to content

Commit

Permalink
Merge pull request #15 from sanders41/multi-process
Browse files Browse the repository at this point in the history
Process files in parallel
  • Loading branch information
prrao87 committed Apr 19, 2023
2 parents 6ec840f + 17b17e1 commit f7eacfc
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions dbs/meilisearch/scripts/bulk_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import asyncio
import glob
import json
from functools import lru_cache
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache, partial
import os
import sys
import zipfile
Expand Down Expand Up @@ -150,6 +151,11 @@ async def do_indexing(index: Index, data: list[JsonBlob], file_name: str) -> Non
print(f"Indexed {Path(file_name).name} to db")


def process_file(file_name: str) -> tuple[str, list[JsonBlob]]:
    """Load one JSONL file and validate its records against the ``Wine`` model.

    The file name is returned together with the validated records so that a
    caller collecting results from several files can tell which file each
    batch of records came from.

    Args:
        file_name: Path of the JSONL file to read.

    Returns:
        A ``(file_name, validated_records)`` tuple, where ``validated_records``
        is whatever ``validate`` returns for the file's contents when called
        with ``exclude_none=True``.
    """
    raw_records = read_jsonl_from_file(file_name)
    validated_records = validate(raw_records, Wine, exclude_none=True)
    return file_name, validated_records


async def main(files: list[str]) -> None:
settings = Settings()
URI = f"http://{settings.meili_url}:{settings.meili_port}"
Expand All @@ -161,13 +167,17 @@ async def main(files: list[str]) -> None:
_update_sortable_attributes(client, "wines"),
)
index = client.index("wines")
tasks = []
print("Processing files")
for file in files:
data = read_jsonl_from_file(file)
data = validate(data, Wine, exclude_none=True)
tasks.append(do_indexing(index, data, file))
print(f"Validated data from {Path(file).name} in pydantic")
with ProcessPoolExecutor() as process_pool:
loop = asyncio.get_running_loop()
calls = [partial(process_file, file_name) for file_name in files]
call_coroutines = []

for call in calls:
call_coroutines.append(loop.run_in_executor(process_pool, call))

data = await asyncio.gather(*call_coroutines)
tasks = [do_indexing(index, d[1], d[0]) for d in data]
try:
# Set id as primary key prior to indexing
await asyncio.gather(*tasks)
Expand Down

0 comments on commit f7eacfc

Please sign in to comment.