### Fazendo a leitura dos dados na origem

In [0]:
# Auto Loader ("cloudFiles") options for the incoming client CSV drops.
# Files are ';'-separated with a header row; multiline parsing lets quoted
# fields span line breaks.
autoloader_options = {
    "cloudFiles.format": "csv",
    "pathGlobFilter": "*.csv",
    "header": "true",
    "sep": ";",
    "multiline": "true",
    # Only dataNascimento gets an explicit type hint; Auto Loader stores the
    # resulting schema under cloudFiles.schemaLocation.
    "cloudFiles.schemaHints": "dataNascimento DATE",
    # NOTE(review): same directory as the checkpointLocation used by the
    # writeStream cell — keep the two in sync if either one moves.
    "cloudFiles.schemaLocation": "/Volumes/dados_clientes/silver/checkpoints/memory_silver",
}

df_raw = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load("/Volumes/dados_clientes/raw/arqs_recebidos/dados_clientes_*/")
)

In [0]:
from pyspark.sql.functions import col, regexp_replace, initcap, cast, right, expr, date_trunc, from_utc_timestamp
# Silver-layer cleanup: normalise a few text columns and attach lineage
# columns taken from Auto Loader's per-file _metadata.
derived_columns = {
    # Remove '.', ' ' and '-' (e.g. "123.456.789-00" -> "12345678900").
    "cpf": regexp_replace(col("cpf"), "[. -]", ""),
    # Title-case the job title.
    "cargo": initcap(col("cargo")),
    # Source-file modification time, converted with from_utc_timestamp to
    # America/Sao_Paulo and truncated to whole seconds.
    # NOTE(review): from_utc_timestamp shifts the stored value; it reads as
    # Sao Paulo wall-clock time only if spark.sql.session.timeZone is UTC — confirm.
    "dataReferenciaDado": date_trunc(
        "second",
        from_utc_timestamp(col("_metadata.file_modification_time"), "America/Sao_Paulo"),
    ),
    # Name of the file each row was read from.
    "origemArquivoDado": col("_metadata.file_name").cast("string"),
    # Last two characters of the address.
    # NOTE(review): assumes every endereco ends with the state (UF) code.
    "estado": expr("right(endereco, 2)"),
}

# Final column order of the silver table.
silver_columns = [
    "nome", "cpf", "dataNascimento", "cargo", "empresa", "email",
    "estado", "endereco", "telefone", "website",
    "origemArquivoDado", "dataReferenciaDado",
]

df_silver = df_raw.withColumns(derived_columns).select(*silver_columns)

In [0]:
# Append the cleaned stream to dados_clientes.silver.dados_clientes_silver.
# availableNow=True processes every file pending at start-up and then stops,
# so the stream behaves like an incremental batch job.
silver_query = (
    df_silver.writeStream
    .option("checkpointLocation", "/Volumes/dados_clientes/silver/checkpoints/memory_silver")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("dados_clientes.silver.dados_clientes_silver")
)

# toTable() starts the query and returns immediately. Block until the
# availableNow run has drained all pending files, so the SQL cell that reads
# the table sees the freshly written rows. On a first run, this also ensures
# the table exists before it is queried.
silver_query.awaitTermination()

In [0]:
%sql
-- Row count per dataReferenciaDado (the source-file modification time), to
-- see how many silver rows each load contributed.
SELECT
  dataReferenciaDado,
  COUNT(1)
FROM dados_clientes.silver.dados_clientes_silver
GROUP BY dataReferenciaDado
ORDER BY dataReferenciaDado;