Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data analysis feature for report #918

Merged
merged 2 commits into from Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 105 additions & 27 deletions qlib/contrib/eva/alpha.py
Expand Up @@ -4,8 +4,10 @@
The interface should be redesigned carefully in the future.
"""
import pandas as pd

from typing import Tuple
from qlib import get_module_logger
from qlib.utils.paral import complex_parallel, DelayedDict
from joblib import Parallel, delayed


def calc_long_short_prec(
Expand Down Expand Up @@ -61,32 +63,6 @@ def calc_long_short_prec(
return (l_dom.groupby(date_col).sum() / l_c), (s_dom.groupby(date_col).sum() / s_c)


def calc_ic(pred: pd.Series, label: pd.Series, date_col="datetime", dropna=False) -> Tuple[pd.Series, pd.Series]:
    """Compute the per-date information coefficient (IC) and rank IC.

    Parameters
    ----------
    pred :
        predicted scores; ``date_col`` must be resolvable by ``groupby``
        (a column or an index level of the combined frame)
    label :
        ground-truth values aligned with ``pred``
    date_col :
        name used to group the cross-sections (default "datetime")
    dropna :
        if True, drop dates whose correlation is NaN

    Returns
    -------
    (pd.Series, pd.Series)
        ic and rank ic
    """
    merged = pd.DataFrame({"pred": pred, "label": label})
    grouped = merged.groupby(date_col)
    # Pearson correlation per date, then Spearman (rank) correlation per date.
    ic = grouped.apply(lambda g: g["pred"].corr(g["label"]))
    ric = grouped.apply(lambda g: g["pred"].corr(g["label"], method="spearman"))
    if not dropna:
        return ic, ric
    return ic.dropna(), ric.dropna()


def calc_long_short_return(
pred: pd.Series,
label: pd.Series,
Expand Down Expand Up @@ -127,3 +103,105 @@ def calc_long_short_return(
r_short = group.apply(lambda x: x.nsmallest(N(x), columns="pred").label.mean())
r_avg = group.label.mean()
return (r_long - r_short) / 2, r_avg


def pred_autocorr(pred: pd.Series, lag=1, inst_col="instrument", date_col="datetime"):
    """Calculate the per-date cross-sectional auto-correlation of predictions.

    After unstacking by ``inst_col``, each date's cross-section of scores is
    correlated (Pearson) with the cross-section ``lag`` rows earlier.

    Limitation:
    - If the datetime is not sequential densely, the correlation will be calculated based on adjacent dates. (some users may expect NaN)

    :param pred: pd.Series with following format
                instrument  datetime
                SH600000    2016-01-04   -0.000403
                            2016-01-05   -0.000753
                            2016-01-06   -0.021801
                            2016-01-07   -0.065230
                            2016-01-08   -0.062465
    :type pred: pd.Series
    :param lag: number of rows to shift when correlating adjacent cross-sections
    :return: pd.Series indexed by date; NaN for the first ``lag`` dates (no previous cross-section)
    """
    if isinstance(pred, pd.DataFrame):
        # BUG FIX: the message was a plain string, so "{pred.columns}" was logged
        # literally; log before overwriting `pred`, while the columns still exist.
        get_module_logger("pred_autocorr").warning(f"Only the first column in {pred.columns} of `pred` is kept")
        pred = pred.iloc[:, 0]
    pred_ustk = pred.sort_index().unstack(inst_col)
    corr_s = {}
    # Walk current rows and lag-shifted rows in lockstep; the shifted frame's
    # leading rows are NaN, so the first `lag` correlations come out NaN.
    for (idx, cur), (_, prev) in zip(pred_ustk.iterrows(), pred_ustk.shift(lag).iterrows()):
        corr_s[idx] = cur.corr(prev)
    corr_s = pd.Series(corr_s).sort_index()
    return corr_s


def pred_autocorr_all(pred_dict, n_jobs=-1, **kwargs):
    """
    calculate auto correlation for pred_dict

    Parameters
    ----------
    pred_dict : dict
        A dict like {<method_name>: <prediction>}
    n_jobs : int
        number of joblib workers (-1 uses all cores)
    kwargs :
        all these arguments will be passed into pred_autocorr
    """
    # Build one delayed task per method, then let complex_parallel fan them out.
    delayed_tasks = {name: delayed(pred_autocorr)(series, **kwargs) for name, series in pred_dict.items()}
    return complex_parallel(Parallel(n_jobs=n_jobs, verbose=10), delayed_tasks)


def calc_ic(pred: pd.Series, label: pd.Series, date_col="datetime", dropna=False) -> Tuple[pd.Series, pd.Series]:
    """calc_ic.

    Parameters
    ----------
    pred :
        predicted scores; ``date_col`` must be resolvable by ``groupby``
        (a column or an index level of the combined frame)
    label :
        ground-truth values aligned with ``pred``
    date_col :
        name used to group the cross-sections (default "datetime")
    dropna :
        if True, drop dates whose correlation is NaN

    Returns
    -------
    Tuple[pd.Series, pd.Series]
        ic and rank ic
    """
    # FIX: the annotation was `(pd.Series, pd.Series)`, a tuple of types that
    # type checkers reject; use `Tuple[...]` (already imported at file top),
    # consistent with the rest of the module.
    df = pd.DataFrame({"pred": pred, "label": label})
    ic = df.groupby(date_col).apply(lambda df: df["pred"].corr(df["label"]))
    ric = df.groupby(date_col).apply(lambda df: df["pred"].corr(df["label"], method="spearman"))
    if dropna:
        return ic.dropna(), ric.dropna()
    else:
        return ic, ric


def calc_all_ic(pred_dict_all, label, date_col="datetime", dropna=False, n_jobs=-1):
    """calc_all_ic.

    Parameters
    ----------
    pred_dict_all :
        A dict like {<method_name>: <prediction>}
    label:
        A pd.Series of label values

    Returns
    -------
    A dict like {<method_name>: {'ic': <per-date ic series>, 'ric': <per-date rank ic series>}}, e.g.
    {'Q2+IND_z': {'ic': <ic series like>
                        2016-01-04   -0.057407
                        ...
                        2020-05-29    0.171393
                  'ric': <rank ic series like>
                        2016-01-04   -0.040888
                        ...
                        2020-05-29    0.183886
                 }
    ...}
    """
    # One delayed calc_ic per method; DelayedDict splits its (ic, ric) tuple
    # into the "ic"/"ric" keys once the parallel run completes.
    tasks = {
        name: DelayedDict(["ic", "ric"], delayed(calc_ic)(series, label, date_col=date_col, dropna=dropna))
        for name, series in pred_dict_all.items()
    }
    return complex_parallel(Parallel(n_jobs=n_jobs, verbose=10), tasks)
4 changes: 2 additions & 2 deletions qlib/contrib/model/pytorch_nn.py
Expand Up @@ -74,7 +74,7 @@ def __init__(
data_parall=False,
scheduler: Optional[Union[Callable]] = "default", # when it is Callable, it accept one argument named optimizer
init_model=None,
eval_train_metric=True,
eval_train_metric=False,
pt_model_uri="qlib.contrib.model.pytorch_nn.Net",
pt_model_kwargs={
"input_dim": 360,
Expand Down Expand Up @@ -290,7 +290,7 @@ def fit(
)
R.log_metrics(train_metric=metric_train, step=step)
else:
metric_train = -1
metric_train = np.nan
if verbose:
self.logger.info(
f"[Step {step}]: train_loss {train_loss:.6f}, valid_loss {loss_val:.6f}, train_metric {metric_train:.6f}, valid_metric {metric_val:.6f}"
Expand Down
7 changes: 7 additions & 0 deletions qlib/contrib/report/data/__init__.py
@@ -0,0 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
This module is designed to analyze data

"""
202 changes: 202 additions & 0 deletions qlib/contrib/report/data/ana.py
@@ -0,0 +1,202 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import pandas as pd
import numpy as np
from qlib.contrib.report.data.base import FeaAnalyser
from qlib.contrib.report.utils import sub_fig_generator
from qlib.utils.paral import datetime_groupby_apply
from qlib.contrib.eva.alpha import pred_autocorr_all
from loguru import logger
import seaborn as sns

DT_COL_NAME = "datetime"


class CombFeaAna(FeaAnalyser):
    """
    Combine the sub feature analysers and plot them in a single graph
    """

    def __init__(self, dataset: pd.DataFrame, *fea_ana_cls):
        """
        Parameters
        ----------
        dataset : pd.DataFrame
            the data to be analysed
        fea_ana_cls :
            two or more FeaAnalyser subclasses; one instance of each is
            created on `dataset`
        """
        if len(fea_ana_cls) <= 1:
            # FIX: was an f-string with no placeholders (lint F541).
            # A single analyser needs no combination; use it directly instead.
            raise NotImplementedError("This type of input is not supported")
        self._fea_ana_l = [fcls(dataset) for fcls in fea_ana_cls]
        super().__init__(dataset=dataset)

    def skip(self, col):
        # Skip a column only when every sub-analyser would skip it.
        return np.all(list(map(lambda fa: fa.skip(col), self._fea_ana_l)))

    def calc_stat_values(self):
        """The statistics of features are finished in the underlying analysers"""

    def plot_all(self, *args, **kwargs):
        # One sub-plot row per sub-analyser; each generated panel shows one feature.
        ax_gen = iter(sub_fig_generator(row_n=len(self._fea_ana_l), *args, **kwargs))

        for col in self._dataset:
            if not self.skip(col):
                axes = next(ax_gen)
                for fa, ax in zip(self._fea_ana_l, axes):
                    if not fa.skip(col):
                        fa.plot_single(col, ax)
                    # Clear per-axis labels; the column name goes on the top axis only.
                    ax.set_xlabel("")
                    ax.set_title("")
                axes[0].set_title(col)


class NumFeaAnalyser(FeaAnalyser):
    """Base analyser that restricts analysis to numeric (non-object) columns."""

    def skip(self, col):
        # Object-dtype columns are treated as non-numeric and skipped (with a log line).
        not_numeric = np.issubdtype(self._dataset[col], np.dtype("O"))
        if not_numeric:
            logger.info(f"{col} is not numeric and is skipped")
        return not_numeric


class ValueCNT(FeaAnalyser):
    """Plot, per date, how many distinct values each feature takes (optionally as a ratio)."""

    def __init__(self, dataset: pd.DataFrame, ratio=False):
        # ratio=True normalizes the distinct-value count by the cross-section size.
        self.ratio = ratio
        super().__init__(dataset)

    def calc_stat_values(self):
        counts = {}
        for col, item in self._dataset.items():
            if super().skip(col):
                continue
            counts[col] = item.groupby(DT_COL_NAME).apply(lambda s: len(s.unique()))
        self._val_cnt = pd.DataFrame(counts)
        if self.ratio:
            self._val_cnt = self._val_cnt.div(self._dataset.groupby(DT_COL_NAME).size(), axis=0)

        # TODO: transfer this feature to other analysers
        # Shared y-limits across all columns, padded by 5% of the global range.
        ymin, ymax = self._val_cnt.min().min(), self._val_cnt.max().max()
        margin = 0.05 * (ymax - ymin)
        self.ylim = (ymin - margin, ymax + margin)

    def plot_single(self, col, ax):
        self._val_cnt[col].plot(ax=ax, title=col, ylim=self.ylim)
        ax.set_xlabel("")


class FeaDistAna(NumFeaAnalyser):
    """Plot the empirical distribution (100-bin histogram) of each numeric feature."""

    def plot_single(self, col, ax):
        sns.histplot(self._dataset[col], ax=ax, kde=False, bins=100)
        ax.set_title(col)
        ax.set_xlabel("")


class FeaInfAna(NumFeaAnalyser):
    """Count, per date, how many infinite values each numeric feature contains."""

    def calc_stat_values(self):
        self._inf_cnt = {}
        for col, item in self._dataset.items():
            if not super().skip(col):
                # BUG FIX: `np.int` was deprecated in NumPy 1.20 and removed in
                # 1.24, which made this line raise AttributeError; the builtin
                # `int` is the documented replacement.
                self._inf_cnt[col] = item.apply(np.isinf).astype(int).groupby(DT_COL_NAME).sum()
        self._inf_cnt = pd.DataFrame(self._inf_cnt)

    def skip(self, col):
        # Nothing to plot when the column was skipped upstream or never contains inf.
        return (col not in self._inf_cnt) or (self._inf_cnt[col].sum() == 0)

    def plot_single(self, col, ax):
        self._inf_cnt[col].plot(ax=ax, title=col)
        ax.set_xlabel("")


class FeaNanAna(FeaAnalyser):
    """Count, per date, how many NaN values each feature contains."""

    def calc_stat_values(self):
        self._nan_cnt = self._dataset.isna().groupby(DT_COL_NAME).sum()

    def skip(self, col):
        # Skip columns that were never counted or contain no NaN at all.
        if col not in self._nan_cnt:
            return True
        return self._nan_cnt[col].sum() == 0

    def plot_single(self, col, ax):
        self._nan_cnt[col].plot(ax=ax, title=col)
        ax.set_xlabel("")


class FeaNanAnaRatio(FeaAnalyser):
    """Plot, per date, the fraction of NaN values in each feature."""

    def calc_stat_values(self):
        self._nan_cnt = self._dataset.isna().groupby(DT_COL_NAME).sum()
        self._total_cnt = self._dataset.groupby(DT_COL_NAME).size()

    def skip(self, col):
        # Skip columns that were never counted or contain no NaN at all.
        if col not in self._nan_cnt:
            return True
        return self._nan_cnt[col].sum() == 0

    def plot_single(self, col, ax):
        nan_ratio = self._nan_cnt[col] / self._total_cnt
        nan_ratio.plot(ax=ax, title=col)
        ax.set_xlabel("")


class FeaACAna(FeaAnalyser):
    """Analysis the auto-correlation of features"""

    def calc_stat_values(self):
        self._fea_corr = pred_autocorr_all(self._dataset.to_dict("series"))
        # Shared y-limits across all columns, padded by 5% of the global range.
        corr_df = pd.DataFrame(self._fea_corr)
        low, high = corr_df.min().min(), corr_df.max().max()
        pad = 0.05 * (high - low)
        self.ylim = (low - pad, high + pad)

    def plot_single(self, col, ax):
        self._fea_corr[col].plot(ax=ax, title=col, ylim=self.ylim)
        ax.set_xlabel("")


class FeaSkewTurt(NumFeaAnalyser):
    """Plot per-date skewness and kurtosis of each numeric feature on twin y-axes.

    NOTE(review): "Turt" looks like a typo for "Kurt" (kurtosis); the name is
    kept unchanged for backward compatibility with existing callers.
    """

    def calc_stat_values(self):
        self._skew = datetime_groupby_apply(self._dataset, "skew", skip_group=True)
        self._kurt = datetime_groupby_apply(self._dataset, pd.DataFrame.kurt, skip_group=True)

    def plot_single(self, col, ax):
        # Skew on the left axis, kurtosis on a twin right axis.
        self._skew[col].plot(ax=ax, label="skew")
        ax.set_xlabel("")
        ax.set_ylabel("skew")
        ax.legend()

        right_ax = ax.twinx()
        self._kurt[col].plot(ax=right_ax, label="kurt", color="green")
        right_ax.set_xlabel("")
        right_ax.set_ylabel("kurt")

        # Merge both legends into a single one on the right axis.
        handles_l, labels_l = ax.get_legend_handles_labels()
        handles_r, labels_r = right_ax.get_legend_handles_labels()
        ax.legend().set_visible(False)
        right_ax.legend(handles_l + handles_r, labels_l + labels_r)
        ax.set_title(col)


class FeaMeanStd(NumFeaAnalyser):
    """Plot per-date mean and standard deviation of each numeric feature on twin y-axes."""

    def calc_stat_values(self):
        grouped = self._dataset.groupby(DT_COL_NAME)
        self._std = grouped.std()
        self._mean = grouped.mean()

    def plot_single(self, col, ax):
        # Mean on the left axis, std on a twin right axis.
        self._mean[col].plot(ax=ax, label="mean")
        ax.set_xlabel("")
        ax.set_ylabel("mean")
        ax.legend()

        right_ax = ax.twinx()
        self._std[col].plot(ax=right_ax, label="std", color="green")
        right_ax.set_xlabel("")
        right_ax.set_ylabel("std")

        # Merge both legends into a single one on the right axis.
        handles_l, labels_l = ax.get_legend_handles_labels()
        handles_r, labels_r = right_ax.get_legend_handles_labels()
        ax.legend().set_visible(False)
        right_ax.legend(handles_l + handles_r, labels_l + labels_r)
        ax.set_title(col)


class RawFeaAna(FeaAnalyser):
    """
    Motivation:
    - display the values without further analysis
    """

    def calc_stat_values(self):
        # Shared y-limits across all columns, padded by 5% of the global range.
        low, high = self._dataset.min().min(), self._dataset.max().max()
        pad = 0.05 * (high - low)
        self.ylim = (low - pad, high + pad)

    def plot_single(self, col, ax):
        self._dataset[col].plot(ax=ax, title=col, ylim=self.ylim)
        ax.set_xlabel("")