In [0]:
!pip install kaggle
!pip install kagglehub[pandas-datasets]

In [0]:
import kagglehub, shutil, os, delta, time
from kagglehub import KaggleDatasetAdapter
from pyspark import SparkContext
from datetime import datetime, timedelta
from pyspark.sql.functions import col

def table_exists(database, table):
    """Return True if `table` exists in schema `database` of the current catalog.

    Uses the Spark catalog API, which resolves the identifier the same way Spark SQL does.
    It does not string-match the rows of `SHOW TABLES`, whose `tableName` values may not
    match the caller's casing, and it avoids building a SQL filter by string interpolation.

    Args:
        database: schema/database name, e.g. "bronze".
        table: table name, e.g. "customers".

    Returns:
        bool: whether the table (or view) exists.
    """
    # Catalog.tableExists(tableName, dbName) is available since PySpark 3.3.
    return spark.catalog.tableExists(table, database)

In [0]:
# Target bronze table, plus the key column and change-timestamp column that the merge
# logic uses to match rows and keep the latest version of each record.
database, tablename = "bronze", "customers"
id_field, timestamp_field = "idCliente", "DtAtualizacao"

In [0]:
# CSV files carry no embedded schema, so the reader is given the field separator and told the
# first line is a header. No `inferSchema` option is set, so every column is read as a string.
# Parquet files, by contrast, already carry their schema in their own metadata.
df_full = (spark.read
           .format("csv")
           .option("sep", ";")
           .option("header", True)
           .load(f"/Volumes/workspace/upsell/full_load/{tablename}/"))

# Captured once and reused as the explicit schema of the streaming readers.
schema = df_full.schema

In [0]:
# Create the bronze table from the full load only when it does not exist yet. Re-running
# this cell therefore never overwrites rows merged in later by the CDC steps.
if not table_exists(database, tablename):
    print("Tabela não existente, criando tabela...")
    # `df_full` is the full-load snapshot already read in the previous cell.
    # The table is addressed by the configured `database`. `schema` holds the CSV StructType,
    # not a schema name, so it must not be interpolated here.
    (df_full.coalesce(1)
            .write.format("delta")
            .mode("overwrite")
            .saveAsTable(f"{database}.{tablename}"))
else:
    print("Tabela já existente, ignorando a carga completa.")

### Atualização dos dados via Streaming + carga periódica do CDC

In [0]:
# Download the dataset snapshot. kagglehub returns the directory holding its files.
# NOTE(review): that directory is on the driver's local disk unless the kagglehub cache is
# redirected; confirm Spark can read it by path (scheme-less paths may not resolve to local disk).
kaggle_path = kagglehub.dataset_download("teocalvo/teomewhy-loyalty-system")

# Destination volume for the CDC extracts.
dest_path = "/Volumes/workspace/upsell/cdc/kaggle"
os.makedirs(dest_path, exist_ok=True)

# Load window: only rows updated in the last 5 days are exported. The cutoff is computed once,
# so every file in this run is filtered against the same timestamp.
periodo_carga = datetime.now() - timedelta(days=5)
cutoff = periodo_carga.strftime("%Y-%m-%d %H:%M:%S")

for file in os.listdir(kaggle_path):
    # Only the customer snapshot CSVs are exported.
    if not (file.startswith("clientes") and file.endswith(".csv")):
        continue

    # Suffix the output name with the epoch second, so runs in different seconds write to
    # distinct locations.
    base, ext = os.path.splitext(file)
    new_name = f"{base}_{int(time.time())}{ext}"
    full_path = os.path.join(kaggle_path, file)

    # Full snapshot of this file.
    df_kaggle = (spark.read.format("csv")
        .option("sep", ";")
        .option("header", True)
        .load(full_path)
    )

    # NOTE(review): without inferSchema the column is a string, so this is a lexical comparison.
    # It matches chronological order only if values look like `YYYY-MM-DD HH:MM:SS`; confirm.
    df_filtrado = df_kaggle.filter(col(timestamp_field) >= cutoff)
    print("Registros Filtrados: ", df_filtrado.count())

    # Spark writes a directory named `new_name` containing the part files.
    (df_filtrado.write
        .option("sep", ";")
        .option("header", True)
        .mode("overwrite")
        .csv(os.path.join(dest_path, new_name))
    )

In [0]:
# Target table, addressed by the configured database (`schema` holds the CSV StructType).
bronze = delta.DeltaTable.forName(spark, f"{database}.{tablename}")

def upsert(df, deltatable):
  """Merge one micro-batch of customer records into `deltatable`.

  Keeps only the row with the highest `timestamp_field` per `id_field` within the batch.
  It then updates target rows whose key matches and inserts the rest.

  Args:
    df: micro-batch DataFrame passed in by foreachBatch.
    deltatable: target DeltaTable.
  """
  # Session-scoped view on the batch's own session (unlike a global temp view, which every
  # session in the application shares).
  df.createOrReplaceTempView(f"view_{tablename}")

  query = f'''
      SELECT *
      FROM view_{tablename}
      QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY {timestamp_field} DESC) = 1
  '''

  df_cdc = df.sparkSession.sql(query)

  (deltatable.alias("b")
             .merge(df_cdc.alias("d"), f"b.{id_field} = d.{id_field}")
             .whenMatchedUpdateAll()
             .whenNotMatchedInsertAll()
             .execute()
  )

# Streaming reader (Auto Loader) over the CSV CDC files.
# NOTE(review): the Kaggle ingestion cell writes `clientes_<ts>.csv/` directories directly under
# /Volumes/workspace/upsell/cdc/kaggle/, not under a `{tablename}/` subdirectory; confirm this
# path actually receives files.
df_stream = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("sep", ";")
  .option("header", True)
  .schema(schema)
  .load(f"/Volumes/workspace/upsell/cdc/kaggle/{tablename}/"))

# Persist each micro-batch through `upsert`.
# - Its own checkpoint: the parquet CDC stream uses .../cdc/{tablename}_checkpoint/, and two
#   queries over different sources must not share one.
# - `upsert` and `bronze` are bound now (default arguments), so a later redefinition of either
#   global does not change what this query runs.
# - availableNow processes every pending file, then stops.
stream = (df_stream.writeStream
          .option("checkpointLocation", f"/Volumes/workspace/upsell/cdc/kaggle_{tablename}_checkpoint/")
          .foreachBatch(lambda df, batch_id, merge_fn=upsert, target=bronze: merge_fn(df, target))
          .trigger(availableNow=True)
        )


In [0]:
# Start the CSV stream and block until the availableNow run has processed every pending file.
# Later cells then see its results, and a failed batch raises here instead of only in the background.
kaggle_query = stream.start()
kaggle_query.awaitTermination()

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# One-off batch (non-streaming) merge of a Kaggle CSV snapshot into the bronze table.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Read the Kaggle CSV directly (no streaming).
# NOTE(review): the ingestion cell writes timestamped `clientes_<ts>.csv/` directories; confirm
# this exact path exists.
df_kaggle = (spark.read.format("csv")
    .option("sep", ";")
    .option("header", True)
    .load("/Volumes/workspace/upsell/cdc/kaggle/clientes.csv"))

print("Registros Kaggle:", df_kaggle.count())

# Delta MERGE fails when several source rows match the same target row. Keep only the most
# recent record per key, using the same rule as the streaming upsert.
latest_per_id = Window.partitionBy(id_field).orderBy(col(timestamp_field).desc())
df_kaggle_latest = (df_kaggle
    .withColumn("_rn", row_number().over(latest_per_id))
    .filter(col("_rn") == 1)
    .drop("_rn"))

# Target bronze table, from the configured names.
bronze = DeltaTable.forName(spark, f"{database}.{tablename}")

# Row count before the merge.
before = bronze.toDF().count()
print("Antes do merge:", before)

# Update matching keys, insert new ones.
(bronze.alias("b")
    .merge(df_kaggle_latest.alias("d"), f"b.{id_field} = d.{id_field}")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# Row count after the merge; the difference is the number of inserted keys.
after = bronze.toDF().count()
print("Depois do merge:", after)
print("Inseridos:", after - before)


In [0]:
# Target table, addressed by the configured database (`schema` holds the CSV StructType).
bronze = delta.DeltaTable.forName(spark, f"{database}.{tablename}")

def upsert(df, deltatable):
  """Apply one micro-batch of CDC records, tagged by an `OP` column, to `deltatable`.

  Keeps only the row with the highest `timestamp_field` per `id_field` within the batch, then:
    - OP = 'D' and key exists      -> delete the target row
    - OP = 'U' and key exists      -> update all columns
    - OP = 'I' or 'U', key missing -> insert
  Other combinations (e.g. 'I' for an existing key, 'D' for a missing one) are ignored.

  Args:
    df: micro-batch DataFrame passed in by foreachBatch.
    deltatable: target DeltaTable.
  """
  # Session-scoped view on the batch's own session (unlike a global temp view, which every
  # session in the application shares).
  df.createOrReplaceTempView(f"view_{tablename}")

  query = f'''
      SELECT *
      FROM view_{tablename}
      QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY {timestamp_field} DESC) = 1
  '''

  df_cdc = df.sparkSession.sql(query)

  (deltatable.alias("b")
             .merge(df_cdc.alias("d"), f"b.{id_field} = d.{id_field}")
             .whenMatchedDelete(condition = "d.OP = 'D'")
             .whenMatchedUpdateAll(condition = "d.OP = 'U'")
             .whenNotMatchedInsertAll(condition = "d.OP = 'I' or d.OP = 'U'")
             .execute()
  )

# Streaming reader (Auto Loader) over the parquet CDC files.
# NOTE(review): `schema` comes from the full-load CSV (all string columns). Confirm it includes
# the `OP` column referenced above and is compatible with the parquet files' column types.
# Add `.option("cloudFiles.maxFilesPerTrigger", N)` to cap the files per micro-batch.
df_stream = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .schema(schema)
  .load(f"/Volumes/workspace/upsell/cdc/{tablename}/"))

# Persist each micro-batch through the CDC `upsert` above.
# - `upsert` and `bronze` are bound now (default arguments), so an earlier or later definition
#   of either global cannot leak into this query.
# - availableNow processes every pending file, then stops.
stream = (df_stream.writeStream
          .option("checkpointLocation", f"/Volumes/workspace/upsell/cdc/{tablename}_checkpoint/")
          .foreachBatch(lambda df, batch_id, merge_fn=upsert, target=bronze: merge_fn(df, target))
          .trigger(availableNow=True)
        )


In [0]:
# Start the parquet CDC stream and wait for the availableNow run to finish.
# The validation below then reads the merged table, and a failed batch raises here.
start = stream.start()
start.awaitTermination()

In [0]:
# Every CSV extract written under the Kaggle CDC directory.
df_cdc = (spark.read
          .format("csv")
          .option("sep", ";")
          .option("header", True)
          .load("/Volumes/workspace/upsell/cdc/kaggle"))

# Distinct keys on each side; the set difference gives the IDs not yet present in bronze.
ids_bronze = (spark.table(f"{database}.{tablename}")
              .select(id_field)
              .distinct())
ids_kaggle = (df_cdc
              .select(id_field)
              .distinct())
ids_novos = ids_kaggle.subtract(ids_bronze)

print("Novos IDs:", ids_novos.count())


In [0]:
%sql

-- Inspect the bronze customers table after the loads above.
SELECT c.*
FROM bronze.customers AS c