forked from mlflow/mlflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
databricks_job_context.py
46 lines (43 loc) · 1.77 KB
/
databricks_job_context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from mlflow.tracking.context.abstract_context import RunContextProvider
from mlflow.utils import databricks_utils
from mlflow.entities import SourceType
from mlflow.utils.mlflow_tags import (
MLFLOW_SOURCE_TYPE,
MLFLOW_SOURCE_NAME,
MLFLOW_DATABRICKS_WEBAPP_URL,
MLFLOW_DATABRICKS_JOB_ID,
MLFLOW_DATABRICKS_JOB_RUN_ID,
MLFLOW_DATABRICKS_JOB_TYPE,
MLFLOW_DATABRICKS_WORKSPACE_URL,
MLFLOW_DATABRICKS_WORKSPACE_ID,
)
class DatabricksJobRunContext(RunContextProvider):
    """Run context provider that contributes tags describing a Databricks job run."""

    def in_context(self):
        """Return the result of ``databricks_utils.is_in_databricks_job()``."""
        return databricks_utils.is_in_databricks_job()

    def tags(self):
        """Assemble the tag dictionary for the current job run.

        The source-name and source-type tags are always present; the source
        name is ``None`` unless both the job id and the job run id are
        available. Each Databricks-specific tag is included only when the
        corresponding value reported by ``databricks_utils`` is not ``None``.

        Returns:
            dict mapping tag keys to their values.
        """
        # Gather every piece of job metadata up front.
        job_id = databricks_utils.get_job_id()
        run_id = databricks_utils.get_job_run_id()
        job_kind = databricks_utils.get_job_type()
        webapp = databricks_utils.get_webapp_url()
        ws_url, ws_id = databricks_utils.get_workspace_info_from_dbutils()

        # The source name needs both identifiers; otherwise it stays None.
        if job_id is None or run_id is None:
            source_name = None
        else:
            source_name = "jobs/{job_id}/run/{job_run_id}".format(
                job_id=job_id, job_run_id=run_id
            )

        collected = {
            MLFLOW_SOURCE_NAME: source_name,
            MLFLOW_SOURCE_TYPE: SourceType.to_string(SourceType.JOB),
        }

        # Optional tags, listed in the order they should be inserted.
        optional_pairs = (
            (MLFLOW_DATABRICKS_JOB_ID, job_id),
            (MLFLOW_DATABRICKS_JOB_RUN_ID, run_id),
            (MLFLOW_DATABRICKS_JOB_TYPE, job_kind),
            (MLFLOW_DATABRICKS_WEBAPP_URL, webapp),
            (MLFLOW_DATABRICKS_WORKSPACE_URL, ws_url),
            (MLFLOW_DATABRICKS_WORKSPACE_ID, ws_id),
        )
        collected.update(
            (tag_key, tag_value)
            for tag_key, tag_value in optional_pairs
            if tag_value is not None
        )
        return collected