# Integración de Datos Batch

Este cuaderno implementa el Motor de Ingesta Batch.

Realiza una lectura incremental desde Landing con Databricks Auto Loader. Además, archiva ficheros procesados de Landing a Raw.\ 
Es capaz de detectar todos los formatos de ficheros (csv, avro, parquet...).



Primero, vamos a definir las variables base de los path de las zonas involucradas en la ingesta.

In [0]:
import time, datetime, random

account = spark.conf.get("adls.account.name")

organization = 'FarmIA' #Carpta raíz en la que quedarán los ficheros
landing_container = f"abfss://landing-tarea@{account}.dfs.core.windows.net"
lakehouse_container = f"abfss://lakehouse-tarea@{account}.dfs.core.windows.net"

landing_path = landing_container
raw_path = f"{lakehouse_container}/raw"
bronze_path = f"{lakehouse_container}/bronze"

dbutils.fs.mkdirs(raw_path)
dbutils.fs.mkdirs(bronze_path)


Reutilizamos de los notebooks del módulo la función *land_file* que propone una convención de nombre con año, mes y día a cada archivo.

In [0]:
def land_file(df,datasource,dataset,format='json'):
  """
    Guarda un DataFrame en un sistema de archivos distribuido con una estructura de directorios basada en la fecha actual,
    utilizando un formato específico (por defecto, JSON). La función escribe el DataFrame en una ubicación temporal,
    lo mueve a una ruta final organizada por fuente de datos, conjunto de datos y marca de tiempo, y luego elimina el
    directorio temporal.

    Parámetros:
        df (pyspark.sql.DataFrame): El DataFrame de Spark que se desea guardar.
        datasource (str): Nombre o identificador de la fuente de datos, usado para organizar la ruta final.
        dataset (str): Nombre o identificador del conjunto de datos, usado para organizar la ruta final.
        format (str, opcional): Formato en el que se guardará el archivo. Por defecto es 'json'. 
                                Otros formatos soportados dependen de Spark (e.g., 'parquet', 'csv').

    Comportamiento:
        1. Escribe el DataFrame en una carpeta temporal (`tmp_path`) usando el formato especificado, coalesciendo los datos en un solo archivo.
        2. Genera una ruta final basada en la fecha actual (`YYYY/MM/DD`), el nombre de la fuente de datos, el conjunto de datos y una marca de tiempo.
        3. Mueve el archivo generado desde la carpeta temporal a la ruta final.
        4. Imprime la ruta final del archivo guardado.
        5. Elimina la carpeta temporal.

    Variables externas utilizadas:
        - landing_path (str): Ruta base del sistema de archivos donde se almacenan los datos. Debe estar definida globalmente.
        - dbutils.fs: Utilidad de Databricks para manipular el sistema de archivos (ls, mv, rm).
        - datetime: Módulo de Python para manejar fechas y marcas de tiempo.

    Ejemplo:
        save_file(mi_dataframe, "ventas", "diarias", format="parquet")
        # Salida esperada: "dbfs:/landing/ventas/diarias/2025/03/14/ventas_diarias_20250314123045.parquet"

    Notas:
        - La función asume que está ejecutándose en un entorno compatible con Databricks (por el uso de `dbutils.fs`).
        - Si el formato especificado no es compatible con Spark, se generará un error.
    """
  tmp_path = f'{landing_path}/tmp/'
  df.coalesce(1).write.format(format).mode("overwrite").save(tmp_path)
  now = datetime.datetime.utcnow()
  date_path = now.strftime("%Y/%m/%d")
  timestamp = now.strftime("%Y%m%d%H%M%S") 
  for file in dbutils.fs.ls(tmp_path):
    if file.name.endswith(f'.{format}'):
      final_path = file.path.replace('tmp',f'{datasource}/{dataset}')
      final_path = final_path.replace(file.name, f'{date_path}/{datasource}-{dataset}-{timestamp}.{format}')
      dbutils.fs.mv(file.path, final_path)
      print(final_path)
  dbutils.fs.rm(tmp_path, True)
  

Reutilizaremos la función *LandingStreamReader* que admite múltiples formatos de ficheros, aplicando Auto Loader para leer de forma incremental.

In [0]:
from pyspark.sql.functions import current_timestamp, input_file_name, replace,lit

class LandingStreamReader:

    def __init__(self, builder):
        self.datasource = builder.datasource
        self.dataset = builder.dataset
        self.landing_path = builder.landing_path
        self.raw_path = builder.raw_path
        self.bronze_path = builder.bronze_path
        self.format = builder.format
        self.dataset_landing_path = f'{self.landing_path}/{self.datasource}/{self.dataset}'
        self.dataset_bronze_schema_location = f'{self.bronze_path}/{self.datasource}/{self.dataset}_schema'
        dbutils.fs.mkdirs(self.dataset_bronze_schema_location)
    
    def __str__(self):
        return (f"LandingStreamReader(datasource='{self.datasource}',dataset='{self.dataset}')")
        
    def add_metadata_columns(self,df):
      data_cols = df.columns
      
      metadata_cols = ['_ingested_at','_ingested_filename']

      df = (df.withColumn("_ingested_at",current_timestamp())
              .withColumn("_ingested_filename",replace(input_file_name(),lit(self.landing_path),lit(self.raw_path)))
      ) 
      
      #reordernamos columnas
      return df.select(metadata_cols + data_cols)  
    
    def read_json(self):
      return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaLocation", self.dataset_bronze_schema_location)
            .load(self.dataset_landing_path)
      )

    def read_csv(self):
        return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaLocation", self.dataset_bronze_schema_location)
            .option("header","true")
            .option("delimiter",",")
            .load(self.dataset_landing_path)
      )
    
    def read_parquet(self):
       return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaLocation", self.dataset_bronze_schema_location)
            .load(self.dataset_landing_path)
      ) 
    
    def read_avro(self):
        return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "avro")
            .option("cloudFiles.schemaLocation", self.dataset_bronze_schema_location)
            .load(self.dataset_landing_path)
      )
    
    def read_binaryfile(self):
        return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "binaryFile")
            .option("cloudFiles.schemaLocation", self.dataset_bronze_schema_location)
            .load(self.dataset_landing_path)
      )
    
    def read(self):
      df = None

      if (self.format == "json"):
        df = self.read_json()
      elif (self.format == "csv"):
        df = self.read_csv()
      elif (self.format == "parquet"):
        df = self.read_parquet()
      elif (self.format == "avro"):
        df = self.read_avro()
      elif self.format in ("binaryFile","binary"):
        df = self.read_binaryfile()
      else:
        raise Exception(f"Format {self.format} not supported")

      if df is not None:
        df = df.transform(self.add_metadata_columns)
      return df
    
    class Builder:
        def __init__(self):
            self.datasource = None
            self.dataset = None
            self.landing_path = None
            self.raw_path = None
            self.bronze_path = None
            self.format = None
        
        def set_datasource(self, datasource):
            self.datasource = datasource
            return self
        
        def set_dataset(self, dataset):
            self.dataset = dataset
            return self
        
        def set_landing_path(self, landing_path):
            self.landing_path = landing_path
            return self
        
        def set_raw_path(self, raw_path):
            self.raw_path = raw_path
            return self
        
        def set_bronze_path(self, bronze_path):
            self.bronze_path = bronze_path
            return self
          
        def set_format(self, format):
            self.format = format
            return self
        
        def build(self):
            return LandingStreamReader(self)

Reutilizamos también la función *BronzeStreamWriter* que nos permite salvar el dataset en la capa Bronze.

In [0]:
from pyspark.sql.functions import col, to_date, current_timestamp

class BronzeStreamWriter:
    def __init__(self, builder):
        self.datasource = builder.datasource
        self.dataset = builder.dataset
        self.landing_path = builder.landing_path
        self.raw_path = builder.raw_path
        self.bronze_path = builder.bronze_path
        self.dataset_landing_path = f"{self.landing_path}/{self.datasource}/{self.dataset}"
        self.dataset_raw_path =  f"{self.raw_path}/{self.datasource}/{self.dataset}"
        self.dataset_bronze_path = f"{self.bronze_path}/{self.datasource}/{self.dataset}"
        self.dataset_checkpoint_location = f'{self.dataset_bronze_path}_checkpoint'
        self.table = f'hive_metastore.bronze.{self.datasource}_{self.dataset}'
        self.query_name = f"bronze-{self.datasource}-{self.dataset}"
        dbutils.fs.mkdirs(self.dataset_raw_path)
        dbutils.fs.mkdirs(self.dataset_bronze_path)
        dbutils.fs.mkdirs(self.dataset_checkpoint_location)

    def __str__(self):
        return (f"BronzeStreamWriter(datasource='{self.datasource}',dataset='{self.dataset}')")
         
    def archive_raw_files(self,df):
      if "_ingested_filename" in df.columns:
        files = [row["_ingested_filename"] for row in df.select("_ingested_filename").distinct().collect()]
        for file in files:
          if file:
              file_landing_path = file.replace(self.dataset_raw_path,self.dataset_landing_path)
              dbutils.fs.mkdirs(file[0:file.rfind('/')+1])
              dbutils.fs.mv(file_landing_path,file)
    
    def write_data(self,df):
      spark.sql( 'CREATE DATABASE IF NOT EXISTS hive_metastore.bronze') 
      spark.sql(f"CREATE TABLE IF NOT EXISTS {self.table} USING DELTA LOCATION '{self.dataset_bronze_path}' ") 
      (df.write
          .format("delta")  
          .mode("append")
          .option("mergeSchema", "true")
          .option("path", self.dataset_bronze_path)
          .saveAsTable(self.table)
      )
        
    def append_2_bronze(self,batch_df, batch_id):
      batch_df.persist()
      self.write_data(batch_df)
      self.archive_raw_files(batch_df)
      batch_df.unpersist()
      

    class Builder:
        def __init__(self):
            self.datasource = None
            self.dataset = None
            self.landing_path = None
            self.raw_path = None
            self.bronze_path = None
        
        def set_datasource(self, datasource):
            self.datasource = datasource
            return self
        
        def set_dataset(self, dataset):
            self.dataset = dataset
            return self
        
        def set_landing_path(self, landing_path):
            self.landing_path = landing_path
            return self
        
        def set_raw_path(self, raw_path):
            self.raw_path = raw_path
            return self
        
        def set_bronze_path(self, bronze_path):
            self.bronze_path = bronze_path
            return self
        
        def build(self):
            return BronzeStreamWriter(self)

A continuación generamos los datos de ejemplo en la capa landing.\ Utilizaremos el dataset propio de Databricks *retail-org/sales_orders*.

In [0]:
#Dataset de ventas de clientes 
datasource="retail-org"
dataset="sales_orders"

dataset_bronze_path = f"{bronze_path}/{datasource}/{dataset}"

df_sales = spark.read.json(f"/databricks-datasets/{datasource}/{dataset}/")
#df_sales.printSchema()


file_sales_1 = df_sales.where("customer_id <= 10000000").coalesce(1)
file_sales_2 = df_sales.where("customer_id between 10000000 and 20000000").coalesce(1)
file_sales_3 = df_sales.where("customer_id > 20000000").coalesce(1)

land_file(file_sales_1,'FarmIA','sales')
time.sleep(5)
land_file(file_sales_2,'FarmIA','sales')
time.sleep(5)
land_file(file_sales_3,'FarmIA','sales')
time.sleep(5)


In [0]:
display(dbutils.fs.ls(f"{landing_path}/FarmIA/sales/"))

Utilizaremos ahora Auto Loader, que detectará automáticamente los ficheros generados enteriormente y los moverá a la capa Bronze con metadatos y evolución del esquema.

In [0]:
datasource="FarmIA"
dataset = "sales"

dataset_bronze_path = f"{bronze_path}/{datasource}/{dataset}"

reader = (LandingStreamReader.Builder()
  .set_datasource(datasource)
  .set_dataset(dataset)
  .set_landing_path(landing_path)
  .set_raw_path(raw_path)
  .set_bronze_path(bronze_path)
  .set_format("json")   # o csv/parquet/avro/binaryFile
  .build()
)

writer = (BronzeStreamWriter.Builder()
  .set_datasource(datasource)
  .set_dataset(dataset)
  .set_landing_path(landing_path)
  .set_raw_path(raw_path)
  .set_bronze_path(bronze_path)
  .build()
)

(reader.read()
 .writeStream
 .foreachBatch(writer.append_2_bronze)
 .trigger(availableNow = True)
 .option("checkpointLocation", writer.dataset_checkpoint_location)
 .queryName(writer.query_name)
 .start()
 .awaitTermination()
)

