In [7]:
%%file local_run.py
from abc import abstractmethod
from spinner import Spinner
import os
import signal
import logging
from run_status import RunStatus
#from mlflow.entities import RunStatus


_logger = logging.getLogger(__name__)


class SubmittedRun(object):
    """
    Handle on a launched MLflow project run (for example a subprocess executing an entry point
    command, or a Databricks job run), offering methods to wait on the run and to cancel it.

    The MLflow project runner uses this interface to manage the lifecycle of runs regardless of
    the environment they were launched in (e.g. locally or on Databricks).

    ``SubmittedRun`` is not thread-safe: calling wait() / cancel() concurrently from several
    threads may inadvertently kill resources (e.g. local processes) that are unrelated to the
    run.

    NOTE:
        Subclasses of ``SubmittedRun`` must expose a ``run_id`` member holding the run's
        MLflow run ID.

    NOTE:
        This class does not use ``abc.ABCMeta``, so the ``abstractmethod`` markers below are
        not enforced at instantiation time; every method here simply returns ``None``.
    """

    @abstractmethod
    def wait(self):
        """
        Block until the run finishes. Return True if the run succeeded and False otherwise.

        In some cases (e.g. remote execution on Databricks) this may wait for the remote job
        to complete rather than for the MLflow run itself to complete.
        """

    @abstractmethod
    def get_status(self):
        """
        Return the current status of the run.
        """

    @abstractmethod
    def cancel(self):
        """
        Cancel the run (interrupt the command subprocess, cancel the Databricks run, etc.) and
        wait for it to terminate. The MLflow run status may not be set correctly after the run
        has been cancelled.
        """

    @property
    @abstractmethod
    def run_id(self):
        """
        MLflow run ID of this run; subclasses are expected to provide it.
        """


class LocalSubmittedRun(SubmittedRun):
    """
    ``SubmittedRun`` implementation backed by a subprocess that was launched to execute an
    entry point command on the local machine.
    """

    def __init__(self, run_id, command_proc):
        """
        :param run_id: ID of the run, returned unchanged by the ``run_id`` property.
        :param command_proc: Handle on the child process. The methods of this class use its
                             ``pid`` attribute and its ``poll()``, ``wait()`` and
                             ``terminate()`` methods (presumably a ``subprocess.Popen`` --
                             confirm against the caller).
        """
        super(LocalSubmittedRun, self).__init__()

        self._run_id = run_id
        self.spinner = Spinner()
        # NOTE(review): the spinner is started and stopped around a plain attribute
        # assignment, so it is only active for an instant -- confirm whether it was meant to
        # wrap longer-running work.
        try:
            self.spinner.start()
            self.command_proc = command_proc
        finally:
            self.spinner.stop()

    @property
    def run_id(self):
        """ID of the run, as passed to the constructor."""
        return self._run_id

    def wait(self):
        """Block until the child process exits; return True iff its exit code is 0."""
        exit_code = self.command_proc.wait()
        return exit_code == 0

    def cancel(self):
        """
        Send a termination request to the child process if it is still running, then wait for
        it to exit. Does nothing when the child has already exited.
        """
        proc = self.command_proc
        if proc.poll() is not None:
            # The child has already exited; there is nothing to interrupt.
            return

        # Kill the process tree rooted at the child if it's the leader of its own process
        # group, otherwise just kill the child.
        try:
            leads_own_group = proc.pid == os.getpgid(proc.pid)
            if leads_own_group:
                os.killpg(proc.pid, signal.SIGTERM)
            else:
                proc.terminate()
        except OSError:
            # The child may have exited between the poll() above and the termination attempt,
            # so OSErrors raised while terminating it are logged and otherwise ignored.
            _logger.info(
                "Failed to terminate child process (PID %s) corresponding to Arthur.io "
                "run with ID %s. The process may have already exited.",
                proc.pid, self._run_id)
        proc.wait()

    def _get_status(self):
        """Map the child's exit code onto a ``RunStatus`` value without blocking."""
        exit_code = self.command_proc.poll()
        if exit_code is None:
            # poll() returned no exit code yet, i.e. the child is still running.
            return RunStatus.RUNNING
        return RunStatus.FINISHED if exit_code == 0 else RunStatus.FAILED

    def get_status(self):
        """Return a ``(status_string, pid)`` tuple describing the child process."""
        status_name = RunStatus.to_string(self._get_status())
        return status_name, self.command_proc.pid

Overwriting local_run.py


In [None]:
      # NOTE(review): scratch fragment -- it references ``self`` and several names defined
      # elsewhere, so it looks like it was lifted out of a method body and cannot run as a
      # standalone cell. Only the indentation has been normalised.
      try:
          # Build the Environment with the driver
          self.spinner.start()
          result = self.environment_driver.build(
              environment_id,
              path=environment_definition_filepath,
              workspace=workspace)
      finally:
          # Always stop the spinner, even when the build raises.
          self.spinner.stop()

In [3]:
%%file run_status.py
class RunStatus(object):
    """Enum for status of an :py:class:`mlflow.entities.Run`.

    Statuses are plain integers (``RUNNING`` = 1 ... ``FAILED`` = 4). The static helpers
    convert between those integers and their upper-case string names and tell whether a
    status is terminal.
    """
    RUNNING, SCHEDULED, FINISHED, FAILED = range(1, 5)
    _STRING_TO_STATUS = {
        "RUNNING": RUNNING,
        "SCHEDULED": SCHEDULED,
        "FINISHED": FINISHED,
        "FAILED": FAILED,
    }
    # Reverse lookup table derived from the mapping above so the two cannot drift apart.
    _STATUS_TO_STRING = {value: key for key, value in _STRING_TO_STATUS.items()}
    # Statuses after which a run no longer changes state.
    _TERMINATED_STATUSES = {FINISHED, FAILED}

    @staticmethod
    def from_string(status_str):
        """
        Return the integer status named by ``status_str``.

        :param status_str: One of "RUNNING", "SCHEDULED", "FINISHED" or "FAILED".
        :raises ValueError: If ``status_str`` is not a known status name. ``ValueError`` is a
                            subclass of ``Exception``, so ``except Exception`` handlers still
                            catch it.
        """
        if status_str not in RunStatus._STRING_TO_STATUS:
            raise ValueError(
                "Could not get run status corresponding to string %s. Valid run "
                "status strings: %s" % (status_str, list(RunStatus._STRING_TO_STATUS.keys())))
        return RunStatus._STRING_TO_STATUS[status_str]

    @staticmethod
    def to_string(status):
        """
        Return the string name of the integer ``status``.

        :param status: One of the integer status constants defined on this class.
        :raises ValueError: If ``status`` is not a known status value. ``ValueError`` is a
                            subclass of ``Exception``, so ``except Exception`` handlers still
                            catch it.
        """
        if status not in RunStatus._STATUS_TO_STRING:
            raise ValueError("Could not get string corresponding to run status %s. Valid run "
                             "statuses: %s" % (status, list(RunStatus._STATUS_TO_STRING.keys())))
        return RunStatus._STATUS_TO_STRING[status]

    @staticmethod
    def is_terminated(status):
        """Return True if ``status`` is ``FINISHED`` or ``FAILED``, False otherwise."""
        return status in RunStatus._TERMINATED_STATUSES

Writing run_status.py


In [None]:
Spinner