diff --git a/Makefile b/Makefile index 1cc1e76..fd89cfc 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ FILES := $(shell git diff --name-only --diff-filter=AM $$(git merge-base origin/ .PHONY: test test: - pytest + pytest -vv .PHONY: lint lint: diff --git a/sync/_databricks.py b/sync/_databricks.py index c602703..8032084 100644 --- a/sync/_databricks.py +++ b/sync/_databricks.py @@ -10,7 +10,7 @@ from datetime import datetime, timezone from pathlib import Path from time import sleep -from typing import Any, Collection, Dict, List, Set, Tuple, TypeVar, Union +from typing import Collection, Dict, List, Set, Tuple, Union from urllib.parse import urlparse import boto3 as boto @@ -21,12 +21,13 @@ from sync.models import ( DatabricksAPIError, DatabricksClusterReport, - DatabricksError, DatabricksComputeType, + DatabricksError, DatabricksPlanType, - Response + Response, ) from sync.utils.dbfs import format_dbfs_filepath, read_dbfs_file +from sync.utils.json import deep_update logger = logging.getLogger(__name__) @@ -102,7 +103,7 @@ def create_submission_with_cluster_info( cluster_activity_events=cluster_activity_events, tasks=tasks, plan_type=plan_type, - compute_type=compute_type + compute_type=compute_type, ) eventlog = _get_event_log_from_cluster(cluster, tasks).result @@ -308,12 +309,12 @@ def _get_cluster_report( def _create_cluster_report( - cluster: dict, - cluster_info: dict, - cluster_activity_events: dict, - tasks: List[dict], - plan_type: DatabricksPlanType, - compute_type: DatabricksComputeType + cluster: dict, + cluster_info: dict, + cluster_activity_events: dict, + tasks: List[dict], + plan_type: DatabricksPlanType, + compute_type: DatabricksComputeType, ) -> DatabricksClusterReport: raise NotImplementedError() @@ -659,7 +660,7 @@ def get_recommendation_cluster( if "autoscale" in cluster: del cluster["autoscale"] - recommendation_cluster = _deep_update(cluster, recommendation["configuration"]) + recommendation_cluster = deep_update(cluster, 
recommendation["configuration"]) return Response(result=recommendation_cluster) return recommendation_response @@ -727,7 +728,7 @@ def get_project_cluster(cluster: dict, project_id: str, region_name: str = None) project_settings_response = get_project_cluster_settings(project_id, region_name) project_cluster_settings = project_settings_response.result if project_cluster_settings: - project_cluster = _deep_update(cluster, project_cluster_settings) + project_cluster = deep_update(cluster, project_cluster_settings) return Response(result=project_cluster) return project_settings_response @@ -1638,24 +1639,3 @@ def _update_monitored_timelines( retired_inst_timeline_list.append(active_timelines_by_id.pop(id)) return active_timelines_by_id, retired_inst_timeline_list - - -KeyType = TypeVar("KeyType") - - -def _deep_update( - mapping: Dict[KeyType, Any], *updating_mappings: Dict[KeyType, Any] -) -> Dict[KeyType, Any]: - updated_mapping = mapping.copy() - for updating_mapping in updating_mappings: - for k, v in updating_mapping.items(): - if k in updated_mapping: - if isinstance(updated_mapping[k], dict) and isinstance(v, dict): - updated_mapping[k] = _deep_update(updated_mapping[k], v) - elif isinstance(updated_mapping[k], list) and isinstance(v, list): - updated_mapping[k] += v - else: - updated_mapping[k] = v - else: - updated_mapping[k] = v - return updated_mapping diff --git a/sync/api/projects.py b/sync/api/projects.py index 64d845b..76b0f2d 100644 --- a/sync/api/projects.py +++ b/sync/api/projects.py @@ -1,15 +1,25 @@ """Project functions """ import io +import json import logging from time import sleep -from typing import List +from typing import List, Union from urllib.parse import urlparse import httpx from sync.clients.sync import get_default_client -from sync.models import Platform, ProjectError, RecommendationError, Response, SubmissionError +from sync.models import ( + AWSProjectConfiguration, + AzureProjectConfiguration, + Platform, + ProjectError, + 
RecommendationError, + Response, + SubmissionError, +) +from sync.utils.json import deep_update from . import generate_presigned_url @@ -396,3 +406,81 @@ def get_project_submission(project_id: str, submission_id: str) -> Response[dict return Response(**response) return Response(result=response["result"]) + + +def get_latest_project_config_recommendation( + project_id: str, +) -> Response[Union[AWSProjectConfiguration, AzureProjectConfiguration]]: + """Get Latest Project Configuration Recommendation. + + :param project_id: project ID + :type project_id: str + :return: Project Configuration Recommendation object + :rtype: AWSProjectConfiguration or AzureProjectConfiguration + """ + latest_recommendation = get_default_client().get_latest_project_recommendation(project_id) + if latest_recommendation.get("result"): + return Response( + result=latest_recommendation["result"][0]["recommendation"]["configuration"] + ) + + +def get_cluster_definition_and_recommendation( + project_id: str, cluster_spec_str: str +) -> Response[dict]: + """Return Current Cluster Definition and Project Configuration Recommendation. 
+ Throws error if no cluster recommendation found for project + + :param project_id: project ID + :type project_id: str + :param cluster_spec_str: Current Cluster Definition (JSON string) + :type cluster_spec_str: str + :return: Current Cluster Definition and Project Configuration Recommendation object + :rtype: dict + """ + recommendation_response = get_latest_project_config_recommendation(project_id) + if not recommendation_response: + logger.info(f"No cluster recommendation found for {project_id}") + return Response(error=RecommendationError(message="Recommendation failed")) + response_str = json.dumps(recommendation_response.result) + return Response( + result={ + "cluster_recommendation": json.loads(response_str), + "cluster_definition": json.loads(cluster_spec_str), + } + ) + + +def get_updated_cluster_defintion( + project_id: str, cluster_spec_str: str +) -> Response[Union[AWSProjectConfiguration, AzureProjectConfiguration]]: + """Return Cluster Definition merged with Project Configuration Recommendations. + + :param project_id: project ID + :type project_id: str + :param cluster_spec_str: Current Cluster Definition (JSON string) + :type cluster_spec_str: str + :return: Updated Cluster Definition with Project Configuration Recommendations + :rtype: AWSProjectConfiguration or AzureProjectConfiguration + """ + rec_response = get_latest_project_config_recommendation(project_id) + if not rec_response.error: + # Convert Response result object to str + latest_rec_str = json.dumps(rec_response.result) + # Convert json string to json + latest_recommendation = json.loads(latest_rec_str) + cluster_definition = json.loads(cluster_spec_str) + # num_workers/autoscale are mutually exclusive settings, and we are relying on our Prediction + # Recommendations to set these appropriately. Since we may recommend a Static cluster (i.e. 
a cluster + # with `num_workers`) for a cluster that was originally autoscaled, we want to make sure to remove this + # prior configuration + if "num_workers" in cluster_definition: + del cluster_definition["num_workers"] + + if "autoscale" in cluster_definition: + del cluster_definition["autoscale"] + + recommendation_cluster = deep_update(cluster_definition, latest_recommendation) + return Response(result=recommendation_cluster) + else: + return rec_response diff --git a/sync/clients/sync.py b/sync/clients/sync.py index 3a7d700..61dd89b 100644 --- a/sync/clients/sync.py +++ b/sync/clients/sync.py @@ -122,6 +122,13 @@ def get_project_recommendation(self, project_id: str, recommendation_id: str) -> ) ) + def get_latest_project_recommendation(self, project_id: str) -> dict: + return self._send( + self._client.build_request( + "GET", f"/v1/projects/{project_id}/recommendations?page=0&per_page=1" + ) + ) + def get_project_submissions(self, project_id: str, params: dict = None) -> dict: return self._send( self._client.build_request( diff --git a/sync/models.py b/sync/models.py index e6118ad..661047f 100644 --- a/sync/models.py +++ b/sync/models.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from enum import Enum, unique -from typing import Callable, Generic, List, TypeVar, Union +from typing import Callable, Dict, Generic, List, TypeVar, Union from botocore.exceptions import ClientError from pydantic import BaseModel, Field, root_validator, validator @@ -137,3 +137,41 @@ def check_consistency(cls, err, values): if err is None and values.get("result") is None: raise ValueError("must provide result or error") return err + + +class S3ClusterLogConfiguration(BaseModel): + destination: str + region: str + enable_encryption: bool + canned_acl: str + + +class DBFSClusterLogConfiguration(BaseModel): + destination: str + + +class AWSProjectConfiguration(BaseModel): + node_type_id: str + driver_node_type: str + custom_tags: Dict + cluster_log_conf: 
Union[S3ClusterLogConfiguration, DBFSClusterLogConfiguration] + cluster_name: str + num_workers: int + spark_version: str + runtime_engine: str + autoscale: Dict + spark_conf: Dict + aws_attributes: Dict + spark_env_vars: Dict + + +class AzureProjectConfiguration(BaseModel): + node_type_id: str + driver_node_type: str + cluster_log_conf: DBFSClusterLogConfiguration + custom_tags: Dict + num_workers: int + spark_conf: Dict + spark_version: str + runtime_engine: str + azure_attributes: Dict diff --git a/sync/utils/json.py b/sync/utils/json.py index fc783ca..37e11df 100644 --- a/sync/utils/json.py +++ b/sync/utils/json.py @@ -1,5 +1,6 @@ import datetime from json import JSONEncoder +from typing import Any, Dict, TypeVar class DefaultDateTimeEncoder(JSONEncoder): @@ -34,3 +35,24 @@ def default(self, obj): date = date.isoformat() date = date.replace("+00:00", "Z") return date + + +KeyType = TypeVar("KeyType") + + +def deep_update( + mapping: Dict[KeyType, Any], *updating_mappings: Dict[KeyType, Any] +) -> Dict[KeyType, Any]: + updated_mapping = mapping.copy() + for updating_mapping in updating_mappings: + for k, v in updating_mapping.items(): + if k in updated_mapping: + if isinstance(updated_mapping[k], dict) and isinstance(v, dict): + updated_mapping[k] = deep_update(updated_mapping[k], v) + elif isinstance(updated_mapping[k], list) and isinstance(v, list): + updated_mapping[k] += v + else: + updated_mapping[k] = v + else: + updated_mapping[k] = v + return updated_mapping diff --git a/tests/clients/test_sync.py b/tests/clients/test_sync.py new file mode 100644 index 0000000..f680f18 --- /dev/null +++ b/tests/clients/test_sync.py @@ -0,0 +1,280 @@ +import json +from unittest import TestCase, mock + +from sync.api.projects import ( + get_cluster_definition_and_recommendation, + get_latest_project_config_recommendation, + get_updated_cluster_defintion, +) +from sync.models import RecommendationError, Response + + +def get_aws_rec_from_file(): + with 
open("tests/test_files/aws_recommendation.json") as rec_in: + return json.loads(rec_in.read()) + + +def get_azure_rec_from_file(): + with open("tests/test_files/azure_recommendation.json") as rec_in: + return json.loads(rec_in.read()) + + +class TestSync(TestCase): + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_latest_aws_project_config_recommendation(self, mock_send): + mock_send.return_value = get_aws_rec_from_file() + expected_result = Response( + result={ + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "num_workers": 20, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + } + ) + + result = get_latest_project_config_recommendation("project_id") + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_latest_azure_project_config_recommendation(self, mock_send): + mock_send.return_value = get_azure_rec_from_file() + expected_result = Response( + result={ + "node_type_id": "Standard_D4s_v3", + "driver_node_type_id": "Standard_D4s_v3", + "custom_tags": { + "sync:project-id": "769c3443-afd7-45ff-a72a-27bf4296b80e", + "sync:run-id": "d3f8db6c-df4b-430a-a511-a1e9c95d1ad0", + "sync:recommendation-id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "sync:tenant-id": "290d381e-8eb4-4d6a-80d4-453d82897ecc", + }, + "num_workers": 5, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "STANDARD", + "azure_attributes": { + "availability": 
"SPOT_WITH_FALLBACK_AZURE", + "first_on_demand": 7, + "spot_bid_max_price": 100.0, + }, + } + ) + + result = get_latest_project_config_recommendation("project_id") + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_cluster_definition_no_recommendation(self, mock_send): + mock_send.return_value = {"result": []} + expected_result = Response(error=RecommendationError(message="Recommendation failed")) + + with open("tests/test_files/azure_cluster.json") as cluster_in: + result = get_cluster_definition_and_recommendation("project_id", cluster_in.read()) + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient._send") + def test_get_cluster_definition_and_recommendation(self, mock_send): + mock_send.return_value = get_aws_rec_from_file() + + expected_result = Response( + result={ + "cluster_recommendation": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "num_workers": 20, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + }, + }, + "cluster_definition": { + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0, + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": 
False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "i3.xlarge"}, + "driver_instance_source": {"node_type_id": "i3.xlarge"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + }, + } + ) + + with open("tests/test_files/aws_cluster.json") as cluster_in: + result = get_cluster_definition_and_recommendation("project_id", cluster_in.read()) + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation") + def test_get_updated_aws_cluster_defintion(self, mock_send): + mock_send.return_value = get_aws_rec_from_file() + + expected_result = Response( + result={ + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "first_on_demand": 1, + "ebs_volume_count": 0, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "zone_id": "us-west-2c", + }, + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4", + }, + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + 
"autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": {"node_type_id": "i3.xlarge"}, + "driver_instance_source": {"node_type_id": "i3.xlarge"}, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 20, + "runtime_engine": "PHOTON", + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + } + ) + + with open("tests/test_files/aws_cluster.json") as cluster_in: + result = get_updated_cluster_defintion("project_id", cluster_in.read()) + assert result == expected_result + + @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation") + def test_get_updated_azure_cluster_defintion(self, mock_send): + mock_send.return_value = get_azure_rec_from_file() + + expected_result = Response( + result={ + "cluster_id": "1114-202840-mu1ql9xp", + "spark_context_id": 8637481617925571639, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "azure_attributes": { + "first_on_demand": 7, + "availability": "SPOT_WITH_FALLBACK_AZURE", + "spot_bid_max_price": 100.0, + }, + "node_type_id": "Standard_D4s_v3", + "driver_node_type_id": "Standard_D4s_v3", + "autotermination_minutes": 120, + "enable_elastic_disk": False, + "disk_spec": {"disk_count": 0}, + "cluster_source": "UI", + "enable_local_disk_encryption": False, + "instance_source": 
{"node_type_id": "Standard_DS5_v2"}, + "driver_instance_source": {"node_type_id": "Standard_DS5_v2"}, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 5, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123", + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": {"inactivity_duration_min": "120"}, + "type": "SUCCESS", + }, + "init_scripts_safe_mode": False, + "spec": {"spark_version": "13.3.x-scala2.12"}, + "custom_tags": { + "sync:project-id": "769c3443-afd7-45ff-a72a-27bf4296b80e", + "sync:run-id": "d3f8db6c-df4b-430a-a511-a1e9c95d1ad0", + "sync:recommendation-id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "sync:tenant-id": "290d381e-8eb4-4d6a-80d4-453d82897ecc", + }, + "spark_conf": {"spark.databricks.isv.product": "sync-gradient"}, + "runtime_engine": "STANDARD", + } + ) + + with open("tests/test_files/azure_cluster.json") as cluster_in: + result = get_updated_cluster_defintion("project_id", cluster_in.read()) + assert result == expected_result diff --git a/tests/test_files/aws_cluster.json b/tests/test_files/aws_cluster.json new file mode 100644 index 0000000..1b0012f --- /dev/null +++ b/tests/test_files/aws_cluster.json @@ -0,0 +1,52 @@ +{ + "cluster_id": "1234-567890-reef123", + "spark_context_id": 4020997813441462000, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "aws_attributes": { + "zone_id": "us-west-2c", + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100, + "ebs_volume_count": 0 + }, + "node_type_id": "i3.xlarge", + "driver_node_type_id": "i3.xlarge", + "autotermination_minutes": 120, + "enable_elastic_disk": false, + "disk_spec": { + "disk_count": 0 
+ }, + "cluster_source": "UI", + "enable_local_disk_encryption": false, + "instance_source": { + "node_type_id": "i3.xlarge" + }, + "driver_instance_source": { + "node_type_id": "i3.xlarge" + }, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123" + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": { + "inactivity_duration_min": "120" + }, + "type": "SUCCESS" + }, + "init_scripts_safe_mode": false, + "spec": { + "spark_version": "13.3.x-scala2.12" + } +} \ No newline at end of file diff --git a/tests/test_files/aws_recommendation.json b/tests/test_files/aws_recommendation.json new file mode 100644 index 0000000..6087fe2 --- /dev/null +++ b/tests/test_files/aws_recommendation.json @@ -0,0 +1,55 @@ +{ + "result": [ + { + "created_at": "2024-02-29T23:11:58.559Z", + "updated_at": "2024-02-29T23:11:58.559Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "state": "string", + "error": "string", + "recommendation": { + "metrics": { + "spark_duration_minutes": 0, + "spark_cost_requested_usd": 0, + "spark_cost_lower_usd": 0, + "spark_cost_midpoint_usd": 0, + "spark_cost_upper_usd": 0 + }, + "configuration": { + "node_type_id": "i6.xlarge", + "driver_node_type_id": "i6.xlarge", + "custom_tags": { + "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43", + "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248", + "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741", + "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4" + }, + "num_workers": 20, + "spark_conf": { + "spark.databricks.isv.product": "sync-gradient" + }, + "spark_version": "13.3.x-scala2.12", + 
"runtime_engine": "PHOTON", + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "spot_bid_price_percent": 100 + } + }, + "prediction_params": { + "sla_minutes": 0, + "force_ondemand_workers": false, + "force_ondemand_basis": false, + "fix_worker_family": true, + "fix_driver_type": true, + "fix_scaling_type": false + } + }, + "context": { + "latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "phase": "LEARNING", + "current_learning_iteration": 0, + "total_learning_iterations": 0 + } + } + ] +} \ No newline at end of file diff --git a/tests/test_files/azure_cluster.json b/tests/test_files/azure_cluster.json new file mode 100644 index 0000000..3ef7977 --- /dev/null +++ b/tests/test_files/azure_cluster.json @@ -0,0 +1,50 @@ +{ + "cluster_id": "1114-202840-mu1ql9xp", + "spark_context_id": 8637481617925571639, + "cluster_name": "my-cluster", + "spark_version": "13.3.x-scala2.12", + "azure_attributes": { + "first_on_demand": 6, + "availability": "SPOT_WITH_FALLBACK_AZURE", + "spot_bid_max_price": 100.0 + }, + "node_type_id": "Standard_DS5_v2", + "driver_node_type_id": "Standard_DS5_v2", + "autotermination_minutes": 120, + "enable_elastic_disk": false, + "disk_spec": { + "disk_count": 0 + }, + "cluster_source": "UI", + "enable_local_disk_encryption": false, + "instance_source": { + "node_type_id": "Standard_DS5_v2" + }, + "driver_instance_source": { + "node_type_id": "Standard_DS5_v2" + }, + "state": "TERMINATED", + "state_message": "Inactive cluster terminated (inactive for 120 minutes).", + "start_time": 1618263108824, + "terminated_time": 1619746525713, + "last_state_loss_time": 1619739324740, + "num_workers": 30, + "default_tags": { + "Vendor": "Databricks", + "Creator": "someone@example.com", + "ClusterName": "my-cluster", + "ClusterId": "1234-567890-reef123" + }, + "creator_user_name": "someone@example.com", + "termination_reason": { + "code": "INACTIVITY", + "parameters": { + "inactivity_duration_min": "120" + }, + 
"type": "SUCCESS" + }, + "init_scripts_safe_mode": false, + "spec": { + "spark_version": "13.3.x-scala2.12" + } +} \ No newline at end of file diff --git a/tests/test_files/azure_recommendation.json b/tests/test_files/azure_recommendation.json new file mode 100644 index 0000000..8218219 --- /dev/null +++ b/tests/test_files/azure_recommendation.json @@ -0,0 +1,47 @@ +{ + "result": [ + { + "created_at": "2024-02-14T21:26:37Z", + "updated_at": "2024-02-14T21:29:38Z", + "id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "state": "SUCCESS", + "recommendation": { + "configuration": { + "node_type_id": "Standard_D4s_v3", + "driver_node_type_id": "Standard_D4s_v3", + "custom_tags": { + "sync:project-id": "769c3443-afd7-45ff-a72a-27bf4296b80e", + "sync:run-id": "d3f8db6c-df4b-430a-a511-a1e9c95d1ad0", + "sync:recommendation-id": "6024acdd-fd13-4bf1-82f5-44f1ab7008f2", + "sync:tenant-id": "290d381e-8eb4-4d6a-80d4-453d82897ecc" + }, + "num_workers": 5, + "spark_conf": { + "spark.databricks.isv.product": "sync-gradient" + }, + "spark_version": "13.3.x-scala2.12", + "runtime_engine": "STANDARD", + "azure_attributes": { + "availability": "SPOT_WITH_FALLBACK_AZURE", + "first_on_demand": 7, + "spot_bid_max_price": 100.0 + } + }, + "prediction_params": { + "sla_minutes": 60, + "force_ondemand_workers": false, + "force_ondemand_basis": false, + "fix_worker_family": true, + "fix_driver_type": true, + "fix_scaling_type": true + } + }, + "context": { + "latest_submission_id": "8626c4f9-e57f-4563-9981-f21b936828b0", + "phase": "LEARNING", + "current_learning_iteration": 2, + "total_learning_iterations": 3 + } + } + ] +} \ No newline at end of file