Support disjoint DAGs in MLP #6743
Conversation
Signed-off-by: Liang Zhang <liang.zhang@databricks.com>
```diff
-_STEP_NAMES = ["ingest", "split", "train", "transform", "evaluate"]
+# _STEP_NAMES must contain all step names that are expected to be executed when
+# `pipeline.run(step=None)` is called
+_STEP_NAMES = ["ingest", "split", "train", "transform", "evaluate", "register"]
```
The existing test `test_pipelines_execution_directory_is_managed_as_expected()` already covers `self._get_default_step()`: it calls `p.run()` and asserts that outputs are found for all steps in `_STEP_NAMES`.
I don't understand why `ci/circleci: build_doc` is failing...
LGTM with minor comments! Thanks, @liangz1 !
```python
_SUBGRAPH_INDICES_MAP = {
    _TRAIN_DAG_NAME: (
        _PIPELINE_STEPS.index(_TRAIN_DAG_STEPS[0]),
        _PIPELINE_STEPS.index(_TRAIN_DAG_STEPS[-1]) + 1,
```
I'd prefer not to "+1" here, to respect what `index` returns for the last step of each DAG; then on L222, `return self._steps[s:e+1]` to include the last step for each graph.
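The two conventions being discussed are equivalent; a minimal sketch with illustrative step names:

```python
steps = ["ingest", "split", "train"]

# Half-open (start, end): end is one past the last step's index,
# so plain slicing steps[s:e] already includes the last step.
s, e = 0, steps.index("train") + 1
assert steps[s:e] == ["ingest", "split", "train"]

# Inclusive (start, last): store the index of the last step itself,
# then slice with last + 1 to include it.
s, last = 0, steps.index("train")
assert steps[s:last + 1] == ["ingest", "split", "train"]
```

The trade-off is purely where the "+1" lives: in the map construction, or at the slice site.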
```diff
@@ -140,7 +143,7 @@ def clean(self, step: str = None) -> None:

 def _get_step(self, step_name) -> BaseStep:
     """Returns a step class object from the pipeline."""
-    steps = self._steps or self._resolve_pipeline_steps()
```
What's the reason to remove `self._resolve_pipeline_steps()`? Does it cause any harm?
(This is related to the first item in my PR description.) `self._resolve_pipeline_steps()` will return new `BaseStep` instances. This line means `self._steps` may contain stale information, so `_get_step()` also tries to get the step info from the config file. This PR changes the logic to make `self._steps` always up-to-date (by reloading before each pipeline action), so we don't need to reload elsewhere (here).
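A minimal sketch contrasting the two approaches; class and method names are illustrative stand-ins, not the actual MLflow code:

```python
class Pipeline:
    def __init__(self):
        self._steps = None  # lazily populated cache

    def _resolve_pipeline_steps(self):
        # Stand-in for re-reading step definitions from the config file.
        return ["ingest", "split", "train"]

    # Old approach: fall back to resolving inside the getter, because
    # self._steps may be unset or stale when the getter runs.
    def get_step_lazy(self, name):
        steps = self._steps or self._resolve_pipeline_steps()
        return steps[steps.index(name)]

    # New approach: reload eagerly at each entry point (__init__, run,
    # inspect), so self._steps is guaranteed fresh for all getters.
    def run(self, name):
        self._steps = self._resolve_pipeline_steps()
        return self.get_step_eager(name)

    def get_step_eager(self, name):
        return self._steps[self._steps.index(name)]
```

The eager variant centralizes reloading in a few entry points instead of scattering `or`-fallbacks through every accessor.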
Got it. Leaving the code as is shouldn't cause any harm either, since `self._steps` is always populated at this point. But it could serve as a stopgap in the future if someone forgets to update the steps.
```python
for step_class in _SCORING_DAG_STEPS:
    _STEPS_SUBGRAPH_MAP[step_class] = _SCORING_DAG_NAME
```
@liangz1 `step_class` here is referencing `PredictStep` (the last element in `_SCORING_DAG_STEPS`), which caused:

```
<unknown>:1: WARNING: py:class reference target not found: mlflow.pipelines.steps.predict.PredictStep
```

The suggested change:

```diff
-for step_class in _SCORING_DAG_STEPS:
-    _STEPS_SUBGRAPH_MAP[step_class] = _SCORING_DAG_NAME
+for _step_class in _TRAIN_DAG_STEPS:
```

should fix the error.
Thank you for explaining! This looks tricky to me :D
Related Issues/PRs
What changes are proposed in this pull request?

A pipeline can have multiple disjoint DAGs. Each DAG is composed of one or more steps.

- Reload `self._steps` from config files at the beginning of `__init__()`, `run()`, and `inspect()`, and assume they contain up-to-date info elsewhere. Previously we also reloaded in `self._get_step()`.
- Add `_get_subgraph_for_target_step(target_step)` to `_BasePipeline`. Pass the subgraph DAG to `run_pipeline_step()` instead of the full `self._steps`.
- Add `_get_default_step()` to `_BasePipeline`, which defines the default step to run when no step is specified. Previously we ran `self._steps[-1]`.
- Update `mlflow/pipelines/regression/v1/pipeline.py` accordingly.
How is this patch tested?
Existing tests
Does this PR change the documentation?

If yes, make sure the changed pages / sections render correctly by clicking the `Details` link on the `Preview docs` check.

Release Notes
Is this a user-facing change?
(Details in 1-2 sentences. You can just refer to another PR with a description if this PR is part of a larger change.)
What component(s), interfaces, languages, and integrations does this PR affect?

Components
- `area/artifacts`: Artifact stores and artifact logging
- `area/build`: Build and test infrastructure for MLflow
- `area/docs`: MLflow documentation pages
- `area/examples`: Example code
- `area/model-registry`: Model Registry service, APIs, and the fluent client calls for Model Registry
- `area/models`: MLmodel format, model serialization/deserialization, flavors
- `area/pipelines`: Pipelines, Pipeline APIs, Pipeline configs, Pipeline Templates
- `area/projects`: MLproject format, project running backends
- `area/scoring`: MLflow Model server, model deployment tools, Spark UDFs
- `area/server-infra`: MLflow Tracking server backend
- `area/tracking`: Tracking Service, tracking client APIs, autologging

Interface
- `area/uiux`: Front-end, user experience, plotting, JavaScript, JavaScript dev server
- `area/docker`: Docker use across MLflow's components, such as MLflow Projects and MLflow Models
- `area/sqlalchemy`: Use of SQLAlchemy in the Tracking Service or Model Registry
- `area/windows`: Windows support

Language
- `language/r`: R APIs and clients
- `language/java`: Java APIs and clients
- `language/new`: Proposals for new client languages

Integrations
- `integrations/azure`: Azure and Azure ML integrations
- `integrations/sagemaker`: SageMaker integrations
- `integrations/databricks`: Databricks integrations

How should the PR be classified in the release notes? Choose one:
- `rn/breaking-change` - The PR will be mentioned in the "Breaking Changes" section
- `rn/none` - No description will be included. The PR will be mentioned only by the PR number in the "Small Bugfixes and Documentation Updates" section
- `rn/feature` - A new user-facing feature worth mentioning in the release notes
- `rn/bug-fix` - A user-facing bug fix worth mentioning in the release notes
- `rn/documentation` - A user-facing documentation change worth mentioning in the release notes