In [0]:
%scala
// --- Source: Cosmos DB database/container read through the change feed ---
val database: String = "bd_lab"
val container: String = "transacciones"
// Position the change feed starts from when the checkpoint is empty.
val startFrom: String = "Beginning"

// --- Sink: bronze Delta table and the identity of the streaming query ---
val tableBronze: String = "bronze_app"
val STREAM_QUERY_NAME: String = s"stream_${database}_${container}_bronze"
// One checkpoint directory per bronze table: dbfs:/checkpoints/stream_bronze_app
val PATH_CHECKPOINT: String = s"dbfs:/checkpoints/stream_$tableBronze"

In [0]:
%scala
// Cosmos DB account endpoint (public URL) and master key. The key is read from the
// Databricks secret scope "scope-dsrp" instead of being written into the notebook.
val cosmosEndpoint: String = "https://dtbdsrp.documents.azure.com:443/"
val cosmosMasterKey: String = dbutils.secrets.get(scope = "scope-dsrp", key = "key-cosmos")

In [0]:
%scala
// Cosmos DB Spark connector options: account, credentials, database and container.
val configCosmos: Map[String, String] = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey"      -> cosmosMasterKey,
  "spark.cosmos.database"        -> database,
  "spark.cosmos.container"       -> container
)

In [0]:
%scala
// Reader-specific settings layered on top of the connection options.
// Schema inference is disabled, so rows are not mapped onto an inferred document schema;
// the raw document is consumed downstream through the `_rawBody` column.
val changeFeedOptions: Map[String, String] = Map(
  "spark.cosmos.read.maxItemCount"        -> "80",
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.startFrom"     -> startFrom
)

// Streaming DataFrame over the container's change feed.
val dfRes = spark.readStream
  .format("cosmos.oltp.changefeed")
  .options(configCosmos ++ changeFeedOptions)
  .load()

In [0]:
%sql
-- Full reload of the bronze table: it is dropped and recreated on every run.
-- Pin the session to the target catalog/schema first so that the DROP, the CREATE
-- and the Scala streaming writer (which uses the unqualified name "bronze_app")
-- all resolve to the same table, class_dsrp_dtb.default.bronze_app.
USE class_dsrp_dtb.default;

-- NOTE(review): dropping the table does not reset the streaming checkpoint
-- (dbfs:/checkpoints/stream_bronze_app). While that checkpoint exists, the stream
-- resumes from its stored change-feed position, so the recreated table would miss
-- every change processed in earlier runs. Delete the checkpoint together with the
-- table whenever a full reload is intended.
DROP TABLE IF EXISTS bronze_app;

CREATE TABLE IF NOT EXISTS bronze_app (
    entityType STRING COMMENT "entity from cosmos",
    document STRING COMMENT "json text from container",
    -- Filled by the stream from the Cosmos system columns:
    -- partition <- id, offset <- _etag, sequenceNumbers <- constant 0, enqueuedTime <- _ts.
    metadata STRUCT<
        partition: STRING,
        offset: STRING,
        sequenceNumbers: LONG,
        enqueuedTime: TIMESTAMP
    > COMMENT "metadata from cosmos",
    auditTime TIMESTAMP COMMENT "date and time of execution",
    p_auditDate DATE COMMENT "date of execution"
)
USING DELTA
PARTITIONED BY (p_auditDate)
COMMENT 'table of transactions';

In [0]:
%scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{ DataType }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

In [0]:
%scala
// Shape the raw Cosmos DB change-feed rows into the bronze_app schema and append
// them to the bronze Delta table in a single run-once micro-batch.
//
// A single `select` (instead of withColumn + a chain of drops) emits exactly the
// target table's columns in the table's order. Any extra column the change feed may
// expose can therefore never leak into the append and break the write.
val stream = dfRes.select(
  lit(container).as("entityType"),
  // Raw document JSON as delivered by the connector (schema inference is disabled).
  $"_rawBody".as("document"),
  struct(
    // NOTE(review): field names follow an Event Hubs-style metadata layout;
    // here `partition` carries the document id and `offset` its _etag.
    $"id".cast("string").as("partition"),
    $"_etag".as("offset"),
    lit(0L).as("sequenceNumbers"),
    // _ts is the Cosmos DB last-modified time in epoch seconds. Convert it explicitly
    // instead of relying on to_timestamp's version-dependent handling of numeric input.
    timestamp_seconds($"_ts").as("enqueuedTime")
  ).as("metadata"),
  // current_timestamp() is evaluated once per query, so both columns agree.
  current_timestamp().as("auditTime"),
  to_date(current_timestamp()).as("p_auditDate")
)

val query = stream.writeStream
  .format("delta")
  .partitionBy("p_auditDate")
  .option("checkpointLocation", PATH_CHECKPOINT)
  .queryName(STREAM_QUERY_NAME)
  // Process everything currently available in one batch, then stop.
  .trigger(Trigger.Once())
  .table(tableBronze)

// Block until the single batch completes so a failure surfaces in this cell / job run
// instead of being lost in a background query.
query.awaitTermination()