In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.metrics import roc_auc_score, make_scorer
from lightgbm import LGBMClassifier

# Fix the random seed so the split, the CV folds and the model are reproducible
SEED = 42

# Step 1: hold out a stratified test set (test_size=0.33)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, stratify=y, random_state=SEED
)

# Step 2: base LightGBM model
# NOTE(review): both this estimator and GridSearchCV below request all cores
# (n_jobs=-1); confirm this does not oversubscribe the machine.
lgb_base = LGBMClassifier(
    class_weight='balanced',
    random_state=SEED,
    # LightGBM only applies `subsample` (bagging_fraction) when bagging is
    # enabled through a non-zero frequency. With the default of 0 the two
    # `subsample` values in the grid would train identical models.
    subsample_freq=1,
    n_jobs=-1  # multi-threaded training
)

# Step 3: hyper-parameter grid (can be tuned further)
param_grid = {
    'n_estimators': [100, 200],
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.05, 0.1],
    'subsample': [0.8, 1.0],
    'colsample_bytree': [0.8, 1.0]
}

# Step 4: cross-validation splitter and AUC scorer
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
# Use scikit-learn's built-in ROC-AUC scorer by name.
# `make_scorer(..., needs_proba=True)` is deprecated since scikit-learn 1.4
# and rejected by later releases, whereas the string works on every version.
scorer = 'roc_auc'

# Step 5: exhaustive grid search (refits the best configuration on X_train)
grid_search = GridSearchCV(
    lgb_base,
    param_grid,
    scoring=scorer,
    cv=cv,
    verbose=1,
    n_jobs=-1
)

grid_search.fit(X_train, y_train)

# Step 6: evaluate the best model on the held-out test set
best_model = grid_search.best_estimator_
y_prob_test = best_model.predict_proba(X_test)[:, 1]  # probability of the positive class
auc_test = roc_auc_score(y_test, y_prob_test)

print("✅ Best params:", grid_search.best_params_)
print(f"✅ Test AUC with best LightGBM model: {auc_test:.4f}")


In [None]:
def fit_and_plot_sigmoid_plotly_simple(df_volume):
    """Fit a sigmoid to every market-share point and show the fit with Plotly.

    Each distinct ``rolling_week`` is replaced by its rank (0, 1, 2, ...)
    among the sorted unique weeks; that rank ``t`` is the time axis used for
    both the fit and the plot.

    Relies on ``sigmoid``, ``curve_fit``, ``r2_score`` and ``go`` being
    available at module level. The fitted parameters are unpacked as
    ``(L, k, t0)``.

    Parameters
    ----------
    df_volume : pandas.DataFrame
        Must contain ``rolling_week`` and ``wineb_marketshare`` columns.
        The frame is copied; the caller's object is not modified.

    Returns
    -------
    dict or None
        ``{'t0', 'L', 'tipping_marketshare', 'r2'}`` where
        ``tipping_marketshare`` is ``L / 2``. ``None`` (after printing the
        error) if anything inside the fit-and-plot section raises.
    """
    df = df_volume.copy()
    df = df.sort_values('rolling_week')

    # Map rolling_week to a sequential index (only used to order the x-axis)
    week_to_index = {week: i for i, week in enumerate(sorted(df['rolling_week'].unique()))}
    df['t'] = df['rolling_week'].map(week_to_index)

    # All time points and all market-share observations
    t_all = df['t'].values
    y_all = df['wineb_marketshare'].values

    try:
        popt, _ = curve_fit(sigmoid, t_all, y_all, p0=[1, 1, np.median(t_all)], maxfev=10000)
        y_pred = sigmoid(t_all, *popt)
        r2 = r2_score(y_all, y_pred)
        L, _, t0 = popt  # the middle parameter (k) is not needed here

        df['predicted'] = y_pred

        fig = go.Figure()

        fig.add_trace(go.Scatter(
            x=df['t'], y=df['wineb_marketshare'],
            mode='markers',
            name='Observed Market Share',
            marker=dict(size=5, opacity=0.4)
        ))

        fig.add_trace(go.Scatter(
            x=df['t'], y=df['predicted'],
            mode='lines',
            name='Sigmoid Fit',
            line=dict(width=2)
        ))

        fig.add_vline(
            x=t0,
            line=dict(dash='dash', color='red'),
            # Plotly text uses <br> for line breaks; "\n" is not rendered as one.
            annotation_text=f"Tipping Point<br>MS ≈ {L/2:.2f}",
            annotation_position="top right"
        )

        fig.update_layout(
            title="All Market Share Points + Sigmoid Fit",
            xaxis_title="Time Index (t)",
            yaxis_title="Market Share",
            height=500,
            width=900
        )

        fig.show()

        return {
            't0': t0,
            'L': L,
            'tipping_marketshare': L / 2,
            'r2': r2
        }

    except Exception as e:
        # Best-effort: report the problem and signal failure with None.
        print(f"❌ 拟合失败: {e}")
        return None


In [None]:
def fit_sigmoid_to_all_points(df_volume):
    """Fit a single sigmoid curve to all market-share observations.

    Distinct ``rolling_week`` values are ranked (0, 1, 2, ...) in sorted
    order and that rank ``t`` is the independent variable of the fit.
    Relies on ``sigmoid``, ``curve_fit`` and ``r2_score`` being available at
    module level.

    Parameters
    ----------
    df_volume : pandas.DataFrame
        Must contain ``rolling_week`` and ``wineb_marketshare`` columns.
        A copy is used internally; the caller's frame is left untouched.

    Returns
    -------
    dict or None
        On success a dict with the fitted ``L``, ``k``, ``t0``, the
        ``tipping_week`` (the week whose rank equals ``t0`` rounded to the
        nearest integer, or ``None`` if no week has that rank), the ``r2``
        of the fit and ``df_all`` (the sorted copy with ``t`` and
        ``predicted_ms`` columns). ``None`` after printing the error if
        anything in the fitting section raises.
    """
    ordered = df_volume.copy().sort_values(['rolling_week'])

    # Step 1: rank every distinct week and attach that rank as column `t`
    distinct_weeks = sorted(ordered['rolling_week'].unique())
    week_to_index = {week: rank for rank, week in enumerate(distinct_weeks)}
    ordered['t'] = ordered['rolling_week'].map(week_to_index)

    # Step 2: fit the sigmoid on (t, market share)
    t_all = ordered['t'].values
    y_all = ordered['wineb_marketshare'].values

    try:
        initial_guess = [1, 1, np.median(t_all)]
        popt, _ = curve_fit(sigmoid, t_all, y_all, p0=initial_guess, maxfev=10000)
        y_pred = sigmoid(t_all, *popt)
        r2 = r2_score(y_all, y_pred)
        L, k, t0 = popt

        # Translate the fitted t0 back into an actual rolling_week value
        index_to_week = {rank: week for week, rank in week_to_index.items()}
        nearest_rank = int(round(t0))
        tipping_week = index_to_week.get(nearest_rank)

        outcome = {
            'L': L,
            'k': k,
            't0': t0,
            'tipping_week': tipping_week,
            'r2': r2,
            'df_all': ordered.assign(predicted_ms=y_pred)
        }
    except Exception as e:
        print(f"❌ 拟合失败: {e}")
        return None

    return outcome


In [None]:
import plotly.graph_objects as go
import numpy as np

def plot_account_sigmoid(df_volume, account_id, L, k, t0):
    """Plot one account's observed market share against a sigmoid curve.

    The x-axis is the position (0, 1, 2, ...) of each of the account's rows
    after sorting by ``rolling_week``; the curve is
    ``L / (1 + exp(-k * (t - t0)))`` evaluated at those positions.

    NOTE(review): this function is defined in two cells of this notebook;
    the later definition is the one in effect after Run All. Keep the two
    in sync or delete one.
    NOTE(review): ``t`` here is a per-account row position — confirm that
    ``L``, ``k`` and ``t0`` were fitted on the same time axis.

    Parameters
    ----------
    df_volume : pandas.DataFrame
        Must contain ``account_id``, ``rolling_week`` and
        ``wineb_marketshare`` columns.
    account_id
        Value compared with ``==`` against the ``account_id`` column.
    L, k, t0 : float
        Sigmoid parameters (upper asymptote, steepness, midpoint).

    Raises
    ------
    ValueError
        If ``df_volume`` has no rows for ``account_id``.
    """
    # 1. Select this account's rows in week order
    df_acct = (
        df_volume[df_volume['account_id'] == account_id]
        .sort_values('rolling_week')
        .reset_index(drop=True)
    )
    if df_acct.empty:
        # Fail with a clear message instead of the opaque
        # "max() arg is an empty sequence" raised further down.
        raise ValueError(f"No rows found for account_id={account_id!r}; nothing to plot.")

    t = np.arange(len(df_acct))
    y = df_acct['wineb_marketshare'].values

    # 2. Evaluate the fitted curve at the same positions
    def sigmoid(t, L, k, t0):
        return L / (1 + np.exp(-k * (t - t0)))

    y_pred = sigmoid(t, L, k, t0)

    # 3. Build the Plotly figure
    fig = go.Figure()

    # Observed points
    fig.add_trace(go.Scatter(
        x=t,
        y=y,
        mode='markers',
        name='Observed MS',
        marker=dict(size=6, opacity=0.6)
    ))

    # Fitted curve
    fig.add_trace(go.Scatter(
        x=t,
        y=y_pred,
        mode='lines',
        name='Sigmoid Fit',
        line=dict(width=2, color='red')
    ))

    # Annotate the fitted parameters.
    # np.nanmax ignores missing observations; the builtin max() gives an
    # order-dependent result when NaNs are present.
    fig.add_annotation(
        x=t.max() * 0.7,
        y=np.nanmax(y) * 0.9,
        text=f"<b>L</b>: {L:.2f}<br><b>k</b>: {k:.2f}<br><b>t0</b>: {t0:.2f}",
        showarrow=False,
        bgcolor="white",
        bordercolor="black",
        borderwidth=1
    )

    # Mark the midpoint t0 with a vertical line
    fig.add_vline(
        x=t0,
        line=dict(dash='dot', color='gray'),
        annotation_text=f"t0 ≈ {t0:.1f}",
        annotation_position="top left"
    )

    fig.update_layout(
        title=f"Sigmoid Fit for Account {account_id}",
        xaxis_title="Rolling Order Index (t)",
        yaxis_title="Market Share",
        height=500,
        width=900
    )

    fig.show()


In [None]:
import plotly.graph_objects as go
import numpy as np

def plot_account_sigmoid(df_volume, account_id, L, k, t0):
    """Plot one account's observed market share against a sigmoid curve.

    The x-axis is the position (0, 1, 2, ...) of each of the account's rows
    after sorting by ``rolling_week``; the curve is
    ``L / (1 + exp(-k * (t - t0)))`` evaluated at those positions.

    NOTE(review): this function is defined in two cells of this notebook;
    the later definition is the one in effect after Run All. Keep the two
    in sync or delete one.
    NOTE(review): ``t`` here is a per-account row position — confirm that
    ``L``, ``k`` and ``t0`` were fitted on the same time axis.

    Parameters
    ----------
    df_volume : pandas.DataFrame
        Must contain ``account_id``, ``rolling_week`` and
        ``wineb_marketshare`` columns.
    account_id
        Value compared with ``==`` against the ``account_id`` column.
    L, k, t0 : float
        Sigmoid parameters (upper asymptote, steepness, midpoint).

    Raises
    ------
    ValueError
        If ``df_volume`` has no rows for ``account_id``.
    """
    # 1. Select this account's rows in week order
    df_acct = (
        df_volume[df_volume['account_id'] == account_id]
        .sort_values('rolling_week')
        .reset_index(drop=True)
    )
    if df_acct.empty:
        # Fail with a clear message instead of the opaque
        # "max() arg is an empty sequence" raised further down.
        raise ValueError(f"No rows found for account_id={account_id!r}; nothing to plot.")

    t = np.arange(len(df_acct))
    y = df_acct['wineb_marketshare'].values

    # 2. Evaluate the fitted curve at the same positions
    def sigmoid(t, L, k, t0):
        return L / (1 + np.exp(-k * (t - t0)))

    y_pred = sigmoid(t, L, k, t0)

    # 3. Build the Plotly figure
    fig = go.Figure()

    # Observed points
    fig.add_trace(go.Scatter(
        x=t,
        y=y,
        mode='markers',
        name='Observed MS',
        marker=dict(size=6, opacity=0.6)
    ))

    # Fitted curve
    fig.add_trace(go.Scatter(
        x=t,
        y=y_pred,
        mode='lines',
        name='Sigmoid Fit',
        line=dict(width=2, color='red')
    ))

    # Annotate the fitted parameters.
    # np.nanmax ignores missing observations; the builtin max() gives an
    # order-dependent result when NaNs are present.
    fig.add_annotation(
        x=t.max() * 0.7,
        y=np.nanmax(y) * 0.9,
        text=f"<b>L</b>: {L:.2f}<br><b>k</b>: {k:.2f}<br><b>t0</b>: {t0:.2f}",
        showarrow=False,
        bgcolor="white",
        bordercolor="black",
        borderwidth=1
    )

    # Mark the midpoint t0 with a vertical line
    fig.add_vline(
        x=t0,
        line=dict(dash='dot', color='gray'),
        annotation_text=f"t0 ≈ {t0:.1f}",
        annotation_position="top left"
    )

    fig.update_layout(
        title=f"Sigmoid Fit for Account {account_id}",
        xaxis_title="Rolling Order Index (t)",
        yaxis_title="Market Share",
        height=500,
        width=900
    )

    fig.show()
