This repository has been archived by the owner on Aug 17, 2023. It is now read-only.

Fairing with Azure - TrainJob fails when provided with argument "pod_spec_mutators" - TypeError: 'NoneType' object is not subscriptable #562

Open
pshah16 opened this issue Nov 2, 2021 · 1 comment

pshah16 commented Nov 2, 2021

/kind bug

What steps did you take and what happened:
Running Kubeflow Fairing on Microsoft Azure.

When running the TrainJob from the following notebook with the 'pod_spec_mutators' argument, it fails with the error below:

Notebook:
https://github.com/kubeflow/fairing/blob/master/examples/train_job_api/main.ipynb

TypeError: 'NoneType' object is not subscriptable

TypeError                                 Traceback (most recent call last)
<ipython-input-24-a50f7ea4d549> in <module>
      1 job = TrainJob(train, docker_registry=DOCKER_REGISTRY, input_files=["requirements.txt"],base_docker_image = BASE_DOCKER_IMAGE, backend=BackendClass(build_context_source=BuildContext), 
      2               pod_spec_mutators=[get_resource_mutator(cpu=1, memory=2)])
----> 3 job.submit()

/opt/conda/lib/python3.7/site-packages/kubeflow/fairing/ml_tasks/tasks.py in submit(self)
     82         deployer = self._backend.get_training_deployer(
     83             pod_spec_mutators=self._pod_spec_mutators)
---> 84         return deployer.deploy(self.pod_spec)
     85 
     86 

/opt/conda/lib/python3.7/site-packages/kubeflow/fairing/deployers/job/job.py in deploy(self, pod_spec)
     88         self.labels['fairing-id'] = self.job_id
     89         for fn in self.pod_spec_mutators:
---> 90             fn(self.backend, pod_spec, self.namespace)
     91         pod_template_spec = self.generate_pod_template_spec(pod_spec)
     92         pod_template_spec.spec.restart_policy = 'Never'

/opt/conda/lib/python3.7/site-packages/kubeflow/fairing/cloud/azure.py in add_azure_files(kube_manager, pod_spec, namespace)
    207 # Mount Azure Files shared folder so the pod can access its files with a local path
    208 def add_azure_files(kube_manager, pod_spec, namespace):
--> 209     context_hash = pod_spec.containers[0].args[1].split(':')[-1]
    210     secret_name = constants.AZURE_STORAGE_CREDS_SECRET_NAME_PREFIX + context_hash.lower()
    211     if not kube_manager.secret_exists(secret_name, namespace):

TypeError: 'NoneType' object is not subscriptable
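The immediate cause is visible in the last frame: a training pod's container is created without builder args, so `pod_spec.containers[0].args` is `None`, and indexing it raises the error. A minimal sketch of the failure mode (`Container` here is a stand-in class, not the real Kubernetes client model):

```python
# Minimal reproduction of the failure mode: a training pod's container
# has no builder args, so `args` is None and `args[1]` is not subscriptable.
class Container:
    def __init__(self, args=None):
        self.args = args

container = Container()  # training pods are created without build args

try:
    context_hash = container.args[1].split(':')[-1]
except TypeError as err:
    print(err)  # 'NoneType' object is not subscriptable
```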

What did you expect to happen:
TrainJob should have run successfully without error.

Environment:

Fairing version: kubeflow-fairing==1.0.2
Kubeflow version: Build: dev_local | dashboard: v.0.0.2- | Isolation-mode: multi-user
Kubernetes version: (use kubectl version): v1.21.2
OS (e.g. from /etc/os-release): Ubuntu 20.04 LTS (Focal Fossa)

pshah16 commented Nov 2, 2021

Here is a fix for this issue.

Replace kubeflow/fairing/backends/backends.py with the following version. The key change is in AzureBackend.get_builder: instead of appending the Azure mutators to the caller's pod_spec_mutators list in place (which let azure.add_azure_files leak into the training deployer's mutator list, where it fails because a training pod's container has no builder args), it now copies the list before appending.

import abc
import six
import sys
import logging

from kubeflow.fairing import utils
from kubeflow.fairing.builders.docker.docker import DockerBuilder
from kubeflow.fairing.builders.cluster import gcs_context
from kubeflow.fairing.builders.cluster.cluster import ClusterBuilder
from kubeflow.fairing.builders.cluster import s3_context
from kubeflow.fairing.builders.cluster import cos_context
from kubeflow.fairing.builders.cluster import azurestorage_context
from kubeflow.fairing.builders.append.append import AppendBuilder
from kubeflow.fairing.deployers.gcp.gcp import GCPJob
from kubeflow.fairing.deployers.job.job import Job
from kubeflow.fairing.deployers.serving.serving import Serving
from kubeflow.fairing.cloud import aws
from kubeflow.fairing.cloud import azure
from kubeflow.fairing.cloud import gcp
from kubeflow.fairing.cloud import docker
from kubeflow.fairing.ml_tasks import utils as ml_tasks_utils
from kubeflow.fairing.constants import constants

logger = logging.getLogger(__name__)


@six.add_metaclass(abc.ABCMeta)
class BackendInterface:
    """ Backend interface.
    Creates a builder instance or a deployer to be used with a training job or a serving job
    for the given backend,
    and gets the appropriate base container or docker registry for the current environment.
    """

    @abc.abstractmethod
    def get_builder(self, preprocessor, base_image, registry):
        """Creates a builder instance with the right config for the given backend

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this builder
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :raises NotImplementedError: not implemented exception

        """

        raise NotImplementedError('BackendInterface.get_builder')

    @abc.abstractmethod
    def get_training_deployer(self, pod_spec_mutators=None):
        """Creates a deployer to be used with a training job

        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)
        :raises NotImplementedError: not implemented exception

        """

        raise NotImplementedError('BackendInterface.get_training_deployer')

    @abc.abstractmethod
    def get_serving_deployer(self, model_class):
        """Creates a deployer to be used with a serving job

        :param model_class: the name of the class that holds the predict function.
        :raises NotImplementedError: not implemented exception

        """

        raise NotImplementedError('BackendInterface.get_serving_deployer')

    def get_base_contanier(self):
        """Returns the appropriate base container for the current environment

        :returns: base image

        """

        py_version = ".".join([str(x) for x in sys.version_info[0:3]])
        base_image = 'registry.hub.docker.com/library/python:{}'.format(
            py_version)
        return base_image

    def get_docker_registry(self):
        """Returns the appropriate docker registry for the current environment

        :returns: None

        """

        return None


class KubernetesBackend(BackendInterface):
    """ Used to create a builder instance and a deployer to be used with a training job or
    a serving job on Kubernetes.
    """
    def __init__(self, namespace=None, build_context_source=None):
        if not namespace and not utils.is_running_in_k8s():
            logger.warning("Can't determine namespace automatically. "
                           "Using the 'default' namespace, but it is recommended to provide a "
                           "namespace explicitly; using 'default' may make it impossible to "
                           "mount some required secrets in cloud backends.")
        self._namespace = namespace or utils.get_default_target_namespace()
        self._build_context_source = build_context_source

    def get_builder(self, preprocessor, base_image, registry, needs_deps_installation=True,  # pylint:disable=arguments-differ
                    pod_spec_mutators=None):
        """Creates a builder instance with the right config for Kubernetes

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this job
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :param needs_deps_installation: whether dependencies need to be installed
            (Default value = True)
        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)

        """
        if not needs_deps_installation:
            return AppendBuilder(preprocessor=preprocessor,
                                 base_image=base_image,
                                 registry=registry)
        elif utils.is_running_in_k8s():
            return ClusterBuilder(preprocessor=preprocessor,
                                  base_image=base_image,
                                  registry=registry,
                                  pod_spec_mutators=pod_spec_mutators,
                                  namespace=self._namespace,
                                  context_source=self._build_context_source)
        elif ml_tasks_utils.is_docker_daemon_exists():
            return DockerBuilder(preprocessor=preprocessor,
                                 base_image=base_image,
                                 registry=registry)
        else:
            # TODO (karthikv2k): Add more info on how to resolve this issue
            raise RuntimeError(
                "Not able to guess the right builder for this job!")

    def get_training_deployer(self, pod_spec_mutators=None):
        """Creates a deployer to be used with a training job on Kubernetes

        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)
        :returns: a Job that handles all the Kubernetes template building for training

        """
        return Job(self._namespace, pod_spec_mutators=pod_spec_mutators)

    def get_serving_deployer(self, model_class, service_type='ClusterIP', # pylint:disable=arguments-differ
                             pod_spec_mutators=None):
        """Creates a deployer to be used with a serving job on Kubernetes

        :param model_class: the name of the class that holds the predict function.
        :param service_type: service type (Default value = 'ClusterIP')
        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)

        """
        return Serving(model_class, namespace=self._namespace, service_type=service_type,
                       pod_spec_mutators=pod_spec_mutators)


class GKEBackend(KubernetesBackend):
    """ Used to create a builder instance and a deployer to be used with a training job
    or a serving job for the GKE backend,
    and get the appropriate docker registry for GKE.
    """

    def __init__(self, namespace=None, build_context_source=None):
        super(GKEBackend, self).__init__(namespace, build_context_source)
        self._build_context_source = gcs_context.GCSContextSource(
            namespace=self._namespace)

    def get_builder(self, preprocessor, base_image, registry, needs_deps_installation=True,
                    pod_spec_mutators=None):
        """Creates a builder instance with the right config for GKE

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this job
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :param needs_deps_installation: whether dependencies need to be installed
            (Default value = True)
        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)

        """

        pod_spec_mutators = pod_spec_mutators or []
        pod_spec_mutators.append(gcp.add_gcp_credentials_if_exists)

        if not needs_deps_installation:
            return AppendBuilder(preprocessor=preprocessor,
                                 base_image=base_image,
                                 registry=registry)
        elif (utils.is_running_in_k8s() or
              not ml_tasks_utils.is_docker_daemon_exists()):
            return ClusterBuilder(preprocessor=preprocessor,
                                  base_image=base_image,
                                  registry=registry,
                                  pod_spec_mutators=pod_spec_mutators,
                                  namespace=self._namespace,
                                  context_source=self._build_context_source)
        elif ml_tasks_utils.is_docker_daemon_exists():
            return DockerBuilder(preprocessor=preprocessor,
                                 base_image=base_image,
                                 registry=registry)
        else:
            msg = ["Not able to guess the right builder for this job!"]
            if not utils.is_running_in_k8s():
                msg.append(" Also, if you are using 'sudo' to access docker on your system, you "
                           "can solve this problem by adding your username to the docker group. "
                           "Reference: https://docs.docker.com/install/linux/linux-postinstall/"
                           "#manage-docker-as-a-non-root-user You need to log out and log back "
                           "in for the change to take effect.")
            message = " ".join(msg)
            raise RuntimeError(message)

    def get_training_deployer(self, pod_spec_mutators=None):
        """Creates a deployer to be used with a training job for GKE

        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)
        :returns: a Job that handles all the Kubernetes template building for training

        """
        pod_spec_mutators = pod_spec_mutators or []
        pod_spec_mutators.append(gcp.add_gcp_credentials_if_exists)
        return Job(namespace=self._namespace, pod_spec_mutators=pod_spec_mutators)

    def get_serving_deployer(self, model_class, service_type='ClusterIP',
                             pod_spec_mutators=None):
        """Creates a deployer to be used with a serving job for GKE

        :param model_class: the name of the class that holds the predict function.
        :param service_type: service type (Default value = 'ClusterIP')
        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)

        """
        return Serving(model_class, namespace=self._namespace, service_type=service_type,
                       pod_spec_mutators=pod_spec_mutators)

    def get_docker_registry(self):
        """Returns the appropriate docker registry for GKE

        :returns: docker registry

        """
        return gcp.get_default_docker_registry()


class AWSBackend(KubernetesBackend):
    """ Used to create a builder instance and a deployer to be used with a training job
    or a serving job for the AWS backend.
    """

    def __init__(self, namespace=None, build_context_source=None):
        build_context_source = build_context_source or s3_context.S3ContextSource()
        super(AWSBackend, self).__init__(namespace, build_context_source)

    def get_builder(self, preprocessor, base_image, registry, needs_deps_installation=True,
                    pod_spec_mutators=None):
        """Creates a builder instance with the right config for AWS

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this job
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :param needs_deps_installation: whether dependencies need to be installed
            (Default value = True)
        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)

        """
        pod_spec_mutators = pod_spec_mutators or []
        pod_spec_mutators.append(aws.add_aws_credentials_if_exists)
        if aws.is_ecr_registry(registry):
            pod_spec_mutators.append(aws.add_ecr_config)
            aws.create_ecr_registry(registry, constants.DEFAULT_IMAGE_NAME)
        return super(AWSBackend, self).get_builder(preprocessor,
                                                   base_image,
                                                   registry,
                                                   needs_deps_installation,
                                                   pod_spec_mutators)

    def get_training_deployer(self, pod_spec_mutators=None):
        """Creates a deployer to be used with a training job for AWS

        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)
        :returns: a Job that handles all the Kubernetes template building for training

        """
        pod_spec_mutators = pod_spec_mutators or []
        pod_spec_mutators.append(aws.add_aws_credentials_if_exists)
        return Job(namespace=self._namespace, pod_spec_mutators=pod_spec_mutators)

    def get_serving_deployer(self, model_class, service_type='ClusterIP', # pylint:disable=arguments-differ
                             pod_spec_mutators=None):
        """Creates a deployer to be used with a serving job for AWS

        :param model_class: the name of the class that holds the predict function.
        :param service_type: service type (Default value = 'ClusterIP')
        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)

        """
        return Serving(model_class, namespace=self._namespace, service_type=service_type,
                       pod_spec_mutators=pod_spec_mutators)


class IBMCloudBackend(KubernetesBackend):
    """ Used to create a builder instance and a deployer to be used with a training job
    or a serving job for the IBM Cloud backend.
    """

    def __init__(self, namespace=None, cos_endpoint_url=None, build_context_source=None):
        build_context_source = build_context_source or\
            cos_context.COSContextSource(namespace=namespace, cos_endpoint_url=cos_endpoint_url)
        super(IBMCloudBackend, self).__init__(namespace, build_context_source)

    def get_builder(self, preprocessor, base_image, registry, needs_deps_installation=True,
                    pod_spec_mutators=None):
        """Creates a builder instance with the right config for IBM Cloud

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this job
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :param needs_deps_installation: whether dependencies need to be installed
            (Default value = True)
        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)

        """
        pod_spec_mutators = pod_spec_mutators or []
        return super(IBMCloudBackend, self).get_builder(preprocessor,
                                                        base_image,
                                                        registry,
                                                        needs_deps_installation,
                                                        pod_spec_mutators)

    def get_training_deployer(self, pod_spec_mutators=None):
        """Creates a deployer to be used with a training job for IBM Cloud

        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)
        :returns: a Job that handles all the Kubernetes template building for training

        """
        pod_spec_mutators = pod_spec_mutators or []
        return Job(namespace=self._namespace, pod_spec_mutators=pod_spec_mutators)

    def get_serving_deployer(self, model_class, service_type='ClusterIP', # pylint:disable=arguments-differ
                             pod_spec_mutators=None):
        """Creates a deployer to be used with a serving job for IBM Cloud

        :param model_class: the name of the class that holds the predict function.
        :param service_type: service type (Default value = 'ClusterIP')
        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)

        """
        return Serving(model_class, namespace=self._namespace, service_type=service_type,
                       pod_spec_mutators=pod_spec_mutators)


class AzureBackend(KubernetesBackend):
    """ Used to create a builder instance and a deployer to be used with a training job or
    a serving job for the Azure backend.
    """
    def __init__(self, namespace=None, build_context_source=None):
        build_context_source = (
            build_context_source or azurestorage_context.StorageContextSource(namespace=namespace)
        )
        super(AzureBackend, self).__init__(namespace, build_context_source)

    def get_builder(self, preprocessor, base_image, registry,
                    needs_deps_installation=True, pod_spec_mutators=None):
        """Creates a builder instance with the right config for Azure

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this job
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :param needs_deps_installation: whether dependencies need to be installed
            (Default value = True)
        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)

        """
        if pod_spec_mutators:
            mutators = pod_spec_mutators[:]
        else:
            mutators = []
        if not azure.is_acr_registry(registry):
            raise Exception("'{}' is not an Azure Container Registry".format(registry))
        mutators.append(azure.add_acr_config)
        mutators.append(azure.add_azure_files)
        return super(AzureBackend, self).get_builder(preprocessor,
                                                     base_image,
                                                     registry,
                                                     needs_deps_installation,
                                                     mutators)

class KubeflowBackend(KubernetesBackend):
    """Kubeflow backend; see KubernetesBackend."""

    def __init__(self, namespace=None, build_context_source=None):
        if not namespace and not utils.is_running_in_k8s():
            namespace = "kubeflow"
        super(KubeflowBackend, self).__init__(namespace, build_context_source)


class KubeflowGKEBackend(GKEBackend):
    """Kubeflow for GKE backend; see GKEBackend."""

    def __init__(self, namespace=None, build_context_source=None):
        if not namespace and not utils.is_running_in_k8s():
            namespace = "kubeflow"
        super(KubeflowGKEBackend, self).__init__(
            namespace, build_context_source)


class KubeflowAWSBackend(AWSBackend):
    """Kubeflow for AWS backend; see AWSBackend."""

    def __init__(self, namespace=None, build_context_source=None): # pylint:disable=useless-super-delegation
        super(KubeflowAWSBackend, self).__init__(
            namespace, build_context_source)


class KubeflowAzureBackend(AzureBackend):
    """Kubeflow for Azure backend; see AzureBackend."""

    def __init__(self, namespace=None, build_context_source=None): # pylint:disable=useless-super-delegation
        super(KubeflowAzureBackend, self).__init__(namespace, build_context_source)


class GCPManagedBackend(BackendInterface):
    """ Used to create a builder instance and a deployer to be used with a training job
    or a serving job on GCP.
    """
    def __init__(self, project_id=None, region=None, training_scale_tier=None,
                 job_config=None, use_stream_logs=False):
        """Creates an instance of GCPManagedBackend

        :param project_id: Google Cloud project ID to use.
        :param region: region in which the job has to be deployed.
            Ref: https://cloud.google.com/compute/docs/regions-zones/
        :param training_scale_tier: machine type to use for the job.
            Ref: https://cloud.google.com/ml-engine/docs/tensorflow/machine-types
        :param job_config: Custom job configuration options. If an option is specified
            in the job_config and as a top-level parameter, the parameter overrides
            the value in the job_config.
            Ref: https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs
        :param use_stream_logs: If true, when deploying a job, output the job stream
            log until the job ends.
        """

        super(GCPManagedBackend, self).__init__()
        self._project_id = project_id or gcp.guess_project_name()
        self._region = region or 'us-central1'
        self._training_scale_tier = training_scale_tier or 'BASIC'
        self._job_config = job_config
        self._use_stream_logs = use_stream_logs

    def get_builder(self, preprocessor, base_image, registry, needs_deps_installation=True,  # pylint:disable=arguments-differ
                    pod_spec_mutators=None):
        """Creates a builder instance with the right config for GCP

        :param preprocessor: Preprocessor to use to modify inputs
        :param base_image: Base image to use for this job
        :param registry: Registry to push image to. Example: gcr.io/kubeflow-images
        :param needs_deps_installation: whether dependencies need to be installed
            (Default value = True)
        :param pod_spec_mutators: list of functions used to mutate the pod spec,
                                  e.g. fairing.cloud.gcp.add_gcp_credentials_if_exists.
                                  This can be used to set things like volumes and security context.
                                  (Default value = None)

        """
        pod_spec_mutators = pod_spec_mutators or []
        pod_spec_mutators.append(gcp.add_gcp_credentials_if_exists)
        pod_spec_mutators.append(docker.add_docker_credentials_if_exists)
        # TODO (karthikv2k): Add cloud build as the default
        # once https://github.com/kubeflow/fairing/issues/145 is fixed
        if not needs_deps_installation:
            return AppendBuilder(preprocessor=preprocessor,
                                 base_image=base_image,
                                 registry=registry)
        elif utils.is_running_in_k8s():
            return ClusterBuilder(preprocessor=preprocessor,
                                  base_image=base_image,
                                  registry=registry,
                                  pod_spec_mutators=pod_spec_mutators,
                                  context_source=gcs_context.GCSContextSource(
                                      namespace=utils.get_default_target_namespace()))
        elif ml_tasks_utils.is_docker_daemon_exists():
            return DockerBuilder(preprocessor=preprocessor,
                                 base_image=base_image,
                                 registry=registry)
        else:
            # TODO (karthikv2k): Add more info on how to resolve this issue
            raise RuntimeError(
                "Not able to guess the right builder for this job!")

    def get_training_deployer(self, pod_spec_mutators=None):
        """Creates a deployer to be used with a training job for GCP

        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)
        :returns: a GCPJob that handles the training job submission

        """
        return GCPJob(self._project_id, self._region, self._training_scale_tier,
                      self._job_config, self._use_stream_logs)

    def get_serving_deployer(self, model_class, pod_spec_mutators=None): # pylint:disable=arguments-differ
        """Creates a deployer to be used with a serving job for GCP

        :param model_class: the name of the class that holds the predict function.
        :param pod_spec_mutators: list of functions used to mutate the pod spec.
            (Default value = None)

        """
        # currently GCP serving deployer doesn't implement deployer interface
        raise NotImplementedError(
            "GCP managed serving is not integrated into high level API yet.")

    def get_docker_registry(self):
        """Returns the appropriate docker registry for GCP

        :returns: docker registry

        """
        return gcp.get_default_docker_registry()
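
For reference, the essential change is in AzureBackend.get_builder: the original appended azure.add_azure_files to the caller's pod_spec_mutators list in place, so TrainJob later passed the same list to the training deployer, where the builder-only mutator fails. Copying the list first avoids that. The difference in a standalone sketch (illustrative function names, not fairing code):

```python
def buggy_add_mutators(pod_spec_mutators=None):
    # Original pattern: `or []` keeps the caller's list object,
    # so the append below leaks into it.
    pod_spec_mutators = pod_spec_mutators or []
    pod_spec_mutators.append('add_azure_files')
    return pod_spec_mutators

def fixed_add_mutators(pod_spec_mutators=None):
    # Fixed pattern: copy first; the caller's list stays untouched.
    mutators = pod_spec_mutators[:] if pod_spec_mutators else []
    mutators.append('add_azure_files')
    return mutators

user_mutators = ['get_resource_mutator']
buggy_add_mutators(user_mutators)
print(user_mutators)  # ['get_resource_mutator', 'add_azure_files'] -- leaked

user_mutators2 = ['get_resource_mutator']
fixed_add_mutators(user_mutators2)
print(user_mutators2)  # ['get_resource_mutator'] -- unchanged
```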
