Skip to content

Commit

Permalink
[BEAM-8944] Change to use single thread in py sdk bundle progress rep…
Browse files Browse the repository at this point in the history
…ort (apache#10387)

* [BEAM-8944] Change to single thread executor in python sdk bundle progress report

(cherry picked from commit 794e58d)
  • Loading branch information
y1chi committed Dec 20, 2019
1 parent 0658bc6 commit 0d5bdaa
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Expand Up @@ -29,6 +29,7 @@
import threading
import traceback
from builtins import object
from concurrent import futures

import grpc
from future.utils import raise_
Expand All @@ -45,7 +46,7 @@
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor

_LOGGER = logging.getLogger()
_LOGGER = logging.getLogger(__name__)

# This SDK harness will (by default), log a "lull" in processing if it sees no
# transitions in over 5 minutes.
Expand Down Expand Up @@ -91,6 +92,10 @@ def __init__(
state_handler_factory=self._state_handler_factory,
data_channel_factory=self._data_channel_factory,
fns=self._fns)

# TODO(BEAM-8998) use common UnboundedThreadPoolExecutor to process bundle
# progress once dataflow runner's excessive progress polling is removed.
self._report_progress_executor = futures.ThreadPoolExecutor(max_workers=1)
self._worker_thread_pool = UnboundedThreadPoolExecutor()
self._responses = queue.Queue()
_LOGGER.info('Initializing SDKHarness with unbounded number of workers.')
Expand Down Expand Up @@ -176,7 +181,7 @@ def task():
'Unknown process bundle instruction {}').format(
instruction_id)), request)

self._worker_thread_pool.submit(task)
self._report_progress_executor.submit(task)

def _request_finalize_bundle(self, request):
self._request_execute(request)
Expand Down

0 comments on commit 0d5bdaa

Please sign in to comment.