In [43]:
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window
from delta.tables import *

In [32]:
# Load the raw JSON files found under the year=2024/month=06/day=30 partition
# of the mongodb.PERFECT_STORE.SCORES topic in the "1-raw" bucket.
df = (
    spark.read
    .format("json")
    .load("s3a://1-raw/topics/mongodb.PERFECT_STORE.SCORES/year=2024/month=06/day=30/*.json")
)

In [33]:
# Schema applied to the `documentKey` JSON string: {"_id": {"$oid": <string>}}.
# `$oid` is declared non-nullable; `_id` itself keeps the default (nullable).
schema_document_id = T.StructType().add(
    "_id",
    T.StructType().add("$oid", T.StringType(), nullable=False),
)

# Schema applied to the `fullDocument` JSON string.
# - `pillars` is an array of {name, max_score, score_achieved} structs.
# - `update_date` is a non-nullable struct wrapping a non-nullable long `$date`
#   (presumably epoch milliseconds -- TODO confirm against the producer).
schema_full_document = (
    T.StructType()
    .add("point_of_sale", T.StringType())
    .add("name_promoter", T.StringType())
    .add("date", T.DateType())
    .add(
        "pillars",
        T.ArrayType(
            T.StructType()
            .add("name", T.StringType())
            .add("max_score", T.IntegerType())
            .add("score_achieved", T.FloatType())
        ),
    )
    .add(
        "update_date",
        T.StructType().add("$date", T.LongType(), nullable=False),
        nullable=False,
    )
)

# Schema applied to the `clusterTime` JSON string:
# {"$timestamp": {"t": <long>, "i": <int>}}.
schema_cluster_time = T.StructType().add(
    "$timestamp",
    T.StructType()
    .add("t", T.LongType())
    .add("i", T.IntegerType()),
)

In [34]:
# Parse the `clusterTime` JSON string into a struct, then expose its
# `$timestamp.t` field as an extra top-level column named `t`.
# NOTE: this rebinds `df`; run it once per fresh load of the raw data.
df = (
    df
    .withColumn("clusterTime", F.from_json(F.col("clusterTime"), schema_cluster_time))
    .select("*", "clusterTime.$timestamp.t")
)

In [35]:
# Parse the `documentKey` JSON string into a struct, then expose its
# `_id.$oid` field as an extra top-level column named `$oid`.
df = (
    df
    .withColumn("documentKey", F.from_json(F.col("documentKey"), schema_document_id))
    .select("*", "documentKey._id.$oid")
)

In [36]:
# Parse the `fullDocument` JSON string and keep only the operation type, the
# extracted `$oid` / `t` columns, and the document's fields flattened to the
# top level. All other columns are dropped here.
df = (
    df
    .withColumn("fullDocument", F.from_json(F.col("fullDocument"), schema_full_document))
    .select("operationType", "$oid", "t", "fullDocument.*")
)

In [37]:
# Source column name -> output column name. Columns not listed here keep
# their original name when the mapping is applied.
renamed_columns = dict(
    [
        ("$oid", "id"),
        ("t", "operation_time"),
    ]
)

In [38]:
# Apply `renamed_columns` positionally: every column keeps its place, and only
# names present in the mapping change.
df = df.toDF(*[renamed_columns.get(c, c) for c in df.columns])

In [39]:
# Replace the `update_date` struct with the long value held in its `$date` field.
df = df.withColumn("update_date", F.col("update_date").getField("$date"))

## Write data in Delta table format

In [40]:
# Upsert the latest change event per document id into the bronze Delta table.
#
# Steps:
#   1. Rank the batch's events per `id` by `operation_time` (newest first) and
#      keep only the top-ranked row, so the merge source has one row per id.
#   2. If the Delta table already exists, MERGE on `id`; otherwise create it
#      from this batch.
delta_table_path = "s3a://2-bronze/perfect_store"

# NOTE(review): events for the same id that share the same `operation_time`
# are ranked in arbitrary order; a finer tie-breaker would have to be carried
# through from the upstream cells.
window_spec = Window.partitionBy("id").orderBy(F.desc("operation_time"))
df_with_row_number = df.withColumn("row_number", F.row_number().over(window_spec))

# Filter to keep only the latest version for each 'id'
df = df_with_row_number.filter(F.col("row_number") == 1).drop("row_number")

if DeltaTable.isDeltaTable(spark, delta_table_path):
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    (
        delta_table.alias("target")
        .merge(df.alias("source"), "target.id = source.id")
        # Only overwrite a stored row when the incoming event is at least as
        # recent. Without this guard, replaying an older batch would replace
        # newer rows with stale data.
        .whenMatchedUpdateAll(
            condition=(
                "target.operation_time IS NULL "
                "OR source.operation_time >= target.operation_time"
            )
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First load: no table at this path yet, so create it from the batch.
    df.write.format("delta").save(delta_table_path)

24/06/30 16:10:09 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
24/06/30 16:10:09 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
24/06/30 16:10:10 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.


                                                                                

In [46]:
# Time-travel read: load version 1 of the bronze Delta table and show the row
# stored for one specific document id.
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 1)
    .load("s3a://2-bronze/perfect_store")
)
df.filter("id = '667a0057cb4244b935fa70b5'").show()

+-------------+--------------------+--------------+-------------+-------------+----+-------+-----------+
|operationType|                  id|operation_time|point_of_sale|name_promoter|date|pillars|update_date|
+-------------+--------------------+--------------+-------------+-------------+----+-------+-----------+
|       delete|667a0057cb4244b93...|    1719682492|         null|         null|null|   null|       null|
+-------------+--------------------+--------------+-------------+-------------+----+-------+-----------+



In [41]:
# Fetch the commit history of the Delta table at `delta_table_path`.
delta_table_history = DeltaTable.forPath(spark, delta_table_path).history()

# Display a subset of the history columns.
delta_table_history.select(
    [
        "version",
        "timestamp",
        "operation",
        "operationParameters",
        "operationMetrics",
        "engineInfo",
    ]
).show()

+-------+-------------------+---------+--------------------+--------------------+--------------------+
|version|          timestamp|operation| operationParameters|    operationMetrics|          engineInfo|
+-------+-------------------+---------+--------------------+--------------------+--------------------+
|      2|2024-06-30 16:10:10|    MERGE|{predicate -> (ta...|{numTargetRowsCop...|Apache-Spark/3.3....|
|      1|2024-06-30 16:04:52|    MERGE|{predicate -> (ta...|{numTargetRowsCop...|Apache-Spark/3.3....|
|      0|2024-06-30 15:15:17|    WRITE|{mode -> ErrorIfE...|{numFiles -> 8, n...|Apache-Spark/3.3....|
+-------+-------------------+---------+--------------------+--------------------+--------------------+

