## 1. Configuração do Ambiente

Importamos bibliotecas, iniciamos a `SparkSession` e registramos as tables.

In [None]:
# 1. Carregamento Completo e Otimizado com Pandas
# Autora: Sophia Katze de Paula – 2025-06-11

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pandasql import sqldf
import py7zr  

# Configurações gerais
pd.set_option('display.max_columns', None)
sns.set_theme(style='whitegrid')

#Unzip do Dataset
py7zr.SevenZipFile('arquivo.7z', mode='r').extractall(path='destino/')


# Carregar os dados (simulação de caminho para exemplo)
df_items = pd.read_csv('../data/user_transaction_items_retificado2.csv')
df_users = pd.read_csv('../data/users.csv')

# Remover espaços dos nomes das colunas
df_items.columns = df_items.columns.str.replace(' ', '_')
df_users.columns = df_users.columns.str.replace(' ', '_')

# Exibir os novos nomes das colunas
print("Colunas de df_items:")
print(df_items.columns.tolist())

print("\nColunas de df_users:")
print(df_users.columns.tolist())

KeyboardInterrupt: 

In [None]:
# Read data using SQLAlchemy and convert to Spark dataframes
users_pd = pd.read_sql_table('users', engine)
transactions_pd = pd.read_sql_table('user_transactions', engine)

users_df = spark.createDataFrame(users_pd)
transactions_df = spark.createDataFrame(transactions_pd)

# Cache the dataframes
users_df.cache()
transactions_df.cache()

# Create temporary views
users_df.createOrReplaceTempView("users")
transactions_df.createOrReplaceTempView("user_transactions")

files_to_load = [
    (r"data\users.csv", "users"),
    (r"data\user_transaction_items_retificado2.csv", "user_transactions")
]

with ThreadPoolExecutor(max_workers=2) as executor:
    dataframes = dict(zip(
        [f[1] for f in files_to_load],
        executor.map(lambda x: spark.createDataFrame(pd.read_sql_table(x[1], engine)), files_to_load)
    ))

In [None]:
# Função para executar SQL nos DataFrames
def run_sql(path):
    query = open(path).read()
    return spark.sql(query, globals=dataframes)

## 2. Desafio 1.1: Taxa de Usuários por Status

Qual a proporção de usuários ativos, onboarding, desabilitados e deletados?

In [None]:
df_status = run_sql(r"sql/01_user_status.sql")
df_status.toPandas()

**Insight:** A saúde da base é medida por X% ativos e Y% banidos, direcionando estratégias de retenção e reengajamento.

## 3. Desafio 1.2: Padrão de Compras e Sazonalidade

Como variam as compras e receita ao longo do tempo?

In [None]:
df_daily = run_sql("sql/02_daily_sales.sql").toPandas()
sns.lineplot(data=df_daily, x='dt', y='revenue')
plt.title("Receita Diária")
plt.xticks(rotation=45)
plt.show()


**Insight:** Observamos sazonalidade semanal com picos em datas específicas, indicando janelas ideais para campanhas.

## 4. Desafio 1.3: Usuários Pagantes e ARPU

Quantos usuários pagam e quanto em média gastam?

In [None]:
df_arpu = run_sql("sql/03_spend_per_user.sql").toPandas()
df_arpu


## 5. Desafio 1.4: Faturamento Mensal

Qual a performance de receita mês a mês?

In [None]:
df_monthly = run_sql("sql/04_monthly_revenue.sql").toPandas()
sns.barplot(data=df_monthly, x='month', y='revenue')
plt.title("Receita Mensal")
plt.xticks(rotation=45)
plt.show()


## 6. Desafio 2.1: Identificação da Promoção

Quando ocorreu a promoção de 85% de desconto?

In [None]:
periodo = run_sql("sql/05_promotion_period.sql").collect()
print(f"Promoção de ~85% de desconto: {periodo[0][0]} até {periodo[-1][0]}")


## 7. Desafio 2.2: Impacto da Promoção

Como a receita e o número de transações mudaram?

In [None]:
df_impact = run_sql("sql/06_promotion_impact.sql").toPandas()
df_impact
sns.barplot(data=df_impact.melt(id_vars='period', value_vars=['avg_rev','avg_tx']), x='period', y='value', hue='variable')
plt.title("Impacto da Promoção")
plt.show()


## 8. Desafio 2.3: Simulação Monte Carlo de Desconto Ideal

Executamos a simulação para recomendar desconto ideal.

In [None]:
from scripts.promo_simulation import carregar_dados_spark, identificar_periodo_promocional_spark, simulacao_monte_carlo

df_trans = carregar_dados_spark("data/user_transactions.csv")
inicio, fim = identificar_periodo_promocional_spark(df_trans, limite_desconto=0.85)
resultados = simulacao_monte_carlo(df_trans, (inicio, fim), descontos=list(range(0,91,10)), n_sim=5000)
import pandas as pd
df_sim = pd.DataFrame(resultados)
sns.lineplot(data=df_sim, x='discount_pct', y='mean_revenue', marker='o')
plt.fill_between(df_sim['discount_pct'], df_sim['revenue_5th_pct'], df_sim['revenue_95th_pct'], alpha=0.3)
plt.title("Receita vs Desconto")
plt.show()


## 9. Conclusão e Próximos Passos

- Resumo dos principais insights.
- Ações recomendadas: testes de descontos moderados, monitoramento contínuo via Spark.