From 778e80f755e1772f2b1984b5dd2e6fd394f4b941 Mon Sep 17 00:00:00 2001 From: prrao87 Date: Mon, 23 Oct 2023 16:42:33 -0400 Subject: [PATCH 1/2] Update elastic version --- dbs/elasticsearch/.env.example | 2 +- dbs/elasticsearch/requirements.txt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbs/elasticsearch/.env.example b/dbs/elasticsearch/.env.example index 5a11bce..48230c6 100644 --- a/dbs/elasticsearch/.env.example +++ b/dbs/elasticsearch/.env.example @@ -1,7 +1,7 @@ ELASTIC_USER = "elastic" ELASTIC_PASSWORD = "" -STACK_VERSION = "8.7.0" +STACK_VERSION = "8.10.2" ELASTIC_INDEX_ALIAS = "wines" ELASTIC_PORT = 9200 KIBANA_PORT = 5601 diff --git a/dbs/elasticsearch/requirements.txt b/dbs/elasticsearch/requirements.txt index d9c251b..18ec6f3 100644 --- a/dbs/elasticsearch/requirements.txt +++ b/dbs/elasticsearch/requirements.txt @@ -1,5 +1,5 @@ -elasticsearch~=8.7.0 -pydantic~=2.0.0 +elasticsearch~=8.10.0 +pydantic~=2.4.0 pydantic-settings~=2.0.0 python-dotenv>=1.0.0 fastapi~=0.100.0 From 0e0884a6b353d2b64e6e63cb11f0070d8f1d8192 Mon Sep 17 00:00:00 2001 From: prrao87 Date: Mon, 23 Oct 2023 16:42:42 -0400 Subject: [PATCH 2/2] Update indexing script --- dbs/elasticsearch/scripts/bulk_index.py | 43 ++++++++++--------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/dbs/elasticsearch/scripts/bulk_index.py b/dbs/elasticsearch/scripts/bulk_index.py index 7e8d31c..a43c4db 100644 --- a/dbs/elasticsearch/scripts/bulk_index.py +++ b/dbs/elasticsearch/scripts/bulk_index.py @@ -3,7 +3,6 @@ import os import sys import warnings -from concurrent.futures import ProcessPoolExecutor from functools import lru_cache, partial from pathlib import Path from typing import Any, Iterator @@ -60,7 +59,7 @@ def validate( data: tuple[JsonBlob], exclude_none: bool = False, ) -> list[JsonBlob]: - validated_data = [Wine(**item).dict(exclude_none=exclude_none) for item in data] + validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for 
item in data] return validated_data @@ -95,7 +94,8 @@ async def create_index(client: AsyncElasticsearch, index: str, mappings_path: Pa elastic_config = dict(srsly.read_json(mappings_path)) assert elastic_config is not None - if not client.indices.exists_alias(name=index): + exists_alias = await client.indices.exists_alias(name=index) + if not exists_alias: print(f"Did not find index {index} in db, creating index...\n") with warnings.catch_warnings(): warnings.simplefilter("ignore") @@ -129,31 +129,22 @@ async def update_documents_to_index( print(f"Processed ids in range {min(ids)}-{max(ids)}") -async def main(data: list[JsonBlob], index: str) -> None: +async def main(data: list[JsonBlob]) -> None: settings = get_settings() with warnings.catch_warnings(): - warnings.simplefilter("ignore") elastic_client = await get_elastic_client(settings) - await create_index(elastic_client, index, Path("mapping/mapping.json")) - - # Process multiple chunks of data in a process pool to avoid blocking the event loop - print("Processing chunks") - chunked_data = chunk_iterable(data, CHUNKSIZE) - - with ProcessPoolExecutor() as pool: - loop = asyncio.get_running_loop() - executor_tasks = [partial(process_chunks, chunk) for chunk in chunked_data] - awaitables = [loop.run_in_executor(pool, call) for call in executor_tasks] - # Attach process pool to running event loop so that we can process multiple chunks in parallel - validated_data = await asyncio.gather(*awaitables) - tasks = [ - update_documents_to_index(elastic_client, index, data) for data in validated_data - ] - try: - await asyncio.gather(*tasks) - print("Finished execution!") - except Exception as e: - print(f"{e}: Error while indexing to db") + assert await elastic_client.ping() + await create_index(elastic_client, INDEX_ALIAS, Path("mapping/mapping.json")) + # Validate data and chunk it for ingesting in batches + validated_data = validate(data, exclude_none=False) + chunked_data = chunk_iterable(validated_data, 
chunksize=CHUNKSIZE) + for chunk in chunked_data: + try: + ids = [item["id"] for item in chunk] + await helpers.async_bulk(elastic_client, chunk, index=INDEX_ALIAS) + print(f"Finished indexing ID range {min(ids)}-{max(ids)}") + except Exception as e: + print(f"{e}: Error while indexing ID range {min(ids)}-{max(ids)}") # Close AsyncElasticsearch client await elastic_client.close() @@ -182,4 +173,4 @@ async def main(data: list[JsonBlob], index: str) -> None: # Run main async event loop if data: - asyncio.run(main(data, INDEX_ALIAS)) + asyncio.run(main(data))