Use subprocess for multi-part download execution #8352
Conversation
@@ -445,7 +450,7 @@ def _parallelized_download_from_cloud(
     if failed_downloads:
         new_cloud_creds = self._get_read_credential_infos(
-            self.run_id, dst_run_relative_artifact_path
+            self.run_id, [dst_run_relative_artifact_path]
This was broken before: `_get_read_credential_infos` expects a list of file paths. Passing a single file path as a string caused the path to be broken into individual characters, so the download retry logic attempted to fetch nonexistent files.
We need test coverage here.
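The failure mode is easy to reproduce: iterating over a string yields its characters, so a bare path fans out into one bogus lookup per character. A minimal sketch (the helper name and signature here are illustrative stand-ins, not the real MLflow implementation):

```python
def get_read_credential_infos(run_id, paths):
    # Stand-in for a helper that expects `paths` to be a list of file paths.
    return [(run_id, p) for p in paths]

# Buggy call: the string is iterated character by character,
# producing one entry per character of the path.
broken = get_read_credential_infos("run1", "model/data.bin")

# Fixed call: wrap the single path in a list.
fixed = get_read_credential_infos("run1", ["model/data.bin"])
```

This is exactly why the diff wraps `dst_run_relative_artifact_path` in a list.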
@@ -425,17 +425,22 @@ def _upload_to_cloud(
     def _parallelized_download_from_cloud(
         self, cloud_credential_info, file_size, dst_local_file_path, dst_run_relative_artifact_path
     ):
+        from mlflow.projects.utils import get_databricks_env_vars
Importing from `mlflow.projects` feels odd here. Perhaps move this util elsewhere?
from functools import lru_cache

import requests
import urllib3
from packaging.version import Version
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


@lru_cache(maxsize=64)
def _cached_get_request_session(
    max_retries,
    backoff_factor,
    retry_codes,
    # To create a new Session object for each process, we use the process id as the cache key.
    # This avoids sharing the same Session object across processes, which can lead to issues
    # such as https://stackoverflow.com/q/3724900.
    _pid,
):
    """
    This function should not be called directly. Instead, use `_get_request_session` below.
    """
    assert 0 <= max_retries < 10
    assert 0 <= backoff_factor < 120

    retry_kwargs = {
        "total": max_retries,
        "connect": max_retries,
        "read": max_retries,
        "redirect": max_retries,
        "status": max_retries,
        "status_forcelist": retry_codes,
        "backoff_factor": backoff_factor,
    }
    if Version(urllib3.__version__) >= Version("1.26.0"):
        retry_kwargs["allowed_methods"] = None
    else:
        retry_kwargs["method_whitelist"] = None

    retry = Retry(**retry_kwargs)
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
This method, along with several others, is already defined in rest_utils.py. However, importing these utils from MLflow forces the fresh subprocess to import all of MLflow, which takes ~1.5 seconds. For prototyping purposes, I redefined these utils so that this module doesn't rely on any imports from MLflow, but we probably want to make this cleaner before merging.
In particular, this creates the potential for the Databricks multi-part download logic to diverge from the rest of MLflow.
At minimum, before merging, we need a test verifying that this script doesn't import MLflow and runs quickly.
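Such a test could execute the worker script in a fresh interpreter and then check `sys.modules`. A rough sketch, assuming a hypothetical `script_path` standing in for the real download script (the time bound is an arbitrary placeholder, not a measured threshold):

```python
import subprocess
import sys
import time


def assert_standalone(script_path):
    # Run the worker script in a fresh interpreter, then verify that MLflow
    # was never imported and that the run stayed fast. Because runpy executes
    # the script under a __name__ other than "__main__", only its import-time
    # code runs, which is exactly what we want to audit.
    probe = (
        "import runpy, sys\n"
        f"runpy.run_path({script_path!r})\n"
        "assert 'mlflow' not in sys.modules, 'worker imported mlflow'\n"
    )
    start = time.monotonic()
    subprocess.run([sys.executable, "-c", probe], check=True)
    assert time.monotonic() - start < 5.0, "worker startup too slow"
```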
print(  # pylint: disable=print-function
    json.dumps(
        {
            "error_status_code": e.response.status_code,
            "error_text": str(e),
        }
    ),
    file=sys.stdout,
)
This feels like it could be brittle. What if other dependencies print to stdout? There must be a better way.
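One less brittle option is to pass the worker a dedicated result-file path instead of parsing its stdout, so stray prints from other libraries can't corrupt the payload. A sketch under that assumption (the helper and the worker-invocation shape are illustrative, not the PR's actual interface):

```python
import json
import os
import subprocess
import sys
import tempfile


def run_worker_with_result_file(worker_code):
    # Give the worker a path that only it writes to; stdout stays free for
    # logging noise from the worker's dependencies.
    fd, result_path = tempfile.mkstemp(suffix=".json")
    os.close(fd)
    try:
        # The worker receives the result path as sys.argv[1].
        subprocess.run(
            [sys.executable, "-c", worker_code, result_path], check=True
        )
        with open(result_path) as f:
            return json.load(f)
    finally:
        os.remove(result_path)
```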
Signed-off-by: dbczumar <corey.zumar@databricks.com>
@@ -62,7 +62,7 @@
 )

 _logger = logging.getLogger(__name__)
-_DOWNLOAD_CHUNK_SIZE = 10_000_000
+_DOWNLOAD_CHUNK_SIZE = 100_000_000  # 100 MB
Use 100 MB chunks, which is faster because larger chunks amortize the overhead of launching new processes.
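For intuition, the chunk size directly controls how many worker processes get launched per file. A small sketch of the byte-range math (the function name is illustrative):

```python
_DOWNLOAD_CHUNK_SIZE = 100_000_000  # 100 MB


def chunk_ranges(file_size, chunk_size=_DOWNLOAD_CHUNK_SIZE):
    # Return (start, end) byte offsets (end exclusive) for each chunk;
    # a 1 GB file yields 10 chunks at 100 MB vs. 100 chunks at 10 MB,
    # i.e. one-tenth the process-launch overhead.
    return [
        (start, min(start + chunk_size, file_size))
        for start in range(0, file_size, chunk_size)
    ]
```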
…ls.databricks_utils
…els_artifact_repo
This reverts commit a63313a.
Looks good to me!
@serena-ruan Can you update the PR description to add a table describing how large the performance gain is?
What changes are proposed in this pull request?
Use subprocess for multi-part download execution.
Benchmarking test, on DBR 12.2 LTS ML (32 GB, 4 cores):
- 1 GB file download time decreases from 18.x seconds to 3.x seconds.
- 2 GB file download time decreases from 32.x seconds to 6.x seconds.
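The overall flow can be sketched as: split the file into byte ranges, launch one subprocess per range, then concatenate the parts. The sketch below substitutes a local byte-range copy for the real HTTP Range request so it stays self-contained; all names and the worker protocol are illustrative, not the PR's actual implementation:

```python
import os
import subprocess
import sys

# Stand-in worker: copies one byte range of a local file, mimicking the
# "Range: bytes=start-end" request a real download worker would issue.
_WORKER = """
import sys
src, dst, start, end = sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4])
with open(src, "rb") as f:
    f.seek(start)
    data = f.read(end - start)
with open(dst, "wb") as f:
    f.write(data)
"""


def parallel_download(src, dst, file_size, chunk_size):
    procs, parts = [], []
    for i, start in enumerate(range(0, file_size, chunk_size)):
        end = min(start + chunk_size, file_size)
        part = f"{dst}.part{i}"
        parts.append(part)
        # Each range is fetched by an independent subprocess.
        procs.append(
            subprocess.Popen(
                [sys.executable, "-c", _WORKER, src, part, str(start), str(end)]
            )
        )
    for p in procs:
        assert p.wait() == 0
    # Reassemble the parts in order.
    with open(dst, "wb") as out:
        for part in parts:
            with open(part, "rb") as f:
                out.write(f.read())
            os.remove(part)
```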
How is this patch tested?
Does this PR change the documentation?
Release Notes
Is this a user-facing change?
(Details in 1-2 sentences. You can just refer to another PR with a description if this PR is part of a larger change.)
What component(s), interfaces, languages, and integrations does this PR affect?
Components
- area/artifacts: Artifact stores and artifact logging
- area/build: Build and test infrastructure for MLflow
- area/docs: MLflow documentation pages
- area/examples: Example code
- area/model-registry: Model Registry service, APIs, and the fluent client calls for Model Registry
- area/models: MLmodel format, model serialization/deserialization, flavors
- area/recipes: Recipes, Recipe APIs, Recipe configs, Recipe Templates
- area/projects: MLproject format, project running backends
- area/scoring: MLflow Model server, model deployment tools, Spark UDFs
- area/server-infra: MLflow Tracking server backend
- area/tracking: Tracking Service, tracking client APIs, autologging

Interface
- area/uiux: Front-end, user experience, plotting, JavaScript, JavaScript dev server
- area/docker: Docker use across MLflow's components, such as MLflow Projects and MLflow Models
- area/sqlalchemy: Use of SQLAlchemy in the Tracking Service or Model Registry
- area/windows: Windows support

Language
- language/r: R APIs and clients
- language/java: Java APIs and clients
- language/new: Proposals for new client languages

Integrations
- integrations/azure: Azure and Azure ML integrations
- integrations/sagemaker: SageMaker integrations
- integrations/databricks: Databricks integrations

How should the PR be classified in the release notes? Choose one:
- rn/breaking-change - The PR will be mentioned in the "Breaking Changes" section
- rn/none - No description will be included. The PR will be mentioned only by the PR number in the "Small Bugfixes and Documentation Updates" section
- rn/feature - A new user-facing feature worth mentioning in the release notes
- rn/bug-fix - A user-facing bug fix worth mentioning in the release notes
- rn/documentation - A user-facing documentation change worth mentioning in the release notes