From 29100a145f9a1464ec6f34fedbbbcfbbf19d7e56 Mon Sep 17 00:00:00 2001 From: Kenny Lee Sin Cheong Date: Thu, 27 Aug 2020 15:52:47 -0400 Subject: [PATCH] BuilderServer --- buildman/builder.py | 15 +-- buildman/server.py | 231 ++++++-------------------------------------- 2 files changed, 31 insertions(+), 215 deletions(-) diff --git a/buildman/builder.py b/buildman/builder.py index f4efbcc830..7ecb1acdfb 100644 --- a/buildman/builder.py +++ b/buildman/builder.py @@ -8,7 +8,6 @@ from app import app, userfiles as user_files, build_logs, dockerfile_build_queue from util.log import logfile_path -from buildman.manager.enterprise import EnterpriseManager from buildman.manager.ephemeral import EphemeralBuilderManager from buildman.server import BuilderServer @@ -19,17 +18,13 @@ logger = logging.getLogger(__name__) BUILD_MANAGERS = { - "enterprise": EnterpriseManager, "ephemeral": EphemeralBuilderManager, } EXTERNALLY_MANAGED = "external" -DEFAULT_WEBSOCKET_PORT = 8787 DEFAULT_CONTROLLER_PORT = 8686 -LOG_FORMAT = "%(asctime)s [%(process)d] [%(levelname)s] [%(name)s] %(message)s" - def run_build_manager(): if not features.BUILD_SUPPORT: @@ -63,12 +58,6 @@ def run_build_manager(): manager_hostname = os.environ.get( "BUILDMAN_HOSTNAME", app.config.get("BUILDMAN_HOSTNAME", app.config["SERVER_HOSTNAME"]) ) - websocket_port = int( - os.environ.get( - "BUILDMAN_WEBSOCKET_PORT", - app.config.get("BUILDMAN_WEBSOCKET_PORT", DEFAULT_WEBSOCKET_PORT), - ) - ) controller_port = int( os.environ.get( "BUILDMAN_CONTROLLER_PORT", @@ -92,14 +81,14 @@ def run_build_manager(): server = BuilderServer( app.config["SERVER_HOSTNAME"], + manager_hostname, dockerfile_build_queue, build_logs, user_files, manager_klass, build_manager_config[1], - manager_hostname, ) - server.run("0.0.0.0", websocket_port, controller_port, ssl=ssl_context) + server.run("0.0.0.0", controller_port, ssl=ssl_context) if __name__ == "__main__": diff --git a/buildman/server.py b/buildman/server.py index ef2e53b722..867c712b88 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -7,12 +7,13 @@ import asyncio from aiowsgi import create_server as create_wsgi_server -from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory -from autobahn.asyncio.websocket import WampWebSocketServerFactory -from autobahn.wamp import types from flask import Flask + from app import app +from buildman.buildmanagerservicer import BuildManagerServicer +from buildman.buildman_pb import buildman_pb2, buildman_pb2_grpc + from buildman.enums import BuildJobResult, BuildServerStatus, RESULT_PHASES from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from buildman.jobutil.buildstatus import StatusHandler @@ -32,38 +33,32 @@ class BuilderServer(object): """ - Server which handles both HTTP and WAMP requests, managing the full state of the build - controller. + Server which handles starting the gRPC gateway, along with initializing the build manager + and starting its reconciliation loop. """ def __init__( self, registry_hostname, + manager_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, - manager_hostname, ): - self._loop = None self._current_status = BuildServerStatus.STARTING - self._current_components = [] - self._realm_map = {} - self._job_count = 0 - self._session_factory = RouterSessionFactory(RouterFactory()) self._registry_hostname = registry_hostname self._queue = queue self._build_logs = build_logs self._user_files = user_files self._lifecycle_manager = lifecycle_manager_klass( - self._register_component, - self._unregister_component, - self._job_heartbeat, - self._job_complete, - manager_hostname, - HEARTBEAT_PERIOD_SEC, + self._registry_hostname, + self._manager_hostname, + self._queue, + self._build_logs, + self._user_files, ) self._lifecycle_manager_config = lifecycle_manager_config @@ -102,196 +97,28 @@ def status(): self._controller_app = controller_app - def run(self, host, websocket_port, controller_port, ssl=None): + def run(self, host, controller_port, ssl=None): logger.debug("Initializing the lifecycle manager") - self._lifecycle_manager.initialize(self._lifecycle_manager_config) - - logger.debug("Initializing all members of the event loop") - loop = asyncio.get_event_loop() + self._lifecycle_manager.initialize(self.queue, self._lifecycle_manager_config) - logger.debug( - "Starting server on port %s, with controller on port %s", - websocket_port, - controller_port, + logger.debug("Initializing the gRPC server") + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + buildman_pb2_grpc.add_BuildManagerServicer_to_server( + BuildManagerServicer(self._lifecycle_manager), server ) + server.add_insecure_port('[::]:50051') + logger.debug("Starting the gRPC server...") + server.start() + + logger.debug("Starting the build manager...") try: - loop.run_until_complete( - self._initialize(loop, host, websocket_port, controller_port, ssl) - ) + while True: + self._lifecycle_manager._work_checker() except KeyboardInterrupt: - pass + self._current_status = BuildServerStatus.SHUTTIING_DOWN + except Exception: + self._current_status = BuildServerStatus.EXCEPTION finally: - loop.close() - - def close(self): - logger.debug("Requested server shutdown") - self._current_status = BuildServerStatus.SHUTDOWN - self._lifecycle_manager.shutdown() - self._shutdown_event.wait() - logger.debug("Shutting down server") - - def _register_component(self, realm, component_klass, **kwargs): - """ - Registers a component with the server. - - The component_klass must derive from BaseComponent. - """ - logger.debug("Registering component with realm %s", realm) - if realm in self._realm_map: - logger.debug("Component with realm %s already registered", realm) - return self._realm_map[realm] - - component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) - component.server = self - component.parent_manager = self._lifecycle_manager - component.build_logs = self._build_logs - component.user_files = self._user_files - component.registry_hostname = self._registry_hostname - - self._realm_map[realm] = component - self._current_components.append(component) - self._session_factory.add(component) - return component - - def _unregister_component(self, component): - logger.debug( - "Unregistering component with realm %s and token %s", - component.builder_realm, - component.expected_token, - ) - - self._realm_map.pop(component.builder_realm, None) - - if component in self._current_components: - self._current_components.remove(component) - self._session_factory.remove(component) - - def _job_heartbeat(self, build_job): - self._queue.extend_processing( - build_job.job_item, - seconds_from_now=JOB_TIMEOUT_SECONDS, - minimum_extension=MINIMUM_JOB_EXTENSION, - ) - - async def _job_complete(self, build_job, job_status, executor_name=None, update_phase=False): - if job_status == BuildJobResult.INCOMPLETE: - logger.warning( - "[BUILD INCOMPLETE: job complete] Build ID: %s. No retry restore.", - build_job.repo_build.uuid, - ) - self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30) - else: - self._queue.complete(build_job.job_item) - - # Update the trigger failure tracking (if applicable). - if build_job.repo_build.trigger is not None: - model.build.update_trigger_disable_status( - build_job.repo_build.trigger, RESULT_PHASES[job_status] - ) - - if update_phase: - status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) - await status_handler.set_phase(RESULT_PHASES[job_status]) - - self._job_count = self._job_count - 1 - - if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: - self._shutdown_event.set() - - async def _work_checker(self): - logger.debug("Initializing work checker") - while self._current_status == BuildServerStatus.RUNNING: - with database.CloseForLongOperation(app.config): - await asyncio.sleep(WORK_CHECK_TIMEOUT) - - logger.debug( - "Checking for more work for %d active workers", - self._lifecycle_manager.num_workers(), - ) - - processing_time = self._lifecycle_manager.overall_setup_time() + SETUP_LEEWAY_SECONDS - job_item = self._queue.get(processing_time=processing_time, ordering_required=True) - if job_item is None: - logger.debug( - "No additional work found. Going to sleep for %s seconds", WORK_CHECK_TIMEOUT - ) - continue - - try: - build_job = BuildJob(job_item) - except BuildJobLoadException as irbe: - logger.warning( - "[BUILD INCOMPLETE: job load exception] Job data: %s. No retry restore.", - job_item.body, - ) - logger.exception(irbe) - self._queue.incomplete(job_item, restore_retry=False) - continue - - logger.debug( - "Checking for an avaliable worker for build job %s", build_job.repo_build.uuid - ) - - try: - schedule_success, retry_timeout = await self._lifecycle_manager.schedule(build_job) - except: - logger.warning( - "[BUILD INCOMPLETE: scheduling] Build ID: %s. Retry restored.", - build_job.repo_build.uuid, - ) - logger.exception("Exception when scheduling job: %s", build_job.repo_build.uuid) - self._current_status = BuildServerStatus.EXCEPTION - self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT) - return - - if schedule_success: - logger.debug("Marking build %s as scheduled", build_job.repo_build.uuid) - status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) - await status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED) - - self._job_count = self._job_count + 1 - logger.debug( - "Build job %s scheduled. Running: %s", - build_job.repo_build.uuid, - self._job_count, - ) - else: - logger.warning( - "[BUILD INCOMPLETE: no schedule] Build ID: %s. Retry restored.", - build_job.repo_build.uuid, - ) - logger.debug( - "All workers are busy for job %s Requeuing after %s seconds.", - build_job.repo_build.uuid, - retry_timeout, - ) - self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout) - - async def _queue_metrics_updater(self): - logger.debug("Initializing queue metrics updater") - while self._current_status == BuildServerStatus.RUNNING: - logger.debug("Writing metrics") - self._queue.update_metrics() - - logger.debug("Metrics going to sleep for 30 seconds") - await asyncio.sleep(30) - - async def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): - self._loop = loop - - # Create the WAMP server. - transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False) - transport_factory.setProtocolOptions(failByDrop=True) - - # Initialize the controller server and the WAMP server - create_wsgi_server( - self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl - ) - await loop.create_server(transport_factory, host, websocket_port, ssl=ssl) - - # Initialize the metrics updater - asyncio.create_task(self._queue_metrics_updater()) + server.stop() - # Initialize the work queue checker. - await self._work_checker()