## Criação das metas dinâmicas - band.com - Data Lake Vibra
### Etapa: Refined
#### Responsável: Thaís Gonçalves

| Responsável | Data Alteração | Alteração|
|---|---|---|
| Thaís Gonçalves | 04/09/2024 | Implementação |

#### Criação das metas dinâmicas para sessões, usuários visitantes, usuários cadastrados, impressões e receita programática por dia
#### band.com

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Filtra colunas da tabela do GA para otimizar o código
df_events = (
    spark.table("gold.google_analytics4_site_events")
    .filter(F.col("event_date") >= "2024-01-01")
    .select("event_date", "client_id", "ga_session_id", "user_id", "event_name")
)

# Agrupa dados da tabela do GA
df_resultados_diarios = (
    df_events.groupBy("event_date")
    .agg(
        F.countDistinct(F.concat(F.col("client_id"), F.col("ga_session_id"))).alias("sessions"),
        F.countDistinct(F.col("client_id")).alias("users_visitors"),
        (F.sum(F.when(F.col("event_name") == "page_view", 1).otherwise(0)) / 
         F.countDistinct(F.concat(F.col("client_id"), F.col("ga_session_id")))).alias("pageview_per_session")
    )
    .withColumnRenamed("event_date", "date")
)

# Carrega e filtra dados da tabela de metas
df_goals = (
    spark.table("refined.website_daily_goals")
    .filter(F.col("date") >= "2024-01-01")
    .select("date", "sessions", "users", "registered_users", "ad_impressions_prog", "ad_impressions_venda_dir", "revenue_prog")
    .groupBy("date")
    .agg(
        F.sum("sessions").alias("meta_sessions"),
        F.sum("users").alias("meta_users_visitors"),
        F.sum("registered_users").alias("meta_users_cadastrados"),
        (F.sum("ad_impressions_prog") + F.sum("ad_impressions_venda_dir")).alias("meta_impressions"),
        F.sum("revenue_prog").alias("meta_revenue_prog")
    )
)

# Carrega e filtra dados da tabela do admanager do UOL
df_admanager = (
    spark.table("gold.admanager_relatorio_fromexcel")
    .filter(F.col("date") >= "2024-01-01")
    .groupBy("date")
    .agg(
        F.sum("total_impressions").alias("impressions"),
        F.sum("total_CPM_CPC_CPD_and_vCPM_revenue").alias("receita_prog")
    )
)

# Realizar o left join entre as tabelas
df_balanco_diario = (
    df_goals
    .join(df_resultados_diarios, on="date", how="left") 
    .join(df_admanager, on="date", how="left") 
)

# Calcula as colunas de resíduos
df_balanco_diario = df_balanco_diario.withColumns({
    "residuo_sessions": F.when(F.col("sessions") > 0, F.col("sessions") - F.col("meta_sessions")).otherwise(0),
    "residuo_users_visitors": F.when(F.col("users_visitors") > 0, F.col("users_visitors") - F.col("meta_users_visitors")).otherwise(0),
    "residuo_impressions": F.when(F.col("impressions") > 0, F.col("impressions") - F.col("meta_impressions")).otherwise(0),
    "residuo_revenue_prog": F.when(F.col("receita_prog") > 0, F.col("receita_prog") - F.col("meta_revenue_prog")).otherwise(0)
})

# Adiciona uma coluna de contagem de dias do ano
df_balanco_diario = df_balanco_diario.withColumn(
    "dias_restantes_no_ano",
    F.when(F.year(F.col("date")) % 4 == 0, 366 - F.dayofyear(F.col("date")) + 1)  
    .otherwise(365 - F.dayofyear(F.col("date")) + 1)  
)

# Substituir valores nulos por 0
df_balanco_diario = df_balanco_diario.fillna({"sessions": 0, "users_visitors": 0, "impressions": 0, "receita_prog": 0, "pageview_per_session": 0})

In [0]:
from pyspark.sql import functions as F

# Filtra a primeira data em que 'sessions' é 0 
df_max_date = (
    df_balanco_diario
    .filter(F.col("sessions") == 0)  
    .orderBy(F.col("date").asc())  
    .limit(1)  
)

# Extrai o valor de 'dias_restantes_no_ano' e 'date'
primeira_data_row = df_max_date.select("dias_restantes_no_ano", "date").first()

# Verifica se a data foi encontrada
if primeira_data_row:
    dias_restantes_ano = primeira_data_row["dias_restantes_no_ano"]
    max_date = primeira_data_row["date"]
    
# Calcula a soma de cada resíduo e divide pelos dias restantes do ano
df_soma_residuos = (
    df_balanco_diario
    .agg(
        (F.sum("residuo_sessions") / dias_restantes_ano).alias("residuo_sessions_por_dia"),
        (F.sum("residuo_users_visitors") / dias_restantes_ano).alias("residuo_users_visitors_por_dia"),
        (F.sum("residuo_impressions") / dias_restantes_ano).alias("residuo_impressions_por_dia"),
        (F.sum("residuo_revenue_prog") / dias_restantes_ano).alias("residuo_revenue_prog_por_dia")
    )
)
    
# Guarda a soma de cada resíduo
soma_residuos_row = df_soma_residuos.first()
residuo_sessions_por_dia = soma_residuos_row["residuo_sessions_por_dia"]
residuo_users_visitors_por_dia = soma_residuos_row["residuo_users_visitors_por_dia"]
residuo_impressions_por_dia = soma_residuos_row["residuo_impressions_por_dia"]
residuo_revenue_prog_por_dia = soma_residuos_row["residuo_revenue_prog_por_dia"]

# Ajustaras metas quando o valor realizado for menor ou igual a 0
df_metas_ajustadas = (
    df_balanco_diario
    .withColumn(
        "ajuste_meta_sessions", 
        F.when(F.col("sessions") <= 0, F.col("meta_sessions") + residuo_sessions_por_dia)
        .otherwise(F.col("meta_sessions"))
    )
    .withColumn(
        "ajuste_meta_users_visitors", 
        F.when(F.col("users_visitors") <= 0, F.col("meta_users_visitors") + residuo_users_visitors_por_dia)
        .otherwise(F.col("meta_users_visitors"))
    )
    .withColumn(
        "ajuste_meta_impressions", 
        F.when(F.col("impressions") <= 0, F.col("meta_impressions") + residuo_impressions_por_dia)
        .otherwise(F.col("meta_impressions"))
    )
    .withColumn(
        "ajuste_meta_revenue_prog", 
        F.when(F.col("receita_prog") <= 0, F.col("meta_revenue_prog") + residuo_revenue_prog_por_dia)
        .otherwise(F.col("meta_revenue_prog"))
    )
)

# Prints
print(f"Dias restantes na primeira data com sessions = 0: {dias_restantes_ano}")
print(f"Primeira data sem dados = 0: {max_date}")
print(f"Residuo de sessões por dia: {residuo_sessions_por_dia}")
print(f"Residuo de users por dia: {residuo_users_visitors_por_dia}")
print(f"Residuo de impressões por dia: {residuo_impressions_por_dia}")
print(f"Residuo de receita por dia: {residuo_revenue_prog_por_dia}")

Dias restantes na primeira data com sessions = 0: 111
Primeira data sem dados = 0: 2024-09-12
Residuo de sessões por dia: -375607.6126126126
Residuo de users por dia: 68487.16216216216
Residuo de impressões por dia: -3173468.8198198196
Residuo de receita por dia: 4955624.408865022


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Realiza o cálculo ajustado utilizando as variáveis de resíduos diários, aplicando o resíduo com o sinal invertido (multiplicado por -1) nas metas futuras
df_metas_ajustadas = (
    df_balanco_diario
    .withColumn(
        "ajuste_meta_sessions", 
        F.when(F.col("sessions") == 0, F.round(F.col("meta_sessions") + F.lit(residuo_sessions_por_dia) * -1).cast("int"))
        .otherwise(F.round(F.col("meta_sessions")).cast("int"))
    )
    .withColumn(
        "ajuste_meta_users_visitors", 
        F.when(F.col("sessions") == 0, F.round(F.col("meta_users_visitors") + F.lit(residuo_users_visitors_por_dia) * -1).cast("int"))
        .otherwise(F.round(F.col("meta_users_visitors")).cast("int"))
    )
    .withColumn(
        "ajuste_meta_impressions", 
        F.when(F.col("sessions") == 0, F.round(F.col("meta_impressions") + F.lit(residuo_impressions_por_dia) * -1).cast("int"))
        .otherwise(F.round(F.col("meta_impressions")).cast("int"))
    )
    .withColumn(
        "ajuste_meta_revenue_prog", 
        F.when(F.col("sessions") == 0, F.round(F.col("meta_revenue_prog") + F.lit(residuo_revenue_prog_por_dia) * -1).cast("int"))
        .otherwise(F.round(F.col("meta_revenue_prog")).cast("int"))
    )
)

# Monta tabela final
df_metas_dinamicas = df_metas_ajustadas.select(
    "date", 
    "sessions", 
    "users_visitors", 
    "impressions", 
    "receita_prog", 
    "meta_sessions", 
    "meta_users_visitors", 
    "meta_impressions", 
    "meta_revenue_prog", 
    "residuo_sessions", 
    "residuo_users_visitors", 
    "residuo_impressions", 
    "residuo_revenue_prog",
    "ajuste_meta_sessions", 
    "ajuste_meta_users_visitors", 
    "ajuste_meta_impressions", 
    "ajuste_meta_revenue_prog"
)

# Drop da tabela se já existir
spark.sql("DROP TABLE IF EXISTS refined.vw_site_daily_goals_dinamic")

# Escrever o resultado final na tabela Delta
df_metas_dinamicas.write.format("delta") \
  .mode("overwrite") \
  .saveAsTable("refined.vw_site_daily_goals_dinamic")

#### Criação das metas dinâmicas para usuários visitantes agrupados por mês
#### band.com

In [0]:
from pyspark.sql import functions as F

# Filtra e agrupa dados de users da tabela do GA por mês e ano
df_resultados_users_mensais_ano = (
    spark.table("gold.google_analytics4_site_events")
    .filter(F.col("event_date") >= "2024-01-01") 
    .groupBy(F.year("event_date").alias("year"), F.month("event_date").alias("month"))
    .agg(
        F.countDistinct(F.col("client_id")).alias("users_visitors")  
    )
)

# Carrega e filtra dados da tabela de metas por mês e ano
df_goals_mensal_ano = (
    spark.table("refined.website_daily_goals")
    .filter(F.col("date") >= "2024-01-01") 
    .groupBy(F.year("date").alias("year"), F.month("date").alias("month"))  
    .agg(
        F.sum("users").alias("meta_users_visitors")  
    )
)

# Realiza join entre as tabelas agrupadas por mês e ano
df_balanco_diario_mes_ano = (
    df_goals_mensal_ano
    .join(df_resultados_users_mensais_ano, on=["year", "month"], how="left")  
)

# Calcula as colunas de resíduos (diferença entre usuários realizados e a meta)
df_balanco_diario_mes_ano = df_balanco_diario_mes_ano.withColumn(
    "residuo_users_visitors", 
    F.when(F.col("users_visitors").isNotNull(), F.col("users_visitors") - F.col("meta_users_visitors")).otherwise(0)
)

# Adiciona uma coluna de contagem de meses restantes no ano
df_balanco_diario_mes_ano = df_balanco_diario_mes_ano.withColumn(
    "meses_restantes_no_ano",
    12 - F.col("month") 
)

# Substitui valores nulos por 0
df_balanco_diario_mes_ano = df_balanco_diario_mes_ano.fillna({"users_visitors": 0, "residuo_users_visitors": 0, "meta_users_visitors": 0})


In [0]:
df_balanco_diario_mes_ano.orderBy("year", "month").show()

+----+-----+-------------------+--------------+----------------------+----------------------+
|year|month|meta_users_visitors|users_visitors|residuo_users_visitors|meses_restantes_no_ano|
+----+-----+-------------------+--------------+----------------------+----------------------+
|2024|    1|           10505623|      10182636|               -322987|                    11|
|2024|    2|           10279348|      10328020|                 48672|                    10|
|2024|    3|           10720182|      10755480|                 35298|                     9|
|2024|    4|            9267096|       9171132|                -95964|                     8|
|2024|    5|           11763423|      11541727|               -221696|                     7|
|2024|    6|            9396019|       9201194|               -194825|                     6|
|2024|    7|           13989463|       9123911|              -4865552|                     5|
|2024|    8|           15580414|       9445366|             

In [0]:
from pyspark.sql import functions as F

# Filtra a primeira data em que 'users_visitors' é 0 
df_max_mes_ano_ = (
    df_balanco_diario_mes_ano
    .filter(F.col("users_visitors") == 0)  
    .orderBy(F.col("year").asc(), F.col("month").asc())  # Ordenar por ano e mês
    .limit(1)  
)

# Extrai o valor de 'year', 'month', e 'meses_restantes_no_ano'
primeira_mes_ano_row = df_max_mes_ano_.select("year", "month", "meses_restantes_no_ano").first()

# Verifica se a data foi encontrada
if primeira_mes_ano_row:
    max_year = primeira_mes_ano_row["year"]
    max_month = primeira_mes_ano_row["month"]
    meses_restantes_no_ano = primeira_mes_ano_row["meses_restantes_no_ano"]
else:
    max_year = None
    max_month = None
    meses_restantes_no_ano = 0  # Ajustado para 0 caso não tenha meses restantes

# Calcula a soma de cada resíduo e divide pelos meses restantes no ano
if meses_restantes_no_ano > 0:  # Verifica se ainda restam meses
    df_soma_residuos = (
        df_balanco_diario_mes_ano
        .agg(
            (F.sum("residuo_users_visitors") / meses_restantes_no_ano).alias("residuo_users_visitors_por_mes")
        )
    )
    
    # Guarda a soma de cada resíduo
    soma_residuos_row = df_soma_residuos.first()
    residuo_users_visitors_por_mes_ano = soma_residuos_row["residuo_users_visitors_por_mes"] if soma_residuos_row else 0
else:
    residuo_users_visitors_por_mes_ano = 0

# Ajusta as metas quando o valor realizado for menor ou igual a 0
df_metas_ajustadas = (
    df_balanco_diario_mes_ano  # Consistência com o DataFrame de origem
    .withColumn(
        "ajuste_meta_users_visitors", 
        F.when(F.col("users_visitors") <= 0, F.col("meta_users_visitors") + residuo_users_visitors_por_mes_ano)
        .otherwise(F.col("meta_users_visitors"))
    )
)

# Prints
print(f"Meses restantes na primeira data com users_visitors = 0: {meses_restantes_no_ano}")
print(f"Primeira data sem dados = ano: {max_year}, mês: {max_month}")
print(f"Resíduo de users por mês: {residuo_users_visitors_por_mes_ano}")


Meses restantes na primeira data com users_visitors = 0: 2
Primeira data sem dados = ano: 2024, mês: 10
Resíduo de users por mês: -11510494.0


In [0]:
from pyspark.sql import functions as F

# Filtra a primeira data em que 'sessions' é 0 
df_max_date = (
    df_balanco_diario
    .filter(F.col("sessions") == 0)  
    .orderBy(F.col("date").asc())  
    .limit(1)  
)

# Extrai o valor de 'dias_restantes_no_ano' e 'date'
primeira_data_row = df_max_date.select("dias_restantes_no_ano", "date").first()

# Verifica se a data foi encontrada
if primeira_data_row:
    dias_restantes_do_mes_ano = primeira_data_row["dias_restantes_no_ano"]
    max_date = primeira_data_row["date"]
    
# Calcula a soma de cada resíduo e divide pelos dias restantes do mes
df_soma_residuos = (
    df_balanco_diario
    .agg(
        (F.sum("residuo_users_visitors") / dias_restantes_ano).alias("residuo_users_visitors_por_dia")
    )
)
    
# Guarda a soma de cada resíduo
soma_residuos_row = df_soma_residuos.first()
residuo_users_visitors_por_mes_ano = soma_residuos_row["residuo_users_visitors_por_dia"]


# Ajustaras metas quando o valor realizado for menor ou igual a 0
df_metas_ajustadas = (
    df_balanco_diario
    .withColumn(
        "ajuste_meta_users_visitors", 
        F.when(F.col("users_visitors") <= 0, F.col("meta_users_visitors") + residuo_users_visitors_por_mes_ano)
        .otherwise(F.col("meta_users_visitors"))
    )
)

# Prints
print(f"Dias restantes na primeira data com sessions = 0: {dias_restantes_ano}")
print(f"Primeira data sem dados = 0: {max_date}")
print(f"Residuo de users por mês: {residuo_users_visitors_por_mes_ano}")


Dias restantes na primeira data com sessions = 0: 111
Primeira data sem dados = 0: 2024-09-12
Residuo de users por mês: 68487.16216216216
