Commit

kleesc committed Sep 2, 2020
1 parent ed67fbd commit 4c1b8d4
Showing 2 changed files with 270 additions and 0 deletions.
260 changes: 260 additions & 0 deletions buildman/manager/newephemeral.py
@@ -0,0 +1,260 @@
import logging
import json
import time

from prometheus_client import Counter, Histogram

from util.security.registry_jwt import generate_bearer_token
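
# NOTE: several names used below (slash_join, BUILD_ACCESS_TYPE, BUILD_REGISTRATION_SUBJECT,
# BUILD_REGISTRATION_TOKEN_VALIDITY_LIFETIME_S, BuildStateInterface, the executor classes,
# orchestrator_from_config, BuildJob, StatusHandler, database/model/app, asyncio, and the
# various timeout constants) are not imported in this excerpt; they are assumed to be
# provided by imports elsewhere in the Quay buildman and data packages (asyncio from the
# standard library).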


logger = logging.getLogger(__name__)


build_fallback = Counter(
    "quay_build_fallback_total", "number of times a build has been retried", labelnames=["executor"]
)
build_ack_duration = Histogram(
    "quay_build_ack_duration_seconds",
    "seconds taken for the builder to acknowledge a queued build",
    labelnames=["executor"],
)
build_duration = Histogram(
    "quay_build_duration_seconds",
    "seconds taken for a build's execution",
    labelnames=["executor", "job_status"],
)

JOB_PREFIX = "building/"
LOCK_PREFIX = "lock/"
REALM_PREFIX = "realm/"
CANCEL_PREFIX = "cancel/"
METRIC_PREFIX = "metric/"

CANCELED_LOCK_PREFIX = slash_join(LOCK_PREFIX, "job-cancelled")
EXPIRED_LOCK_PREFIX = slash_join(LOCK_PREFIX, "job-expired")

EPHEMERAL_API_TIMEOUT = 20
EPHEMERAL_SETUP_TIMEOUT = 500

RETRY_IMMEDIATELY_SLEEP_DURATION = 0
TOO_MANY_WORKERS_SLEEP_DURATION = 10


def generate_build_registration_token(hostname, instance_keys, build_id, build_job_id, max_startup_time):
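    # Builds a registration token: a bearer JWT scoped to a single build job and signed
    # with the instance keys; max_startup_time is carried as a claim, while the token
    # itself expires after BUILD_REGISTRATION_TOKEN_VALIDITY_LIFETIME_S.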
    access = {
        "type": BUILD_ACCESS_TYPE,
        "build_id": build_id,
        "build_job_id": build_job_id,
        "max_startup_time": max_startup_time,
    }
    audience = hostname
    token = generate_bearer_token(
        audience,
        BUILD_REGISTRATION_SUBJECT,
        {},
        access,
        BUILD_REGISTRATION_TOKEN_VALIDITY_LIFETIME_S,
        instance_keys,
    )
    return token


class EphemeralBuilderManager(BuildStateInterface):
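    # Maps the EXECUTOR value from the manager config to the executor class used to
    # launch ephemeral build workers (local process, EC2 instance, or Kubernetes job).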
    EXECUTORS = {
        "popen": PopenExecutor,
        "ec2": EC2Executor,
        "kubernetes": KubernetesExecutor,
    }

    def __init__(self, registry_hostname, manager_hostname, queue, build_logs, user_files):
        self._registry_hostname = registry_hostname
        self._manager_hostname = manager_hostname
        self._queue = queue
        self._build_logs = build_logs
        self._user_files = user_files

        self._ordered_executors = []
        self._executor_name_to_executor = {}

        self._manager_config = None
        self._orchestrator = None

    def initialize(self, manager_config):
        self._manager_config = manager_config
        if manager_config.get("EXECUTORS"):
            for executor_config in manager_config["EXECUTORS"]:
                self._load_executor(executor_config.get("EXECUTOR"), executor_config)
        else:
            self._load_executor(
                manager_config.get("EXECUTOR"), manager_config.get("EXECUTOR_CONFIG")
            )

        logger.debug("calling orchestrator_from_config")
        self._orchestrator = orchestrator_from_config(manager_config)

        logger.debug("setting on_key_change callbacks for job, cancel, realm")
        self._orchestrator.on_key_change(self._job_prefix, self._job_callback)
        self._orchestrator.on_key_change(self._cancel_prefix, self._cancel_callback)

    def create_job(self, build_id, build_metadata):
        raise NotImplementedError

    def schedule_job(self, execution_id, max_startup_time):
        raise NotImplementedError

    def job_complete(self, callback):
        raise NotImplementedError

    def start_job(self, job_id, max_build_time):
        raise NotImplementedError

    def update_job_phase(self, job_id, phase):
        raise NotImplementedError

    def heartbeat(self, job_id):
        raise NotImplementedError

    def cancel_build(self, build_id):
        raise NotImplementedError

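    # Long-running loop: polls the build queue, hands each job to the lifecycle manager
    # for scheduling, and requeues the item (with a retry delay) when no worker can
    # take it or scheduling fails.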
    def _work_checker(self):
        logger.debug("Initializing work checker")
        while True:
            with database.CloseForLongOperation(app.config):
                time.sleep(WORK_CHECK_TIMEOUT)

            logger.debug("Checking for more work from build queue")

            processing_time = self._lifecycle_manager.overall_setup_time() + SETUP_LEEWAY_SECONDS
            job_item = self._queue.get(processing_time=processing_time, ordering_required=True)
            if job_item is None:
                logger.debug(
                    "No additional work found. Going to sleep for %s seconds", WORK_CHECK_TIMEOUT
                )
                continue

            try:
                build_job = BuildJob(job_item)
            except BuildJobLoadException as irbe:
                logger.warning(
                    "[BUILD INCOMPLETE: job load exception] Job data: %s. No retry restore.",
                    job_item.body,
                )
                logger.exception(irbe)
                self._queue.incomplete(job_item, restore_retry=False)
                continue

            logger.debug(
                "Checking for an available worker for build job %s", build_job.repo_build.uuid
            )

            try:
                schedule_success, retry_timeout = self._lifecycle_manager.schedule(build_job)
            except:
                logger.warning(
                    "[BUILD INCOMPLETE: scheduling] Build ID: %s. Retry restored.",
                    build_job.repo_build.uuid,
                )
                logger.exception("Exception when scheduling job: %s", build_job.repo_build.uuid)
                self._current_status = BuildServerStatus.EXCEPTION
                self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT)
                return

            if schedule_success:
                logger.debug("Marking build %s as scheduled", build_job.repo_build.uuid)
                status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
                status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)

                self._job_count = self._job_count + 1
                logger.debug(
                    "Build job %s scheduled. Running: %s",
                    build_job.repo_build.uuid,
                    self._job_count,
                )
            else:
                logger.warning(
                    "[BUILD INCOMPLETE: no schedule] Build ID: %s. Retry restored.",
                    build_job.repo_build.uuid,
                )
                logger.debug(
                    "All workers are busy for job %s. Requeuing after %s seconds.",
                    build_job.repo_build.uuid,
                    retry_timeout,
                )
                self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)

    def overall_setup_time(self):
        return EPHEMERAL_SETUP_TIMEOUT

    def job_heartbeat(self, job_id):
        self._state_manager.heartbeat(job_id)

    def schedule(self):
        # Check if there are worker slots available by checking the number of jobs in the orchestrator
        allowed_worker_count = self._manager_config.get("ALLOWED_WORKER_COUNT", 1)

    def timeout_build(self, job_id):
        pass

    def job_completed(self):
        pass

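    # Observes how long a build phase took for the given realm, using the start_time
    # stored under the orchestrator's metric/ key and labeling by executor (and, when
    # provided, by job status).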
    def _write_duration_metric(self, metric, realm, job_status=None):
        try:
            metric_data = self._orchestrator.get_key(self._metric_key(realm))
            parsed_metric_data = json.loads(metric_data)
            start_time = parsed_metric_data["start_time"]
            executor = parsed_metric_data.get("executor_name", "unknown")
            if job_status is not None:
                metric.labels(executor, str(job_status)).observe(time.time() - start_time)
            else:
                metric.labels(executor).observe(time.time() - start_time)
        except Exception:
            logger.exception("Could not write metric for realm %s", realm)

    def _job_complete_callback(self, build_job, job_status):
        pass

    def _job_heartbeat(self, build_job):
        self._queue.extend_processing(
            build_job.job_item,
            seconds_from_now=JOB_TIMEOUT_SECONDS,
            minimum_extension=MINIMUM_JOB_EXTENSION,
        )

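    # Finalizes the queue item for a finished job: incomplete builds are marked without
    # restoring a retry, the trigger's disable tracking is updated, the build phase is
    # optionally set from the job result, and the running-job count is decremented.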
    async def _job_complete(self, build_job, job_status, executor_name=None, update_phase=False):
        if job_status == BuildJobResult.INCOMPLETE:
            logger.warning(
                "[BUILD INCOMPLETE: job complete] Build ID: %s. No retry restore.",
                build_job.repo_build.uuid,
            )
            self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
        else:
            self._queue.complete(build_job.job_item)

        # Update the trigger failure tracking (if applicable).
        if build_job.repo_build.trigger is not None:
            model.build.update_trigger_disable_status(
                build_job.repo_build.trigger, RESULT_PHASES[job_status]
            )

        if update_phase:
            status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
            await status_handler.set_phase(RESULT_PHASES[job_status])

        self._job_count = self._job_count - 1

        if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
            self._shutdown_event.set()

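    # Refreshes the build queue's metrics every 30 seconds while the server is running.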
    async def _queue_metrics_updater(self):
        logger.debug("Initializing queue metrics updater")
        while self._current_status == BuildServerStatus.RUNNING:
            logger.debug("Writing metrics")
            self._queue.update_metrics()

            logger.debug("Metrics going to sleep for 30 seconds")
            await asyncio.sleep(30)
10 changes: 10 additions & 0 deletions buildman/rpc_test_client.py
@@ -0,0 +1,10 @@
import grpc

from buildman.buildman_pb import buildman_pb2_grpc
from buildman.buildman_pb import buildman_pb2

if __name__ == "__main__":
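    # Minimal manual smoke test: opens an insecure gRPC channel to a build manager on
    # localhost:50051 and issues a single Ping RPC, printing the response.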
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = buildman_pb2_grpc.BuildManagerStub(channel)
        pong = stub.Ping(buildman_pb2.PingRequest())
        print(pong)
