From cafe92ea0f0ee07107229da078f35c062a3d7239 Mon Sep 17 00:00:00 2001 From: Li Wan <49334982+wanliAlex@users.noreply.github.com> Date: Thu, 13 Jun 2024 17:31:06 +1000 Subject: [PATCH] Patch image download with invalid _id (#860) Patch the image download when there is an invalid _id in the documents. --- src/marqo/tensor_search/add_docs.py | 7 +-- .../test_add_documents_combined.py | 51 ++++++++++++++++++- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/src/marqo/tensor_search/add_docs.py b/src/marqo/tensor_search/add_docs.py index 131308d2f..70ef84d13 100644 --- a/src/marqo/tensor_search/add_docs.py +++ b/src/marqo/tensor_search/add_docs.py @@ -2,10 +2,10 @@ import concurrent import copy import math -import random from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from typing import ContextManager +import threading import PIL from PIL.ImageFile import ImageFile @@ -49,11 +49,8 @@ def threaded_download_and_preprocess_images(allocated_docs: List[dict], image_re None """ - # TODO - We may not be handling errors in threads properly. Test introducing errors (e.g., call a method - # that doesn't exist) in this code and verify # Generate pseudo-unique ID for thread metrics. - _id = hash("".join([d.get("_id", str(random.getrandbits(64))) for d in allocated_docs])) % 1000 - _id = f"image_download.{_id}" + _id = f'image_download.{threading.get_ident()}' TIMEOUT_SECONDS = 3 if metric_obj is None: # Occurs predominately in testing. metric_obj = RequestMetricsStore.for_request() diff --git a/tests/tensor_search/integ_tests/test_add_documents_combined.py b/tests/tensor_search/integ_tests/test_add_documents_combined.py index 68ab992ec..69b0e06bf 100644 --- a/tests/tensor_search/integ_tests/test_add_documents_combined.py +++ b/tests/tensor_search/integ_tests/test_add_documents_combined.py @@ -389,4 +389,53 @@ def test_download_images_non_tensor_field(self): self.assertIsInstance(image_repo[k], expected_repo_structure[k]) # Images should not be closed as they are Tensor instead of ImageType - mock_close.assert_not_called() \ No newline at end of file + mock_close.assert_not_called() + + def test_idErrorWhenImageDownloading(self): + """A test ensure image download is not raising 500 error when there is an invalid _id. + + Image download use the document _id to generate a unique thread id. + However, the image download happens before validate the document _id. + This test ensures that the image download does not raise a 500 error when the document _id is invalid. + """ + test_docs = [ + { + "image_field_1": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline" + "/examples/ImageSearchGuide/data/image1.jpg", + "text_field_1": "this is a valid image", + "_id": "1" + }, + { + "image_field_1": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline" + "/examples/ImageSearchGuide/data/image2.jpg", + "text_field_1": "this is a invalid image due to int id", + "_id": 2 + }, + { + "image_field_1": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline" + "/examples/ImageSearchGuide/data/image3.jpg", + "text_field_1": "this is a invalid image due to None", + "_id": None + }, + { + "image_field_1": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline" + "/examples/ImageSearchGuide/data/image4.jpg", + "text_field_1": "this is a invalid image due to ", + "_id": [] + } + ] + + for index_name in [self.unstructured_marqo_index_name, self.structured_marqo_index_name]: + tensor_fields = ["image_field_1", "text_field_1"] if index_name == self.unstructured_marqo_index_name \ + else None + with self.subTest(index_name): + r = tensor_search.add_documents(config=self.config, + add_docs_params=AddDocsParams(index_name=index_name, + docs=test_docs, + tensor_fields=tensor_fields)) + self.assertEqual(True, r["errors"]) + self.assertEqual(4, len(r["items"])) + self.assertEqual(200, r["items"][0]["status"]) + for i in range(1, 4): + self.assertEqual(400, r["items"][i]["status"]) + self.assertIn("Document _id must be a string", r["items"][i]["error"]) \ No newline at end of file