import bisect
from copy import deepcopy
from inspect import getfullargspec
from typing import Union, List, Tuple, Dict, Text, Optional

import numpy as np
import pandas as pd

from ...log import get_module_logger
from ...utils import init_instance_by_config, np_ffill, lazy_sort_index
from ...utils.serial import Serializable
from .handler import DataHandler, DataHandlerLP
from .utils import get_level_index


class Dataset(Serializable):
"""
    Preparing data for model training and inference.
"""
def __init__(self, **kwargs):
"""
init is designed to finish following steps:
- init the sub instance and the state of the dataset(info to prepare the data)
- The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing.
- setup data
- The data related attributes' names should start with '_' so that it will not be saved on disk when serializing.
The data could specify the info to calculate the essential data for preparation
"""
self.setup_data(**kwargs)
super().__init__()
def config(self, **kwargs):
"""
        config is designed to configure the parameters that cannot be learned from the data
"""
super().config(**kwargs)
def setup_data(self, **kwargs):
"""
Setup the data.
        We split out the setup_data function to support the following situation:

        - Users have a Dataset object with learned state on disk.
        - Users load the Dataset object from disk.
        - Users call `setup_data` to load new data.
        - Users prepare data for the model based on the previous state.
"""
pass
def prepare(self, **kwargs) -> object:
"""
        The type of the prepared data depends on the model. (It could be pd.DataFrame, torch.utils.data.DataLoader, etc.)
        The parameters should specify the scope of the prepared data.
        The method should:
        - process the data
        - return the processed data

        Returns
        -------
        object:
            the prepared data object
"""
pass
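

# A minimal sketch of the `Dataset` contract (hypothetical subclass for
# illustration only; `InMemoryDataset`, `df` and `some_df` are not part of qlib):
#
#     class InMemoryDataset(Dataset):
#         def setup_data(self, df: pd.DataFrame = None, **kwargs):
#             self._data = df  # '_' prefix: excluded when serializing to disk
#
#         def prepare(self, **kwargs) -> pd.DataFrame:
#             return self._data
#
#     ds = InMemoryDataset(df=some_df)  # __init__ forwards kwargs to setup_data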
class DatasetH(Dataset):
"""
    Dataset with Data(H)andler

    Users should try to put the data preprocessing functions into the handler.
    Only the following data processing functions should be placed in the Dataset:

    - processing that is related to a specific model
    - processing that is related to the data split
"""
def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple], **kwargs):
"""
Setup the underlying data.
Parameters
----------
handler : Union[dict, DataHandler]
handler could be:
- instance of `DataHandler`
- config of `DataHandler`. Please refer to `DataHandler`
segments : dict
Describe the options to segment the data.
Here are some examples:
.. code-block::
1) 'segments': {
'train': ("2008-01-01", "2014-12-31"),
'valid': ("2017-01-01", "2020-08-01",),
'test': ("2015-01-01", "2016-12-31",),
}
2) 'segments': {
'insample': ("2008-01-01", "2014-12-31"),
'outsample': ("2017-01-01", "2020-08-01",),
}
"""
self.handler: DataHandler = init_instance_by_config(handler, accept_types=DataHandler)
self.segments = segments.copy()
self.fetch_kwargs = {}
super().__init__(**kwargs)
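
    # A typical construction sketch (hypothetical values; `Alpha158` is one of the
    # handlers shipped in qlib.contrib.data.handler and is used here only as an example):
    #
    #     dataset = DatasetH(
    #         handler={
    #             "class": "Alpha158",
    #             "module_path": "qlib.contrib.data.handler",
    #             "kwargs": {"instruments": "csi300", "start_time": "2008-01-01", "end_time": "2020-08-01"},
    #         },
    #         segments={
    #             "train": ("2008-01-01", "2014-12-31"),
    #             "valid": ("2015-01-01", "2016-12-31"),
    #             "test": ("2017-01-01", "2020-08-01"),
    #         },
    #     )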
def config(self, handler_kwargs: dict = None, **kwargs):
"""
        Configure the DatasetH, e.g. update the parameters that cannot be learned from the data.

        Parameters
        ----------
        handler_kwargs : dict
            Config of the DataHandler, which could include the following arguments:

            - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'.
        kwargs : dict
            Config of the DatasetH, such as

            - segments : dict
                Config of segments, which is the same as 'segments' in self.__init__
"""
if handler_kwargs is not None:
self.handler.config(**handler_kwargs)
if "segments" in kwargs:
self.segments = deepcopy(kwargs.pop("segments"))
super().config(**kwargs)
def setup_data(self, handler_kwargs: dict = None, **kwargs):
"""
Setup the Data
Parameters
----------
handler_kwargs : dict
init arguments of DataHandler, which could include the following arguments:
- init_type : Init Type of Handler
- enable_cache : whether to enable cache
"""
super().setup_data(**kwargs)
if handler_kwargs is not None:
self.handler.setup_data(**handler_kwargs)
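
    # For example (hypothetical values), a rolling update could reconfigure the
    # dataset and then reload the data; `IT_FIT_SEQ` is one of DataHandlerLP's
    # init types (fit the processors sequentially):
    #
    #     dataset.config(
    #         handler_kwargs={"start_time": "2010-01-01", "end_time": "2021-01-01"},
    #         segments={
    #             "train": ("2010-01-01", "2018-12-31"),
    #             "test": ("2019-01-01", "2021-01-01"),
    #         },
    #     )
    #     dataset.setup_data(handler_kwargs={"init_type": DataHandlerLP.IT_FIT_SEQ})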
def __repr__(self):
return "{name}(handler={handler}, segments={segments})".format(
name=self.__class__.__name__, handler=self.handler, segments=self.segments
)
    def _prepare_seg(self, slc: slice, **kwargs):
        """
        Given a slice, retrieve the corresponding data.

        Parameters
        ----------
        slc : slice
        """
        if hasattr(self, "fetch_kwargs"):
            return self.handler.fetch(slc, **kwargs, **self.fetch_kwargs)
        else:  # hasattr keeps backward compatibility with datasets serialized before `fetch_kwargs` was introduced
            return self.handler.fetch(slc, **kwargs)
def prepare(
self,
segments: Union[List[Text], Tuple[Text], Text, slice],
col_set=DataHandler.CS_ALL,
data_key=DataHandlerLP.DK_I,
**kwargs,
) -> Union[List[pd.DataFrame], pd.DataFrame]:
"""
        Prepare the data for learning and inference.

        Parameters
        ----------
        segments : Union[List[Text], Tuple[Text], Text, slice]
            Describe the scope of the data to be prepared.
            Here are some examples:

            - 'train'
            - ['train', 'valid']
        col_set : str
            The col_set will be passed to self.handler when fetching data.
        data_key : str
            The data to fetch: DK_*.
            Default is DK_I, which indicates fetching data for **inference**.
        kwargs :
            The parameters that kwargs may contain:
                flt_col : str
                    It only exists in TSDatasetH and can be used to add a column of boolean data to filter the data.
                    This parameter is only supported when this is an instance of TSDatasetH.

        Returns
        -------
        Union[List[pd.DataFrame], pd.DataFrame]:
            the prepared data for the given segment(s)

        Raises
        ------
        NotImplementedError:
            raised when the type of `segments` is not supported
"""
logger = get_module_logger("DatasetH")
fetch_kwargs = {"col_set": col_set}
fetch_kwargs.update(kwargs)
if "data_key" in getfullargspec(self.handler.fetch).args:
fetch_kwargs["data_key"] = data_key
else:
logger.info(f"data_key[{data_key}] is ignored.")
        # Handle all kinds of segment formats
if isinstance(segments, (list, tuple)):
return [self._prepare_seg(slice(*self.segments[seg]), **fetch_kwargs) for seg in segments]
elif isinstance(segments, str):
return self._prepare_seg(slice(*self.segments[segments]), **fetch_kwargs)
elif isinstance(segments, slice):
return self._prepare_seg(segments, **fetch_kwargs)
else:
            raise NotImplementedError("This type of input is not supported")
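
    # Examples of the accepted `segments` formats (assuming a `dataset` built as
    # in the construction sketch above; a usage note, not executed code):
    #
    #     df_test = dataset.prepare("test")                              # str -> a single DataFrame
    #     df_train, df_valid = dataset.prepare(["train", "valid"])       # list -> a list of DataFrames
    #     df_slice = dataset.prepare(slice("2015-01-01", "2015-06-30"))  # a raw slice is passed through

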
class TSDataSampler:
"""
    (T)ime-(S)eries DataSampler

    This is the result of TSDatasetH.

    It works like `torch.utils.data.Dataset` and provides a very convenient interface for constructing time-series
    datasets based on tabular data.

    If users have further requirements for processing data, they could process the data based on `TSDataSampler` or create
    more powerful subclasses.

    Known Issues:
    - For performance reasons, this sampler converts the dataframe into arrays, which could result
      in a different data type.
"""
def __init__(
self, data: pd.DataFrame, start, end, step_len: int, fillna_type: str = "none", dtype=None, flt_data=None
):
"""
        Build a dataset which looks like torch.utils.data.Dataset.

        Parameters
        ----------
        data : pd.DataFrame
            The raw tabular data
        start :
            The indexable start time
        end :
            The indexable end time
        step_len : int
            The length of the time-series step
        fillna_type : str
            How qlib will handle the sample if there is no sample on a specific date.
            none:
                fill with np.nan
            ffill:
                ffill with the previous sample
            ffill+bfill:
                ffill with previous samples first, then fill with later samples
        dtype :
            The dtype to cast the underlying numpy array to (optional).
        flt_data : pd.Series
            a column of boolean data (True or False) used to filter samples
            None:
                keep all data
"""
self.start = start
self.end = end
self.step_len = step_len
self.fillna_type = fillna_type
        assert get_level_index(data, "datetime") == 0  # `datetime` must be level 0 of the MultiIndex
self.data = lazy_sort_index(data)
kwargs = {"object": self.data}
if dtype is not None:
kwargs["dtype"] = dtype
        self.data_arr = np.array(**kwargs)  # indexing into a numpy array is much faster than DataFrame.values!
        # NOTE:
        # - append a last row of full NaN for better performance in `__getitem__`
        # - keeping the same dtype will result in better performance
        self.data_arr = np.append(
            self.data_arr, np.full((1, self.data_arr.shape[1]), np.nan, dtype=self.data_arr.dtype), axis=0
        )
        self.nan_idx = -1  # The last line is all NaN

        # the data type of the index will be changed below
        # The index of usable data is between start_idx and end_idx
self.idx_df, self.idx_map = self.build_index(self.data)
self.data_index = deepcopy(self.data.index)
if flt_data is not None:
self.flt_data = np.array(flt_data.reindex(self.data_index)).reshape(-1)
self.idx_map = self.flt_idx_map(self.flt_data, self.idx_map)
            # `== True` (rather than truthiness) so that NaN rows introduced by `reindex` are filtered out
            self.data_index = self.data_index[np.where(self.flt_data == True)[0]]
self.start_idx, self.end_idx = self.data_index.slice_locs(start=pd.Timestamp(start), end=pd.Timestamp(end))
self.idx_arr = np.array(self.idx_df.values, dtype=np.float64) # for better performance
del self.data # save memory
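
    # A minimal usage sketch with toy data (hypothetical values, illustration only):
    #
    #     import numpy as np
    #     import pandas as pd
    #
    #     idx = pd.MultiIndex.from_product(
    #         [pd.date_range("2020-01-01", periods=5), ["SH600000", "SZ000001"]],
    #         names=["datetime", "instrument"],
    #     )
    #     df = pd.DataFrame({"feature": np.arange(10.0)}, index=idx)
    #     sampler = TSDataSampler(df, "2020-01-01", "2020-01-05", step_len=3)
    #     sampler[0].shape  # -> (3, 1): <step, feature>; steps before the data start are NaN-padded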
@staticmethod
    def flt_idx_map(flt_data, idx_map):
        """Remap `idx_map` so that the samples kept by `flt_data` receive consecutive indices."""
idx = 0
new_idx_map = {}
for i, exist in enumerate(flt_data):
if exist:
new_idx_map[idx] = idx_map[i]
idx += 1
return new_idx_map
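
    # e.g. flt_data = [True, False, True] remaps an idx_map of {0: a, 1: b, 2: c}
    # to {0: a, 1: c}: the kept samples are re-numbered consecutively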
def get_index(self):
"""
        Get the pandas index of the data. It will be useful in the following scenarios:

        - A special sampler will be used (e.g. the user wants to sample day by day)
"""
return self.data_index[self.start_idx : self.end_idx]
def config(self, **kwargs):
# Config the attributes
for k, v in kwargs.items():
setattr(self, k, v)
@staticmethod
    def build_index(data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
        """
        Build the indices that relate each sample to its <datetime, instrument> position.

        Parameters
        ----------
        data : pd.DataFrame
            The dataframe with a <datetime, instrument> MultiIndex.

        Returns
        -------
        Tuple[pd.DataFrame, dict]:
            - idx_df: a <datetime (row), instrument (col)> frame whose cells hold each sample's position in `data`
            - idx_map: {<sample position in data>: (<row in idx_df>, <col in idx_df>)}
        """
        # use object dtype in case pandas converts int to float
idx_df = pd.Series(range(data.shape[0]), index=data.index, dtype=object)
idx_df = lazy_sort_index(idx_df.unstack())
        # NOTE: the correctness of `__getitem__` depends on the columns being sorted here
idx_df = lazy_sort_index(idx_df, axis=1)
idx_map = {}
for i, (_, row) in enumerate(idx_df.iterrows()):
for j, real_idx in enumerate(row):
if not np.isnan(real_idx):
idx_map[real_idx] = (i, j)
return idx_df, idx_map
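
    # For example (toy data), with the rows of `data` ordered
    # (t0, A), (t0, B), (t1, A), (t1, B):
    #
    #     idx_df:       A  B        idx_map: {0: (0, 0), 1: (0, 1),
    #             t0    0  1                  2: (1, 0), 3: (1, 1)}
    #             t1    2  3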
    def _get_indices(self, row: int, col: int) -> np.ndarray:
        """
        Get the series of indices into self.data_arr for the given row, col indices of self.idx_df.

        Parameters
        ----------
        row : int
            the row in self.idx_df
        col : int
            the col in self.idx_df

        Returns
        -------
        np.ndarray:
            the indices of the data in self.data_arr
        """
indices = self.idx_arr[max(row - self.step_len + 1, 0) : row + 1, col]
if len(indices) < self.step_len:
indices = np.concatenate([np.full((self.step_len - len(indices),), np.nan), indices])
if self.fillna_type == "ffill":
indices = np_ffill(indices)
elif self.fillna_type == "ffill+bfill":
indices = np_ffill(np_ffill(indices)[::-1])[::-1]
else:
assert self.fillna_type == "none"
return indices
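
    # e.g. with step_len=3, a window of raw indices [2., nan, 7.] becomes:
    #     "none"        -> [2., nan, 7.]
    #     "ffill"       -> [2., 2., 7.]
    #     "ffill+bfill" -> [2., 2., 7.]; a leading-NaN window such as [nan, 4., 7.]
    #                      is additionally back-filled to [4., 4., 7.]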
    def _get_row_col(self, idx) -> Tuple[int, int]:
        """
        Get the row index and col index in self.idx_df for a given sample index.

        Parameters
        ----------
        idx :
            the input of `__getitem__`

        Returns
        -------
        Tuple[int, int]:
            the row and col index
        """
        # Get the right row number `i` and col number `j` in idx_df
if isinstance(idx, (int, np.integer)):
real_idx = self.start_idx + idx
if self.start_idx <= real_idx < self.end_idx:
i, j = self.idx_map[real_idx] # TODO: The performance of this line is not good
else:
raise KeyError(f"{real_idx} is out of [{self.start_idx}, {self.end_idx})")
elif isinstance(idx, tuple):
# <TSDataSampler object>["datetime", "instruments"]
date, inst = idx
date = pd.Timestamp(date)
i = bisect.bisect_right(self.idx_df.index, date) - 1
# NOTE: This relies on the idx_df columns sorted in `__init__`
j = bisect.bisect_left(self.idx_df.columns, inst)
else:
            raise NotImplementedError("This type of input is not supported")
return i, j
    def __getitem__(self, idx: Union[int, Tuple[object, str], List[int]]):
        """
        # We have two methods to get the time-series of a sample
        # tsds is an instance of TSDataSampler

        # 1) sample by int index directly
        tsds[len(tsds) - 1]

        # 2) sample by <datetime, instrument> index
        tsds['2016-12-31', "SZ300315"]

        # The return value will be similar to the data retrieved by the following code
        df.loc(axis=0)['2015-01-01':'2016-12-31', "SZ300315"].iloc[-30:]

        Parameters
        ----------
        idx : Union[int, Tuple[object, str], List[int]]
        """
        # types that hold multiple indices (a batch of samples)
mtit = (list, np.ndarray)
if isinstance(idx, mtit):
indices = [self._get_indices(*self._get_row_col(i)) for i in idx]
indices = np.concatenate(indices)
else:
indices = self._get_indices(*self._get_row_col(idx))
        # 1) for better performance, use the last all-NaN line to pad the indices of missing dates
        # 2) we use np.float64 in case of precision problems. TODO: I'm not sure whether np.float64 will
        #    cause precision problems. At least it did not cause any problems in my tests.
indices = np.nan_to_num(indices.astype(np.float64), nan=self.nan_idx).astype(int)
data = self.data_arr[indices]
if isinstance(idx, mtit):
            # if we get multiple indices, an additional dimension should be added.
# <sample_idx, step_idx, feature_idx>
data = data.reshape(-1, self.step_len, *data.shape[1:])
return data
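
    # e.g. batch sampling with a list of int indices (`tsds` is a hypothetical sampler):
    #
    #     batch = tsds[[0, 1, 2]]
    #     batch.shape  # -> (3, step_len, n_features): <sample_idx, step_idx, feature_idx>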
def __len__(self):
return self.end_idx - self.start_idx
class TSDatasetH(DatasetH):
"""
    (T)ime-(S)eries Dataset (H)andler

    Convert the tabular data to time-series data.

    Requirements analysis

    The typical workflow for a user to get time-series data for a sample:
    - process the features
    - slice proper data from the data handler: dimension of a sample <feature, >
    - build the relation of samples by the <time, instrument> index
        - be able to sample a time series of data <timestep, feature>
        - it will be better if the interface is like "torch.utils.data.Dataset"
    - users could build customized batches based on the data
        - the dimension of a batch of data: <batch_idx, feature, timestep>
"""
def __init__(self, step_len=30, **kwargs):
self.step_len = step_len
super().__init__(**kwargs)
def config(self, **kwargs):
if "step_len" in kwargs:
self.step_len = kwargs.pop("step_len")
super().config(**kwargs)
def setup_data(self, **kwargs):
super().setup_data(**kwargs)
        # the calendar: the sorted, unique datetimes of the raw data, used for padding in `_prepare_raw_seg`
        cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique()
cal = sorted(cal)
self.cal = cal
def _prepare_raw_seg(self, slc: slice, **kwargs) -> pd.DataFrame:
        # The dataset decides how to slice the data (it retrieves extra history for the time series).
start, end = slc.start, slc.stop
start_idx = bisect.bisect_left(self.cal, pd.Timestamp(start))
pad_start_idx = max(0, start_idx - self.step_len)
pad_start = self.cal[pad_start_idx]
        # TSDatasetH will retrieve more data to make each time-series window complete
data = super()._prepare_seg(slice(pad_start, end), **kwargs)
return data
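
    # e.g. with step_len=30 and slc.start == "2015-01-01", `pad_start` moves the
    # fetch back 30 entries on the handler's calendar, so the sample on
    # "2015-01-01" still gets a full 30-step history.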
def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler:
"""
split the _prepare_raw_seg is to leave a hook for data preprocessing before creating processing data
"""
dtype = kwargs.pop("dtype", None)
start, end = slc.start, slc.stop
flt_col = kwargs.pop("flt_col", None)
        # TSDatasetH will retrieve more data to make each time-series window complete
data = self._prepare_raw_seg(slc, **kwargs)
flt_kwargs = deepcopy(kwargs)
if flt_col is not None:
flt_kwargs["col_set"] = flt_col
flt_data = self._prepare_raw_seg(slc, **flt_kwargs)
assert len(flt_data.columns) == 1
else:
flt_data = None
tsds = TSDataSampler(data=data, start=start, end=end, step_len=self.step_len, dtype=dtype, flt_data=flt_data)
return tsds
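

# A usage sketch tying the pieces together (hypothetical config, mirroring the
# DatasetH example above; DK_L fetches the data prepared for **learning**):
#
#     ts_dataset = TSDatasetH(
#         step_len=30,
#         handler={
#             "class": "Alpha158",
#             "module_path": "qlib.contrib.data.handler",
#             "kwargs": {"instruments": "csi300", "start_time": "2008-01-01", "end_time": "2020-08-01"},
#         },
#         segments={"train": ("2008-01-01", "2014-12-31"), "test": ("2015-01-01", "2016-12-31")},
#     )
#     train_sampler = ts_dataset.prepare("train", data_key=DataHandlerLP.DK_L)  # -> TSDataSampler
#     x = train_sampler[0]  # a <step_len, n_features> window for the first sample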