# This is a sample Jupyter Notebook

Below is an example of a code cell. 
Put your cursor into the cell and press Shift+Enter to execute it and select the next one, or click the 'Run Cell' button.

Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.

To learn more about Jupyter Notebooks in PyCharm, see [help](https://www.jetbrains.com/help/pycharm/ipython-notebook-support.html).
For an overview of PyCharm, go to Help -> Learn IDE features or refer to [our documentation](https://www.jetbrains.com/help/pycharm/getting-started.html).

In [1]:
# Example cell: prints a fixed greeting string.
print("Hello World!")


Hello World!


In [None]:
import numpy as np

# Step 1: define the [low, high] value bounds of F1/F2/F3 for each risk profile
# Conservative profile
conservative_bounds = {"F1": [0.4, 0.7], "F2": [0.7, 0.9], "F3": [0.4, 0.8]}

# Balanced profile
balanced_bounds = {"F1": [0.5, 0.8], "F2": [0.5, 0.7], "F3": [0.5, 0.8]}
# Aggressive profile
aggressive_bounds = {"F1": [0.7, 0.9], "F2": [0.5, 0.7], "F3": [0.7, 0.9]}

# Step 2: generate the conservative reference points (4 F1 levels x 3 F2 levels = 12);
# F1/F2 lie on an evenly spaced grid, F3 is drawn uniformly at random within its bounds
conservative = []
F1_list = np.linspace(*conservative_bounds["F1"], 4)
F2_list = np.linspace(*conservative_bounds["F2"], 3)
for f1 in F1_list:
    for f2 in F2_list:
        f3 = np.random.uniform(*conservative_bounds["F3"])
        conservative.append([f1, f2, f3])

# Step 3: generate the balanced reference points (same 4 x 3 grid scheme, 12 points)
balanced = []
F1_list = np.linspace(*balanced_bounds["F1"], 4)
F2_list = np.linspace(*balanced_bounds["F2"], 3)
for f1 in F1_list:
    for f2 in F2_list:
        f3 = np.random.uniform(*balanced_bounds["F3"])
        balanced.append([f1, f2, f3])

# Step 4: generate the aggressive reference points (6 points, all coordinates random)
aggressive = []
while len(aggressive) < 6:
    f1 = np.random.uniform(*aggressive_bounds["F1"])
    f2 = np.random.uniform(*aggressive_bounds["F2"])
    f3 = np.random.uniform(*aggressive_bounds["F3"])
    # Keep the point only if F1 >= 0.7 or F3 >= 0.8.
    # NOTE(review): F1 is drawn from aggressive_bounds["F1"] = [0.7, 0.9], so this
    # condition appears to always hold - confirm whether a stricter filter was intended.
    if f1 >= 0.7 or f3 >= 0.8:
        aggressive.append([f1, f2, f3])

# Step 5: merge the three groups into one array (spacing adjustment happens afterwards)
ref_points = np.array(conservative + balanced + aggressive)

# Fine-tuning: keep every pair of reference points at least min_distance apart
def adjust_ref_points(ref_points, min_distance=0.1):
    """Nudge reference points apart until all pairwise distances are large enough.

    Points are processed in order. A point that lies closer than
    ``min_distance`` (Euclidean norm) to any already accepted point is
    repeatedly perturbed by uniform noise in [-0.05, 0.05) per coordinate and
    clipped to [0, 1] until it is far enough from all accepted points.

    Args:
        ref_points: 2-D array-like, one point per row.
        min_distance: minimum allowed Euclidean distance between two points.

    Returns:
        numpy array of the accepted points, in input order.

    Note:
        The loop has no iteration limit; if the points cannot be separated
        inside the unit box it will not terminate.
    """
    # Work on a private float copy: iterating a 2-D ndarray yields row views,
    # so perturbing them in place would silently modify the caller's array
    # (and fail outright for integer input).
    points = np.array(ref_points, dtype=float)
    adjusted = []
    for p in points:
        # Compare against the points accepted so far
        while not all(np.linalg.norm(p - ap) >= min_distance for ap in adjusted):
            # Random step of at most 0.05 per coordinate, then clamp to [0, 1]
            step = np.random.uniform(-0.05, 0.05, size=p.shape)
            p = np.clip(p + step, 0, 1)
        adjusted.append(p)
    return np.array(adjusted)

# Apply the spacing adjustment to the merged reference points and report the result
ref_points_adjusted = adjust_ref_points(ref_points)
print("最终风险分层参考点数量：", len(ref_points_adjusted))  # count of final reference points
print("参考点示例：\n", ref_points_adjusted[:5])  # first five points as a sample

In [None]:
def load_price_data(price_files):
    """Build a date-indexed price matrix from per-stock price tables.

    Args:
        price_files: either a dict ``{code: DataFrame}`` where every DataFrame
            has columns ``'date'`` and ``'adj_close'``, or a single long-format
            DataFrame with columns ``'code'``, ``'date'`` and ``'adj_close'``.

    Returns:
        DataFrame indexed by date (datetime) with one column per stock code
        holding ``adj_close``. Dates are outer-joined, so a stock has NaN on
        dates it does not cover. Empty input yields an empty DataFrame.

    Raises:
        KeyError: if a required column (``'code'``, ``'date'``,
            ``'adj_close'``) is missing.
    """
    import pandas as pd

    # A single long-format table is split into one sub-table per stock code,
    # keeping the order in which the codes first appear.
    if isinstance(price_files, pd.DataFrame):
        price_files = {
            code: group for code, group in price_files.groupby('code', sort=False)
        }

    series_list = []
    for code, df in price_files.items():
        # assign() returns a new frame, so the caller's DataFrame keeps its
        # original 'date' column instead of being converted in place.
        dated = df.assign(date=pd.to_datetime(df['date']))
        series_list.append(dated.set_index('date')['adj_close'].rename(code))

    # pd.concat raises on an empty list; an empty matrix is the natural result.
    if not series_list:
        return pd.DataFrame()

    # Outer join aligns all stocks on the union of their dates
    return pd.concat(series_list, axis=1, join='outer')


In [None]:
def align_and_fill(prices_df, trading_calendar, fill_method='ffill', industry_map=None,
                   max_consecutive_missing=5, max_missing_ratio=0.05):
    """Align prices to a trading calendar, fill gaps and compute daily returns.

    Args:
        prices_df: date-indexed DataFrame of prices, one column per stock.
        trading_calendar: index of trading dates used to reindex the prices.
        fill_method: ``'ffill'`` carries the previous price forward;
            ``'industry_mean'`` replaces a missing price with the same-day
            mean of the stock's industry. Any other value leaves the prices
            unfilled.
        industry_map: dict ``{code: industry}``; required for
            ``'industry_mean'``. Codes absent from ``prices_df`` are ignored.
        max_consecutive_missing: longest tolerated run of missing prices.
        max_missing_ratio: largest tolerated share of missing prices.

    Returns:
        ``(returns, valid_mask)``: the simple daily returns of the stocks that
        pass the completeness check (undefined returns are set to 0), and a
        boolean Series over all input columns marking which stocks passed.

    Raises:
        ValueError: if ``fill_method='industry_mean'`` is used without
            ``industry_map``.
    """
    import pandas as pd

    # 1. Reindex onto the trading calendar so every stock shares the same dates
    prices = prices_df.reindex(trading_calendar)

    # Record the gaps BEFORE filling. Checking the returns after they have
    # been zero-filled would never find a missing value, so no stock could
    # ever be rejected.
    missing = prices.isna()

    # 2. Fill missing prices
    if fill_method == 'ffill':
        prices = prices.ffill()
    elif fill_method == 'industry_mean':
        if industry_map is None:
            raise ValueError("industry_map is required when fill_method='industry_mean'")
        industries = pd.Series(industry_map)
        for ind in industries.unique():
            members = industries.index[industries == ind]
            cols = [c for c in members if c in prices.columns]
            if not cols:
                continue
            block = prices[cols]
            # Same-day mean over the industry's available prices
            row_mean = block.mean(axis=1, skipna=True)
            prices[cols] = block.apply(lambda col: col.fillna(row_mean))

    # 3. Simple daily returns. Remaining gaps are padded explicitly (the
    #    historical pct_change default) and undefined returns, including the
    #    first row, are set to 0.
    returns = prices.ffill().pct_change(fill_method=None).fillna(0)

    # 4. Completeness check on the original gaps
    valid_mask = pd.Series(True, index=prices.columns)
    for col in prices.columns:
        gaps = missing[col]
        if gaps.any():
            # Label each run of equal values, then sum the True runs
            run_id = (gaps != gaps.shift()).cumsum()
            longest_run = gaps.groupby(run_id).sum().max()
        else:
            longest_run = 0
        if longest_run > max_consecutive_missing or gaps.mean() > max_missing_ratio:
            valid_mask[col] = False

    # 5. Keep only the stocks that passed
    returns = returns.loc[:, valid_mask]
    return returns, valid_mask


In [None]:
def compute_monthly_excess_returns(monthly_prices_df, rf_series):
    """Compute monthly simple returns in excess of the risk-free rate.

    Args:
        monthly_prices_df: date-indexed DataFrame of monthly adjusted closes,
            one column per stock.
        rf_series: Series of monthly risk-free rates.

    Returns:
        DataFrame of excess returns with the same columns as the input; rows
        where every stock's return is NaN (such as the first month) are
        dropped.
    """
    # Monthly simple returns
    monthly_ret = monthly_prices_df.pct_change().dropna(how='all')
    # Align the risk-free rate to the return dates and carry the last known
    # rate forward. .ffill() replaces the deprecated fillna(method='ffill').
    # Dates before the first available rate stay NaN.
    rf = rf_series.reindex(monthly_ret.index).ffill()
    return monthly_ret.sub(rf, axis=0)


In [None]:
def run_ff5_regression(excess_returns, ff5_df, min_months=24):
    """Regress each stock's excess returns on the factor returns with OLS.

    Args:
        excess_returns: date-indexed DataFrame, one column per stock.
        ff5_df: date-indexed DataFrame of factor returns, one column per factor.
        min_months: minimum number of usable observations (months where the
            stock's return and every factor are available) needed to run the
            regression.

    Returns:
        ``(beta_df, alpha_series, valid_mask)``: factor loadings (rows are
        regressed stocks, columns are the factors of ``ff5_df``), the
        intercepts, and a boolean Series over all stocks telling which ones
        were regressed.
    """
    import statsmodels.api as sm
    import pandas as pd

    betas = {}
    alphas = {}
    valid = {}
    for code in excess_returns.columns:
        y = excess_returns[code].dropna()
        # Align the factors to the stock's dates and keep only months where
        # every factor is present. Counting before this filter would let a
        # regression run on fewer than min_months usable rows.
        X = ff5_df.reindex(y.index)
        complete = X.notna().all(axis=1)
        y = y[complete]
        X = X[complete]
        if len(y) < min_months:
            valid[code] = False
            continue
        # has_constant='add' always prepends the intercept column. The default
        # skips it when a factor happens to be constant in the sample, which
        # would misalign alpha and the betas below.
        design = sm.add_constant(X, has_constant='add')
        res = sm.OLS(y.values, design.values).fit()
        params = res.params
        alphas[code] = params[0]
        betas[code] = dict(zip(ff5_df.columns, params[1:]))
        valid[code] = True

    # Rows = stock codes, columns = factors. The reindex keeps the factor
    # columns even when no stock qualified.
    beta_df = pd.DataFrame(betas).T.reindex(columns=ff5_df.columns)
    alpha_series = pd.Series(alphas)
    valid_mask = pd.Series(valid)
    return beta_df, alpha_series, valid_mask


In [None]:
def standardize_and_score(beta_df, weights=None):
    """Combine cross-sectionally standardised factor loadings into one score.

    Args:
        beta_df: DataFrame of factor loadings, rows = stocks, columns = factors.
        weights: mapping ``{factor: weight}`` or a sequence of weights given in
            the column order of ``beta_df``. Defaults to MKT 0.15, SMB 0.20,
            HML 0.25, RMW 0.25, CMA 0.15.

    Returns:
        Series with one weighted z-score sum per stock.

    Raises:
        ValueError: if a weight sequence does not have one entry per column.
    """
    import pandas as pd

    if weights is None:
        weights = {'MKT': 0.15, 'SMB': 0.20, 'HML': 0.25, 'RMW': 0.25, 'CMA': 0.15}

    if hasattr(weights, 'keys'):
        w = pd.Series(weights)
    else:
        # A plain sequence is matched to the columns by position
        w = pd.Series(list(weights))
        if len(w) != beta_df.shape[1]:
            raise ValueError("weights must provide one value per column of beta_df")
        w.index = beta_df.columns

    # Cross-sectional z-score. A factor with zero spread (for example a single
    # stock) would divide by zero and turn every score into NaN, so its
    # z-scores are set to 0 by dividing by 1 instead.
    sigma = beta_df.std(ddof=0)
    sigma = sigma.where(sigma != 0, 1.0)
    beta_std = (beta_df - beta_df.mean()) / sigma

    # Weighted sum over the weighted factors
    return beta_std[list(w.index)].dot(w)


In [None]:
def compute_nav_series(z, returns_daily):
    """Return the daily net-asset-value path of the equal-weighted selection.

    Args:
        z: 1-D numpy array of 0/1 flags, one per column of ``returns_daily``.
        returns_daily: date-indexed DataFrame of daily returns, one column per
            stock.

    Returns:
        Series over the dates of ``returns_daily``. It is all 1.0 when nothing
        is selected; otherwise it is the compounded equal-weight NAV rescaled
        so that its first value is 1.
    """
    import numpy as np
    import pandas as pd

    picked = returns_daily.columns[z.astype(bool)]
    if z.sum() == 0:
        # Nothing selected: flat NAV of 1
        return pd.Series(1.0, index=returns_daily.index)

    # Equal-weight portfolio return per day
    portfolio_ret = returns_daily.loc[:, picked].mean(axis=1)
    # Compound through summed log-returns for numerical stability
    nav = np.exp(np.log1p(portfolio_ret).cumsum())
    # Rescale so the series starts at 1
    return nav.div(nav.iloc[0])


In [None]:
def compute_annualized_return(nav_series, T_days, trading_days_per_year=250.0):
    """Annualise the total return of a NAV series.

    Args:
        nav_series: Series of net asset values; only the first and last values
            are used, so the series does not have to start at 1.
        T_days: number of trading days covered by the series.
        trading_days_per_year: annualisation basis (default 250).

    Returns:
        The annualised return ``(end / start) ** (trading_days_per_year / T_days) - 1``.

    Raises:
        ZeroDivisionError: if ``T_days`` is 0.
    """
    start = nav_series.iloc[0]
    end = nav_series.iloc[-1]
    # Total return over the window, relative to the starting NAV
    total_return = end / start - 1.0
    return (1 + total_return) ** (trading_days_per_year / T_days) - 1.0


In [None]:
def compute_max_drawdown(nav_series):
    """Return the largest peak-to-trough decline of a NAV series.

    Args:
        nav_series: Series of net asset values.

    Returns:
        The maximum of ``(running peak - nav) / running peak`` over the series.
    """
    import numpy as np

    # Highest NAV seen up to each date
    running_peak = nav_series.cummax()
    # Relative drop from that peak; the worst one is the max drawdown
    return running_peak.sub(nav_series).div(running_peak).max()


In [None]:
def compute_f3(z, S_series):
    """Return the mean score of the selected stocks.

    Args:
        z: 1-D numpy array of 0/1 flags, one per entry of ``S_series``.
        S_series: Series of per-stock scores.

    Returns:
        The average of ``S_series`` over the selected entries, or 0.0 when
        nothing is selected.
    """
    import numpy as np

    chosen = S_series.index[z.astype(bool)]
    return S_series[chosen].mean() if z.sum() != 0 else 0.0


In [None]:
def random_feasible_solution(n, K_min, K_max, industry_map, M_min, alpha_max):
    """Draw a random 0/1 selection and repair it towards the constraints.

    The repair is a simple heuristic in three passes: add stocks from
    uncovered industries, trim industries above the concentration cap, then
    top up to ``K_min``. The final top-up picks stocks at random, so it may
    re-violate the cap when the constraints conflict.

    Args:
        n: number of stocks.
        K_min: minimum number of selected stocks.
        K_max: maximum number of selected stocks.
        industry_map: dict ``{index: industry}`` or list of length ``n``.
        M_min: minimum number of industries that should be covered.
        alpha_max: maximum share of the selection a single industry may hold.

    Returns:
        numpy int array of length ``n`` with 1 for selected stocks.

    Raises:
        ValueError: if ``K_min`` exceeds ``n`` (or ``K_min > K_max``).
    """
    import collections
    import random

    import numpy as np

    if K_min > n:
        raise ValueError("K_min cannot exceed the number of stocks n")

    industries = [industry_map[i] for i in range(n)]

    # 1. Random size (never larger than the universe) and random initial picks
    K = random.randint(K_min, min(K_max, n))
    z = np.zeros(n, dtype=int)
    z[random.sample(range(n), K)] = 1

    # Selected-stock count per industry
    counts = collections.Counter(industries[i] for i in np.flatnonzero(z))

    # 2. Coverage repair: add one stock from uncovered industries while there
    #    is room. Industries are listed in first-appearance order and then
    #    shuffled, so the outcome is reproducible under a seeded RNG.
    missing = [ind for ind in dict.fromkeys(industries) if ind not in counts]
    random.shuffle(missing)
    while len(counts) < M_min and missing and z.sum() < K_max:
        ind = missing.pop()
        candidates = [i for i in range(n) if industries[i] == ind and z[i] == 0]
        if candidates:
            z[random.choice(candidates)] = 1
            counts[ind] += 1

    # 3. Concentration repair: drop random members of an over-weight industry
    #    until its share is within alpha_max. The share is re-evaluated after
    #    every removal because the portfolio shrinks too.
    total = int(z.sum())
    for ind, held in list(counts.items()):
        if held / max(1, total) <= alpha_max:
            continue
        members = [i for i in range(n) if industries[i] == ind and z[i] == 1]
        while members and held / max(1, total) > alpha_max:
            # pop() removes exactly the stock that was deselected
            dropped = members.pop(random.randrange(len(members)))
            z[dropped] = 0
            held -= 1
            total -= 1

    # 4. Top up at random if the trimming fell below K_min
    while z.sum() < K_min:
        unselected = np.flatnonzero(z == 0).tolist()
        z[random.choice(unselected)] = 1
    return z
