
# Ingesta de órdenes de FarmAhorra en Raw


## 0. Prerequisitos


In [0]:
# 0) PRE-REQUISITOS
# - PyMongo para Cosmos (API Mongo)
# - dateutil para parsear fechas ISO
%pip install --quiet "pymongo[srv]" python-dateutil

import time, uuid, datetime as dt
from dateutil import parser as dateparser
from pymongo import MongoClient

[0m[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


## 1. Definición y carga de credenciales

In [0]:
# Text widgets through which the Cosmos (Mongo vCore) credentials are supplied.
# NOTE(review): text widgets show their value in clear text; a secret scope
# would avoid exposing the password in the notebook UI — confirm with the team.
_CREDENTIAL_WIDGETS = ("COSMOS_URI", "COSMOS_USER", "COSMOS_PASS")
for _widget in _CREDENTIAL_WIDGETS:
    dbutils.widgets.text(_widget, "")

# Read every widget back, trimming whitespace that may come along when pasting.
COSMOS_URI, COSMOS_USER, COSMOS_PASS = (
    dbutils.widgets.get(_widget).strip() for _widget in _CREDENTIAL_WIDGETS
)

# Stop right here when any of the three values is blank.
assert all((COSMOS_URI, COSMOS_USER, COSMOS_PASS)), "Faltan credenciales en los widgets."
print("Credenciales cargadas")

Credenciales cargadas


## 2. Definición del contrato operativo

In [0]:
# Resolve the physical location of the RAW schema from the metastore so the
# path does not have to be hard-coded.
_location_rows = (
    spark.sql("DESCRIBE SCHEMA EXTENDED farma.raw")
         .where("database_description_item = 'Location'")
         .collect()
)
# Guard the empty result explicitly: indexing [0] on an empty list would raise
# IndexError before the assertion below could report the actual problem.
raw_loc = _location_rows[0]["database_description_value"] if _location_rows else None
assert raw_loc, "No se puede resolver la ubicación de farma.raw"
# Parent folder of the schema location (everything before the last "/").
farma_base = raw_loc.rsplit("/", 1)[0]

# Configuration dictionary shared by the cells below.
cfg = {
    "db_raw":    "farma.raw",
    "db_bronze": "farma.bronze",
    "raw_path":  f"{raw_loc}/orders_farmahorra",  # landing folder for the JSON files
    "checkpoints_dir": f"{farma_base}/_checkpoints/orders_farmahorra",

    # Watermark definition
    "wm_table":  "farma.raw.ingest_watermarks", # stores the last confirmed_at
    "wm_source": "orders_farmahorra",
    "wm_field":  "confirmed_at", # order confirmation timestamp
    "wm_initial": "1970-01-01T00:00:00Z",
    "wm_delay_minutes": 120,  # tolerance for late arrivals
    "raw_partition_col": "ingest_date", # YYYY-MM-DD
    "batch_max_docs": 20000 # max docs per batch
}

cfg

{'db_raw': 'farma.raw',
 'db_bronze': 'farma.bronze',
 'raw_path': 'abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/__unitystorage/schemas/8c784cab-51b3-40d8-bc1a-0806b69779a5/orders_farmahorra',
 'checkpoints_dir': 'abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/__unitystorage/schemas/_checkpoints/orders_farmahorra',
 'wm_table': 'farma.raw.ingest_watermarks',
 'wm_source': 'orders_farmahorra',
 'wm_field': 'confirmed_at',
 'wm_initial': '1970-01-01T00:00:00Z',
 'wm_delay_minutes': 120,
 'raw_partition_col': 'ingest_date',
 'batch_max_docs': 20000}

## 3. Conectividad y testing

In [0]:
%sql
-- Make farma.raw the current catalog and schema for the statements that follow.
USE CATALOG farma;
USE SCHEMA raw;

In [0]:
# 3.a) Connect to Cosmos through its Mongo API and report an estimated
#      document count for the orders collection as a connectivity check.
client = MongoClient(
    COSMOS_URI,
    username=COSMOS_USER,
    password=COSMOS_PASS,
    tls=True,
)
mongo_db = client["fa_db"]
coll = mongo_db["orders_farmahorra"]

estimated_docs = coll.estimated_document_count()
print("Conexión con base de datos Cosmos: exitosa, docs encontrados:", estimated_docs)

  client = MongoClient(COSMOS_URI, username=COSMOS_USER, password=COSMOS_PASS, tls=True)
  self._resolve_srv()


Conexión con base de datos Cosmos: exitosa, docs encontrados: 300


In [0]:
# 3.b) Test write to ADLS
EXTERNAL_BASE = "abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma"

# Re-point the RAW landing, checkpoint and scratch locations at the external
# container. This replaces the raw_path / checkpoints_dir values already in cfg.
cfg.update({
    "raw_path":        f"{EXTERNAL_BASE}/raw/orders_farmahorra",
    "checkpoints_dir": f"{EXTERNAL_BASE}/raw/_checkpoints/orders_farmahorra",
    "tmp_dir":         f"{EXTERNAL_BASE}/raw/_tmp",
})

# Sanity check: show the three resolved locations.
for _cfg_key in ("raw_path", "checkpoints_dir", "tmp_dir"):
    print(cfg[_cfg_key])

# Write probe: a one-row parquet dataset under a timestamp-suffixed scratch folder.
tmp_root = cfg["tmp_dir"].rstrip("/")
test_path = f"{tmp_root}/_ping_{int(time.time())}"
probe_row = {"ok": True, "ts": dt.datetime.now(dt.timezone.utc).isoformat()}
(
    spark.createDataFrame([probe_row])
         .write.mode("overwrite")
         .parquet(test_path)
)
print("Escritura en ADLS exitosa: ", test_path)


abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/orders_farmahorra
abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/_checkpoints/orders_farmahorra
abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/_tmp
Escritura en ADLS exitosa:  abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/_tmp/_ping_1757652140


## 4. Ingesta Incremental en Raw

In [0]:
import datetime as dt
from dateutil import parser as dateparser

# 4.1) Watermark table (a Delta table governed by UC).
#      For every "source" it keeps the latest confirmed_at already processed.
create_wm_sql = f"""
  CREATE TABLE IF NOT EXISTS {cfg['wm_table']} (
    source STRING,
    last_confirmed_at TIMESTAMP -- último confirmed_at ya procesado
  ) USING DELTA
"""
spark.sql(create_wm_sql)

# Seed the row for this source when the table does not have one yet.
seed_probe = spark.sql(f"SELECT 1 FROM {cfg['wm_table']} WHERE source = '{cfg['wm_source']}' LIMIT 1")
exists = seed_probe.count() > 0
if not exists:
    seed_sql = f"""
      INSERT INTO {cfg['wm_table']}
      VALUES ('{cfg['wm_source']}', TIMESTAMP('{cfg['wm_initial']}'))
    """
    spark.sql(seed_sql)

# 4.2) Read the current watermark and subtract the late-arrival tolerance:
#      effective WM = current WM - wm_delay_minutes
wm_rows = spark.sql(f"SELECT last_confirmed_at FROM {cfg['wm_table']} WHERE source = '{cfg['wm_source']}'").collect()
last_wm = wm_rows[0][0]
late_tolerance = dt.timedelta(minutes=cfg["wm_delay_minutes"])
wm_effective = (last_wm - late_tolerance).strftime("%Y-%m-%dT%H:%M:%SZ")
print("WM actual:", last_wm, "| WM efectivo:", wm_effective)

# 4.3) Fetch from Cosmos only the documents whose confirmed_at is greater than
#      the effective watermark, oldest first, capped at batch_max_docs per run;
#      then land them in RAW as JSON and advance the watermark.

def _utc_iso(value):
    """Render a datetime as an ISO-8601 string in UTC.

    Naive values are taken to be UTC already (the same assumption to_utc makes)
    instead of being interpreted in the driver's local timezone.
    """
    if value.tzinfo is None:
        value = value.replace(tzinfo=dt.timezone.utc)
    return value.astimezone(dt.timezone.utc).isoformat()


def _norm_value(v):
    """Normalize one value: datetimes become UTC ISO strings, containers recurse."""
    if isinstance(v, dt.datetime):
        return _utc_iso(v)
    if isinstance(v, dict):
        return norm(v)
    if isinstance(v, list):
        return [_norm_value(x) for x in v]
    return v


def norm(d):
    """Return a copy of document `d` made of plain, serializable values.

    - the value under "_id" is converted with str()
    - datetimes become ISO-8601 strings in UTC
    - dicts and lists are processed recursively (including list elements)
    - everything else is passed through untouched
    """
    return {k: (str(v) if k == "_id" else _norm_value(v)) for k, v in d.items()}


def to_utc(s):
    """Parse an ISO-8601 string into a UTC-aware datetime.

    Values without an offset are taken as UTC. Returns None when the value
    cannot be parsed, so one malformed document does not abort the batch.
    """
    try:
        d = dateparser.isoparse(s)
        if d.tzinfo is None:
            d = d.replace(tzinfo=dt.timezone.utc)
        return d.astimezone(dt.timezone.utc)
    except Exception:
        # Deliberate best effort: unparseable values are skipped by the caller.
        return None


q = { cfg["wm_field"]: {"$gt": wm_effective} }
cursor = coll.find(q).sort(cfg["wm_field"], 1).limit(cfg["batch_max_docs"])
docs = list(cursor)

if not docs:
    print("No hay documentos nuevos.")
else:
    rows = [norm(d) for d in docs]
    df = spark.createDataFrame(rows)

    # Partition folder named after the ingestion date (UTC).
    ingest_date = dt.datetime.now(dt.timezone.utc).date().isoformat()
    dest = f"{cfg['raw_path']}/{cfg['raw_partition_col']}={ingest_date}"

    # Append to RAW as JSON. Duplicates are accepted at this layer: the delayed
    # watermark makes every run re-read the most recent window.
    df.write.mode("append").json(dest)
    # len(rows) equals the DataFrame row count and avoids an extra Spark job.
    print(f"RAW escrito: {len(rows)} filas: {dest}")

    # Highest parseable confirmed_at seen in this batch (None when there is none).
    wm_values = (r.get(cfg["wm_field"]) for r in rows)
    parsed = (to_utc(v) for v in wm_values if v)
    max_ts = max((ts for ts in parsed if ts is not None), default=None)

    if max_ts is not None:
        # Upsert the watermark, but only ever forwards: the extra MATCHED
        # condition keeps a batch whose newest document is older than the stored
        # value (capped batch, deleted source docs) from moving it backwards.
        # NOTE(review): if more than batch_max_docs documents fall inside the
        # delay window the watermark cannot advance — confirm expected volumes.
        max_iso = max_ts.strftime("%Y-%m-%d %H:%M:%S")
        spark.sql(f"""
          MERGE INTO {cfg['wm_table']} t
          USING (SELECT '{cfg['wm_source']}' AS source, TIMESTAMP('{max_iso}') AS ts) s
          ON t.source = s.source
          WHEN MATCHED AND (t.last_confirmed_at IS NULL OR s.ts > t.last_confirmed_at)
            THEN UPDATE SET last_confirmed_at = s.ts
          WHEN NOT MATCHED THEN INSERT (source, last_confirmed_at) VALUES (s.source, s.ts)
        """)
        # Prints the candidate value; the MERGE above ignores it when it is not newer.
        print("Watermark actualizado a:", max_ts.isoformat())
    else:
        # No valid confirmed_at in the batch: leave the watermark untouched.
        print("No encontré confirmed_at válido; watermark sin cambios.")


WM actual: 1970-01-01 00:00:00 | WM efectivo: 1969-12-31T22:00:00Z
RAW escrito: 300 filas: abfss://datalake@adlsfarmahorra.dfs.core.windows.net/farma/raw/orders_farmahorra/ingest_date=2025-09-12
Watermark actualizado a: 2025-09-11T05:47:55.788433+00:00


Revisión de algunas órdenes ingestadas y almacenadas en Raw:

In [0]:
# Read the JSON files found two levels below the RAW landing path and show
# five rows of them.
raw_glob = f"{cfg['raw_path']}/*/*"
raw_sample = spark.read.json(raw_glob).limit(5)
display(raw_sample)

_id,client_id,confirmed_at,discount,discount_pct,external_order_id,farmacia_order_id,id_farmacia,items,source,subtotal,total
68c26105d0fc5a696f14dcbb,CLI-272,2025-09-11T05:41:25.425406+00:00,0.04,5.0,FAC-002-00038,a79afa84-2078-42ea-b2b5-6d6642207db8,farma_002,"List(List(NASAL SPRAY 0.05%, Oxymetazoline Hydrochloride, 0.84, 70000000101, 1, 0.84))",farmahorra,0.84,0.8
68c26106d0fc5a696f14dcbc,CLI-250,2025-09-11T05:41:26.063227+00:00,0.31,5.0,FAC-002-00039,233c864e-272b-478b-a9a6-6e61b1037b2f,farma_002,"List(List(MIDODRINE HCL 10 MG TABLET, Midodrine Hydrochloride, 6.22, 00904681907, 1, 6.22))",farmahorra,6.22,5.91
68c26106d0fc5a696f14dcbd,CLI-377,2025-09-11T05:41:26.690969+00:00,0.11,5.0,FAC-001-00040,2bd2d702-dda2-4558-9728-59ec1dbaed13,farma_001,"List(List(BACITRACIN 500 UNIT/GM OINTMNT, Bacitracin, 2.22, 68001047746, 2, 1.11))",farmahorra,2.22,2.11
68c26107d0fc5a696f14dcbe,CLI-451,2025-09-11T05:41:27.329124+00:00,46.31,5.0,FAC-002-00041,b9e223df-d809-41e1-b28f-03c0bac5c38b,farma_002,"List(List(FIORICET 50-300-40 MG CAPSULE, Butalbital, Acetaminophen, and Caffeine, 926.16, 52544008001, 1, 926.16))",farmahorra,926.16,879.85
68c26108d0fc5a696f14dcbf,CLI-782,2025-09-11T05:41:27.965556+00:00,0.13,5.0,FAC-001-00042,9b22a4b0-c703-428a-b8cc-431adfeb2ef1,farma_001,"List(List(PIOGLITAZONE HCL 15 MG TABLET, PIOGLITAZONE HYDROCHLORIDE, 2.57, 62135080430, 1, 2.57))",farmahorra,2.57,2.44
