Skip to content

Commit c0510ce

Browse files
eheinlein-synctaylorgaw
authored and committed
feat: Compare cluster configs in Sync Library
1 parent dc7a217 commit c0510ce

File tree

4 files changed

+359
-1
lines changed

4 files changed

+359
-1
lines changed

sync/clients/sync.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,48 @@
1+
import difflib
2+
import json
13
import logging
2-
from typing import Generator
4+
from typing import Dict, Generator
35

46
import httpx
7+
from pydantic import BaseModel
58

69
from ..config import API_KEY, CONFIG, APIKey
710
from . import USER_AGENT, RetryableHTTPClient, encode_json
811

912
logger = logging.getLogger(__name__)
1013

1114

15+
class S3ClusterLogConfiguration(BaseModel):
    """Cluster log delivery settings for an S3 destination."""

    destination: str
    region: str
    enable_encryption: bool
    canned_acl: str


class DBFSClusterLogConfiguration(BaseModel):
    """Cluster log delivery settings for a DBFS destination."""

    destination: str


class ClusterLogConfiguration(BaseModel):
    """Cluster log configuration holding both S3 and DBFS delivery settings."""

    s3: S3ClusterLogConfiguration
    dbfs: DBFSClusterLogConfiguration


class ProjectConfiguration(BaseModel):
    """Shape of a recommended cluster configuration for a project.

    NOTE(review): the recommendation payloads and tests in this change use
    the key "driver_node_type_id", while this model declares
    ``driver_node_type`` — confirm the intended field name against the API
    response before relying on this model for validation.
    """

    node_type_id: str
    driver_node_type: str
    custom_tags: Dict
    cluster_log_conf: ClusterLogConfiguration
    cluster_name: str
    num_workers: int
    spark_version: str
    runtime_engine: str
    autoscale: Dict
    spark_conf: Dict
    aws_attributes: Dict
    spark_env_vars: Dict
44+
45+
1246
class SyncAuth(httpx.Auth):
1347
requires_response_body = True
1448

@@ -122,6 +156,13 @@ def get_project_recommendation(self, project_id: str, recommendation_id: str) ->
122156
)
123157
)
124158

159+
def get_latest_project_recommendation(self, project_id: str) -> dict:
    """Fetch the single most recent recommendation for a project.

    Requests page 0 with one item per page, so the response holds at most
    the newest recommendation.
    """
    request = self._client.build_request(
        "GET", f"/v1/projects/{project_id}/recommendations?page=0&per_page=1"
    )
    return self._send(request)
165+
125166
def get_project_submissions(self, project_id: str, params: dict = None) -> dict:
126167
return self._send(
127168
self._client.build_request(
@@ -194,6 +235,32 @@ def onboard_workflow(self, workspace_id, job_id: str, project_id: str) -> dict:
194235
)
195236
)
196237

238+
def get_latest_project_config_recommendation(self, project_id: str) -> dict:
    """Return the cluster configuration from the project's newest recommendation.

    Fix: the original annotated the return type as ``ProjectConfiguration``,
    but both paths actually return a plain dict — the annotation now matches.

    Returns:
        The ``configuration`` dict of the latest recommendation, or a dict
        with a single ``"error"`` key when the project has no
        recommendations (best-effort signaling; callers check for that key).
    """
    latest_recommendation = self.get_latest_project_recommendation(project_id)
    results = latest_recommendation.get("result")
    if not results:
        return {"error": f"No project recommendation found for Project {project_id}"}
    return results[0]["recommendation"]["configuration"]
244+
245+
def get_cluster_definition_and_recommendation(
    self, project_id: str, cluster_spec_str: str
) -> dict:
    """Pair a parsed cluster definition with the project's latest recommendation.

    ``cluster_spec_str`` is a JSON string describing the current cluster;
    the recommendation comes from the project's newest config recommendation.
    """
    recommendation = self.get_latest_project_config_recommendation(project_id)
    definition = json.loads(cluster_spec_str)
    return {
        "cluster_recommendations": recommendation,
        "cluster_definition": definition,
    }
253+
254+
def get_updated_cluster_defintion(
    self, project_id: str, cluster_spec_str: str
) -> dict:
    """Overlay the latest recommended configuration onto a cluster definition.

    The method name keeps the historical misspelling ("defintion") because
    existing callers and tests reference it.

    Fixes: the original annotated the return type as ``ProjectConfiguration``
    but returns a plain dict; the manual key-copy loop is replaced with the
    idiomatic ``dict.update``.

    Returns:
        The parsed cluster definition with every recommended key overwritten
        by the recommendation's value.
    """
    latest_recommendation = self.get_latest_project_config_recommendation(project_id)
    cluster_definition = json.loads(cluster_spec_str)
    # NOTE(review): when no recommendation exists the helper returns an
    # {"error": ...} dict, which would be merged verbatim into the cluster
    # definition here. Preserved as-is to avoid changing behavior; callers
    # should check for the "error" key.
    cluster_definition.update(latest_recommendation)
    return cluster_definition
263+
197264
def _send(self, request: httpx.Request) -> dict:
198265
response = self._send_request(request)
199266

tests/clients/test_sync.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
from unittest import TestCase, mock
2+
from unittest.mock import patch
3+
4+
import pytest
5+
from orjson import orjson
6+
7+
from sync.clients.sync import SyncClient
8+
from sync.config import APIKey
9+
10+
11+
@pytest.fixture()
def mock_get_recommendation():
    """Patch SyncClient.get_latest_project_recommendation with a canned payload.

    Fixes: the original passed ``side_effect=get_rec_from_file`` directly,
    but the patched method is invoked with a ``project_id`` argument while
    ``get_rec_from_file`` takes none — any use of the fixture would raise
    TypeError. The lambda now swallows the call arguments. The unused
    ``request`` parameter is dropped.
    """
    with patch(
        "sync.clients.sync.SyncClient.get_latest_project_recommendation",
        side_effect=lambda *args, **kwargs: get_rec_from_file(),
    ):
        yield
18+
19+
20+
def get_rec_from_file():
    """Load the canned recommendation payload used by these tests."""
    with open("tests/test_files/recommendation.json") as rec_in:
        contents = rec_in.read()
    return orjson.loads(contents)
23+
24+
25+
class TestSync(TestCase):
    """Tests for the SyncClient cluster-config recommendation helpers.

    Each test patches a SyncClient collaborator so no HTTP traffic occurs,
    and feeds in the canned fixtures under tests/test_files/.
    """

    @mock.patch("sync.clients.sync.SyncClient._send")
    def test_get_latest_project_config_recommendation(self, mock_send):
        """The configuration dict is extracted from the newest recommendation."""
        # _send is patched, so the canned recommendation.json payload stands
        # in for the API response.
        mock_send.return_value = get_rec_from_file()
        test_client = SyncClient("url", APIKey())
        # Expected value mirrors the "configuration" object inside
        # tests/test_files/recommendation.json.
        expected_result = {
            "node_type_id": "i6.xlarge",
            "driver_node_type_id": "i6.xlarge",
            "custom_tags": {
                "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43",
                "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248",
                "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741",
                "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4",
            },
            "num_workers": 20,
            "spark_conf": {"spark.databricks.isv.product": "sync-gradient"},
            "spark_version": "13.3.x-scala2.12",
            "runtime_engine": "PHOTON",
            "aws_attributes": {
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "spot_bid_price_percent": 100,
            },
        }

        result = test_client.get_latest_project_config_recommendation("project_id")
        assert result == expected_result

    @mock.patch("sync.clients.sync.SyncClient._send")
    def test_get_cluster_definition_and_recommendation(self, mock_send):
        """The result pairs the untouched cluster definition with the recommendation."""
        mock_send.return_value = get_rec_from_file()
        test_client = SyncClient("url", APIKey())

        # "cluster_recommendations" is the configuration from
        # recommendation.json; "cluster_definition" is cluster.json verbatim.
        expected_result = {
            "cluster_recommendations": {
                "node_type_id": "i6.xlarge",
                "driver_node_type_id": "i6.xlarge",
                "custom_tags": {
                    "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43",
                    "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248",
                    "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741",
                    "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4",
                },
                "num_workers": 20,
                "spark_conf": {"spark.databricks.isv.product": "sync-gradient"},
                "spark_version": "13.3.x-scala2.12",
                "runtime_engine": "PHOTON",
                "aws_attributes": {
                    "first_on_demand": 1,
                    "availability": "SPOT_WITH_FALLBACK",
                    "spot_bid_price_percent": 100,
                },
            },
            "cluster_definition": {
                "cluster_id": "1234-567890-reef123",
                "spark_context_id": 4020997813441462000,
                "cluster_name": "my-cluster",
                "spark_version": "13.3.x-scala2.12",
                "aws_attributes": {
                    "zone_id": "us-west-2c",
                    "first_on_demand": 1,
                    "availability": "SPOT_WITH_FALLBACK",
                    "spot_bid_price_percent": 100,
                    "ebs_volume_count": 0,
                },
                "node_type_id": "i3.xlarge",
                "driver_node_type_id": "i3.xlarge",
                "autotermination_minutes": 120,
                "enable_elastic_disk": False,
                "disk_spec": {"disk_count": 0},
                "cluster_source": "UI",
                "enable_local_disk_encryption": False,
                "instance_source": {"node_type_id": "i3.xlarge"},
                "driver_instance_source": {"node_type_id": "i3.xlarge"},
                "state": "TERMINATED",
                "state_message": "Inactive cluster terminated (inactive for 120 minutes).",
                "start_time": 1618263108824,
                "terminated_time": 1619746525713,
                "last_state_loss_time": 1619739324740,
                "num_workers": 30,
                "default_tags": {
                    "Vendor": "Databricks",
                    "Creator": "someone@example.com",
                    "ClusterName": "my-cluster",
                    "ClusterId": "1234-567890-reef123",
                },
                "creator_user_name": "someone@example.com",
                "termination_reason": {
                    "code": "INACTIVITY",
                    "parameters": {"inactivity_duration_min": "120"},
                    "type": "SUCCESS",
                },
                "init_scripts_safe_mode": False,
                "spec": {"spark_version": "13.3.x-scala2.12"},
            },
        }

        with open("tests/test_files/cluster.json") as cluster_in:
            result = test_client.get_cluster_definition_and_recommendation(
                "project_id", cluster_in.read()
            )
            assert result == expected_result

    @mock.patch("sync.clients.sync.SyncClient.get_latest_project_recommendation")
    def test_get_updated_cluster_defintion(self, mock_send):
        """Recommended keys overwrite the cluster definition; others are untouched.

        Note: the whole "aws_attributes" dict is replaced by the
        recommendation's (so zone_id/ebs_volume_count disappear), while keys
        absent from the recommendation (e.g. default_tags) survive.
        """
        mock_send.return_value = get_rec_from_file()
        test_client = SyncClient("url", APIKey())

        expected_result = {
            "cluster_id": "1234-567890-reef123",
            "spark_context_id": 4020997813441462000,
            "cluster_name": "my-cluster",
            "spark_version": "13.3.x-scala2.12",
            "aws_attributes": {
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "spot_bid_price_percent": 100,
            },
            "custom_tags": {
                "sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43",
                "sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248",
                "sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741",
                "sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4",
            },
            "node_type_id": "i6.xlarge",
            "driver_node_type_id": "i6.xlarge",
            "autotermination_minutes": 120,
            "enable_elastic_disk": False,
            "disk_spec": {"disk_count": 0},
            "cluster_source": "UI",
            "enable_local_disk_encryption": False,
            "instance_source": {"node_type_id": "i3.xlarge"},
            "driver_instance_source": {"node_type_id": "i3.xlarge"},
            "spark_conf": {"spark.databricks.isv.product": "sync-gradient"},
            "state": "TERMINATED",
            "state_message": "Inactive cluster terminated (inactive for 120 minutes).",
            "start_time": 1618263108824,
            "terminated_time": 1619746525713,
            "last_state_loss_time": 1619739324740,
            "num_workers": 20,
            "runtime_engine": "PHOTON",
            "default_tags": {
                "Vendor": "Databricks",
                "Creator": "someone@example.com",
                "ClusterName": "my-cluster",
                "ClusterId": "1234-567890-reef123",
            },
            "creator_user_name": "someone@example.com",
            "termination_reason": {
                "code": "INACTIVITY",
                "parameters": {"inactivity_duration_min": "120"},
                "type": "SUCCESS",
            },
            "init_scripts_safe_mode": False,
            "spec": {"spark_version": "13.3.x-scala2.12"},
        }

        with open("tests/test_files/cluster.json") as cluster_in:
            result = test_client.get_updated_cluster_defintion("project_id", cluster_in.read())
            assert result == expected_result

tests/test_files/cluster.json

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"cluster_id": "1234-567890-reef123",
3+
"spark_context_id": 4020997813441462000,
4+
"cluster_name": "my-cluster",
5+
"spark_version": "13.3.x-scala2.12",
6+
"aws_attributes": {
7+
"zone_id": "us-west-2c",
8+
"first_on_demand": 1,
9+
"availability": "SPOT_WITH_FALLBACK",
10+
"spot_bid_price_percent": 100,
11+
"ebs_volume_count": 0
12+
},
13+
"node_type_id": "i3.xlarge",
14+
"driver_node_type_id": "i3.xlarge",
15+
"autotermination_minutes": 120,
16+
"enable_elastic_disk": false,
17+
"disk_spec": {
18+
"disk_count": 0
19+
},
20+
"cluster_source": "UI",
21+
"enable_local_disk_encryption": false,
22+
"instance_source": {
23+
"node_type_id": "i3.xlarge"
24+
},
25+
"driver_instance_source": {
26+
"node_type_id": "i3.xlarge"
27+
},
28+
"state": "TERMINATED",
29+
"state_message": "Inactive cluster terminated (inactive for 120 minutes).",
30+
"start_time": 1618263108824,
31+
"terminated_time": 1619746525713,
32+
"last_state_loss_time": 1619739324740,
33+
"num_workers": 30,
34+
"default_tags": {
35+
"Vendor": "Databricks",
36+
"Creator": "someone@example.com",
37+
"ClusterName": "my-cluster",
38+
"ClusterId": "1234-567890-reef123"
39+
},
40+
"creator_user_name": "someone@example.com",
41+
"termination_reason": {
42+
"code": "INACTIVITY",
43+
"parameters": {
44+
"inactivity_duration_min": "120"
45+
},
46+
"type": "SUCCESS"
47+
},
48+
"init_scripts_safe_mode": false,
49+
"spec": {
50+
"spark_version": "13.3.x-scala2.12"
51+
}
52+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"result": [
3+
{
4+
"created_at": "2024-02-29T23:11:58.559Z",
5+
"updated_at": "2024-02-29T23:11:58.559Z",
6+
"id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
7+
"state": "string",
8+
"error": "string",
9+
"recommendation": {
10+
"metrics": {
11+
"spark_duration_minutes": 0,
12+
"spark_cost_requested_usd": 0,
13+
"spark_cost_lower_usd": 0,
14+
"spark_cost_midpoint_usd": 0,
15+
"spark_cost_upper_usd": 0
16+
},
17+
"configuration": {
18+
"node_type_id": "i6.xlarge",
19+
"driver_node_type_id": "i6.xlarge",
20+
"custom_tags": {
21+
"sync:project-id": "b9bd7136-7699-4603-9040-c6dc4c914e43",
22+
"sync:run-id": "e96401da-f64d-4ed0-8ded-db1317f40248",
23+
"sync:recommendation-id": "e029a220-c6a5-49fd-b7ed-7ea046366741",
24+
"sync:tenant-id": "352176a7-b605-4cc2-b3b2-ee591715b6b4"
25+
},
26+
"num_workers": 20,
27+
"spark_conf": {
28+
"spark.databricks.isv.product": "sync-gradient"
29+
},
30+
"spark_version": "13.3.x-scala2.12",
31+
"runtime_engine": "PHOTON",
32+
"aws_attributes": {
33+
"first_on_demand": 1,
34+
"availability": "SPOT_WITH_FALLBACK",
35+
"spot_bid_price_percent": 100
36+
}
37+
},
38+
"prediction_params": {
39+
"sla_minutes": 0,
40+
"force_ondemand_workers": false,
41+
"force_ondemand_basis": false,
42+
"fix_worker_family": true,
43+
"fix_driver_type": true,
44+
"fix_scaling_type": false
45+
}
46+
},
47+
"context": {
48+
"latest_submission_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
49+
"phase": "LEARNING",
50+
"current_learning_iteration": 0,
51+
"total_learning_iterations": 0
52+
}
53+
}
54+
]
55+
}

0 commit comments

Comments
 (0)