In [0]:
%run ./utlis

In [0]:
import logging

from pyspark.sql.functions import expr
from pyspark.sql.utils import AnalysisException  # caught by read_last_timestamp; re-exported from pyspark.errors on Spark 3.4+

In [0]:
def read_last_timestamp(watermark_table):
    """Return the last-read timestamp stored in ``watermark_table``.

    Reads the ``last_read_timestamp`` column of the watermark table. When the
    table has no rows, or the stored value is NULL, a warning is logged and the
    epoch string ``'1970-01-01 00:00:00'`` is returned so the next incremental
    read starts from the beginning.

    Args:
        watermark_table: Name of the watermark table, as accepted by
            ``spark.read.table``.

    Returns:
        The stored ``last_read_timestamp`` value as Spark returns it, or the
        epoch string when no usable value is stored.

    Raises:
        AnalysisException: If Spark cannot analyze the read (e.g. the table or
            column does not exist); logged and re-raised.
        Exception: Any other error, logged and re-raised.
    """
    default_timestamp = '1970-01-01 00:00:00'
    try:
        # first() fetches at most one row; collect() would pull the entire
        # table onto the driver just to read a single cell.
        first_row = (spark.read.table(watermark_table)
                     .select("last_read_timestamp")
                     .first())
        if first_row is None:
            logger.warning("Watermark table is empty. Setting last_read_timestamp to '1970-01-01 00:00:00'")
            return default_timestamp
        last_read_timestamp = first_row[0]
        if last_read_timestamp is None:
            # A NULL watermark is not a usable lower bound: formatting None into
            # a SQL string literal would yield the text 'None'.
            logger.warning("Watermark value is NULL. Setting last_read_timestamp to '1970-01-01 00:00:00'")
            return default_timestamp
        return last_read_timestamp
    except AnalysisException as e:
        logger.error(f"Error reading watermark table: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise

In [0]:
def read_new_data(jdbcUrl, jdbcUsername, jdbcPassword, source_table, last_read_timestamp,
                  timestamp_column="last_updated"):
    """Read rows from a JDBC source whose timestamp is after ``last_read_timestamp``.

    Sends ``SELECT * FROM <source_table> WHERE <timestamp_column> > '<ts>'``
    to the database through Spark's JDBC ``query`` option.

    Args:
        jdbcUrl: JDBC connection URL.
        jdbcUsername: Database user name.
        jdbcPassword: Database password.
        source_table: Table to read. It is interpolated into the SQL verbatim,
            so it must come from trusted configuration.
        last_read_timestamp: Exclusive lower bound. It is converted with
            ``str()`` and embedded as a quoted SQL string literal.
        timestamp_column: Column compared against the watermark. Defaults to
            ``"last_updated"``.

    Returns:
        The DataFrame returned by the JDBC reader's ``load()``.

    Raises:
        Exception: Any error while building or loading the reader, logged and
            re-raised.
    """
    try:
        # Double any single quotes so the value cannot terminate the SQL string
        # literal early (standard SQL escaping).
        ts_literal = str(last_read_timestamp).replace("'", "''")
        # NOTE(review): strict '>' skips rows committed later with a timestamp
        # equal to the watermark — confirm this is acceptable for the source.
        query = f"SELECT * FROM {source_table} WHERE {timestamp_column} > '{ts_literal}'"
        new_data_df = (spark.read
                       .format("jdbc")
                       .option("url", jdbcUrl)
                       .option("user", jdbcUsername)
                       .option("password", jdbcPassword)
                       .option("query", query)
                       .load())
        return new_data_df
    except Exception as e:
        logger.error(f"Error reading new data from source table: {e}")
        raise

In [0]:
def update_watermark_table(watermark_table, latest_timestamp):
    """Store ``latest_timestamp`` as the new watermark in ``watermark_table``.

    Does nothing when ``latest_timestamp`` is falsy (e.g. ``None``). If the
    table already has a row, every row's ``last_read_timestamp`` is updated.
    Otherwise a row is inserted, so an empty table gets seeded on the first run.

    Args:
        watermark_table: Name of the watermark table.
        latest_timestamp: New watermark value. It is converted with ``str()``
            and embedded as a quoted SQL string literal.

    Raises:
        Exception: Any Spark error, logged and re-raised.
    """
    try:
        if not latest_timestamp:
            # No new watermark to record; keep the stored value unchanged.
            return
        # Double single quotes so the value cannot break out of the SQL literal.
        ts_literal = str(latest_timestamp).replace("'", "''")
        has_row = len(spark.read.table(watermark_table).head(1)) > 0
        if has_row:
            # No WHERE clause: assumes the watermark table holds a single row.
            spark.sql(f"UPDATE {watermark_table} SET last_read_timestamp = '{ts_literal}'")
        else:
            # UPDATE on an empty table affects zero rows, so the watermark would
            # never be stored. Seed it with an INSERT instead.
            # NOTE(review): with a column list, columns other than
            # last_read_timestamp need defaults/NULLs supported by the Spark/Delta
            # version in use — confirm for this table.
            spark.sql(f"INSERT INTO {watermark_table} (last_read_timestamp) VALUES ('{ts_literal}')")
    except Exception as e:
        logger.error(f"Error updating watermark table: {e}")
        raise

In [0]:
def read_incremental_data(source_table):
    """Open a streaming Delta read over ``source_table``.

    Args:
        source_table: Name of the Delta table to stream from.

    Returns:
        The streaming DataFrame produced by ``spark.readStream``.

    Raises:
        Exception: Any error while setting up the stream, logged and re-raised.
    """
    try:
        stream_reader = spark.readStream.format("delta")
        stream_df = stream_reader.table(source_table)
        logger.info(f"Started reading from {source_table}")
        return stream_df
    except Exception as err:
        logger.error(f"Error in reading from {source_table}: {err}")
        raise

In [0]:
def read_delta_data(source_table):
    """Load ``source_table`` as a batch (non-streaming) Delta DataFrame.

    Despite its placement next to the incremental helpers, this is a plain
    batch read of the table; ``read_incremental_data`` is the streaming variant.

    Args:
        source_table: Name of the Delta table to read.

    Returns:
        The DataFrame produced by ``spark.read``.

    Raises:
        Exception: Any error while creating the reader, logged and re-raised.
    """
    try:
        batch_reader = spark.read.format("delta")
        batch_df = batch_reader.table(source_table)
        logger.info(f"Started reading from {source_table}")
        return batch_df
    except Exception as err:
        logger.error(f"Error in reading from {source_table}: {err}")
        raise

In [0]:
def write_delta_data(data_df, target_table, mode="append"):
    """Write ``data_df`` to the Delta table ``target_table``.

    Args:
        data_df: Batch DataFrame to persist (it must expose ``.write``).
        target_table: Table name passed to ``saveAsTable``.
        mode: Spark save mode (e.g. ``"append"``, ``"overwrite"``). Defaults to
            ``"append"``, matching the previous hard-coded behavior.

    Raises:
        Exception: Any write error, logged and re-raised.
    """
    try:
        (data_df.write
         .format("delta")
         .mode(mode)
         .saveAsTable(target_table))
        # saveAsTable is a blocking action, so the write has completed here;
        # the old "Started writing" message was logged after the fact.
        logger.info(f"Finished writing to {target_table}")
    except Exception as e:
        logger.error(f"Error in writing to {target_table}: {e}")
        raise