<a href="https://colab.research.google.com/github/jpclima96/mmmsaturation_analysis/blob/main/An%C3%A1lise_de_Decl%C3%ADnio_e_Satura%C3%A7%C3%A3o_de_M%C3%ADdia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.optimize import curve_fit, minimize
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from statsmodels.tsa.seasonal import seasonal_decompose
import statsmodels.api as sm
import statsmodels.formula.api as smf
from pygam import GAM, s
import warnings
# Silence every warning category for the rest of the session.
warnings.filterwarnings('ignore')

# Global plotting defaults: seaborn whitegrid style, viridis palette,
# (12, 8) figure size and font size 12.
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette("viridis")
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 12

# Seed NumPy's global random generator so random draws are reproducible.
np.random.seed(123)


In [None]:
def generate_synthetic_data(weeks=104):
    """
    Build a synthetic weekly dataset for a marketing-mix-modeling simulation.

    For each of five media channels the function simulates spend (seasonal,
    trending and noisy, with 10% of the weeks forced to zero), derives the
    adstock and a Hill-saturated response from it, and scales that response
    into a sales contribution.  Sales are the sum of a baseline, a seasonal
    term, price and promotion effects and the channel contributions, with
    multiplicative noise applied and negative values clipped to zero.

    All random draws use NumPy's global generator.

    Args:
        weeks: Number of weekly periods to generate.

    Returns:
        Tuple ``(df, saturation_params)``: the generated DataFrame and the
        per-channel parameter dict ('decay', 'shape', 'power') defined here.
        Only 'decay' and 'shape' are used to generate the data.
    """
    channels = ['TV', 'Radio', 'SocialMedia', 'Search', 'Display']
    baseline = 1000

    df = pd.DataFrame({'Date': pd.date_range(start='2023-01-01', periods=weeks, freq='W')})
    df['Week'] = df.index + 1

    # Angle of a 52-week cycle, shared by the sales season and spend seasonality.
    yearly_angle = df['Week'] * 2 * np.pi / 52
    df['Season'] = np.sin(yearly_angle) + 1

    # --- Spend per channel -------------------------------------------------
    n_zero_weeks = int(weeks*0.1)
    for channel in channels:
        spend_col = f'{channel}_Spend'

        base_spend = np.random.uniform(500, 2000)
        phase = np.random.uniform(0, 2*np.pi)
        seasonal_factor = 1 + 0.3 * np.sin(yearly_angle + phase)
        slope = np.random.uniform(0.2, 0.5)
        trend_factor = 1 + df['Week'] / weeks * slope
        noise = np.random.normal(1, 0.15, size=len(df))

        df[spend_col] = base_spend * seasonal_factor * trend_factor * noise

        # Switch the channel off in a random 10% of the weeks.
        zero_periods = np.random.choice(weeks, size=n_zero_weeks, replace=False)
        df.loc[zero_periods, spend_col] = 0

    saturation_params = {
        'TV': {'decay': 0.7, 'shape': 0.5, 'power': 0.7},
        'Radio': {'decay': 0.5, 'shape': 0.7, 'power': 0.6},
        'SocialMedia': {'decay': 0.3, 'shape': 0.6, 'power': 0.5},
        'Search': {'decay': 0.2, 'shape': 0.8, 'power': 0.8},
        'Display': {'decay': 0.4, 'shape': 0.4, 'power': 0.4}
    }

    # --- Adstock, saturated response and contribution per channel ----------
    for channel in channels:
        channel_params = saturation_params[channel]
        adstock_col = f'{channel}_Adstock'
        response_col = f'{channel}_Response'

        df[adstock_col] = apply_adstock(
            df[f'{channel}_Spend'],
            decay=channel_params['decay']
        )

        # Half-saturation point is set to half of the largest adstock value.
        df[response_col] = hill_transform(
            df[adstock_col],
            shape=channel_params['shape'],
            ec50=df[adstock_col].max() * 0.5
        )

        coefficient = np.random.uniform(100, 500)
        df[f'{channel}_Contribution'] = coefficient * df[response_col]

    # --- Non-media drivers -------------------------------------------------
    df['Price'] = 100 + 5 * np.sin(df['Week'] * 2 * np.pi / 26) + np.random.normal(0, 2, size=len(df))
    df['Promotion'] = np.random.binomial(1, 0.2, size=len(df))

    centered_price = df['Price'] - df['Price'].mean()
    price_effect = -10 * centered_price / df['Price'].std()
    promo_effect = 200 * df['Promotion']

    # --- Sales -------------------------------------------------------------
    sales = baseline + df['Season'] * 200 + price_effect + promo_effect
    for channel in channels:
        sales = sales + df[f'{channel}_Contribution']

    noisy_sales = sales * np.random.normal(1, 0.05, size=len(df))
    df['Sales'] = np.maximum(noisy_sales, 0)

    return df, saturation_params

In [None]:
def apply_adstock(x, decay=0.5):
    """
    Apply a geometric adstock (carry-over) transformation.

    Each output value is the current input plus ``decay`` times the previous
    output: ``adstock[t] = x[t] + decay * adstock[t-1]``, with
    ``adstock[0] = x[0]``.

    Args:
        x: One-dimensional sequence of values (list, NumPy array or pandas
            Series).  Values are read by position, so a Series with any
            index is accepted.
        decay: Carry-over rate applied to the previous period's adstock.

    Returns:
        NumPy float array of the same length as ``x``; empty if ``x`` is
        empty.
    """
    # Convert up front so indexing is positional (a pandas Series would
    # otherwise be indexed by label and fail when its index lacks 0).
    values = np.asarray(x, dtype=float)
    adstock = np.zeros(len(values))

    if len(values) == 0:
        return adstock

    adstock[0] = values[0]
    for t in range(1, len(values)):
        adstock[t] = values[t] + decay * adstock[t-1]

    return adstock

def hill_transform(x, ec50, shape):
    """
    Apply the Hill function to model saturation.

    Computes ``x**shape / (ec50**shape + x**shape)``.

    Args:
        x: Adstock value(s) to transform.
        ec50: Adstock level at which the result equals one half.
        shape: Exponent applied to both ``x`` and ``ec50``.

    Returns:
        Transformed value(s), of the same kind as ``x``.
    """
    powered_input = x**shape
    half_point = ec50**shape
    return powered_input / (half_point + powered_input)

def power_transform(x, power):
    """
    Raise the input to a fixed exponent to model saturation.

    Args:
        x: Value(s) to transform.
        power: Exponent passed to ``np.power``.

    Returns:
        ``x`` raised element-wise to ``power``.
    """
    transformed = np.power(x, power)
    return transformed

In [None]:
def _holdout_scores(y_true, y_pred):
    """Return R² and RMSE of the predictions as a ``{'r2', 'rmse'}`` dict."""
    return {
        'r2': r2_score(y_true, y_pred),
        'rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
    }


def fit_media_saturation_model(df, channel, apply_gam=True):
    """
    Fit a spend-to-sales saturation model for one channel.

    The rows of ``df`` are split 80/20 (``random_state=42``); the model is
    fitted on the training rows and scored on the held-out rows.

    Args:
        df: DataFrame containing ``'<channel>_Spend'`` and ``'Sales'``.
        channel: Channel name used to build the spend column name.
        apply_gam: If True, fit a one-term spline GAM.  Otherwise fit a Hill
            curve with ``curve_fit`` and, if that fit or its scoring fails,
            fall back to a power curve.

    Returns:
        Tuple ``(model, stats)``.
        * GAM: ``model`` is the fitted GAM and ``stats`` has 'r2' and 'rmse'.
        * Hill: ``model`` is the parameter array ``[a, ec50, shape]`` and
          ``stats`` additionally has 'params' with keys 'a', 'ec50', 'shape'.
        * Power fallback: ``model`` is ``[a, power]`` and 'params' has keys
          'a' and 'power'.

    Raises:
        RuntimeError, ValueError: If the power-curve fallback also fails.
    """
    spend_col = f'{channel}_Spend'

    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

    x_train = train_df[spend_col].values
    y_train = train_df['Sales'].values
    x_test = test_df[spend_col].values
    y_test = test_df['Sales']

    if apply_gam:
        gam = GAM(s(0, n_splines=10, spline_order=3))
        gam.fit(x_train.reshape(-1, 1), y_train)

        y_pred = gam.predict(x_test.reshape(-1, 1))
        return gam, _holdout_scores(y_test, y_pred)

    def response_function(x, a, ec50, shape):
        return a * x**shape / (ec50**shape + x**shape)

    def simple_response(x, a, power):
        return a * np.power(x, power)

    try:
        popt, _ = curve_fit(
            response_function,
            x_train,
            y_train,
            bounds=([0, 0, 0], [np.inf, np.inf, 2])
        )

        # Scoring stays inside the try block: non-finite predictions make
        # the metric functions raise ValueError, which must also trigger
        # the fallback below.
        stats = _holdout_scores(y_test, response_function(x_test, *popt))
        stats['params'] = {'a': popt[0], 'ec50': popt[1], 'shape': popt[2]}
        return popt, stats

    except (RuntimeError, ValueError):
        # curve_fit signals a failed optimisation with RuntimeError and bad
        # inputs with ValueError.  Anything else (e.g. KeyboardInterrupt) is
        # deliberately not caught.
        popt, _ = curve_fit(
            simple_response,
            x_train,
            y_train,
            bounds=([0, 0], [np.inf, 1])
        )

        stats = _holdout_scores(y_test, simple_response(x_test, *popt))
        stats['params'] = {'a': popt[0], 'power': popt[1]}
        return popt, stats

def estimate_adstock_parameters(df, channel):
    """
    Estimate the adstock decay rate for one channel.

    The decay is chosen in ``[0.01, 0.99]`` to maximise the Pearson
    correlation between the channel's adstocked spend and sales on the
    training rows (80% of the rows, ``random_state=42``).

    Adstock is computed over the full series in its original row order and
    only then restricted to the training rows: the carry-over recurrence
    depends on consecutive periods, so it must not be applied to shuffled
    rows.

    Args:
        df: DataFrame containing ``'<channel>_Spend'`` and ``'Sales'``, with
            rows in chronological order.
        channel: Channel name used to build the spend column name.

    Returns:
        The decay rate found by the optimiser.
    """
    spend = df[f'{channel}_Spend'].to_numpy()
    sales = df['Sales'].to_numpy()

    # Split row positions rather than the frame itself so the time order of
    # `spend` is preserved for the adstock recurrence.
    train_idx, _ = train_test_split(np.arange(len(df)), test_size=0.2, random_state=42)
    train_sales = sales[train_idx]

    def objective(decay):
        adstock = apply_adstock(spend, decay=decay[0])
        corr = np.corrcoef(adstock[train_idx], train_sales)[0, 1]
        # A constant series yields a NaN correlation; report it as "no
        # correlation" so the optimiser is not fed NaN.
        return -corr if np.isfinite(corr) else 0.0

    result = minimize(objective, x0=[0.5], bounds=[(0.01, 0.99)])

    return result.x[0]

def find_optimal_spend(model_params, channel_data, fixed_budget, model_type='hill'):
    """
    Find the spend level in ``[0, fixed_budget]`` that maximises average ROI.

    ROI is defined as ``response(spend) / spend`` and treated as 0 for
    non-positive spend.  The search starts at ``fixed_budget / 2`` and uses
    ``scipy.optimize.minimize`` with bounds, so the result is a local
    optimum.

    NOTE(review): for curves that are concave from the origin (Hill with
    shape <= 1, power with exponent < 1) average ROI grows as spend
    approaches zero, so the result tends toward the lower bound — confirm
    this is the intended criterion.

    Args:
        model_params: ``(a, ec50, shape)`` for 'hill', otherwise
            ``(a, power)``.
        channel_data: Not used by this function; kept for interface
            compatibility.
        fixed_budget: Upper bound for the spend.
        model_type: 'hill' selects the Hill curve; any other value selects
            the power curve.

    Returns:
        The spend value found by the optimiser.
    """
    if model_type == 'hill':
        a, ec50, shape = model_params

        def response_at(spend):
            return a * spend**shape / (ec50**shape + spend**shape)

    else:
        a, power = model_params

        def response_at(spend):
            return a * np.power(spend, power)

    def roi_objective(x):
        # The optimiser passes a 1-element array; work with a plain scalar
        # so the objective returns a scalar and no array truthiness is used.
        spend = float(np.ravel(x)[0])
        if spend <= 0:
            return 0.0
        return -response_at(spend) / spend

    result = minimize(roi_objective, x0=[fixed_budget/2], bounds=[(0, fixed_budget)])

    return result.x[0]

def calculate_marginal_roi(model_params, spend_range, model_type='hill'):
    """
    Compute average and marginal ROI over a range of spend levels.

    Average ROI is ``response / spend``; marginal ROI is the derivative of
    the response curve with respect to spend.  Both are reported as 0 where
    spend is not strictly positive.

    Args:
        model_params: ``(a, ec50, shape)`` for 'hill', otherwise
            ``(a, power)``.
        spend_range: Array-like of spend values (any numeric dtype).
        model_type: 'hill' selects the Hill curve; any other value selects
            the power curve.

    Returns:
        Tuple ``(roi, marginal_roi)`` of float arrays shaped like
        ``spend_range``.
    """
    # Work in float so integer inputs do not truncate the results.
    spend = np.asarray(spend_range, dtype=float)

    roi = np.zeros_like(spend)
    marginal_roi = np.zeros_like(spend)

    # Evaluate only where spend > 0: ROI divides by spend and the derivative
    # contains spend**(exponent - 1), both undefined at zero.
    mask = spend > 0
    x = spend[mask]

    if model_type == 'hill':
        a, ec50, shape = model_params
        x_pow = x**shape
        half_pow = ec50**shape
        response = a * x_pow / (half_pow + x_pow)
        derivative = a * shape * half_pow * x**(shape-1) / (half_pow + x_pow)**2
    else:
        a, power = model_params
        response = a * np.power(x, power)
        derivative = a * power * np.power(x, power-1)

    roi[mask] = response / x
    marginal_roi[mask] = derivative

    return roi, marginal_roi

In [None]:
def plot_adstock_transformation(df, channel, decay):
    """
    Plot raw spend against its adstocked version for the first 20 rows.

    Args:
        df: DataFrame containing ``'Week'`` and ``'<channel>_Spend'``.
        channel: Channel name.
        decay: Decay rate passed to ``apply_adstock``.

    Returns:
        None (shows a matplotlib figure).
    """
    first_rows = df.iloc[:20].copy()
    weeks = first_rows['Week']
    raw_spend = first_rows[f'{channel}_Spend']
    carried_over = apply_adstock(raw_spend.values, decay=decay)

    plt.figure(figsize=(12, 6))

    # (series, line format, legend label) for each curve, in drawing order.
    curves = (
        (raw_spend, 'b-', 'Gasto Original'),
        (carried_over, 'r-', f'Adstock (decay={decay:.2f})'),
    )
    for series, line_format, legend_label in curves:
        plt.plot(weeks, series, line_format, label=legend_label)

    plt.title(f'Transformação de Adstock para {channel}')
    plt.xlabel('Semana')
    plt.ylabel('Valor')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_saturation_curve(model_params, channel, max_spend, model_type='hill'):
    """
    Plot the response curve and the ROI curves for one channel.

    The top panel shows the response over 100 spend levels in
    ``[0, max_spend]``; for the Hill model a vertical line marks the
    reference point computed below.  The bottom panel shows average and
    marginal ROI as returned by ``calculate_marginal_roi``.

    Args:
        model_params: ``(a, ec50, shape)`` for 'hill', otherwise
            ``(a, power)``.
        channel: Channel name used in the titles.
        max_spend: Largest spend value plotted.
        model_type: 'hill' selects the Hill curve; any other value selects
            the power curve.

    Returns:
        None (shows a matplotlib figure).
    """
    spend_range = np.linspace(0, max_spend, 100)

    if model_type == 'hill':
        a, ec50, shape = model_params
        response = a * spend_range**shape / (ec50**shape + spend_range**shape)

        # For shape > 1 the closed-form expression below is used; otherwise
        # ec50 is used as the marked point.
        inflection = ec50 * ((shape-1)/(shape+1))**(1/shape) if shape > 1 else ec50

    else:
        a, power = model_params
        response = a * np.power(spend_range, power)

        inflection = None

    # Both ROI series come from the shared helper (the former inline ROI
    # computation was immediately overwritten by this call).
    roi, marginal_roi = calculate_marginal_roi(model_params, spend_range, model_type)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    ax1.plot(spend_range, response, 'b-', linewidth=2)
    ax1.set_title(f'Curva de Resposta para {channel}')
    ax1.set_xlabel('Gasto')
    ax1.set_ylabel('Resposta (Vendas)')
    if inflection is not None:
        ax1.axvline(x=inflection, color='r', linestyle='--', label=f'Ponto de Inflexão ({inflection:.2f})')
        ax1.legend()

    ax2.plot(spend_range, roi, 'g-', linewidth=2, label='ROI')
    ax2.plot(spend_range, marginal_roi, 'r--', linewidth=2, label='ROI Marginal')
    ax2.set_title(f'Análise de ROI para {channel}')
    ax2.set_xlabel('Gasto')
    ax2.set_ylabel('ROI')
    ax2.legend()

    plt.tight_layout()
    plt.show()

def plot_channel_comparison(model_results, channels, max_spend):
    """
    Compare response curves and marginal ROI across channels.

    Draws one line per channel on each of two stacked panels: the fitted
    response (top) and the marginal ROI (bottom), over 100 spend levels in
    ``[0, max_spend]``.

    Args:
        model_results: Dict mapping channel -> stats dict with a 'params'
            entry (keys 'a', 'ec50', 'shape' for Hill; 'a', 'power' for power).
        channels: Channels to plot, in drawing order.
        max_spend: Largest spend value plotted.

    Returns:
        None (shows a matplotlib figure).
    """
    spend_range = np.linspace(0, max_spend, 100)

    fig, (response_ax, marginal_ax) = plt.subplots(2, 1, figsize=(14, 12))

    for channel in channels:
        fitted = model_results[channel]['params']

        # A 'shape' key identifies a Hill fit; anything else is a power fit.
        if 'shape' in fitted:
            model_type = 'hill'
            model_params = (fitted['a'], fitted['ec50'], fitted['shape'])
            a, ec50, shape = model_params
            response = a * spend_range**shape / (ec50**shape + spend_range**shape)
        else:
            model_type = 'power'
            model_params = (fitted['a'], fitted['power'])
            a, power = model_params
            response = a * np.power(spend_range, power)

        response_ax.plot(spend_range, response, linewidth=2, label=channel)

        _, marginal_roi = calculate_marginal_roi(model_params, spend_range, model_type)
        marginal_ax.plot(spend_range, marginal_roi, linewidth=2, label=channel)

    response_ax.set_title('Comparação de Curvas de Resposta por Canal')
    response_ax.set_xlabel('Gasto')
    response_ax.set_ylabel('Resposta (Vendas)')
    response_ax.legend()

    marginal_ax.set_title('Comparação de ROI Marginal por Canal')
    marginal_ax.set_xlabel('Gasto')
    marginal_ax.set_ylabel('ROI Marginal')
    marginal_ax.legend()

    plt.tight_layout()
    plt.show()

def plot_budget_optimization(model_results, channels, total_budget):
    """
    Plot the optimised budget allocation as a labelled bar chart.

    The allocation is obtained from ``optimize_budget_allocation``; each bar
    is annotated with its amount.

    Args:
        model_results: Dict with per-channel model results.
        channels: Channels to include, in bar order.
        total_budget: Total budget to distribute.

    Returns:
        None (shows a matplotlib figure).
    """
    allocation = optimize_budget_allocation(model_results, channels, total_budget)
    amounts = [allocation[channel] for channel in channels]

    plt.figure(figsize=(12, 8))
    plt.bar(channels, amounts)

    plt.title('Alocação Otimizada de Orçamento')
    plt.xlabel('Canal')
    plt.ylabel('Orçamento Alocado')
    plt.xticks(rotation=45)

    # Place each value label slightly above its bar (1% of the total budget).
    label_offset = total_budget*0.01
    for position, amount in enumerate(amounts):
        plt.text(position, amount + label_offset,
                 f'{amount:.2f}',
                 ha='center')

    plt.tight_layout()
    plt.show()

def plot_time_series_analysis(df, channel):
    """
    Decompose a channel's spend series and plot its components.

    Runs an additive ``seasonal_decompose`` with a period of 52 on
    ``'<channel>_Spend'`` and plots the observed, trend, seasonal and
    residual series in four stacked panels.  If the decomposition rejects
    the series with a ValueError, a message with the reason is printed and
    nothing is plotted.

    Args:
        df: DataFrame containing ``'<channel>_Spend'``.
        channel: Channel name.

    Returns:
        None (shows a matplotlib figure or prints a message).
    """
    spend_col = f'{channel}_Spend'

    # Only the decomposition is guarded; plotting errors are left to
    # propagate instead of being silently swallowed.
    try:
        result = seasonal_decompose(df[spend_col], model='additive', period=52)
    except ValueError as exc:
        print(f"Não foi possível realizar decomposição para {channel}: {exc}")
        return

    fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12))

    result.observed.plot(ax=ax1)
    ax1.set_title(f'Série Temporal Original: {channel}')
    ax1.set_xlabel('')

    result.trend.plot(ax=ax2)
    ax2.set_title('Tendência')
    ax2.set_xlabel('')

    result.seasonal.plot(ax=ax3)
    ax3.set_title('Sazonalidade')
    ax3.set_xlabel('')

    result.resid.plot(ax=ax4)
    ax4.set_title('Resíduos')

    plt.tight_layout()
    plt.show()

#Otimizações

In [None]:
def optimize_budget_allocation(model_results, channels, total_budget):
    """
    Distribute a total budget across channels to maximise total response.

    Maximises the sum of the fitted per-channel response curves subject to
    the allocations summing to ``total_budget`` and each allocation lying in
    ``[0, total_budget]``.  Solved with SLSQP starting from an equal split.

    Args:
        model_results: Dict mapping channel -> stats dict with a 'params'
            entry (keys 'a', 'ec50', 'shape' for Hill; 'a', 'power' for power).
        channels: Channels to allocate to.
        total_budget: Budget to distribute.

    Returns:
        Dict mapping channel -> allocated amount; empty if ``channels`` is
        empty.

    Raises:
        KeyError: If a channel or its 'params' entry is missing from
            ``model_results``.
    """
    channels = list(channels)
    if not channels:
        # Nothing to allocate; also avoids dividing by zero for the
        # equal-split starting point below.
        return {}

    channel_params = [model_results[channel]['params'] for channel in channels]

    def objective(allocations):
        # Fractional powers of negative numbers are NaN, so clamp any
        # slightly negative value the solver might probe.
        spend = np.clip(allocations, 0, None)
        total_response = 0

        for amount, params in zip(spend, channel_params):
            if 'shape' in params:
                a, ec50, shape = params['a'], params['ec50'], params['shape']
                response = a * amount**shape / (ec50**shape + amount**shape)
            else:
                a, power = params['a'], params['power']
                response = a * np.power(amount, power)

            total_response += response

        # minimize() minimises, so negate to maximise the response.
        return -total_response

    def constraint(allocations):
        return total_budget - np.sum(allocations)

    initial_guess = [total_budget / len(channels)] * len(channels)
    bounds = [(0, total_budget)] * len(channels)

    result = minimize(
        objective,
        initial_guess,
        method='SLSQP',
        bounds=bounds,
        constraints={'type': 'eq', 'fun': constraint}
    )

    return {channel: float(amount) for channel, amount in zip(channels, result.x)}

#Função principal

In [None]:
def run_media_saturation_analysis():
    """
    Run the complete media saturation analysis end to end.

    Steps, in order: generate synthetic data, print summary statistics,
    estimate an adstock decay per channel, plot the TV adstock, fit a
    saturation curve per channel (non-GAM path) and plot it, compare
    channels, optimise the budget allocation, plot a time-series
    decomposition per channel, and print a textual summary.

    Returns:
        Tuple ``(df, model_results, optimized_allocation)``: the generated
        DataFrame, the per-channel stats dicts returned by
        ``fit_media_saturation_model``, and the optimised allocation dict.
    """
    print("Iniciando análise de saturação de mídia para Marketing Mix Modeling...")

    # Step 1: synthetic data (104 weekly periods).
    print("\n1. Gerando dados sintéticos...")
    df, true_params = generate_synthetic_data(weeks=104)
    print(f"- Dados gerados: {len(df)} períodos")

    channels = ['TV', 'Radio', 'SocialMedia', 'Search', 'Display']

    # Step 2: descriptive statistics for week, sales and each spend column.
    print("\n2. Resumo dos dados:")
    print(df[['Week', 'Sales'] + [f'{c}_Spend' for c in channels]].describe())

    # Step 3: one adstock decay estimate per channel.
    print("\n3. Estimando parâmetros de adstock por canal...")
    adstock_params = {}
    for channel in channels:
        decay = estimate_adstock_parameters(df, channel)
        adstock_params[channel] = decay
        print(f"- {channel}: Taxa de decaimento estimada = {decay:.4f}")

    # Step 4: illustrate the adstock transformation using the TV estimate.
    print("\n4. Visualizando transformação de adstock para TV...")
    plot_adstock_transformation(df, 'TV', adstock_params['TV'])

    # Step 5: fit and plot a saturation curve per channel.
    print("\n5. Ajustando modelos de saturação por canal...")
    model_results = {}
    for channel in channels:
        print(f"\nAnalisando canal: {channel}")

        # apply_gam=False so the returned stats include a 'params' entry,
        # which every later step reads.
        model, stats = fit_media_saturation_model(df, channel, apply_gam=False)
        model_results[channel] = stats

        print(f"- R² = {stats['r2']:.4f}")
        print(f"- RMSE = {stats['rmse']:.4f}")
        print(f"- Parâmetros: {stats['params']}")

        # Plot up to 1.5x the largest observed spend for this channel.
        max_spend = df[f'{channel}_Spend'].max() * 1.5
        # A 'shape' key means the Hill fit succeeded; otherwise the power
        # fallback was used.
        model_type = 'hill' if 'shape' in stats['params'] else 'power'

        if model_type == 'hill':
            model_params = (stats['params']['a'], stats['params']['ec50'], stats['params']['shape'])
        else:
            model_params = (stats['params']['a'], stats['params']['power'])

        print(f"- Visualizando curva de saturação para {channel}...")
        plot_saturation_curve(model_params, channel, max_spend, model_type)

    # Step 6: cross-channel comparison on a common spend axis.
    print("\n6. Comparando a eficácia e saturação entre canais...")
    max_spend = max([df[f'{c}_Spend'].max() for c in channels]) * 1.5
    plot_channel_comparison(model_results, channels, max_spend)

    # Step 7: budget optimisation.
    print("\n7. Otimizando alocação de orçamento...")
    # Budget equals the total spend over all periods and channels.
    # NOTE(review): the curves are fitted on per-row (weekly) spend while
    # this budget is a whole-period total — confirm the intended units.
    total_budget = sum([df[f'{c}_Spend'].mean() * len(df) for c in channels])
    optimized_allocation = optimize_budget_allocation(model_results, channels, total_budget)

    print(f"- Orçamento total: ${total_budget:.2f}")
    for channel in channels:
        print(f"- {channel}: ${optimized_allocation[channel]:.2f} ({optimized_allocation[channel]/total_budget*100:.1f}%)")

    plot_budget_optimization(model_results, channels, total_budget)

    # Step 8: seasonal decomposition of each spend series.
    print("\n8. Realizando análise de série temporal por canal...")
    for channel in channels:
        print(f"- Analisando padrões temporais para {channel}...")
        plot_time_series_analysis(df, channel)

    # Step 9: textual summary.
    print("\n9. Resumo dos principais insights:")
    print("\nPontos de Saturação por Canal:")
    for channel in channels:
        model_type = 'hill' if 'shape' in model_results[channel]['params'] else 'power'
        if model_type == 'hill':
            ec50 = model_results[channel]['params']['ec50']
            shape = model_results[channel]['params']['shape']
            # Same reference-point expression as in plot_saturation_curve;
            # falls back to ec50 when shape <= 1.
            inflection = ec50 * ((shape-1)/(shape+1))**(1/shape) if shape > 1 else ec50
            print(f"- {channel}: Ponto de saturação estimado em ${inflection:.2f}")
        else:
            power = model_results[channel]['params']['power']
            print(f"- {channel}: Expoente de saturação = {power:.4f} (menor valor = saturação mais rápida)")

    # Average ROI evaluated at each channel's mean spend.
    print("\nEficiência de Canais (ROI em gasto médio):")
    for channel in channels:
        avg_spend = df[f'{channel}_Spend'].mean()
        params = model_results[channel]['params']
        model_type = 'hill' if 'shape' in params else 'power'

        if model_type == 'hill':
            a, ec50, shape = params['a'], params['ec50'], params['shape']
            response = a * avg_spend**shape / (ec50**shape + avg_spend**shape)
        else:
            a, power = params['a'], params['power']
            response = a * np.power(avg_spend, power)

        roi = response / avg_spend if avg_spend > 0 else 0
        print(f"- {channel}: ROI = {roi:.2f}")

    # Compare current and optimised budget shares, in percentage points.
    print("\nRecomendações de Realocação de Orçamento:")
    current_allocation = {channel: df[f'{channel}_Spend'].sum() for channel in channels}
    total_current = sum(current_allocation.values())

    for channel in channels:
        current_pct = current_allocation[channel] / total_current * 100
        optimized_pct = optimized_allocation[channel] / total_budget * 100
        change = optimized_pct - current_pct

        # A change of exactly zero is reported as "reduzir".
        direction = "aumentar" if change > 0 else "reduzir"
        print(f"- {channel}: {direction} em {abs(change):.1f} pontos percentuais (de {current_pct:.1f}% para {optimized_pct:.1f}%)")

    print("\nAnálise completa! Os resultados demonstram os padrões de saturação por canal e as recomendações para otimização do orçamento.")

    return df, model_results, optimized_allocation

#Adicionais

In [None]:
def export_results_to_excel(df, model_results, optimized_allocation, filepath="mmm_saturation_results.xlsx"):
    """
    Export the analysis results to an Excel workbook.

    Writes five sheets: 'Dados' (the input data), 'Parâmetros' (fitted
    parameters and fit metrics per channel), 'Otimização' (current vs.
    optimised allocation), 'Curvas_Resposta' (response curves) and
    'Análise_ROI' (average and marginal ROI curves).

    All tables are computed before the file is opened, and the writer is
    used as a context manager, so a failure while preparing the data does
    not leave an unclosed workbook behind.

    Args:
        df: DataFrame containing a ``'<channel>_Spend'`` column per channel.
        model_results: Dict mapping channel -> stats dict with 'params',
            'r2' and 'rmse'.
        optimized_allocation: Dict mapping channel -> optimised amount.
        filepath: Destination path of the workbook.

    Returns:
        None
    """
    # --- Sheet 'Parâmetros': one row per fitted channel --------------------
    parameter_rows = []
    for channel, stats in model_results.items():
        params = stats['params']
        row = {'Canal': channel}
        # A 'shape' key identifies a Hill fit; anything else is a power fit.
        if 'shape' in params:
            row.update({'Tipo': 'Hill', 'a': params['a'], 'ec50': params['ec50'], 'shape': params['shape']})
        else:
            row.update({'Tipo': 'Power', 'a': params['a'], 'power': params['power']})
        row['R²'] = stats['r2']
        row['RMSE'] = stats['rmse']
        parameter_rows.append(row)

    # --- Sheet 'Otimização': current vs. optimised shares ------------------
    allocation_channels = list(optimized_allocation.keys())
    current_allocation = {channel: df[f'{channel}_Spend'].sum() for channel in allocation_channels}
    total_current = sum(current_allocation.values())
    total_optimized = sum(optimized_allocation.values())

    allocation_rows = []
    for channel in allocation_channels:
        current_share = current_allocation[channel] / total_current
        optimized_share = optimized_allocation[channel] / total_optimized
        allocation_rows.append({
            'Canal': channel,
            'Alocação Atual': current_allocation[channel],
            'Alocação Atual (%)': current_share * 100,
            'Alocação Otimizada': optimized_allocation[channel],
            'Alocação Otimizada (%)': optimized_share * 100,
            'Variação (pp)': (optimized_share - current_share) * 100
        })

    # --- Sheets 'Curvas_Resposta' and 'Análise_ROI' ------------------------
    model_channels = list(model_results.keys())
    max_spend = max([df[f'{c}_Spend'].max() for c in model_channels]) * 1.5
    spend_range = np.linspace(0, max_spend, 100)

    response_data = pd.DataFrame({'Gasto': spend_range})
    roi_data = pd.DataFrame({'Gasto': spend_range})

    for channel in model_channels:
        params = model_results[channel]['params']

        if 'shape' in params:
            model_type = 'hill'
            curve_params = (params['a'], params['ec50'], params['shape'])
            a, ec50, shape = curve_params
            response = a * spend_range**shape / (ec50**shape + spend_range**shape)
        else:
            model_type = 'power'
            curve_params = (params['a'], params['power'])
            a, power = curve_params
            response = a * np.power(spend_range, power)

        roi, marginal_roi = calculate_marginal_roi(curve_params, spend_range, model_type)

        response_data[channel] = response
        roi_data[f'{channel}_ROI'] = roi
        roi_data[f'{channel}_ROI_Marginal'] = marginal_roi

    # --- Write the workbook; the context manager closes it on exit ---------
    with pd.ExcelWriter(filepath, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Dados', index=False)
        pd.DataFrame(parameter_rows).to_excel(writer, sheet_name='Parâmetros', index=False)
        pd.DataFrame(allocation_rows).to_excel(writer, sheet_name='Otimização', index=False)
        response_data.to_excel(writer, sheet_name='Curvas_Resposta', index=False)
        roi_data.to_excel(writer, sheet_name='Análise_ROI', index=False)

    print(f"Resultados exportados com sucesso para {filepath}")

#Criação do Dashboard

In [None]:
def create_dashboard(df, model_results, optimized_allocation):
    """
    Cria um dashboard interativo para explorar os resultados.

    Args:
        df: DataFrame com dados
        model_results: Dicionário com resultados do modelo
        optimized_allocation: Dicionário com alocação otimizada

    Returns:
        None (abre dashboard no navegador)
    """
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    import plotly.express as px
    import dash
    from dash import dcc, html
    from dash.dependencies import Input, Output

    app = dash.Dash(__name__)

    channels = list(model_results.keys())
    max_spend = max([df[f'{c}_Spend'].max() for c in channels]) * 1.5

    app.layout = html.Div([
        html.H1("Dashboard: Análise de Saturação de Mídia"),

        html.Div([
            html.Div([
                html.H3("Selecione o Canal:"),
                dcc.Dropdown(
                    id='channel-dropdown',
                    options=[{'label': c, 'value': c} for c in channels],
                    value=channels[0]
                ),
            ], style={'width': '30%', 'display': 'inline-block'}),

            html.Div([
                html.H3("Tipo de Análise:"),
                dcc.RadioItems(
                    id='analysis-type',
                    options=[
                        {'label': 'Curva de Resposta', 'value': 'response'},
                        {'label': 'ROI', 'value': 'roi'},
                        {'label': 'Série Temporal', 'value': 'time'}
                    ],
                    value='response'
                ),
            ], style={'width': '30%', 'display': 'inline-block'}),
        ]),

        dcc.Graph(id='main-graph'),

        html.Div([
            html.H2("Alocação Otimizada de Orçamento"),
            dcc.Graph(id='budget-graph'),
        ]),

        html.Div([
            html.H2("Comparação de Canais"),
            dcc.Graph(id='channel-comparison'),
        ]),
    ])

    @app.callback(
        Output('main-graph', 'figure'),
        [Input('channel-dropdown', 'value'),
         Input('analysis-type', 'value')]
    )
    def update_graph(selected_channel, analysis_type):
        if analysis_type == 'response':
            spend_range = np.linspace(0, df[f'{selected_channel}_Spend'].max() * 1.5, 100)

            params = model_results[selected_channel]['params']
            model_type = 'hill' if 'shape' in params else 'power'

            if model_type == 'hill':
                a, ec50, shape = params['a'], params['ec50'], params['shape']
                response = a * spend_range**shape / (ec50**shape + spend_range**shape)
                title = f'Curva de Resposta para {selected_channel} (Modelo Hill)'
            else:
                a, power = params['a'], params['power']
                response = a * np.power(spend_range, power)
                title = f'Curva de Resposta para {selected_channel} (Modelo Power)'

            fig = go.Figure()
            fig.add_trace(go.Scatter(x=spend_range, y=response, mode='lines', name='Resposta'))

            avg_spend = df[f'{selected_channel}_Spend'].mean()
            if model_type == 'hill':
                avg_response = a * avg_spend**shape / (ec50**shape + avg_spend**shape)
            else:
                avg_response = a * np.power(avg_spend, power)

            fig.add_trace(go.Scatter(
                x=[avg_spend, avg_spend],
                y=[0, avg_response],
                mode='lines',
                line=dict(color='red', dash='dash'),
                name='Gasto Médio Atual'
            ))

            opt_spend = optimized_allocation[selected_channel] / len(df)
            if model_type == 'hill':
                opt_response = a * opt_spend**shape / (ec50**shape + opt_spend**shape)
            else:
                opt_response = a * np.power(opt_spend, power)

            fig.add_trace(go.Scatter(
                x=[opt_spend],
                y=[opt_response],
                mode='markers',
                marker=dict(size=10, color='green'),
                name='Gasto Ótimo Recomendado'
            ))

            fig.update_layout(
                title=title,
                xaxis_title='Gasto',
                yaxis_title='Resposta (Vendas)',
                height=600
            )

        elif analysis_type == 'roi':
            spend_range = np.linspace(0.1, df[f'{selected_channel}_Spend'].max() * 1.5, 100)

            params = model_results[selected_channel]['params']
            model_type = 'hill' if 'shape' in params else 'power'

            if model_type == 'hill':
                model_params = (params['a'], params['ec50'], params['shape'])
            else:
                model_params = (params['a'], params['power'])

            roi, marginal_roi = calculate_marginal_roi(model_params, spend_range, model_type)

            fig = make_subplots(specs=[[{"secondary_y": True}]])

            fig.add_trace(
                go.Scatter(x=spend_range, y=roi, mode='lines', name='ROI', line=dict(color='blue')),
                secondary_y=False
            )

            fig.add_trace(
                go.Scatter(x=spend_range, y=marginal_roi, mode='lines', name='ROI Marginal',
                          line=dict(color='red', dash='dash')),
                secondary_y=True
            )

            avg_spend = df[f'{selected_channel}_Spend'].mean()
            fig.add_vline(x=avg_spend, line_width=2, line_dash="dash", line_color="green",
                         annotation_text="Gasto Médio Atual")

            fig.update_layout(
                title=f'Análise de ROI para {selected_channel}',
                xaxis_title='Gasto',
                height=600
            )

            fig.update_yaxes(title_text="ROI", secondary_y=False)
            fig.update_yaxes(title_text="ROI Marginal", secondary_y=True)

        else:
            fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

            fig.add_trace(
                go.Scatter(x=df['Date'], y=df[f'{selected_channel}_Spend'], mode='lines', name='Gasto'),
                row=1, col=1
            )

            fig.add_trace(
                go.Scatter(x=df['Date'], y=df['Sales'], mode='lines', name='Vendas'),
                row=2, col=1
            )

            fig.update_layout(
                title=f'Análise Temporal para {selected_channel}',
                height=600
            )

            fig.update_yaxes(title_text="Gasto", row=1, col=1)
            fig.update_yaxes(title_text="Vendas", row=2, col=1)
            fig.update_xaxes(title_text="Data", row=2, col=1)

        return fig

    @app.callback(
        Output('budget-graph', 'figure'),
        [Input('channel-dropdown', 'value')]
    )
    def update_budget_graph(selected_channel):
        """Build the grouped bar chart of current vs. optimized budget shares.

        ``selected_channel`` is not read in the body; the dropdown value only
        serves as the callback's input. Each allocation is expressed as a
        percentage of its own total.
        """
        # Historical spend per channel: the column sum of '<channel>_Spend'.
        spend_by_channel = {}
        for name in channels:
            spend_by_channel[name] = df[f'{name}_Spend'].sum()

        spend_total = sum(spend_by_channel.values())
        optimized_total = sum(optimized_allocation.values())

        current_share = [
            spend_by_channel[name] / spend_total * 100 for name in channels
        ]
        optimized_share = [
            optimized_allocation[name] / optimized_total * 100 for name in channels
        ]

        # One bar series per allocation, drawn side by side (barmode='group').
        series = (
            (current_share, 'Alocação Atual (%)', 'blue'),
            (optimized_share, 'Alocação Otimizada (%)', 'green'),
        )

        fig = go.Figure()
        for heights, label, colour in series:
            fig.add_trace(go.Bar(
                x=channels,
                y=heights,
                name=label,
                marker_color=colour
            ))

        fig.update_layout(
            title='Comparação: Alocação Atual vs. Otimizada',
            xaxis_title='Canal',
            yaxis_title='Porcentagem do Orçamento (%)',
            barmode='group',
            height=500
        )

        return fig

    @app.callback(
        Output('channel-comparison', 'figure'),
        [Input('analysis-type', 'value')]
    )
    def update_channel_comparison(analysis_type):
        """Overlay every channel in a single figure for the chosen analysis.

        'response' draws each channel's fitted response curve, 'roi' draws the
        marginal ROI returned by ``calculate_marginal_roi``, and any other
        value draws the raw spend columns against ``df['Date']``.
        """
        # Common spend grid: 100 points from 0.1 up to 1.5x the largest spend
        # observed in any channel.
        peak_spend = max([df[f'{name}_Spend'].max() for name in channels])
        spend_grid = np.linspace(0.1, peak_spend * 1.5, 100)

        fig = go.Figure()

        def plot_line(x_values, y_values, label):
            # Every series in this figure is a plain line named after its channel.
            fig.add_trace(go.Scatter(
                x=x_values, y=y_values, mode='lines', name=label
            ))

        if analysis_type == 'response':
            for name in channels:
                fit = model_results[name]['params']
                # A 'shape' key marks a Hill fit; otherwise it is a power fit.
                if 'shape' in fit:
                    scale, ec50, shape = fit['a'], fit['ec50'], fit['shape']
                    curve = scale * spend_grid**shape / (ec50**shape + spend_grid**shape)
                else:
                    curve = fit['a'] * np.power(spend_grid, fit['power'])
                plot_line(spend_grid, curve, name)

            title = 'Comparação de Curvas de Resposta entre Canais'
            yaxis_title = 'Resposta (Vendas)'

        elif analysis_type == 'roi':
            for name in channels:
                fit = model_results[name]['params']
                if 'shape' in fit:
                    kind = 'hill'
                    packed = (fit['a'], fit['ec50'], fit['shape'])
                else:
                    kind = 'power'
                    packed = (fit['a'], fit['power'])

                # Only the second element (marginal ROI) is plotted here.
                _average, marginal = calculate_marginal_roi(packed, spend_grid, kind)
                plot_line(spend_grid, marginal, name)

            title = 'Comparação de ROI Marginal entre Canais'
            yaxis_title = 'ROI Marginal'

        else:
            for name in channels:
                plot_line(df['Date'], df[f'{name}_Spend'], name)

            title = 'Comparação de Gastos ao Longo do Tempo'
            yaxis_title = 'Gasto'

        # The x axis is labelled as a date only for the exact value 'time'.
        x_label = 'Data' if analysis_type == 'time' else 'Gasto'

        fig.update_layout(
            title=title,
            xaxis_title=x_label,
            yaxis_title=yaxis_title,
            height=500
        )

        return fig

    # Launch the app with debug mode enabled; this runs after all callbacks
    # above have been registered on `app`.
    app.run(debug=True)

# Execução

In [None]:
def _dashboard_model_spec(params):
    """Classify a fitted parameter dict and pack its values positionally.

    A dict containing a 'shape' key is treated as a Hill fit and packed as
    ``(a, ec50, shape)``; any other dict is treated as a power fit and packed
    as ``(a, power)``.

    Returns:
        Tuple ``(model_type, model_params)`` with ``model_type`` equal to
        'hill' or 'power'.
    """
    if 'shape' in params:
        return 'hill', (params['a'], params['ec50'], params['shape'])
    return 'power', (params['a'], params['power'])


def _dashboard_response(model_type, model_params, spend):
    """Evaluate the fitted response curve at ``spend`` (scalar or array)."""
    if model_type == 'hill':
        a, ec50, shape = model_params
        return a * spend**shape / (ec50**shape + spend**shape)
    a, power = model_params
    return a * np.power(spend, power)


def _dashboard_percent(amount, total):
    """Return ``amount`` as a percentage of ``total``; 0.0 when ``total`` is 0."""
    if not total:
        return 0.0
    return amount / total * 100


def create_dashboard(df, model_results, optimized_allocation):
    """
    Create an interactive Dash dashboard for exploring the model results.

    The layout has a channel dropdown, an analysis-type selector and three
    graphs: the main per-channel graph, a current vs. optimized budget
    comparison, and a cross-channel comparison.

    Args:
        df: DataFrame with a 'Date' column, a 'Sales' column and one
            '<channel>_Spend' column per key of ``model_results``.
        model_results: Dict mapping channel name to a dict whose 'params'
            entry holds either 'a', 'ec50' and 'shape' (Hill fit) or 'a' and
            'power' (power fit).
        optimized_allocation: Dict mapping channel name to its optimized
            budget amount.

    Returns:
        None. The function ends by starting the Dash server with
        ``app.run(debug=True)``.

    Note:
        The ROI views call ``calculate_marginal_roi``, which must be defined
        elsewhere in the surrounding module/notebook.
    """
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    import dash
    from dash import dcc, html
    from dash.dependencies import Input, Output

    app = dash.Dash(__name__)

    channels = list(model_results.keys())

    # The cross-channel spend grid depends only on df, so build it once here
    # instead of on every callback invocation.
    max_spend = max(df[f'{c}_Spend'].max() for c in channels) * 1.5
    comparison_range = np.linspace(0.1, max_spend, 100)

    app.layout = html.Div([
        html.H1("Dashboard: Análise de Saturação de Mídia"),

        html.Div([
            html.Div([
                html.H3("Selecione o Canal:"),
                dcc.Dropdown(
                    id='channel-dropdown',
                    options=[{'label': c, 'value': c} for c in channels],
                    value=channels[0],
                    # A cleared dropdown would send None to the callbacks,
                    # which index df by '<channel>_Spend' and would fail.
                    clearable=False
                ),
            ], style={'width': '30%', 'display': 'inline-block'}),

            html.Div([
                html.H3("Tipo de Análise:"),
                dcc.RadioItems(
                    id='analysis-type',
                    options=[
                        {'label': 'Curva de Resposta', 'value': 'response'},
                        {'label': 'ROI', 'value': 'roi'},
                        {'label': 'Série Temporal', 'value': 'time'}
                    ],
                    value='response'
                ),
            ], style={'width': '30%', 'display': 'inline-block'}),
        ]),

        dcc.Graph(id='main-graph'),

        html.Div([
            html.H2("Alocação Otimizada de Orçamento"),
            dcc.Graph(id='budget-graph'),
        ]),

        html.Div([
            html.H2("Comparação de Canais"),
            dcc.Graph(id='channel-comparison'),
        ]),
    ])

    @app.callback(
        Output('main-graph', 'figure'),
        [Input('channel-dropdown', 'value'),
         Input('analysis-type', 'value')]
    )
    def update_graph(selected_channel, analysis_type):
        """Render the main graph for one channel and one analysis type."""
        spend_column = f'{selected_channel}_Spend'

        if analysis_type == 'response':
            spend_range = np.linspace(0, df[spend_column].max() * 1.5, 100)

            model_type, model_params = _dashboard_model_spec(
                model_results[selected_channel]['params']
            )
            response = _dashboard_response(model_type, model_params, spend_range)

            if model_type == 'hill':
                title = f'Curva de Resposta para {selected_channel} (Modelo Hill)'
            else:
                title = f'Curva de Resposta para {selected_channel} (Modelo Power)'

            fig = go.Figure()
            fig.add_trace(go.Scatter(x=spend_range, y=response, mode='lines', name='Resposta'))

            # Vertical dashed segment from the x axis up to the curve at the
            # mean observed spend.
            avg_spend = df[spend_column].mean()
            avg_response = _dashboard_response(model_type, model_params, avg_spend)

            fig.add_trace(go.Scatter(
                x=[avg_spend, avg_spend],
                y=[0, avg_response],
                mode='lines',
                line=dict(color='red', dash='dash'),
                name='Gasto Médio Atual'
            ))

            # The optimized amount is divided by the number of rows so it is
            # on the same per-row scale as the spend axis (presumably the
            # allocation is a whole-period total; confirm against the optimizer).
            opt_spend = optimized_allocation[selected_channel] / len(df)
            opt_response = _dashboard_response(model_type, model_params, opt_spend)

            fig.add_trace(go.Scatter(
                x=[opt_spend],
                y=[opt_response],
                mode='markers',
                marker=dict(size=10, color='green'),
                name='Gasto Ótimo Recomendado'
            ))

            fig.update_layout(
                title=title,
                xaxis_title='Gasto',
                yaxis_title='Resposta (Vendas)',
                height=600
            )

        elif analysis_type == 'roi':
            # Unlike the response view, this grid starts at 0.1 rather than 0.
            spend_range = np.linspace(0.1, df[spend_column].max() * 1.5, 100)

            model_type, model_params = _dashboard_model_spec(
                model_results[selected_channel]['params']
            )

            roi, marginal_roi = calculate_marginal_roi(model_params, spend_range, model_type)

            # ROI on the primary y axis, marginal ROI on the secondary one.
            fig = make_subplots(specs=[[{"secondary_y": True}]])

            fig.add_trace(
                go.Scatter(x=spend_range, y=roi, mode='lines', name='ROI', line=dict(color='blue')),
                secondary_y=False
            )

            fig.add_trace(
                go.Scatter(x=spend_range, y=marginal_roi, mode='lines', name='ROI Marginal',
                           line=dict(color='red', dash='dash')),
                secondary_y=True
            )

            avg_spend = df[spend_column].mean()
            fig.add_vline(x=avg_spend, line_width=2, line_dash="dash", line_color="green",
                          annotation_text="Gasto Médio Atual")

            fig.update_layout(
                title=f'Análise de ROI para {selected_channel}',
                xaxis_title='Gasto',
                height=600
            )

            fig.update_yaxes(title_text="ROI", secondary_y=False)
            fig.update_yaxes(title_text="ROI Marginal", secondary_y=True)

        else:
            # Any other analysis type: spend and sales stacked over time.
            fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

            fig.add_trace(
                go.Scatter(x=df['Date'], y=df[spend_column], mode='lines', name='Gasto'),
                row=1, col=1
            )

            fig.add_trace(
                go.Scatter(x=df['Date'], y=df['Sales'], mode='lines', name='Vendas'),
                row=2, col=1
            )

            fig.update_layout(
                title=f'Análise Temporal para {selected_channel}',
                height=600
            )

            fig.update_yaxes(title_text="Gasto", row=1, col=1)
            fig.update_yaxes(title_text="Vendas", row=2, col=1)
            fig.update_xaxes(title_text="Data", row=2, col=1)

        return fig

    @app.callback(
        Output('budget-graph', 'figure'),
        [Input('channel-dropdown', 'value')]
    )
    def update_budget_graph(selected_channel):
        """Compare current and optimized budget shares per channel.

        ``selected_channel`` is not used in the body; the dropdown value only
        acts as the callback input.
        """
        current_allocation = {channel: df[f'{channel}_Spend'].sum() for channel in channels}
        total_current = sum(current_allocation.values())
        total_optimized = sum(optimized_allocation.values())

        # A zero total yields 0% bars instead of a division-by-zero failure.
        current_pct = [
            _dashboard_percent(current_allocation[channel], total_current)
            for channel in channels
        ]
        optimized_pct = [
            _dashboard_percent(optimized_allocation[channel], total_optimized)
            for channel in channels
        ]

        fig = go.Figure()

        fig.add_trace(go.Bar(
            x=channels,
            y=current_pct,
            name='Alocação Atual (%)',
            marker_color='blue'
        ))

        fig.add_trace(go.Bar(
            x=channels,
            y=optimized_pct,
            name='Alocação Otimizada (%)',
            marker_color='green'
        ))

        fig.update_layout(
            title='Comparação: Alocação Atual vs. Otimizada',
            xaxis_title='Canal',
            yaxis_title='Porcentagem do Orçamento (%)',
            barmode='group',
            height=500
        )

        return fig

    @app.callback(
        Output('channel-comparison', 'figure'),
        [Input('analysis-type', 'value')]
    )
    def update_channel_comparison(analysis_type):
        """Overlay all channels in one figure for the chosen analysis type."""
        spend_range = comparison_range

        fig = go.Figure()

        if analysis_type == 'response':
            for channel in channels:
                model_type, model_params = _dashboard_model_spec(
                    model_results[channel]['params']
                )
                response = _dashboard_response(model_type, model_params, spend_range)

                fig.add_trace(go.Scatter(
                    x=spend_range, y=response, mode='lines', name=channel
                ))

            title = 'Comparação de Curvas de Resposta entre Canais'
            yaxis_title = 'Resposta (Vendas)'

        elif analysis_type == 'roi':
            for channel in channels:
                model_type, model_params = _dashboard_model_spec(
                    model_results[channel]['params']
                )

                # Only the marginal ROI is compared across channels.
                _, marginal_roi = calculate_marginal_roi(model_params, spend_range, model_type)

                fig.add_trace(go.Scatter(
                    x=spend_range, y=marginal_roi, mode='lines', name=channel
                ))

            title = 'Comparação de ROI Marginal entre Canais'
            yaxis_title = 'ROI Marginal'

        else:
            for channel in channels:
                fig.add_trace(go.Scatter(
                    x=df['Date'], y=df[f'{channel}_Spend'], mode='lines', name=channel
                ))

            title = 'Comparação de Gastos ao Longo do Tempo'
            yaxis_title = 'Gasto'

        fig.update_layout(
            title=title,
            xaxis_title='Gasto' if analysis_type != 'time' else 'Data',
            yaxis_title=yaxis_title,
            height=500
        )

        return fig

    # `run_server` is obsolete in current Dash releases; `run` is the supported
    # entry point and matches the other dashboard definition in this file.
    app.run(debug=True)