Skip to content

Commit

Permalink
State interface definition
Browse files Browse the repository at this point in the history
  • Loading branch information
kleesc committed Aug 27, 2020
1 parent 9aaa2a4 commit ccec945
Showing 1 changed file with 183 additions and 0 deletions.
183 changes: 183 additions & 0 deletions buildman/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from abc import ABC, abstractmethod


class BuildJobAlreadyExistsError(Exception):
"""Raised when trying to create a job that already exists."""


class BuildJobDoesNotExistsError(Exception):
"""Raised when trying to delete a job that does not exists."""


class BuildJobExpiredError(Exception):
"""Raised when trying to update a job that has already expired."""


class BuildStateInterface(ABC):
@abstractmethod
def create_job(self, build_id: str, build_metadata) -> str:
"""Creates a new job for a build. Raises an error if the job already exists.
Otherwise, returns a job_id. The job should get expired if the job is not scheduled within
a time frame.
"""

@abstractmethod
def schedule_job(self, execution_id: str, max_startup_time: int):
"""Mark the job as scheduled with execution_id. If the job is not started after
max_startup_time, the job should get expired.
"""

@abstractmethod
def job_complete(self, callback):
"""Invoke the callback to clean up the build execution:
- Update the build phase
- Set the final build job state: complete, cancelled, user_error, internal_error, cancelled
- Clean up any resources used to execute the build: ec2 instances, k8s jobs, ...
Returns False if there is an error, and should be tried again.
"""

@abstractmethod
def start_job(self, job_id: str, max_build_time: int) -> bool:
""" Mark a job as started.
Returns False if the job does not exists, or has already started.
The worker's lifetime should be set to max_build_time"""

@abstractmethod
def update_job_phase(self, job_id: str, phase: str) -> bool:
"""Update the given job phase.
Returns False if the given job does not exists or the job has been cancelled.
"""

@abstractmethod
def heartbeat(self, job_id: str) -> bool:
"""Sends a heartbeat to a job, extending its expiration time.
Returns True if the given job exists and its expiration was updated, False otherwise.
"""

@abstractmethod
def cancel_build(self, build_id: str) -> bool:
"""
Cancels the given build.
"""


# TODO(kleesc): Move this in a separate implementation file
import logging
import time

from data.model import db_transaction
from util.morecollections import AttrDict


logger = logging.getLogger(__name__)



CANCELED_LOCK_PREFIX = slash_join(LOCK_PREFIX, "job-cancelled")
EXPIRED_LOCK_PREFIX = slash_join(LOCK_PREFIX, "job-expired")

JOB_TIMEOUT_SECONDS = 300
MINIMUM_JOB_EXTENSION = timedelta(minutes=1)


class BuildJobState(BuildStateInterface):
def __init__(self, queue, orchestrator):
self._queue = queue
self._orchestrator = orchestrator

self._orchestrator.on_key_change(self._cancel_prefix, self._cancel_callback)
self._orchestrator.on_key_change(self._job_prefix, self._job_callback)

def create_job(self, build_id, build_metadata):
build_uuid = build_job.build_uuid
job_key = self._job_key(build_uuid)

try:
self._orchestrator.set_key()
except KeyError as ke:
pass

def heartbeat(self, job_id: str) -> bool:
try:
job_data = self._orchestrator.get_key(job_id)
except KeyError:
logger.error("Job %s no longer exists in the orchestrator", job_id)
return False
except OrchestratorConnectionError:
logger.error("Failed to connect when attempted to extend job")
return False

build_job_metadata = json.loads(job_data)

max_expiration = datetime.utcfromtimestamp(build_job_metadata["max_expiration"])
max_expiration_remaining = max_expiration - datetime.utcnow()
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))

if max_expiration_sec == 0:
logger.error("Job %s expired", job_id)
return False

self._queue.extend_processing(
build_job_metadata["job_queue_item"],
seconds_from_now=JOB_TIMEOUT_SECONDS,
minimum_extension=MINIMUM_JOB_EXTENSION,
)

ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
payload = {
"job_queue_item": build_job_metadata["job_queue_item"],
"max_expiration": build_job_metadata["max_expiration"],
"had_heartbeat": True,
}

try:
self._orchestrator.set_key(
self._job_key(build_job), json.dumps(payload), expiration=ttl
)
except OrchestratorConnectionError:
logger.exception(
"Could not update heartbeat for job as the orchestrator is not available"
)
time.sleep(ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION)

def cancel_build(self, build_uuid):
cancel_key = slash_join(CANCEL_PREFIX, build_uuid)
try:
self._orchestrator.set_key_sync(cancel_key, build_uuid, expiration=60)
except KeyError as ke:
logger.error("Failed to cancel action to redis with build id %s", build_uuid)
return False

def _cancel_callback(self, key_change):
"""Callback to cleanup the redis resources related to a build."""
if key_change.event not in (KeyEvent.CREATE, KeyEvent.SET):
return

build_uuid = key_change.value
build_info = self._build_uuid_to_info.get(build_uuid, None)
if build_info is None:
logger.debug('No build info for "%s" job %s', key_change.event, build_uuid)
return False

job_key = self._job_key(build_uuid)
build_job = self._job_key

lock_key = slash_join(self._canceled_lock_prefix, build_uuid, build_info.execution_id)
with self._orchestrator.lock(lock_key):
# Delete the metric key
self._orchestrator.delete_key(self._metric_key(build_uuid))
# Delete the job key
self._orchestrator.delete_key(slash_join(self._job_prefix, build_uuid))

def _build_job_from_id(self, job_id):
try:
job_data = self._orchestrator.get_key(job_id)
except KeyError:
logger.debug("Job %s no longer exists in the orchestrator", build_job.build_uuid)
return
except OrchestratorConnectionError:
logger.exception("failed to connect when attempted to extend job")

build_job_metadata = json.loads(job_data)
build_job = BuildJob(AttrDict(job_metadata["job_queue_item"]))
return build_job

0 comments on commit ccec945

Please sign in to comment.