### ETL: CRM Ifood - Bases de Disparos de Comunicações para clientes

Esse notebook realiza as operações de ETL das bases de disparos de CRM, assim como outras dimensões necessárias para análises complementares.

Os dados foram organizados no estilo medalhão (bronze, silver e gold):

- Bronze: Tabelas cruas, assim como foram disponibilizadas via .csv
- Silver: Tratamentos e padronizações realizadas. Aqui as tabelas já estão no formato Dimensional
- Gold: Tabelas já consolidadas e agrupadas, prontas para serem usadas em Dashboards e outras análises.

obs: Os arquivos .csv devem ser armazenados nas pastas correspondentes na camada bronze

In [0]:
%sql
-- Cria os schemas:
CREATE SCHEMA IF NOT EXISTS workspace.ifood_bronze
COMMENT 'Camada Bronze - dados brutos do case iFood';

CREATE SCHEMA IF NOT EXISTS workspace.ifood_silver
COMMENT 'Camada Silver - dados limpos e padronizados';

CREATE SCHEMA IF NOT EXISTS workspace.ifood_gold
COMMENT 'Camada Gold - dados analíticos e agregados';


Os arquivos .csv devem ser armazenados nos diretórios correspondentes criados nesse bloco:

In [0]:
# Cria o volume 'origens'
spark.sql(
    """
    CREATE VOLUME IF NOT EXISTS workspace.ifood_bronze.origens
    """
)

# Cria subfolders para cada arquivos recebido
base_path = "/Volumes/workspace/ifood_bronze/origens/"

folders = [
    "communications_base",
    "merchant_info",
    "cluster_merchant_history",
    "conversion_base"
]

for folder in folders:
    path = f"{base_path}{folder}"
    dbutils.fs.mkdirs(path)

In [0]:

# Importando bibliotecas

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import when, col, lit
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, BooleanType, DateType, TimestampType, LongType
)
from delta.tables import DeltaTable
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("ETL_CRM_iFood").getOrCreate()

# Funções:

# Helper para converter strings/bools para int (booleano) (tratando 'true','1','yes','t', etc.)
def str_to_int_expr(col):
    return (
        F.when(F.lower(F.trim(F.col(col))).isin('true','1','t','yes','y'), F.lit(1))
         .when(F.lower(F.trim(F.col(col))).isin('false','0','f','no','n'), F.lit(0))
         .otherwise(F.lit(None))
    )


# Função para tratar nulos: transforma 'null' em null
def normalize_nulls(df: DataFrame) -> DataFrame:
    """
    Substitui valores textuais como 'null', 'none', 'nan', 'na', 'undefined', etc. por null real.
    Só aplica em colunas StringType.
    """
    null_equivalents = ["null", "none", "nan", "na", "undefined", "n/a", ""]
    df_out = df
    for field in df.schema.fields:
        if isinstance(field.dataType, StringType):
            df_out = df_out.withColumn(
                field.name,
                F.when(F.lower(F.trim(F.col(field.name))).isin(null_equivalents), F.lit(None))
                 .otherwise(F.col(field.name))
            )
    return df_out


# Camada Bronze:

Dados Brutos

In [0]:

# Fazer upload dos arquivos nos diretórios correspondentes


# communications_base
# Carregar dados
comm = spark.read.option("header", True).csv("/Volumes/workspace/ifood_bronze/origens/communications_base/communications_base_*.csv")

# Salvar como Delta Table
comm.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.communications_base")


# cluster_merchant_history
# Carregar dados
comm = spark.read.option("header", True).csv("/Volumes/workspace/ifood_bronze/origens/cluster_merchant_history/cluster_merchant_history_*.csv")

# Salvar como Delta Table
comm.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.cluster_merchant_history")


# merchant_info
# Carregar dados
comm = spark.read.option("header", True).csv("/Volumes/workspace/ifood_bronze/origens/merchant_info/merchant_info_*.csv")

# Salvar como Delta Table
comm.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.merchant_info")


# conversion_base
# Carregar dados
comm = spark.read.option("header", True).csv("/Volumes/workspace/ifood_bronze/origens/conversion_base/conversion_base_*.csv")

# Salvar como Delta Table
comm.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.conversion_base")

# %sql
# select * from ifood_bronze.communications_base

# Camada Silver

Criação das Fatos e Dimensões (vide diagrama em anexo)

OBS: Comentar os blocos de criação após a primeira criação de cada base.

In [0]:
# Criação da TbDimMerchant

# leitura da tabela bronze
df_merchant_bronze = spark.read.table("workspace.ifood_bronze.merchant_info")

# normalizações + casts
df_dim_merchant = (
    df_merchant_bronze
    .select(
        F.col("merchant_id").cast(StringType()).alias("IdMerchant"),
        F.col("contact_key_id").cast(StringType()).alias("IdSubscriber"),
        F.col("performance_classification3").cast(StringType()).alias("DsPerformanceClass"),
        F.col("contract_mode").cast(StringType()).alias("DsModoContrato"),
        F.col("status_saturacao").cast(StringType()).alias("StSaturacao"),
        str_to_int_expr("top_restaurants").alias("FlTopRestaurantes"),
        F.col("dish_type").cast(StringType()).alias("DsTipoComida"),
        F.col("state").cast(StringType()).alias("DsEstado"),
        F.col("city").cast(StringType()).alias("DsCidade"),
        F.col("merchant_city_maturity").cast(StringType()).alias("DsMaturidadeMercadoCidade"),
        F.current_timestamp().alias("TsInclusao")
    )
    .dropDuplicates(["IdMerchant"])
)

# aplica a limpeza de nulos textuais
df_dim_merchant = normalize_nulls(df_dim_merchant)

# Faz o merge: cria a tabela se não existir, ou atualiza se existir

# ------------------------------------------------------------------------------------------------------------
# Depois que a tabela for criada é só comentar esse bloco
# Cria como Delta na camada Silver
df_dim_merchant.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbDimMerchant")
# ------------------------------------------------------------------------------------------------------------

# Referência à tabela Silver
delta_table = DeltaTable.forName(spark, "workspace.ifood_silver.TbDimMerchant")

# Merge dos dados novos
(delta_table.alias("t")
 .merge(
     df_dim_merchant.alias("s"),
     "t.IdMerchant = s.IdMerchant"
 )
 .whenMatchedUpdate(set={
    "IdSubscriber": "s.IdSubscriber",
    "DsPerformanceClass": "s.DsPerformanceClass",
    "DsModoContrato": "s.DsModoContrato",
    "StSaturacao": "s.StSaturacao",
    "FlTopRestaurantes": "s.FlTopRestaurantes",
    "DsTipoComida": "s.DsTipoComida",
    "DsEstado": "s.DsEstado",
    "DsCidade": "s.DsCidade",
    "DsMaturidadeMercadoCidade": "s.DsMaturidadeMercadoCidade",
    "TsInclusao": "s.TsInclusao"
 })
 .whenNotMatchedInsert(values={
    "IdMerchant": "s.IdMerchant",
    "IdSubscriber": "s.IdSubscriber",
    "DsPerformanceClass": "s.DsPerformanceClass",
    "DsModoContrato": "s.DsModoContrato",
    "StSaturacao": "s.StSaturacao",
    "FlTopRestaurantes": "s.FlTopRestaurantes",
    "DsTipoComida": "s.DsTipoComida",
    "DsEstado": "s.DsEstado",
    "DsCidade": "s.DsCidade",
    "DsMaturidadeMercadoCidade": "s.DsMaturidadeMercadoCidade",
    "TsInclusao": "s.TsInclusao"
 })
 .execute()
)



# %sql
# select * from workspace.ifood_silver.TbDimMerchant


# spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbDimMerchant")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Criação da TbDimMerchantClusterHist

df_cluster_bronze = spark.read.table("workspace.ifood_bronze.cluster_merchant_history")

df_cluster_hist = (
    df_cluster_bronze
    .select(
        F.col("merchant_id").cast(StringType()).alias("IdMerchant"),
        # tenta converter coluna de data para DateType; ajustar o formato se necessário
        F.to_date(F.col("reference_date")).alias("DtReferencia"),
        F.col("macro_cluster_crm").cast(StringType()).alias("DsCluster"),
        F.current_timestamp().alias("TsInclusao")
    )
    .dropDuplicates(["IdMerchant","DtReferencia"])
)

# aplica a limpeza de nulos textuais
df_cluster_hist = normalize_nulls(df_cluster_hist)



# Faz o merge: cria a tabela se não existir, ou atualiza se existir

# ------------------------------------------------------------------------------------------------------------
# Depois que a tabela for criada é só comentar esse bloco
# Cria como Delta na camada Silver
df_cluster_hist.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbDimMerchantClusterHist")
# ------------------------------------------------------------------------------------------------------------

# Referência à tabela Silver
delta_table = DeltaTable.forName(spark, "workspace.ifood_silver.TbDimMerchantClusterHist")

# Merge dos dados novos
(delta_table.alias("t")
 .merge(
     df_cluster_hist.alias("s"),
     "t.IdMerchant = s.IdMerchant and t.DtReferencia = s.DtReferencia"
 )
 .whenMatchedUpdate(set={
    "DsCluster": "s.DsCluster",
    "TsInclusao": "s.TsInclusao"
 })
 .whenNotMatchedInsert(values={
    "IdMerchant": "s.IdMerchant",
    "DtReferencia": "s.DtReferencia",
    "DsCluster": "s.DsCluster",
    "TsInclusao": "s.TsInclusao"
 })
 .execute()
)





# %sql
# select * from workspace.ifood_silver.TbDimMerchantClusterHist

# spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbDimMerchantClusterHist")


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Criação da TbDimMerchantCluster

# Lê a tabela histórica
df_hist = spark.read.table("workspace.ifood_silver.tbdimmerchantclusterhist")

# Cria janela para pegar a data mais recente por IdMerchant
w = Window.partitionBy("IdMerchant").orderBy(F.col("DtReferencia").desc())

# Marca a linha mais recente
df_latest = (
    df_hist
    .withColumn("rank", F.row_number().over(w))
    .filter(F.col("rank") == 1)
    .select("IdMerchant", "DsCluster", "DtReferencia", "TsInclusao")
)

# Grava na tabela final
df_latest.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbDimMerchantCluster")


# %sql
# select * from workspace.ifood_silver.TbDimMerchantCluster

# spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbDimMerchantCluster")


In [0]:
# Criação da TbDimCampanha

df_comm_bronze = spark.read.table("workspace.ifood_bronze.communications_base")

df_campanha_distinct = (
    df_comm_bronze
    .select(
        F.col("nome_campanha").cast(StringType()).alias("DsNomeCampanha"),
        F.col("categoria").cast(StringType()).alias("DsCategoria"),
        # Tenta converter a data — se não funcionar, fica nula
        F.try_to_timestamp(F.col("data_primeira_acao")).alias("DtInclusao"),

        F.col("flag_regua").cast(StringType()).alias("DsRegua")
    )
    .dropDuplicates(["DsNomeCampanha", "DsCategoria"])
)
F.try_to_timestamp(F.col("data_primeira_acao")).alias("DtInclusao")

# Cria janela de ordenação determinística
w = Window.partitionBy().orderBy(
    F.coalesce(F.col("DtInclusao"), F.lit("1900-01-01")).asc(),
    F.col("DsNomeCampanha").asc()
)

# Gera IdCampanha numérico e sequencial
df_campanha = (
    df_campanha_distinct
    .withColumn("IdCampanha", F.row_number().over(w).cast(IntegerType()))
    .withColumn("TsInclusao", F.current_timestamp())
    .select("IdCampanha", "DsNomeCampanha", "DsCategoria", "DsRegua","TsInclusao")
    .dropDuplicates(["IdCampanha"])
)

# aplica a limpeza de nulos textuais
df_campanha = normalize_nulls(df_campanha)

# Grava na camada Silver

# Faz o merge: cria a tabela se não existir, ou atualiza se existir

# ------------------------------------------------------------------------------------------------------------
# Depois que a tabela for criada é só comentar esse bloco
# Cria como Delta na camada Silver
df_campanha.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbDimCampanha")
# ------------------------------------------------------------------------------------------------------------


from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "workspace.ifood_silver.TbDimCampanha")

(delta_table.alias("t")
 .merge(
     df_campanha.alias("s"),
     "t.IdCampanha = s.IdCampanha"
 )
 .whenMatchedUpdate(set={
     "DsNomeCampanha": "s.DsNomeCampanha",
     "DsCategoria": "s.DsCategoria",
     "DsRegua": "s.DsRegua",
     "TsInclusao": "s.TsInclusao"
 })
 .whenNotMatchedInsert(values={
     "IdCampanha": "s.IdCampanha",
     "DsNomeCampanha": "s.DsNomeCampanha",
     "DsCategoria": "s.DsCategoria",
     "DsRegua": "s.DsRegua",
     "TsInclusao": "s.TsInclusao"
 })
 .execute()
)

# %sql 
# select * from workspace.ifood_silver.TbDimCampanha
# spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbDimCampanha")




DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]



In [0]:
# Lê comunicações brutas
df_comm = spark.read.table("workspace.ifood_bronze.communications_base")

# %sql
# select * from workspace.ifood_bronze.communications_base
# where SubscriberKey = '1Zx8011'
# WHERE VALUE = 2
# 

# Mapeia etapas para flags e normaliza datas/canal
df_events = (
    df_comm
    .withColumn("IdSubscriber", F.col("SubscriberKey").cast(StringType()))
    .withColumn("IdDisparo", F.col("BatchID").cast(StringType()))
    .withColumn("DtDisparo", F.try_to_timestamp(F.col("data_primeira_acao")))  # ajuste formato se necessário
    .withColumn("DsNomeCampanha", F.col("nome_campanha").cast(StringType()))
    .withColumn("DsCanal", F.col("tipo_disparo").cast(StringType()))
    .withColumn("etapa", F.col("etapa").cast(StringType()))
    .select("IdSubscriber", "IdDisparo", "DtDisparo", "DsNomeCampanha", "DsCanal", "etapa")
)

# Agregação por subscriber + data + campanha + canal
agg = (
    df_events.groupBy("IdSubscriber", "IdDisparo", "DsNomeCampanha", "DsCanal")
    .agg(
        F.min("DtDisparo").alias("DtDisparo"),

        F.max(F.when(F.col("etapa") == "REQUESTED_SENT", 1).otherwise(0)).alias("FlSolicitacaodisparo"),
        F.max(F.when(F.col("etapa") == "REQUESTED_FAILED", 1).otherwise(0)).alias("FlFalhaSolicitacaodisparo"),

        F.max(F.when(F.col("etapa") == "SENT_SUCCESS", 1).otherwise(0)).alias("FlDisparo"),

        F.max(F.when(F.col("etapa") == "DELIVERED_SUCCESS", 1).otherwise(0)).alias("FlEntrega"),
        F.max(F.when(F.col("etapa") == "FAILED", 1).otherwise(0)).alias("FlFalhaEntrega"),

        F.max(F.when(F.col("etapa").isin("OPEN_SUCCESS","READ_SUCCESS"), 1).otherwise(0)).alias("FlAbertura"),

        F.max(F.when(F.col("etapa") == "CLICK_SUCCESS", 1).otherwise(0)).alias("FlClique")
    )
)

# Junta com dim campanha para obter IdCampanha
df_campanha = spark.table("workspace.ifood_silver.TbDimCampanha")

fato_disparo = (
    agg.join(df_campanha, agg.DsNomeCampanha == df_campanha.DsNomeCampanha, how="left")
    .select(
        F.col("IdSubscriber").alias("IdSubscriber"),
        F.col("IdDisparo").alias("IdDisparo"),
        F.col("DtDisparo").alias("DthrDisparo"),
        F.to_date(col("DtDisparo")).alias("DtDisparo"),
        F.col("IdCampanha").cast(IntegerType()).alias("IdCampanha"),
        F.col("DsCanal").cast(StringType()).alias("DsCanal"),
        F.col("FlSolicitacaodisparo").cast(IntegerType()).alias("FlSolicitacaodisparo"),
        F.col("FlFalhaSolicitacaodisparo").cast(IntegerType()).alias("FlFalhaSolicitacaodisparo"),
        F.col("FlDisparo").cast(IntegerType()).alias("FlDisparo"),
        F.col("FlEntrega").cast(IntegerType()).alias("FlEntrega"),
        F.col("FlFalhaEntrega").cast(IntegerType()).alias("FlFalhaEntrega"),
        F.col("FlAbertura").cast(IntegerType()).alias("FlAbertura"),
        F.col("FlClique").cast(IntegerType()).alias("FlClique"),
        F.current_timestamp().alias("TsInclusao")
    )
    .dropDuplicates(["IdSubscriber","IdDisparo"])
)

# Grava na camada Silver

# Faz o merge: cria a tabela se não existir, ou atualiza se existir

# ------------------------------------------------------------------------------------------------------------
# Depois que a tabela for criada é só comentar esse bloco
# Cria como Delta na camada Silver
fato_disparo.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbFatoDisparo")
# ------------------------------------------------------------------------------------------------------------

delta_table = DeltaTable.forName(spark, "workspace.ifood_silver.TbFatoDisparo")

(delta_table.alias("t")
 .merge(
     fato_disparo.alias("s"),
     "t.IdSubscriber = s.IdSubscriber and t.IdDisparo = s.IdDisparo"
 )
 .whenMatchedUpdate(set={
     "DtDisparo": "s.DtDisparo",
     "DtDisparo": "s.DtDisparo",
     "IdCampanha": "s.IdCampanha",
     "DsCanal": "s.DsCanal",
     "FlSolicitacaodisparo": "s.FlSolicitacaodisparo",
     "FlFalhaSolicitacaodisparo": "s.FlFalhaSolicitacaodisparo",
     "FlDisparo": "s.FlDisparo",
     "FlEntrega": "s.FlEntrega",
     "FlFalhaEntrega": "s.FlFalhaEntrega",
     "FlAbertura": "s.FlAbertura",
     "FlClique": "s.FlClique",
     "TsInclusao": "s.TsInclusao"
 })
 .whenNotMatchedInsert(values={
     "IdSubscriber": "s.IdSubscriber",
     "IdDisparo": "s.IdDisparo",
     "DtDisparo": "s.DtDisparo",
     "DthrDisparo": "s.DthrDisparo",
     "IdCampanha": "s.IdCampanha",
     "DsCanal": "s.DsCanal",
     "FlSolicitacaodisparo": "s.FlSolicitacaodisparo",
     "FlFalhaSolicitacaodisparo": "s.FlFalhaSolicitacaodisparo",
     "FlDisparo": "s.FlDisparo",
     "FlEntrega": "s.FlEntrega",
     "FlFalhaEntrega": "s.FlFalhaEntrega",
     "FlAbertura": "s.FlAbertura",
     "FlClique": "s.FlClique",
     "TsInclusao": "s.TsInclusao"
 })
 .execute()
)


# %sql
# select * from workspace.ifood_silver.TbFatoDisparo
# where flentrega = flfalhaentrega and flfalhaentrega = 1
# and dscanal = 'WHATSAPP'

# spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbFatoDisparo")


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Criação da TbFatoConversao

df_conv_bronze = spark.read.table("workspace.ifood_bronze.conversion_base")

df_conv = (
    df_conv_bronze
    .select(
        F.to_date(F.col("dt")).alias("DtConversao"),
        F.col("SubscriberKey").cast(StringType()).alias("IdSubscriber"),
        F.col("categoria").cast(StringType()).alias("DsCategoria"),
        F.current_timestamp().alias("TsInclusao")
    )
    .dropDuplicates(["DtConversao","IdSubscriber","DsCategoria"])
)

# aplica a limpeza de nulos textuais
df_conv = normalize_nulls(df_conv)

df_conv.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbFatoConversao")



# Grava na camada Silver

# Faz o merge: cria a tabela se não existir, ou atualiza se existir

# ------------------------------------------------------------------------------------------------------------
# Depois que a tabela for criada é só comentar esse bloco
# Cria como Delta na camada Silver
df_conv.write.format("delta").mode("overwrite").saveAsTable("workspace.ifood_silver.TbFatoConversao")
# ------------------------------------------------------------------------------------------------------------

delta_table = DeltaTable.forName(spark, "workspace.ifood_silver.TbFatoConversao")

(delta_table.alias("t")
 .merge(
     df_conv.alias("s"),
     "t.IdSubscriber = s.IdSubscriber and t.DtConversao = s.DtConversao and t.DsCategoria = s.DsCategoria"
 )
 .whenMatchedUpdate(set={
     "TsInclusao": "s.TsInclusao"
 })
 .whenNotMatchedInsert(values={
     "DtConversao": "s.DtConversao",
     "IdSubscriber": "s.IdSubscriber",
     "DsCategoria": "s.DsCategoria",
     "TsInclusao": "s.TsInclusao"
 })
 .execute()
)



# %sql 
# select * from workspace.ifood_silver.TbFatoConversao

# spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbFatoConversao")


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

# Camada Gold

Criação das Tabelas Consolidadas

In [0]:
%sql
-- # Criação da TbVisãoCampanhaMensal
-- %sql

CREATE OR REPLACE TABLE workspace.ifood_gold.TbVisaoCampanhaMensal AS 

select distinct

    year(to_date(dtdisparo)) as ano,
    month(to_date(dtdisparo)) as mes, 
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    DsCanal,
    count(distinct case when FlSolicitacaoDisparo = 1 then a.idsubscriber end) as QtSolicitacaoDisparo,
    count(distinct case when FlFalhaSolicitacaoDisparo = 1 then a.idsubscriber end) as QtFalhaSolicitacaoDisparo,
    count(distinct case when FlDisparo = 1 then a.idsubscriber end) as QtDisparo,
    count(distinct case when FlEntrega = 1 then a.idsubscriber end) as QtEntrega,
    count(distinct case when FlFalhaEntrega = 1 then a.idsubscriber end) as QtFalhaEntrega,
    count(distinct case when FlAbertura = 1 then a.idsubscriber end) as QtAbertura,
    count(distinct case when FlClique = 1 then a.idsubscriber end) as QtClique,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=7 then c.idsubscriber end) as QtConversao7Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=15 then c.idsubscriber end) as QtConversao15Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=30 then c.idsubscriber end) as QtConversao30Dias

from workspace.ifood_silver.TbFatoDisparo as a
left join workspace.ifood_silver.TbDimCampanha as b
    on a.IdCampanha = b.IdCampanha
left join workspace.ifood_silver.TbFatoConversao as c
    on a.IdSubscriber = c.IdSubscriber and b.DsCategoria = c.DsCategoria and datediff(c.DtConversao, a.DtDisparo) between 0 and 30
group by 
    year(to_date(dtdisparo)),
    month(to_date(dtdisparo)),
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    DsCanal

-- %sql select * from workspace.ifood_gold.TbVisaoCampanhaMensal
-- spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbVisaoCampanhaMensal")

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- # Criação da TbVisãoCampanhaDiaria
-- %sql

CREATE OR REPLACE TABLE workspace.ifood_gold.TbVisaoCampanhaDiaria AS 

select distinct

    dtdisparo as DtDisparo, 
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    DsCanal,
    count(distinct case when FlSolicitacaoDisparo = 1 then a.idsubscriber end) as QtSolicitacaoDisparo,
    count(distinct case when FlFalhaSolicitacaoDisparo = 1 then a.idsubscriber end) as QtFalhaSolicitacaoDisparo,
    count(distinct case when FlDisparo = 1 then a.idsubscriber end) as QtDisparo,
    count(distinct case when FlEntrega = 1 then a.idsubscriber end) as QtEntrega,
    count(distinct case when FlFalhaEntrega = 1 then a.idsubscriber end) as QtFalhaEntrega,
    count(distinct case when FlAbertura = 1 then a.idsubscriber end) as QtAbertura,
    count(distinct case when FlClique = 1 then a.idsubscriber end) as QtClique,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=7 then c.idsubscriber end) as QtConversao7Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=15 then c.idsubscriber end) as QtConversao15Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=30 then c.idsubscriber end) as QtConversao30Dias

from workspace.ifood_silver.TbFatoDisparo as a
left join workspace.ifood_silver.TbDimCampanha as b
    on a.IdCampanha = b.IdCampanha
left join workspace.ifood_silver.TbFatoConversao as c
    on a.IdSubscriber = c.IdSubscriber and b.DsCategoria = c.DsCategoria and datediff(c.DtConversao, a.DtDisparo) between 0 and 30
group by 
    dtdisparo,
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    DsCanal

-- %sql select * from workspace.ifood_gold.TbVisaoCampanhaDiaria
-- spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbVisaoCampanhaDiaria")

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- # Criação da TbVisaoMerchantMensal
-- %sql

CREATE OR REPLACE TABLE workspace.ifood_gold.TbVisaoMerchantMensal AS 

select distinct

    year(to_date(a.dtdisparo)) as ano,
    month(to_date(a.dtdisparo)) as mes, 
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    a.DsCanal,
    d.DsCidade,
    d.DsEstado,
    d.DsMaturidadeMercadoCidade,
    d.DsModoContrato,
    d.DsPerformanceClass,
    d.StSaturacao,
    d.FlTopRestaurantes,
    d.DsTipoComida,
    e.DsCluster,
    count(distinct case when FlSolicitacaoDisparo = 1 then a.idsubscriber end) as QtSolicitacaoDisparo,
    count(distinct case when FlFalhaSolicitacaoDisparo = 1 then a.idsubscriber end) as QtFalhaSolicitacaoDisparo,
    count(distinct case when FlDisparo = 1 then a.idsubscriber end) as QtDisparo,
    count(distinct case when FlEntrega = 1 then a.idsubscriber end) as QtEntrega,
    count(distinct case when FlFalhaEntrega = 1 then a.idsubscriber end) as QtFalhaEntrega,
    count(distinct case when FlAbertura = 1 then a.idsubscriber end) as QtAbertura,
    count(distinct case when FlClique = 1 then a.idsubscriber end) as QtClique,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=7 then c.idsubscriber end) as QtConversao7Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=15 then c.idsubscriber end) as QtConversao15Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=30 then c.idsubscriber end) as QtConversao30Dias

from workspace.ifood_silver.TbFatoDisparo as a
left join workspace.ifood_silver.TbDimCampanha as b
    on a.IdCampanha = b.IdCampanha
left join workspace.ifood_silver.TbFatoConversao as c
    on a.IdSubscriber = c.IdSubscriber and b.DsCategoria = c.DsCategoria and datediff(c.DtConversao, a.DtDisparo) between 0 and 30
left join workspace.ifood_silver.TbDimMerchant as d
    on a.IdSubscriber = d.IdSubscriber
left join workspace.ifood_silver.TbDimMerchantCluster as e
    on d.IdMerchant = e.IdMerchant

group by 
    year(to_date(dtdisparo)),
    month(to_date(dtdisparo)),
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    a.DsCanal,
    d.DsCidade,
    d.DsEstado,
    d.DsMaturidadeMercadoCidade,
    d.DsModoContrato,
    d.DsPerformanceClass,
    d.StSaturacao,
    d.FlTopRestaurantes,
    d.DsTipoComida,
    e.DsCluster

-- %sql select count(*) from workspace.ifood_gold.TbVisaoMerchantMensal
-- %sql select count(*) from workspace.ifood_silver.TbFatoDisparo
-- spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbVisaoMerchantMensal")

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- # Criação da TbVisaoMerchantDiaria
-- %sql

CREATE OR REPLACE TABLE workspace.ifood_gold.TbVisaoMerchantDiaria AS 

select distinct

    a.DtDisparo,
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    a.DsCanal,
    d.DsCidade,
    d.DsEstado,
    d.DsMaturidadeMercadoCidade,
    d.DsModoContrato,
    d.DsPerformanceClass,
    d.StSaturacao,
    d.FlTopRestaurantes,
    d.DsTipoComida,
    e.DsCluster,
    count(distinct case when FlSolicitacaoDisparo = 1 then a.idsubscriber end) as QtSolicitacaoDisparo,
    count(distinct case when FlFalhaSolicitacaoDisparo = 1 then a.idsubscriber end) as QtFalhaSolicitacaoDisparo,
    count(distinct case when FlDisparo = 1 then a.idsubscriber end) as QtDisparo,
    count(distinct case when FlEntrega = 1 then a.idsubscriber end) as QtEntrega,
    count(distinct case when FlFalhaEntrega = 1 then a.idsubscriber end) as QtFalhaEntrega,
    count(distinct case when FlAbertura = 1 then a.idsubscriber end) as QtAbertura,
    count(distinct case when FlClique = 1 then a.idsubscriber end) as QtClique,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=7 then c.idsubscriber end) as QtConversao7Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=15 then c.idsubscriber end) as QtConversao15Dias,
    count(distinct case when datediff(c.DtConversao, a.DtDisparo)<=30 then c.idsubscriber end) as QtConversao30Dias

from workspace.ifood_silver.TbFatoDisparo as a
left join workspace.ifood_silver.TbDimCampanha as b
    on a.IdCampanha = b.IdCampanha
left join workspace.ifood_silver.TbFatoConversao as c
    on a.IdSubscriber = c.IdSubscriber and b.DsCategoria = c.DsCategoria and datediff(c.DtConversao, a.DtDisparo) between 0 and 30
left join workspace.ifood_silver.TbDimMerchant as d
    on a.IdSubscriber = d.IdSubscriber
left join workspace.ifood_silver.TbDimMerchantCluster as e
    on d.IdMerchant = e.IdMerchant

group by 
    a.DtDisparo,
    a.IdCampanha, 
    b.DsNomeCampanha,
    b.DsCategoria,
    b.DsRegua,
    a.DsCanal,
    d.DsCidade,
    d.DsEstado,
    d.DsMaturidadeMercadoCidade,
    d.DsModoContrato,
    d.DsPerformanceClass,
    d.StSaturacao,
    d.FlTopRestaurantes,
    d.DsTipoComida,
    e.DsCluster

-- %sql select count(*) from workspace.ifood_gold.TbVisaoMerchantDiaria
-- %sql select count(*) from workspace.ifood_silver.TbFatoDisparo
-- spark.sql("DROP TABLE IF EXISTS workspace.ifood_silver.TbVisaoMerchantDiaria")

num_affected_rows,num_inserted_rows
