# Unity Catalog: Data Discovery

## Block 6: Visualizing Relationships between Tables

In [0]:
%sql
drop catalog if exists sesion4 cascade

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS sesion4;

-- 01 · Landing / Raw
CREATE SCHEMA IF NOT EXISTS sesion4.layer10_landing;
CREATE VOLUME IF NOT EXISTS sesion4.layer10_landing.volume;

-- 02 · Bronze
CREATE SCHEMA IF NOT EXISTS sesion4.layer20_bronze;

-- 03 · Silver
CREATE SCHEMA IF NOT EXISTS sesion4.layer30_silver;
CREATE SCHEMA IF NOT EXISTS sesion4.layer30_silver_quarantine;

-- 04 · Gold
CREATE SCHEMA IF NOT EXISTS sesion4.layer40_gold;

-- 99 · Operations / Configuration
CREATE SCHEMA IF NOT EXISTS sesion4.layer99_ops;
CREATE VOLUME IF NOT EXISTS sesion4.layer99_ops.configs;

In [0]:
%sh
curl -L https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/refs/heads/main/IEF_360deg_sesion4/data/dim_aircraft.csv -o /Volumes/sesion4/layer10_landing/volume/dim_aircraft.csv

curl -L https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/refs/heads/main/IEF_360deg_sesion4/data/dim_phase.csv -o /Volumes/sesion4/layer10_landing/volume/dim_phase.csv

curl -L https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/refs/heads/main/IEF_360deg_sesion4/data/dim_sensor_reading.csv -o /Volumes/sesion4/layer10_landing/volume/dim_sensor_reading.csv

curl -L https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/refs/heads/main/IEF_360deg_sesion4/data/fact_sensor.csv -o /Volumes/sesion4/layer10_landing/volume/fact_sensor.csv

curl -L https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/refs/heads/main/IEF_360deg_sesion4/data/schemas_enriched_pkfk.json -o /Volumes/sesion4/layer99_ops/configs/schemas_enriched_pkfk.json

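The downloads in the cell above all follow one pattern: a file from the repository's `data/` folder is written to a Unity Catalog volume path. As a minimal sketch, that mapping can be expressed in Python. The names `BASE_URL`, `TARGETS`, and `download_plan` are illustrative and not part of the course repository, and the function only builds the (URL, destination) pairs without making any network call.

```python
# Illustrative sketch: build the (source URL, destination path) pairs used by the curl cell.
BASE_URL = (
    "https://raw.githubusercontent.com/jmartinezceste/"
    "IEF_Course1_360degreesDatabricks/refs/heads/main/IEF_360deg_sesion4/data"
)

# File name -> target volume directory (taken from the curl commands above).
TARGETS = {
    "dim_aircraft.csv": "/Volumes/sesion4/layer10_landing/volume",
    "dim_phase.csv": "/Volumes/sesion4/layer10_landing/volume",
    "dim_sensor_reading.csv": "/Volumes/sesion4/layer10_landing/volume",
    "fact_sensor.csv": "/Volumes/sesion4/layer10_landing/volume",
    "schemas_enriched_pkfk.json": "/Volumes/sesion4/layer99_ops/configs",
}


def download_plan(base_url=BASE_URL, targets=TARGETS):
    """Return a list of (url, destination) tuples; no download is performed."""
    return [(f"{base_url}/{name}", f"{folder}/{name}") for name, folder in targets.items()]


plan = download_plan()
for url, dest in plan:
    print(f"{url} -> {dest}")
```

Each pair could then be passed to `urllib.request.urlretrieve(url, dest)` or to the same `curl -L ... -o ...` command used in the cell.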

Create a new job using the YAML from this section.

<img src="https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/main/photos/sesion2_createjob1.png" width="800">

<img src="https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/main/photos/sesion2_yaml2.png" width="800">

Copy the contents of the .yaml file found in the repository, inside the session 4 folder, under src.

The specific file is:
 IEF_360deg_sesion4/src/yaml.yaml

<img src="https://raw.githubusercontent.com/jmartinezceste/IEF_Course1_360degreesDatabricks/main/photos/sesion2_yaml3.png" width="800">

In [0]:
%sql
-- Disallow NULLs in the key column
ALTER TABLE sesion4.layer30_silver.dim_phase
  ALTER COLUMN phase_key SET NOT NULL;

-- Declare the primary key (informational: Databricks does not check for duplicates)
ALTER TABLE sesion4.layer30_silver.dim_phase
  ADD CONSTRAINT pk_dim_phase PRIMARY KEY (phase_key);

-- Disallow NULLs in the key column
ALTER TABLE sesion4.layer30_silver.dim_aircraft
  ALTER COLUMN aircraft_id SET NOT NULL;

-- Declare the primary key (informational: Databricks does not check for duplicates)
ALTER TABLE sesion4.layer30_silver.dim_aircraft
  ADD CONSTRAINT pk_aircraft_id PRIMARY KEY (aircraft_id);

-- Disallow NULLs in the key column
ALTER TABLE sesion4.layer30_silver.dim_sensor_reading
  ALTER COLUMN reading_id SET NOT NULL;

-- Declare the primary key (informational: Databricks does not check for duplicates)
ALTER TABLE sesion4.layer30_silver.dim_sensor_reading
  ADD CONSTRAINT pk_reading_id PRIMARY KEY (reading_id);

-- FACT: PK and FK columns must be NOT NULL
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor ALTER COLUMN reading_id       SET NOT NULL;
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor ALTER COLUMN aircraft_id      SET NOT NULL;
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor ALTER COLUMN phase_of_flight  SET NOT NULL;


-- FACT (define the primary key on the fact table)
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor
ADD CONSTRAINT pk_fact_engine_sensor PRIMARY KEY (reading_id);


-- FK: fact.aircraft_id → dim_aircraft.aircraft_id
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor
ADD CONSTRAINT fk_fact_engine_sensor_aircraft
FOREIGN KEY (aircraft_id)
REFERENCES sesion4.layer30_silver.dim_aircraft(aircraft_id);

-- FK: fact.phase_of_flight → dim_phase.phase_key
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor
ADD CONSTRAINT fk_fact_engine_sensor_phase
FOREIGN KEY (phase_of_flight)
REFERENCES sesion4.layer30_silver.dim_phase(phase_key);

-- FK: fact.reading_id → dim_sensor_reading.reading_id
ALTER TABLE sesion4.layer30_silver.fact_engine_sensor
ADD CONSTRAINT fk_fact_engine_sensor_reading
FOREIGN KEY (reading_id)
REFERENCES sesion4.layer30_silver.dim_sensor_reading(reading_id);


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-6162119931642732>, line 1
----> 1 get_ipython().run_cell_magic('sql', '', '  -- Evita nulos a nivel de metadatos\nALTER TABLE sesion4.layer30_silver.dim_phase\n  ALTER COLUMN phase_key SET NOT NULL;\n\n-- Crea la primary key (fallará si quedan duplicados)\nALTER TABLE sesion4.layer30_silver.dim_phase\n  ADD CONSTRAINT pk_dim_phase PRIMARY KEY (phase_key);\n\n-- Evita nulos a nivel de metadato
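The constraint statements in the cell above repeat the same steps for each table: set the key columns to NOT NULL, declare the primary key, then declare the foreign keys. A hedged sketch of generating them from a small specification follows. The `SPEC` layout and the `constraint_statements` helper are hypothetical and do not reflect the actual structure of `schemas_enriched_pkfk.json`; the constraint names are simplified as well.

```python
SCHEMA = "sesion4.layer30_silver"

# Hypothetical spec: table -> primary key column and {fk_column: (ref_table, ref_column)}.
# Dimension tables come first so their primary keys exist before the fact table references them.
SPEC = {
    "dim_phase": {"pk": "phase_key", "fks": {}},
    "dim_aircraft": {"pk": "aircraft_id", "fks": {}},
    "fact_engine_sensor": {
        "pk": "reading_id",
        "fks": {"aircraft_id": ("dim_aircraft", "aircraft_id")},
    },
}


def constraint_statements(schema, spec):
    """Yield ALTER TABLE statements per table: NOT NULL first, then PK, then FKs."""
    for table, cfg in spec.items():
        full = f"{schema}.{table}"
        # dict.fromkeys removes duplicates while keeping order (a PK column may also be an FK).
        for col in dict.fromkeys([cfg["pk"], *cfg["fks"]]):
            yield f"ALTER TABLE {full} ALTER COLUMN {col} SET NOT NULL"
        yield f"ALTER TABLE {full} ADD CONSTRAINT pk_{table} PRIMARY KEY ({cfg['pk']})"
        for col, (ref_table, ref_col) in cfg["fks"].items():
            yield (
                f"ALTER TABLE {full} ADD CONSTRAINT fk_{table}_{col} "
                f"FOREIGN KEY ({col}) REFERENCES {schema}.{ref_table}({ref_col})"
            )


statements = list(constraint_statements(SCHEMA, SPEC))
for stmt in statements:
    print(stmt + ";")
```

In a notebook, each statement could be executed with `spark.sql(stmt)`, where `spark` is the session object that Databricks provides.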

### 6.2 Visualizing the relationship

Databricks lets you define and visualize relationships between tables directly from Unity Catalog, using Primary Key (PK) and Foreign Key (FK) constraints.

These relationships document the data model and make it easier to analyze dependencies visually. Note that PK and FK constraints in Unity Catalog are informational: Databricks does not enforce them, so referential integrity still has to be validated in the pipeline.

<img src="https://raw.githubusercontent.com/jmartinezceste/251101Course_UC_Ceste/main/photos/view_relationship.png" width="800">
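Because Databricks treats PK and FK constraints in Unity Catalog as informational rather than enforced, it can be worth checking uniqueness and referential integrity separately. The sketch below does this on small in-memory rows. The sample values and the helper names `duplicate_keys` and `orphan_keys` are made up for illustration and are not taken from the course data.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the sorted key values that appear more than once."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def orphan_keys(fact_rows, fk, dim_rows, pk):
    """Return the sorted FK values in fact_rows that have no matching PK in dim_rows."""
    known = {row[pk] for row in dim_rows}
    return sorted({row[fk] for row in fact_rows} - known)


# Hypothetical sample data, not taken from the course CSV files.
dim_aircraft = [{"aircraft_id": "A1"}, {"aircraft_id": "A2"}, {"aircraft_id": "A2"}]
fact_engine_sensor = [
    {"reading_id": 1, "aircraft_id": "A1"},
    {"reading_id": 2, "aircraft_id": "A9"},
]

dupes = duplicate_keys(dim_aircraft, "aircraft_id")
orphans = orphan_keys(fact_engine_sensor, "aircraft_id", dim_aircraft, "aircraft_id")
print(dupes)    # ['A2']
print(orphans)  # ['A9']
```

The same two checks translate directly to SQL as a `GROUP BY ... HAVING COUNT(*) > 1` and a `LEFT ANTI JOIN`.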

## Block 7: Using Genie

### 7.1 🤖 Introduction to Databricks Genie

**Genie** is the **Databricks** conversational agent designed for interacting with your data in natural language.  
Instead of writing SQL queries, you can simply *ask*, and Genie generates the necessary queries automatically, interpreting your intent and using the context of your workspace.

### 7.2 What does Genie do?

Genie uses the AI model built into the **Databricks Data Intelligence Platform** to:

- **Understand the context** of the catalogs, schemas, and tables registered in Unity Catalog.  
- **Generate SQL queries** that match your questions.  
- **Analyze data, summarize trends, and explain results** directly in natural language.  
- **Learn from the environment**, using metadata, descriptions, and examples saved in your workspace.  

### 7.3 Why is it useful?

Genie lets you:
- Explore your data without needing to know SQL or Spark.  
- Validate hypotheses quickly during a demo or exploratory analysis.  
- Generate simple visualizations from natural language.  
- Offer a more accessible experience to business users within Databricks.

### 7.4 Example

#### 7.4.1 Opening a Genie space

##### 7.4.1.1 You can start Genie from the menu or from an existing table

<img src="https://raw.githubusercontent.com/jmartinezceste/251101Course_UC_Ceste/main/photos/genie step1.png" width="800">

##### 7.4.1.2 Select the tables that will be part of the space

<img src="https://raw.githubusercontent.com/jmartinezceste/251101Course_UC_Ceste/main/photos/genie step2 tablas.png" width="800">

##### 7.4.1.3 Example questions

In [0]:
# 💬 Example questions to use with Genie
# These questions are meant for testing the agent once the tables exist:
# dim_aircraft, dim_phase, dim_sensor_reading, and fact_sensor

# 1️⃣ Which aircraft model has accumulated the most total flight hours?
# → Uses the dim_aircraft table and the hours_total column
question_1 = "Which aircraft model has accumulated the most total flight hours?"

# 2️⃣ Show me the flight phases with the highest number of sensor readings.
# → Joins fact_sensor with dim_phase on phase_key or phase_id
question_2 = "Show me the flight phases with the highest number of sensor readings."

# 3️⃣ In which flight phase are the highest altitudes usually recorded?
# → Uses fact_sensor.altitude and dim_phase.phase_name
question_3 = "In which flight phase are the highest altitudes usually recorded?"

# 4️⃣ Are there sensors with out-of-range values or speed anomalies?
# → Queries fact_sensor.speed_knots and flags values > 600 knots
question_4 = "Are there sensors with out-of-range values or speed anomalies?"

# 5️⃣ Give me a summary of sensor conditions for aircraft with more than 20,000 flight hours.
# → Combines dim_aircraft (filter on hours) and fact_sensor (averages of temperature, speed, etc.)
question_5 = "Give me a summary of sensor conditions for aircraft with more than 20,000 flight hours."
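When judging Genie's answers, it can help to have a hand-written reference query to compare against. As a sketch for question 4, the helper below builds one from the 600-knot threshold mentioned in the cell's comments. `out_of_range_sql` is a hypothetical helper, and the unqualified table name `fact_sensor` is used as written in those comments; adjust it to wherever the table lives in your catalog.

```python
def out_of_range_sql(table, column, upper_bound):
    """Build a query that counts rows where `column` exceeds `upper_bound`."""
    return (
        f"SELECT COUNT(*) AS out_of_range_rows "
        f"FROM {table} "
        f"WHERE {column} > {upper_bound}"
    )


reference_sql = out_of_range_sql("fact_sensor", "speed_knots", 600)
print(reference_sql)
```

If the count Genie reports differs from the reference query's result, the generated SQL is worth inspecting before the answer is trusted.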




# Unity Catalog: Data Sharing

## Block 8: Delta Sharing

### 8.1 Delta Sharing

### 8.2 Marketplace

### 8.3 Databricks Clean Rooms

# Unity Catalog: Data Auditing

## Block 9: System Tables

### 9.1 State of Objects

#### 9.1.1 Which tables exist in catalog X?

In [0]:
%sql
SELECT table_name
FROM system.information_schema.tables
WHERE table_catalog = 'sesion1_2'
  AND table_schema != 'information_schema'



#### 9.1.2 Who has access to this table?

In [0]:
%sql
-- Who has access to this table

SELECT grantee, table_name, privilege_type
FROM system.information_schema.table_privileges
WHERE table_name = 'dim_aircraft'



#### 9.1.3 Who last updated the silver tables, and when?

In [0]:
%sql

SELECT table_name, last_altered_by, last_altered
FROM system.information_schema.tables
WHERE table_schema = 'layer30_silver'
ORDER BY 1, 3 DESC;



#### 9.1.4 Who owns this table?

In [0]:
%sql
SELECT table_owner
FROM system.information_schema.tables
WHERE table_catalog = 'sesion1_2'
  AND table_schema = 'layer30_silver'
  AND table_name = 'dim_aircraft'



### 9.2 Audit Logs

#### 9.2.1 Who accesses this table most frequently?

In [0]:
%sql
SELECT user_identity.email, count(*)
FROM system.access.audit
WHERE request_params.table_full_name = 'sesion1_2.layer30_silver.dim_aircraft'
AND service_name = 'unityCatalog'
AND action_name = 'generateTemporaryTableCredential'
GROUP BY 1 ORDER BY 2 DESC LIMIT 1;



#### 9.2.2 Who deleted this table?

In [0]:
%sql
SELECT user_identity.email
FROM system.access.audit
WHERE request_params.full_name_arg = 'sesion1_2.layer30_silver.dim_aircraft'
AND service_name = 'unityCatalog'
AND action_name = 'deleteTable'



In [0]:
%sql
select *
from system.access.audit



In [0]:
%sql
select distinct(action_name)
from system.access.audit



#### 9.2.3 What has this user accessed in the last 24 hours?

In [0]:
%sql
SELECT 
  date_format(event_time, 'yyyy-MM-dd HH:mm:ss') AS event_time_local,
  action_name
FROM system.access.audit
WHERE user_identity.email = '251009javiceste@gmail.com'
  AND service_name = 'unityCatalog'
  -- event_date narrows the scan; event_time applies the rolling 24-hour window
  AND event_date >= current_date() - INTERVAL 1 DAY
  AND event_time >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY event_time DESC;
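A date-only predicate such as `event_date > current_date() - INTERVAL 1 DAY` and a rolling 24-hour window on `event_time` select different rows. The sketch below illustrates the difference with hypothetical timestamps; the fixed "current time" and the events are invented for the example.

```python
from datetime import datetime, timedelta

# Hypothetical "current time" and event timestamps, for illustration only.
now = datetime(2025, 1, 10, 9, 0, 0)
events = [
    datetime(2025, 1, 9, 8, 0, 0),    # 25 hours ago
    datetime(2025, 1, 9, 20, 0, 0),   # 13 hours ago, previous calendar day
    datetime(2025, 1, 10, 7, 30, 0),  # 1.5 hours ago, same calendar day
]

# Date-only filter: event_date > current_date - 1 day (keeps today's events only).
by_date = [e for e in events if e.date() > now.date() - timedelta(days=1)]

# Rolling window: event_time >= now - 24 hours.
rolling = [e for e in events if e >= now - timedelta(hours=24)]

print(len(by_date), len(rolling))  # 1 2
```

The event from 13 hours ago falls inside the rolling window but is dropped by the date-only filter because it belongs to the previous calendar day.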




#### 9.2.4 Which tables does this user access most frequently?

In [0]:
%sql
SELECT request_params.table_full_name, count(*)
FROM system.access.audit
WHERE user_identity.email = "251009javiceste@gmail.com"
AND service_name = "unityCatalog"
GROUP BY 1 ORDER BY 2 DESC LIMIT 10;




### 9.3 Billing Logs

#### 9.3.1 What is the daily DBU consumption trend?

In [0]:
%sql
SELECT
  DATE(usage_start_time) AS usage_date,
  SUM(usage_quantity) AS dbus_consumed
FROM system.billing.usage
WHERE usage_unit = 'DBU'
GROUP BY DATE(usage_start_time)
ORDER BY usage_date ASC;
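Daily totals can be noisy, so a trailing moving average is one common way to read the trend. The following is a minimal sketch over hypothetical daily DBU values; both the numbers and the `moving_average` helper are illustrative and are not output of the query above.

```python
def moving_average(values, window):
    """Trailing moving average; early positions average over the values seen so far."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1): i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


# Hypothetical daily DBU totals, in date order.
daily_dbus = [10.0, 12.0, 8.0, 20.0, 14.0]
smoothed = moving_average(daily_dbus, window=3)
print(smoothed)
```

In SQL the same idea can be written with `AVG(dbus_consumed) OVER (ORDER BY usage_date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)` on top of the daily aggregate.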




#### 9.3.2 Top 10 users by DBU consumption

In [0]:
%sql
SELECT
  COALESCE(custom_tags['Creator'], custom_tags['Owner'], 'unknown') AS User,
  SUM(usage_quantity) AS DBUs
FROM system.billing.usage
WHERE usage_unit = 'DBU'
GROUP BY 1
ORDER BY DBUs DESC
LIMIT 10;




In [0]:
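%sql
-- Alternative by job (if you have no tags): join with job metadata to get run_as.
-- system.lakeflow.jobs can hold several rows per job, so only the latest one is kept.
SELECT
  j.run_as AS User,
  SUM(u.usage_quantity) AS DBUs
FROM system.billing.usage u
JOIN (
  SELECT workspace_id, job_id, run_as
  FROM system.lakeflow.jobs
  QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
) j
  ON u.workspace_id = j.workspace_id
  AND CAST(u.usage_metadata.job_id AS STRING) = CAST(j.job_id AS STRING)
WHERE u.usage_unit = 'DBU'
GROUP BY j.run_as
ORDER BY DBUs DESC
LIMIT 10;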
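`system.lakeflow.jobs` can hold several rows per job, one per metadata change, which is why the dashboard query in section 9.5.1 keeps only the latest row per job. Joining billing usage to it without that step can count the same usage more than once. The sketch below reproduces the effect on hypothetical in-memory rows; the helper names are illustrative.

```python
# Hypothetical rows: job "42" has two metadata versions in the jobs table.
jobs = [
    {"job_id": "42", "run_as": "ana@example.com", "change_time": 1},
    {"job_id": "42", "run_as": "ana@example.com", "change_time": 2},
]
usage = [{"job_id": "42", "usage_quantity": 5.0}]


def total_dbus(usage_rows, job_rows):
    """Inner join on job_id, then sum usage_quantity (mirrors the SQL join)."""
    return sum(
        u["usage_quantity"]
        for u in usage_rows
        for j in job_rows
        if u["job_id"] == j["job_id"]
    )


def latest_versions(job_rows):
    """Keep only the most recent row per job_id (highest change_time)."""
    latest = {}
    for row in job_rows:
        current = latest.get(row["job_id"])
        if current is None or row["change_time"] > current["change_time"]:
            latest[row["job_id"]] = row
    return list(latest.values())


naive_total = total_dbus(usage, jobs)
deduped_total = total_dbus(usage, latest_versions(jobs))
print(naive_total)    # 10.0
print(deduped_total)  # 5.0
```

The naive join doubles the 5 DBUs because the usage row matches both versions of the job.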



#### 9.3.3 DBUs by SKU in the current month

In [0]:
%sql
SELECT
  sku_name              AS SKU,
  SUM(usage_quantity)   AS DBUs
FROM system.billing.usage
WHERE usage_unit = 'DBU'
  AND DATE_TRUNC('month', usage_start_time) = DATE_TRUNC('month', CURRENT_DATE())
GROUP BY sku_name
ORDER BY DBUs DESC;



#### 9.3.4 Which jobs consumed the most DBUs?


In [0]:
%sql
SELECT
  CAST(usage_metadata.job_id AS STRING) AS `Job ID`,
  SUM(usage_quantity)                   AS DBUs
FROM system.billing.usage
WHERE usage_unit = 'DBU'
  AND usage_metadata.job_id IS NOT NULL
GROUP BY CAST(usage_metadata.job_id AS STRING)
ORDER BY DBUs DESC;




### 9.4 Lineage Data

#### 9.4.1 Which tables are created from a source table (descendants)

``These queries do NOT work in the free version``

In [0]:
%sql
SELECT DISTINCT
  target_table_full_name
FROM
  system.access.table_lineage
WHERE
  source_table_name = 'login_data_bronze';
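The query above returns only the direct targets of a source table. To follow lineage across several hops, the (source, target) pairs can be walked as a graph. The following is a minimal breadth-first sketch over hypothetical edges; apart from `login_data_bronze` and `login_data_silver`, the table names are invented.

```python
from collections import deque

# Hypothetical lineage edges (source table -> target table).
edges = [
    ("login_data_bronze", "login_data_silver"),
    ("login_data_silver", "login_data_gold"),
    ("other_bronze", "other_silver"),
]


def descendants(source, edges):
    """Breadth-first walk returning every table derived from `source`, in discovery order."""
    children = {}
    for src, dst in edges:
        children.setdefault(src, []).append(dst)
    seen, queue = [], deque([source])
    while queue:
        for child in children.get(queue.popleft(), []):
            # Skip tables already visited so that cycles cannot loop forever.
            if child not in seen and child != source:
                seen.append(child)
                queue.append(child)
    return seen


result = descendants("login_data_bronze", edges)
print(result)  # ['login_data_silver', 'login_data_gold']
```

The edge list could be filled from the `source_table_full_name` and `target_table_full_name` columns of the lineage table.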






#### 9.4.2 Which users read from this table?

In [0]:
%sql
SELECT DISTINCT
  entity_type,
  entity_id,
  source_table_full_name
FROM
  system.access.table_lineage
WHERE
  source_table_name = 'login_data_silver';




### 9.5 Dashboard creation exercise

We will build the dashboard below on top of the system tables to visualize information about our environment.

It gives a detailed view of the job runs taking place in the workspace.

<img src="https://raw.githubusercontent.com/jmartinezceste/251101Course_UC_Ceste/main/photos/DASHBOARD_JOBS.png" width="1300">

We have created a query that lets us build a dashboard with information about the costs that affect our workspace.

We will add the query that feeds the dashboard and create a few visualizations:

- Go to Dashboards
- Create a new dashboard
- Add this query in the "Data" tab

#### 9.5.1 Query to use



```sql
-- CTE to fetch the latest job metadata per workspace and job
WITH latest_job_metadata AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY workspace_id, job_id 
      ORDER BY change_time DESC
    ) AS rn
  FROM
    system.lakeflow.jobs
  QUALIFY rn = 1
),

-- CTE to fetch the latest cluster metadata per workspace and cluster
latest_clusters_metadata AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY workspace_id, cluster_id 
      ORDER BY change_time DESC
    ) AS rn
  FROM
    system.compute.clusters
  QUALIFY rn = 1
),

-- CTE for job run timeline details
jrt AS (
  SELECT
    jr.account_id,
    jr.workspace_id,
    jr.job_id,
    j.name AS job_name,
    j.creator_id,
    j.run_as,
    jr.run_id,
    jr.period_start_time,
    jr.period_end_time,
    jr.trigger_type,
    jr.run_type,
    jr.result_state AS result_state,
    jr.compute_ids,
    (UNIX_TIMESTAMP(jr.period_end_time) - UNIX_TIMESTAMP(jr.period_start_time)) / 60 AS duration_minutes,
    CASE WHEN jr.result_state = 'SUCCEEDED' THEN jr.run_id ELSE NULL END AS success_run_ids,
    CASE WHEN jr.result_state <> 'SUCCEEDED' THEN jr.run_id ELSE NULL END AS failure_run_ids
  FROM
    system.lakeflow.job_run_timeline jr
  JOIN latest_job_metadata j ON jr.workspace_id = j.workspace_id AND jr.job_id = j.job_id
  WHERE
    jr.period_start_time >= DATE(:start_date)
    AND jr.period_start_time < DATE(:end_date) + INTERVAL 1 day
),

-- CTE for pricing
pricing AS (
  SELECT
    p.pricing.effective_list.default AS price_per_unit,
    p.price_start_time,
    p.sku_name,
    COALESCE(p.price_end_time, NOW()) AS price_end_time
  FROM
    system.billing.list_prices p
),

-- CTE with user info, usage, pricing
usage_with_names AS (
  SELECT
    u.account_id,
    u.workspace_id,
    u.usage_metadata['job_id'] AS job_id,
    DATE(u.usage_date) AS usage_date,
    u.sku_name,
    TRANSFORM(
      MAP_KEYS(custom_tags), (k, i) -> CONCAT(
        lower(k),
        '=',
        lower(MAP_VALUES(custom_tags)[i])
      )
    ) AS key_value_tags,
    u.usage_metadata,
    jrt.job_name,
    CASE 
      WHEN REGEXP_REPLACE(
        u.identity_metadata.run_as,
        '([a-zA-Z]+)\\.([a-zA-Z]+)@.*',
        '$1 $2'
      ) = u.identity_metadata.run_as THEN u.identity_metadata.run_as
      ELSE INITCAP(
        REGEXP_REPLACE(
          u.identity_metadata.run_as,
          '([a-zA-Z]+)\\.([a-zA-Z]+)@.*',
          '$1 $2'
        )
      )
    END AS user_name_cleaned,
    jrt.run_id,
    jrt.success_run_ids,
    jrt.failure_run_ids,
    jrt.duration_minutes,
    jrt.result_state,
    jrt.trigger_type,
    SUM(u.usage_quantity) AS usage_quantity, 
    SUM(u.usage_quantity * p.price_per_unit) AS dollar_cost
  FROM
    system.billing.usage u
  LEFT OUTER JOIN jrt ON u.usage_metadata['job_run_id'] = jrt.run_id
    AND u.usage_metadata['job_id'] = jrt.job_id
    AND u.usage_start_time >= DATE_TRUNC("HOUR", jrt.period_start_time)
    AND u.usage_end_time < DATE_TRUNC("HOUR", jrt.period_end_time) + INTERVAL 1 HOUR
  INNER JOIN pricing p ON u.sku_name = p.sku_name
    AND u.usage_start_time BETWEEN p.price_start_time AND p.price_end_time
  WHERE
    u.billing_origin_product = 'JOBS'
    AND u.usage_date >= DATE(:start_date)
    AND u.usage_date < DATE(:end_date) + INTERVAL 1 day
    AND (
      :tags = 'All'
      OR array_contains(
        TRANSFORM(
          MAP_KEYS(custom_tags), (k, i) -> CONCAT(
            lower(k),
            '=',
            lower(MAP_VALUES(custom_tags)[i])
          )
        ),
        :tags
      )
    )
  GROUP BY ALL
),

-- Top k jobs per date
top_jobs_raw AS (
  SELECT
    usage_date,
    job_name,
    SUM(usage_quantity) AS total_usage
  FROM usage_with_names
  GROUP BY usage_date, job_name
),
top_jobs AS (
  SELECT *
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY usage_date ORDER BY total_usage DESC) AS rn
    FROM top_jobs_raw
  ) t
  WHERE rn <= :top_k
),

-- Top k users per date
top_users_raw AS (
  SELECT
    usage_date,
    user_name_cleaned AS user,
    SUM(usage_quantity) AS total_usage
  FROM usage_with_names
  GROUP BY usage_date, user_name_cleaned
),
top_users AS (
  SELECT *
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY usage_date ORDER BY total_usage DESC) AS rn
    FROM top_users_raw
  ) t
  WHERE rn <= :top_k
)

-- Final output
SELECT
  uwn.account_id,
  uwn.workspace_id,
  uwn.usage_date,
  uwn.usage_quantity,
  uwn.sku_name,
  key_value_tags,
  uwn.job_id,
  uwn.job_name,
  uwn.run_id,
  uwn.success_run_ids,
  uwn.failure_run_ids,
  uwn.result_state,
  uwn.user_name_cleaned AS user,
  CASE
    WHEN EXISTS (
      SELECT 1 FROM top_jobs tj
      WHERE tj.usage_date = uwn.usage_date AND tj.job_name = uwn.job_name
    ) THEN uwn.job_name
    ELSE 'All Others'
  END AS job_name_group,
  CASE
    WHEN EXISTS (
      SELECT 1 FROM top_users tu
      WHERE tu.usage_date = uwn.usage_date AND tu.user = uwn.user_name_cleaned
    ) THEN uwn.user_name_cleaned
    ELSE 'All Others'
  END AS user_group,
  uwn.duration_minutes,
  uwn.trigger_type,
  uwn.dollar_cost
FROM
  usage_with_names uwn
```
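The `user_name_cleaned` column in the query above turns an identity of the form `first.last@domain` into a display name and leaves every other identity unchanged. A rough Python equivalent of that `REGEXP_REPLACE` plus `INITCAP` logic is sketched below. The sample identities are hypothetical, and `clean_user_name` is an illustrative helper, not part of the dashboard.

```python
import re

# Same pattern as in the SQL; Python uses \1 \2 where Spark uses $1 $2.
PATTERN = re.compile(r"([a-zA-Z]+)\.([a-zA-Z]+)@.*")


def clean_user_name(run_as):
    """Turn 'first.last@domain' into 'First Last'; leave other identities untouched."""
    replaced = PATTERN.sub(r"\1 \2", run_as)
    if replaced == run_as:
        return run_as
    # Rough equivalent of INITCAP: capitalize each space-separated word.
    return " ".join(word.capitalize() for word in replaced.split(" "))


# Hypothetical identities, for illustration only.
print(clean_user_name("ana.garcia@example.com"))   # Ana Garcia
print(clean_user_name("svc-account@example.com"))  # svc-account@example.com
```

Identities without a `first.last` local part, such as service principals, pass through as-is, which matches the first branch of the `CASE` expression.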

#### 9.5.2 Visuals to build

- Level 1:
  - KPI Total Jobs executed: 
      - Visual type: Counter
      - Value: COUNT(DISTINCT run_id)
  - KPI Total DBUs consumed:
      - Visual type: Counter
      - Value: SUM(usage_quantity)
  - KPI Total Cost consumed:
      - Visual type: Counter
      - Value: SUM(dollar_cost)
  - KPI Average JOB duration:
      - Visual type: Counter
      - Value: AVG(duration_minutes)

- Level 2:
  - BARCHART Total Cost ($) per JOB:
      - Visual type: Bar
      - X axis: SUM(dollar_cost)
      - Y axis: job_name_group
  - PIECHART Job Status:
      - Visual type: Pie
      - Angle: SUM(dollar_cost)
      - Color: result_state

- Level 3:
  - HISTOGRAM Cost ($) over time per JOB:
      - Visual type: Bar
      - X axis: DAILY(usage_date)
      - Y axis: SUM(dollar_cost)

- Level 4:
  - HISTOGRAM Runs per day & result_state:
      - Visual type: Bar
      - X axis: DAILY(usage_date)
      - Y axis: COUNT(DISTINCT run_id)
      - Color: result_state

- Level 5: 
  - SUMMARY MATRIX:
      - Visual type: Pivot
      - Rows: job_name, user, workspace_id
      - Columns: result_state
      - Values: SUM(dollar_cost)
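To sanity-check the Level 1 counters, the same aggregations can be computed by hand on a few rows. The sketch below uses hypothetical rows shaped like the dashboard query output, with only the four columns the counters need; the values are invented.

```python
# Hypothetical rows shaped like the dashboard query output (only the columns used here).
rows = [
    {"run_id": "r1", "usage_quantity": 2.0, "dollar_cost": 1.0, "duration_minutes": 10.0},
    {"run_id": "r1", "usage_quantity": 1.0, "dollar_cost": 0.5, "duration_minutes": 10.0},
    {"run_id": "r2", "usage_quantity": 3.0, "dollar_cost": 1.5, "duration_minutes": 20.0},
]

kpis = {
    # COUNT(DISTINCT run_id)
    "total_jobs_executed": len({r["run_id"] for r in rows}),
    # SUM(usage_quantity)
    "total_dbus": sum(r["usage_quantity"] for r in rows),
    # SUM(dollar_cost)
    "total_cost": sum(r["dollar_cost"] for r in rows),
    # AVG(duration_minutes), averaged over rows rather than over distinct runs
    "avg_duration": sum(r["duration_minutes"] for r in rows) / len(rows),
}
print(kpis)
```

Note that `AVG(duration_minutes)` averages over rows, so a run that appears in several rows (for example under more than one SKU) weighs more than a run that appears once.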
