In [1]:
from pyspark.sql.functions import to_json, struct, col, expr, row_number, from_json, get_json_object, explode, when
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StringType, IntegerType, MapType, StructField
from pyspark.sql import SparkSession

# Creamos la sesión de Spark con configuración para Kubernetes
spark = (
    SparkSession.builder
    .appName("JupyterSparkApp")
    .master("k8s://https://192.168.1.150:6443")
    .config("spark.submit.deployMode", "client")
    .config("spark.driver.host", "spark-driver-headless.default.svc.cluster.local")
    .config("spark.driver.port", "7077")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.executor.instances", "2")
    .config("spark.kubernetes.container.image", "docker.io/bitnami/spark:3.5.6")
    .config("spark.kubernetes.executor.deleteOnTermination", "true")
    .config("spark.kubernetes.executor.nodeSelector", "node-role.kubernetes.io/control-plane=")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.3,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.uri", "http://nessie.nessie-ns.svc.cluster.local:19120/api/v1")
    .config("spark.sql.catalog.nessie.ref", "main")
    .config("spark.sql.catalog.nessie.authentication.type", "NONE")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.warehouse", "s3a://synthetic")
    .config("spark.hadoop.fs.s3a.access.key", "qVgFWBabQmQrSuWTJGhj")
    .config("spark.hadoop.fs.s3a.secret.key", "l2GjPEVu22SfiqtaAU2zj3lBptEIoG1iRXGucn3o")
    .config("spark.hadoop.fs.s3a.endpoint", "http://myminio-hl.minio-tenant.svc.cluster.local:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3f23eb95-b14b-4188-8942-230bba0cbbf1;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.9.2 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.103.3 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spar

In [None]:
spark.sql("DROP TABLE IF EXISTS nessie.products_silver")

In [2]:
df_bronze = spark.read.format("iceberg").table("nessie.products_bronze")

payload_schema = StructType().add("payload", MapType(StringType(), StringType()))
oid_schema = StructType([StructField("$oid", StringType())])


df_parsed = df_bronze.selectExpr("CAST(value AS STRING) as json_str", "CAST(key AS STRING) as json_key") \
    .withColumn("value", from_json(col("json_str"), payload_schema)) \
    .withColumn("key", from_json(col("json_key"), payload_schema)) \
    .withColumn("key_id", from_json(col("key.payload")["id"], oid_schema)) \
    .select("value", "key_id")

df_parsed_cols = df_parsed.select(
    col("key_id.$oid").alias("oid"),
    col("value.payload")["before"].alias("before"),
    col("value.payload")["after"].alias("after"),
    col("value.payload")["source"].alias("source"),
    col("value.payload")["op"].alias("op"),
    col("value.payload")["ts_ms"].alias("ts_ms"),
    col("value.payload")["transaction"].alias("transaction"))

product_type_schema = StructType([
    StructField("type_id", StringType()),
    StructField("type_name", StringType()),
    StructField("type_description", StringType())
])

product_schema = StructType([
    StructField("_id", StructType([StructField("$oid", StringType())])),
    StructField("product_id", StringType()),
    StructField("product_name", StringType()),
    StructField("product_price", StringType()),
    StructField("product_description", StringType()),
    StructField("product_type", product_type_schema)
])
    
df_if = df_parsed_cols.withColumn("data_map", from_json(col("after"), product_schema))

df_BIGINT = df_if.withColumn("ts_ms", col("ts_ms").cast("bigint"))

df_final = df_BIGINT.select(
        col("oid"),
        col("data_map")["product_id"].alias("product_id"),
        col("data_map")["product_name"].alias("product_name"),
        col("data_map")["product_price"].alias("product_price"),
        col("data_map")["product_description"].alias("product_description"),
        col("data_map")["product_type"]["type_id"].alias("type_id"),
        col("data_map")["product_type"]["type_name"].alias("type_name"),
        col("data_map")["product_type"]["type_description"].alias("type_description"),
        col("op").alias("op"),
        col("ts_ms")
    )

window = Window.partitionBy("oid").orderBy(col("ts_ms").desc())

df_final_batch = df_final.withColumn("rn", row_number().over(window)) \
                         .select("oid", "product_id", "op", "ts_ms", "rn")

df_final_batch.show(30, truncate = False)

25/10/19 09:27:21 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+------------------------+----------+----+-------------+---+
|oid                     |product_id|op  |ts_ms        |rn |
+------------------------+----------+----+-------------+---+
|68f3ea5916383378e218562d|NULL      |d   |1760865786197|1  |
|68f3ea5916383378e218562d|PROD07    |r   |1760862255932|2  |
|68f3ea5916383378e218562d|NULL      |NULL|NULL         |3  |
|68f3ea5916383378e218562e|PROD08    |r   |1760862255940|1  |
|68f3ea5916383378e218562f|PROD09    |r   |1760862255948|1  |
|68f3ea5916383378e2185630|NULL      |d   |1760863766228|1  |
|68f3ea5916383378e2185630|PROD10    |r   |1760862255941|2  |
|68f3ea5916383378e2185630|NULL      |NULL|NULL         |3  |
|68f3ea5916383378e2185631|PROD11    |r   |1760862255947|1  |
|68f3ea5916383378e2185632|PROD12    |r   |1760862255941|1  |
|68f3ea5916383378e2185633|NULL      |d   |1760865798371|1  |
|68f3ea5916383378e2185633|PROD13    |r   |1760862255951|2  |
|68f3ea5916383378e2185633|NULL      |NULL|NULL         |3  |
|68f3ea5916383378e218563

In [None]:
# Silver layer
spark.sql("""
CREATE TABLE IF NOT EXISTS nessie.products_silver (
  oid STRING,
  product_id STRING,
  product_name STRING,
  product_price STRING,
  product_description STRING,
  type_id STRING,
  type_name STRING,
  type_description STRING,
  op STRING,
  ts_ms BIGINT
)
USING iceberg
PARTITIONED BY (bucket(16, product_id)) -- 16 buckets
LOCATION 's3a://synthetic/products-silver'
""")

df_bronze = spark.readStream.format("iceberg").table("nessie.products_bronze")

payload_schema = StructType().add("payload", MapType(StringType(), StringType()))
oid_schema = StructType([StructField("$oid", StringType())])


df_parsed = df_bronze.selectExpr("CAST(value AS STRING) as json_str", "CAST(key AS STRING) as json_key") \
    .withColumn("value", from_json(col("json_str"), payload_schema)) \
    .withColumn("key", from_json(col("json_key"), payload_schema)) \
    .withColumn("key_id", from_json(col("key.payload")["id"], oid_schema)) \
    .select("value", "key_id")

df_parsed_cols = df_parsed.select(
    col("key_id.$oid").alias("oid"),
    col("value.payload")["before"].alias("before"),
    col("value.payload")["after"].alias("after"),
    col("value.payload")["source"].alias("source"),
    col("value.payload")["op"].alias("op"),
    col("value.payload")["ts_ms"].alias("ts_ms"),
    col("value.payload")["transaction"].alias("transaction"))

product_type_schema = StructType([
    StructField("type_id", StringType()),
    StructField("type_name", StringType()),
    StructField("type_description", StringType())
])

product_schema = StructType([
    StructField("_id", StructType([StructField("$oid", StringType())])),
    StructField("product_id", StringType()),
    StructField("product_name", StringType()),
    StructField("product_price", StringType()),
    StructField("product_description", StringType()),
    StructField("product_type", product_type_schema)
])
    
df_if = df_parsed_cols.withColumn("data_map", from_json(col("after"), product_schema))

df_BIGINT = df_if.withColumn("ts_ms", col("ts_ms").cast("bigint"))

df_final = df_BIGINT.select(
        col("oid"),
        col("data_map")["product_id"].alias("product_id"),
        col("data_map")["product_name"].alias("product_name"),
        col("data_map")["product_price"].alias("product_price"),
        col("data_map")["product_description"].alias("product_description"),
        col("data_map")["product_type"]["type_id"].alias("type_id"),
        col("data_map")["product_type"]["type_name"].alias("type_name"),
        col("data_map")["product_type"]["type_description"].alias("type_description"),
        col("op").alias("op"),
        col("ts_ms")
    )

# YOU NEED A FUNCTION THAT LOOKS AT ts_ms AND TAKES THE MOST RECENT RSTATE OF EACH PRODUCT DEPENDING ON WHETHER IT IS AN UPDATE A DELETE OR AN INSERT. 
# WE WILL USE BTACH SYNTAX AND FUNCTIONALITIES FOR EACH STREAMING BATCH TO HEKP US LIKE WE CAN SEE ON THE FOLLOWING CODE
def merge_into_products_silver(microBatchOutputDF, batchId):

    window = Window.partitionBy("oid").orderBy(col("ts_ms").desc())

    df_final_batch = microBatchOutputDF.withColumn("rn", row_number().over(window)) \
                      .filter(col("rn") == 1) \
                      .drop("rn")
    
    df_final_batch.createOrReplaceTempView("batch_updates")

    microBatchOutputDF.sparkSession.sql("""
        MERGE INTO nessie.products_silver AS target
        USING batch_updates AS source
        ON target.oid = source.oid
        WHEN MATCHED AND source.op = 'd' AND source.ts_ms > target.ts_ms THEN DELETE
        WHEN MATCHED AND source.op IN ('u', 'r', 'i') AND source.ts_ms > target.ts_ms THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op IN ('c', 'r', 'i') THEN INSERT *
    """)
    
#    microBatchOutputDF.sparkSession.sql("""
#        DELETE FROM nessie.products_silver AS target
#        WHERE target.product_id IN (SELECT product_id FROM batch_updates) 
#        AND target.oid NOT IN (SELECT oid FROM batch_updates)
#    """)

query = df_final.writeStream \
    .foreachBatch(merge_into_products_silver) \
    .option("checkpointLocation", "s3a://synthetic/checkpoints/products_silver") \
    .outputMode("update") \
    .start()
query.awaitTermination()
# USE /workspace/kafka-spark so you can delete checkpoints folder and rerun the process from the first topic offset if you run from earliest everytime
# ON THE CLEAN VERSION YOU WILL USE s3 BUCKET TO SAVE THE CHECKPOINTS
# REMEMBER THAT STOPPING THIS CELL WON'T MAKE THE SPARK STREAMS JOB STOP, IT WILL CONTINUE UNLESS YOU USE SPARK COMMANDS TO STOP IT. HERE IS NOT NECESSARY TO DO IT SO WE WILL NOT USE IT.
# YOU HAVE TO DISCOVER WHY THE CONTROLPLANE CAN'T HOST SPARK EXECUTORS

In [12]:
spark.streams.active

[<pyspark.sql.streaming.query.StreamingQuery at 0x7a63c44a5e50>]

In [None]:
query.stop()

In [11]:
query = spark.sql("""
       SELECT * FROM nessie.products_silver
       SORT BY oid
    """)
query.show(30)

+--------------------+----------+--------------------+-------------+--------------------+-------+--------------------+--------------------+---+-------------+
|                 oid|product_id|        product_name|product_price| product_description|type_id|           type_name|    type_description| op|        ts_ms|
+--------------------+----------+--------------------+-------------+--------------------+-------+--------------------+--------------------+---+-------------+
|68f3ea5916383378e...|    PROD11| Adjustable LED Lamp|        $9.37|Computer his toug...|   Furn|   Furniture / Decor|Alone must someti...|  r|1760862255947|
|68f3ea5916383378e...|    PROD16|     Sports Sneakers|      $483.37|   Rise when manage.|   Spor|   Sports / Footwear|Others owner majo...|  r|1760862255945|
|68f3ea5916383378e...|    PROD22|   Adjustable Tripod|      $369.39|Relationship rest...|   Phot|         Photography|Us long anyone av...|  r|1760862255944|
|68f3ea5916383378e...|    PROD23|  Portable LED Flas