# In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, max, lit, trim
from datetime import datetime
from delta.tables import DeltaTable

# Initialize Spark session: register the Delta Lake SQL extension and catalog
# so Delta tables can be used through Spark SQL, and enable Hive support since
# every table below is addressed through hive_metastore.
spark = (
    SparkSession.builder
    .appName("Elasticsearch to Delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

# Configuration: Elasticsearch endpoint and credentials come from the "snocko"
# secret scope. (dbutils is not defined in this file — presumably the
# Databricks notebook global.)
es_cloud_url = dbutils.secrets.get(key="es_cloud_url", scope="snocko")
es_user = dbutils.secrets.get(key="es_user", scope="snocko")
es_pass = dbutils.secrets.get(key="es_pass", scope="snocko")

# Target location: the hive_metastore "default" schema.
# NOTE(review): delta_base_path is not referenced anywhere else in this notebook.
delta_base_path = "hive_metastore.default."
tracking_table_name = "hive_metastore.default.tracking_table"

# Define schema: explicit read schema handed to the Elasticsearch reader via
# .schema(), listed as (column name, Spark type) pairs in column order.
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, LongType, TimestampType, ArrayType, FloatType, MapType

trade_txn_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("create_timestamp", TimestampType()),
        ("update_timestamp", TimestampType()),
        ("order_id", StringType()),
        # NOTE(review): FloatType is 32-bit (~7 significant digits); switching
        # to DoubleType/DecimalType would require migrating existing tables.
        ("price", FloatType()),
        ("quantity", LongType()),
        ("status", StringType()),
        ("stock_symbol", StringType()),
        ("trader_id", StringType()),
        ("type", StringType()),
        # Array of nested records; field names suggest per-execution fills of
        # the order (NOTE(review): confirm against the index mapping).
        ("sub_transactions", ArrayType(StructType([
            StructField(sub_name, sub_type)
            for sub_name, sub_type in (
                ("sub_transaction_id", StringType()),
                ("price_executed", FloatType()),
                ("quantity_fulfilled", LongType()),
                ("matched_order_id", StringType()),
                ("timestamp", TimestampType()),
            )
        ]))),
        # Document metadata (including "_index") emitted by the connector
        # because the reader sets es.read.metadata=true.
        ("_metadata", MapType(StringType(), StringType())),
    )
])

# Retrieve last execution timestamp
def get_last_execution_time():
    try:
        tracking_df = spark.sql(f"SELECT max(last_execution_time) FROM {tracking_table_name}")
        last_execution_time = tracking_df.collect()[0][0]
        return last_execution_time if last_execution_time else "1970-01-01T00:00:00"
    except:
        return "1970-01-01T00:00:00"

# Save execution timestamp
def save_execution_time(timestamp):
    """Store *timestamp* as the only row of the tracking table.

    The Delta table is written in overwrite mode, so after each call it holds
    a single ``last_execution_time`` value: the one passed here.
    """
    rows = [(timestamp,)]
    writer = spark.createDataFrame(rows, ["last_execution_time"]).write
    writer.format("delta").mode("overwrite").saveAsTable(tracking_table_name)

# Get timestamps
import json
from datetime import timezone

# Lower bound for this run's query: the previous run's high-water mark.
last_execution_time = get_last_execution_time()
# Wall-clock start of this run as a naive UTC ISO string. datetime.utcnow() is
# deprecated since Python 3.12; this yields the same value and format.
current_execution_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

# Read data from Elasticsearch
es_index_pattern = "trade_txn_*"

# Render the watermark with a "T" separator: str(datetime) would use a space,
# which Elasticsearch's default date format does not parse (assumes
# update_timestamp uses the default mapping format — confirm).
es_query_since = (
    last_execution_time.isoformat()
    if isinstance(last_execution_time, datetime)
    else str(last_execution_time)
)
# json.dumps guarantees valid, properly escaped JSON (same text as before).
# NOTE(review): "gt" skips late-arriving docs whose update_timestamp equals the
# watermark exactly; "gte" would re-read them harmlessly (upsert on order_id).
es_query = json.dumps({"range": {"update_timestamp": {"gt": es_query_since}}})

es_df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", es_cloud_url)
    .option("es.port", "443")
    .option("es.net.ssl", "true")
    .option("es.nodes.wan.only", "true")
    .option("es.nodes.discovery", "false")
    .option("es.net.http.auth.user", es_user)
    .option("es.net.http.auth.pass", es_pass)
    .option("es.read.metadata", "true")  # exposes _index etc. in the _metadata column
    .option("es.resource", es_index_pattern)
    .option("es.query", es_query)
    .schema(trade_txn_schema)
    .load()
)

# Tag each record with its source Elasticsearch index (the "_index" key of the
# _metadata map); each index is loaded into a Delta table of the same name.
es_df = es_df.withColumn("delta_table", col("_metadata")["_index"])

# Process data if new records exist.
# es_df is lazy: without caching, every action below (count, distinct, each
# merge, the max) re-queries Elasticsearch, so they could each see different
# data. Caching gives them one snapshot (best effort — Spark may evict
# partitions) and avoids repeated scans of the index.
es_df = es_df.cache()
try:
    if es_df.count() > 0:
        # Compute the new high-water mark BEFORE merging. Records the merges
        # happen to include beyond it are simply re-upserted next run (the merge
        # on order_id is an upsert); computing it afterwards could advance past
        # records that were never merged. `max` here is pyspark's (imported at
        # the top of the file), not the builtin.
        new_watermark = es_df.agg(max("update_timestamp")).collect()[0][0]

        for row in es_df.select("delta_table").distinct().collect():
            raw_name = row["delta_table"]
            if raw_name is None:
                raise ValueError("Found records without an _index entry in _metadata; cannot route them to a Delta table.")
            # Use one normalized name for both the filter and the table identifier.
            table_name = raw_name.strip()
            table_hive_name = f"hive_metastore.default.{table_name}"

            # Filter records for this specific table. The delta_table column is
            # kept because existing target tables were created with it.
            table_df = es_df.filter(trim(col("delta_table")) == lit(table_name)).drop("_metadata")

            try:
                spark.sql(f"DESCRIBE TABLE {table_hive_name}")
            except Exception:
                # Table not found: create it from this batch. "errorifexists"
                # (not "overwrite") so a DESCRIBE failure for any other reason
                # can never wipe an existing table.
                table_df.write.format("delta").mode("errorifexists").saveAsTable(table_hive_name)
            else:
                # Table exists: upsert by order_id.
                (DeltaTable.forName(spark, table_hive_name).alias("tgt")
                    .merge(table_df.alias("src"), "tgt.order_id = src.order_id")
                    .whenMatchedUpdateAll()
                    .whenNotMatchedInsertAll()
                    .execute())

        if new_watermark is None:
            print("No update_timestamp values in this batch; tracking table left unchanged.")
        else:
            # NOTE(review): collect() yields a naive datetime in the driver's
            # local time zone, while the ES query string is read as UTC —
            # presumably fine because the driver runs in UTC; confirm.
            current_execution_time = new_watermark.isoformat() # to be removed when executing continuously
            save_execution_time(current_execution_time)
finally:
    es_df.unpersist()

print("Job completed successfully.")


# Output: Job completed successfully.
