diff --git a/qlib/data/_libs/expanding.pyx b/qlib/data/_libs/expanding.pyx
index 76b824c947..47bc49610b 100644
--- a/qlib/data/_libs/expanding.pyx
+++ b/qlib/data/_libs/expanding.pyx
@@ -14,7 +14,7 @@ cdef class Expanding(object):
     cdef int na_count
     def __init__(self):
        self.na_count = 0
-
+
     cdef double update(self, double val):
         pass
@@ -25,7 +25,7 @@ cdef class Mean(Expanding):
     def __init__(self):
         super(Mean, self).__init__()
         self.vsum = 0
-
+
     cdef double update(self, double val):
         self.barv.push_back(val)
         if isnan(val):
@@ -62,7 +62,7 @@ cdef class Slope(Expanding):
         return (N*self.xy_sum - self.x_sum*self.y_sum) / \
             (N*self.x2_sum - self.x_sum*self.x_sum)
-
+
 cdef class Resi(Expanding):
     """1-D array expanding residuals"""
     cdef double x_sum
@@ -94,7 +94,7 @@ cdef class Resi(Expanding):
         interp = y_mean - slope*x_mean
         return val - (slope*size + interp)
-
+
 cdef class Rsquare(Expanding):
     """1-D array expanding rsquare"""
     cdef double x_sum
@@ -117,7 +117,7 @@ cdef class Rsquare(Expanding):
             self.na_count += 1
         else:
             self.x_sum += size
-            self.x2_sum += size
+            self.x2_sum += size * size
             self.y_sum += val
             self.y2_sum += val * val
             self.xy_sum += size * val
@@ -126,7 +126,7 @@ cdef class Rsquare(Expanding):
             sqrt((N*self.x2_sum - self.x_sum*self.x_sum) *
                  (N*self.y2_sum - self.y_sum*self.y_sum))
         return rvalue * rvalue
-
+
 cdef np.ndarray[double, ndim=1] expanding(Expanding r, np.ndarray a):
     cdef int i
     cdef int N = len(a)
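# --- Editor's note (not part of the patch): a minimal sketch checking the
# `x2_sum` fix above. `Rsquare` regresses the values against x = 1..N, so the
# accumulator must add size * size, not size. The closed form below mirrors the
# Cython fields and is cross-checked against scipy.stats.linregress.
import numpy as np
from scipy.stats import linregress

y = np.array([1.0, 2.5, 2.0, 4.0, 5.5])
x = np.arange(1, len(y) + 1, dtype=float)

N = len(y)
x_sum, x2_sum = x.sum(), (x * x).sum()          # x2_sum accumulates size * size
y_sum, y2_sum, xy_sum = y.sum(), (y * y).sum(), (x * y).sum()
rvalue = (N * xy_sum - x_sum * y_sum) / np.sqrt(
    (N * x2_sum - x_sum * x_sum) * (N * y2_sum - y_sum * y_sum))
assert np.isclose(rvalue ** 2, linregress(x, y).rvalue ** 2)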
diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py
index 04715c8920..b8751830d9 100644
--- a/qlib/data/dataset/handler.py
+++ b/qlib/data/dataset/handler.py
@@ -5,7 +5,8 @@
 import abc
 import bisect
 import logging
+import warnings
-from typing import Union, Tuple, List
+from typing import Union, Tuple, List, Iterator, Optional
 import pandas as pd
 import numpy as np
@@ -113,8 +113,7 @@ def _fetch_df_by_index(
     CS_ALL = "__all"
 
     def _fetch_df_by_col(self, df: pd.DataFrame, col_set: str) -> pd.DataFrame:
-        cln = len(df.columns.levels)
-        if cln == 1:
+        if not isinstance(df.columns, pd.MultiIndex):
             return df
         elif col_set == self.CS_ALL:
             return df.droplevel(axis=1, level=0)
@@ -126,6 +125,7 @@ def fetch(
         selector: Union[pd.Timestamp, slice, str],
         level: Union[str, int] = "datetime",
         col_set: Union[str, List[str]] = CS_ALL,
+        squeeze: bool = False,
     ) -> pd.DataFrame:
         """
        fetch data from underlying data source
@@ -141,13 +141,22 @@
             select a set of meaningful columns.(e.g. features, columns)
             if isinstance(col_set, List[str]):
                 select several sets of meaningful columns, the returned data has multiple levels
+        squeeze : bool
+            whether to squeeze columns and index

         Returns
         -------
         pd.DataFrame:
         """
         df = self._fetch_df_by_index(self._data, selector, level)
-        return self._fetch_df_by_col(df, col_set)
+        df = self._fetch_df_by_col(df, col_set)
+        if squeeze:
+            # squeeze columns
+            df = df.squeeze()
+            # squeeze index
+            if isinstance(selector, (str, pd.Timestamp)):
+                df = df.reset_index(level=level, drop=True)
+        return df

     def get_cols(self, col_set=CS_ALL) -> list:
         """
@@ -167,6 +176,40 @@ def get_cols(self, col_set=CS_ALL) -> list:
         df = self._fetch_df_by_col(df, col_set)
         return df.columns.to_list()

+    def get_range_selector(self, cur_date: Union[pd.Timestamp, str], periods: int) -> slice:
+        """
+        get range selector by number of periods
+
+        Args:
+            cur_date (pd.Timestamp or str): current date
+            periods (int): number of periods
+        """
+        trading_dates = self._data.index.unique(level='datetime')
+        cur_loc = trading_dates.get_loc(cur_date)
+        pre_loc = cur_loc - periods + 1
+        if pre_loc < 0:
+            warnings.warn('`periods` is too large. The first date will be returned.')
+            pre_loc = 0
+        ref_date = trading_dates[pre_loc]
+        return slice(ref_date, cur_date)
+
+    def get_range_iterator(self, periods: int, min_periods: Optional[int] = None,
+                           **kwargs) -> Iterator[Tuple[pd.Timestamp, pd.DataFrame]]:
+        """
+        get an iterator of sliced data with the given periods
+
+        Args:
+            periods (int): number of periods
+            min_periods (int): minimum periods for the sliced dataframe
+            kwargs (dict): will be passed to `self.fetch`
+        """
+        trading_dates = self._data.index.unique(level='datetime')
+        if min_periods is None:
+            min_periods = periods
+        for cur_date in trading_dates[min_periods:]:
+            selector = self.get_range_selector(cur_date, periods)
+            yield cur_date, self.fetch(selector, **kwargs)
+

 class DataHandlerLP(DataHandler):
     """
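# --- Editor's note (not part of the patch): what the new `squeeze` flag does,
# sketched on a toy frame shaped like the handler's storage (a
# (datetime, instrument) MultiIndex and optional two-level columns).
# All names below are illustrative, not part of qlib's API.
import pandas as pd

index = pd.MultiIndex.from_product(
    [pd.to_datetime(['2020-01-02', '2020-01-03']), ['SH600000', 'SH600004']],
    names=['datetime', 'instrument'])
df = pd.DataFrame({('feature', '$close'): [1.0, 2.0, 3.0, 4.0]}, index=index)

day = pd.Timestamp('2020-01-02')
sliced = df.loc[pd.IndexSlice[day, :], :]                 # selector is one timestamp
squeezed = sliced.squeeze()                               # 1-column frame -> Series
flat = squeezed.reset_index(level='datetime', drop=True)  # drop the selected level
assert list(flat.index) == ['SH600000', 'SH600004']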
diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py
index e4f2f86199..abfc695d96 100644
--- a/qlib/data/dataset/loader.py
+++ b/qlib/data/dataset/loader.py
@@ -1,78 +1,75 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.

-from abc import ABC, abstractmethod
+import abc
+import warnings
 import pandas as pd
-from qlib.data import D
+
 from typing import Tuple
+from qlib.data import D

-class DataLoader(ABC):
-    """
+class DataLoader(abc.ABC):
+    """
     DataLoader is designed for loading raw data from original data source.
-    """
-
-    @abstractmethod
+    """
+    @abc.abstractmethod
     def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
         """
-        load the data as pd.DataFrame
-
-        Parameters
-        ----------
-        self : [TODO:type]
-            [TODO:description]
-        instruments : [TODO:type]
-            [TODO:description]
-        start_time : [TODO:type]
-            [TODO:description]
-        end_time : [TODO:type]
-            [TODO:description]
-
-        Returns
-        -------
-        pd.DataFrame:
-            data load from the under layer source
+        load the data as pd.DataFrame

-        Example of the data:
-            The multi-index of the columns is optional.
-            feature                                                             label
-            $close      $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
-            datetime    instrument
-            2010-01-04  SH600000    81.807068   17145150.0  83.737389   83.016739   2.741058    0.0032
-                        SH600004    13.313329   11800983.0  13.313329   13.317701   0.183632    0.0042
-                        SH600005    37.796539   12231662.0  38.258602   37.919757   0.970325    0.0289
+        Parameters
+        ----------
+        instruments : str or list
+            market name (resolved by `D.instruments`) or a list of instrument ids
+        start_time : str or pd.Timestamp, optional
+            start of the time range to load
+        end_time : str or pd.Timestamp, optional
+            end of the time range to load
+
+        Returns
+        -------
+        pd.DataFrame:
+            data loaded from the underlying source
+
+        Example of the data:
+            (The multi-index of the columns is optional.)
+            feature                                                             label
+            $close      $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
+            datetime    instrument
+            2010-01-04  SH600000    81.807068   17145150.0  83.737389   83.016739   2.741058    0.0032
+                        SH600004    13.313329   11800983.0  13.313329   13.317701   0.183632    0.0042
+                        SH600005    37.796539   12231662.0  38.258602   37.919757   0.970325    0.0289
         """
         pass


 class QlibDataLoader(DataLoader):
-    """Same as QlibDataLoader. The fields can be define by config"""
-
+    """DataLoader that loads raw data via qlib's expression engine. The fields can be defined by config."""
     def __init__(self, config: Tuple[list, tuple, dict], filter_pipe=None):
         """
         Parameters
         ----------
         config : Tuple[list ,tuple, dict]
             Config will be used to describe the fields and column names

             <config> := {
                 "group_name1": <fields_info1>
                 "group_name2": <fields_info2>
             }
-
+
             or
             <config> := <fields_info>

             <fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...])
-
-            Here is a few examples to describe the fields
-            TODO:
         """
-        self.is_group = isinstance(config, dict)
+        self.is_group = isinstance(config, dict)
         if self.is_group:
             self.fields = {grp: self._parse_fields_info(fields_info) for grp, fields_info in config.items()}
         else:
-            self.fields = self._parse_fields_info(fields_info)
+            self.fields = self._parse_fields_info(config)

         self.filter_pipe = filter_pipe
@@ -86,14 +83,18 @@ def _parse_fields_info(self, fields_info: Tuple[list, tuple]) -> Tuple[list, lis
         return exprs, names

     def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
+        if isinstance(instruments, str):
+            instruments = D.instruments(instruments, filter_pipe=self.filter_pipe)
+        elif self.filter_pipe is not None:
+            warnings.warn('`filter_pipe` is not None, but it will not be used when `instruments` is a list')
+
         def _get_df(exprs, names):
-            df = D.features(D.instruments(instruments, filter_pipe=self.filter_pipe), exprs, start_time, end_time)
+            df = D.features(instruments, exprs, start_time, end_time)
             df.columns = names
             return df
-
         if self.is_group:
             df = pd.concat({grp: _get_df(exprs, names) for grp, (exprs, names) in self.fields.items()}, axis=1)
         else:
+            exprs, names = self.fields
             df = _get_df(exprs, names)
-        df = df.swaplevel().sort_index()
+        df = df.swaplevel().sort_index()  # NOTE: always return <datetime, instrument>
         return df
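# --- Editor's note (not part of the patch): the two config shapes accepted by
# QlibDataLoader, per the grammar above. Running the commented `load` call
# requires an initialized qlib data directory (qlib.init(...)); the expressions
# and column names here are illustrative only.
from qlib.data.dataset.loader import QlibDataLoader

# grouped form: <config> := {"group": <fields_info>} -> two-level columns
grouped = QlibDataLoader(config={
    'feature': (['$close', 'Mean($close, 3)'], ['CLOSE', 'CLOSE_MA3']),
    'label': (['Ref($close, -1) / $close - 1'], ['LABEL0']),
})

# flat form: <config> := <fields_info> -> single-level columns
flat = QlibDataLoader(config=(['$close', '$volume'], ['CLOSE', 'VOLUME']))

# df = grouped.load(instruments='csi300', start_time='2020-01-01', end_time='2020-06-30')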
diff --git a/qlib/data/ops.py b/qlib/data/ops.py
index 9f66a88afd..d9c657595c 100644
--- a/qlib/data/ops.py
+++ b/qlib/data/ops.py
@@ -8,6 +8,8 @@
 import numpy as np
 import pandas as pd

+from scipy.stats import percentileofscore
+
 from .base import Expression, ExpressionOps
 from ..log import get_module_logger

@@ -687,6 +689,8 @@ def _load_internal(self, instrument, start_index, end_index, freq):
         # isnull = series.isnull() # NOTE: isnull = NaN, inf is not null
         if self.N == 0:
             series = getattr(series.expanding(min_periods=1), self.func)()
+        elif 0 < self.N < 1:
+            # NOTE: a fractional N in (0, 1) is treated as the EWM decay factor `alpha`
+            series = series.ewm(alpha=self.N, min_periods=1).mean()
         else:
             series = getattr(series.rolling(self.N, min_periods=1), self.func)()
             # series.iloc[:self.N-1] = np.nan
@@ -696,6 +700,8 @@ def get_longest_back_rolling(self):
         if self.N == 0:
             return np.inf
+        if 0 < self.N < 1:
+            return int(np.log(1e-6) / np.log(1 - self.N))  # (1 - N)**window == 1e-6
         return self.feature.get_longest_back_rolling() + self.N - 1

     def get_extended_window_size(self):
@@ -704,6 +710,11 @@ def get_extended_window_size(self):
             # remove such support for N == 0?
             get_module_logger(self.__class__.__name__).warning("The Rolling(ATTR, 0) will not be accurately calculated")
             return self.feature.get_extended_window_size()
+        elif 0 < self.N < 1:
+            lft_etd, rght_etd = self.feature.get_extended_window_size()
+            size = int(np.log(1e-6) / np.log(1 - self.N))  # (1 - N)**size == 1e-6
+            lft_etd = max(lft_etd + size - 1, lft_etd)
+            return lft_etd, rght_etd
         else:
             lft_etd, rght_etd = self.feature.get_extended_window_size()
             lft_etd = max(lft_etd + self.N - 1, lft_etd)
@@ -1087,7 +1098,7 @@ def rank(x):
             x1 = x[~np.isnan(x)]
             if x1.shape[0] == 0:
                 return np.nan
-            return (x1.argsort()[-1] + 1) / len(x1)
+            return percentileofscore(x1, x1[-1]) / 100  # percentileofscore returns a percentage in [0, 100]

         if self.N == 0:
             series = series.expanding(min_periods=1).apply(rank, raw=True)
@@ -1273,7 +1284,7 @@ class EMA(Rolling):
     ----------
     feature : Expression
         feature instance
-    N : int
+    N : int, float
         rolling window size

     Returns
@@ -1296,6 +1307,8 @@ def exp_weighted_mean(x):

         if self.N == 0:
             series = series.expanding(min_periods=1).apply(exp_weighted_mean, raw=True)
+        elif 0 < self.N < 1:
+            series = series.ewm(alpha=self.N, min_periods=1).mean()
         else:
             series = series.ewm(span=self.N, min_periods=1).mean()
         return series
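# --- Editor's note (not part of the patch): semantics of the new 0 < N < 1
# branch. A fractional N is interpreted as the EWM decay factor `alpha`
# (weights proportional to (1 - alpha)**k), not as a window span; the lookback
# used by get_longest_back_rolling() is the lag where the weight decays to ~1e-6.
import numpy as np
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])
alpha = 0.1

ewm_alpha = s.ewm(alpha=alpha, min_periods=1).mean()   # N = 0.1 branch
ewm_span = s.ewm(span=10, min_periods=1).mean()        # integer-N EMA branch
# span=10 corresponds to alpha = 2 / (10 + 1), so the two differ:
assert not np.allclose(ewm_alpha, ewm_span)

size = int(np.log(1e-6) / np.log(1 - alpha))           # (1 - alpha)**size ~= 1e-6
assert (1 - alpha) ** (size + 1) <= 1e-6 <= (1 - alpha) ** size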
diff --git a/qlib/model/riskmodel.py b/qlib/model/riskmodel.py
new file mode 100644
index 0000000000..e63b8d4a21
--- /dev/null
+++ b/qlib/model/riskmodel.py
@@ -0,0 +1,455 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import warnings
+import numpy as np
+import pandas as pd
+
+from typing import Union
+
+from qlib.model.base import BaseModel
+
+
+class RiskModel(BaseModel):
+    """Risk Model
+
+    A risk model is used to estimate the covariance matrix of stock returns.
+    """
+
+    MASK_NAN = 'mask'
+    FILL_NAN = 'fill'
+    IGNORE_NAN = 'ignore'
+
+    def __init__(self, nan_option: str = 'ignore', assume_centered: bool = False, scale_return: bool = True):
+        """
+        Args:
+            nan_option (str): NaN handling option (`ignore`/`mask`/`fill`)
+            assume_centered (bool): whether the data is assumed to be centered
+            scale_return (bool): whether to scale returns to percentage
+        """
+        # nan
+        assert nan_option in [self.MASK_NAN, self.FILL_NAN, self.IGNORE_NAN], \
+            f'`nan_option={nan_option}` is not supported'
+        self.nan_option = nan_option
+
+        self.assume_centered = assume_centered
+        self.scale_return = scale_return
+
+    def predict(self, X: Union[pd.Series, pd.DataFrame, np.ndarray],
+                return_corr: bool = False, is_price: bool = True) -> Union[pd.DataFrame, np.ndarray]:
+        """
+        Args:
+            X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
+                with variables as columns and observations as rows.
+            return_corr (bool): whether to return the correlation matrix instead of the covariance
+            is_price (bool): whether `X` contains prices (if not, `X` is assumed to contain stock returns)
+
+        Returns:
+            pd.DataFrame or np.ndarray: estimated covariance (or correlation)
+        """
+        # transform input into 2D array
+        if not isinstance(X, (pd.Series, pd.DataFrame)):
+            columns = None
+        else:
+            if isinstance(X.index, pd.MultiIndex):
+                if isinstance(X, pd.DataFrame):
+                    X = X.iloc[:, 0].unstack(level='instrument')  # always use the first column
+                else:
+                    X = X.unstack(level='instrument')
+            else:
+                # X is a 2D DataFrame
+                pass
+            columns = X.columns  # will be used to restore the dataframe
+            X = X.values
+
+        # calculate pct_change
+        if is_price:
+            X = X[1:] / X[:-1] - 1  # NOTE: resulting `n - 1` rows
+
+        # scale return
+        if self.scale_return:
+            X *= 100
+
+        # handle nan and centering
+        X = self._preprocess(X)
+
+        # estimate covariance
+        S = self._predict(X)
+
+        # return correlation if needed
+        if return_corr:
+            vola = np.sqrt(np.diag(S))
+            corr = S / np.outer(vola, vola)
+            if columns is None:
+                return corr
+            return pd.DataFrame(corr, index=columns, columns=columns)
+
+        # return covariance
+        if columns is None:
+            return S
+        return pd.DataFrame(S, index=columns, columns=columns)
+
+    def _predict(self, X: np.ndarray) -> np.ndarray:
+        """covariance estimation implementation
+
+        This method should be overridden by child classes.
+
+        By default, this method implements the empirical covariance estimation.
+
+        Args:
+            X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows)
+
+        Returns:
+            np.ndarray: covariance matrix
+        """
+        xTx = np.asarray(X.T.dot(X))
+        N = len(X)
+        if isinstance(X, np.ma.MaskedArray):
+            M = 1 - X.mask
+            N = M.T.dot(M)  # each pair has a distinct number of samples
+        return xTx / N
+
+    def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]:
+        """handle NaN values and center the data
+
+        Note:
+            if `nan_option='mask'` then the returned array will be a `np.ma.MaskedArray`
+        """
+        # handle nan
+        if self.nan_option == self.FILL_NAN:
+            X = np.nan_to_num(X)
+        elif self.nan_option == self.MASK_NAN:
+            X = np.ma.masked_invalid(X)
+        # center
+        if not self.assume_centered:
+            X = X - np.nanmean(X, axis=0)
+        return X
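# --- Editor's note (not part of the patch): minimal usage sketch of the base
# estimator, assuming this module is importable as `qlib.model.riskmodel` once
# the patch is applied. Columns are instruments, rows are dates; `is_price=True`
# (the default) converts prices into returns internally.
import numpy as np
import pandas as pd
from qlib.model.riskmodel import RiskModel

rng = np.random.default_rng(0)
prices = pd.DataFrame(
    100 * np.exp(np.cumsum(rng.normal(0, 0.01, size=(60, 3)), axis=0)),
    columns=['SH600000', 'SH600004', 'SH600005'])

model = RiskModel(nan_option='fill')          # empirical covariance by default
cov = model.predict(prices)                   # 3 x 3 pd.DataFrame
corr = model.predict(prices, return_corr=True)
assert np.allclose(np.diag(corr), 1.0)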
+
+
+class ShrinkCovEstimator(RiskModel):
+    """Shrinkage Covariance Estimator
+
+    This estimator shrinks the sample covariance matrix towards a structured target:
+        S_hat = (1 - alpha) * S + alpha * F
+    where `alpha` is the shrinkage parameter and `F` is the shrinking target.
+
+    The following shrinkage parameters (`alpha`) are supported:
+        - `lw` [1][2][3]: use the Ledoit-Wolf shrinkage parameter
+        - `oas` [4]: use the Oracle Approximating Shrinkage parameter
+        - float: directly specify the shrinkage parameter, which should lie in [0, 1]
+
+    The following shrinking targets (`F`) are supported:
+        - `const_var` [1][4][5]: assume stocks have the same constant variance and zero correlation
+        - `const_corr` [2][6]: assume stocks have different variances but equal correlation
+        - `single_factor` [3][7]: use a single factor model as the shrinking target
+        - np.ndarray: provide the shrinking target directly
+
+    Note:
+        - The optimal shrinkage parameter depends on the selection of the shrinking target.
+          Currently, `oas` is not supported for `const_corr` and `single_factor`.
+        - Remember to set `nan_option` to `fill` or `mask` if your data has missing values.
+
+    References:
+        [1] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices.
+            Journal of Multivariate Analysis, 88(2), 365–411. https://doi.org/10.1016/S0047-259X(03)00096-4
+        [2] Ledoit, O., & Wolf, M. (2004). Honey, I shrunk the sample covariance matrix.
+            Journal of Portfolio Management, 30(4), 1–22. https://doi.org/10.3905/jpm.2004.110
+        [3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns
+            with an application to portfolio selection.
+            Journal of Empirical Finance, 10(5), 603–621. https://doi.org/10.1016/S0927-5398(03)00007-0
+        [4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance estimation.
+            IEEE Transactions on Signal Processing, 58(10), 5016–5029. https://doi.org/10.1109/TSP.2010.2053029
+        [5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip
+        [6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip
+        [7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip
+    """
+
+    SHR_LW = 'lw'
+    SHR_OAS = 'oas'
+
+    TGT_CONST_VAR = 'const_var'
+    TGT_CONST_CORR = 'const_corr'
+    TGT_SINGLE_FACTOR = 'single_factor'
+
+    def __init__(self, alpha: Union[str, float] = 0.0, target: Union[str, np.ndarray] = 'const_var', **kwargs):
+        """
+        Args:
+            alpha (str or float): shrinkage parameter or estimator (`lw`/`oas`)
+            target (str or np.ndarray): shrinking target (`const_var`/`const_corr`/`single_factor`)
+            kwargs: see `RiskModel` for more information
+        """
+        super().__init__(**kwargs)
+
+        # alpha
+        if isinstance(alpha, str):
+            assert alpha in [self.SHR_LW, self.SHR_OAS], \
+                f'shrinking method `{alpha}` is not supported'
+        elif isinstance(alpha, (float, np.floating)):
+            assert 0 <= alpha <= 1, 'alpha should lie in [0, 1]'
+        else:
+            raise TypeError('invalid argument type for `alpha`')
+        self.alpha = alpha
+
+        # target
+        if isinstance(target, str):
+            assert target in [self.TGT_CONST_VAR, self.TGT_CONST_CORR, self.TGT_SINGLE_FACTOR], \
+                f'shrinking target `{target}` is not supported'
+        elif isinstance(target, np.ndarray):
+            pass
+        else:
+            raise TypeError('invalid argument type for `target`')
+        if alpha == self.SHR_OAS and target != self.TGT_CONST_VAR:
+            raise NotImplementedError('currently `oas` can only support `const_var` as target')
+        self.target = target
+
+    def _predict(self, X: np.ndarray) -> np.ndarray:
+        # sample covariance
+        S = super()._predict(X)
+
+        # shrinking target
+        F = self._get_shrink_target(X, S)
+
+        # get shrinking parameter
+        alpha = self._get_shrink_param(X, S, F)
+
+        # shrink covariance
+        if alpha > 0:
+            S *= (1 - alpha)
+            F *= alpha
+            S += F
+
+        return S
+
+    def _get_shrink_target(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
+        """get shrinking target `F`"""
+        if isinstance(self.target, np.ndarray):  # user-provided target
+            return self.target
+        if self.target == self.TGT_CONST_VAR:
+            return self._get_shrink_target_const_var(X, S)
+        if self.target == self.TGT_CONST_CORR:
+            return self._get_shrink_target_const_corr(X, S)
+        if self.target == self.TGT_SINGLE_FACTOR:
+            return self._get_shrink_target_single_factor(X, S)
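# --- Editor's note (not part of the patch): the shrinkage combination in
# `_predict` above, reconstructed by hand for a fixed alpha with the
# `const_var` target: S_hat == (1 - alpha) * S + alpha * F.
import numpy as np
from qlib.model.riskmodel import ShrinkCovEstimator  # path per this patch

rng = np.random.default_rng(0)
X = rng.standard_normal((200, 4)).cumsum(axis=0) + 100   # toy prices

S_hat = ShrinkCovEstimator(alpha=0.5, target='const_var').predict(X)

# alpha=0.0 skips the shrinkage step, giving the plain sample covariance
S = ShrinkCovEstimator(alpha=0.0, target='const_var').predict(X)
F = np.mean(np.diag(S)) * np.eye(len(S))                 # const_var target
assert np.allclose(S_hat, 0.5 * S + 0.5 * F)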
+ """ + n = len(S) + F = np.eye(n) + np.fill_diagonal(F, np.mean(np.diag(S))) + return F + + def _get_shrink_target_const_corr(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: + """get shrinking target with constant correlation + + This target assumes constant pair-wise correlation but keep the sample variance. + The constant correlation is estimated by averaging all pairwise correlations. + """ + n = len(S) + var = np.diag(S) + sqrt_var = np.sqrt(var) + covar = np.outer(sqrt_var, sqrt_var) + r_bar = (np.sum(S / covar) - n) / (n * (n - 1)) + F = r_bar * covar + np.fill_diagonal(F, var) + return F + + def _get_shrink_target_single_factor(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: + """get shrinking target with single factor model""" + X_mkt = np.nanmean(X, axis=1) + cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X)) + var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X)) + F = np.outer(cov_mkt, cov_mkt) / var_mkt + np.fill_diagonal(F, np.diag(S)) + return F + + def _get_shrink_param(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """get shrinking parameter `alpha` + + Note: + The Ledoit-Wolf shrinking parameter estimator consists of three different + """ + if self.alpha == self.SHR_OAS: + return self._get_shrink_param_oas(X, S, F) + elif self.alpha == self.SHR_LW: + if self.target == self.TGT_CONST_VAR: + return self._get_shrink_param_lw_const_var(X, S, F) + if self.target == self.TGT_CONST_CORR: + return self._get_shrink_param_lw_const_corr(X, S, F) + if self.target == self.TGT_SINGLE_FACTOR: + return self._get_shrink_param_lw_single_factor(X, S, F) + return self.alpha + + def _get_shrink_param_oas(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Oracle Approximating Shrinkage Estimator + + This method uses the following formula to estimate the `alpha` + parameter for the shrink covariance estimator: + A = (1 - 2 / p) * trace(S^2) + trace^2(S) + B = (n + 1 - 2 / p) * (trace(S^2) - trace^2(S) / p) + alpha = A / B + where `n`, `p` are the dim of observations and variables respectively. + """ + trS2 = np.sum(S**2) + tr2S = np.trace(S)**2 + + n, p = X.shape + + A = (1 - 2 / p) * (trS2 + tr2S) + B = (n + 1 - 2 / p) * (trS2 + tr2S / p) + alpha = A / B + + return alpha + + def _get_shrink_param_lw_const_var(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Ledoit-Wolf Shrinkage Estimator (Constant Variance) + + This method shrinks the covariance matrix towards the constand variance target. + """ + t, n = X.shape + + y = X**2 + phi = np.sum(y.T.dot(y) / t - S**2) + + gamma = np.linalg.norm(S - F, 'fro')**2 + + kappa = phi / gamma + alpha = max(0, min(1, kappa / t)) + + return alpha + + def _get_shrink_param_lw_const_corr(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Ledoit-Wolf Shrinkage Estimator (Constant Correlation) + + This method shrinks the covariance matrix towards the constand correlation target. 
+ """ + t, n = X.shape + + var = np.diag(S) + sqrt_var = np.sqrt(var) + r_bar = (np.sum(S / np.outer(sqrt_var, sqrt_var)) - n) / (n * (n - 1)) + + y = X**2 + phi_mat = y.T.dot(y) / t - S**2 + phi = np.sum(phi_mat) + + theta_mat = (X**3).T.dot(X) / t - var[:, None] * S + np.fill_diagonal(theta_mat, 0) + rho = np.sum(np.diag(phi_mat)) + r_bar * np.sum(np.outer(1 / sqrt_var, sqrt_var) * theta_mat) + + gamma = np.linalg.norm(S - F, 'fro')**2 + + kappa = (phi - rho) / gamma + alpha = max(0, min(1, kappa / t)) + + return alpha + + def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Ledoit-Wolf Shrinkage Estimator (Single Factor Model) + + This method shrinks the covariance matrix towards the single factor model target. + """ + t, n = X.shape + + X_mkt = np.nanmean(X, axis=1) + cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X)) + var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X)) + + y = X**2 + phi = np.sum(y.T.dot(y)) / t - np.sum(S**2) + + rdiag = np.sum(y**2) / t - np.sum(np.diag(S)**2) + z = X * X_mkt[:, None] + v1 = y.T.dot(z) / t - cov_mkt[:, None] * S + roff1 = np.sum(v1 * cov_mkt[:, None].T) / var_mkt - np.sum(np.diag(v1) * cov_mkt) / var_mkt + v3 = z.T.dot(z) / t - var_mkt * S + roff3 = np.sum(v3 * np.outer(cov_mkt, cov_mkt)) / var_mkt**2 - np.sum(np.diag(v3) * cov_mkt**2) / var_mkt**2 + roff = 2 * roff1 - roff3 + rho = rdiag + roff + + gamma = np.linalg.norm(S - F, 'fro')**2 + + kappa = (phi - rho) / gamma + alpha = max(0, min(1, kappa / t)) + + return alpha + + +class POETCovEstimator(RiskModel): + """Principal Orthogonal Complement Thresholding Estimator (POET) + + Reference: + [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements. + Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. 
+
+
+class POETCovEstimator(RiskModel):
+    """Principal Orthogonal Complement Thresholding Estimator (POET)
+
+    Reference:
+        [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements.
+            Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016
+        [2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m
+    """
+
+    THRESH_SOFT = 'soft'
+    THRESH_HARD = 'hard'
+    THRESH_SCAD = 'scad'
+
+    def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = 'soft', **kwargs):
+        """
+        Args:
+            num_factors (int): number of factors (if set to zero, no factor model will be used)
+            thresh (float): the positive constant for thresholding
+            thresh_method (str): thresholding method, which can be
+                - 'soft': soft thresholding
+                - 'hard': hard thresholding
+                - 'scad': scad thresholding
+            kwargs: see `RiskModel` for more information
+        """
+        super().__init__(**kwargs)
+
+        assert num_factors >= 0, '`num_factors` requires a non-negative integer'
+        self.num_factors = num_factors
+
+        assert thresh >= 0, '`thresh` requires a non-negative float number'
+        self.thresh = thresh
+
+        assert thresh_method in [self.THRESH_HARD, self.THRESH_SOFT, self.THRESH_SCAD], \
+            '`thresh_method` should be `soft`/`hard`/`scad`'
+        self.thresh_method = thresh_method
+
+    def _predict(self, X: np.ndarray) -> np.ndarray:
+
+        Y = X.T  # NOTE: to match POET's implementation
+        p, n = Y.shape
+
+        if self.num_factors > 0:
+            Dd, V = np.linalg.eig(Y.T.dot(Y))
+            V = V[:, np.argsort(Dd)]
+            F = V[:, -self.num_factors:][:, ::-1] * np.sqrt(n)
+            LamPCA = Y.dot(F) / n
+            uhat = np.asarray(Y - LamPCA.dot(F.T))
+            Lowrank = np.asarray(LamPCA.dot(LamPCA.T))
+            rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n)
+        else:
+            uhat = np.asarray(Y)
+            rate = np.sqrt(np.log(p) / n)
+            Lowrank = 0
+
+        lamb = rate * self.thresh
+        SuPCA = uhat.dot(uhat.T) / n
+        SuDiag = np.diag(np.diag(SuPCA))
+        R = np.linalg.inv(SuDiag**0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag**0.5))
+
+        if self.thresh_method == self.THRESH_HARD:
+            M = R * (np.abs(R) > lamb)
+        elif self.thresh_method == self.THRESH_SOFT:
+            res = (np.abs(R) - lamb)
+            res = (res + np.abs(res)) / 2
+            M = np.sign(R) * res
+        else:
+            M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb)
+            M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7
+            M3 = (np.abs(R) >= 3.7 * lamb) * R
+            M = M1 + M2 + M3
+
+        Rthresh = M - np.diag(np.diag(M)) + np.eye(p)
+        SigmaU = (SuDiag**0.5).dot(Rthresh).dot(SuDiag**0.5)
+        SigmaY = SigmaU + Lowrank
+
+        return SigmaY
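# --- Editor's note (not part of the patch): the three thresholding rules from
# `_predict` above, applied to a single correlation value r for intuition.
# Soft shrinks toward zero, hard keeps-or-kills, SCAD interpolates between them.
import numpy as np

def soft(r, lamb):
    res = np.abs(r) - lamb
    return np.sign(r) * (res + np.abs(res)) / 2

def hard(r, lamb):
    return r * (np.abs(r) > lamb)

r, lamb = 0.5, 0.2
assert np.isclose(soft(r, lamb), 0.3)   # |r| reduced by lamb
assert np.isclose(hard(r, lamb), 0.5)   # kept unchanged since |r| > lamb
assert hard(0.1, lamb) == 0.0           # killed since |r| <= lamb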
+ """ + + OPT_GMV = 'gmv' + OPT_MVO = 'mvo' + OPT_RP = 'rp' + OPT_INV = 'inv' + + def __init__(self, method: str = 'inv', lamb: float = 0, delta: float = 0, + alpha: float = 0.0, scale_alpha: bool = True, tol: float = 1e-8): + """ + Args: + method (str): portfolio optimization method + lamb (float): risk aversion parameter (larger `lamb` means more focus on return) + delta (float): turnover rate limit + alpha (float): l2 norm regularizer + tol (float): tolerance for optimization termination + """ + assert method in [self.OPT_GMV, self.OPT_MVO, self.OPT_RP, self.OPT_INV], \ + f'method `{method}` is not supported' + self.method = method + + assert lamb >= 0, f'risk aversion parameter `lamb` should be positive' + self.lamb = lamb + + assert delta >= 0, f'turnover limit `delta` should be positive' + self.delta = delta + + assert alpha >= 0, f'l2 norm regularizer `alpha` should be positive' + self.alpha = alpha + + self.tol = tol + + def __call__(self, S: Union[np.ndarray, pd.DataFrame], + u: Optional[Union[np.ndarray, pd.Series]] = None, + w0: Optional[Union[np.ndarray, pd.Series]] = None) -> Union[np.ndarray, pd.Series]: + """ + Args: + S (np.ndarray or pd.DataFrame): covariance matrix + u (np.ndarray or pd.Series): expected returns (a.k.a., alpha) + w0 (np.ndarray or pd.Series): initial weights (for turnover control) + + Returns: + np.ndarray or pd.Series: optimized portfolio allocation + """ + # transform dataframe into array + index = None + if isinstance(S, pd.DataFrame): + index = S.index + S = S.values + + # transform alpha + if u is not None: + assert len(u) == len(S), '`u` has mismatched shape' + if isinstance(u, pd.Series): + assert all(u.index == index), '`u` has mismatched index' + u = u.values + + # transform initial weights + if w0 is not None: + assert len(w0) == len(S), '`w0` has mismatched shape' + if isinstance(w0, pd.Series): + assert all(w0.index == index), '`w0` has mismatched index' + w0 = w0.values + + # scale alpha to match volatility + if u is not None: + u = u / u.std() + u *= np.mean(np.diag(S))**0.5 + + # optimize + w = self._optimize(S, u, w0) + + # restore index if needed + if index is not None: + w = pd.Series(w, index=index) + + return w + + def _optimize(self, S: np.ndarray, u: Optional[np.ndarray] = None, + w0: Optional[np.ndarray] = None) -> np.ndarray: + + # inverse volatility + if self.method == self.OPT_INV: + if u is not None: + warnings.warn('`u` is set but will not be used for `inv` portfolio') + if w0 is not None: + warnings.warn('`w0` is set but will not be used for `inv` portfolio') + return self._optimize_inv(S) + + # global minimum variance + if self.method == self.OPT_GMV: + if u is not None: + warnings.warn('`u` is set but will not be used for `gmv` portfolio') + return self._optimize_gmv(S, w0) + + # mean-variance + if self.method == self.OPT_MVO: + return self._optimize_mvo(S, u, w0) + + # risk parity + if self.method == self.OPT_RP: + if u is not None: + warnings.warn('`u` is set but will not be used for `rp` portfolio') + return self._optimize_rp(S, w0) + + def _optimize_inv(self, S: np.ndarray) -> np.ndarray: + """Inverse volatility""" + vola = np.diag(S)**0.5 + w = 1 / vola + w /= w.sum() + return w + + def _optimize_gmv(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray: + """optimize global minimum variance portfolio + + This method solves the following optimization problem + min_w w' S w + s.t. w >= 0, sum(w) == 1 + where `S` is the covariance matrix. 
+ """ + return self._solve( + len(S), + self._get_objective_gmv(S), + *self._get_constrains(w0) + ) + + def _optimize_mvo(self, S: np.ndarray, u: Optional[np.ndarray] = None, + w0: Optional[np.ndarray] = None) -> np.ndarray: + """optimize mean-variance portfolio + + This method solves the following optimization problem + min_w - w' u + lamb * w' S w + s.t. w >= 0, sum(w) == 1 + where `S` is the covariance matrix, `u` is the expected returns, + and `lamb` is the risk aversion parameter. + """ + return self._solve( + len(S), + self._get_objective_mvo(S, u), + *self._get_constrains(w0) + ) + + def _optimize_rp(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray: + """optimize risk parity portfolio + + This method solves the following optimization problem + min_w sum_i [w_i - (w' S w) / ((S w)_i * N)]**2 + s.t. w >= 0, sum(w) == 1 + where `S` is the covariance matrix and `N` is the number of stocks. + """ + return self._solve( + len(S), + self._get_objective_rp(S), + *self._get_constrains(w0) + ) + + def _get_objective_gmv(self, S: np.ndarray) -> np.ndarray: + """global minimum variance optimization objective + + Optimization objective + min_w w' S w + """ + + def func(x): + return x @ S @ x + + return func + + def _get_objective_mvo(self, S: np.ndarray, u: np.ndarray = None) -> np.ndarray: + """mean-variance optimization objective + + Optimization objective + min_w - w' u + lamb * w' S w + """ + + def func(x): + risk = x @ S @ x + ret = x @ u + return -ret + self.lamb * risk + + return func + + def _get_objective_rp(self, S: np.ndarray) -> np.ndarray: + """risk-parity optimization objective + + Optimization objective + min_w sum_i [w_i - (w' S w) / ((S w)_i * N)]**2 + """ + + def func(x): + N = len(x) + Sx = S @ x + xSx = x @ Sx + return np.sum((x - xSx / Sx / N)**2) + + return func + + def _get_constrains(self, w0: Optional[np.ndarray] = None): + """optimization constraints + + Defines the following constraints: + - no shorting and leverage: 0 <= w <= 1 + - full investment: sum(w) == 1 + - turnover constraint: |w - w0| <= delta + """ + + # no shorting and leverage + bounds = so.Bounds(0.0, 1.0) + + # full investment constraint + cons = [ + {'type': 'eq', 'fun': lambda x: np.sum(x) - 1} # == 0 + ] + + # turnover constraint + if w0 is not None: + cons.append( + {'type': 'ineq', 'fun': lambda x: self.delta - np.sum(np.abs(x - w0))} # >= 0 + ) + + return bounds, cons + + def _solve(self, n: int, obj: Callable, bounds: so.Bounds, cons: List) -> np.ndarray: + """solve optimization + + Args: + n (int): number of parameters + obj (callable): optimization objective + bounds (Bounds): bounds of parameters + cons (list): optimization constraints + """ + # add l2 regularization + wrapped_obj = obj + if self.alpha > 0: + wrapped_obj = lambda x: obj(x) + self.alpha * np.sum(np.square(x)) + + # solve + x0 = np.ones(n) / n # init results + sol = so.minimize(wrapped_obj, x0, bounds=bounds, constraints=cons, tol=self.tol) + if not sol.success: + warnings.warn(f'optimization not success ({sol.status})') + + return sol.x