Блок с оптимизацией по историческим данным

In [None]:
# Единый конфиг (словарь)

config = {
    "model": "XGBRegressor", # модель (Prophet или XGBRegressor)
    "tickers": ["SBER", "GAZP", "LKOH", "NVTK", "MGNT", "TATN", "ROSN", "RUAL", "T", "MOEX", "YDEX", "HHRU"], # тикеры "AAPL", "MSFT", "NVDA", "AMZN", "TSLA", "MA" / "SBER", "GAZP", "LKOH", "NVTK", "MGNT", "TATN"
    "bond_tickers": [],  # номера облигаций "SU29020RMFS3", "RU000A1094F2"
    "start_date": "2023-01-01",           # дата начала истории
    "end_date": "2025-05-12",             # дата конца истории
    "source": "moex",                     # выбор источника истории (moex или yfinance)
    "risk_free_rate": 0.15                # безрисковая ставка (15% для РФ / 5% для US)
}


In [None]:
# Универсальный загрузчик акций и облигаций
def get_price_data(tickers, start, end, source="yfinance", bond_tickers=None):
    if source == "yfinance": 
        price_df = get_data_yfinance(tickers, start, end)
    elif source == "moex":
        price_df = get_data_moex(tickers, start, end)
        
        # Если переданы облигации — загружаем и объединяем
        if bond_tickers:
            bond_df = get_bond_data_moex(bond_tickers, start, end)
            price_df = pd.concat([price_df, bond_df], axis=1).sort_index()
    else:
        raise ValueError("Источник должен быть 'yfinance' или 'moex'")
    
    return price_df

In [None]:
# Загружаем данные из функции выше
price_data = get_price_data(
    config["tickers"],
    config["start_date"],
    config["end_date"],
    config["source"],
    bond_tickers=config.get("bond_tickers")
)

# Проверка и очистка
if price_data.empty:
    raise ValueError("Ошибка: данные не загружены. Проверьте тикеры и даты.")

# Заполняем пропуски методом прямой подстановки
price_data = price_data.fillna(method="ffill").fillna(method="bfill") # заполнение пустоты предыдущим значением, оставшиеся пропуски просто удаляются

# Визуализация нормализованных цен
(price_data / price_data.iloc[0]).plot(figsize=(12, 6), title="Нормализованные цены акций", logy=True) # деление нужно чтобы все акции начинались с 1
plt.xlabel("Дата")
plt.ylabel("Относительная цена")
plt.grid(True)
plt.show()

In [None]:
# Логарифмическая доходность
log_returns = np.log(price_data / price_data.shift(1)).dropna()

In [None]:
# Строим корреляционную матрицу доходностей
correlation_matrix = log_returns.corr()

In [None]:
# === Назначение весов по годам ===

# Настраиваемый словарь весов по годам
year_weights = {
    2021: 0.4,
    2022: 0.2,
    2023: 0.8,
    2024: 1.0,
}

# Функция для получения веса для каждой даты
def get_weights_for_dates(dates, year_weights_dict, default_weight=1.0):
    years = dates.year  # тут dates — это сразу DatetimeIndex
    weights = pd.Series([year_weights_dict.get(y, default_weight) for y in years], index=dates)
    return weights

# Применяем веса к логарифмическим доходностям
weights_series = get_weights_for_dates(log_returns.index, year_weights)

# Сохраняем в отдельную переменную на будущее
log_returns_weights = weights_series


In [None]:
# 1. Получим список тикеров из конфига
tickers = config['tickers'] + config['bond_tickers']

# 2. Оставим только нужные активы в log_returns
log_returns_subset = log_returns[tickers]

# 3. Создадим бенчмарк — среднюю доходность портфеля
benchmark_returns = log_returns_subset.mean(axis=1)

# 4. Расчёт бета-коэффициентов
def compute_beta(returns_df, benchmark_returns):
    betas = {}
    for ticker in returns_df.columns:
        model = LinearRegression()
        model.fit(benchmark_returns.values.reshape(-1, 1), returns_df[ticker].values)
        betas[ticker] = model.coef_[0]  # это и есть β
    return pd.Series(betas)

betas_series = compute_beta(log_returns_subset, benchmark_returns)

# 5. Преобразуем в словарь для маппинга в ML
betas_dict = betas_series.to_dict()

In [None]:
# === Взвешенная средняя доходность ===
mean_daily_returns = (log_returns.mul(log_returns_weights, axis=0)).sum() / log_returns_weights.sum()

# === Взвешенная ковариационная матрица ===
# Центрируем данные
centered_returns = log_returns - mean_daily_returns

# Применяем веса
weighted_centered = centered_returns.mul(np.sqrt(log_returns_weights), axis=0)

# Взвешенная ковариация
cov_matrix = (weighted_centered.T @ weighted_centered) / log_returns_weights.sum()

In [None]:
# Кол-во активов
num_assets = len(config["tickers"] + config["bond_tickers"])

In [None]:
# Целевая функция — отрицательный коэффициент Шарпа
def neg_sharpe_ratio(weights, mean_returns, cov_matrix, risk_free_rate): # основные параметры для функции минимизации
    port_return = np.dot(weights, mean_returns) # ожидаемая средняя доходность портфеля
    port_volatility = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights))) # стандартное отклонение доходности
    sharpe_ratio = (port_return - risk_free_rate / 252) / port_volatility # расчет коээфа Шарпа (деление на 252 нужно чтобы получить дневной процент)
    return -sharpe_ratio # отицательный коэфф Шарпа для максимизации

In [None]:
# Ограничения и условия
constraints = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})  # сумма весов = 1
bounds = tuple((0, 1) for _ in range(num_assets))  # веса от 0 до 1
initial_guess = num_assets * [1. / num_assets]  # равные веса

In [None]:
# Оптимизация
opt_result = minimize(
    neg_sharpe_ratio, # отрицательный коэфф Шарпа
    initial_guess, # равные веса активов
    args=(mean_daily_returns, cov_matrix, config["risk_free_rate"]), # аргументы для целевой функции где мы отрицательный коэф Шарпа искали
    method="SLSQP", # метод минимизации 
    bounds=bounds, # ограничения по весам активов
    constraints=constraints # ограничение по сумме весов
)

In [None]:
# Оптимальные веса
optimal_weights = opt_result.x

In [None]:
# Оптимальная дневная доходность и риск
port_return_daily = np.dot(optimal_weights, mean_daily_returns)
port_volatility_daily = np.sqrt(np.dot(optimal_weights.T, np.dot(cov_matrix, optimal_weights)))

# Годовые метрики
port_return_annual = port_return_daily * 252
port_volatility_annual = port_volatility_daily * np.sqrt(252)

# Шарп
sharpe_ratio_annual = (port_return_annual - config["risk_free_rate"]) / port_volatility_annual

In [None]:
# Печатаем метрики
print("Оптимальные веса в портфеле:")
for ticker, weight in zip(tickers, optimal_weights):
    print(f"{ticker}: {weight:.2%}")

print(f"\nДоходность портфеля: {port_return_daily:.4f} в день, {port_return_annual:.2%} в год")
print(f"Риск портфеля (волатильность): {port_volatility_daily:.4f} в день, {port_volatility_annual:.2%} в год")
print(f"Коэффициент Шарпа (годовой): {sharpe_ratio_annual:.2f}")

In [None]:
# Исключаем активы с нулевым весом
non_zero_weights = [(ticker, weight) for ticker, weight in zip(tickers, optimal_weights) if weight > 1e-6]  # Убираем веса, близкие к нулю

# Проверяем, есть ли активы с ненулевыми весами
if non_zero_weights:
    # Разделяем тикеры и веса
    filtered_tickers, filtered_weights = zip(*non_zero_weights)

    # Построение диаграммы
    plt.figure(figsize=(8, 6))
    plt.pie(filtered_weights, labels=filtered_tickers, autopct="%1.1f%%", startangle=140)
    plt.title("Оптимальное распределение активов")
    plt.axis("equal")
    plt.show()
else:
    print("Все активы имеют нулевые веса. Нечего отображать.")

In [None]:
# Доходность портфеля с равными и оптимальными весами
equal_weights = np.repeat(1/num_assets, num_assets)

# Капитал под управлением
initial_capital = 100

# Кумулятивная доходность
portfolio_equal = (log_returns @ equal_weights).cumsum()
portfolio_optimal = (log_returns @ optimal_weights).cumsum()

plt.figure(figsize=(12, 6))
plt.plot(portfolio_equal, label="Равные веса", linestyle="--")
plt.plot(portfolio_optimal, label="Оптимальный портфель", linewidth=2)
plt.title("Кумулятивная доходность портфеля")
plt.ylabel("Суммарная лог-доходность")
plt.legend()
plt.grid(True)
plt.show()

Блок с прогнозом

In [None]:
n_lags = 30  # Количество лагов (дней), используемых для построения признаков
horizon = 30  # Горизонт прогнозирования (количество дней вперед)
expected_returns_ml = {}  # Словарь для хранения ожидаемых доходностей

In [None]:
predictions = {} # Словарь для хранения прогнозных цен

if config["model"] == "XGBRegressor":
    for ticker in price_data.columns:
        # Подготовка данных
        df = price_data[ticker].copy().reset_index()  # Преобразуем индекс в столбец
        df = df.rename(columns={"index": "Date"})  # Убедимся, что столбец называется "Date"

        # Создаем лаги
        for lag in range(1, n_lags + 1):
            df[f"lag_{lag}"] = df[ticker].shift(lag)

        # Целевая переменная — цена на следующий день
        df["target"] = df[ticker].shift(-1)
        df_model = df.dropna().reset_index(drop=True)

        # Проверяем, достаточно ли данных для обучения
        if len(df_model) < 100:
            print(f"Недостаточно данных для {ticker}, пропускаем.")
            continue

        # Сохраняем оригинальные индексы
        original_indices = df_model["Date"]

        # Разделяем данные на признаки и целевую переменную
        X = df_model[[f"lag_{i}" for i in range(1, n_lags + 1)]].copy()
        y = df_model["target"]

        # Разделяем данные на обучающую и тестовую выборки
        X_train, X_test, y_train, y_test, indices_train, indices_test = train_test_split(
            X, y, original_indices, shuffle=False, test_size=0.2
        )

        # Отбор весов по оригинальным индексам
        weights_train = log_returns_weights.loc[indices_train]

        # Обучение модели с весами
        model = XGBRegressor(n_estimators=100, learning_rate=0.1, random_state=42)
        model.fit(X_train, y_train, sample_weight=weights_train)

        # Последние данные для предсказания
        latest_data = X.iloc[[-1]]

        # Прогноз на horizon дней вперед
        future_predictions = []
        for day in range(horizon):
            predicted_price = model.predict(latest_data)[0]
            future_predictions.append(predicted_price)

            # Обновляем данные для следующего прогноза
            new_row = latest_data.to_numpy().flatten().tolist()[1:] + [predicted_price]
            columns = [f"lag_{i}" for i in range(1, n_lags + 1)]
            latest_data = pd.DataFrame([new_row], columns=columns)

        # Сохраняем прогнозы
        predictions[ticker] = future_predictions

elif config["model"] == "Prophet":
    for ticker in price_data.columns:
        # Подготовка исходных данных
        df = price_data[ticker].copy().reset_index()  # Преобразуем индекс в столбец
        df = df.rename(columns={"index": "Date", ticker: "y"}).dropna()  # Переименовываем столбцы

        # Присоединяем веса
        df["weight"] = df["Date"].map(log_returns_weights).fillna(1.0)

        if len(df) < 100:
            print(f"Недостаточно данных для {ticker}, пропускаем.")
            continue

        # Эмуляция sample_weight: дублируем строки согласно весу (с округлением)
        df["dup_count"] = (df["weight"] * 10).round().astype(int).clip(lower=1)
        df_weighted = df.loc[df.index.repeat(df["dup_count"])][["Date", "y"]]

        # Обучение модели
        model = Prophet()
        model.fit(df_weighted.rename(columns={"Date": "ds"}))  # Prophet ожидает столбец 'ds' для дат

        # Прогноз
        future = model.make_future_dataframe(periods=horizon)
        forecast = model.predict(future)

        # Сохраняем прогноз
        predicted_values = forecast[["ds", "yhat"]].tail(horizon)["yhat"].values
        predictions[ticker] = predicted_values

In [None]:
# Создаем DataFrame с прогнозными ценами закрытия
price_data_fcst = pd.DataFrame(predictions, index=pd.date_range(start=price_data.index[-1] + pd.Timedelta(days=1), periods=horizon))
price_data_fcst.index.name = "Date"

# Выводим DataFrame с прогнозами
print("Прогнозные цены закрытия:")
print(price_data_fcst.head())

In [None]:
# Объединение исторических и прогнозных данных
combined_data = pd.concat([price_data, price_data_fcst])

# Нормализация объединенных данных
normalized_combined_data = combined_data / combined_data.iloc[0]

# Фильтрация данных, начиная с 2024 года
normalized_combined_data = normalized_combined_data.loc["2024-01-01":]

# Визуализация нормализованных данных
plt.figure(figsize=(12, 6))

for ticker in normalized_combined_data.columns:
    # Полная линия для каждого тикера
    plt.plot(
        normalized_combined_data.index, 
        normalized_combined_data[ticker], 
        label=ticker
    )

# Добавляем вертикальную линию для разделения истории и прогноза
plt.axvline(x=price_data.index[-1], color="red", linestyle="--", label="Граница прогноза")

plt.title("Нормализованные исторические и прогнозные цены акций")
plt.xlabel("Дата")
plt.ylabel("Относительная цена")
plt.legend(title="Тикеры")
plt.grid(True)
plt.show()

Блок с оптимизацией портфеля с учетом прогнозных значений

In [None]:
# 2. Рассчитываем логарифмическую доходность на основе объединенных данных
log_returns_fcst = np.log(combined_data / combined_data.shift(1)).dropna()


In [None]:
# Строим корреляционную матрицу доходностей
correlation_matrix = log_returns_fcst.corr()

In [None]:
# 1. Получим список тикеров из конфига
tickers = config['tickers'] + config['bond_tickers']

# 2. Оставим только нужные активы в log_returns
log_returns_subset_fcst = log_returns_fcst[tickers]

# 3. Создадим бенчмарк — среднюю доходность портфеля
benchmark_returns_fcst = log_returns_subset_fcst.mean(axis=1)

# 4. Расчёт бета-коэффициентов
def compute_beta_fcst(returns_df_fcst, benchmark_returns_fcst):
    betas_fcst = {}
    for ticker in returns_df_fcst.columns:
        model_fcst = LinearRegression()
        model_fcst.fit(benchmark_returns_fcst.values.reshape(-1, 1), returns_df_fcst[ticker].values)
        betas_fcst[ticker] = model_fcst.coef_[0]  # это и есть β
    return pd.Series(betas_fcst)

betas_series_fcst = compute_beta_fcst(log_returns_subset_fcst, benchmark_returns_fcst)

In [None]:
# 3. Рассчитываем среднюю логарифмическую доходность и ковариационную матрицу
mean_daily_returns_fcst = log_returns_fcst.mean()  # Средняя логарифмическая доходность по каждому тикеру
cov_matrix_fcst = log_returns_fcst.cov()  # Ковариационная матрица логарифмических доходностей

In [None]:
# 4. Оптимизация портфеля через коэффициент Шарпа
def neg_sharpe_ratio_fcst(weights, expected_log_returns_fcst, cov_matrix_fcst, risk_free_rate):
    port_return_fcst = np.dot(weights, expected_log_returns_fcst)
    port_volatility_fcst = np.sqrt(np.dot(weights.T, np.dot(cov_matrix_fcst, weights)))
    sharpe_ratio_fcst = (port_return_fcst - risk_free_rate / 252) / port_volatility_fcst  # Отрицательный Шарп для минимизации
    return -sharpe_ratio_fcst  # Отрицательный коэффициент Шарпа для максимизации

In [None]:
# 5. Ограничения и условия
constraints_fcst = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})  # сумма весов = 1
bounds_fcst = tuple((0, 1) for _ in range(num_assets))  # веса от 0.1 до 1
initial_guess_fcst = num_assets * [1. / num_assets]  # равные веса

In [None]:
# Оптимизация
opt_result_fcst = minimize(
    neg_sharpe_ratio_fcst, # отрицательный коэфф Шарпа
    initial_guess_fcst, # равные веса активов
    args=(mean_daily_returns_fcst, cov_matrix_fcst, config["risk_free_rate"]), # аргументы для целевой функции где мы отрицательный коэф Шарпа искали
    method="SLSQP", # метод минимизации 
    bounds=bounds, # ограничения по весам активов
    constraints=constraints # ограничение по сумме весов
)

In [None]:
# Оптимальные веса
optimal_weights_fcst = opt_result_fcst.x

In [None]:
# Оптимальная дневная доходность и риск
port_return_daily_fcst = np.dot(optimal_weights_fcst, mean_daily_returns_fcst)
port_volatility_daily_fcst = np.sqrt(np.dot(optimal_weights_fcst.T, np.dot(cov_matrix_fcst, optimal_weights_fcst)))

# Годовые метрики
port_return_annual_fcst = port_return_daily_fcst * 252
port_volatility_annual_fcst = port_volatility_daily_fcst * np.sqrt(252)

# Шарп
sharpe_ratio_annual_fcst = (port_return_annual_fcst - config["risk_free_rate"]) / port_volatility_annual_fcst

In [None]:
# Печатаем метрики
print("Оптимальные веса в портфеле:")
for ticker, weight in zip(tickers, optimal_weights_fcst):
    print(f"{ticker}: {weight:.2%}")

print(f"\nДоходность портфеля: {port_return_daily_fcst:.4f} в день, {port_return_annual_fcst:.2%} в год")
print(f"Риск портфеля (волатильность): {port_volatility_daily_fcst:.4f} в день, {port_volatility_annual_fcst:.2%} в год")
print(f"Коэффициент Шарпа (годовой): {sharpe_ratio_annual_fcst:.2f}")

In [None]:
# Исключаем активы с нулевым весом
non_zero_weights_fcst = [(ticker, weight) for ticker, weight in zip(tickers, optimal_weights_fcst) if weight > 1e-6]  # Убираем веса, близкие к нулю

# Проверяем, есть ли активы с ненулевыми весами
if non_zero_weights_fcst:
    # Разделяем тикеры и веса
    filtered_tickers_fcst, filtered_weights_fcst = zip(*non_zero_weights_fcst)

    # Построение диаграммы
    plt.figure(figsize=(8, 6))
    plt.pie(filtered_weights_fcst, labels=filtered_tickers_fcst, autopct="%1.1f%%", startangle=140)
    plt.title("Оптимальное распределение активов")
    plt.axis("equal")
    plt.show()
else:
    print("Все активы имеют нулевые веса. Нечего отображать.")

In [None]:
# Доходность портфеля с равными и оптимальными весами
equal_weights_fcst = np.repeat(1/num_assets, num_assets)

# Капитал под управлением
initial_capital = 100

# Кумулятивная доходность
portfolio_equal_fcst = (log_returns_fcst @ equal_weights_fcst).cumsum()
portfolio_optimal_fcst = (log_returns_fcst @ optimal_weights_fcst).cumsum()

plt.figure(figsize=(12, 6))
plt.plot(portfolio_equal_fcst, label="Равные веса", linestyle="--")
plt.plot(portfolio_optimal_fcst, label="Оптимальный портфель", linewidth=2)
plt.title("Кумулятивная доходность портфеля")
plt.ylabel("Суммарная лог-доходность")
plt.legend()
plt.grid(True)
plt.show()