In [0]:

from pyspark.sql.functions import lit
import json
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from delta import DeltaTable
import sys

# Extend the module search path so the `utils` module imported below can be
# resolved (presumably it lives in the project's src/lib folder; verify).
# The workspace user folder is read from the "account" widget.
# Relative-path alternative kept for reference: sys.path.append("../lib/")
sys.path.append(f'/Workspace/Users/{dbutils.widgets.get("account")}/music_data_lake/src/lib')                           

import utils


In [0]:
# --- Job parameters ---------------------------------------------------------
# Target location of the table; the table name and its key column are supplied
# through notebook widgets.
catalog = "bronze"
schema = "music_data"
tablename = dbutils.widgets.get("tablename")
primary_key = dbutils.widgets.get("primary_key")
timestamp_field = "ts_ms"

# --- One-off full load ------------------------------------------------------
# When the target table does not exist yet, create it from the full-load
# Parquet files; otherwise leave the existing table untouched.
bronze_table_already_exists = utils.table_exists(spark, catalog=catalog, schema=schema, table=tablename)

if bronze_table_already_exists:
    print(f"Tabela {tablename} já existente, ignorando full-load.")
else:
    print("Tabela não existente, criando...")

    df_full = spark.read.format("parquet").load(f'/Volumes/raw/{schema}/full_load/{tablename}/')

    # coalesce(1) collapses the data to a single partition before the write.
    full_load_writer = df_full.coalesce(1).write.format("delta").mode("overwrite")
    full_load_writer.saveAsTable(f'{catalog}.{schema}.{tablename}')

In [0]:
# Schema for this table, looked up by name through the project helper.
table_schema = utils.import_schema(tablename)

# Handle to the Delta table identified by catalog.schema.tablename.
bronze_table_name = ".".join((catalog, schema, tablename))
bronze = DeltaTable.forName(spark, bronze_table_name)


def upsert(df, deltatable):
    """Merge one batch of CDC events into a Delta table.

    Rows with a null primary key are discarded, the batch is reduced to the
    most recent event per key (ordered by ``timestamp_field`` descending) and
    the surviving rows are merged into ``deltatable`` based on ``operation``:

    * ``'d'``          -> delete the matching target row
    * ``'c'`` / ``'u'`` -> update the matching target row, insert if absent

    Relies on the notebook-level names ``primary_key`` and ``timestamp_field``.

    Args:
        df: DataFrame holding the CDC events of the batch. It must contain the
            primary-key column, the ``timestamp_field`` column and an
            ``operation`` column.
        deltatable: ``DeltaTable`` that receives the merge.
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, col

    # Events without a key cannot be matched against the target.
    keyed_events = df.filter(f"{primary_key} IS NOT NULL")

    # Keep only the newest event per key so the MERGE sees at most one source
    # row for each target row.
    # NOTE(review): events of the same key sharing an identical timestamp value
    # are ordered arbitrarily here; confirm the timestamp resolution is fine
    # enough, or add a tie-breaker column if the source provides one.
    latest_first = Window.partitionBy(primary_key).orderBy(col(timestamp_field).desc())
    df_cdc = (
        keyed_events
        .withColumn("row_number", row_number().over(latest_first))
        .filter(col("row_number") == 1)
        .drop("row_number")
    )

    # A matched key whose newest event is a create (for example delete followed
    # by re-create inside one batch, or a replayed create) must refresh the
    # target row as well; restricting the update to 'u' left it stale.
    (deltatable.alias("target")
               .merge(df_cdc.alias("source"), f"target.{primary_key} = source.{primary_key}")
               .whenMatchedDelete(condition="source.operation = 'd'")
               .whenMatchedUpdateAll(condition="source.operation IN ('c', 'u')")
               .whenNotMatchedInsertAll(condition="source.operation IN ('c', 'u')")
               .execute()
    )



In [0]:

# `expr` is used by the conversions below but is not part of the notebook's
# import cell, so bring it into scope here.
from pyspark.sql.functions import expr

# Streaming read of the CDC Parquet files for this table.
# NOTE(review): the reader format is "parquet", so the "cloudFiles.format"
# option is presumably ignored; switch to format("cloudFiles") if Auto Loader
# was intended (verify checkpoint compatibility before changing it).
df_stream = (spark.readStream
              .format("parquet")
              .option("cloudFiles.format", "parquet")
              .schema(table_schema)
              .load(f"/Volumes/raw/{schema}/cdc/postgres.public.{tablename}/"))

# Epoch-encoded columns and the Spark SQL expression that decodes each one:
#   release_date            days since the Unix epoch -> date
#   created_at, updated_at  microseconds since the epoch
#   ts_ms                   milliseconds since the epoch
# NOTE(review): from_unixtime yields a second-precision string, not a
# timestamp, so sub-second ordering information in ts_ms is lost; confirm this
# is acceptable for the de-duplication done in upsert().
epoch_conversions = {
    "release_date": "DATE_FROM_UNIX_DATE(release_date)",
    "created_at": "from_unixtime(created_at / 1e6)",
    "updated_at": "from_unixtime(updated_at / 1e6)",
    "ts_ms": "from_unixtime(ts_ms / 1000)",
}

# The notebook is parameterised by table name, so only convert the columns
# this table's schema actually contains instead of failing on a missing one.
for column_name, conversion_sql in epoch_conversions.items():
    if column_name in df_stream.columns:
        df_stream = df_stream.withColumn(column_name, expr(conversion_sql))

# Each micro-batch is merged into the bronze table; availableNow processes the
# files currently available and then stops.
# NOTE(review): the checkpoint directory sits inside the directory being read;
# confirm the file source does not pick up checkpoint files as input.
stream = (df_stream.writeStream
                   .option("checkpointLocation", f"/Volumes/raw/{schema}/cdc/postgres.public.{tablename}/{tablename}_checkpoint/")
                   .foreachBatch(lambda df, batchID: upsert(df, bronze))
                   .trigger(availableNow=True)
         )

In [0]:
# start() only launches the query and returns immediately. Block until the
# availableNow run has finished so that a failure inside foreachBatch is raised
# in this cell instead of being lost in the background.
start = stream.start()
start.awaitTermination()