Skip to content

Commit

Permalink
Add data analysis feature for report (microsoft#918)
Browse files Browse the repository at this point in the history
* Add data analysis feature for report

* better display
  • Loading branch information
you-n-g committed Feb 17, 2022
1 parent 74530be commit 6546037
Show file tree
Hide file tree
Showing 8 changed files with 572 additions and 33 deletions.
132 changes: 105 additions & 27 deletions qlib/contrib/eva/alpha.py
Expand Up @@ -4,8 +4,10 @@
The interface should be redesigned carefully in the future.
"""
import pandas as pd

from typing import Tuple
from qlib import get_module_logger
from qlib.utils.paral import complex_parallel, DelayedDict
from joblib import Parallel, delayed


def calc_long_short_prec(
Expand Down Expand Up @@ -61,32 +63,6 @@ def calc_long_short_prec(
return (l_dom.groupby(date_col).sum() / l_c), (s_dom.groupby(date_col).sum() / s_c)


def calc_ic(pred: pd.Series, label: pd.Series, date_col="datetime", dropna=False) -> Tuple[pd.Series, pd.Series]:
    """Compute the daily IC (Pearson) and rank IC (Spearman) between predictions and labels.

    Parameters
    ----------
    pred :
        prediction series; `date_col` must be a groupable index level or column
    label :
        label series aligned with `pred`
    date_col :
        name of the datetime level/column used for grouping
    dropna :
        whether to drop dates whose correlation is NaN

    Returns
    -------
    (pd.Series, pd.Series)
        ic and rank ic, each indexed by date
    """
    merged = pd.DataFrame({"pred": pred, "label": label})
    by_date = merged.groupby(date_col)
    ic = by_date.apply(lambda g: g["pred"].corr(g["label"]))
    ric = by_date.apply(lambda g: g["pred"].corr(g["label"], method="spearman"))
    if dropna:
        ic, ric = ic.dropna(), ric.dropna()
    return ic, ric


def calc_long_short_return(
pred: pd.Series,
label: pd.Series,
Expand Down Expand Up @@ -127,3 +103,105 @@ def calc_long_short_return(
r_short = group.apply(lambda x: x.nsmallest(N(x), columns="pred").label.mean())
r_avg = group.label.mean()
return (r_long - r_short) / 2, r_avg


def pred_autocorr(pred: pd.Series, lag=1, inst_col="instrument", date_col="datetime"):
    """pred_autocorr.

    For each date, compute the cross-sectional correlation between the scores on
    that date and the scores `lag` dates earlier.

    Limitation:
    - If the datetime is not sequentially dense, the correlation will be calculated based on adjacent dates. (some users may expect NaN)

    :param pred: pd.Series with following format
                instrument  datetime
                SH600000    2016-01-04   -0.000403
                            2016-01-05   -0.000753
                            2016-01-06   -0.021801
                            2016-01-07   -0.065230
                            2016-01-08   -0.062465
    :type pred: pd.Series
    :param lag: number of dates to shift before correlating; the first `lag` results are NaN
    :param inst_col: name of the instrument index level (used for unstacking)
    :param date_col: unused; kept for interface compatibility
    :return: pd.Series of correlations indexed by date
    """
    if isinstance(pred, pd.DataFrame):
        # BUGFIX: log BEFORE collapsing `pred` to its first column (otherwise
        # `pred.columns` no longer exists), and use a real f-string — the
        # original printed the literal text "{pred.columns}".
        get_module_logger("pred_autocorr").warning(f"Only the first column in {pred.columns} of `pred` is kept")
        pred = pred.iloc[:, 0]
    pred_ustk = pred.sort_index().unstack(inst_col)
    corr_s = {}
    # pair each date's cross-section with the one `lag` rows earlier
    for (idx, cur), (_, prev) in zip(pred_ustk.iterrows(), pred_ustk.shift(lag).iterrows()):
        corr_s[idx] = cur.corr(prev)
    corr_s = pd.Series(corr_s).sort_index()
    return corr_s


def pred_autocorr_all(pred_dict, n_jobs=-1, **kwargs):
    """
    Calculate the auto-correlation for every prediction in `pred_dict` in parallel.

    Parameters
    ----------
    pred_dict : dict
        A dict like {<method_name>: <prediction>}
    n_jobs : int
        number of joblib workers (-1 means all cores)
    kwargs :
        all these arguments will be passed into pred_autocorr

    Returns
    -------
    dict
        {<method_name>: <auto-correlation series>}
    """
    delayed_tasks = {name: delayed(pred_autocorr)(p, **kwargs) for name, p in pred_dict.items()}
    return complex_parallel(Parallel(n_jobs=n_jobs, verbose=10), delayed_tasks)


def calc_ic(pred: pd.Series, label: pd.Series, date_col="datetime", dropna=False) -> Tuple[pd.Series, pd.Series]:
    """calc_ic.

    Compute the daily IC (Pearson correlation) and rank IC (Spearman correlation)
    between predictions and labels.

    Parameters
    ----------
    pred :
        prediction series; `date_col` must be a groupable index level or column
    label :
        label series aligned with `pred`
    date_col :
        name of the datetime level/column used for grouping
    dropna :
        whether to drop dates whose correlation is NaN

    Returns
    -------
    (pd.Series, pd.Series)
        ic and rank ic, each indexed by date
    """
    # NOTE: the annotation was `-> (pd.Series, pd.Series)`, a tuple literal that is
    # not a valid type annotation; use typing.Tuple (already imported in this file),
    # consistent with the previous version of this function.
    df = pd.DataFrame({"pred": pred, "label": label})
    ic = df.groupby(date_col).apply(lambda df: df["pred"].corr(df["label"]))
    ric = df.groupby(date_col).apply(lambda df: df["pred"].corr(df["label"], method="spearman"))
    if dropna:
        return ic.dropna(), ric.dropna()
    else:
        return ic, ric


def calc_all_ic(pred_dict_all, label, date_col="datetime", dropna=False, n_jobs=-1):
    """calc_all_ic.

    Compute ic/rank ic for every prediction in `pred_dict_all` against `label`,
    running the per-method computations in parallel.

    Parameters
    ----------
    pred_dict_all :
        A dict like {<method_name>: <prediction>}
    label:
        A pd.Series of label values
    date_col :
        name of the datetime level/column used for grouping
    dropna :
        forwarded to calc_ic
    n_jobs :
        number of joblib workers (-1 means all cores)

    Returns
    -------
    {'Q2+IND_z': {'ic': <ic series like>
                      2016-01-04   -0.057407
                      ...
                      2020-05-29    0.171393
                  'ric': <rank ic series like>
                      2016-01-04   -0.040888
                      ...
                      2020-05-29    0.183886
                 }
    ...}
    """
    tasks = {
        name: DelayedDict(["ic", "ric"], delayed(calc_ic)(pred, label, date_col=date_col, dropna=dropna))
        for name, pred in pred_dict_all.items()
    }
    return complex_parallel(Parallel(n_jobs=n_jobs, verbose=10), tasks)
4 changes: 2 additions & 2 deletions qlib/contrib/model/pytorch_nn.py
Expand Up @@ -74,7 +74,7 @@ def __init__(
data_parall=False,
scheduler: Optional[Union[Callable]] = "default", # when it is Callable, it accept one argument named optimizer
init_model=None,
eval_train_metric=True,
eval_train_metric=False,
pt_model_uri="qlib.contrib.model.pytorch_nn.Net",
pt_model_kwargs={
"input_dim": 360,
Expand Down Expand Up @@ -290,7 +290,7 @@ def fit(
)
R.log_metrics(train_metric=metric_train, step=step)
else:
metric_train = -1
metric_train = np.nan
if verbose:
self.logger.info(
f"[Step {step}]: train_loss {train_loss:.6f}, valid_loss {loss_val:.6f}, train_metric {metric_train:.6f}, valid_metric {metric_val:.6f}"
Expand Down
7 changes: 7 additions & 0 deletions qlib/contrib/report/data/__init__.py
@@ -0,0 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
This module is designed to analyze data
"""
202 changes: 202 additions & 0 deletions qlib/contrib/report/data/ana.py
@@ -0,0 +1,202 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import pandas as pd
import numpy as np
from qlib.contrib.report.data.base import FeaAnalyser
from qlib.contrib.report.utils import sub_fig_generator
from qlib.utils.paral import datetime_groupby_apply
from qlib.contrib.eva.alpha import pred_autocorr_all
from loguru import logger
import seaborn as sns

DT_COL_NAME = "datetime"


class CombFeaAna(FeaAnalyser):
    """
    Combine the sub feature analysers and plot them in a single graph.

    Each feature column gets one sub-figure per underlying analyser.
    """

    def __init__(self, dataset: pd.DataFrame, *fea_ana_cls):
        """
        :param dataset: the dataset shared by all sub analysers
        :param fea_ana_cls: at least two FeaAnalyser subclasses to instantiate and combine
        :raises NotImplementedError: when fewer than two analyser classes are given
        """
        if len(fea_ana_cls) <= 1:
            # NOTE: was a placeholder-free f-string (f"..."); a plain literal is correct
            raise NotImplementedError("This type of input is not supported")
        self._fea_ana_l = [fcls(dataset) for fcls in fea_ana_cls]
        super().__init__(dataset=dataset)

    def skip(self, col):
        # a column is skipped only if every sub analyser skips it
        return np.all(list(map(lambda fa: fa.skip(col), self._fea_ana_l)))

    def calc_stat_values(self):
        """The statistics of features are finished in the underlying analysers"""

    def plot_all(self, *args, **kwargs):
        # one row of axes per sub analyser; each generated figure covers one column
        ax_gen = iter(sub_fig_generator(row_n=len(self._fea_ana_l), *args, **kwargs))

        for col in self._dataset:
            if not self.skip(col):
                axes = next(ax_gen)
                for fa, ax in zip(self._fea_ana_l, axes):
                    if not fa.skip(col):
                        fa.plot_single(col, ax)
                    # clear per-axis labels; the column title goes on the top axis only
                    ax.set_xlabel("")
                    ax.set_title("")
                axes[0].set_title(col)


class NumFeaAnalyser(FeaAnalyser):
    """Base analyser that restricts the analysis to numeric columns."""

    def skip(self, col):
        # object-dtype columns are non-numeric and cannot be analysed
        if np.issubdtype(self._dataset[col], np.dtype("O")):
            logger.info(f"{col} is not numeric and is skipped")
            return True
        return False


class ValueCNT(FeaAnalyser):
    """Count the distinct values of each feature within every datetime group."""

    def __init__(self, dataset: pd.DataFrame, ratio=False):
        # when `ratio` is True, counts are reported as a fraction of the group size
        self.ratio = ratio
        super().__init__(dataset)

    def calc_stat_values(self):
        counts = {}
        for name, series in self._dataset.items():
            if super().skip(name):
                continue
            # len(unique()) (not nunique) so NaN counts as a distinct value
            counts[name] = series.groupby(DT_COL_NAME).apply(lambda s: len(s.unique()))
        self._val_cnt = pd.DataFrame(counts)
        if self.ratio:
            self._val_cnt = self._val_cnt.div(self._dataset.groupby(DT_COL_NAME).size(), axis=0)

        # TODO: transfer this feature to other analysers
        low, high = self._val_cnt.min().min(), self._val_cnt.max().max()
        margin = 0.05 * (high - low)
        self.ylim = (low - margin, high + margin)

    def plot_single(self, col, ax):
        self._val_cnt[col].plot(ax=ax, title=col, ylim=self.ylim)
        ax.set_xlabel("")
ax.set_xlabel("")


class FeaDistAna(NumFeaAnalyser):
    """Plot the value distribution (histogram) of each numeric feature."""

    def plot_single(self, col, ax):
        # 100 bins, no KDE overlay, to keep rendering fast
        values = self._dataset[col]
        sns.histplot(values, ax=ax, kde=False, bins=100)
        ax.set_xlabel("")
        ax.set_title(col)


class FeaInfAna(NumFeaAnalyser):
    """Count the infinite values of each numeric feature within every datetime group."""

    def calc_stat_values(self):
        self._inf_cnt = {}
        for col, item in self._dataset.items():
            if not super().skip(col):
                # BUGFIX: `np.int` was deprecated in NumPy 1.20 and removed in 1.24;
                # the builtin `int` is the documented replacement and behaves identically.
                self._inf_cnt[col] = item.apply(np.isinf).astype(int).groupby(DT_COL_NAME).sum()
        self._inf_cnt = pd.DataFrame(self._inf_cnt)

    def skip(self, col):
        # nothing to plot for columns that have no infinite values (or were never analysed)
        return (col not in self._inf_cnt) or (self._inf_cnt[col].sum() == 0)

    def plot_single(self, col, ax):
        self._inf_cnt[col].plot(ax=ax, title=col)
        ax.set_xlabel("")


class FeaNanAna(FeaAnalyser):
    """Plot the per-datetime count of NaN values for each feature."""

    def calc_stat_values(self):
        # count missing values within each datetime group
        self._nan_cnt = self._dataset.isna().groupby(DT_COL_NAME).sum()

    def skip(self, col):
        # nothing to show for columns without any missing value
        if col not in self._nan_cnt:
            return True
        return self._nan_cnt[col].sum() == 0

    def plot_single(self, col, ax):
        self._nan_cnt[col].plot(ax=ax, title=col)
        ax.set_xlabel("")


class FeaNanAnaRatio(FeaAnalyser):
    """Plot the per-datetime ratio of NaN values for each feature."""

    def calc_stat_values(self):
        self._nan_cnt = self._dataset.isna().groupby(DT_COL_NAME).sum()
        # per-datetime group sizes: the denominator of the NaN ratio
        self._total_cnt = self._dataset.groupby(DT_COL_NAME).size()

    def skip(self, col):
        # nothing to show for columns without any missing value
        if col not in self._nan_cnt:
            return True
        return self._nan_cnt[col].sum() == 0

    def plot_single(self, col, ax):
        nan_ratio = self._nan_cnt[col] / self._total_cnt
        nan_ratio.plot(ax=ax, title=col)
        ax.set_xlabel("")


class FeaACAna(FeaAnalyser):
    """Analyse the auto-correlation of features."""

    def calc_stat_values(self):
        self._fea_corr = pred_autocorr_all(self._dataset.to_dict("series"))
        corr_df = pd.DataFrame(self._fea_corr)
        low, high = corr_df.min().min(), corr_df.max().max()
        pad = 0.05 * (high - low)
        # shared y-limits (5% margin) so the sub-plots are directly comparable
        self.ylim = (low - pad, high + pad)

    def plot_single(self, col, ax):
        self._fea_corr[col].plot(ax=ax, title=col, ylim=self.ylim)
        ax.set_xlabel("")


class FeaSkewTurt(NumFeaAnalyser):
    """Plot per-datetime skewness and kurtosis of each numeric feature on twin y-axes."""

    def calc_stat_values(self):
        self._skew = datetime_groupby_apply(self._dataset, "skew", skip_group=True)
        self._kurt = datetime_groupby_apply(self._dataset, pd.DataFrame.kurt, skip_group=True)

    def plot_single(self, col, ax):
        # left axis: skewness
        self._skew[col].plot(ax=ax, label="skew")
        ax.set_xlabel("")
        ax.set_ylabel("skew")
        ax.legend()

        # right axis: kurtosis, sharing the same x axis
        right_ax = ax.twinx()
        self._kurt[col].plot(ax=right_ax, label="kurt", color="green")
        right_ax.set_xlabel("")
        right_ax.set_ylabel("kurt")

        # merge both legends into a single legend on the right axis
        handles_l, labels_l = ax.get_legend_handles_labels()
        handles_r, labels_r = right_ax.get_legend_handles_labels()
        ax.legend().set_visible(False)
        right_ax.legend(handles_l + handles_r, labels_l + labels_r)
        ax.set_title(col)


class FeaMeanStd(NumFeaAnalyser):
    """Plot per-datetime mean and standard deviation of each numeric feature on twin y-axes."""

    def calc_stat_values(self):
        self._std = self._dataset.groupby(DT_COL_NAME).std()
        self._mean = self._dataset.groupby(DT_COL_NAME).mean()

    def plot_single(self, col, ax):
        # left axis: mean
        self._mean[col].plot(ax=ax, label="mean")
        ax.set_xlabel("")
        ax.set_ylabel("mean")
        ax.legend()

        # right axis: std, sharing the same x axis
        right_ax = ax.twinx()
        self._std[col].plot(ax=right_ax, label="std", color="green")
        right_ax.set_xlabel("")
        right_ax.set_ylabel("std")

        # merge both legends into a single legend on the right axis
        handles_l, labels_l = ax.get_legend_handles_labels()
        handles_r, labels_r = right_ax.get_legend_handles_labels()
        ax.legend().set_visible(False)
        right_ax.legend(handles_l + handles_r, labels_l + labels_r)
        ax.set_title(col)


class RawFeaAna(FeaAnalyser):
    """
    Motivation:
    - display the values without further analysis
    """

    def calc_stat_values(self):
        low, high = self._dataset.min().min(), self._dataset.max().max()
        pad = 0.05 * (high - low)
        # shared y-limits (5% margin) so all columns use the same scale
        self.ylim = (low - pad, high + pad)

    def plot_single(self, col, ax):
        self._dataset[col].plot(ax=ax, title=col, ylim=self.ylim)
        ax.set_xlabel("")

0 comments on commit 6546037

Please sign in to comment.