# First execution

If you need to select the kernel, select **.venv/bin/python**.

# Databricks Local Analysis
This notebook demonstrates how to interact with the local Spark environment,
the Hive Metastore, Delta Lake features, and the full **DBUtils Shim**.

In [74]:
import sys, os

# Asegurar que databricks_shim está en el path
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "..")))

# Modo local puro: desactivar servicios Docker (S3/MinIO, PostgreSQL)
# que no están disponibles fuera de Docker
os.environ["APP_ENV"] = "local"
for key in ["AWS_ENDPOINT_URL", "POSTGRES_HOST", "POSTGRES_PORT",
            "POSTGRES_DB", "POSTGRES_USER", "POSTGRES_PASSWORD"]:
    os.environ.pop(key, None)

from databricks_shim import inject_notebook_context

inject_notebook_context("Notebook_Data_Analysis")
print("Spark Session Active — spark, dbutils, display, sc, uc disponibles")

⚡ Initializing Local Spark — Databricks 16.4 LTS + Unity Catalog Emulator
Spark Session Active — spark, dbutils, display, sc, uc disponibles


26/02/18 09:47:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Crear datos de ejemplo (modo local)
Preparamos las tablas Delta necesarias directamente en el warehouse local.

In [75]:
# ── Crear esquema y datos de ejemplo ─────────────────────────────────────────
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import upper, col, avg, count, current_timestamp, round as spark_round
import tempfile, os

schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("price", DoubleType()),
    StructField("category", StringType()),
])

data = [
    (1, "Product A", 100.0, "electronics"),
    (2, "Product B", 200.0, "electronics"),
    (3, "Product C",  50.0, "clothing"),
    (4, "Product D",  75.0, "clothing"),
    (5, "Product E", 300.0, "home"),
]

df = spark.createDataFrame(data, schema)

# Silver: transformación
silver_df = df.withColumn("name", upper(col("name"))) \
              .withColumn("category", upper(col("category")))

_tmp = tempfile.mkdtemp(prefix="nb_demo_")
silver_path = os.path.join(_tmp, "silver")
gold_path = os.path.join(_tmp, "gold")

silver_df.write.format("delta").mode("overwrite").save(silver_path)

# Gold: resumen por categoría
gold_df = silver_df.groupBy("category").agg(
    count("id").alias("total_products"),
    spark_round(avg("price"), 2).alias("avg_price"),
)
gold_df.write.format("delta").mode("overwrite").save(gold_path)

# Registrar como tablas temporales para consultas
spark.read.format("delta").load(silver_path).createOrReplaceTempView("products_silver")
spark.read.format("delta").load(gold_path).createOrReplaceTempView("category_summary_gold")

print(f"✓ Silver: {silver_path}")
print(f"✓ Gold:   {gold_path}")
print("✓ Vistas temporales: products_silver, category_summary_gold creadas")

✓ Silver: /tmp/nb_demo_n_bu9hck/silver
✓ Gold:   /tmp/nb_demo_n_bu9hck/gold
✓ Vistas temporales: products_silver, category_summary_gold creadas


## Query Silver & Gold Tables

In [76]:
print("=== Silver Table ===")
display(spark.sql("SELECT * FROM products_silver"))

print("\n=== Gold Table (Category Summary) ===")
display(spark.sql("SELECT * FROM category_summary_gold"))

=== Silver Table ===
+---+---------+-----+-----------+
| id|     name|price|   category|
+---+---------+-----+-----------+
|  2|PRODUCT B|200.0|ELECTRONICS|
|  1|PRODUCT A|100.0|ELECTRONICS|
|  4|PRODUCT D| 75.0|   CLOTHING|
|  3|PRODUCT C| 50.0|   CLOTHING|
|  5|PRODUCT E|300.0|       HOME|
+---+---------+-----+-----------+


=== Gold Table (Category Summary) ===
+-----------+--------------+---------+
|   category|total_products|avg_price|
+-----------+--------------+---------+
|ELECTRONICS|             2|    150.0|
|   CLOTHING|             2|     62.5|
|       HOME|             1|    300.0|
+-----------+--------------+---------+



## Delta Time Travel
Access previous versions of your Bronze table.

In [77]:
# Escribir un Delta table con varias versiones para demostrar time travel
import tempfile, os

delta_path = os.path.join(tempfile.mkdtemp(prefix="delta_tt_"), "products")

# Version 0
spark.createDataFrame([(1, "A", 10.0), (2, "B", 20.0)], "id INT, name STRING, price DOUBLE") \
     .write.format("delta").mode("overwrite").save(delta_path)

# Version 1 — append
spark.createDataFrame([(3, "C", 30.0)], "id INT, name STRING, price DOUBLE") \
     .write.format("delta").mode("append").save(delta_path)

print("Version 0 (original — 2 rows):")
spark.read.format("delta").option("versionAsOf", 0).load(delta_path).show()

print("Version 1 (latest — 3 rows):")
spark.read.format("delta").load(delta_path).show()

Version 0 (original — 2 rows):
+---+----+-----+
| id|name|price|
+---+----+-----+
|  1|   A| 10.0|
|  2|   B| 20.0|
+---+----+-----+

Version 1 (latest — 3 rows):
+---+----+-----+
| id|name|price|
+---+----+-----+
|  1|   A| 10.0|
|  2|   B| 20.0|
|  3|   C| 30.0|
+---+----+-----+



## Delta History

In [78]:
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, delta_path)
dt.history().select("version", "timestamp", "operation").show(truncate=False)

+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|1      |2026-02-18 14:47:41.034|WRITE    |
|0      |2026-02-18 14:47:40.293|WRITE    |
+-------+-----------------------+---------+



## DBUtils Shim — Complete Test
Test credentials, secrets, widgets, fs, notebook, jobs.taskValues and data modules.

In [79]:
# --- dbutils.credentials (no-op locally) ---
print(f"credentials.showCurrentRole: {dbutils.credentials.showCurrentRole()}")

# --- dbutils.secrets ---
aws_key = dbutils.secrets.get("any_scope", "AWS_ACCESS_KEY_ID")
print(f"secrets.get: AWS_ACCESS_KEY_ID = {aws_key}")

aws_bytes = dbutils.secrets.getBytes("any_scope", "AWS_ACCESS_KEY_ID")
print(f"secrets.getBytes: {aws_bytes}")

print(f"secrets.listScopes: {dbutils.secrets.listScopes()[:3]}")

credentials.showCurrentRole: []
secrets.get: AWS_ACCESS_KEY_ID = minioadmin
secrets.getBytes: b'minioadmin'
secrets.listScopes: [SecretScope(name=''), SecretScope(name='app'), SecretScope(name='application')]


In [80]:
# --- dbutils.widgets ---
dbutils.widgets.text("env", "dev", "Environment")
dbutils.widgets.dropdown("region", "us-east-1", ["us-east-1", "eu-west-1"], "Region")
dbutils.widgets.combobox("format", "delta", ["delta", "parquet", "csv"], "Format")

print(f"widgets.get('env'): {dbutils.widgets.get('env')}")
print(f"widgets.get('region'): {dbutils.widgets.get('region')}")
print(f"widgets.getAll(): {dbutils.widgets.getAll()}")

dbutils.widgets.remove('format')
print(f"After remove('format'): {dbutils.widgets.getAll()}")

widgets.get('env'): dev
widgets.get('region'): us-east-1
widgets.getAll(): {'env': 'dev', 'region': 'us-east-1', 'format': 'delta'}
After remove('format'): {'env': 'dev', 'region': 'us-east-1'}


In [81]:
# --- dbutils.fs (usando rutas DBFS locales) ---
dbfs_base = "dbfs:/tmp/fs_demo"
dbutils.fs.mkdirs(f"{dbfs_base}/subdir")
dbutils.fs.put(f"{dbfs_base}/hello.txt", "Hello from dbutils.fs!", overwrite=True)
dbutils.fs.put(f"{dbfs_base}/data.csv", "id,name\n1,alpha\n2,beta", overwrite=True)

print("fs.ls:")
for f in dbutils.fs.ls(f"{dbfs_base}/"):
    print(f"  {f.name}  size={f.size}")

print(f"\nfs.head: {dbutils.fs.head(f'{dbfs_base}/hello.txt')}")

# cp & rm
dbutils.fs.cp(f"{dbfs_base}/hello.txt", f"{dbfs_base}/hello_copy.txt")
print(f"\nAfter cp:")
for f in dbutils.fs.ls(f"{dbfs_base}/"):
    print(f"  {f.name}")

dbutils.fs.rm(f"{dbfs_base}", recurse=True)
print("\n✓ Cleaned up")

fs.ls:
  data.csv  size=22
  hello.txt  size=22
  subdir/  size=0

fs.head: Hello from dbutils.fs!

After cp:
  data.csv
  hello.txt
  hello_copy.txt
  subdir/

✓ Cleaned up


In [82]:
# --- dbutils.jobs.taskValues ---
dbutils.jobs.taskValues.set("row_count", 42)
val = dbutils.jobs.taskValues.get("my_task", "row_count", debugValue=0)
print(f"jobs.taskValues: row_count = {val}")

jobs.taskValues: row_count = 42


In [83]:
# --- dbutils.data.summarize ---
df = spark.sql("SELECT * FROM products_silver")
dbutils.data.summarize(df)

+-------+------------------+---------+-----------------+--------+
|summary|                id|     name|            price|category|
+-------+------------------+---------+-----------------+--------+
|  count|                 5|        5|                5|       5|
|   mean|               3.0|     NULL|            145.0|    NULL|
| stddev|1.5811388300841898|     NULL|103.6822067666386|    NULL|
|    min|                 1|PRODUCT A|             50.0|CLOTHING|
|    max|                 5|PRODUCT E|            300.0|    HOME|
+-------+------------------+---------+-----------------+--------+



## Unity Catalog — Emulación completa
Databricks Runtime 16.4 LTS incluye Unity Catalog con namespace de tres niveles  
`catalog.schema.table`. El objeto `uc` está disponible globalmente via `inject_notebook_context`.

In [84]:
# ── UC: Catálogos ────────────────────────────────────────────────────────────
print("=== UNITY CATALOG — Catálogos ===\n")

# Crear catálogos (soporta: IF NOT EXISTS, MANAGED LOCATION, COMMENT)
uc.sql("CREATE CATALOG IF NOT EXISTS analytics COMMENT 'Catálogo analítico'")
uc.sql("CREATE CATALOG IF NOT EXISTS staging  COMMENT 'Datos en tránsito'")

# Listar catálogos
cats = uc.list_catalogs()
print("Catálogos disponibles:")
for c in cats:
    print(f"  • {c.name}")

# DESCRIBE CATALOG
print("\nDESCRIBE CATALOG analytics:")
uc.sql("DESCRIBE CATALOG analytics").show()

# Catálogo activo
print(f"Catálogo activo: {uc.get_current_catalog()}")
uc.set_current_catalog("analytics")
print(f"Cambiado a: {uc.get_current_catalog()}")
uc.set_current_catalog("main")

=== UNITY CATALOG — Catálogos ===

Catálogos disponibles:
  • analytics
  • hive_metastore
  • main
  • staging

DESCRIBE CATALOG analytics:
+----------------+--------------------+
|             key|               value|
+----------------+--------------------+
|            name|           analytics|
|    catalog_type|             MANAGED|
|storage_location|/home/omar/Docume...|
|         comment|                    |
+----------------+--------------------+

Catálogo activo: main
Cambiado a: analytics


In [85]:
# ── UC: Schemas y tres niveles ───────────────────────────────────────────────
print("=== UNITY CATALOG — Schemas y namespace tres niveles ===\n")

# El shim UC gestiona schemas para cualquier catálogo
uc.create_schema("analytics", "bronze", comment="Capa Bronze", if_not_exists=True)
uc.create_schema("analytics", "silver", comment="Capa Silver", if_not_exists=True)
uc.create_schema("analytics", "gold",   comment="Capa Gold",   if_not_exists=True)

schemas = uc.list_schemas("analytics")
print("Schemas en 'analytics' (shim UC):")
for s in schemas:
    print(f"  • analytics.{s.name}")

# ── USE CATALOG / USE SCHEMA ─────────────────────────────────────────────
print("\n--- USE CATALOG / USE SCHEMA ---")
uc.sql("USE CATALOG analytics")
print(f"Catálogo activo: {uc.get_current_catalog()}")
uc.sql("USE SCHEMA bronze")
print(f"Schema activo: {uc.get_current_schema()}")

# SHOW SCHEMAS via SQL
print("\nSHOW SCHEMAS IN analytics:")
for s in uc.list_schemas("analytics"):
    print(f"  • {s.name}")

# Restaurar
uc.sql("USE CATALOG main")
uc.sql("USE SCHEMA default")
print(f"\nRestaurado a: {uc.get_current_catalog()}.{uc.get_current_schema()}")

# ── Crear tabla de eventos en main.events_demo ───────────────────────────
import shutil, pathlib
# Limpiar directorio y tabla si existen de una ejecución previa
_events_db = pathlib.Path(".warehouse/main/events_demo.db")
if _events_db.exists():
    shutil.rmtree(_events_db)

uc.sql("CREATE SCHEMA IF NOT EXISTS events_demo")
spark.sql("DROP TABLE IF EXISTS events_demo.events")
spark.sql("DROP DATABASE IF EXISTS events_demo CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS events_demo")
spark.sql("""
    CREATE TABLE events_demo.events
    (event_id INT, user_id INT, event_type STRING, ts TIMESTAMP)
    USING DELTA
""")
print("\n✓ Tabla events_demo.events creada")

spark.sql("""
    INSERT INTO events_demo.events VALUES
    (1, 100, 'click',    current_timestamp()),
    (2, 101, 'purchase', current_timestamp()),
    (3, 100, 'view',     current_timestamp())
""")
print("Consulta events_demo.events:")
spark.sql("SELECT * FROM events_demo.events").show()

=== UNITY CATALOG — Schemas y namespace tres niveles ===

Schemas en 'analytics' (shim UC):

--- USE CATALOG / USE SCHEMA ---
Catálogo activo: analytics
Schema activo: bronze

SHOW SCHEMAS IN analytics:

Restaurado a: main.default

✓ Tabla events_demo.events creada
Consulta events_demo.events:
+--------+-------+----------+--------------------+
|event_id|user_id|event_type|                  ts|
+--------+-------+----------+--------------------+
|       2|    101|  purchase|2026-02-18 14:47:...|
|       1|    100|     click|2026-02-18 14:47:...|
|       3|    100|      view|2026-02-18 14:47:...|
+--------+-------+----------+--------------------+



In [86]:
# ── UC: Volumes y rutas /Volumes/ ────────────────────────────────────────────
print("=== UNITY CATALOG — Volumes y /Volumes/ paths ===\n")

# Crear volumes gestionados
uc.sql("CREATE VOLUME IF NOT EXISTS analytics.bronze.raw_files    COMMENT 'Archivos crudos'")
uc.sql("CREATE VOLUME IF NOT EXISTS analytics.bronze.checkpoints  COMMENT 'Checkpoints Spark'")

# Listar volumes
print("Volumes en analytics.bronze:")
uc.sql("SHOW VOLUMES IN analytics.bronze").show()

# Usar rutas /Volumes/ con dbutils.fs (igual que en Databricks real)
vol_base = "/Volumes/analytics/bronze/raw_files"

dbutils.fs.put(f"{vol_base}/config.json",
               '{"source": "api", "version": 1, "env": "prod"}',
               overwrite=True)
dbutils.fs.put(f"{vol_base}/data.csv",
               "id,name,value\n1,item_a,100\n2,item_b,200\n3,item_c,300",
               overwrite=True)
dbutils.fs.mkdirs(f"{vol_base}/subdir")

# ls() preserva el prefijo /Volumes/ (igual que Databricks)
print("\ndbutils.fs.ls('/Volumes/analytics/bronze/raw_files/'):")
for fi in dbutils.fs.ls(f"{vol_base}/"):
    print(f"  path={fi.path}  name={fi.name}  size={fi.size}")

# head() funciona directamente con paths /Volumes/
print(f"\ndbutils.fs.head({vol_base}/config.json):")
print(dbutils.fs.head(f"{vol_base}/config.json"))

# Leer CSV desde /Volumes/ con Spark
print("\nLeer CSV desde /Volumes/ con Spark:")
vol_path = uc.volume_path("analytics", "bronze", "raw_files", "data.csv")
spark.read.csv(vol_path, header=True, inferSchema=True).show()

# DESCRIBE VOLUME
print("\nDESCRIBE VOLUME:")
uc.sql("DESCRIBE VOLUME analytics.bronze.raw_files").show()

=== UNITY CATALOG — Volumes y /Volumes/ paths ===

Volumes en analytics.bronze:
+------------+-----------+-----------+-----------+--------------------+
|catalog_name|schema_name|       name|volume_type|    storage_location|
+------------+-----------+-----------+-----------+--------------------+
|   analytics|     bronze|  raw_files|    MANAGED|/home/omar/Docume...|
|   analytics|     bronze|checkpoints|    MANAGED|/home/omar/Docume...|
+------------+-----------+-----------+-----------+--------------------+


dbutils.fs.ls('/Volumes/analytics/bronze/raw_files/'):
  path=/Volumes/analytics/bronze/raw_files/config.json  name=config.json  size=46
  path=/Volumes/analytics/bronze/raw_files/data.csv  name=data.csv  size=52
  path=/Volumes/analytics/bronze/raw_files/subdir/  name=subdir/  size=0

dbutils.fs.head(/Volumes/analytics/bronze/raw_files/config.json):
{"source": "api", "version": 1, "env": "prod"}

Leer CSV desde /Volumes/ con Spark:
+---+------+-----+
| id|  name|value|
+---+------

In [87]:
# ── UC: DBFS paths ───────────────────────────────────────────────────────────
print("=== UNITY CATALOG — DBFS paths (dbfs:/) ===\n")

# Escribir y leer en DBFS
dbutils.fs.put("dbfs:/tmp/uc_demo/hello.txt",  "Hello from DBFS!", overwrite=True)
dbutils.fs.put("dbfs:/tmp/uc_demo/config.json", '{"key": "value"}', overwrite=True)

print("dbutils.fs.ls('dbfs:/tmp/uc_demo/'):")
for fi in dbutils.fs.ls("dbfs:/tmp/uc_demo/"):
    print(f"  path={fi.path}  name={fi.name}")

print(f"\ndbutils.fs.head('dbfs:/tmp/uc_demo/hello.txt'):")
print(dbutils.fs.head("dbfs:/tmp/uc_demo/hello.txt"))

# Copiar entre rutas DBFS
dbutils.fs.cp("dbfs:/tmp/uc_demo/hello.txt", "dbfs:/tmp/uc_demo/hello_copy.txt")
print("\nTras fs.cp:")
for fi in dbutils.fs.ls("dbfs:/tmp/uc_demo/"):
    print(f"  {fi.name}")

# Limpiar
dbutils.fs.rm("dbfs:/tmp/uc_demo", recurse=True)
print("\ndbfs:/tmp/uc_demo eliminado ✓")

=== UNITY CATALOG — DBFS paths (dbfs:/) ===

dbutils.fs.ls('dbfs:/tmp/uc_demo/'):
  path=dbfs:/tmp/uc_demo/config.json  name=config.json
  path=dbfs:/tmp/uc_demo/hello.txt  name=hello.txt

dbutils.fs.head('dbfs:/tmp/uc_demo/hello.txt'):
Hello from DBFS!

Tras fs.cp:
  config.json
  hello.txt
  hello_copy.txt

dbfs:/tmp/uc_demo eliminado ✓


In [88]:
# ── UC: Tags ─────────────────────────────────────────────────────────────
print("=== UNITY CATALOG — Tags ===\n")

# SET TAGS en tabla y volume
uc.sql("ALTER TABLE main.events_demo.events  SET TAGS ('env'='prod', 'team'='data-eng', 'pii'='false')")
uc.sql("ALTER VOLUME analytics.bronze.raw_files SET TAGS ('source'='api', 'format'='json')")

# COMMENT ON almacenado como tag _comment
uc.sql("COMMENT ON TABLE main.events_demo.events IS 'Tabla de eventos de usuario'")

# SHOW TAGS
print("Tags de main.events_demo.events:")
uc.sql("SHOW TAGS ON TABLE main.events_demo.events").show()

print("Tags de analytics.bronze.raw_files (Volume):")
uc.sql("SHOW TAGS ON VOLUME analytics.bronze.raw_files").show()

# GET tags via Python API
tags = uc.get_tags("main.events_demo.events")
print(f"Python API get_tags: {tags}")

# UNSET TAGS
uc.sql("ALTER TABLE main.events_demo.events UNSET TAGS ('pii')")
print("\nTras UNSET TAGS ('pii'):")
print(uc.get_tags("main.events_demo.events"))

=== UNITY CATALOG — Tags ===

[Unity] COMMENT ON TABLE main.events_demo.events: Tabla de eventos de usuario
Tags de main.events_demo.events:
+--------+--------------------+
|tag_name|           tag_value|
+--------+--------------------+
|     env|                prod|
|    team|            data-eng|
|_comment|Tabla de eventos ...|
|     pii|               false|
+--------+--------------------+

Tags de analytics.bronze.raw_files (Volume):
+--------+---------+
|tag_name|tag_value|
+--------+---------+
|  source|      api|
|  format|     json|
+--------+---------+

Python API get_tags: {'env': 'prod', 'team': 'data-eng', '_comment': 'Tabla de eventos de usuario', 'pii': 'false'}

Tras UNSET TAGS ('pii'):
{'env': 'prod', 'team': 'data-eng', '_comment': 'Tabla de eventos de usuario'}


In [89]:
# ── UC: GRANT / REVOKE / SHOW GRANTS ────────────────────────────────────────
print("=== UNITY CATALOG — Permisos ===\n")

# GRANTs en distintos tipos de objetos (incluyendo METASTORE, nuevos tipos)
grants_sql = [
    "GRANT CREATE CATALOG ON METASTORE TO data_engineers",
    "GRANT USE CATALOG ON CATALOG analytics TO analysts",
    "GRANT USE SCHEMA ON SCHEMA analytics.bronze TO analysts",
    "GRANT SELECT ON TABLE main.events_demo.events TO analyst@co.com",
    "GRANT READ VOLUME ON VOLUME analytics.bronze.raw_files TO analyst@co.com",
    "GRANT EXECUTE ON MODEL analytics.ml.churn_model TO ml_team",
]

for sql in grants_sql:
    uc.sql(sql)
    print(f"  ✓ {sql}")

# SHOW GRANTS — todos
print("\nSHOW GRANTS (todos):")
uc.sql("SHOW GRANTS").show()

# SHOW GRANTS — filtrado por objeto
print("SHOW GRANTS ON TABLE main.events_demo.events:")
uc.sql("SHOW GRANTS ON TABLE main.events_demo.events").show()

# REVOKE
uc.sql("REVOKE SELECT ON TABLE main.events_demo.events FROM analyst@co.com")
print("Tras REVOKE SELECT:")
uc.sql("SHOW GRANTS ON TABLE main.events_demo.events").show()

# DENY
uc.sql("DENY INSERT ON TABLE main.events_demo.events TO viewer_role")
print("Tras DENY INSERT:")
uc.sql("SHOW GRANTS ON TABLE main.events_demo.events").show()

# Audit log
print("Audit log:")
uc.audit_log().show(5, truncate=False)

=== UNITY CATALOG — Permisos ===

[Unity] GRANT CREATE CATALOG ON METASTORE  TO data_engineers
  ✓ GRANT CREATE CATALOG ON METASTORE TO data_engineers
[Unity] GRANT USE CATALOG ON CATALOG analytics TO analysts
  ✓ GRANT USE CATALOG ON CATALOG analytics TO analysts
[Unity] GRANT USE SCHEMA ON SCHEMA analytics.bronze TO analysts
  ✓ GRANT USE SCHEMA ON SCHEMA analytics.bronze TO analysts
[Unity] GRANT SELECT ON TABLE main.events_demo.events TO analyst@co.com
  ✓ GRANT SELECT ON TABLE main.events_demo.events TO analyst@co.com
[Unity] GRANT READ VOLUME ON VOLUME analytics.bronze.raw_files TO analyst@co.com
  ✓ GRANT READ VOLUME ON VOLUME analytics.bronze.raw_files TO analyst@co.com
[Unity] GRANT EXECUTE ON MODEL analytics.ml.churn_model TO ml_team
  ✓ GRANT EXECUTE ON MODEL analytics.ml.churn_model TO ml_team

SHOW GRANTS (todos):
+--------------+--------------+-----------+--------------------+
|     principal|     privilege|object_type|          object_key|
+--------------+--------------+

In [90]:
# ── UC: information_schema ───────────────────────────────────────────────────
print("=== UNITY CATALOG — information_schema ===\n")

print("Catálogos:")
uc.information_schema.catalogs().show()

print("Schemas en 'analytics':")
uc.information_schema.schemata("analytics").show()

print("Tablas en 'main':")
uc.information_schema.tables("main").show()

print("Volumes en 'analytics':")
uc.information_schema.volumes("analytics").show()

print("Table Privileges:")
uc.information_schema.table_privileges().show()

print("Routines (funciones):")
uc.information_schema.routines().show()

=== UNITY CATALOG — information_schema ===

Catálogos:
+--------------+-------+
|  catalog_name|comment|
+--------------+-------+
|     analytics|       |
|hive_metastore|       |
|          main|       |
|       staging|       |
+--------------+-------+

Schemas en 'analytics':
+------------+-----------+-------+
|catalog_name|schema_name|comment|
+------------+-----------+-------+
+------------+-----------+-------+

Tablas en 'main':
+-------------+------------+--------------------+----------+
|table_catalog|table_schema|          table_name|table_type|
+-------------+------------+--------------------+----------+
|         main|     default|category_summary_...|   MANAGED|
|         main|     default|     products_silver|   MANAGED|
+-------------+------------+--------------------+----------+

Volumes en 'analytics':
+--------------+-------------+-----------+-----------+--------------------+
|volume_catalog|volume_schema|volume_name|volume_type|    storage_location|
+--------------+--

## UC: Funciones, Grupos, Lineage, UNDROP

In [91]:
# ── UC: Funciones ─────────────────────────────────────────────────────────────
print("=== UNITY CATALOG — Funciones ===\n")

# Crear funciones via API
uc.create_function("main", "default", "clean_text",
                   definition="TRIM(LOWER(input))",
                   description="Limpia texto: trim + lowercase",
                   if_not_exists=True)
uc.create_function("main", "default", "calc_tax",
                   definition="price * 0.21",
                   description="Calcula IVA 21%",
                   if_not_exists=True)

# Listar funciones
funcs = uc.list_functions("main", "default")
print("Funciones en main.default:")
for f in funcs:
    print(f"  • {f.catalog_name}.{f.schema_name}.{f.name} — {f.description}")

# Describe función
print("\nDESCRIBE FUNCTION main.default.clean_text:")
uc.describe_function_sql("main.default.clean_text").show()

# Drop función
uc.drop_function("main", "default", "calc_tax")
print(f"\nTras DROP calc_tax: {[f.name for f in uc.list_functions('main', 'default')]}")

# ── UC: Grupos ────────────────────────────────────────────────────────────────
print("\n=== UNITY CATALOG — Grupos ===\n")

uc.sql("CREATE GROUP IF NOT EXISTS data_engineers")
uc.sql("CREATE GROUP IF NOT EXISTS analysts")
uc.sql("ALTER GROUP data_engineers ADD USER alice@company.com")
uc.sql("ALTER GROUP data_engineers ADD USER bob@company.com")
uc.sql("ALTER GROUP analysts ADD USER carol@company.com")

print("SHOW GROUPS:")
uc.sql("SHOW GROUPS").show()

groups = uc.list_groups()
for g in groups:
    print(f"  Grupo '{g.name}': {g.members}")

# Remove member
uc.sql("ALTER GROUP data_engineers REMOVE USER bob@company.com")
print("\nTras REMOVE bob de data_engineers:")
for g in uc.list_groups():
    if g.name == "data_engineers":
        print(f"  {g.name}: {g.members}")

# ── UC: UNDROP TABLE ─────────────────────────────────────────────────────────
print("\n=== UNITY CATALOG — UNDROP TABLE ===\n")

# Simular drop y undrop
uc.track_drop_table("main.sales.old_report")
uc.track_drop_table("main.sales.temp_staging")
print("Tablas dropped registradas:")
uc.sql("SHOW TABLES DROPPED").show()

uc.sql("UNDROP TABLE old_report")
print("Tras UNDROP old_report:")
uc.sql("SHOW TABLES DROPPED").show()

# ── UC: Lineage ──────────────────────────────────────────────────────────────
print("\n=== UNITY CATALOG — Lineage ===\n")

uc.track_lineage("main.bronze.raw_products", "main.silver.products_silver", "TABLE")
uc.track_lineage("main.silver.products_silver", "main.gold.category_summary", "TABLE")
uc.track_lineage("main.bronze.raw_events", "main.silver.events_clean", "TABLE")

print("Lineage completo:")
uc.lineage_as_dataframe().show()

print("Lineage filtrado por 'main.silver.products_silver':")
lineage = uc.get_lineage("main.silver.products_silver")
for l in lineage:
    print(f"  {l.source} → {l.target} ({l.lineage_type})")

=== UNITY CATALOG — Funciones ===

[Unity] Función 'main.default.calc_tax' registrada.
Funciones en main.default:
  • main.default.clean_text — Limpia texto: trim + lowercase
  • main.default.calc_tax — Calcula IVA 21%

DESCRIBE FUNCTION main.default.clean_text:
+-----------+--------------------+
|  info_name|          info_value|
+-----------+--------------------+
|   Function|main.default.clea...|
|       Type|              SCALAR|
|Description|Limpia texto: tri...|
| Definition|  TRIM(LOWER(input))|
+-----------+--------------------+

[Unity] Función 'main.default.calc_tax' eliminada.

Tras DROP calc_tax: ['clean_text']

=== UNITY CATALOG — Grupos ===

[Unity] 'alice@company.com' añadido al grupo 'data_engineers'.
[Unity] 'bob@company.com' añadido al grupo 'data_engineers'.
[Unity] 'carol@company.com' añadido al grupo 'analysts'.
SHOW GROUPS:
+--------------+
|          name|
+--------------+
|      analysts|
|data_engineers|
+--------------+

  Grupo 'analysts': ['carol@company.com

In [92]:
# ── UC: No-ops (Delta Sharing, External Locations, Credentials, etc.) ─────
print("=== UNITY CATALOG — Comandos no-op (infraestructura cloud) ===\n")

noop_commands = [
    "CREATE SHARE data_share",
    "CREATE RECIPIENT external_partner",
    "CREATE EXTERNAL LOCATION s3_location",
    "CREATE STORAGE CREDENTIAL s3_cred",
    "CREATE SERVICE CREDENTIAL azure_cred",
    "CREATE CONNECTION pg_conn",
    "CREATE CLEAN ROOM collab_room",
    "CREATE MATERIALIZED VIEW mv_test AS SELECT 1",
    "CREATE STREAMING TABLE st_test AS SELECT 1",
    "CREATE SERVER my_server",
    "SHOW SHARES",
    "SHOW EXTERNAL LOCATIONS",
    "SHOW STORAGE CREDENTIALS",
    "SHOW SERVICE CREDENTIALS",
    "SHOW CONNECTIONS",
    "SHOW CLEAN ROOMS",
    "SYNC SCHEMA my_schema",
    "MSCK REPAIR PRIVILEGES",
    "REFRESH FOREIGN CATALOG ext_cat",
]

for cmd in noop_commands:
    result = uc.sql(cmd)
    print(f"  ✓ {cmd}")

print(f"\n✅ Todos los {len(noop_commands)} no-op completados sin error")

=== UNITY CATALOG — Comandos no-op (infraestructura cloud) ===

[Unity] CREATE SHARE — no-op locally
  ✓ CREATE SHARE data_share
[Unity] CREATE RECIPIENT — no-op locally
  ✓ CREATE RECIPIENT external_partner
[Unity] CREATE EXTERNAL LOCATION — no-op locally
  ✓ CREATE EXTERNAL LOCATION s3_location
[Unity] CREATE STORAGE CREDENTIAL — no-op locally
  ✓ CREATE STORAGE CREDENTIAL s3_cred
[Unity] CREATE SERVICE CREDENTIAL — no-op locally
  ✓ CREATE SERVICE CREDENTIAL azure_cred
[Unity] CREATE CONNECTION — no-op locally
  ✓ CREATE CONNECTION pg_conn
[Unity] CREATE CLEAN ROOM — no-op locally
  ✓ CREATE CLEAN ROOM collab_room
[Unity] CREATE MATERIALIZED VIEW — no-op locally
  ✓ CREATE MATERIALIZED VIEW mv_test AS SELECT 1
[Unity] CREATE STREAMING TABLE — no-op locally
  ✓ CREATE STREAMING TABLE st_test AS SELECT 1
[Unity] CREATE SERVER — no-op locally
  ✓ CREATE SERVER my_server
[Unity] SHOW SHARES — no-op locally
  ✓ SHOW SHARES
[Unity] SHOW EXTERNAL LOCATIONS — no-op locally
  ✓ SHOW EXTERNAL