From 1f1e433f236844a7a7fa1a907e6233f65e53f493 Mon Sep 17 00:00:00 2001 From: Brandon Kaplan Date: Tue, 6 Feb 2024 13:19:53 -0600 Subject: [PATCH 1/6] implement cluster_report_destination_override --- .gitignore | 4 +++- sync/awsdatabricks.py | 34 +++++++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index e1aebe8..e1af508 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ __pycache__ *.swo venv/* .idea/* -.python-version \ No newline at end of file +.python-version +.DS_Store +.venv diff --git a/sync/awsdatabricks.py b/sync/awsdatabricks.py index 2ad9f5c..6439f28 100644 --- a/sync/awsdatabricks.py +++ b/sync/awsdatabricks.py @@ -1,5 +1,6 @@ import json import logging +from pathlib import Path from time import sleep from typing import List, Tuple from urllib.parse import urlparse @@ -335,7 +336,9 @@ def _get_aws_cluster_info_from_s3(bucket: str, file_key: str, cluster_id): logger.warning(f"Failed to retrieve cluster info from S3 with key, '{file_key}': {err}") -def monitor_cluster(cluster_id: str, polling_period: int = 20) -> None: +def monitor_cluster( + cluster_id: str, polling_period: int = 20, cluster_report_destination_override: str = None +) -> None: cluster = get_default_client().get_cluster(cluster_id) spark_context_id = cluster.get("spark_context_id") @@ -347,16 +350,24 @@ def monitor_cluster(cluster_id: str, polling_period: int = 20) -> None: spark_context_id = cluster.get("spark_context_id") (log_url, filesystem, bucket, base_prefix) = _cluster_log_destination(cluster) - if log_url: + if log_url or cluster_report_destination_override: _monitor_cluster( - (log_url, filesystem, bucket, base_prefix), cluster_id, spark_context_id, polling_period + (log_url, filesystem, bucket, base_prefix), + cluster_id, + spark_context_id, + polling_period, + cluster_report_destination_override, ) else: logger.warning("Unable to monitor cluster due to missing cluster log destination - exiting") def _monitor_cluster( - cluster_log_destination, cluster_id: str, spark_context_id: int, polling_period: int + cluster_log_destination, + cluster_id: str, + spark_context_id: int, + polling_period: int, + cluster_report_destination_override: str = None, ) -> None: (log_url, filesystem, bucket, base_prefix) = cluster_log_destination @@ -368,7 +379,20 @@ def _monitor_cluster( aws_region_name = DB_CONFIG.aws_region_name ec2 = boto.client("ec2", region_name=aws_region_name) - if filesystem == "s3": + if cluster_report_destination_override: + + def ensure_path_exists(file_path: str): + logger.info(f"Ensuring path exists for {file_path}") + report_path = Path(file_path) + report_path.parent.mkdir(parents=True, exist_ok=True) + + def write_file(body: bytes): + logger.info("Saving state to local file") + ensure_path_exists(cluster_report_destination_override) + with open(cluster_report_destination_override, "wb") as f: + f.write(body) + + elif filesystem == "s3": s3 = boto.client("s3") def write_file(body: bytes): From 431df19215ede962f05844e16246a4f7e58cdc59 Mon Sep 17 00:00:00 2001 From: Brandon Kaplan Date: Wed, 7 Feb 2024 11:32:26 -0600 Subject: [PATCH 2/6] moved write_file definition out of the _monitor_cluster method for complexity --- sync/awsdatabricks.py | 72 +++++++++++++++++++++---------------- tests/test_awsdatabricks.py | 57 ++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/sync/awsdatabricks.py b/sync/awsdatabricks.py index 6439f28..0af5c19 100644 --- a/sync/awsdatabricks.py +++ b/sync/awsdatabricks.py @@ -337,7 +337,9 @@ def _get_aws_cluster_info_from_s3(bucket: str, file_key: str, cluster_id): def monitor_cluster( - cluster_id: str, polling_period: int = 20, cluster_report_destination_override: str = None + cluster_id: str, + polling_period: int = 20, + cluster_report_destination_override: dict = None, ) -> None: cluster = get_default_client().get_cluster(cluster_id) spark_context_id = cluster.get("spark_context_id") @@ -350,13 +352,16 @@ def monitor_cluster( spark_context_id = cluster.get("spark_context_id") (log_url, filesystem, bucket, base_prefix) = _cluster_log_destination(cluster) + if cluster_report_destination_override: + filesystem = cluster_report_destination_override.get("filesystem", filesystem) + base_prefix = cluster_report_destination_override.get("base_prefix", base_prefix) + if log_url or cluster_report_destination_override: _monitor_cluster( (log_url, filesystem, bucket, base_prefix), cluster_id, spark_context_id, polling_period, - cluster_report_destination_override, ) else: logger.warning("Unable to monitor cluster due to missing cluster log destination - exiting") @@ -367,7 +372,6 @@ def _monitor_cluster( cluster_id: str, spark_context_id: int, polling_period: int, - cluster_report_destination_override: str = None, ) -> None: (log_url, filesystem, bucket, base_prefix) = cluster_log_destination @@ -379,33 +383,7 @@ def _monitor_cluster( aws_region_name = DB_CONFIG.aws_region_name ec2 = boto.client("ec2", region_name=aws_region_name) - if cluster_report_destination_override: - - def ensure_path_exists(file_path: str): - logger.info(f"Ensuring path exists for {file_path}") - report_path = Path(file_path) - report_path.parent.mkdir(parents=True, exist_ok=True) - - def write_file(body: bytes): - logger.info("Saving state to local file") - ensure_path_exists(cluster_report_destination_override) - with open(cluster_report_destination_override, "wb") as f: - f.write(body) - - elif filesystem == "s3": - s3 = boto.client("s3") - - def write_file(body: bytes): - logger.info("Saving state to S3") - s3.put_object(Bucket=bucket, Key=file_key, Body=body) - - elif filesystem == "dbfs": - path = format_dbfs_filepath(file_key) - dbx_client = get_default_client() - - def write_file(body: bytes): - logger.info("Saving state to DBFS") - write_dbfs_file(path, body, dbx_client) + write_file = _define_write_file(file_key, filesystem, bucket) all_inst_by_id = {} active_timelines_by_id = {} @@ -452,6 +430,40 @@ def write_file(body: bytes): sleep(polling_period) +def _define_write_file(file_key, filesystem, bucket): + if filesystem == "file": + file_path = Path(f"{Path.home()}{file_key}") + + def ensure_path_exists(report_path: Path): + logger.info(f"Ensuring path exists for {report_path}") + report_path.parent.mkdir(parents=True, exist_ok=True) + + def write_file(body: bytes): + logger.info("Saving state to local file") + ensure_path_exists(file_path) + with open(file_path, "wb") as f: + f.write(body) + + elif filesystem == "s3": + s3 = boto.client("s3") + + def write_file(body: bytes): + logger.info("Saving state to S3") + s3.put_object(Bucket=bucket, Key=file_key, Body=body) + + elif filesystem == "dbfs": + path = format_dbfs_filepath(file_key) + dbx_client = get_default_client() + + def write_file(body: bytes): + logger.info("Saving state to DBFS") + write_dbfs_file(path, body, dbx_client) + + else: + raise ValueError(f"Unsupported filesystem: {filesystem}") + return write_file + + def _get_ec2_instances(cluster_id: str, ec2_client: "botocore.client.ec2") -> List[dict]: filters = [ diff --git a/tests/test_awsdatabricks.py b/tests/test_awsdatabricks.py index c586d2b..f714f9d 100644 --- a/tests/test_awsdatabricks.py +++ b/tests/test_awsdatabricks.py @@ -1,6 +1,7 @@ import copy import io import json +import unittest from datetime import datetime from unittest.mock import Mock, patch from uuid import uuid4 @@ -10,7 +11,7 @@ from botocore.stub import Stubber from httpx import Response -from sync.awsdatabricks import create_prediction_for_run +from sync.awsdatabricks import create_prediction_for_run, monitor_cluster from sync.config import DatabricksConf from sync.models import DatabricksAPIError, DatabricksError from sync.models import Response as SyncResponse @@ -1089,3 +1090,57 @@ def client_patch(name, **kwargs): result = create_prediction_for_run("75778", "Premium", "Jobs Compute", "my-project-id") assert result.result + + +@patch("sync.awsdatabricks._monitor_cluster") +@patch("sync.clients.databricks.DatabricksClient.get_cluster") +@patch( + "sync.awsdatabricks._cluster_log_destination", +) +class TestMonitorCluster(unittest.TestCase): + def test_monitor_cluster_with_override( + self, + mock_cluster_log_destination, + mock_get_cluster, + mock_monitor_cluster, + ): + mock_cluster_log_destination.return_value = ("s3://bucket/path", "s3", "bucket", "path") + + mock_get_cluster.return_value = { + "cluster_id": "0101-214342-tpi6qdp2", + "spark_context_id": 1443449481634833945, + } + + cluster_report_destination_override = { + "filesystem": "file", + "base_prefix": "test_file_path", + } + + monitor_cluster("0101-214342-tpi6qdp2", 1, cluster_report_destination_override) + + expected_log_destination_override = ("s3://bucket/path", "file", "bucket", "test_file_path") + mock_monitor_cluster.assert_called_with( + expected_log_destination_override, "0101-214342-tpi6qdp2", 1443449481634833945, 1 + ) + + def test_monitor_cluster_without_override( + self, + mock_cluster_log_destination, + mock_get_cluster, + mock_monitor_cluster, + ): + mock_cluster_log_destination.return_value = ("s3://bucket/path", "s3", "bucket", "path") + + mock_get_cluster.return_value = { + "cluster_id": "0101-214342-tpi6qdp2", + "spark_context_id": 1443449481634833945, + } + + monitor_cluster("0101-214342-tpi6qdp2", 1) + + mock_monitor_cluster.assert_called_with( + mock_cluster_log_destination.return_value, + "0101-214342-tpi6qdp2", + 1443449481634833945, + 1, + ) From 577893a964ef391ff783311bb2e22f67184e8675 Mon Sep 17 00:00:00 2001 From: Brandon Kaplan Date: Wed, 7 Feb 2024 14:57:44 -0600 Subject: [PATCH 3/6] expand testing --- tests/test_awsdatabricks.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_awsdatabricks.py b/tests/test_awsdatabricks.py index f714f9d..8c14d9c 100644 --- a/tests/test_awsdatabricks.py +++ b/tests/test_awsdatabricks.py @@ -1123,6 +1123,13 @@ def test_monitor_cluster_with_override( expected_log_destination_override, "0101-214342-tpi6qdp2", 1443449481634833945, 1 ) + mock_cluster_log_destination.return_value = (None, "s3", None, "path") + monitor_cluster("0101-214342-tpi6qdp2", 1, cluster_report_destination_override) + expected_log_destination_override = (None, "file", None, "test_file_path") + mock_monitor_cluster.assert_called_with( + expected_log_destination_override, "0101-214342-tpi6qdp2", 1443449481634833945, 1 + ) + def test_monitor_cluster_without_override( self, mock_cluster_log_destination, From e81b44cce168082f5e67a1dfe11809b8e45923b1 Mon Sep 17 00:00:00 2001 From: Brandon Kaplan Date: Wed, 7 Feb 2024 15:00:56 -0600 Subject: [PATCH 4/6] add PR template! --- .github/pull_request_template.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .github/pull_request_template.md diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..81dfbe9 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,17 @@ +# Summary + +*please add a few lines to give the reviewer context on the changes* + +## Checklist + +Before formally opening this PR, please adhere to the following standards: + +- [ ] Branch/PR names begin with the related Jira ticket id (ie PROD-31) for Jira integration +- [ ] File names are lower_snake_case +- [ ] Relevant unit tests have been added or not applicable +- [ ] Relevant documentation has been added or not applicable +- [ ] Mark yourself as the assignee (makes it easier to scan the PR list) + +[Related Jira Ticket](https://synccomputing.atlassian.net/browse/PROD-) (add id) + +Add any relevant testing examples or screenshots. \ No newline at end of file From 500260e9c23325efdab4139bf2552491788f6294 Mon Sep 17 00:00:00 2001 From: Brandon Kaplan Date: Wed, 7 Feb 2024 15:02:01 -0600 Subject: [PATCH 5/6] dependabot fix --- .github/dependabot.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 65f3851..b156a03 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -7,4 +7,4 @@ updates: directory: "/" # Location of package manifests schedule: interval: "weekly" - open-pull-requests-limit: 5 \ No newline at end of file + open-pull-requests-limit: 0 \ No newline at end of file From 6b6b1afac3e5da0a1b94d615e9057eef712e2dcc Mon Sep 17 00:00:00 2001 From: Brandon Kaplan Date: Thu, 8 Feb 2024 13:23:02 -0600 Subject: [PATCH 6/6] bump version --- sync/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync/__init__.py b/sync/__init__.py index 6c7f87f..9ef6c4f 100644 --- a/sync/__init__.py +++ b/sync/__init__.py @@ -1,4 +1,4 @@ """Library for leveraging the power of Sync""" -__version__ = "0.6.3" +__version__ = "0.6.4" TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"