From 138add52cb9d05ea817021e09c2d9fed307b08ee Mon Sep 17 00:00:00 2001 From: Elizabeth Heinlein Date: Fri, 1 Mar 2024 11:13:19 -0800 Subject: [PATCH 1/8] feat: Compare cluster configs in Sync Library --- sync/clients/sync.py | 69 +++++++++- tests/clients/test_sync.py | 184 +++++++++++++++++++++++++++ tests/test_files/cluster.json | 52 ++++++++ tests/test_files/recommendation.json | 55 ++++++++ 4 files changed, 359 insertions(+), 1 deletion(-) create mode 100644 tests/clients/test_sync.py create mode 100644 tests/test_files/cluster.json create mode 100644 tests/test_files/recommendation.json diff --git a/sync/clients/sync.py b/sync/clients/sync.py index 3a7d700..de824d6 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -1,7 +1,10 @@ +import difflib +import json import logging -from typing import Generator +from typing import Dict, Generator import httpx +from pydantic import BaseModel from ..config import API_KEY, CONFIG, APIKey from . import USER_AGENT, RetryableHTTPClient, encode_json @@ -9,6 +12,37 @@ logger = logging.getLogger(__name__) +class S3ClusterLogConfiguration(BaseModel): + destination: str + region: str + enable_encryption: bool + canned_acl: str + + +class DBFSClusterLogConfiguration(BaseModel): + destination: str + + +class ClusterLogConfiguration(BaseModel): + s3: S3ClusterLogConfiguration + dbfs: DBFSClusterLogConfiguration + + +class ProjectConfiguration(BaseModel): + node_type_id: str + driver_node_type: str + custom_tags: Dict + cluster_log_conf: ClusterLogConfiguration + cluster_name: str + num_workers: int + spark_version: str + runtime_engine: str + autoscale: Dict + spark_conf: Dict + aws_attributes: Dict + spark_env_vars: Dict + + class SyncAuth(httpx.Auth): requires_response_body = True @@ -122,6 +156,13 @@ def get_project_recommendation(self, project_id: str, recommendation_id: str) -> ) ) + def get_latest_project_recommendation(self, project_id: str) -> dict: + return self._send( + self._client.build_request( + "GET", f"/v1/projects/{project_id}/recommendations?page=0&per_page=1" + ) + ) + def get_project_submissions(self, project_id: str, params: dict = None) -> dict: return self._send( self._client.build_request( @@ -194,6 +235,32 @@ def onboard_workflow(self, workspace_id, job_id: str, project_id: str) -> dict: ) ) + def get_latest_project_config_recommendation(self, project_id: str) -> ProjectConfiguration: + latest_recommendation = self.get_latest_project_recommendation(project_id) + if latest_recommendation.get("result"): + return latest_recommendation["result"][0]["recommendation"]["configuration"] + else: + return {"error": f"No project recommendation found for Project {project_id}"} + + def get_cluster_definition_and_recommendation( + self, project_id: str, cluster_spec_str: str + ) -> dict: + cluster_recommendation = self.get_latest_project_config_recommendation(project_id) + return { + "cluster_recommendations": cluster_recommendation, + "cluster_definition": json.loads(cluster_spec_str), + } + + def get_updated_cluster_defintion( + self, project_id: str, cluster_spec_str: str + ) -> ProjectConfiguration: + latest_recommendation = self.get_latest_project_config_recommendation(project_id) + cluster_definition = json.loads(cluster_spec_str) + for key in latest_recommendation.keys(): + cluster_definition[key] = latest_recommendation[key] + + return cluster_definition + def _send(self, request: httpx.Request) -> dict: response = self._send_request(request) diff --git a/tests/clients/test_sync.py b/tests/clients/test_sync.py new file mode 100644 index 0000000..f3f62a2 --- /dev/null +++ b/tests/clients/test_sync.py @@ -0,0 +1,184 @@ +from unittest import TestCase, mock +from unittest.mock import patch +import json + +import pytest + +from sync.clients.sync import SyncClient +from sync.config import APIKey + + +@pytest.fixture() +def mock_get_recommendation(request): + with patch( + "sync.clients.sync.SyncClient.get_latest_project_recommendation", + side_effect=get_rec_from_file, + ): + yield + + +def get_rec_from_file(): + with open("tests/test_files/recommendation.json") as rec_in: + return json.loads(rec_in.read()) + + +class TestSync(TestCase): + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_latest_project_config_recommendation(self, mock_send): + mock_send.return_value = get_rec_from_file() + test_client = SyncClient("url", APIKey()) + expected_result = { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "num_workers": 20, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + } + + result = test_client.get_latest_project_config_recommendation("project_id") + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_cluster_definition_and_recommendation(self, mock_send): + mock_send.return_value = get_rec_from_file() + test_client = SyncClient("url", APIKey()) + + expected_result = { + "cluster_recommendations": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "num_workers": 20, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + }, + "cluster_definition": { + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0, + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "i3.xlarge"}, + "driver_instance_source": {"node_type_id": "i3.xlarge"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + }, + } + + with open("tests/test_files/cluster.json") as cluster_in: + result = test_client.get_cluster_definition_and_recommendation( + "project_id", cluster_in.read() + ) + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation") + def test_get_updated_cluster_defintion(self, mock_send): + mock_send.return_value = get_rec_from_file() + test_client = SyncClient("url", APIKey()) + + expected_result = { + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "i3.xlarge"}, + "driver_instance_source": {"node_type_id": "i3.xlarge"}, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 20, + "runtime_engine": "PHOTON", + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + } + + with open("tests/test_files/cluster.json") as cluster_in: + result = test_client.get_updated_cluster_defintion("project_id", cluster_in.read()) + assert result == expected_result diff --git a/tests/test_files/cluster.json b/tests/test_files/cluster.json new file mode 100644 index 0000000..1b0012f --- /dev/null +++ b/tests/test_files/cluster.json @@ -0,0 +1,52 @@ +{ + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0 + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": false, + "disk_spec": { + "disk_count": 0 + }, + "cluster_source": "UI", + "enable_local_disk_encryption": false, + "instance_source": { + "node_type_id": "i3.xlarge" + }, + "driver_instance_source": { + "node_type_id": "i3.xlarge" + }, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123" + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": { + "inactivity_duration_min": "120" + }, + "type": "SUCCESS" + }, + "init_scripts_safe_mode": false, + "spec": { + "spark_version": "13.3.x-scala2.12" + } +} \ No newline at end of file diff --git a/tests/test_files/recommendation.json b/tests/test_files/recommendation.json new file mode 100644 index 0000000..6087fe2 --- /dev/null +++ b/tests/test_files/recommendation.json @@ -0,0 +1,55 @@ +{ + "result": [ + { + "created_at": "2024-02-29T23:11:58.559Z", + "updated_at": "2024-02-29T23:11:58.559Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "state": "string", + "error": "string", + "recommendation": { + "metrics": { + "spark_duration_minutes": 0, + "spark_cost_requested_usd": 0, + "spark_cost_lower_usd": 0, + "spark_cost_midpoint_usd": 0, + "spark_cost_upper_usd": 0 + }, + "configuration": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4" + }, + "num_workers": 20, + "spark_conf": { + "spark.databricks.isv.product": "sync-gradient" + }, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100 + } + }, + "prediction_params": { + "sla_minutes": 0, + "force_ondemand_workers": false, + "force_ondemand_basis": false, + "fix_worker_family": true, + "fix_driver_type": true, + "fix_scaling_type": false + } + }, + "context": { + "latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "phase": "LEARNING", + "current_learning_iteration": 0, + "total_learning_iterations": 0 + } + } + ] +} \ No newline at end of file From 45a92d2949c06c5abd80083cfb3dbdaca86344c1 Mon Sep 17 00:00:00 2001 From: Elizabeth Heinlein Date: Fri, 1 Mar 2024 11:13:19 -0800 Subject: [PATCH 2/8] feat: Compare cluster configs in Sync Library moving methods to projects.py finalizing changes for tests --- sync/api/projects.py | 89 ++++++- sync/clients/sync.py | 7 + sync/models.py | 39 ++- tests/clients/test_sync.py | 279 +++++++++++++++++++++ tests/test_files/aws_cluster.json | 52 ++++ tests/test_files/aws_recommendation.json | 55 ++++ tests/test_files/azure_cluster.json | 50 ++++ tests/test_files/azure_recommendation.json | 47 ++++ 8 files changed, 615 insertions(+), 3 deletions(-) create mode 100644 tests/clients/test_sync.py create mode 100644 tests/test_files/aws_cluster.json create mode 100644 tests/test_files/aws_recommendation.json create mode 100644 tests/test_files/azure_cluster.json create mode 100644 tests/test_files/azure_recommendation.json diff --git a/sync/api/projects.py b/sync/api/projects.py index 64d845b..e3fb706 100644 --- a/sync/api/projects.py +++ b/sync/api/projects.py @@ -1,15 +1,24 @@ """Project functions """ import io +import json import logging from time import sleep -from typing import List +from typing import List, Union from urllib.parse import urlparse import httpx from sync.clients.sync import get_default_client -from sync.models import Platform, ProjectError, RecommendationError, Response, SubmissionError +from sync.models import ( + AWSProjectConfiguration, + AzureProjectConfiguration, + Platform, + ProjectError, + RecommendationError, + Response, + SubmissionError, +) from . import generate_presigned_url @@ -396,3 +405,79 @@ def get_project_submission(project_id: str, submission_id: str) -> Response[dict return Response(**response) return Response(result=response["result"]) + + +def get_latest_project_config_recommendation( + project_id: str, +) -> Response[Union[AWSProjectConfiguration, AzureProjectConfiguration]]: + """Get Latest Project Configuration Recommendation. + + :param project_id: project ID + :type project_id: str + :return: Project Configuration Recommendation object + :rtype: AWSProjectConfiguration or AzureProjectConfiguration + """ + latest_recommendation = get_default_client().get_latest_project_recommendation(project_id) + if latest_recommendation.get("result"): + return Response( + result=latest_recommendation["result"][0]["recommendation"]["configuration"] + ) + + +def get_cluster_definition_and_recommendation( + project_id: str, cluster_spec_str: str +) -> Response[dict]: + """Print Current Cluster Definition and Project Configuration Recommendatio. + Throws error if no cluster recommendation found for project + + :param project_id: project ID + :type project_id: str + :param cluster_spec_str: Current Cluster Recommendation + :type cluster_spec_str: str + :return: Current Cluster Definition and Project Configuration Recommendation object + :rtype: dict + """ + recommendation_response = get_latest_project_config_recommendation(project_id) + if not recommendation_response: + logger.info(f"No cluster recommendation found for {project_id}") + return Response(error=RecommendationError(message="Recommendation failed")) + response_str = json.dumps(recommendation_response.result) + return Response( + result={ + "cluster_recommendations": json.loads(response_str), + "cluster_definition": json.loads(cluster_spec_str), + } + ) + + +def get_updated_cluster_defintion( + project_id: str, cluster_spec_str: str +) -> Response[Union[AWSProjectConfiguration, AzureProjectConfiguration]]: + """Print Cluster Definition merged with Project Configuration Recommendations. + + :param project_id: project ID + :type project_id: str + :param cluster_spec_str: Current Cluster Recommendation + :type cluster_spec_str: str + :return: Updated Cluster Definition with Project Configuration Recommendations + :rtype: AWSProjectConfiguration or AzureProjectConfiguration + """ + rec_response = get_latest_project_config_recommendation(project_id) + if not rec_response.error: + # Convert Response result object to str + latest_rec_str = json.dumps(rec_response.result) + # Convert json string to json + latest_recommendation = json.loads(latest_rec_str) + cluster_definition = json.loads(cluster_spec_str) + for key in latest_recommendation.keys(): + cluster_definition[key] = latest_recommendation[key] + + # instance_source and driver_instance_source are not + # included in recommendation and need to be updated as well + driver_recommendation = cluster_definition["node_type_id"] + cluster_definition["instance_source"] = {"node_type_id": driver_recommendation} + cluster_definition["driver_instance_source"] = {"node_type_id": driver_recommendation} + else: + return rec_response + + return Response(result=cluster_definition) diff --git a/sync/clients/sync.py b/sync/clients/sync.py index 3a7d700..61dd89b 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -122,6 +122,13 @@ def get_project_recommendation(self, project_id: str, recommendation_id: str) -> ) ) + def get_latest_project_recommendation(self, project_id: str) -> dict: + return self._send( + self._client.build_request( + "GET", f"/v1/projects/{project_id}/recommendations?page=0&per_page=1" + ) + ) + def get_project_submissions(self, project_id: str, params: dict = None) -> dict: return self._send( self._client.build_request( diff --git a/sync/models.py b/sync/models.py index e6118ad..ac1bc0c 100644 --- a/sync/models.py +++ b/sync/models.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from enum import Enum, unique -from typing import Callable, Generic, List, TypeVar, Union +from typing import Callable, Dict, Generic, List, TypeVar, Union from botocore.exceptions import ClientError from pydantic import BaseModel, Field, root_validator, validator @@ -137,3 +137,40 @@ def check_consistency(cls, err, values): if err is None and values.get("result") is None: raise ValueError("must provide result or error") return err + + +class S3ClusterLogConfiguration(BaseModel): + destination: str + region: str + enable_encryption: bool + canned_acl: str + + +class DBFSClusterLogConfiguration(BaseModel): + destination: str + + +class AWSProjectConfiguration(BaseModel): + node_type_id: str + driver_node_type: str + custom_tags: Dict + cluster_log_conf: Union[S3ClusterLogConfiguration, DBFSClusterLogConfiguration] + cluster_name: str + num_workers: int + spark_version: str + runtime_engine: str + autoscale: Dict + spark_conf: Dict + aws_attributes: Dict + spark_env_vars: Dict + + +class AzureProjectConfiguration(BaseModel): + node_type_id: str + driver_node_type: str + custom_tags: Dict + num_workers: int + spark_conf: Dict + spark_version: str + runtime_engine: str + aws_attributes: Dict diff --git a/tests/clients/test_sync.py b/tests/clients/test_sync.py new file mode 100644 index 0000000..45cf70f --- /dev/null +++ b/tests/clients/test_sync.py @@ -0,0 +1,279 @@ +import json +from unittest import TestCase, mock + +from sync.api.projects import ( + get_cluster_definition_and_recommendation, + get_latest_project_config_recommendation, + get_updated_cluster_defintion, +) +from sync.models import RecommendationError, Response + + +def get_aws_rec_from_file(): + with open("tests/test_files/aws_recommendation.json") as rec_in: + return json.loads(rec_in.read()) + + +def get_azure_rec_from_file(): + with open("tests/test_files/azure_recommendation.json") as rec_in: + return json.loads(rec_in.read()) + + +class TestSync(TestCase): + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_latest_aws_project_config_recommendation(self, mock_send): + mock_send.return_value = get_aws_rec_from_file() + expected_result = Response( + result={ + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "num_workers": 20, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + } + ) + + result = get_latest_project_config_recommendation("project_id") + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_latest_azure_project_config_recommendation(self, mock_send): + mock_send.return_value = get_azure_rec_from_file() + expected_result = Response( + result={ + "node_type_id": "Standard_D4s_v3", + "driver_node_type_id": "Standard_D4s_v3", + "custom_tags": { + "sync:project-id": "769c3443-afd7-45ff-a72a-27bf4296b80e", + "sync:run-id": "d3f8db6c-df4b-430a-a511-a1e9c95d1ad0", + "sync:recommendation-id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "sync:tenant-id": "290d381e-8eb4-4d6a-80d4-453d82897ecc", + }, + "num_workers": 5, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "STANDARD", + "azure_attributes": { + "availability": "SPOT_WITH_FALLBACK_AZURE", + "first_on_demand": 6, + "spot_bid_max_price": 100.0, + }, + } + ) + + result = get_latest_project_config_recommendation("project_id") + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_cluster_definition_no_recommendation(self, mock_send): + mock_send.return_value = {"result": []} + expected_result = Response(error=RecommendationError(message="Recommendation failed")) + + with open("tests/test_files/azure_cluster.json") as cluster_in: + result = get_cluster_definition_and_recommendation("project_id", cluster_in.read()) + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_cluster_definition_and_recommendation(self, mock_send): + mock_send.return_value = get_aws_rec_from_file() + + expected_result = Response( + result={ + "cluster_recommendations": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "num_workers": 20, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + }, + "cluster_definition": { + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0, + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "i3.xlarge"}, + "driver_instance_source": {"node_type_id": "i3.xlarge"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + }, + } + ) + + with open("tests/test_files/aws_cluster.json") as cluster_in: + result = get_cluster_definition_and_recommendation("project_id", cluster_in.read()) + print(f"Result = {result}") + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation") + def test_get_updated_aws_cluster_defintion(self, mock_send): + mock_send.return_value = get_aws_rec_from_file() + + expected_result = Response( + result={ + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "i6.xlarge"}, + "driver_instance_source": {"node_type_id": "i6.xlarge"}, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 20, + "runtime_engine": "PHOTON", + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + } + ) + + with open("tests/test_files/aws_cluster.json") as cluster_in: + result = get_updated_cluster_defintion("project_id", cluster_in.read()) + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation") + def test_get_updated_azure_cluster_defintion(self, mock_send): + mock_send.return_value = get_azure_rec_from_file() + + expected_result = Response( + result={ + "cluster_id": "1114-202840-mu1ql9xp", + "spark_context_id": 8637481617925571639, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "azure_attributes": { + "first_on_demand": 6, + "availability": "SPOT_WITH_FALLBACK_AZURE", + "spot_bid_max_price": 100.0, + }, + "node_type_id": "Standard_D4s_v3", + "driver_node_type_id": "Standard_D4s_v3", + "autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "Standard_D4s_v3"}, + "driver_instance_source": {"node_type_id": "Standard_D4s_v3"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 5, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + "custom_tags": { + "sync:project-id": "769c3443-afd7-45ff-a72a-27bf4296b80e", + "sync:run-id": "d3f8db6c-df4b-430a-a511-a1e9c95d1ad0", + "sync:recommendation-id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "sync:tenant-id": "290d381e-8eb4-4d6a-80d4-453d82897ecc", + }, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "runtime_engine": "STANDARD", + } + ) + + with open("tests/test_files/azure_cluster.json") as cluster_in: + result = get_updated_cluster_defintion("project_id", cluster_in.read()) + assert result == expected_result diff --git a/tests/test_files/aws_cluster.json b/tests/test_files/aws_cluster.json new file mode 100644 index 0000000..1b0012f --- /dev/null +++ b/tests/test_files/aws_cluster.json @@ -0,0 +1,52 @@ +{ + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0 + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": false, + "disk_spec": { + "disk_count": 0 + }, + "cluster_source": "UI", + "enable_local_disk_encryption": false, + "instance_source": { + "node_type_id": "i3.xlarge" + }, + "driver_instance_source": { + "node_type_id": "i3.xlarge" + }, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123" + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": { + "inactivity_duration_min": "120" + }, + "type": "SUCCESS" + }, + "init_scripts_safe_mode": false, + "spec": { + "spark_version": "13.3.x-scala2.12" + } +} \ No newline at end of file diff --git a/tests/test_files/aws_recommendation.json b/tests/test_files/aws_recommendation.json new file mode 100644 index 0000000..6087fe2 --- /dev/null +++ b/tests/test_files/aws_recommendation.json @@ -0,0 +1,55 @@ +{ + "result": [ + { + "created_at": "2024-02-29T23:11:58.559Z", + "updated_at": "2024-02-29T23:11:58.559Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "state": "string", + "error": "string", + "recommendation": { + "metrics": { + "spark_duration_minutes": 0, + "spark_cost_requested_usd": 0, + "spark_cost_lower_usd": 0, + "spark_cost_midpoint_usd": 0, + "spark_cost_upper_usd": 0 + }, + "configuration": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4" + }, + "num_workers": 20, + "spark_conf": { + "spark.databricks.isv.product": "sync-gradient" + }, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100 + } + }, + "prediction_params": { + "sla_minutes": 0, + "force_ondemand_workers": false, + "force_ondemand_basis": false, + "fix_worker_family": true, + "fix_driver_type": true, + "fix_scaling_type": false + } + }, + "context": { + "latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "phase": "LEARNING", + "current_learning_iteration": 0, + "total_learning_iterations": 0 + } + } + ] +} \ No newline at end of file diff --git a/tests/test_files/azure_cluster.json b/tests/test_files/azure_cluster.json new file mode 100644 index 0000000..3ef7977 --- /dev/null +++ b/tests/test_files/azure_cluster.json @@ -0,0 +1,50 @@ +{ + "cluster_id": "1114-202840-mu1ql9xp", + "spark_context_id": 8637481617925571639, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "azure_attributes": { + "first_on_demand": 6, + "availability": "SPOT_WITH_FALLBACK_AZURE", + "spot_bid_max_price": 100.0 + }, + "node_type_id": "Standard_DS5_v2", + "driver_node_type_id": "Standard_DS5_v2", + "autotermination_minutes": 120, + "enable_elastic_disk": false, + "disk_spec": { + "disk_count": 0 + }, + "cluster_source": "UI", + "enable_local_disk_encryption": false, + "instance_source": { + "node_type_id": "Standard_DS5_v2" + }, + "driver_instance_source": { + "node_type_id": "Standard_DS5_v2" + }, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123" + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": { + "inactivity_duration_min": "120" + }, + "type": "SUCCESS" + }, + "init_scripts_safe_mode": false, + "spec": { + "spark_version": "13.3.x-scala2.12" + } +} \ No newline at end of file diff --git a/tests/test_files/azure_recommendation.json b/tests/test_files/azure_recommendation.json new file mode 100644 index 0000000..b857bf0 --- /dev/null +++ b/tests/test_files/azure_recommendation.json @@ -0,0 +1,47 @@ +{ + "result": [ + { + "created_at": "2024-02-14T21:26:37Z", + "updated_at": "2024-02-14T21:29:38Z", + "id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "state": "SUCCESS", + "recommendation": { + "configuration": { + "node_type_id": "Standard_D4s_v3", + "driver_node_type_id": "Standard_D4s_v3", + "custom_tags": { + "sync:project-id": "769c3443-afd7-45ff-a72a-27bf4296b80e", + "sync:run-id": "d3f8db6c-df4b-430a-a511-a1e9c95d1ad0", + "sync:recommendation-id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "sync:tenant-id": "290d381e-8eb4-4d6a-80d4-453d82897ecc" + }, + "num_workers": 5, + "spark_conf": { + "spark.databricks.isv.product": "sync-gradient" + }, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "STANDARD", + "azure_attributes": { + "availability": "SPOT_WITH_FALLBACK_AZURE", + "first_on_demand": 6, + "spot_bid_max_price": 100.0 + } + }, + "prediction_params": { + "sla_minutes": 60, + "force_ondemand_workers": false, + "force_ondemand_basis": false, + "fix_worker_family": true, + "fix_driver_type": true, + "fix_scaling_type": true + } + }, + "context": { + "latest_submission_id": "8626c4f9-e57f-4563-9981-f21b936828b0", + "phase": "LEARNING", + "current_learning_iteration": 2, + "total_learning_iterations": 3 + } + } + ] +} \ No newline at end of file From 38c3b18eacf0b4edcdde65fbee4e2f19de0689a8 Mon Sep 17 00:00:00 2001 From: Taylor Gaw Date: Fri, 8 Mar 2024 12:04:15 -0500 Subject: [PATCH 3/8] linting --- sync/clients/sync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sync/clients/sync.py b/sync/clients/sync.py index de824d6..5b1278e 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -1,4 +1,3 @@ -import difflib import json import logging from typing import Dict, Generator From d91508e53bbccdaab503f96782407afed2204cee Mon Sep 17 00:00:00 2001 From: Taylor Gaw Date: Fri, 8 Mar 2024 12:34:20 -0500 Subject: [PATCH 4/8] Removing redundant code left over from merge --- sync/clients/sync.py | 61 +------------------------------------------- 1 file changed, 1 insertion(+), 60 deletions(-) diff --git a/sync/clients/sync.py b/sync/clients/sync.py index 5b1278e..61dd89b 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -1,9 +1,7 @@ -import json import logging -from typing import Dict, Generator +from typing import Generator import httpx -from pydantic import BaseModel from ..config import API_KEY, CONFIG, APIKey from . import USER_AGENT, RetryableHTTPClient, encode_json @@ -11,37 +9,6 @@ logger = logging.getLogger(__name__) -class S3ClusterLogConfiguration(BaseModel): - destination: str - region: str - enable_encryption: bool - canned_acl: str - - -class DBFSClusterLogConfiguration(BaseModel): - destination: str - - -class ClusterLogConfiguration(BaseModel): - s3: S3ClusterLogConfiguration - dbfs: DBFSClusterLogConfiguration - - -class ProjectConfiguration(BaseModel): - node_type_id: str - driver_node_type: str - custom_tags: Dict - cluster_log_conf: ClusterLogConfiguration - cluster_name: str - num_workers: int - spark_version: str - runtime_engine: str - autoscale: Dict - spark_conf: Dict - aws_attributes: Dict - spark_env_vars: Dict - - class SyncAuth(httpx.Auth): requires_response_body = True @@ -234,32 +201,6 @@ def onboard_workflow(self, workspace_id, job_id: str, project_id: str) -> dict: ) ) - def get_latest_project_config_recommendation(self, project_id: str) -> ProjectConfiguration: - latest_recommendation = self.get_latest_project_recommendation(project_id) - if latest_recommendation.get("result"): - return latest_recommendation["result"][0]["recommendation"]["configuration"] - else: - return {"error": f"No project recommendation found for Project {project_id}"} - - def get_cluster_definition_and_recommendation( - self, project_id: str, cluster_spec_str: str - ) -> dict: - cluster_recommendation = self.get_latest_project_config_recommendation(project_id) - return { - "cluster_recommendations": cluster_recommendation, - "cluster_definition": json.loads(cluster_spec_str), - } - - def get_updated_cluster_defintion( - self, project_id: str, cluster_spec_str: str - ) -> ProjectConfiguration: - latest_recommendation = self.get_latest_project_config_recommendation(project_id) - cluster_definition = json.loads(cluster_spec_str) - for key in latest_recommendation.keys(): - cluster_definition[key] = latest_recommendation[key] - - return cluster_definition - def _send(self, request: httpx.Request) -> dict: response = self._send_request(request) From 59ee99b1ca194cfd923ace3b3da47ccaeacca3a8 Mon Sep 17 00:00:00 2001 From: Taylor Gaw Date: Fri, 8 Mar 2024 12:36:55 -0500 Subject: [PATCH 5/8] feat: Compare cluster configs in Sync Library --- sync/clients/sync.py | 61 +++++++++++++++++++++++++++- tests/test_files/cluster.json | 52 ++++++++++++++++++++++++ tests/test_files/recommendation.json | 55 +++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 tests/test_files/cluster.json create mode 100644 tests/test_files/recommendation.json diff --git a/sync/clients/sync.py b/sync/clients/sync.py index 61dd89b..5b1278e 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -1,7 +1,9 @@ +import json import logging -from typing import Generator +from typing import Dict, Generator import httpx +from pydantic import BaseModel from ..config import API_KEY, CONFIG, APIKey from . import USER_AGENT, RetryableHTTPClient, encode_json @@ -9,6 +11,37 @@ logger = logging.getLogger(__name__) +class S3ClusterLogConfiguration(BaseModel): + destination: str + region: str + enable_encryption: bool + canned_acl: str + + +class DBFSClusterLogConfiguration(BaseModel): + destination: str + + +class ClusterLogConfiguration(BaseModel): + s3: S3ClusterLogConfiguration + dbfs: DBFSClusterLogConfiguration + + +class ProjectConfiguration(BaseModel): + node_type_id: str + driver_node_type: str + custom_tags: Dict + cluster_log_conf: ClusterLogConfiguration + cluster_name: str + num_workers: int + spark_version: str + runtime_engine: str + autoscale: Dict + spark_conf: Dict + aws_attributes: Dict + spark_env_vars: Dict + + class SyncAuth(httpx.Auth): requires_response_body = True @@ -201,6 +234,32 @@ def onboard_workflow(self, workspace_id, job_id: str, project_id: str) -> dict: ) ) + def get_latest_project_config_recommendation(self, project_id: str) -> ProjectConfiguration: + latest_recommendation = self.get_latest_project_recommendation(project_id) + if latest_recommendation.get("result"): + return latest_recommendation["result"][0]["recommendation"]["configuration"] + else: + return {"error": f"No project recommendation found for Project {project_id}"} + + def get_cluster_definition_and_recommendation( + self, project_id: str, cluster_spec_str: str + ) -> dict: + cluster_recommendation = self.get_latest_project_config_recommendation(project_id) + return { + "cluster_recommendations": cluster_recommendation, + "cluster_definition": json.loads(cluster_spec_str), + } + + def get_updated_cluster_defintion( + self, project_id: str, cluster_spec_str: str + ) -> ProjectConfiguration: + latest_recommendation = self.get_latest_project_config_recommendation(project_id) + cluster_definition = json.loads(cluster_spec_str) + for key in latest_recommendation.keys(): + cluster_definition[key] = latest_recommendation[key] + + return cluster_definition + def _send(self, request: httpx.Request) -> dict: response = self._send_request(request) diff --git a/tests/test_files/cluster.json b/tests/test_files/cluster.json new file mode 100644 index 0000000..1b0012f --- /dev/null +++ b/tests/test_files/cluster.json @@ -0,0 +1,52 @@ +{ + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0 + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": false, + "disk_spec": { + "disk_count": 0 + }, + "cluster_source": "UI", + "enable_local_disk_encryption": false, + "instance_source": { + "node_type_id": "i3.xlarge" + }, + "driver_instance_source": { + "node_type_id": "i3.xlarge" + }, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123" + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": { + "inactivity_duration_min": "120" + }, + "type": "SUCCESS" + }, + "init_scripts_safe_mode": false, + "spec": { + "spark_version": "13.3.x-scala2.12" + } +} \ No newline at end of file diff --git a/tests/test_files/recommendation.json b/tests/test_files/recommendation.json new file mode 100644 index 0000000..6087fe2 --- /dev/null +++ b/tests/test_files/recommendation.json @@ -0,0 +1,55 @@ +{ + "result": [ + { + "created_at": "2024-02-29T23:11:58.559Z", + "updated_at": "2024-02-29T23:11:58.559Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "state": "string", + "error": "string", + "recommendation": { + "metrics": { + "spark_duration_minutes": 0, + "spark_cost_requested_usd": 0, + "spark_cost_lower_usd": 0, + "spark_cost_midpoint_usd": 0, + "spark_cost_upper_usd": 0 + }, + "configuration": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4" + }, + "num_workers": 20, + "spark_conf": { + "spark.databricks.isv.product": "sync-gradient" + }, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100 + } + }, + "prediction_params": { + "sla_minutes": 0, + "force_ondemand_workers": false, + "force_ondemand_basis": false, + "fix_worker_family": true, + "fix_driver_type": true, + "fix_scaling_type": false + } + }, + "context": { + "latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "phase": "LEARNING", + "current_learning_iteration": 0, + "total_learning_iterations": 0 + } + } + ] +} \ No newline at end of file From 9fad4db942d223c57ab3c9f1120881a3c1617456 Mon Sep 17 00:00:00 2001 From: Taylor Gaw Date: Fri, 8 Mar 2024 12:34:20 -0500 Subject: [PATCH 6/8] Removing redundant code left over from merge --- sync/clients/sync.py | 61 +--------------------------- tests/test_files/cluster.json | 52 ------------------------ tests/test_files/recommendation.json | 55 ------------------------- 3 files changed, 1 insertion(+), 167 deletions(-) delete mode 100644 tests/test_files/cluster.json delete mode 100644 tests/test_files/recommendation.json diff --git a/sync/clients/sync.py b/sync/clients/sync.py index 5b1278e..61dd89b 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -1,9 +1,7 @@ -import json import logging -from typing import Dict, Generator +from typing import Generator import httpx -from pydantic import BaseModel from ..config import API_KEY, CONFIG, APIKey from . import USER_AGENT, RetryableHTTPClient, encode_json @@ -11,37 +9,6 @@ logger = logging.getLogger(__name__) -class S3ClusterLogConfiguration(BaseModel): - destination: str - region: str - enable_encryption: bool - canned_acl: str - - -class DBFSClusterLogConfiguration(BaseModel): - destination: str - - -class ClusterLogConfiguration(BaseModel): - s3: S3ClusterLogConfiguration - dbfs: DBFSClusterLogConfiguration - - -class ProjectConfiguration(BaseModel): - node_type_id: str - driver_node_type: str - custom_tags: Dict - cluster_log_conf: ClusterLogConfiguration - cluster_name: str - num_workers: int - spark_version: str - runtime_engine: str - autoscale: Dict - spark_conf: Dict - aws_attributes: Dict - spark_env_vars: Dict - - class SyncAuth(httpx.Auth): requires_response_body = True @@ -234,32 +201,6 @@ def onboard_workflow(self, workspace_id, job_id: str, project_id: str) -> dict: ) ) - def get_latest_project_config_recommendation(self, project_id: str) -> ProjectConfiguration: - latest_recommendation = self.get_latest_project_recommendation(project_id) - if latest_recommendation.get("result"): - return latest_recommendation["result"][0]["recommendation"]["configuration"] - else: - return {"error": f"No project recommendation found for Project {project_id}"} - - def get_cluster_definition_and_recommendation( - self, project_id: str, cluster_spec_str: str - ) -> dict: - cluster_recommendation = self.get_latest_project_config_recommendation(project_id) - return { - "cluster_recommendations": cluster_recommendation, - "cluster_definition": json.loads(cluster_spec_str), - } - - def get_updated_cluster_defintion( - self, project_id: str, cluster_spec_str: str - ) -> ProjectConfiguration: - latest_recommendation = self.get_latest_project_config_recommendation(project_id) - cluster_definition = json.loads(cluster_spec_str) - for key in latest_recommendation.keys(): - cluster_definition[key] = latest_recommendation[key] - - return cluster_definition - def _send(self, request: httpx.Request) -> dict: response = self._send_request(request) diff --git a/tests/test_files/cluster.json b/tests/test_files/cluster.json deleted file mode 100644 index 1b0012f..0000000 --- a/tests/test_files/cluster.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "cluster_id": "1234-567890-reef123", - "spark_context_id": 4020997813441462000, - "cluster_name": "my-cluster", - "spark_version": "13.3.x-scala2.12", - "aws_attributes": { - "zone_id": "us-west-2c", - "first_on_demand": 1, - "availability": "SPOT_WITH_FALLBACK", - "spot_bid_price_percent": 100, - "ebs_volume_count": 0 - }, - "node_type_id": "i3.xlarge", - "driver_node_type_id": "i3.xlarge", - "autotermination_minutes": 120, - "enable_elastic_disk": false, - "disk_spec": { - "disk_count": 0 - }, - "cluster_source": "UI", - "enable_local_disk_encryption": false, - "instance_source": { - "node_type_id": "i3.xlarge" - }, - "driver_instance_source": { - "node_type_id": "i3.xlarge" - }, - "state": "TERMINATED", - "state_message": "Inactive cluster terminated (inactive for 120 minutes).", - "start_time": 1618263108824, - "terminated_time": 1619746525713, - "last_state_loss_time": 1619739324740, - "num_workers": 30, - "default_tags": { - "Vendor": "Databricks", - "Creator": "someone@example.com", - "ClusterName": "my-cluster", - "ClusterId": "1234-567890-reef123" - }, - "creator_user_name": "someone@example.com", - "termination_reason": { - "code": "INACTIVITY", - "parameters": { - "inactivity_duration_min": "120" - }, - "type": "SUCCESS" - }, - "init_scripts_safe_mode": false, - "spec": { - "spark_version": "13.3.x-scala2.12" - } -} \ No newline at end of file diff --git a/tests/test_files/recommendation.json b/tests/test_files/recommendation.json deleted file mode 100644 index 6087fe2..0000000 --- a/tests/test_files/recommendation.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "result": [ - { - "created_at": "2024-02-29T23:11:58.559Z", - "updated_at": "2024-02-29T23:11:58.559Z", - "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", - "state": "string", - "error": "string", - "recommendation": { - "metrics": { - "spark_duration_minutes": 0, - "spark_cost_requested_usd": 0, - "spark_cost_lower_usd": 0, - "spark_cost_midpoint_usd": 0, - "spark_cost_upper_usd": 0 - }, - "configuration": { - "node_type_id": "i6.xlarge", - "driver_node_type_id": "i6.xlarge", - "custom_tags": { - "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", - "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", - "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", - "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4" - }, - "num_workers": 20, - "spark_conf": { - "spark.databricks.isv.product": "sync-gradient" - }, - "spark_version": "13.3.x-scala2.12", - "runtime_engine": "PHOTON", - "aws_attributes": { - "first_on_demand": 1, - "availability": "SPOT_WITH_FALLBACK", - "spot_bid_price_percent": 100 - } - }, - "prediction_params": { - "sla_minutes": 0, - "force_ondemand_workers": false, - "force_ondemand_basis": false, - "fix_worker_family": true, - "fix_driver_type": true, - "fix_scaling_type": false - } - }, - "context": { - "latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", - "phase": "LEARNING", - "current_learning_iteration": 0, - "total_learning_iterations": 0 - } - } - ] -} \ No newline at end of file From 0ac3d6b997111d2fa8ee940a12bbbed5d7098426 Mon Sep 17 00:00:00 2001 From: Taylor Gaw Date: Fri, 8 Mar 2024 12:39:33 -0500 Subject: [PATCH 7/8] removing old files --- tests/test_files/cluster.json | 52 -------------------------- tests/test_files/recommendation.json | 55 ---------------------------- 2 files changed, 107 deletions(-) delete mode 100644 tests/test_files/cluster.json delete mode 100644 tests/test_files/recommendation.json diff --git a/tests/test_files/cluster.json b/tests/test_files/cluster.json deleted file mode 100644 index 1b0012f..0000000 --- a/tests/test_files/cluster.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "cluster_id": "1234-567890-reef123", - "spark_context_id": 4020997813441462000, - "cluster_name": "my-cluster", - "spark_version": "13.3.x-scala2.12", - "aws_attributes": { - "zone_id": "us-west-2c", - "first_on_demand": 1, - "availability": "SPOT_WITH_FALLBACK", - "spot_bid_price_percent": 100, - "ebs_volume_count": 0 - }, - "node_type_id": "i3.xlarge", - "driver_node_type_id": "i3.xlarge", - "autotermination_minutes": 120, - "enable_elastic_disk": false, - "disk_spec": { - "disk_count": 0 - }, - "cluster_source": "UI", - "enable_local_disk_encryption": false, - "instance_source": { - "node_type_id": "i3.xlarge" - }, - "driver_instance_source": { - "node_type_id": "i3.xlarge" - }, - "state": "TERMINATED", - "state_message": "Inactive cluster terminated (inactive for 120 minutes).", - "start_time": 1618263108824, - "terminated_time": 1619746525713, - "last_state_loss_time": 1619739324740, - "num_workers": 30, - "default_tags": { - "Vendor": "Databricks", - "Creator": "someone@example.com", - "ClusterName": "my-cluster", - "ClusterId": "1234-567890-reef123" - }, - "creator_user_name": "someone@example.com", - "termination_reason": { - "code": "INACTIVITY", - "parameters": { - "inactivity_duration_min": "120" - }, - "type": "SUCCESS" - }, - "init_scripts_safe_mode": false, - "spec": { - "spark_version": "13.3.x-scala2.12" - } -} \ No newline at end of file diff --git a/tests/test_files/recommendation.json b/tests/test_files/recommendation.json deleted file mode 100644 index 6087fe2..0000000 --- a/tests/test_files/recommendation.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "result": [ - { - "created_at": "2024-02-29T23:11:58.559Z", - "updated_at": "2024-02-29T23:11:58.559Z", - "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", - "state": "string", - "error": "string", - "recommendation": { - "metrics": { - "spark_duration_minutes": 0, - "spark_cost_requested_usd": 0, - "spark_cost_lower_usd": 0, - "spark_cost_midpoint_usd": 0, - "spark_cost_upper_usd": 0 - }, - "configuration": { - "node_type_id": "i6.xlarge", - "driver_node_type_id": "i6.xlarge", - "custom_tags": { - "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", - "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", - "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", - "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4" - }, - "num_workers": 20, - "spark_conf": { - "spark.databricks.isv.product": "sync-gradient" - }, - "spark_version": "13.3.x-scala2.12", - "runtime_engine": "PHOTON", - "aws_attributes": { - "first_on_demand": 1, - "availability": "SPOT_WITH_FALLBACK", - "spot_bid_price_percent": 100 - } - }, - "prediction_params": { - "sla_minutes": 0, - "force_ondemand_workers": false, - "force_ondemand_basis": false, - "fix_worker_family": true, - "fix_driver_type": true, - "fix_scaling_type": false - } - }, - "context": { - "latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", - "phase": "LEARNING", - "current_learning_iteration": 0, - "total_learning_iterations": 0 - } - } - ] -} \ No newline at end of file From 76ffeb67b34cf01312ca9b711fee96140a8a4b2f Mon Sep 17 00:00:00 2001 From: Taylor Gaw Date: Mon, 11 Mar 2024 16:12:48 -0600 Subject: [PATCH 8/8] moving deep_merge to separate utility function --- Makefile | 2 +- sync/_databricks.py | 46 ++++++---------------- sync/api/projects.py | 27 +++++++------ sync/models.py | 3 +- sync/utils/json.py | 22 +++++++++++ tests/clients/test_sync.py | 17 ++++---- tests/test_files/azure_recommendation.json | 2 +- 7 files changed, 63 insertions(+), 56 deletions(-) diff --git a/Makefile b/Makefile index 1cc1e76..fd89cfc 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ FILES := $(shell git diff --name-only --diff-filter=AM $$(git merge-base origin/ .PHONY: test test: - pytest + pytest -vv .PHONY: lint lint: diff --git a/sync/_databricks.py b/sync/_databricks.py index c602703..8032084 100644 --- a/sync/_databricks.py +++ b/sync/_databricks.py @@ -10,7 +10,7 @@ from datetime import datetime, timezone from pathlib import Path from time import sleep -from typing import Any, Collection, Dict, List, Set, Tuple, TypeVar, Union +from typing import Collection, Dict, List, Set, Tuple, Union from urllib.parse import urlparse import boto3 as boto @@ -21,12 +21,13 @@ from sync.models import ( DatabricksAPIError, DatabricksClusterReport, - DatabricksError, DatabricksComputeType, + DatabricksError, DatabricksPlanType, - Response + Response, ) from sync.utils.dbfs import format_dbfs_filepath, read_dbfs_file +from sync.utils.json import deep_update logger = logging.getLogger(__name__) @@ -102,7 +103,7 @@ def create_submission_with_cluster_info( cluster_activity_events=cluster_activity_events, tasks=tasks, plan_type=plan_type, - compute_type=compute_type + compute_type=compute_type, ) eventlog = _get_event_log_from_cluster(cluster, tasks).result @@ -308,12 +309,12 @@ def _get_cluster_report( def _create_cluster_report( - cluster: dict, - cluster_info: dict, - cluster_activity_events: dict, - tasks: List[dict], - plan_type: DatabricksPlanType, - compute_type: DatabricksComputeType + cluster: dict, + cluster_info: dict, + cluster_activity_events: dict, + tasks: List[dict], + plan_type: DatabricksPlanType, + compute_type: DatabricksComputeType, ) -> DatabricksClusterReport: raise NotImplementedError() @@ -659,7 +660,7 @@ def get_recommendation_cluster( if "autoscale" in cluster: del cluster["autoscale"] - recommendation_cluster = _deep_update(cluster, recommendation["configuration"]) + recommendation_cluster = deep_update(cluster, recommendation["configuration"]) return Response(result=recommendation_cluster) return recommendation_response @@ -727,7 +728,7 @@ def get_project_cluster(cluster: dict, project_id: str, region_name: str = None) project_settings_response = get_project_cluster_settings(project_id, region_name) project_cluster_settings = project_settings_response.result if project_cluster_settings: - project_cluster = _deep_update(cluster, project_cluster_settings) + project_cluster = deep_update(cluster, project_cluster_settings) return Response(result=project_cluster) return project_settings_response @@ -1638,24 +1639,3 @@ def _update_monitored_timelines( retired_inst_timeline_list.append(active_timelines_by_id.pop(id)) return active_timelines_by_id, retired_inst_timeline_list - - -KeyType = TypeVar("KeyType") - - -def _deep_update( - mapping: Dict[KeyType, Any], *updating_mappings: Dict[KeyType, Any] -) -> Dict[KeyType, Any]: - updated_mapping = mapping.copy() - for updating_mapping in updating_mappings: - for k, v in updating_mapping.items(): - if k in updated_mapping: - if isinstance(updated_mapping[k], dict) and isinstance(v, dict): - updated_mapping[k] = _deep_update(updated_mapping[k], v) - elif isinstance(updated_mapping[k], list) and isinstance(v, list): - updated_mapping[k] += v - else: - updated_mapping[k] = v - else: - updated_mapping[k] = v - return updated_mapping diff --git a/sync/api/projects.py b/sync/api/projects.py index e3fb706..76b0f2d 100644 --- a/sync/api/projects.py +++ b/sync/api/projects.py @@ -19,6 +19,7 @@ Response, SubmissionError, ) +from sync.utils.json import deep_update from . import generate_presigned_url @@ -444,7 +445,7 @@ def get_cluster_definition_and_recommendation( response_str = json.dumps(recommendation_response.result) return Response( result={ - "cluster_recommendations": json.loads(response_str), + "cluster_recommendation": json.loads(response_str), "cluster_definition": json.loads(cluster_spec_str), } ) @@ -453,7 +454,7 @@ def get_cluster_definition_and_recommendation( def get_updated_cluster_defintion( project_id: str, cluster_spec_str: str ) -> Response[Union[AWSProjectConfiguration, AzureProjectConfiguration]]: - """Print Cluster Definition merged with Project Configuration Recommendations. + """Return Cluster Definition merged with Project Configuration Recommendations. :param project_id: project ID :type project_id: str @@ -469,15 +470,17 @@ def get_updated_cluster_defintion( # Convert json string to json latest_recommendation = json.loads(latest_rec_str) cluster_definition = json.loads(cluster_spec_str) - for key in latest_recommendation.keys(): - cluster_definition[key] = latest_recommendation[key] - - # instance_source and driver_instance_source are not - # included in recommendation and need to be updated as well - driver_recommendation = cluster_definition["node_type_id"] - cluster_definition["instance_source"] = {"node_type_id": driver_recommendation} - cluster_definition["driver_instance_source"] = {"node_type_id": driver_recommendation} + # num_workers/autoscale are mutually exclusive settings, and we are relying on our Prediction + # Recommendations to set these appropriately. Since we may recommend a Static cluster (i.e. a cluster + # with `num_workers`) for a cluster that was originally autoscaled, we want to make sure to remove this + # prior configuration + if "num_workers" in cluster_definition: + del cluster_definition["num_workers"] + + if "autoscale" in cluster_definition: + del cluster_definition["autoscale"] + + recommendation_cluster = deep_update(cluster_definition, latest_recommendation) + return Response(result=recommendation_cluster) else: return rec_response - - return Response(result=cluster_definition) diff --git a/sync/models.py b/sync/models.py index ac1bc0c..661047f 100644 --- a/sync/models.py +++ b/sync/models.py @@ -168,9 +168,10 @@ class AWSProjectConfiguration(BaseModel): class AzureProjectConfiguration(BaseModel): node_type_id: str driver_node_type: str + cluster_log_conf: DBFSClusterLogConfiguration custom_tags: Dict num_workers: int spark_conf: Dict spark_version: str runtime_engine: str - aws_attributes: Dict + azure_attributes: Dict diff --git a/sync/utils/json.py b/sync/utils/json.py index fc783ca..37e11df 100644 --- a/sync/utils/json.py +++ b/sync/utils/json.py @@ -1,5 +1,6 @@ import datetime from json import JSONEncoder +from typing import Any, Dict, TypeVar class DefaultDateTimeEncoder(JSONEncoder): @@ -34,3 +35,24 @@ def default(self, obj): date = date.isoformat() date = date.replace("+00:00", "Z") return date + + +KeyType = TypeVar("KeyType") + + +def deep_update( + mapping: Dict[KeyType, Any], *updating_mappings: Dict[KeyType, Any] +) -> Dict[KeyType, Any]: + updated_mapping = mapping.copy() + for updating_mapping in updating_mappings: + for k, v in updating_mapping.items(): + if k in updated_mapping: + if isinstance(updated_mapping[k], dict) and isinstance(v, dict): + updated_mapping[k] = deep_update(updated_mapping[k], v) + elif isinstance(updated_mapping[k], list) and isinstance(v, list): + updated_mapping[k] += v + else: + updated_mapping[k] = v + else: + updated_mapping[k] = v + return updated_mapping diff --git a/tests/clients/test_sync.py b/tests/clients/test_sync.py index 45cf70f..f680f18 100644 --- a/tests/clients/test_sync.py +++ b/tests/clients/test_sync.py @@ -67,7 +67,7 @@ def test_get_latest_azure_project_config_recommendation(self, mock_send): "runtime_engine": "STANDARD", "azure_attributes": { "availability": "SPOT_WITH_FALLBACK_AZURE", - "first_on_demand": 6, + "first_on_demand": 7, "spot_bid_max_price": 100.0, }, } @@ -91,7 +91,7 @@ def test_get_cluster_definition_and_recommendation(self, mock_send): expected_result = Response( result={ - "cluster_recommendations": { + "cluster_recommendation": { "node_type_id": "i6.xlarge", "driver_node_type_id": "i6.xlarge", "custom_tags": { @@ -157,7 +157,6 @@ def test_get_cluster_definition_and_recommendation(self, mock_send): with open("tests/test_files/aws_cluster.json") as cluster_in: result = get_cluster_definition_and_recommendation("project_id", cluster_in.read()) - print(f"Result = {result}") assert result == expected_result @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation") @@ -172,8 +171,10 @@ def test_get_updated_aws_cluster_defintion(self, mock_send): "spark_version": "13.3.x-scala2.12", "aws_attributes": { "first_on_demand": 1, + "ebs_volume_count": 0, "availability": "SPOT_WITH_FALLBACK", "spot_bid_price_percent": 100, + "zone_id": "us-west-2c", }, "custom_tags": { "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", @@ -188,8 +189,8 @@ def test_get_updated_aws_cluster_defintion(self, mock_send): "disk_spec": {"disk_count": 0}, "cluster_source": "UI", "enable_local_disk_encryption": False, - "instance_source": {"node_type_id": "i6.xlarge"}, - "driver_instance_source": {"node_type_id": "i6.xlarge"}, + "instance_source": {"node_type_id": "i3.xlarge"}, + "driver_instance_source": {"node_type_id": "i3.xlarge"}, "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, "state": "TERMINATED", "state_message": "Inactive cluster terminated (inactive for 120 minutes).", @@ -230,7 +231,7 @@ def test_get_updated_azure_cluster_defintion(self, mock_send): "cluster_name": "my-cluster", "spark_version": "13.3.x-scala2.12", "azure_attributes": { - "first_on_demand": 6, + "first_on_demand": 7, "availability": "SPOT_WITH_FALLBACK_AZURE", "spot_bid_max_price": 100.0, }, @@ -241,8 +242,8 @@ def test_get_updated_azure_cluster_defintion(self, mock_send): "disk_spec": {"disk_count": 0}, "cluster_source": "UI", "enable_local_disk_encryption": False, - "instance_source": {"node_type_id": "Standard_D4s_v3"}, - "driver_instance_source": {"node_type_id": "Standard_D4s_v3"}, + "instance_source": {"node_type_id": "Standard_DS5_v2"}, + "driver_instance_source": {"node_type_id": "Standard_DS5_v2"}, "state": "TERMINATED", "state_message": "Inactive cluster terminated (inactive for 120 minutes).", "start_time": 1618263108824, diff --git a/tests/test_files/azure_recommendation.json b/tests/test_files/azure_recommendation.json index b857bf0..8218219 100644 --- a/tests/test_files/azure_recommendation.json +++ b/tests/test_files/azure_recommendation.json @@ -23,7 +23,7 @@ "runtime_engine": "STANDARD", "azure_attributes": { "availability": "SPOT_WITH_FALLBACK_AZURE", - "first_on_demand": 6, + "first_on_demand": 7, "spot_bid_max_price": 100.0 } },