In [2]:
import os, requests
from dotenv import load_dotenv

# Carga .env para esta ejecución
load_dotenv()

HOST = (os.getenv("DATABRICKS_HOST") or "").rstrip("/")
TOKEN = os.getenv("DATABRICKS_TOKEN") or ""

ORG_ID = "4044162781254195"  # opcional; si no te funciona, quítalo

r = requests.get(
    f"{HOST}/api/2.0/clusters/spark-versions",
    headers={"Authorization": f"Bearer {TOKEN}", "X-Databricks-Org-Id": ORG_ID},
    timeout=10,
)

print("HOST:", HOST)
print("STATUS:", r.status_code, "| OK?:", r.ok)
print(r.text[:500])


HOST: https://dbc-54859cf0-9d64.cloud.databricks.com
STATUS: 200 | OK?: True
{"versions":[{"key":"16.3.x-photon-scala2.12","name":"16.3 Photon (includes Apache Spark 3.5.2, Scala 2.12)"},{"key":"12.2.x-scala2.12","name":"12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)"},{"key":"11.3.x-photon-scala2.12","name":"11.3 LTS Photon (includes Apache Spark 3.3.0, Scala 2.12)"},{"key":"16.4.x-cpu-ml-scala2.12","name":"16.4 LTS ML (includes Apache Spark 3.5.2, Scala 2.12)"},{"key":"17.2.x-scala2.13","name":"17.2 Beta (includes Apache Spark 4.0.0, Scala 2.13)"},{"key":"17.1.x-cp


In [1]:
# scripts/dbx_run_now_b64.py
import os, base64, requests
from dotenv import load_dotenv

load_dotenv()
HOST  = (os.getenv("DATABRICKS_HOST") or "").rstrip("/")
TOKEN = os.getenv("DATABRICKS_TOKEN") or ""
JOBID = int(os.getenv("DATABRICKS_JOB_ID_AUDIT", "0"))

csv = "tx_id,date,account,debit,credit,desc\n1,2025-01-01,430,100,0,venta\n1,2025-01-01,700,0,100,venta contrapartida\n2,2025-13-05,430,50,0,fecha invalida\n"
b64  = base64.b64encode(csv.encode("utf-8")).decode("utf-8")

r = requests.post(
    f"{HOST}/api/2.2/jobs/run-now",
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    json={"job_id": JOBID, "notebook_params": {"csv_b64": b64, "file_name": "mini.csv"}},
    timeout=30,
)
r.raise_for_status()
print("RUN:", r.json())


HTTPError: 400 Client Error: Bad Request for url: https://dbc-54859cf0-9d64.cloud.databricks.com/api/2.2/jobs/run-now

In [1]:
# ============================================================
# Notebook: bot_audit_xlsx_b64 (solo Excel, versión robusta)
# Añadido: validación regex para fechas -> evita errores
# ============================================================

try:
    dbutils
except NameError:
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)

import base64, io, json
import pandas as pd
from pyspark.sql import types as T, functions as F

# --- DEMO Excel si no hay file_b64 -------------------------------------------
file_b64 = (dbutils.widgets.get("file_b64") or "").strip()
if not file_b64:
    demo_df = pd.DataFrame({
        "tx_id":   ["1","1","2","3"],
        "date":    ["2025-01-02","2025-01-02","2025-13-05","2025-02-10"],
        "account": ["430","700","430","430"],
        "debit":   [100, 0, 50, 20],
        "credit":  [0, 100, 0, 0],
        "desc":    ["Venta A","Venta A contrapartida","Fecha inválida","Desbalance"],
    })
    buf = io.BytesIO()
    with pd.ExcelWriter(buf, engine="xlsxwriter") as writer:
        demo_df.to_excel(writer, index=False, sheet_name="Sheet1")
    file_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")

raw = base64.b64decode(file_b64.encode("utf-8"))
pdf = pd.read_excel(io.BytesIO(raw), sheet_name=0, dtype=str)

# --- Normalización -----------------------------------------------------------
schema = T.StructType([
    T.StructField("tx_id",   T.StringType(), True),
    T.StructField("date",    T.StringType(), True),
    T.StructField("account", T.StringType(), True),
    T.StructField("debit",   T.DoubleType(), True),
    T.StructField("credit",  T.DoubleType(), True),
    T.StructField("desc",    T.StringType(), True),
])
df = spark.createDataFrame(pdf, schema=schema)

# --- Validación de fechas ----------------------------------------------------
df1 = df.withColumn(
    "is_valid_pattern",
    F.col("date").rlike(r"^\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])$")
)
# Solo parseamos si cumple el patrón
df1 = df1.withColumn(
    "date_parsed",
    F.when(F.col("is_valid_pattern"), F.to_date("date", "yyyy-MM-dd"))
)

# --- Auditoría ---------------------------------------------------------------
invalid_date   = df1.filter(~F.col("is_valid_pattern") & F.col("date").isNotNull())
dups           = df1.groupBy("tx_id").count().filter(F.col("count") > 1)
agg            = df1.groupBy("tx_id").agg(F.sum("debit").alias("sum_debit"), F.sum("credit").alias("sum_credit"))
unbalanced     = agg.withColumn("diff", F.col("sum_debit") - F.col("sum_credit")).filter(F.abs(F.col("diff")) > 1e-6)
required_nulls = df1.filter(F.col("tx_id").isNull() | F.col("account").isNull() | F.col("date").isNull())

summary = {
    "rows":            df1.count(),
    "invalid_date":    {"count": invalid_date.count()},
    "duplicates_tx":   {"count": dups.count()},
    "unbalanced_tx":   {"count": unbalanced.count()},
    "required_nulls":  {"count": required_nulls.count()},
}

# --- Salida ------------------------------------------------------------------
result = json.dumps(summary, ensure_ascii=False, indent=2)
try:
    dbutils.notebook.exit(result)   # si lo corres como Job
except Exception:
    print(result)                   # si lo corres a mano


ModuleNotFoundError: No module named 'pyspark'

In [2]:

import os, requests
from dotenv import load_dotenv
load_dotenv()
host   = (os.getenv("DATABRICKS_HOST") or "").rstrip("/")
token  = os.getenv("DATABRICKS_TOKEN")
job_id = os.getenv("DATABRICKS_JOB_ID_AUDIT")

assert host and token and job_id, "Faltan vars"
h = {"Authorization": f"Bearer {token}"}

# 1) ¿Responde el workspace?
r = requests.get(host + "/api/2.0/workspace/get-status", headers=h, params={"path":"/"}, timeout=30)
print("Workspace status:", r.status_code, r.text[:120])

# 2) ¿Existe el Job?
r = requests.get(host + "/api/2.2/jobs/get", headers=h, params={"job_id": job_id}, timeout=30)
print("Jobs.get:", r.status_code, r.text[:200])



Workspace status: 200 {"object_type":"DIRECTORY","path":"/","object_id":0,"resource_id":"0"}

Jobs.get: 200 {"job_id":949482580389040,"creator_user_name":"ja.tejeror@gmail.com","run_as_user_name":"ja.tejeror@gmail.com","run_as_owner":true,"settings":{"name":"New Job Sep 07, 2025, 03:43 AM","email_notificati


In [9]:
%pip install openpyxl


Collecting openpyxl
  Using cached openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import pandas as pd

# Crear el DataFrame
demo = pd.DataFrame({
    "tx_id":   ["1","1","2","3","4","5","6","9","9","9"],
    "date":    ["2025-01-02","2025-01-02","2025-13-05","2025-02-10","", "2025-02-11","2025-02-11","2025-03-03","2025-03-03","2025-03-03"],
    "account": ["430","700","430","430","430","430","430","430","700","570"],
    "debit":   ["100", "0", "50", "20", "10", "€ 1.234,56", "10", "100", "0", "0"],
    "credit":  ["0", "100", "0", "0", "10", "10", "—", "0", "100", "10"],
    "desc":    ["Venta A","Venta A contrapartida","Fecha inválida","Desbalance","Falta fecha","Debit nulo","Credit nulo","Dup A","Dup B balanceado","Extra línea desbalancea"]
})

demo.to_excel("audit_demo.xlsx", index=False)

