# Databricks system data

This notebook ingests data from the [Databricks Runtime release notes](https://learn.microsoft.com/en-us/azure/databricks/release-notes/runtime/) and the Databricks API. It is executed by a Databricks Workflows job defined in `resources/data_platform_tools_job.yml`.

In [19]:
from pyspark.sql.functions import lit, col, when, expr, left, substring_index, regexp_replace
from databricks.sdk import AccountClient
from data_platform_tools.databricks_system_data import DatabricksSystemData

# --- CONFIG ----
# Azure (Entra ID) application credentials used to authenticate the account-level client.
ACCOUNT_HOST = "accounts.azuredatabricks.net"
TENANT_ID = "c3588c15-f840-4591-875f-b3d42610f22f"
ACCOUNT_ID = "42ba6f6a-250d-4e87-9433-3ab73685b3f6"
CLIENT_ID = "22a10d55-9e76-464d-96bc-3e6c3e44cc35"
# The client secret is pulled from a secret scope instead of living in the notebook.
CLIENT_SECRET = dbutils.secrets.get(scope="kv-redkic-ne-test", key="DatabricksAPI")
# Destination catalog/schema are supplied through notebook (job) widgets.
CATALOG_PATH = dbutils.widgets.get("catalog")
SCHEMA_PATH = dbutils.widgets.get("schema")
# --- END CONFIG ---

# Ensure the destination namespace exists before any table is written into it.
for ddl in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG_PATH}",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG_PATH}.{SCHEMA_PATH}",
):
    spark.sql(ddl)

account_client = AccountClient(
    host=ACCOUNT_HOST,
    account_id=ACCOUNT_ID,
    azure_tenant_id=TENANT_ID,
    azure_client_id=CLIENT_ID,
    azure_client_secret=CLIENT_SECRET,
)

# Facade over the Spark session and the account API client; every cell below
# fetches its source DataFrame through it.
dsd = DatabricksSystemData(spark=spark, account_client=account_client)

In [None]:
# Databricks Runtime versions. "spark_version_number" is the "version" value with
# every " LTS" occurrence removed (e.g. "13.3 LTS" -> "13.3").
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.runtime_versions"
df = dsd.get_runtime_versions().withColumn(
    "spark_version_number",
    regexp_replace(col("version"), " LTS", ""),
)
df.writeTo(table_name).createOrReplace()

In [None]:
# Account users: lift the nested "name" struct's fields to top-level columns,
# then discard the struct itself and the "emails" column.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.users"
df = dsd.get_users()
df = df.withColumn("givenname", col("name.givenname"))
df = df.withColumn("familyname", col("name.familyname"))
df = df.drop("name", "emails")
df.writeTo(table_name).createOrReplace()

In [None]:
# Account workspaces, without the "azure_workspace_info" details and status columns.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.workspaces"
df = dsd.get_workspaces().drop(
    "azure_workspace_info",
    "workspace_status",
    "workspace_status_message",
)
df.writeTo(table_name).createOrReplace()

In [None]:
# Workspace permission assignments, flattened to one privilege + principal per row.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.workspace_privileges"
df = dsd.get_workspace_privileges()

principal = col("principal")

# Classify the principal by whichever identifying field is populated.
principal_type = (
    when(principal.group_name.isNotNull(), lit("Group"))
    .when(principal.service_principal_name.isNotNull(), lit("Service Principal"))
    .when(principal.user_name.isNotNull(), lit("User"))
    .otherwise(lit("UNKNOWN"))
)

df = (
    df.withColumn("privilege_type", col("permissions")[0])  # only the first permission is kept
    .withColumn("principal_id", principal.principal_id)
    .withColumn("type", principal_type)
    # Users are labelled by user_name; everyone else by the principal's display_name.
    .withColumn(
        "display_name",
        when(col("type") == "User", principal.user_name).otherwise(principal.display_name),
    )
    .drop("permissions", "principal")
)

df.writeTo(table_name).createOrReplace()

In [None]:
# All-purpose/job clusters. Configuration and runtime-state columns are dropped;
# an owner_id and a normalised Spark version number are derived first.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.clusters"
df = dsd.get_clusters()

# Columns excluded from the clusters table (each listed exactly once).
drop = (
    "num_workers", "autoscale", "spark_conf", "spark_env_vars", "azure_attributes",
    "ssh_public_keys", "creator_user_name", "custom_tags", "default_tags",
    "cluster_source", "cluster_log_status", "instance_pool_id", "driver_instance_pool_id",
    "workload_type", "enable_elastic_disk", "enable_local_disk_encryption", "jdbc_port",
    "policy_id", "start_time", "last_restarted_time", "last_state_loss_time",
    "terminated_time", "termination_reason", "state", "state_message",
)

df = (df
      # owner_id must be read before custom_tags is dropped below.
      .withColumn("owner_id", df.custom_tags.owner_id)
      # Keep everything before the first ".x" in spark_version
      # (for a value shaped like "13.3.x-scala2.12" this gives "13.3").
      .withColumn("spark_version_number", substring_index("spark_version", ".x", 1))
      .drop(*drop)
      )

df.writeTo(table_name).createOrReplace()

In [None]:
# SQL warehouses, with an owner_id pulled from the custom tags and sizing/state
# columns removed.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.warehouses"
df = dsd.get_warehouses()

drop = ("jdbc_url", "max_num_clusters", "min_num_clusters", "num_active_sessions", "num_clusters", "state")

df = (df
      # tags.custom_tags is an array of key/value structs: keep the value of the
      # first entry whose key is 'owner_id'. try_element_at (1-based) returns NULL
      # when no such tag exists; indexing the filtered array with [0] instead raises
      # INVALID_ARRAY_INDEX under ANSI SQL mode (spark.sql.ansi.enabled), which would
      # fail the whole job for any warehouse without an owner_id tag.
      .withColumn(
          "owner_id",
          expr("try_element_at(filter(tags.custom_tags, x -> x.key == 'owner_id'), 1).value"),
      )
      .drop(*drop)
      )

df.writeTo(table_name).createOrReplace()

In [8]:
def transform_privileges(df):
    """Flatten an access-control DataFrame to one privilege and one principal per row.

    The privilege fields (``inherited``, ``inherited_from_object``,
    ``permission_level``) are taken from the FIRST element of the
    ``all_permissions`` array. The principal is classified by which of
    ``group_name``, ``service_principal_name`` or ``user_name`` is non-null.

    Args:
        df: DataFrame with at least the columns ``all_permissions``,
            ``group_name``, ``service_principal_name``, ``user_name`` and
            ``display_name``. Presumably the output of a
            ``DatabricksSystemData.get_*_privileges()`` call; verify against
            that class.

    Returns:
        ``df`` with ``inherited``, ``inherited_from_object``, ``privilege_type``
        and ``type`` columns added and ``display_name`` overwritten. The columns
        ``all_permissions``, ``permission_level``, ``user_name``,
        ``service_principal_name`` and ``group_name`` are dropped.
    """
    # NOTE(review): only the first permission of each entry is kept; any further
    # elements of all_permissions are discarded.
    first_permission = col("all_permissions")[0]

    principal_type = (
        when(df.group_name.isNotNull(), lit("Group"))
        .when(df.service_principal_name.isNotNull(), lit("Service Principal"))
        .when(df.user_name.isNotNull(), lit("User"))
        .otherwise(lit("UNKNOWN"))
    )

    # Groups and users are labelled by their name column; other principals keep
    # the incoming display_name.
    display_name = (
        when(col("type") == "Group", df.group_name)
        .when(col("type") == "User", df.user_name)
        .otherwise(df.display_name)
    )

    # "permission_level" stays in the list so any such column on the input is
    # still removed from the output.
    dropped_columns = ("all_permissions", "permission_level", "user_name", "service_principal_name", "group_name")

    return (
        df.withColumn("inherited", first_permission.inherited)
        .withColumn("inherited_from_object", first_permission.inherited_from_object)
        .withColumn("privilege_type", first_permission.permission_level)
        .withColumn("type", principal_type)
        .withColumn("display_name", display_name)
        .drop(*dropped_columns)
    )


# Backward-compatible alias for the original (misspelled) name used by existing callers.
tranfsorm_privileges = transform_privileges

In [None]:
# Cluster access-control entries, flattened by tranfsorm_privileges.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.cluster_privileges"
df = tranfsorm_privileges(dsd.get_cluster_privileges())
df.writeTo(table_name).createOrReplace()

In [None]:
# SQL warehouse access-control entries, flattened by tranfsorm_privileges.
table_name = f"{CATALOG_PATH}.{SCHEMA_PATH}.warehouse_privileges"
raw_privileges = dsd.get_warehouse_privileges()
df = tranfsorm_privileges(raw_privileges)
df.writeTo(table_name).createOrReplace()