Use asyncio gather for multiple waits in tests #503

Merged 1 commit on May 19, 2023
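The pattern behind every change in this PR: instead of awaiting each `wait_for_task` call in a loop, the tests build a list of coroutines and hand them to `asyncio.gather`, so the waits overlap and total wall time drops to roughly the slowest single wait. A minimal sketch of the before/after, using a hypothetical `wait_for_task` coroutine that simulates the Meilisearch task poll with `asyncio.sleep` (not the real client call):

```python
import asyncio
import time

async def wait_for_task(task_uid: int) -> str:
    # Hypothetical stand-in for polling a queued task; sleeps to fake latency.
    await asyncio.sleep(0.2)
    return "succeeded"

async def main() -> None:
    task_uids = [1, 2, 3, 4, 5]

    # Before: each wait starts only after the previous one finishes (~1.0s total).
    start = time.perf_counter()
    for uid in task_uids:
        assert await wait_for_task(uid) == "succeeded"
    print(f"sequential: {time.perf_counter() - start:.2f}s")

    # After: all waits run concurrently (~0.2s total, the slowest single wait).
    start = time.perf_counter()
    statuses = await asyncio.gather(*[wait_for_task(uid) for uid in task_uids])
    assert {"succeeded"} == set(statuses)
    print(f"gathered:   {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```

Note that `asyncio.gather` propagates the first exception raised by any awaitable, which suits these tests: a failed wait fails the test either way, just with a different traceback.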
5 changes: 2 additions & 3 deletions tests/conftest.py
@@ -47,9 +47,8 @@ async def clear_indexes(test_client):
     yield
     indexes = await test_client.get_indexes()
     if indexes:
-        for index in indexes:
-            response = await test_client.index(index.uid).delete()
-            await wait_for_task(test_client, response.task_uid)
+        tasks = await asyncio.gather(*[test_client.index(x.uid).delete() for x in indexes])
+        await asyncio.gather(*[wait_for_task(test_client, x.task_uid) for x in tasks])


@pytest.fixture(scope="session")
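The fixture change above is a two-stage fan-out rather than a single `gather`, because each wait depends on the `task_uid` returned by its delete call: stage one queues every index deletion concurrently, stage two waits on every queued task concurrently. A rough sketch of the same shape, with hypothetical stand-ins for the client calls:

```python
import asyncio

async def delete_index(uid: str) -> int:
    # Hypothetical stand-in: queues a deletion and returns its task uid.
    await asyncio.sleep(0.1)
    return len(uid)

async def wait_for_task(task_uid: int) -> None:
    # Hypothetical stand-in: polls until the queued task completes.
    await asyncio.sleep(0.1)

async def clear_indexes(index_uids: list[str]) -> None:
    # Stage 1: queue all deletions at once, collecting their task uids.
    task_uids = await asyncio.gather(*[delete_index(u) for u in index_uids])
    # Stage 2: wait on all queued tasks at once.
    await asyncio.gather(*[wait_for_task(t) for t in task_uids])

asyncio.run(clear_indexes(["movies", "books", "music"]))
```

Keeping the two gathers separate still guarantees every deletion is queued before any wait begins, while overlapping all the network round-trips in each stage.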
108 changes: 39 additions & 69 deletions tests/test_documents.py
@@ -1,3 +1,4 @@
+import asyncio
 import csv
 import json
 from math import ceil
@@ -103,10 +104,8 @@ async def test_add_documents_in_batches(
     )
     assert ceil(len(small_movies) / batch_size) == len(response)
 
-    for r in response:
-        update = await wait_for_task(index.http_client, r.task_uid)
-        assert update.status == "succeeded"
-
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in response])
+    assert {"succeeded"} == {x.status for x in tasks}
     assert await index.get_primary_key() == expected_primary_key


@@ -130,8 +129,7 @@ async def test_add_documents_from_directory(
     index = test_client.index("movies")
     path = str(tmp_path) if path_type == "str" else tmp_path
     responses = await index.add_documents_from_directory(path, combine_documents=combine_documents)
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == total_documents

@@ -148,8 +146,7 @@ async def test_add_documents_from_directory_csv_path(
     responses = await index.add_documents_from_directory(
         path, combine_documents=combine_documents, document_type="csv"
     )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -166,8 +163,7 @@ async def test_add_documents_from_directory_csv_path_with_delimiter(
     responses = await index.add_documents_from_directory(
         path, combine_documents=combine_documents, document_type="csv", csv_delimiter=";"
     )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -184,8 +180,7 @@ async def test_add_documents_from_directory_ndjson(
     responses = await index.add_documents_from_directory(
         path, combine_documents=combine_documents, document_type="ndjson"
     )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -235,8 +230,7 @@ async def test_add_documents_from_directory_in_batchs(
         path, batch_size=batch_size, combine_documents=combine_documents
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == total_documents

@@ -255,8 +249,7 @@ async def test_add_documents_from_directory_in_batchs_csv(
         path, batch_size=batch_size, combine_documents=combine_documents, document_type="csv"
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -275,8 +268,7 @@ async def test_add_documents_from_directory_in_batchs_ndjson(
         path, batch_size=batch_size, combine_documents=combine_documents, document_type="ndjson"
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -444,10 +436,8 @@ async def test_add_documents_from_file_in_batches(
 
     assert ceil(len(small_movies) / batch_size) == len(response)
 
-    for r in response:
-        update = await wait_for_task(index.http_client, r.task_uid)
-        assert update.status == "succeeded"
-
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in response])
+    assert {"succeeded"} == {x.status for x in tasks}
     assert await index.get_primary_key() == expected_primary_key


@@ -473,10 +463,8 @@ async def test_add_documents_from_file_in_batches_csv(
 
     assert ceil(len(small_movies) / batch_size) == len(response)
 
-    for r in response:
-        update = await wait_for_task(index.http_client, r.task_uid)
-        assert update.status == "succeeded"
-
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in response])
+    assert {"succeeded"} == {x.status for x in tasks}
     assert await index.get_primary_key() == expected_primary_key


@@ -506,10 +494,8 @@ async def test_add_documents_from_file_in_batches_csv_with_delimiter(
 
     assert ceil(len(small_movies) / batch_size) == len(response)
 
-    for r in response:
-        update = await wait_for_task(index.http_client, r.task_uid)
-        assert update.status == "succeeded"
-
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in response])
+    assert {"succeeded"} == {x.status for x in tasks}
     assert await index.get_primary_key() == expected_primary_key


@@ -546,10 +532,8 @@ async def test_add_documents_from_file_in_batches_ndjson(
 
     assert ceil(len(small_movies) / batch_size) == len(response)
 
-    for r in response:
-        update = await wait_for_task(index.http_client, r.task_uid)
-        assert update.status == "succeeded"
-
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in response])
+    assert {"succeeded"} == {x.status for x in tasks}
     assert await index.get_primary_key() == expected_primary_key


@@ -627,8 +611,7 @@ async def test_update_documents_in_batches(batch_size, index_with_documents, sma
     updates = await index.update_documents_in_batches(small_movies, batch_size=batch_size)
     assert ceil(len(small_movies) / batch_size) == len(updates)
 
-    for update in updates:
-        await wait_for_task(index.http_client, update.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in updates])
 
     response = await index.get_document(doc_id)
     assert response["title"] != "Some title"
@@ -643,10 +626,8 @@ async def test_update_documents_in_batches_with_primary_key(batch_size, test_cli
     )
     assert ceil(len(small_movies) / batch_size) == len(updates)
 
-    for update in updates:
-        update_status = await wait_for_task(index.http_client, update.task_uid)
-        assert update_status.status == "succeeded"
-
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in updates])
+    assert {"succeeded"} == {x.status for x in tasks}
     assert await index.get_primary_key() == primary_key


@@ -672,8 +653,7 @@ async def test_update_documents_from_directory(
     responses = await index.update_documents_from_directory(
         path, combine_documents=combine_documents
     )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == total_documents

@@ -690,8 +670,7 @@ async def test_update_documents_from_directory_csv(
     responses = await index.update_documents_from_directory(
         path, combine_documents=combine_documents, document_type="csv"
     )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -708,8 +687,7 @@ async def test_update_documents_from_directory_csv_with_delimiter(
     responses = await index.update_documents_from_directory(
         path, combine_documents=combine_documents, document_type="csv", csv_delimiter=";"
    )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -738,8 +716,7 @@ async def test_update_documents_from_directory_ndjson(
     responses = await index.update_documents_from_directory(
         path, combine_documents=combine_documents, document_type="ndjson"
     )
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -769,8 +746,7 @@ async def test_update_documents_from_directory_in_batchs(
         path, batch_size=batch_size, combine_documents=combine_documents
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == total_documents

@@ -789,8 +765,7 @@ async def test_update_documents_from_directory_in_batchs_csv(
         path, batch_size=batch_size, combine_documents=combine_documents, document_type="csv"
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -813,8 +788,7 @@ async def test_update_documents_from_directory_in_batchs_csv_delimiter(
         csv_delimiter=";",
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -845,8 +819,7 @@ async def test_update_documents_from_directory_in_batchs_ndjson(
         path, batch_size=batch_size, combine_documents=combine_documents, document_type="ndjson"
     )
 
-    for response in responses:
-        await wait_for_task(index.http_client, response.task_uid)
+    await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in responses])
     stats = await index.get_stats()
     assert stats.number_of_documents == 20

@@ -918,7 +891,7 @@ async def test_update_documents_from_file_csv_with_delimiter(
 
 @pytest.mark.parametrize("delimiter", [";;", "😀"])
 async def test_update_documents_from_file_csv_delimiter_invalid(
-    delimiter, test_client, tmp_path, small_movies_csv_path_semicolon_delimiter
+    delimiter, test_client, small_movies_csv_path_semicolon_delimiter
 ):
     index = test_client.index("movies")
     with pytest.raises(ValueError):
@@ -972,7 +945,7 @@ async def test_update_documents_from_file_in_batches(
     movie_id = small_movies[0]["id"]
     index = test_client.index("movies")
     response = await index.add_documents(small_movies)
-    update = await wait_for_task(index.http_client, response.task_uid)
+    await wait_for_task(index.http_client, response.task_uid)
     assert await index.get_primary_key() == "id"
     response = await index.get_documents()
     got_title = filter(lambda x: x["id"] == movie_id, response.results)
@@ -981,9 +954,8 @@ async def test_update_documents_from_file_in_batches(
     updates = await index.update_documents_from_file_in_batches(path, batch_size=batch_size)
     assert ceil(len(small_movies) / batch_size) == len(updates)
 
-    for update in updates:
-        update_status = await wait_for_task(index.http_client, update.task_uid)  # type: ignore
-        assert update_status.status == "succeeded"
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in updates])
+    assert {"succeeded"} == {x.status for x in tasks}
 
     response = await index.get_documents()
     assert response.results[0]["title"] != "Some title"
@@ -998,7 +970,7 @@ async def test_update_documents_from_file_in_batches_csv(
     movie_id = small_movies[0]["id"]
     index = test_client.index("movies")
     response = await index.add_documents(small_movies)
-    update = await wait_for_task(index.http_client, response.task_uid)
+    await wait_for_task(index.http_client, response.task_uid)
     assert await index.get_primary_key() == "id"
     response = await index.get_documents()
     got_title = filter(lambda x: x["id"] == movie_id, response.results)
@@ -1007,9 +979,8 @@ async def test_update_documents_from_file_in_batches_csv(
     updates = await index.update_documents_from_file_in_batches(path, batch_size=batch_size)
     assert ceil(len(small_movies) / batch_size) == len(updates)
 
-    for update in updates:
-        update_status = await wait_for_task(index.http_client, update.task_uid)  # type: ignore
-        assert update_status.status == "succeeded"
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in updates])
+    assert {"succeeded"} == {x.status for x in tasks}
 
     response = await index.get_documents()
     assert response.results[0]["title"] != "Some title"
@@ -1024,7 +995,7 @@ async def test_update_documents_from_file_in_batches_ndjson(
     movie_id = small_movies[0]["id"]
     index = test_client.index("movies")
     response = await index.add_documents(small_movies)
-    update = await wait_for_task(index.http_client, response.task_uid)
+    await wait_for_task(index.http_client, response.task_uid)
     assert await index.get_primary_key() == "id"
     response = await index.get_documents()
     got_title = filter(lambda x: x["id"] == movie_id, response.results)
@@ -1033,9 +1004,8 @@ async def test_update_documents_from_file_in_batches_ndjson(
     updates = await index.update_documents_from_file_in_batches(path, batch_size=batch_size)
     assert ceil(len(small_movies) / batch_size) == len(updates)
 
-    for update in updates:
-        update_status = await wait_for_task(index.http_client, update.task_uid)  # type: ignore
-        assert update_status.status == "succeeded"
+    tasks = await asyncio.gather(*[wait_for_task(index.http_client, x.task_uid) for x in updates])  # type: ignore
+    assert {"succeeded"} == {x.status for x in tasks}
 
     response = await index.get_documents()
     assert response.results[0]["title"] != "Some title"