---

## All-Purpose Compute
- **Usage**: Interactive development and ad hoc analysis
- **Management**: Managed by the user via the UI, CLI, or REST API
- **Termination**: Manual, or automatic after a period of inactivity: 120 min (default), 10 min (minimum)
- **Cost efficiency**: Costs more to run than job compute and SQL warehouse pro: $0.65 per DBU (classic) and $0.95 per DBU (serverless)
    https://www.databricks.com/product/pricing/datascience-ml

---

---

## Serverless Compute for Notebooks
- On demand
- Auto scalable
- Run SQL and Python
- No infra
- Budget Policies
---

---

## Single-node cluster
- Just driver node
- Driver handles both driver and worker responsibilities
- Cost effective
- Run SQL and Python
---


In [0]:
from databricks.sdk import WorkspaceClient

# Client built with no explicit arguments.
w = WorkspaceClient()

print("Attempting to create cluster. Please wait...")

# Request a one-worker cluster that auto-terminates after 15 idle minutes;
# the call returns the cluster description used below to build its URL.
c = w.clusters.create_and_wait(
    cluster_name="my-cluster",
    spark_version="13.3.x-scala2.12",
    node_type_id="i3.xlarge",
    autotermination_minutes=15,
    num_workers=1,
)

# Adjacent literals are concatenated, so the message is a single string.
print(
    "The cluster is now ready at "
    f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n"
)

In [0]:
from databricks.sdk import WorkspaceClient

# Client built with no explicit arguments.
w = WorkspaceClient()

# Print the name of every cluster returned by the clusters API.
for c in w.clusters.list():
    print(c.cluster_name)

In [0]:
%sql
-- Count nodes per instance type across the clusters of this workspace.
--
-- system.compute.clusters keeps one row per configuration change of a cluster,
-- so only the most recent snapshot of each cluster is used; reading the table
-- directly would count a cluster once for every edit it ever received.
-- Deleted clusters are still included; add "delete_time IS NULL" to
-- latest_clusters to restrict the result to clusters that still exist.
-- NOTE(review): another cell of this notebook calls
-- dataops_prd.libraries.get_workspace_id(); confirm which schema is correct.
WITH latest_clusters AS (
  SELECT *
  FROM system.compute.clusters
  WHERE workspace_id = dataops_prd.libs.get_workspace_id()
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, cluster_id ORDER BY change_time DESC) = 1
),
node_counts AS (
  -- one driver node per cluster
  SELECT
    driver_node_type AS node_type,
    1 AS node_count
  FROM latest_clusters
  UNION ALL
  -- workers: use worker_count when set, otherwise the autoscaling upper bound
  SELECT
    worker_node_type AS node_type,
    coalesce(worker_count, max_autoscale_workers) AS node_count
  FROM latest_clusters
)
SELECT node_type, sum(node_count) AS count
FROM node_counts
GROUP BY ALL
ORDER BY count DESC

In [0]:
%sql
-- Count nodes per instance type across the clusters of this workspace,
-- ignoring nodes that are drawn from an instance pool.
--
-- system.compute.clusters keeps one row per configuration change of a cluster,
-- so only the most recent snapshot of each cluster is used; reading the table
-- directly would count a cluster once for every edit it ever received.
-- The pool filters are applied to that latest snapshot.
-- Deleted clusters are still included; add "delete_time IS NULL" to
-- latest_clusters to restrict the result to clusters that still exist.
-- NOTE(review): another cell of this notebook calls
-- dataops_prd.libraries.get_workspace_id(); confirm which schema is correct.
WITH latest_clusters AS (
  SELECT *
  FROM system.compute.clusters
  WHERE workspace_id = dataops_prd.libs.get_workspace_id()
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, cluster_id ORDER BY change_time DESC) = 1
),
node_counts AS (
  -- one driver node per cluster, unless the driver comes from a pool
  SELECT
    driver_node_type AS node_type,
    1 AS node_count
  FROM latest_clusters
  WHERE driver_instance_pool_id IS NULL
  UNION ALL
  -- workers: use worker_count when set, otherwise the autoscaling upper bound
  SELECT
    worker_node_type AS node_type,
    coalesce(worker_count, max_autoscale_workers) AS node_count
  FROM latest_clusters
  WHERE worker_instance_pool_id IS NULL
)
SELECT node_type, sum(node_count) AS count
FROM node_counts
GROUP BY ALL
ORDER BY count DESC

In [0]:
import datetime
from pyspark.sql.functions import col
from concurrent.futures import ThreadPoolExecutor
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import workspace
from databricks.sdk.service import jobs
from datetime import datetime
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, LongType, FloatType, BooleanType, TimestampType

# NOTE(review): days_back is not referenced anywhere in the visible part of
# this cell -- confirm it is still needed.
days_back = 1
# Databricks SDK client; used further down to list the workspace's jobs.
w = WorkspaceClient()

from dataclasses import is_dataclass, asdict

def safe_getattr(obj, attr, default=None):
    """Resolve a dotted path on an object, dict or dataclass instance.

    A dataclass instance is first converted with ``asdict`` (recursively, so
    nested dataclasses become dicts as well). Each path segment is then read
    with ``dict.get`` for dicts and ``getattr`` for anything else.

    Args:
        obj: Object, dict or dataclass instance to read from. May be ``None``.
        attr: Dot-separated path, e.g. ``"settings.schedule.timezone_id"``.
        default: Value returned when the path resolves to ``None``. When it is
            not ``None``, a resolved value is also converted to
            ``type(default)``; if that conversion raises ``ValueError`` or
            ``TypeError`` the default itself is returned.

    Returns:
        The resolved (and possibly converted) value, or ``default``.
    """
    try:
        # is_dataclass() is also true for a dataclass *class*, which asdict()
        # rejects with TypeError, so only instances are converted.
        if is_dataclass(obj) and not isinstance(obj, type):
            obj = asdict(obj)

        for name in attr.split('.'):
            if obj is None:
                # Nothing further can be resolved below a missing segment.
                break
            if isinstance(obj, dict):
                obj = obj.get(name)
            else:
                obj = getattr(obj, name, None)

        if obj is None:
            return default
        if default is None:
            return obj

        # Coerce the value to the type of the default (e.g. enum value -> str).
        try:
            return type(default)(obj)
        except (ValueError, TypeError):
            return default
    except (AttributeError, KeyError):
        return default

def process_job_object(job_obj):
    """Flatten one job object into a plain dict row.

    Args:
        job_obj: Job entry to flatten. Its fields are read through
            ``safe_getattr``; ``settings.tasks`` and ``settings.job_clusters``
            are only counted.

    Returns:
        dict: One entry per output column. String-typed columns fall back to
        the literal string ``"None"`` when the value is missing; the two
        counters fall back to ``0``.
    """
    # Convert a dataclass instance once here; safe_getattr would otherwise
    # deep-copy the whole job (tasks included) on every single lookup.
    if is_dataclass(job_obj) and not isinstance(job_obj, type):
        source = asdict(job_obj)
    else:
        source = job_obj

    # Missing settings / tasks / job_clusters count as zero instead of raising.
    settings = getattr(job_obj, "settings", None)
    tasks = getattr(settings, "tasks", None)
    job_clusters = getattr(settings, "job_clusters", None)

    row = {
        "job_id": safe_getattr(source, "job_id"),
        "created_time": safe_getattr(source, "created_time"),
        "creator_user_name": safe_getattr(source, "creator_user_name", "None"),
        "name": safe_getattr(source, "settings.name"),
        "parameters": safe_getattr(source, "settings.parameters"),
        "schedule_paused_status": safe_getattr(source, "settings.schedule.pause_status.value", "None"),
        "schedule_quartz_cron_expression": safe_getattr(source, "settings.schedule.quartz_cron_expression", "None"),
        "schedule_timezone_id": safe_getattr(source, "settings.schedule.timezone_id", "None"),
        "schedule_continuous": safe_getattr(source, "settings.continuous.pause_status.value", "None"),
        # Uncomment and adjust the following line if needed
        # "webhook_notifications.on_success": safe_getattr(source, "settings.webhook_notifications.on_success", "None"),
        "task_count": 0 if tasks is None else len(tasks),
        "job_clusters": 0 if job_clusters is None else len(job_clusters),
    }
    return row

def parallel_process_jobs(jobs_gen, max_workers=20):
    """Apply ``process_job_object`` to every job using a thread pool.

    Args:
        jobs_gen: Iterable of job objects.
        max_workers: Size of the thread pool.

    Returns:
        list: One processed row per job, in the order the jobs were yielded.
    """
    pool = ThreadPoolExecutor(max_workers=max_workers)
    with pool:
        # Collect inside the ``with`` so results (and any worker exception)
        # are consumed before the pool shuts down.
        rows = [row for row in pool.map(process_job_object, jobs_gen)]
        return rows

# Fetch every job (tasks expanded) and flatten each one into a dict row.
jobs_gen = w.jobs.list(expand_tasks=True)
parsed_job_info = parallel_process_jobs(jobs_gen)

# Explicit schema for the rows produced by process_job_object.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("job_id", LongType()),
        ("created_time", LongType()),
        ("creator_user_name", StringType()),
        ("name", StringType()),
        ("parameters", ArrayType(StringType())),
        ("schedule_paused_status", StringType()),
        ("schedule_quartz_cron_expression", StringType()),
        ("schedule_continuous", StringType()),
        ("schedule_timezone_id", StringType()),
        ("task_count", IntegerType()),
        ("job_clusters", IntegerType()),
    )
])

# Replace the contents of the control table with the fresh snapshot.
sparkJobsDF = spark.createDataFrame(data=parsed_job_info, schema=schema)
(
    sparkJobsDF.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("dataops_prd.control.job_info")
)

In [0]:
%sql
-- Lists jobs of this workspace together with the cluster used by their most
-- recent task run, keeping only clusters whose name does not start with
-- "job-" or "dlt-".
-- NOTE(review): other cells of this notebook call
-- dataops_prd.libs.get_workspace_id(); confirm which schema is correct.

-- clusters: latest non-deleted snapshot per (workspace_id, cluster_id),
-- excluding names that start with "job-" or "dlt-".
with clusters as (
  select distinct cluster_id, cluster_name
  from (
    SELECT
    *,
    ROW_NUMBER() OVER(PARTITION BY workspace_id, cluster_id ORDER BY change_time DESC) as rn
  FROM system.compute.clusters where workspace_id = dataops_prd.libraries.get_workspace_id() and (cluster_name not like "job-%" and cluster_name not like "dlt-%") and delete_time is null
  QUALIFY rn=1
  )
),
-- jobs_info: latest non-deleted row per job_id (by change_time), with the
-- creator name and creation date taken from the dataops_prd.control tables.
-- created_time is divided by 1000 before from_unixtime -- presumably it is
-- stored in epoch milliseconds; verify against the job_info loader.
-- The inner join drops jobs that are missing from dataops_prd.control.job_info.
jobs_info as (
  select distinct j.job_id, j.name as job_name, coalesce(ui.userName,ji.creator_user_name) as creator, from_unixtime(ji.created_time / 1000) as creation_date, change_time,
  ROW_NUMBER() OVER(PARTITION BY j.job_id ORDER BY change_time DESC) as rn
from 
system.lakeflow.jobs j
 left join dataops_prd.control.users_id ui on j.creator_id = ui.id
 inner join dataops_prd.control.job_info ji on j.job_id = ji.job_id
 where workspace_id = dataops_prd.libraries.get_workspace_id() and j.delete_time is null
 QUALIFY rn=1
),
-- job_tasks_exploded: a single row per job_id, the one ranked first by
-- period_start_time DESC.
-- NOTE(review): because rn=1 keeps exactly one row per job, only one of the
-- exploded compute_ids survives and ties are broken arbitrarily -- confirm
-- this is intended.
job_tasks_exploded AS (
  SELECT
    workspace_id,
    job_id,
    EXPLODE(compute_ids) as cluster_id,
    period_start_time,
    ROW_NUMBER() OVER(PARTITION BY job_id ORDER BY period_start_time DESC) as rn
  FROM system.lakeflow.job_task_run_timeline
  where workspace_id = dataops_prd.libraries.get_workspace_id()
  QUALIFY rn=1
)
-- NOTE(review): the inner join on t.cluster_id discards every job without a
-- matching task run, so the LEFT JOIN below effectively behaves as an inner
-- join.
select distinct j.job_id, j.job_name, j.creator, j.creation_date, t.cluster_id, c.cluster_name
from jobs_info as j
left join job_tasks_exploded as t on j.job_id = t.job_id and t.period_start_time >= j.change_time
inner join clusters as c on t.cluster_id = c.cluster_id

### Managing cluster logs
- **Event log**: records all significant actions related to the cluster, such as when the cluster was created, terminated, edited, scaled, or encountered any errors
- **Spark UI**: interface for monitoring and debugging job execution, stages and tasks
- **Driver logs**: outputs from the notebooks and libraries