#### 1. Initialize widget parameters

In [None]:

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row, Column}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import io.delta.tables._
import java.time.Year
import java.time.LocalDate
import spark.implicits._ 


// Declare the notebook widgets as (name -> default value); a job run may override any of them.
Seq(
  "environment" -> "dev",
  "version"     -> "v1_0",
  "pipeline"    -> ""
).foreach { case (widgetName, defaultValue) => dbutils.widgets.text(widgetName, defaultValue) }

// Resolve the effective value of each widget for this run.
val env      = dbutils.widgets.get("environment")
val udm      = dbutils.widgets.get("version")
val pipeline = dbutils.widgets.get("pipeline")

#### 2. WatermarkTracker definition (Option-based record)

In [None]:
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import java.sql.Timestamp
import java.time.Instant

/**
 * Checkpoint row used by the Option-based WatermarkTracker.
 *
 * Every field that may legitimately be absent is an `Option` defaulting to `None`. That lets callers
 * build a record from just `pipeline_name` and `job_id`. `created_at` / `updated_at` default to the
 * construction time.
 *
 * NOTE(review): this shape (`job_id`, `watermark_id: Int`) differs from the watermarks DDL in this
 * notebook (`batch_id`, `watermark_id STRING`). Confirm which table this variant is meant for.
 */
case class WatermarkRecord(
    watermark_id: Option[Int] = None,
    pipeline_name: String,
    job_id: String,
    last_processed_ts: Option[Timestamp] = None,
    current_run_ts: Option[Timestamp] = None,
    // Previously had no default, so callers omitting it (e.g. a new-run record) did not compile.
    last_processed_version: Option[Long] = None,
    status: String = "RUNNING",
    created_at: Option[Timestamp] = Some(Timestamp.from(Instant.now())),
    updated_at: Option[Timestamp] = Some(Timestamp.from(Instant.now()))
)

object WatermarkTracker {

  /** Delta table holding the checkpoint rows for every pipeline. */
  val checkpointTable = "ag_ra_search_analytics_data_dev.dap_ops_v1_0.watermarks"

  /** Renders `value` as a Spark SQL string literal, escaping backslashes and single quotes. */
  private def sqlString(value: String): String =
    "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

  /**
   * Step 1: start a new run for a pipeline.
   *
   * Reads the newest checkpoint of `pipeline` (by `created_at`). If that row has a `watermark_id`,
   * it is rewritten with `current_run_ts = now` and `status = RUNNING`.
   *
   * @param pipeline pipeline name to look up
   * @param jobId    identifier stored on the returned record
   * @return an in-memory RUNNING record. It carries the checkpoint's `watermark_id` (so
   *         [[finalizePipelineRun]] can target that row), its `last_processed_ts` as the watermark,
   *         and `current_run_ts = now`.
   */
  def startPipelineRun(pipeline: String, jobId: String)(implicit spark: SparkSession): WatermarkRecord = {
    import spark.implicits._

    val lastRecordOpt = spark.table(checkpointTable)
      .filter($"pipeline_name" === pipeline)
      .orderBy(desc("created_at"))
      .limit(1)
      .as[WatermarkRecord]
      .collect()
      .headOption

    // The watermark is the last *successfully* processed timestamp. A current_run_ts on the newest
    // row belongs to an unfinished or failed run, so it must not advance the window.
    val lastProcessedTs = lastRecordOpt.flatMap(_.last_processed_ts)
    val nowTs = Timestamp.from(Instant.now())

    // Mark the existing checkpoint as running. Rows without an id cannot be targeted, so skip them.
    for (rec <- lastRecordOpt; id <- rec.watermark_id) {
      spark.table(checkpointTable)
        .filter($"watermark_id" === id)
        .withColumn("current_run_ts", lit(nowTs))
        .withColumn("status", lit("RUNNING"))
        .withColumn("updated_at", current_timestamp())
        .write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", s"watermark_id = $id")
        .saveAsTable(checkpointTable)
    }

    WatermarkRecord(
      watermark_id = lastRecordOpt.flatMap(_.watermark_id),
      pipeline_name = pipeline,
      job_id = jobId,
      last_processed_ts = lastProcessedTs,
      current_run_ts = Some(nowTs),
      last_processed_version = lastRecordOpt.flatMap(_.last_processed_version),
      status = "RUNNING"
    )
  }

  /**
   * Step 2: get the delta extraction window for a pipeline.
   *
   * @return `(start, end)`. `start` is the newest checkpoint's `last_processed_ts`, or `None` when
   *         there is no checkpoint or it is null. `end` is the current time.
   */
  def getDeltaWindow(pipeline: String)(implicit spark: SparkSession): (Option[Timestamp], Timestamp) = {
    import spark.implicits._

    val startTs = spark.table(checkpointTable)
      .filter($"pipeline_name" === pipeline)
      .orderBy(desc("created_at"))
      .limit(1)
      .collect()
      .headOption
      .flatMap(row => Option(row.getAs[Timestamp]("last_processed_ts")))

    (startTs, Timestamp.from(Instant.now()))
  }

  /**
   * Step 3: finalize a run as SUCCESS, FAILED or SKIPPED (case-insensitive).
   *
   * The run's checkpoint row gets the final status. The row is selected by `watermark_id` when
   * known, otherwise by `pipeline_name` + `job_id`. On SUCCESS a new READY row is appended whose
   * `last_processed_ts` is this run's `current_run_ts`.
   *
   * @throws IllegalArgumentException if `finalStatus` is not one of the allowed values
   */
  def finalizePipelineRun(record: WatermarkRecord, finalStatus: String)(implicit spark: SparkSession): Unit = {
    import spark.implicits._

    val status = finalStatus.toUpperCase
    require(Set("SUCCESS", "FAILED", "SKIPPED").contains(status),
      s"Invalid status: $finalStatus. Must be SUCCESS, FAILED, or SKIPPED.")

    // Prefer the row's id; filtering by the caller-supplied job id alone may match no stored row.
    val (rowFilter, predicate) = record.watermark_id match {
      case Some(id) =>
        ($"watermark_id" === id, s"watermark_id = $id")
      case None =>
        ($"pipeline_name" === record.pipeline_name && $"job_id" === record.job_id,
          s"pipeline_name = ${sqlString(record.pipeline_name)} AND job_id = ${sqlString(record.job_id)}")
    }

    // 1. Update current record with final status
    spark.table(checkpointTable)
      .filter(rowFilter)
      .withColumn("status", lit(status))
      .withColumn("updated_at", current_timestamp())
      .write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", predicate)
      .saveAsTable(checkpointTable)

    // 2. Only create a new record if the run was SUCCESS
    if (status == "SUCCESS") {
      val nextRecord = WatermarkRecord(
        pipeline_name = record.pipeline_name,
        job_id = java.util.UUID.randomUUID().toString,
        last_processed_ts = record.current_run_ts,
        current_run_ts = None,
        last_processed_version = record.current_run_ts.map(_.getTime),
        status = "READY"
      )

      Seq(nextRecord).toDF()
        .write
        .format("delta")
        .mode("append")
        .saveAsTable(checkpointTable)
    }
  }
}


#### 3. Test WatermarkTracker - Option

In [None]:
// Step 1: Start a new run for a pipeline
// No implicit SparkSession is declared in this notebook, so `spark` is passed explicitly.

val wmRecord = WatermarkTracker.startPipelineRun("wos_pipeline", "job_123")(spark)


In [None]:
// Step 2: Get delta extraction window
// No implicit SparkSession is declared in this notebook, so `spark` is passed explicitly.

val (startTsOpt, endTs) = WatermarkTracker.getDeltaWindow("wos_pipeline")(spark)

// Rows changed after the watermark (or since the epoch on a first run), up to the window end.
val deltaDF = spark.table("acs_source_table")
  .filter(col("updated_at") > startTsOpt.getOrElse(Timestamp.valueOf("1970-01-01 00:00:00")) &&
          col("updated_at") <= endTs)

In [None]:
// Step 3: Finalize checkpoint for SUCCESS
// No implicit SparkSession is declared in this notebook, so `spark` is passed explicitly.

WatermarkTracker.finalizePipelineRun(wmRecord, finalStatus = "SUCCESS")(spark)



#### 4. WatermarkTracker definition - Native


In [None]:
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import java.sql.Timestamp
import java.time.Instant
import java.time.{Instant, LocalDate, ZoneId}


/**
 * One row of the watermarks checkpoint table, as used by the native (non-Option) WatermarkTracker.
 *
 * Field names match the columns in this notebook's CREATE TABLE cell. Rows are read with
 * `.as[WatermarkRecord]` and written with `.toDF()`, so field order and names are a schema contract.
 *
 * Defaults are evaluated each time a record is constructed. The timestamps default to "now".
 * `last_processed_ts` defaults to `Timestamp.valueOf("1970-01-01 00:00:00")`, which
 * `java.sql.Timestamp.valueOf` interprets in the JVM default time zone.
 */
case class WatermarkRecord(
    // Row identifier; records created by the tracker use a random UUID string.
    watermark_id: String,
    pipeline_name: String,
    // Batch identifier; the tracker stores either a random UUID or a yyyyMMdd date string here.
    batch_id: String,
    // Lower bound of the next extraction window.
    last_processed_ts: Timestamp = Timestamp.valueOf("1970-01-01 00:00:00"),
    // Upper bound of the active run; the tracker writes null here for READY records.
    current_run_ts: Timestamp = Timestamp.from(Instant.now()),
    last_processed_version: Long = 0L,
    // Status values written by the tracker: RUNNING, READY, SUCCESS, FAILED, SKIPPED.
    status: String = "RUNNING",
    created_at: Timestamp = Timestamp.from(Instant.now()),
    updated_at: Timestamp = Timestamp.from(Instant.now())
)

object WatermarkTracker {

  /** Final statuses accepted by `completePipelineRun` (compared after upper-casing). */
  private val AllowedFinalStatuses = Set("SUCCESS", "FAILED", "SKIPPED")

  /**
   * Reads a Databricks notebook widget. Returns `default` when the value is null or empty, or when
   * reading fails with a non-fatal error (e.g. the widget was never declared). Fatal JVM errors
   * propagate instead of being swallowed.
   */
  private def getWidget(name: String, default: String): String = {
    try {
      Option(dbutils.widgets.get(name)).filter(_.nonEmpty).getOrElse(default)
    } catch {
      case scala.util.control.NonFatal(_) => default
    }
  }

  /** Date-based batch id `yyyyMMdd` (e.g. "20251120") for today in the JVM default time zone. */
  private def batchNumber(): String = {
    val today = LocalDate.now(ZoneId.systemDefault())
    (today.getYear * 10000L + today.getMonthValue * 100L + today.getDayOfMonth).toString
  }

  /** Lower bound used when a pipeline has no watermark yet (a fresh instance: Timestamp is mutable). */
  private def epoch: Timestamp = Timestamp.valueOf("1970-01-01 00:00:00")

  private val environment = getWidget("environment", "dev")
  private val version = getWidget("version", "v1_0")
  private val catalog = s"ag_ra_search_analytics_data_${environment}"
  private val checkpointTable = s"$catalog.dap_ops_${version}.watermarks"
  private val pipeline_name = getWidget("pipeline", "dummy")

  /** Falls back to the `pipeline` widget value when no explicit pipeline name is given. */
  private def resolvePipeline(pipelineName: String): String =
    Option(pipelineName).getOrElse(pipeline_name)

  /** Renders `value` as a Spark SQL string literal, escaping backslashes and single quotes. */
  private def sqlString(value: String): String =
    "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

  /**
   * Rewrites the checkpoint row whose `watermark_id` equals `watermarkId`, setting each
   * `(column -> value)` pair. Only rows matching the `replaceWhere` predicate are replaced.
   */
  private def updateCheckpoint(watermarkId: String, updates: (String, org.apache.spark.sql.Column)*)
                              (implicit spark: SparkSession): Unit = {
    val current = spark.table(checkpointTable).filter(col("watermark_id") === watermarkId)
    val updated = updates.foldLeft(current) { case (df, (name, value)) => df.withColumn(name, value) }
    updated.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", s"watermark_id = ${sqlString(watermarkId)}")
      .saveAsTable(checkpointTable)
  }

  /**
   * Starts a run for `pipelineName`.
   *
   * If `lastRecordOpt` exists, it is marked RUNNING with `current_run_ts = nowTs`. Otherwise a first
   * checkpoint is appended with an epoch watermark and `newBatchId`. The by-name parameter means
   * `newBatchId` is only evaluated in that case.
   */
  private def startRun(pipelineName: String, lastRecordOpt: Option[WatermarkRecord],
                       newBatchId: => String, nowTs: Timestamp)
                      (implicit spark: SparkSession): WatermarkRecord = {
    import spark.implicits._

    lastRecordOpt match {
      case Some(lastRecord) =>
        updateCheckpoint(lastRecord.watermark_id,
          "current_run_ts" -> lit(nowTs),
          "status" -> lit("RUNNING"),
          "updated_at" -> current_timestamp())
        lastRecord.copy(current_run_ts = nowTs, status = "RUNNING")

      case None =>
        val newRecord = WatermarkRecord(
          watermark_id = java.util.UUID.randomUUID().toString,
          pipeline_name = pipelineName,
          batch_id = newBatchId,
          last_processed_ts = epoch,
          current_run_ts = nowTs,
          last_processed_version = 1L,
          status = "RUNNING"
        )
        Seq(newRecord).toDF()
          .write
          .format("delta")
          .mode("append")
          .saveAsTable(checkpointTable)
        newRecord
    }
  }

  /**
   * Fetches the newest checkpoint (by `created_at`) for a pipeline.
   *
   * @param pipelineName pipeline to look up; `null` means the `pipeline` widget value. Previously
   *                     `null` was compared literally and never matched.
   * @return the newest checkpoint, or `None` if the pipeline has none
   */
  def getLastCheckpoint(pipelineName: String = null)(implicit spark: SparkSession): Option[WatermarkRecord] = {
    import spark.implicits._

    spark.table(checkpointTable)
      .filter($"pipeline_name" === resolvePipeline(pipelineName))
      .orderBy(desc("created_at"))
      .limit(1)
      .as[WatermarkRecord]
      .collect()
      .headOption
  }

  /**
   * Starts a new run for one pipeline (default: the `pipeline` widget).
   *
   * The newest checkpoint is marked RUNNING at `nowTs`. On a first run, a checkpoint with a random
   * UUID batch id is created instead.
   */
  def initializePipelineRun(nowTs: Timestamp = Timestamp.from(Instant.now()), pipelineName: String = null)
                           (implicit spark: SparkSession): WatermarkRecord = {
    val curPipelineName = resolvePipeline(pipelineName)
    startRun(curPipelineName, getLastCheckpoint(curPipelineName), java.util.UUID.randomUUID().toString, nowTs)
  }

  /**
   * Starts a run for each of `pipelineNames` with a shared `nowTs`.
   *
   * All latest checkpoints are read before anything is written. Pipelines seen for the first time
   * share today's date-based batch id.
   *
   * @return one RUNNING record per pipeline, in input order
   */
  def initializePipelineWorkflow(
      pipelineNames: Seq[String],
      nowTs: Timestamp = Timestamp.from(Instant.now())
  )(implicit spark: SparkSession): Seq[WatermarkRecord] = {
    val lastRecords: Map[String, WatermarkRecord] =
      pipelineNames.flatMap(pname => getLastCheckpoint(pname).map(pname -> _)).toMap

    // Previously hard-coded to "20251120" (debug leftover); use the real date-based batch id.
    val batchId = batchNumber()

    pipelineNames.map(pname => startRun(pname, lastRecords.get(pname), batchId, nowTs))
  }

  /**
   * Returns the extraction window `(start, end)` for a pipeline (default: the `pipeline` widget).
   *
   * The window is taken from the newest checkpoint. A null `last_processed_ts` falls back to the
   * epoch, and a null `current_run_ts` (e.g. a READY row) falls back to now. With no checkpoint the
   * window is `(epoch, now)`.
   */
  def getExtractionWindow(pipelineName: String = null)(implicit spark: SparkSession): (Timestamp, Timestamp) = {
    def now: Timestamp = Timestamp.from(Instant.now())
    getLastCheckpoint(resolvePipeline(pipelineName)) match {
      case Some(record) =>
        (Option(record.last_processed_ts).getOrElse(epoch), Option(record.current_run_ts).getOrElse(now))
      case None =>
        (epoch, now)
    }
  }

  /**
   * Finalizes the newest checkpoint of a pipeline with SUCCESS, FAILED or SKIPPED (case-insensitive).
   *
   * Only that checkpoint row (matched by its unique `watermark_id`) gets the new status. Matching by
   * `batch_id` could also hit older same-day rows, since batch ids are dates. On SUCCESS a READY row
   * is appended with an advanced watermark and `last_processed_version = max + 1`.
   *
   * @throws IllegalArgumentException if `finalStatus` is not allowed
   * @throws IllegalStateException    if the pipeline has no checkpoint
   */
  def completePipelineRun(finalStatus: String = "SUCCESS", pipelineName: String = null)
                         (implicit spark: SparkSession): Unit = {
    import spark.implicits._

    val status = finalStatus.toUpperCase
    require(AllowedFinalStatuses.contains(status),
      s"Invalid status: $finalStatus. Must be SUCCESS, FAILED, or SKIPPED.")

    val curPipelineName = resolvePipeline(pipelineName)
    val record = getLastCheckpoint(curPipelineName).getOrElse(
      throw new IllegalStateException(s"No checkpoint found for pipeline: $curPipelineName")
    )

    // 1. Update current record with final status
    updateCheckpoint(record.watermark_id,
      "status" -> lit(status),
      "updated_at" -> current_timestamp())

    // 2. Create new checkpoint if SUCCESS
    if (status == "SUCCESS") {
      val lastVersion = spark.table(checkpointTable)
        .filter($"pipeline_name" === record.pipeline_name)
        .agg(max($"last_processed_version").alias("max_version"))
        .collect()
        .head
        .getAs[Long]("max_version")

      val nextRecord = WatermarkRecord(
        watermark_id = java.util.UUID.randomUUID().toString,
        pipeline_name = record.pipeline_name,
        batch_id = batchNumber(),
        // If no run was active (null current_run_ts), keep the old watermark instead of nulling it.
        last_processed_ts = Option(record.current_run_ts).getOrElse(record.last_processed_ts),
        current_run_ts = null,
        last_processed_version = lastVersion + 1,
        status = "READY"
      )

      Seq(nextRecord).toDF()
        .write
        .format("delta")
        .mode("append")
        .saveAsTable(checkpointTable)
    }
  }
}


#### 5. Test WatermarkTracker - Native

In [None]:

 // Step 1: init extraction window based on checkpoint
// No implicit SparkSession is declared in this notebook, so `spark` is passed explicitly.
val currentRun = WatermarkTracker.initializePipelineRun()(spark)


In [None]:
// Step 2: read Delta data
// No implicit SparkSession is declared in this notebook, so `spark` is passed explicitly.

val (startTs, endTs) = WatermarkTracker.getExtractionWindow()(spark)

// Rows whose __END_AT falls in the half-open window (startTs, endTs].
val deltaDF = spark.table("ag_content_ims_acs_dev.gold_entity.d_citation_patent")
  .filter(col("__END_AT") > lit(startTs) && col("__END_AT") <= lit(endTs))


In [None]:
// Step 3: Finalize checkpoint for SUCCESS
// No implicit SparkSession is declared in this notebook, so `spark` is passed explicitly.
WatermarkTracker.completePipelineRun()(spark)

#### 6. Test WatermarkTracker - Batch

In [None]:

 // Step 1: Initialize the extraction window for a list of pipelines

// Example list of pipelines to initialize
val pipelines = Seq("pipelineA", "pipelineB", "pipelineC")

// Start (or resume) a run for every pipeline; `spark` is passed explicitly because no implicit
// SparkSession is declared in this notebook.
val initializedRecords = WatermarkTracker.initializePipelineWorkflow(pipelines)(spark)

// Print the results
initializedRecords.foreach { record =>
  println(s"Pipeline: ${record.pipeline_name}, " +
          s"Batch ID: ${record.batch_id}, " +
          s"Watermark ID: ${record.watermark_id}, " +
          s"Current Run TS: ${record.current_run_ts}, " +
          s"Status: ${record.status}")
}

In [None]:
// Step 2: Get extraction windows for all pipelines
// Print the results
pipelines.foreach { pipelineName =>
  val (startTs, endTs) = WatermarkTracker.getExtractionWindow(pipelineName)(spark)
  println(s"Pipeline: $pipelineName, startTs: $startTs, endTs: $endTs")
}


In [None]:

// Step 3: Finalize checkpoints for all pipelines as SUCCESS
// (`spark` is passed explicitly; no implicit SparkSession is declared in this notebook.)
pipelines.foreach { pipelineName =>
  WatermarkTracker.completePipelineRun("SUCCESS", pipelineName)(spark)
}




#### 7. Create watermarks Table

In [None]:
// Create the checkpoint table in the catalog/schema selected by the `environment` and `version`
// widgets (`env` / `udm` from cell 1), the same values WatermarkTracker uses to locate it.
// With the default widget values this is ag_ra_search_analytics_data_dev.dap_ops_v1_0.watermarks.
spark.sql(s"""
  CREATE TABLE IF NOT EXISTS ag_ra_search_analytics_data_${env}.dap_ops_${udm}.watermarks (
      watermark_id             STRING,                  -- Unique ID for each watermark record
      pipeline_name            STRING,                  -- Name of the pipeline
      batch_id                 STRING,                  -- Batch identifier (UUID or yyyyMMdd)
      last_processed_ts        TIMESTAMP,               -- Last processed event timestamp
      current_run_ts           TIMESTAMP,               -- Current processing run timestamp
      last_processed_version   BIGINT,                  -- Version of last processed data
      status                   STRING,                  -- Status of the run (e.g., SUCCESS, FAILED, RUNNING)
      created_at               TIMESTAMP,               -- Record creation time
      updated_at               TIMESTAMP                -- Last update time
  )
  USING DELTA
  PARTITIONED BY (pipeline_name);
""")

// SHOW TABLES IN ag_ra_search_analytics_data_dev.dap_reference_v1_0;
// DROP TABLE IF EXISTS  ag_ra_search_analytics_data_dev.dap_entity_wos_v1_0.journal;