refactor: small timer refactor (#6130)
JoanFM committed Dec 14, 2023
1 parent 48e4437 commit 9eb4d09
Showing 2 changed files with 10 additions and 6 deletions.
14 changes: 9 additions & 5 deletions jina/serve/runtimes/worker/batch_queue.py
@@ -39,6 +39,7 @@ def __init__(
         self._timeout: int = timeout
         self._reset()
         self._flush_trigger: Event = Event()
+        self._timer_started, self._timer_finished = False, False
         self._timer_task: Optional[Task] = None
 
     def __repr__(self) -> str:
@@ -67,20 +68,22 @@ def _cancel_timer_if_pending(self):
             and not self._timer_task.done()
             and not self._timer_task.cancelled()
         ):
+            self._timer_finished = False
             self._timer_task.cancel()
 
     def _start_timer(self):
         self._cancel_timer_if_pending()
         self._timer_task = asyncio.create_task(
             self._sleep_then_set()
         )
+        self._timer_started = True
 
     async def _sleep_then_set(self):
         """Sleep and then set the event"""
+        self._timer_finished = False
         await asyncio.sleep(self._timeout / 1000)
         self._flush_trigger.set()
+        self._timer_finished = True
 
     async def push(self, request: DataRequest) -> asyncio.Queue:
         """Append request to the list of requests to be processed.
@@ -94,8 +97,10 @@ async def push(self, request: DataRequest) -> asyncio.Queue:
         """
         docs = request.docs
 
         # writes to shared data between tasks need to be mutually exclusive
-        if not self._timer_task:
+        if not self._timer_task or self._timer_finished:
+            # If there is no timer (first arrival), or the timer is already consumed, any new push should
+            # trigger a new Timer before this push requests the data lock. The order of accessing the data
+            # lock guarantees that this request will be put in the `big_doc` before the `flush` task processes it.
             self._start_timer()
         async with self._data_lock:
             if not self._flush_task:
@@ -216,9 +221,8 @@ def batch(iterable_1, iterable_2, n=1):
             # At this moment, we have documents concatenated in self._big_doc corresponding to requests in
             # self._requests with their lengths stored in self._requests_len. For each request, there is a queue to
             # communicate that the request has been processed properly. At this stage the data_lock is ours and
-            # therefore noone can add requests to this list.
+            # therefore no-one can add requests to this list.
             self._flush_trigger: Event = Event()
-            self._timer_task = None
             try:
                 if not docarray_v2:
                     non_assigned_to_response_docs: DocumentArray = DocumentArray.empty()
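
For readers skimming the diff, the pattern this commit lands can be sketched in isolation. The sketch below is a hypothetical, stripped-down model, not the library code: a flush Event plus a timer_finished flag, so that a push arriving after the previous timer has fired restarts the timer instead of waiting on a stale task. The old condition (if not self._timer_task:) relied on the flush resetting _timer_task to None; the commit deletes that reset and consults the flag instead. All names below (flush, push, sleep_then_set) are illustrative.

    import asyncio

    async def main():
        flush = asyncio.Event()
        timer_task = None
        timer_finished = False

        async def sleep_then_set(timeout_ms: int):
            # Mirrors _sleep_then_set: clear the flag, sleep, trigger, set the flag.
            nonlocal timer_finished
            timer_finished = False
            await asyncio.sleep(timeout_ms / 1000)
            flush.set()
            timer_finished = True

        def push():
            # The refactored condition: start a timer on first arrival, or when
            # the previous timer has already fired and been consumed by a flush.
            nonlocal timer_task
            if timer_task is None or timer_finished:
                timer_task = asyncio.create_task(sleep_then_set(50))

        push()              # first request: timer starts
        await flush.wait()  # timer fires; a flush would drain the batch here
        flush.clear()       # the flush re-arms the trigger for the next batch
        push()              # next request: checking only timer_task would skip the
                            # restart, since the old (finished) task object still exists
        await asyncio.wait_for(flush.wait(), timeout=1)
        print('second batch flushed')

    asyncio.run(main())

Note that the flag, unlike task.done(), distinguishes "fired and consumed" from "cancelled", which is presumably why _cancel_timer_if_pending also clears it in the hunk above.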
2 changes: 1 addition & 1 deletion tests/integration/docarray_v2/test_v2.py
@@ -1634,7 +1634,7 @@ def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[DummyEmbeddingDoc]:
     depl = Deployment(uses=SlowExecutorWithException)
 
     with depl:
-        da = DocList[TextDoc]([TextDoc(text='good') for _ in range(50)])
+        da = DocList[TextDoc]([TextDoc(text=f'good-{i}') for i in range(50)])
         da[4].text = 'fail'
         responses = depl.post(on='/foo', inputs=da, request_size=1, return_responses=True, continue_on_error=True, results_in_order=True)
         assert len(responses) == 50  # 1 request per input
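
A note on the test tweak, as far as the visible hunk shows: giving each input a distinct text (good-0 through good-49) makes the order-sensitive run meaningful. With 50 identical 'good' payloads, a check driven by results_in_order=True could pass even if responses came back shuffled; unique texts allow an actual reordering to be detected.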
