Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions qlib/data/_libs/expanding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ cdef class Expanding(object):
cdef int na_count
def __init__(self):
self.na_count = 0

cdef double update(self, double val):
pass

Expand All @@ -25,7 +25,7 @@ cdef class Mean(Expanding):
def __init__(self):
super(Mean, self).__init__()
self.vsum = 0

cdef double update(self, double val):
self.barv.push_back(val)
if isnan(val):
Expand Down Expand Up @@ -62,7 +62,7 @@ cdef class Slope(Expanding):
return (N*self.xy_sum - self.x_sum*self.y_sum) / \
(N*self.x2_sum - self.x_sum*self.x_sum)


cdef class Resi(Expanding):
"""1-D array expanding residuals"""
cdef double x_sum
Expand Down Expand Up @@ -94,7 +94,7 @@ cdef class Resi(Expanding):
interp = y_mean - slope*x_mean
return val - (slope*size + interp)


cdef class Rsquare(Expanding):
"""1-D array expanding rsquare"""
cdef double x_sum
Expand All @@ -117,7 +117,7 @@ cdef class Rsquare(Expanding):
self.na_count += 1
else:
self.x_sum += size
self.x2_sum += size
self.x2_sum += size * size
self.y_sum += val
self.y2_sum += val * val
self.xy_sum += size * val
Expand All @@ -126,7 +126,7 @@ cdef class Rsquare(Expanding):
sqrt((N*self.x2_sum - self.x_sum*self.x_sum) * (N*self.y2_sum - self.y_sum*self.y_sum))
return rvalue * rvalue


cdef np.ndarray[double, ndim=1] expanding(Expanding r, np.ndarray a):
cdef int i
cdef int N = len(a)
Expand Down
51 changes: 47 additions & 4 deletions qlib/data/dataset/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import abc
import bisect
import logging
from typing import Union, Tuple, List
from typing import Union, Tuple, List, Iterator, Optional

import pandas as pd
import numpy as np
Expand Down Expand Up @@ -113,8 +113,7 @@ def _fetch_df_by_index(
CS_ALL = "__all"

def _fetch_df_by_col(self, df: pd.DataFrame, col_set: str) -> pd.DataFrame:
cln = len(df.columns.levels)
if cln == 1:
if not isinstance(df.columns, pd.MultiIndex):
return df
elif col_set == self.CS_ALL:
return df.droplevel(axis=1, level=0)
Expand All @@ -126,6 +125,7 @@ def fetch(
selector: Union[pd.Timestamp, slice, str],
level: Union[str, int] = "datetime",
col_set: Union[str, List[str]] = CS_ALL,
squeeze: bool = False
) -> pd.DataFrame:
"""
fetch data from underlying data source
Expand All @@ -141,13 +141,22 @@ def fetch(
select a set of meaningful columns.(e.g. features, columns)
if isinstance(col_set, List[str]):
select several sets of meaningful columns, the returned data has multiple levels
squeeze : bool
whether squeeze columns and index

Returns
-------
pd.DataFrame:
"""
df = self._fetch_df_by_index(self._data, selector, level)
return self._fetch_df_by_col(df, col_set)
df = self._fetch_df_by_col(df, col_set)
if squeeze:
# squeeze columns
df = df.squeeze()
# squeeze index
if isinstance(selector, (str, pd.Timestamp)):
df = df.reset_index(level=level, drop=True)
return df

def get_cols(self, col_set=CS_ALL) -> list:
"""
Expand All @@ -167,6 +176,40 @@ def get_cols(self, col_set=CS_ALL) -> list:
df = self._fetch_df_by_col(df, col_set)
return df.columns.to_list()

def get_range_selector(self, cur_date: Union[pd.Timestamp, str], periods: int) -> slice:
"""
get range selector by number of periods

Args:
cur_date (pd.Timestamp or str): current date
periods (int): number of periods
"""
trading_dates = self._data.index.unique(level='datetime')
cur_loc = trading_dates.get_loc(cur_date)
pre_loc = cur_loc - periods + 1
if pre_loc < 0:
warnings.warn('`periods` is too large. the first date will be returned.')
pre_loc = 0
ref_date = trading_dates[pre_loc]
return slice(ref_date, cur_date)

def get_range_iterator(self, periods: int, min_periods: Optional[int] = None,
**kwargs) -> Iterator[Tuple[pd.Timestamp, pd.DataFrame]]:
"""
get a iterator of sliced data with given periods

Args:
periods (int): number of periods
min_periods (int): minimum periods for sliced dataframe
kwargs (dict): will be passed to `self.fetch`
"""
trading_dates = self._data.index.unique(level='datetime')
if min_periods is None:
min_periods = periods
for cur_date in trading_dates[min_periods:]:
selector = self.get_range_selector(cur_date, periods)
yield cur_date, self.fetch(selector, **kwargs)


class DataHandlerLP(DataHandler):
"""
Expand Down
89 changes: 45 additions & 44 deletions qlib/data/dataset/loader.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,75 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from abc import ABC, abstractmethod
import abc
import warnings
import pandas as pd
from qlib.data import D

from typing import Tuple

from qlib.data import D

class DataLoader(ABC):
"""
class DataLoader(abc.ABC):
'''
DataLoader is designed for loading raw data from original data source.
"""

@abstractmethod
'''
@abc.abstractmethod
def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
"""
load the data as pd.DataFrame

Parameters
----------
self : [TODO:type]
[TODO:description]
instruments : [TODO:type]
[TODO:description]
start_time : [TODO:type]
[TODO:description]
end_time : [TODO:type]
[TODO:description]

Returns
-------
pd.DataFrame:
data load from the under layer source
load the data as pd.DataFrame

Example of the data:
The multi-index of the columns is optional.
feature label
$close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0
datetime instrument
2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032
SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042
SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
Parameters
----------
self : [TODO:type]
[TODO:description]
instruments : [TODO:type]
[TODO:description]
start_time : [TODO:type]
[TODO:description]
end_time : [TODO:type]
[TODO:description]

Returns
-------
pd.DataFrame:
data load from the under layer source

Example of the data:
(The multi-index of the columns is optional.)
feature label
$close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0
datetime instrument
2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032
SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042
SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
"""
pass


class QlibDataLoader(DataLoader):
"""Same as QlibDataLoader. The fields can be define by config"""

'''Same as QlibDataLoader. The fields can be define by config'''
def __init__(self, config: Tuple[list, tuple, dict], filter_pipe=None):
"""
Parameters
----------
config : Tuple[list ,tuple, dict]
config : Tuple[list, tuple, dict]
Config will be used to describe the fields and column names

<config> := {
"group_name1": <fields_info1>
"group_name2": <fields_info2>
}

or
<config> := <fields_info>

<fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...])

Here is a few examples to describe the fields
TODO:
"""
self.is_group = isinstance(config, dict)
self.is_group = isinstance(config, dict)

if self.is_group:
self.fields = {grp: self._parse_fields_info(fields_info) for grp, fields_info in config.items()}
else:
self.fields = self._parse_fields_info(fields_info)
self.fields = self._parse_fields_info(config)

self.filter_pipe = filter_pipe

Expand All @@ -86,14 +83,18 @@ def _parse_fields_info(self, fields_info: Tuple[list, tuple]) -> Tuple[list, lis
return exprs, names

def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
if isinstance(instruments, str):
instruments = D.instruments(instruments, filter_pipe=self.filter_pipe)
elif self.filter_pipe is not None:
warnings.warn('`filter_pipe` is not None, but it will not be used with `instruments` as list')
def _get_df(exprs, names):
df = D.features(D.instruments(instruments, filter_pipe=self.filter_pipe), exprs, start_time, end_time)
df = D.features(instruments, exprs, start_time, end_time)
df.columns = names
return df

if self.is_group:
df = pd.concat({grp: _get_df(exprs, names) for grp, (exprs, names) in self.fields.items()}, axis=1)
else:
exprs, names = self.fields
df = _get_df(exprs, names)
df = df.swaplevel().sort_index()
df = df.swaplevel().sort_index() # NOTE: always return <datetime, instrument>
return df
17 changes: 15 additions & 2 deletions qlib/data/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import numpy as np
import pandas as pd

from scipy.stats import percentileofscore

from .base import Expression, ExpressionOps
from ..log import get_module_logger

Expand Down Expand Up @@ -687,6 +689,8 @@ def _load_internal(self, instrument, start_index, end_index, freq):
# isnull = series.isnull() # NOTE: isnull = NaN, inf is not null
if self.N == 0:
series = getattr(series.expanding(min_periods=1), self.func)()
elif 0 < self.N < 1:
series = series.ewm(alpha=self.N, min_periods=1).mean()
else:
series = getattr(series.rolling(self.N, min_periods=1), self.func)()
# series.iloc[:self.N-1] = np.nan
Expand All @@ -696,6 +700,8 @@ def _load_internal(self, instrument, start_index, end_index, freq):
def get_longest_back_rolling(self):
if self.N == 0:
return np.inf
if 0 < self.N < 1:
return int(np.log(1e-6) / np.log(1 - self.N)) # (1 - N)**window == 1e-6
return self.feature.get_longest_back_rolling() + self.N - 1

def get_extended_window_size(self):
Expand All @@ -704,6 +710,11 @@ def get_extended_window_size(self):
# remove such support for N == 0?
get_module_logger(self.__class__.__name__).warning("The Rolling(ATTR, 0) will not be accurately calculated")
return self.feature.get_extended_window_size()
elif 0 < self.N < 1:
lft_etd, rght_etd = self.feature.get_extended_window_size()
size = int(np.log(1e-6) / np.log(1 - self.N))
lft_etd = max(lft_etd + size - 1, lft_etd)
return lft_etd, rght_etd
else:
lft_etd, rght_etd = self.feature.get_extended_window_size()
lft_etd = max(lft_etd + self.N - 1, lft_etd)
Expand Down Expand Up @@ -1087,7 +1098,7 @@ def rank(x):
x1 = x[~np.isnan(x)]
if x1.shape[0] == 0:
return np.nan
return (x1.argsort()[-1] + 1) / len(x1)
return percentileofscore(x1, x1[-1]) / len(x1)

if self.N == 0:
series = series.expanding(min_periods=1).apply(rank, raw=True)
Expand Down Expand Up @@ -1273,7 +1284,7 @@ class EMA(Rolling):
----------
feature : Expression
feature instance
N : int
N : int, float
rolling window size

Returns
Expand All @@ -1296,6 +1307,8 @@ def exp_weighted_mean(x):

if self.N == 0:
series = series.expanding(min_periods=1).apply(exp_weighted_mean, raw=True)
elif 0 < self.N < 1:
series = series.ewm(alpha=self.N, min_periods=1).mean()
else:
series = series.ewm(span=self.N, min_periods=1).mean()
return series
Expand Down
Loading