Skip to content

Commit

Permalink
[BEAM-8944] Change to use single thread in py sdk bundle progress rep…
Browse files Browse the repository at this point in the history
…ort (apache#10387)

* [BEAM-8944] Change to single thread executor in python sdk bundle progress report
  • Loading branch information
y1chi authored and angoenka committed Dec 20, 2019
1 parent 99037e4 commit 794e58d
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
Expand Up @@ -29,6 +29,7 @@
import threading
import traceback
from builtins import object
from concurrent import futures
from typing import TYPE_CHECKING
from typing import Callable
from typing import DefaultDict
Expand Down Expand Up @@ -104,6 +105,10 @@ def __init__(self,
state_handler_factory=self._state_handler_factory,
data_channel_factory=self._data_channel_factory,
fns=self._fns)

# TODO(BEAM-8998) use common UnboundedThreadPoolExecutor to process bundle
# progress once dataflow runner's excessive progress polling is removed.
self._report_progress_executor = futures.ThreadPoolExecutor(max_workers=1)
self._worker_thread_pool = UnboundedThreadPoolExecutor()
self._responses = queue.Queue() # type: queue.Queue[beam_fn_api_pb2.InstructionResponse]
_LOGGER.info('Initializing SDKHarness with unbounded number of workers.')
Expand Down Expand Up @@ -199,7 +204,7 @@ def task():
'Unknown process bundle instruction {}').format(
instruction_id)), request)

self._worker_thread_pool.submit(task)
self._report_progress_executor.submit(task)

def _request_finalize_bundle(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> None
Expand Down

0 comments on commit 794e58d

Please sign in to comment.