Skip to content

Commit

Permalink
[BEAM-8151, BEAM-7848] Swap to using a thread pool which is unbounded…
Browse files Browse the repository at this point in the history
… and shrinks when threads are idle.
  • Loading branch information
lukecwik committed Nov 14, 2019
1 parent e9f766f commit 1b62310
Show file tree
Hide file tree
Showing 14 changed files with 67 additions and 61 deletions.
Expand Up @@ -28,13 +28,13 @@
import tempfile
import time
import unittest
from concurrent import futures

import grpc

from apache_beam.portability.api import beam_artifact_api_pb2
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.runners.portability import artifact_service
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


class AbstractArtifactServiceTest(unittest.TestCase):
Expand Down Expand Up @@ -76,7 +76,7 @@ def test_basic(self):
self._run_staging(self._service, self._service)

def test_with_grpc(self):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server = grpc.server(UnboundedThreadPoolExecutor())
try:
beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
self._service, server)
Expand Down Expand Up @@ -208,7 +208,7 @@ def check(index):
self._service, tokens[session(index)], name(index)))

# pylint: disable=range-builtin-not-iterating
pool = futures.ThreadPoolExecutor(max_workers=10)
pool = UnboundedThreadPoolExecutor()
sessions = set(pool.map(put, range(100)))
tokens = dict(pool.map(commit, sessions))
# List forces materialization.
Expand Down
Expand Up @@ -17,7 +17,6 @@
from __future__ import absolute_import

import argparse
import concurrent.futures as futures
import logging
import signal
import sys
Expand All @@ -30,6 +29,7 @@
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners.portability import expansion_service
from apache_beam.transforms import ptransform
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor

# This script provides an expansion service and example ptransforms for running
# external transform test cases. See external_test.py for details.
Expand Down Expand Up @@ -163,7 +163,7 @@ def main(unused_argv):
help='port on which to serve the job api')
options = parser.parse_args()
global server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server = grpc.server(UnboundedThreadPoolExecutor())
beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
expansion_service.ExpansionServiceServicer(PipelineOptions()), server
)
Expand Down
30 changes: 8 additions & 22 deletions sdks/python/apache_beam/runners/portability/fn_api_runner.py
Expand Up @@ -33,7 +33,6 @@
import time
import uuid
from builtins import object
from concurrent import futures

import grpc

Expand Down Expand Up @@ -78,6 +77,7 @@
from apache_beam.utils import profiler
from apache_beam.utils import proto_utils
from apache_beam.utils import windowed_value
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor

# This module is experimental. No backwards-compatibility guarantees.

Expand Down Expand Up @@ -1224,12 +1224,10 @@ class GrpcServer(object):

_DEFAULT_SHUTDOWN_TIMEOUT_SECS = 5

def __init__(self, state, provision_info, max_workers):
def __init__(self, state, provision_info):
self.state = state
self.provision_info = provision_info
self.max_workers = max_workers
self.control_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.max_workers))
self.control_server = grpc.server(UnboundedThreadPoolExecutor())
self.control_port = self.control_server.add_insecure_port('[::]:0')
self.control_address = 'localhost:%s' % self.control_port

Expand All @@ -1239,12 +1237,12 @@ def __init__(self, state, provision_info, max_workers):
no_max_message_sizes = [("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1)]
self.data_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.max_workers),
UnboundedThreadPoolExecutor(),
options=no_max_message_sizes)
self.data_port = self.data_server.add_insecure_port('[::]:0')

self.state_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.max_workers),
UnboundedThreadPoolExecutor(),
options=no_max_message_sizes)
self.state_port = self.state_server.add_insecure_port('[::]:0')

Expand Down Expand Up @@ -1280,7 +1278,7 @@ def __init__(self, state, provision_info, max_workers):
self.state_server)

self.logging_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=2),
UnboundedThreadPoolExecutor(),
options=no_max_message_sizes)
self.logging_port = self.logging_server.add_insecure_port('[::]:0')
beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
Expand Down Expand Up @@ -1508,24 +1506,12 @@ def get_worker_handlers(self, environment_id, num_workers):
# Any environment will do, pick one arbitrarily.
environment_id = next(iter(self._environments.keys()))
environment = self._environments[environment_id]
max_total_workers = num_workers * len(self._environments)

# assume all environments except EMBEDDED_PYTHON use gRPC.
if environment.urn == python_urns.EMBEDDED_PYTHON:
pass # no need for a gRPC server
elif self._grpc_server is None:
self._grpc_server = GrpcServer(self._state, self._job_provision_info,
max_total_workers)
elif max_total_workers > self._grpc_server.max_workers:
# each gRPC server is running with fixed number of threads (
# max_total_workers), which is defined by the first call to
# get_worker_handlers(). Assumption here is a worker has a connection to a
# gRPC server. In case a stage tries to add more workers
# than the max_total_workers, some workers cannot connect to gRPC and
# pipeline will hang, hence raise an error here.
raise RuntimeError('gRPC servers are running with %s threads, we cannot '
'attach %s workers.' % (self._grpc_server.max_workers,
max_total_workers))
self._grpc_server = GrpcServer(self._state, self._job_provision_info)

worker_handler_list = self._cached_handlers[environment_id]
if len(worker_handler_list) < num_workers:
Expand Down Expand Up @@ -1801,7 +1787,7 @@ def process_bundle(self, inputs, expected_outputs):

merged_result = None
split_result_list = []
with futures.ThreadPoolExecutor(max_workers=self._num_workers) as executor:
with UnboundedThreadPoolExecutor() as executor:
for result, split_result in executor.map(lambda part: BundleManager(
self._worker_handler_list, self._get_buffer,
self._get_input_coder_impl, self._bundle_descriptor,
Expand Down
Expand Up @@ -26,7 +26,6 @@
import time
import traceback
from builtins import object
from concurrent import futures

import grpc
from google.protobuf import text_format
Expand All @@ -42,6 +41,7 @@
from apache_beam.runners.portability import abstract_job_service
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability import fn_api_runner
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
Expand Down Expand Up @@ -92,7 +92,7 @@ def create_beam_job(self, preparation_id, job_name, pipeline, options):
self._artifact_staging_endpoint)

def start_grpc_server(self, port=0):
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3))
self._server = grpc.server(UnboundedThreadPoolExecutor())
port = self._server.add_insecure_port('localhost:%d' % port)
beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
Expand Down Expand Up @@ -139,7 +139,7 @@ def __init__(self, worker_command_line, control_address, worker_id=None):
self._worker_id = worker_id

def run(self):
logging_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
logging_server = grpc.server(UnboundedThreadPoolExecutor())
logging_port = logging_server.add_insecure_port('[::]:0')
logging_server.start()
logging_servicer = BeamFnLoggingServicer()
Expand Down
Expand Up @@ -27,13 +27,13 @@
import string
import tempfile
import unittest
from concurrent import futures

import grpc

from apache_beam.portability.api import beam_artifact_api_pb2
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.runners.portability import portable_stager
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


class PortableStagerTest(unittest.TestCase):
Expand All @@ -56,7 +56,7 @@ def _stage_files(self, files):
describing the name of the artifacts in local temp folder and desired
name in staging location.
"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server = grpc.server(UnboundedThreadPoolExecutor())
staging_service = TestLocalFileSystemArtifactStagingServiceServicer(
self._remote_dir)
beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/data_plane_test.py
Expand Up @@ -25,7 +25,6 @@
import sys
import threading
import unittest
from concurrent import futures

import grpc
from future.utils import raise_
Expand All @@ -34,6 +33,7 @@
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


def timeout(timeout_secs):
Expand Down Expand Up @@ -67,7 +67,7 @@ def test_grpc_data_channel(self):
data_channel_service = \
data_servicer.get_conn_by_worker_id(worker_id)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server = grpc.server(UnboundedThreadPoolExecutor())
beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(
data_servicer, server)
test_port = server.add_insecure_port('[::]:0')
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/log_handler_test.py
Expand Up @@ -20,14 +20,14 @@
import logging
import unittest
from builtins import range
from concurrent import futures

import grpc

from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.worker import log_handler
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
Expand All @@ -47,7 +47,7 @@ class FnApiLogRecordHandlerTest(unittest.TestCase):

def setUp(self):
self.test_logging_service = BeamFnLoggingServicer()
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
self.server = grpc.server(UnboundedThreadPoolExecutor())
beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
self.test_logging_service, self.server)
self.test_port = self.server.add_insecure_port('[::]:0')
Expand Down
14 changes: 4 additions & 10 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Expand Up @@ -31,7 +31,6 @@
import traceback
from builtins import object
from builtins import range
from concurrent import futures

import grpc
from future.utils import raise_
Expand All @@ -45,6 +44,7 @@
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
from apache_beam.runners.worker.statecache import StateCache
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor

# This SDK harness will (by default), log a "lull" in processing if it sees no
# transitions in over 5 minutes.
Expand Down Expand Up @@ -97,15 +97,9 @@ def __init__(
# one worker for progress/split request.
self.progress_worker = SdkWorker(self._bundle_processor_cache,
profiler_factory=self._profiler_factory)
# one thread is enough for getting the progress report.
# Assumption:
# Progress report generation should not do IO or wait on other resources.
# Without wait, having multiple threads will not improve performance and
# will only add complexity.
self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
self._progress_thread_pool = UnboundedThreadPoolExecutor()
# finalize and process share one thread pool.
self._process_thread_pool = futures.ThreadPoolExecutor(
max_workers=self._worker_count)
self._process_thread_pool = UnboundedThreadPoolExecutor()
self._responses = queue.Queue()
self._process_bundle_queue = queue.Queue()
self._unscheduled_process_bundle = {}
Expand Down Expand Up @@ -202,7 +196,7 @@ def task():
self._unscheduled_process_bundle[request.instruction_id] = time.time()
self._process_thread_pool.submit(task)
logging.debug(
"Currently using %s threads." % len(self._process_thread_pool._threads))
"Currently using %s threads." % len(self._process_thread_pool._workers))

def _request_process_bundle_split(self, request):
self._request_process_bundle_action(request)
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Expand Up @@ -190,7 +190,7 @@ def _get_worker_count(pipeline_options):
future releases.
Returns:
an int containing the worker_threads to use. Default is 12
an int containing the worker_threads to use. Default is 12.
"""
experiments = pipeline_options.view_as(DebugOptions).experiments

Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/sdk_worker_test.py
Expand Up @@ -23,14 +23,14 @@
import logging
import unittest
from builtins import range
from concurrent import futures

import grpc

from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.worker import sdk_worker
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
Expand Down Expand Up @@ -93,7 +93,7 @@ def _check_fn_registration_multi_request(self, *args):

test_controller = BeamFnControlServicer(requests)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server = grpc.server(UnboundedThreadPoolExecutor())
beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
test_controller, server)
test_port = server.add_insecure_port("[::]:0")
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/worker_pool_main.py
Expand Up @@ -35,13 +35,13 @@
import sys
import threading
import time
from concurrent import futures

import grpc

from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker import sdk_worker
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor


class BeamFnExternalWorkerPoolServicer(
Expand All @@ -60,7 +60,7 @@ def __init__(self, worker_threads,
@classmethod
def start(cls, worker_threads=1, use_process=False, port=0,
state_cache_size=0, container_executable=None):
worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
worker_server = grpc.server(UnboundedThreadPoolExecutor())
worker_address = 'localhost:%s' % worker_server.add_insecure_port(
'[::]:%s' % port)
worker_pool = cls(worker_threads,
Expand Down

0 comments on commit 1b62310

Please sign in to comment.