From 058f185ca6dbc9474f59d481757b5728938f3966 Mon Sep 17 00:00:00 2001 From: zhupr Date: Thu, 25 Mar 2021 17:22:05 +0800 Subject: [PATCH 01/11] add data-storage --- qlib/storage/__init__.py | 0 qlib/storage/storage.py | 154 ++++++++++++++++++++++++ tests/storage_tests/test_storage.py | 174 ++++++++++++++++++++++++++++ 3 files changed, 328 insertions(+) create mode 100644 qlib/storage/__init__.py create mode 100644 qlib/storage/storage.py create mode 100644 tests/storage_tests/test_storage.py diff --git a/qlib/storage/__init__.py b/qlib/storage/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/qlib/storage/storage.py b/qlib/storage/storage.py new file mode 100644 index 0000000000..dac0e167d1 --- /dev/null +++ b/qlib/storage/storage.py @@ -0,0 +1,154 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +import abc + +from typing import ( + Iterable, + overload, + TypeVar, + Tuple, + List, + Text, + Optional, + AbstractSet, + Mapping, + Iterator, +) + + +# calendar value type +CalVT = TypeVar("CalVT") + +# instrument value +InstVT = List[Tuple[CalVT, CalVT]] +# instrument key +InstKT = Text + + +FeatureVT = Tuple[int, float] + + +class CalendarStorage: + def __init__(self, uri: str): + self._uri = uri + + def append(self, obj: CalVT) -> None: + """ Append object to the end of the CalendarStorage. """ + raise NotImplementedError("Subclass of CalendarStorage must implement `append` method") + + def clear(self): + """ Remove all items from CalendarStorage. """ + raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method") + + def extend(self, iterable: Iterable[CalVT]): + """ Extend list by appending elements from the iterable. """ + raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") + + @overload + @abc.abstractmethod + def __getitem__(self, s: slice) -> Iterable[CalVT]: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" + raise NotImplementedError("Subclass of CalendarStorage must implement `__getitem__(s: slice)` method") + + @abc.abstractmethod + def __getitem__(self, i: int) -> CalVT: + """x.__getitem__(y) <==> x[y]""" + + raise NotImplementedError("Subclass of CalendarStorage must implement `__getitem__(i: int)` method") + + @abc.abstractmethod + def __iter__(self) -> Iterator[CalVT]: + """ Implement iter(self). """ + raise NotImplementedError("Subclass of CalendarStorage must implement `__iter__` method") + + def __len__(self) -> int: + raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") + + +class InstrumentStorage: + def __init__(self, uri: str): + self._uri = uri + + def clear(self) -> None: + """ D.clear() -> None. Remove all items from D. """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") + + @abc.abstractmethod + def get(self, k: InstKT) -> Optional[InstVT]: + """D.get(k) -> InstV or None""" + raise NotImplementedError("Subclass of InstrumentStorage must implement `get` method") + + @abc.abstractmethod + def items(self) -> AbstractSet[Tuple[InstKT, InstVT]]: + """ D.items() -> a set-like object providing a view on D's items """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `items` method") + + @abc.abstractmethod + def keys(self) -> AbstractSet[InstKT]: + """ D.keys() -> a set-like object providing a view on D's keys """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `keys` method") + + def update(self, e: Mapping[InstKT, InstVT] = None, **f: InstVT) -> None: + """ + D.update([e, ]**f) -> None. Update D from dict/iterable e and f. + If e is present and has a .keys() method, then does: for k in e: D[k] = e[k] + If e is present and lacks a .keys() method, then does: for k, v in e: D[k] = v + In either case, this is followed by: for k in f: D[k] = f[k] + """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method") + + def __setitem__(self, k: InstKT, v: InstVT) -> None: + """ Set self[key] to value. """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") + + def __delitem__(self, k: InstKT) -> None: + """ Delete self[key]. """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__delitem__` method") + + @abc.abstractmethod + def __getitem__(self, k: InstKT) -> InstVT: + """ x.__getitem__(y) <==> x[y] """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__getitem__` method") + + def __len__(self) -> int: + """ Return len(self). """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__len__` method") + + +class FeatureStorage: + def __init__(self, uri: str): + self._uri = uri + + def append(self, obj: FeatureVT) -> None: + """ Append object to the end of the FeatureStorage. """ + raise NotImplementedError("Subclass of FeatureStorage must implement `append` method") + + def clear(self): + """ Remove all items from FeatureStorage. """ + raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") + + def extend(self, iterable: Iterable[FeatureVT]): + """ Extend list by appending elements from the iterable. """ + raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method") + + @overload + @abc.abstractmethod + def __getitem__(self, s: slice) -> Iterable[FeatureVT]: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" + raise NotImplementedError("Subclass of FeatureStorage must implement `__getitem__(s: slice)` method") + + @abc.abstractmethod + def __getitem__(self, i: int) -> float: + """x.__getitem__(y) <==> x[y]""" + + raise NotImplementedError("Subclass of FeatureStorage must implement `__getitem__(i: int)` method") + + def __len__(self) -> int: + raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") + + @abc.abstractmethod + def __iter__(self) -> Iterator[FeatureVT]: + """ Implement iter(self). """ + raise NotImplementedError("Subclass of FeatureStorage must implement `__iter__` method") diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py new file mode 100644 index 0000000000..d4b37be777 --- /dev/null +++ b/tests/storage_tests/test_storage.py @@ -0,0 +1,174 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from pathlib import Path +from importlib.util import spec_from_file_location, module_from_spec + +import pandas as pd + + +# TODO: set STORAGE_NAME +STORAGE_NAME = "" +STORAGE_FILE_PATH = Path("") +# TODO: set value +CALENDAR_URI = "" +INSTRUMENT_URI = "" +FEATURE_URI = "" + + +def get_module(module_path: Path): + module_spec = spec_from_file_location("", module_path) + module = module_from_spec(module_spec) + module_spec.loader.exec_module(module) + return module + + +STORAGE_MODULE = get_module(STORAGE_FILE_PATH) + + +CalendarStorage = getattr(STORAGE_MODULE, f"{STORAGE_NAME.title()}CalendarStorage") +InstrumentStorage = getattr(STORAGE_MODULE, f"{STORAGE_NAME.title()}InstrumentStorage") +FeatureStorage = getattr(STORAGE_MODULE, f"{STORAGE_NAME.title()}FeatureStorage") + + +class TestCalendarStorage: + def test_calendar_storage(self): + # calendar value: pd.date_range(start="2005-01-01", stop="2005-03-01", freq="1D") + start_date = "2005-01-01" + end_date = "2005-03-01" + values = pd.date_range(start_date, end_date, freq="1D") + + calendar = CalendarStorage(uri=CALENDAR_URI) + # test `__iter__` + for _s, _t in zip(calendar, values): + assert pd.Timestamp(_s) == pd.Timestamp(_t), f"{calendar.__name__}.__iter__ error" + + # test `__getitem__(self, s: slice)` + for _s, _t in zip(calendar[1:3], values[1:3]): + assert pd.Timestamp(_s) == pd.Timestamp(_t), f"{calendar.__name__}.__getitem__(s: slice) error" + + # test `__getitem__(self, i)` + assert pd.Timestamp(calendar[0]) == pd.Timestamp(values[0]), f"{calendar.__name__}.__getitem__(i: int) error" + + def test_instrument_storage(self): + """ + The meaning of instrument, such as CSI500: + + CSI500 composition changes: + + date add remove + 2005-01-01 SH600000 + 2005-01-01 SH600001 + 2005-01-01 SH600002 + 2005-02-01 SH600003 SH600000 + 2005-02-15 SH600000 SH600002 + + Calendar: + pd.date_range(start="2020-01-01", stop="2020-03-01", freq="1D") + + Instrument: + symbol start_time end_time + SH600000 2005-01-01 2005-01-31 (2005-02-01 Last trading day) + SH600000 2005-02-15 2005-03-01 + SH600001 2005-01-01 2005-03-01 + SH600002 2005-01-01 2005-02-14 (2005-02-15 Last trading day) + SH600003 2005-02-01 2005-03-01 + + InstrumentStorage: + { + "SH600000": [(2005-01-01, 2005-01-31), (2005-02-15, 2005-03-01)], + "SH600001": [(2005-01-01, 2005-03-01)], + "SH600002": [(2005-01-01, 2005-02-14)], + "SH600003": [(2005-02-01, 2005-03-01)], + } + + """ + base_instrument = { + "SH600000": [("2005-01-01", "2005-01-31"), ("2005-02-15", "2005-03-01")], + "SH600001": [("2005-01-01", "2005-03-01")], + "SH600002": [("2005-01-01", "2005-02-14")], + "SH600003": [("2005-02-01", "2005-03-01")], + } + instrument = InstrumentStorage(uri=INSTRUMENT_URI) + + # test `keys` + assert sorted(instrument.keys()) == sorted(base_instrument.keys()), f"{instrument.__name__}.keys error" + # test `__getitem__` + assert instrument["SH600000"] == base_instrument["SH600000"], f"{instrument.__name__}.__getitem__ error" + # test `get` + assert instrument.get("SH600001") == base_instrument.get("SH600001"), f"{instrument.__name__}.get error" + # test `items` + for _item in instrument.items(): + assert base_instrument[_item[0]] == _item[1] + assert len(instrument.items()) == len(instrument) == len(base_instrument), f"{instrument.__name__}.items error" + + def test_feature_storage(self): + """ + Calendar: + pd.date_range(start="2005-01-01", stop="2005-03-01", freq="1D") + + Instrument: + { + "SH600000": [(2005-01-01, 2005-01-31), (2005-02-15, 2005-03-01)], + "SH600001": [(2005-01-01, 2005-03-01)], + "SH600002": [(2005-01-01, 2005-02-14)], + "SH600003": [(2005-02-01, 2005-03-01)], + } + + Feature: + Stock data(close): + 2005-01-01 ... 2005-02-01 ... 2005-02-14 2005-02-15 ... 2005-03-01 + SH600000 1 ... 3 ... 4 5 6 + SH600001 1 ... 4 ... 5 6 7 + SH600002 1 ... 5 ... 6 nan nan + SH600003 nan ... 1 ... 2 3 4 + + FeatureStorage(SH600000, close): + + [ + (calendar.index("2005-01-01"), 1), + ..., + (calendar.index("2005-03-01"), 6) + ] + + ====> [(0, 1), ..., (59, 6)] + + + FeatureStorage(SH600002, close): + + [ + (calendar.index("2005-01-01"), 1), + ..., + (calendar.index("2005-02-14"), 6) + ] + + ===> [(0, 1), ..., (44, 6)] + + FeatureStorage(SH600003, close): + + [ + (calendar.index("2005-02-01"), 1), + ..., + (calendar.index("2005-03-01"), 4) + ] + + ===> [(31, 1), ..., (59, 4)] + + """ + + # FeatureStorage(SH600003, close) + feature = FeatureStorage(uri=FEATURE_URI) + # 2005-02-01 and 2005-03-01 + assert feature[31] == 1 and feature[59] == 4, f"{feature.__name__}.__getitem__(i: int) error" + + # 2005-02-01, 2005-02-02, 2005-02-03 + # close_items: [(31, 1), ..., (33, )] + close_items = feature[31:34] + + # 2005-02-01, ..., 2005-03-01 + # feature: [(31, 1), ..., (59, 4)] + print(feature) + + assert ( + len(feature) == len(feature[:]) == len(feature[31:60]) == 29 + ), f"{feature.__name__}.items/__getitem__(s: slice) error" From f3001fe741fa57fc9f93e64dab476b362f34e337 Mon Sep 17 00:00:00 2001 From: zhupr Date: Fri, 26 Mar 2021 16:14:45 +0800 Subject: [PATCH 02/11] Add FileStorage --- qlib/data/storage/__init__.py | 4 + qlib/data/storage/file_storage.py | 91 ++++++++++++++++++ qlib/data/storage/storage.py | 135 ++++++++++++++++++++++++++ qlib/storage/__init__.py | 0 qlib/storage/storage.py | 154 ------------------------------ 5 files changed, 230 insertions(+), 154 deletions(-) create mode 100644 qlib/data/storage/__init__.py create mode 100644 qlib/data/storage/file_storage.py create mode 100644 qlib/data/storage/storage.py delete mode 100644 qlib/storage/__init__.py delete mode 100644 qlib/storage/storage.py diff --git a/qlib/data/storage/__init__.py b/qlib/data/storage/__init__.py new file mode 100644 index 0000000000..eb513714be --- /dev/null +++ b/qlib/data/storage/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from .storage import CalendarStorage, InstrumentStorage, FeatureStorage \ No newline at end of file diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py new file mode 100644 index 0000000000..9d98545ced --- /dev/null +++ b/qlib/data/storage/file_storage.py @@ -0,0 +1,91 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from pathlib import Path +from typing import Iterator, Iterable, Type, List, Tuple, Text, Union + +from data.storage.storage import FeatureVT + +import numpy as np +import pandas as pd +from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage + + +CalVT = Type[pd.Timestamp] +# instrument value +InstVT = List[Tuple[CalVT, CalVT]] +# instrument key +InstKT = Text + + +class FileCalendarStorage(CalendarStorage): + def __init__(self, uri: str): + super(FileCalendarStorage, self).__init__(uri=uri) + with open(uri) as f: + self._data = [pd.Timestamp(x.strip()) for x in f] + + def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, Iterable[CalVT]]: + if isinstance(i, (int, slice)): + return self._data[i] + else: + raise TypeError(f"type(i) = {type(i)}") + + def __len__(self) -> int: + return len(self._data) + + +class FileInstrumentStorage(InstrumentStorage): + def __init__(self, uri: str): + super(FileInstrumentStorage, self).__init__(uri=uri) + self._data = self._load_data() + + def _load_data(self): + _instruments = dict() + df = pd.read_csv( + self._uri, + sep="\t", + usecols=[0, 1, 2], + names=["inst", "start_datetime", "end_datetime"], + dtype={"inst": str}, + parse_dates=["start_datetime", "end_datetime"], + ) + for row in df.itertuples(index=False): + _instruments.setdefault(row[0], []).append((row[1], row[2])) + return _instruments + + def __getitem__(self, k: InstKT) -> InstVT: + return self._data[k] + + def __len__(self) -> int: + return len(self._data) + + def __iter__(self) -> Iterator[InstKT]: + return self._data.__iter__() + + +class FileFeatureStorage(FeatureStorage): + def __getitem__(self, i: Union[int, slice]) -> Union[FeatureVT, Iterable[FeatureVT]]: + with open(self._uri, "rb") as fp: + ref_start_index = int(np.frombuffer(fp.read(4), dtype=" i: + raise IndexError(f"{i}") + fp.seek(4 * (i - ref_start_index) + 4) + return i, float(fp.read(4)) + elif isinstance(i, slice): + start_index = i.start + end_index = i.stop - 1 + si = max(ref_start_index, start_index) + if si > end_index: + return [] + fp.seek(4 * (si - ref_start_index) + 4) + # read n bytes + count = end_index - si + 1 + data = np.frombuffer(fp.read(4 * count), dtype=" int: + return Path(self._uri).stat().st_size // 4 - 1 diff --git a/qlib/data/storage/storage.py b/qlib/data/storage/storage.py new file mode 100644 index 0000000000..7848c243f2 --- /dev/null +++ b/qlib/data/storage/storage.py @@ -0,0 +1,135 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from abc import abstractmethod +from collections.abc import MutableSequence, MutableMapping, Sequence +from typing import Iterable, overload, TypeVar, Tuple, List, Text, Iterator + + +# calendar value type +CalVT = TypeVar("CalVT") + +# instrument value +InstVT = List[Tuple[CalVT, CalVT]] +# instrument key +InstKT = Text + + +FeatureVT = Tuple[int, float] + + +class CalendarStorage(MutableSequence): + def __init__(self, uri: str): + self._uri = uri + + def insert(self, index: int, o: CalVT) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `insert` method") + + @overload + def __setitem__(self, i: int, o: CalVT) -> None: + """x.__setitem__(i, o) <==> x[i] = o""" + ... + + @overload + def __setitem__(self, s: slice, o: Iterable[CalVT]) -> None: + """x.__setitem__(s, o) <==> x[s] = o""" + ... + + def __setitem__(self, i, o) -> None: + raise NotImplementedError( + "Subclass of CalendarStorage must implement `__setitem__(i: int, o: CalVT)`/`__setitem__(s: slice, o: Iterable[CalVT])` method" + ) + + @overload + def __delitem__(self, i: int) -> None: + """x.__delitem__(i) <==> del x[i]""" + ... + + @overload + def __delitem__(self, i: slice) -> None: + """x.__delitem__(slice(start: int, stop: int, step: int)) <==> del x[start:stop:step]""" + ... + + def __delitem__(self, i) -> None: + raise NotImplementedError( + "Subclass of CalendarStorage must implement `__delitem__(i: int)`/`__delitem__(s: slice)` method" + ) + + @overload + def __getitem__(self, s: slice) -> Iterable[CalVT]: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" + ... + + @overload + def __getitem__(self, i: int) -> CalVT: + """x.__getitem__(i) <==> x[i]""" + ... + + def __getitem__(self, i) -> CalVT: + raise NotImplementedError( + "Subclass of CalendarStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" + ) + + def __len__(self) -> int: + """x.__len__() <==> len(x)""" + raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") + + +class InstrumentStorage(MutableMapping): + def __init__(self, uri: str): + self._uri = uri + + def __setitem__(self, k: InstKT, v: InstVT) -> None: + """ Set self[key] to value. """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") + + def __delitem__(self, k: InstKT) -> None: + """ Delete self[key]. """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__delitem__` method") + + def __getitem__(self, k: InstKT) -> InstVT: + """ x.__getitem__(k) <==> x[k] """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__getitem__` method") + + def __len__(self) -> int: + """ Return len(self). """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__len__` method") + + def __iter__(self) -> Iterator[InstKT]: + """ Return iter(self). """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__iter__` method") + + +class FeatureStorage(Sequence): + def __init__(self, uri: str): + self._uri = uri + + def append(self, obj: FeatureVT) -> None: + """ Append object to the end of the FeatureStorage. """ + raise NotImplementedError("Subclass of FeatureStorage must implement `append` method") + + def clear(self): + """ Remove all items from FeatureStorage. """ + raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") + + def extend(self, iterable: Iterable[FeatureVT]): + """ Extend list by appending elements from the iterable. """ + raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method") + + @overload + def __getitem__(self, s: slice) -> Iterable[FeatureVT]: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" + ... + + @overload + def __getitem__(self, i: int) -> float: + """x.__getitem__(y) <==> x[y]""" + ... + + def __getitem__(self, i) -> float: + """x.__getitem__(y) <==> x[y]""" + raise NotImplementedError( + "Subclass of FeatureStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" + ) + + def __len__(self) -> int: + raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") diff --git a/qlib/storage/__init__.py b/qlib/storage/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/qlib/storage/storage.py b/qlib/storage/storage.py deleted file mode 100644 index dac0e167d1..0000000000 --- a/qlib/storage/storage.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - - -import abc - -from typing import ( - Iterable, - overload, - TypeVar, - Tuple, - List, - Text, - Optional, - AbstractSet, - Mapping, - Iterator, -) - - -# calendar value type -CalVT = TypeVar("CalVT") - -# instrument value -InstVT = List[Tuple[CalVT, CalVT]] -# instrument key -InstKT = Text - - -FeatureVT = Tuple[int, float] - - -class CalendarStorage: - def __init__(self, uri: str): - self._uri = uri - - def append(self, obj: CalVT) -> None: - """ Append object to the end of the CalendarStorage. """ - raise NotImplementedError("Subclass of CalendarStorage must implement `append` method") - - def clear(self): - """ Remove all items from CalendarStorage. """ - raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method") - - def extend(self, iterable: Iterable[CalVT]): - """ Extend list by appending elements from the iterable. """ - raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") - - @overload - @abc.abstractmethod - def __getitem__(self, s: slice) -> Iterable[CalVT]: - """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" - raise NotImplementedError("Subclass of CalendarStorage must implement `__getitem__(s: slice)` method") - - @abc.abstractmethod - def __getitem__(self, i: int) -> CalVT: - """x.__getitem__(y) <==> x[y]""" - - raise NotImplementedError("Subclass of CalendarStorage must implement `__getitem__(i: int)` method") - - @abc.abstractmethod - def __iter__(self) -> Iterator[CalVT]: - """ Implement iter(self). """ - raise NotImplementedError("Subclass of CalendarStorage must implement `__iter__` method") - - def __len__(self) -> int: - raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") - - -class InstrumentStorage: - def __init__(self, uri: str): - self._uri = uri - - def clear(self) -> None: - """ D.clear() -> None. Remove all items from D. """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") - - @abc.abstractmethod - def get(self, k: InstKT) -> Optional[InstVT]: - """D.get(k) -> InstV or None""" - raise NotImplementedError("Subclass of InstrumentStorage must implement `get` method") - - @abc.abstractmethod - def items(self) -> AbstractSet[Tuple[InstKT, InstVT]]: - """ D.items() -> a set-like object providing a view on D's items """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `items` method") - - @abc.abstractmethod - def keys(self) -> AbstractSet[InstKT]: - """ D.keys() -> a set-like object providing a view on D's keys """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `keys` method") - - def update(self, e: Mapping[InstKT, InstVT] = None, **f: InstVT) -> None: - """ - D.update([e, ]**f) -> None. Update D from dict/iterable e and f. - If e is present and has a .keys() method, then does: for k in e: D[k] = e[k] - If e is present and lacks a .keys() method, then does: for k, v in e: D[k] = v - In either case, this is followed by: for k in f: D[k] = f[k] - """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method") - - def __setitem__(self, k: InstKT, v: InstVT) -> None: - """ Set self[key] to value. """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") - - def __delitem__(self, k: InstKT) -> None: - """ Delete self[key]. """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `__delitem__` method") - - @abc.abstractmethod - def __getitem__(self, k: InstKT) -> InstVT: - """ x.__getitem__(y) <==> x[y] """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `__getitem__` method") - - def __len__(self) -> int: - """ Return len(self). """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `__len__` method") - - -class FeatureStorage: - def __init__(self, uri: str): - self._uri = uri - - def append(self, obj: FeatureVT) -> None: - """ Append object to the end of the FeatureStorage. """ - raise NotImplementedError("Subclass of FeatureStorage must implement `append` method") - - def clear(self): - """ Remove all items from FeatureStorage. """ - raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") - - def extend(self, iterable: Iterable[FeatureVT]): - """ Extend list by appending elements from the iterable. """ - raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method") - - @overload - @abc.abstractmethod - def __getitem__(self, s: slice) -> Iterable[FeatureVT]: - """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" - raise NotImplementedError("Subclass of FeatureStorage must implement `__getitem__(s: slice)` method") - - @abc.abstractmethod - def __getitem__(self, i: int) -> float: - """x.__getitem__(y) <==> x[y]""" - - raise NotImplementedError("Subclass of FeatureStorage must implement `__getitem__(i: int)` method") - - def __len__(self) -> int: - raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") - - @abc.abstractmethod - def __iter__(self) -> Iterator[FeatureVT]: - """ Implement iter(self). """ - raise NotImplementedError("Subclass of FeatureStorage must implement `__iter__` method") From 0b17088887b904998e2900f839a817877c91af98 Mon Sep 17 00:00:00 2001 From: zhupr Date: Sat, 27 Mar 2021 01:15:33 +0800 Subject: [PATCH 03/11] Fix FileStorage --- qlib/data/storage/__init__.py | 2 +- qlib/data/storage/file_storage.py | 20 +++-- qlib/data/storage/storage.py | 2 +- tests/storage_tests/test_storage.py | 120 +++++++++++++--------------- 4 files changed, 73 insertions(+), 71 deletions(-) diff --git a/qlib/data/storage/__init__.py b/qlib/data/storage/__init__.py index eb513714be..f425047915 100644 --- a/qlib/data/storage/__init__.py +++ b/qlib/data/storage/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from .storage import CalendarStorage, InstrumentStorage, FeatureStorage \ No newline at end of file +from .storage import CalendarStorage, InstrumentStorage, FeatureStorage diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index 9d98545ced..aadc918c37 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -1,14 +1,15 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import struct from pathlib import Path from typing import Iterator, Iterable, Type, List, Tuple, Text, Union -from data.storage.storage import FeatureVT +from .storage import FeatureVT import numpy as np import pandas as pd -from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage +from . import CalendarStorage, InstrumentStorage, FeatureStorage CalVT = Type[pd.Timestamp] @@ -70,9 +71,9 @@ def __getitem__(self, i: Union[int, slice]) -> Union[FeatureVT, Iterable[Feature if isinstance(i, int): if ref_start_index > i: - raise IndexError(f"{i}") + raise IndexError(f"{i}: start index is {ref_start_index}") fp.seek(4 * (i - ref_start_index) + 4) - return i, float(fp.read(4)) + return i, struct.unpack("f", fp.read(4)) elif isinstance(i, slice): start_index = i.start end_index = i.stop - 1 @@ -83,9 +84,18 @@ def __getitem__(self, i: Union[int, slice]) -> Union[FeatureVT, Iterable[Feature # read n bytes count = end_index - si + 1 data = np.frombuffer(fp.read(4 * count), dtype=" int: return Path(self._uri).stat().st_size // 4 - 1 + + def __iter__(self): + with open(self._uri, "rb") as fp: + ref_start_index = int(np.frombuffer(fp.read(4), dtype=")] - close_items = feature[31:34] - - # 2005-02-01, ..., 2005-03-01 - # feature: [(31, 1), ..., (59, 4)] - print(feature) - assert ( - len(feature) == len(feature[:]) == len(feature[31:60]) == 29 - ), f"{feature.__name__}.items/__getitem__(s: slice) error" + assert isinstance(feature, Iterable), f"{feature.__class__.__name__} is not Iterable" + with pytest.raises(IndexError): + print(feature[0]) + assert len(feature[815:818]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error" + print(f"feature[815: 818]: {feature[815: 818]}") + + for _item in feature: + assert ( + isinstance(_item, tuple) and len(_item) == 2 + ), f"{feature.__class__.__name__}.__iter__ item type error" + assert isinstance(_item[0], int) and isinstance( + _item[1], (float, np.float, np.float32) + ), f"{feature.__class__.__name__}.__iter__ value type error" From 610f87a410ebeded4b5ee7220ffeb3b684b68eee Mon Sep 17 00:00:00 2001 From: zhupr Date: Thu, 1 Apr 2021 12:58:34 +0800 Subject: [PATCH 04/11] Modify FileStorage --- qlib/data/storage/__init__.py | 2 +- qlib/data/storage/file_storage.py | 197 ++++++++++++++++++++++++------ qlib/data/storage/storage.py | 121 ++++++++++++++---- 3 files changed, 262 insertions(+), 58 deletions(-) diff --git a/qlib/data/storage/__init__.py b/qlib/data/storage/__init__.py index f425047915..552e1e3e8e 100644 --- a/qlib/data/storage/__init__.py +++ b/qlib/data/storage/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from .storage import CalendarStorage, InstrumentStorage, FeatureStorage +from .storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstVT, InstKT diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index aadc918c37..e2e5bd3e7b 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -3,69 +3,193 @@ import struct from pathlib import Path -from typing import Iterator, Iterable, Type, List, Tuple, Text, Union - -from .storage import FeatureVT +from typing import Iterator, Iterable, Union, Dict, Mapping, Tuple import numpy as np import pandas as pd -from . import CalendarStorage, InstrumentStorage, FeatureStorage - -CalVT = Type[pd.Timestamp] -# instrument value -InstVT = List[Tuple[CalVT, CalVT]] -# instrument key -InstKT = Text +from . import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT class FileCalendarStorage(CalendarStorage): def __init__(self, uri: str): - super(FileCalendarStorage, self).__init__(uri=uri) - with open(uri) as f: - self._data = [pd.Timestamp(x.strip()) for x in f] + super(FileCalendarStorage, self).__init__(uri) + self._uri = Path(self._uri).expanduser().resolve() + + def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> np.ndarray: + if not self._uri.exists(): + self._write_calendar(values=[]) + with self._uri.open("rb") as fp: + return np.loadtxt(fp, str, skiprows=skip_rows, max_rows=n_rows, encoding="utf-8") + + def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"): + with self._uri.open(mode=mode) as fp: + np.savetxt(fp, values, fmt="%s", encoding="utf-8") + + def extend(self, values: Iterable[CalVT]) -> None: + self._write_calendar(values, mode="ab") + + def clear(self) -> None: + self._write_calendar(values=[]) + + def index(self, value: CalVT) -> int: + calendar = self._read_calendar() + return int(np.argwhere(calendar == value)[0]) + + def insert(self, index: int, value: CalVT): + calendar = self._read_calendar() + calendar = np.insert(calendar, index, value) + self._write_calendar(values=calendar) + + def remove(self, value: CalVT) -> None: + index = self.index(value) + calendar = self._read_calendar() + calendar = np.delete(calendar, index) + self._write_calendar(values=calendar) + + def __setitem__(self, i: Union[int, slice], values: Union[CalVT, Iterable[CalVT]]) -> None: + calendar = self._read_calendar() + calendar[i] = values + self._write_calendar(values=calendar) + + def __delitem__(self, i: Union[int, slice]) -> None: + calendar = self._read_calendar() + calendar = np.delete(calendar, i) + self._write_calendar(values=calendar) def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, Iterable[CalVT]]: - if isinstance(i, (int, slice)): - return self._data[i] - else: - raise TypeError(f"type(i) = {type(i)}") + return self._read_calendar()[i] def __len__(self) -> int: - return len(self._data) + return len(self._read_calendar()) + + def __iter__(self): + with self._uri.open("r") as fp: + yield fp.readline() class FileInstrumentStorage(InstrumentStorage): + INSTRUMENT_SEP = "\t" + INSTRUMENT_START_FIELD = "start_datetime" + INSTRUMENT_END_FIELD = "end_datetime" + SYMBOL_FIELD_NAME = "instrument" + def __init__(self, uri: str): super(FileInstrumentStorage, self).__init__(uri=uri) - self._data = self._load_data() + self._uri = Path(self._uri).expanduser().resolve() + + def _read_instrument(self) -> Dict[InstKT, InstVT]: + if not self._uri.exists(): + self._write_instrument() - def _load_data(self): _instruments = dict() df = pd.read_csv( self._uri, sep="\t", usecols=[0, 1, 2], - names=["inst", "start_datetime", "end_datetime"], - dtype={"inst": str}, - parse_dates=["start_datetime", "end_datetime"], + names=[self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD], + dtype={self.SYMBOL_FIELD_NAME: str}, + parse_dates=[self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD], ) for row in df.itertuples(index=False): _instruments.setdefault(row[0], []).append((row[1], row[2])) return _instruments + def _write_instrument(self, data: Dict[InstKT, InstVT] = None) -> None: + if not data: + with self._uri.open("w") as _: + pass + return + + res = [] + for inst, v_list in data.items(): + _df = pd.DataFrame(v_list, columns=[self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD]) + _df[self.SYMBOL_FIELD_NAME] = inst + res.append(_df) + + df = pd.concat(res, sort=False) + df.loc[:, [self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD]].to_csv( + self._uri, header=False, sep=self.INSTRUMENT_SEP, index=False + ) + df.to_csv(self._uri, sep="\t", encoding="utf-8", header=False, index=False) + + def clear(self) -> None: + self._write_instrument(data={}) + + def __setitem__(self, k: InstKT, v: InstVT) -> None: + inst = self._read_instrument() + inst[k] = v + self._write_instrument(inst) + + def __delitem__(self, k: InstKT) -> None: + inst = self._read_instrument() + del inst[k] + self._write_instrument(inst) + def __getitem__(self, k: InstKT) -> InstVT: - return self._data[k] + return self._read_instrument()[k] def __len__(self) -> int: - return len(self._data) + inst = self._read_instrument() + return len(inst) def __iter__(self) -> Iterator[InstKT]: - return self._data.__iter__() + for _inst in self._read_instrument().keys(): + yield _inst + + def update(self, *args, **kwargs) -> None: + + if len(args) > 1: + raise TypeError(f"update expected at most 1 arguments, got {len(args)}") + inst = self._read_instrument() + if args: + other = args[0] + if isinstance(other, Mapping): + for key in other: + inst[key] = other[key] + elif hasattr(other, "keys"): + for key in other.keys(): + inst[key] = other[key] + else: + for key, value in other: + inst[key] = value + for key, value in kwargs.items(): + inst[key] = value + + self._write_instrument(inst) class FileFeatureStorage(FeatureStorage): - def __getitem__(self, i: Union[int, slice]) -> Union[FeatureVT, Iterable[FeatureVT]]: + def __init__(self, uri: str): + super(FileFeatureStorage, self).__init__(uri=uri) + self._uri = Path(self._uri) + + def clear(self): + with self._uri.open("wb") as _: + pass + + def extend(self, series: pd.Series) -> None: + extend_start_index = self[0][0] + len(self) if self._uri.exists() else series.index[0] + series = series.reindex(pd.RangeIndex(extend_start_index, series.index[-1] + 1)) + with self._uri.open("ab") as fp: + np.array(series.values).astype(" None: + origin_series = self[:] + series = series.append(origin_series.loc[origin_series.index > series.index[-1]]) + series = series.reindex(pd.RangeIndex(series.index[0], series.index[-1])) + with self._uri.open("wb") as fp: + np.array(series.values).astype(" Union[Tuple[int, float], pd.Series]: + if not self._uri.exists(): + if isinstance(i, int): + return None, None + elif isinstance(i, slice): + return pd.Series() + else: + raise TypeError(f"type(i) = {type(i)}") + with open(self._uri, "rb") as fp: ref_start_index = int(np.frombuffer(fp.read(4), dtype=" Union[FeatureVT, Iterable[Feature end_index = i.stop - 1 si = max(ref_start_index, start_index) if si > end_index: - return [] + return pd.Series() fp.seek(4 * (si - ref_start_index) + 4) # read n bytes count = end_index - si + 1 data = np.frombuffer(fp.read(4 * count), dtype=" int: - return Path(self._uri).stat().st_size // 4 - 1 + return self._uri.stat().st_size // 4 - 1 if self._uri.exists() else 0 def __iter__(self): + if not self._uri.exists(): + return with open(self._uri, "rb") as fp: ref_start_index = int(np.frombuffer(fp.read(4), dtype=" None: + def extend(self, iterable: Iterable[CalVT]) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") + + def clear(self) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method") + + def index(self, value: CalVT) -> int: + raise NotImplementedError("Subclass of CalendarStorage must implement `index` method") + + def insert(self, index: int, value: CalVT) -> None: raise NotImplementedError("Subclass of CalendarStorage must implement `insert` method") + def remove(self, value: CalVT) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `remove` method") + @overload - def __setitem__(self, i: int, o: CalVT) -> None: + def __setitem__(self, i: int, value: CalVT) -> None: """x.__setitem__(i, o) <==> x[i] = o""" ... @overload - def __setitem__(self, s: slice, o: Iterable[CalVT]) -> None: + def __setitem__(self, s: slice, value: Iterable[CalVT]) -> None: """x.__setitem__(s, o) <==> x[s] = o""" ... - def __setitem__(self, i, o) -> None: + def __setitem__(self, i, value) -> None: raise NotImplementedError( "Subclass of CalendarStorage must implement `__setitem__(i: int, o: CalVT)`/`__setitem__(s: slice, o: Iterable[CalVT])` method" ) @@ -74,10 +83,21 @@ def __len__(self) -> int: raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") -class InstrumentStorage(MutableMapping): +class InstrumentStorage: def __init__(self, uri: str): self._uri = uri + def clear(self) -> None: + raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") + + def update(self, *args, **kwargs) -> None: + """D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. + If E present and has a .keys() method, does: for k in E: D[k] = E[k] + If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v + In either case, this is followed by: for k, v in F.items(): D[k] = v + """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method") + def __setitem__(self, k: InstKT, v: InstVT) -> None: """ Set self[key] to value. """ raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") @@ -99,37 +119,92 @@ def __iter__(self) -> Iterator[InstKT]: raise NotImplementedError("Subclass of InstrumentStorage must implement `__iter__` method") -class FeatureStorage(Sequence): +class FeatureStorage: def __init__(self, uri: str): self._uri = uri - def append(self, obj: FeatureVT) -> None: - """ Append object to the end of the FeatureStorage. """ - raise NotImplementedError("Subclass of FeatureStorage must implement `append` method") - def clear(self): """ Remove all items from FeatureStorage. """ raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") - def extend(self, iterable: Iterable[FeatureVT]): - """ Extend list by appending elements from the iterable. """ + def extend(self, series: pd.Series): + """Extend feature by appending elements from the series. + + Examples: + + feature: + 3 4 + 4 5 + 5 6 + + >>> self.extend(pd.Series({7: 8, 9:10})) + + feature: + 3 4 + 4 5 + 5 6 + 6 np.nan + 7 8 + 9 10 + + """ raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method") + def rebase(self, series: pd.Series): + """Rebase feature header from the series. + + Examples: + + feature: + 3 4 + 4 5 + 5 6 + + >>> self.rebase(pd.Series({1: 2})) + + feature: + 1 2 + 2 np.nan + 3 4 + 4 5 + 5 6 + + >>> self.rebase(pd.Series({5: 6, 7: 8, 9: 10})) + + feature: + 5 6 + 7 8 + 9 10 + + >>> self.rebase(pd.Series({11: 12, 12: 13,})) + + feature: + 11 12 + 12 13 + + """ + raise NotImplementedError("Subclass of FeatureStorage must implement `rebase` method") + @overload - def __getitem__(self, s: slice) -> Iterable[FeatureVT]: - """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" + def __getitem__(self, s: slice) -> pd.Series: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] == pd.Series(values, index=pd.RangeIndex(start, len(values))""" ... @overload - def __getitem__(self, i: int) -> float: + def __getitem__(self, i: int) -> Tuple[int, float]: """x.__getitem__(y) <==> x[y]""" ... - def __getitem__(self, i) -> float: + def __getitem__(self, i) -> Union[Tuple[int, float], pd.Series]: """x.__getitem__(y) <==> x[y]""" raise NotImplementedError( "Subclass of FeatureStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" ) def __len__(self) -> int: + """len(feature) <==> feature.__len__() """ raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") + + def __iter__(self) -> Iterable[Tuple[int, float]]: + """iter(feature)""" + raise NotImplementedError("Subclass of FeatureStorage must implement `__iter__` method") From fe1676ec4089f3273d28aba2a3f82580d96ad24c Mon Sep 17 00:00:00 2001 From: zhupr Date: Tue, 13 Apr 2021 10:47:01 +0800 Subject: [PATCH 05/11] Modify data.storage --- qlib/data/data.py | 94 +++++++++++++++-------------- qlib/data/storage/file_storage.py | 79 ++++++++++++++---------- qlib/data/storage/storage.py | 38 +++++++++--- tests/storage_tests/test_storage.py | 23 ++++--- 4 files changed, 142 insertions(+), 92 deletions(-) diff --git a/qlib/data/data.py b/qlib/data/data.py index 000bd1196c..1a0ca616e4 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -6,6 +6,7 @@ from __future__ import print_function import os +import re import abc import time import queue @@ -27,12 +28,35 @@ from ..utils import Wrapper, init_instance_by_config, register_wrapper, get_module_by_module_path -class CalendarProvider(abc.ABC): +class ProviderBackendMixin: + def get_default_backend(self): + backend = {} + provider_name = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] # type: str + # set default storage class + backend.setdefault("class", f"File{provider_name}Storage") + # set default storage module + backend.setdefault("module_path", "qlib.data.storage.file_storage") + # set default storage kwargs + backend_kwargs = backend.setdefault("kwargs", {}) # type: dict + backend_kwargs.setdefault("uri", os.path.join(C.get_data_path(), f"{provider_name.lower()}s")) + return backend + + @property + def backend_obj(self): + return init_instance_by_config(self.backend) + + +class CalendarProvider(abc.ABC, ProviderBackendMixin): """Calendar provider base class Provide calendar data. """ + def __init__(self, *args, **kwargs): + self.backend = kwargs.get("backend", {}) + if not self.backend: + self.backend = self.get_default_backend() + @abc.abstractmethod def calendar(self, start_time=None, end_time=None, freq="day", future=False): """Get calendar of certain market in given time range. @@ -127,12 +151,17 @@ def _uri(self, start_time, end_time, freq, future=False): return hash_args(start_time, end_time, freq, future) -class InstrumentProvider(abc.ABC): +class InstrumentProvider(abc.ABC, ProviderBackendMixin): """Instrument provider base class Provide instrument data. """ + def __init__(self, *args, **kwargs): + self.backend = kwargs.get("backend", {}) + if not self.backend: + self.backend = self.get_default_backend() + @staticmethod def instruments(market="all", filter_pipe=None): """Get the general config dictionary for a base market adding several dynamic filters. @@ -215,12 +244,17 @@ def get_inst_type(cls, inst): raise ValueError(f"Unknown instrument type {inst}") -class FeatureProvider(abc.ABC): +class FeatureProvider(abc.ABC, ProviderBackendMixin): """Feature provider class Provide feature data. """ + def __init__(self, *args, **kwargs): + self.backend = kwargs.get("backend", {}) + if not self.backend: + self.backend = self.get_default_backend() + @abc.abstractmethod def feature(self, instrument, field, start_time, end_time, freq): """Get feature data. @@ -497,6 +531,7 @@ class LocalCalendarProvider(CalendarProvider): """ def __init__(self, **kwargs): + super(LocalCalendarProvider, self).__init__(**kwargs) self.remote = kwargs.get("remote", False) @property @@ -517,18 +552,8 @@ def load_calendar(self, freq, future): list list of timestamps """ - if future: - fname = self._uri_cal.format(freq + "_future") - # if future calendar not exists, return current calendar - if not os.path.exists(fname): - get_module_logger("data").warning(f"{freq}_future.txt not exists, return current calendar!") - fname = self._uri_cal.format(freq) - else: - fname = self._uri_cal.format(freq) - if not os.path.exists(fname): - raise ValueError("calendar not exists for freq " + freq) - with open(fname) as f: - return [pd.Timestamp(x.strip()) for x in f] + self.backend.setdefault("kwargs", {}).update(freq=freq, future=future) + return [pd.Timestamp(x) for x in self.backend_obj.data] def calendar(self, start_time=None, end_time=None, freq="day", future=False): _calendar, _calendar_index = self._get_calendar(freq, future) @@ -559,31 +584,15 @@ class LocalInstrumentProvider(InstrumentProvider): Provide instrument data from local data source. """ - def __init__(self): - pass - @property def _uri_inst(self): """Instrument file uri.""" return os.path.join(C.get_data_path(), "instruments", "{}.txt") def _load_instruments(self, market): - fname = self._uri_inst.format(market) - if not os.path.exists(fname): - raise ValueError("instruments not exists for market " + market) - - _instruments = dict() - df = pd.read_csv( - fname, - sep="\t", - usecols=[0, 1, 2], - names=["inst", "start_datetime", "end_datetime"], - dtype={"inst": str}, - parse_dates=["start_datetime", "end_datetime"], - ) - for row in df.itertuples(index=False): - _instruments.setdefault(row[0], []).append((row[1], row[2])) - return _instruments + + self.backend.setdefault("kwargs", {}).update(market=market) + return self.backend_obj.data def list_instruments(self, instruments, start_time=None, end_time=None, freq="day", as_list=False): market = instruments["market"] @@ -601,7 +610,7 @@ def list_instruments(self, instruments, start_time=None, end_time=None, freq="da inst: list( filter( lambda x: x[0] <= x[1], - [(max(start_time, x[0]), min(end_time, x[1])) for x in spans], + [(max(start_time, pd.Timestamp(x[0])), min(end_time, pd.Timestamp(x[1]))) for x in spans], ) ) for inst, spans in _instruments.items() @@ -627,6 +636,7 @@ class LocalFeatureProvider(FeatureProvider): """ def __init__(self, **kwargs): + super(LocalFeatureProvider, self).__init__(**kwargs) self.remote = kwargs.get("remote", False) @property @@ -638,14 +648,9 @@ def feature(self, instrument, field, start_index, end_index, freq): # validate field = str(field).lower()[1:] instrument = code_to_fname(instrument) - uri_data = self._uri_data.format(instrument.lower(), field, freq) - if not os.path.exists(uri_data): - get_module_logger("data").warning("WARN: data not found for %s.%s" % (instrument, field)) - return pd.Series(dtype=np.float32) - # raise ValueError('uri_data not found: ' + uri_data) - # load - series = read_bin(uri_data, start_index, end_index) - return series + + self.backend.setdefault("kwargs", {}).update(instrument=instrument, field=field, freq=freq) + return self.backend_obj[start_index : end_index + 1] class LocalExpressionProvider(ExpressionProvider): @@ -1061,7 +1066,8 @@ def register_all_wrappers(C): register_wrapper(Cal, _calendar_provider, "qlib.data") logger.debug(f"registering Cal {C.calendar_provider}-{C.calendar_cache}") - register_wrapper(Inst, C.instrument_provider, "qlib.data") + _instrument_provider = init_instance_by_config(C.instrument_provider, module) + register_wrapper(Inst, _instrument_provider, "qlib.data") logger.debug(f"registering Inst {C.instrument_provider}") if getattr(C, "feature_provider", None) is not None: diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index e2e5bd3e7b..4090e3230d 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -8,24 +8,29 @@ import numpy as np import pandas as pd -from . import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT +from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT class FileCalendarStorage(CalendarStorage): - def __init__(self, uri: str): - super(FileCalendarStorage, self).__init__(uri) - self._uri = Path(self._uri).expanduser().resolve() + def __init__(self, freq: str, future: bool, uri: str): + super(FileCalendarStorage, self).__init__(freq, future, uri) + _file_name = f"{freq}_future.txt" if future else f"{freq}.txt" + self.uri = Path(self.uri).expanduser().joinpath(_file_name.lower()) def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> np.ndarray: - if not self._uri.exists(): + if not self.uri.exists(): self._write_calendar(values=[]) - with self._uri.open("rb") as fp: + with self.uri.open("rb") as fp: return np.loadtxt(fp, str, skiprows=skip_rows, max_rows=n_rows, encoding="utf-8") def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"): - with self._uri.open(mode=mode) as fp: + with self.uri.open(mode=mode) as fp: np.savetxt(fp, values, fmt="%s", encoding="utf-8") + @property + def data(self) -> Iterable[CalVT]: + return self._read_calendar() + def extend(self, values: Iterable[CalVT]) -> None: self._write_calendar(values, mode="ab") @@ -64,27 +69,27 @@ def __len__(self) -> int: return len(self._read_calendar()) def __iter__(self): - with self._uri.open("r") as fp: - yield fp.readline() + return iter(self._read_calendar()) class FileInstrumentStorage(InstrumentStorage): + INSTRUMENT_SEP = "\t" INSTRUMENT_START_FIELD = "start_datetime" INSTRUMENT_END_FIELD = "end_datetime" SYMBOL_FIELD_NAME = "instrument" - def __init__(self, uri: str): - super(FileInstrumentStorage, self).__init__(uri=uri) - self._uri = Path(self._uri).expanduser().resolve() + def __init__(self, market: str, uri: str): + super(FileInstrumentStorage, self).__init__(market, uri) + self.uri = Path(self.uri).expanduser().joinpath(f"{market.lower()}.txt") def _read_instrument(self) -> Dict[InstKT, InstVT]: - if not self._uri.exists(): + if not self.uri.exists(): self._write_instrument() _instruments = dict() df = pd.read_csv( - self._uri, + self.uri, sep="\t", usecols=[0, 1, 2], names=[self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD], @@ -97,7 +102,7 @@ def _read_instrument(self) -> Dict[InstKT, InstVT]: def _write_instrument(self, data: Dict[InstKT, InstVT] = None) -> None: if not data: - with self._uri.open("w") as _: + with self.uri.open("w") as _: pass return @@ -109,13 +114,17 @@ def _write_instrument(self, data: Dict[InstKT, InstVT] = None) -> None: df = pd.concat(res, sort=False) df.loc[:, [self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD]].to_csv( - self._uri, header=False, sep=self.INSTRUMENT_SEP, index=False + self.uri, header=False, sep=self.INSTRUMENT_SEP, index=False ) - df.to_csv(self._uri, sep="\t", encoding="utf-8", header=False, index=False) + df.to_csv(self.uri, sep="\t", encoding="utf-8", header=False, index=False) def clear(self) -> None: self._write_instrument(data={}) + @property + def data(self) -> Dict[InstKT, InstVT]: + return self._read_instrument() + def __setitem__(self, k: InstKT, v: InstVT) -> None: inst = self._read_instrument() inst[k] = v @@ -143,7 +152,7 @@ def update(self, *args, **kwargs) -> None: raise TypeError(f"update expected at most 1 arguments, got {len(args)}") inst = self._read_instrument() if args: - other = args[0] + other = args[0] # type: dict if isinstance(other, Mapping): for key in other: inst[key] = other[key] @@ -160,29 +169,35 @@ def update(self, *args, **kwargs) -> None: class FileFeatureStorage(FeatureStorage): - def __init__(self, uri: str): - super(FileFeatureStorage, self).__init__(uri=uri) - self._uri = Path(self._uri) + def __init__(self, instrument: str, field: str, freq: str, uri: str): + super(FileFeatureStorage, self).__init__(instrument, field, freq, uri) + self.uri = ( + Path(self.uri).expanduser().joinpath(instrument.lower()).joinpath(f"{field.lower()}.{freq.lower()}.bin") + ) def clear(self): - with self._uri.open("wb") as _: + with self.uri.open("wb") as _: pass + @property + def data(self) -> pd.Series: + return self[:] + def extend(self, series: pd.Series) -> None: - extend_start_index = self[0][0] + len(self) if self._uri.exists() else series.index[0] + extend_start_index = self[0][0] + len(self) if self.uri.exists() else series.index[0] series = series.reindex(pd.RangeIndex(extend_start_index, series.index[-1] + 1)) - with self._uri.open("ab") as fp: + with self.uri.open("ab") as fp: np.array(series.values).astype(" None: origin_series = self[:] series = series.append(origin_series.loc[origin_series.index > series.index[-1]]) series = series.reindex(pd.RangeIndex(series.index[0], series.index[-1])) - with self._uri.open("wb") as fp: + with self.uri.open("wb") as fp: np.array(series.values).astype(" Union[Tuple[int, float], pd.Series]: - if not self._uri.exists(): + if not self.uri.exists(): if isinstance(i, int): return None, None elif isinstance(i, slice): @@ -190,14 +205,14 @@ def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Serie else: raise TypeError(f"type(i) = {type(i)}") - with open(self._uri, "rb") as fp: + with open(self.uri, "rb") as fp: ref_start_index = int(np.frombuffer(fp.read(4), dtype=" i: raise IndexError(f"{i}: start index is {ref_start_index}") fp.seek(4 * (i - ref_start_index) + 4) - return i, struct.unpack("f", fp.read(4)) + return i, struct.unpack("f", fp.read(4))[0] elif isinstance(i, slice): start_index = i.start end_index = i.stop - 1 @@ -213,18 +228,18 @@ def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Serie raise TypeError(f"type(i) = {type(i)}") def __len__(self) -> int: - return self._uri.stat().st_size // 4 - 1 if self._uri.exists() else 0 + return self.uri.stat().st_size // 4 - 1 if self.uri.exists() else 0 def __iter__(self): - if not self._uri.exists(): + if not self.uri.exists(): return - with open(self._uri, "rb") as fp: + with open(self.uri, "rb") as fp: ref_start_index = int(np.frombuffer(fp.read(4), dtype=" Iterable[CalVT]: + """get all data""" + raise NotImplementedError("Subclass of CalendarStorage must implement `data` method") def extend(self, iterable: Iterable[CalVT]) -> None: raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") @@ -82,10 +89,19 @@ def __len__(self) -> int: """x.__len__() <==> len(x)""" raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") + def __iter__(self): + raise NotImplementedError("Subclass of CalendarStorage must implement `__iter__` method") + class InstrumentStorage: - def __init__(self, uri: str): - self._uri = uri + def __init__(self, market: str, uri: str): + self.market = market + self.uri = uri + + @property + def data(self) -> Dict[InstKT, InstVT]: + """get all data""" + raise NotImplementedError("Subclass of InstrumentStorage must implement `data` method") def clear(self) -> None: raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") @@ -120,8 +136,16 @@ def __iter__(self) -> Iterator[InstKT]: class FeatureStorage: - def __init__(self, uri: str): - self._uri = uri + def __init__(self, instrument: str, field: str, freq: str, uri: str): + self.instrument = instrument + self.field = field + self.freq = freq + self.uri = uri + + @property + def data(self) -> pd.Series: + """get all data""" + raise NotImplementedError("Subclass of FeatureStorage must implement `data` method") def clear(self): """ Remove all items from FeatureStorage. """ diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py index a70ce82ea3..8ce3f50812 100644 --- a/tests/storage_tests/test_storage.py +++ b/tests/storage_tests/test_storage.py @@ -16,15 +16,16 @@ FileFeatureStorage as FeatureStorage, ) -DATA_DIR = Path(__file__).parent.joinpath("test_get_data") +_file_name = Path(__file__).name.split(".")[0] +DATA_DIR = Path(__file__).parent.joinpath(f"{_file_name}_data") QLIB_DIR = DATA_DIR.joinpath("qlib") QLIB_DIR.mkdir(exist_ok=True, parents=True) # TODO: set value -CALENDAR_URI = QLIB_DIR.joinpath("calendars").joinpath("day.txt") -INSTRUMENT_URI = QLIB_DIR.joinpath("instruments").joinpath("csi300.txt") -FEATURE_URI = QLIB_DIR.joinpath("features").joinpath("SH600004").joinpath("close.day.bin") +CALENDAR_URI = QLIB_DIR.joinpath("calendars") +INSTRUMENT_URI = QLIB_DIR.joinpath("instruments") +FEATURE_URI = QLIB_DIR.joinpath("features") class TestStorage: @@ -38,9 +39,10 @@ def teardown_class(cls): def test_calendar_storage(self): - calendar = CalendarStorage(uri=CALENDAR_URI) + calendar = CalendarStorage(freq="day", future=False, uri=CALENDAR_URI) assert isinstance(calendar, Iterable), f"{calendar.__class__.__name__} is not Iterable" assert isinstance(calendar[:], Iterable), f"{calendar.__class__.__name__}.__getitem__(s: slice) is not Iterable" + assert isinstance(calendar.data, Iterable), f"{calendar.__class__.__name__}.data is not Iterable" print(f"calendar[1: 5]: {calendar[1:5]}") print(f"calendar[0]: {calendar[0]}") @@ -80,11 +82,11 @@ def test_instrument_storage(self): """ - instrument = InstrumentStorage(uri=INSTRUMENT_URI) + instrument = InstrumentStorage(market="csi300", uri=INSTRUMENT_URI) assert isinstance(instrument, Iterable), f"{instrument.__class__.__name__} is not Iterable" - for inst, spans in instrument.items(): + for inst, spans in instrument.data.items(): assert isinstance(inst, str) and isinstance( spans, Iterable ), f"{instrument.__class__.__name__} value is not Iterable" @@ -149,11 +151,14 @@ def test_feature_storage(self): """ - feature = FeatureStorage(uri=FEATURE_URI) + feature = FeatureStorage(instrument="SH600004", field="close", freq="day", uri=FEATURE_URI) assert isinstance(feature, Iterable), f"{feature.__class__.__name__} is not Iterable" with pytest.raises(IndexError): print(feature[0]) + assert isinstance( + feature[815][1], (np.float, np.float32) + ), f"{feature.__class__.__name__}.__getitem__(i: int) error" assert len(feature[815:818]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error" print(f"feature[815: 818]: {feature[815: 818]}") @@ -162,5 +167,5 @@ def test_feature_storage(self): isinstance(_item, tuple) and len(_item) == 2 ), f"{feature.__class__.__name__}.__iter__ item type error" assert isinstance(_item[0], int) and isinstance( - _item[1], (float, np.float, np.float32) + _item[1], (np.float, np.float32) ), f"{feature.__class__.__name__}.__iter__ value type error" From 7e90ea2ddc07662b0e3041305cbcf72b47cd7048 Mon Sep 17 00:00:00 2001 From: zhupr Date: Fri, 21 May 2021 08:43:36 +0800 Subject: [PATCH 06/11] add write method to FeatureStorage && remove extend --- qlib/data/data.py | 62 ++++--- qlib/data/storage/file_storage.py | 122 +++++++------ qlib/data/storage/storage.py | 263 +++++++++++++++++++++------- qlib/tests/__init__.py | 10 +- tests/storage_tests/test_storage.py | 33 +--- 5 files changed, 321 insertions(+), 169 deletions(-) diff --git a/qlib/data/data.py b/qlib/data/data.py index 1a0ca616e4..3848a68232 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -8,6 +8,7 @@ import os import re import abc +import copy import time import queue import bisect @@ -31,19 +32,27 @@ class ProviderBackendMixin: def get_default_backend(self): backend = {} - provider_name = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] # type: str + provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] # set default storage class backend.setdefault("class", f"File{provider_name}Storage") # set default storage module backend.setdefault("module_path", "qlib.data.storage.file_storage") - # set default storage kwargs - backend_kwargs = backend.setdefault("kwargs", {}) # type: dict - backend_kwargs.setdefault("uri", os.path.join(C.get_data_path(), f"{provider_name.lower()}s")) return backend - @property - def backend_obj(self): - return init_instance_by_config(self.backend) + def backend_obj(self, **kwargs): + backend = self.backend if self.backend else self.get_default_backend() + backend = copy.deepcopy(backend) + + # set default storage kwargs + backend_kwargs = backend.setdefault("kwargs", {}) + # default uri map + if "uri" not in backend_kwargs: + # if the user has no uri configured, use: uri = uri_map[freq] + freq = kwargs.get("freq", "day") + uri_map = backend_kwargs.setdefault("uri_map", {freq: C.get_data_path()}) + backend_kwargs["uri"] = uri_map[freq] + backend.setdefault("kwargs", {}).update(**kwargs) + return init_instance_by_config(backend) class CalendarProvider(abc.ABC, ProviderBackendMixin): @@ -54,8 +63,6 @@ class CalendarProvider(abc.ABC, ProviderBackendMixin): def __init__(self, *args, **kwargs): self.backend = kwargs.get("backend", {}) - if not self.backend: - self.backend = self.get_default_backend() @abc.abstractmethod def calendar(self, start_time=None, end_time=None, freq="day", future=False): @@ -159,8 +166,6 @@ class InstrumentProvider(abc.ABC, ProviderBackendMixin): def __init__(self, *args, **kwargs): self.backend = kwargs.get("backend", {}) - if not self.backend: - self.backend = self.get_default_backend() @staticmethod def instruments(market="all", filter_pipe=None): @@ -252,8 +257,6 @@ class FeatureProvider(abc.ABC, ProviderBackendMixin): def __init__(self, *args, **kwargs): self.backend = kwargs.get("backend", {}) - if not self.backend: - self.backend = self.get_default_backend() @abc.abstractmethod def feature(self, instrument, field, start_time, end_time, freq): @@ -552,8 +555,18 @@ def load_calendar(self, freq, future): list list of timestamps """ - self.backend.setdefault("kwargs", {}).update(freq=freq, future=future) - return [pd.Timestamp(x) for x in self.backend_obj.data] + + backend_obj = self.backend_obj(freq=freq, future=future) + if future and not backend_obj.check_exists(): + get_module_logger("data").warning( + f"load calendar error: freq={freq}, future={future}; return current calendar!" + ) + get_module_logger("data").warning( + "You can get future calendar by referring to the following document: https://github.com/microsoft/qlib/blob/main/scripts/data_collector/contrib/README.md" + ) + backend_obj = self.backend_obj(freq=freq, future=False) + + return [pd.Timestamp(x) for x in backend_obj.data] def calendar(self, start_time=None, end_time=None, freq="day", future=False): _calendar, _calendar_index = self._get_calendar(freq, future) @@ -589,17 +602,15 @@ def _uri_inst(self): """Instrument file uri.""" return os.path.join(C.get_data_path(), "instruments", "{}.txt") - def _load_instruments(self, market): - - self.backend.setdefault("kwargs", {}).update(market=market) - return self.backend_obj.data + def _load_instruments(self, market, freq): + return self.backend_obj(market=market, freq=freq).data def list_instruments(self, instruments, start_time=None, end_time=None, freq="day", as_list=False): market = instruments["market"] if market in H["i"]: _instruments = H["i"][market] else: - _instruments = self._load_instruments(market) + _instruments = self._load_instruments(market, freq=freq) H["i"][market] = _instruments # strip # use calendar boundary @@ -648,9 +659,14 @@ def feature(self, instrument, field, start_index, end_index, freq): # validate field = str(field).lower()[1:] instrument = code_to_fname(instrument) - - self.backend.setdefault("kwargs", {}).update(instrument=instrument, field=field, freq=freq) - return self.backend_obj[start_index : end_index + 1] + try: + data = self.backend_obj(instrument=instrument, field=field, freq=freq)[start_index : end_index + 1] + except Exception as e: + get_module_logger("data").warning( + f"WARN: data not found for {instrument}.{field}\n\tException info: {str(e)}" + ) + data = pd.Series(dtype=np.float32) + return data class LocalExpressionProvider(ExpressionProvider): diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index 4090e3230d..e55105f573 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -3,25 +3,36 @@ import struct from pathlib import Path -from typing import Iterator, Iterable, Union, Dict, Mapping, Tuple +from typing import Iterable, Union, Dict, Mapping, Tuple, List import numpy as np import pandas as pd +from qlib.log import get_module_logger from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT +logger = get_module_logger("file_storage") -class FileCalendarStorage(CalendarStorage): - def __init__(self, freq: str, future: bool, uri: str): - super(FileCalendarStorage, self).__init__(freq, future, uri) + +class FileStorage: + def check_exists(self): + return self.uri.exists() + + +class FileCalendarStorage(FileStorage, CalendarStorage): + def __init__(self, freq: str, future: bool, uri: str, **kwargs): + super(FileCalendarStorage, self).__init__(freq, future, uri, **kwargs) _file_name = f"{freq}_future.txt" if future else f"{freq}.txt" - self.uri = Path(self.uri).expanduser().joinpath(_file_name.lower()) + self.uri = Path(self.uri).expanduser().joinpath("calendars", _file_name.lower()) - def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> np.ndarray: - if not self.uri.exists(): + def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> Iterable[CalVT]: + if not self.check_exists(): self._write_calendar(values=[]) with self.uri.open("rb") as fp: - return np.loadtxt(fp, str, skiprows=skip_rows, max_rows=n_rows, encoding="utf-8") + return [ + str(x) + for x in np.loadtxt(fp, str, skiprows=skip_rows, max_rows=n_rows, delimiter="\n", encoding="utf-8") + ] def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"): with self.uri.open(mode=mode) as fp: @@ -65,23 +76,17 @@ def __delitem__(self, i: Union[int, slice]) -> None: def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, Iterable[CalVT]]: return self._read_calendar()[i] - def __len__(self) -> int: - return len(self._read_calendar()) - - def __iter__(self): - return iter(self._read_calendar()) - -class FileInstrumentStorage(InstrumentStorage): +class FileInstrumentStorage(FileStorage, InstrumentStorage): INSTRUMENT_SEP = "\t" INSTRUMENT_START_FIELD = "start_datetime" INSTRUMENT_END_FIELD = "end_datetime" SYMBOL_FIELD_NAME = "instrument" - def __init__(self, market: str, uri: str): - super(FileInstrumentStorage, self).__init__(market, uri) - self.uri = Path(self.uri).expanduser().joinpath(f"{market.lower()}.txt") + def __init__(self, market: str, uri: str, **kwargs): + super(FileInstrumentStorage, self).__init__(market, uri, **kwargs) + self.uri = Path(self.uri).expanduser().joinpath("instruments", f"{market.lower()}.txt") def _read_instrument(self) -> Dict[InstKT, InstVT]: if not self.uri.exists(): @@ -138,14 +143,6 @@ def __delitem__(self, k: InstKT) -> None: def __getitem__(self, k: InstKT) -> InstVT: return self._read_instrument()[k] - def __len__(self) -> int: - inst = self._read_instrument() - return len(inst) - - def __iter__(self) -> Iterator[InstKT]: - for _inst in self._read_instrument().keys(): - yield _inst - def update(self, *args, **kwargs) -> None: if len(args) > 1: @@ -168,11 +165,11 @@ def update(self, *args, **kwargs) -> None: self._write_instrument(inst) -class FileFeatureStorage(FeatureStorage): - def __init__(self, instrument: str, field: str, freq: str, uri: str): - super(FileFeatureStorage, self).__init__(instrument, field, freq, uri) +class FileFeatureStorage(FileStorage, FeatureStorage): + def __init__(self, instrument: str, field: str, freq: str, uri: str, **kwargs): + super(FileFeatureStorage, self).__init__(instrument, field, freq, uri, **kwargs) self.uri = ( - Path(self.uri).expanduser().joinpath(instrument.lower()).joinpath(f"{field.lower()}.{freq.lower()}.bin") + Path(self.uri).expanduser().joinpath("features", instrument.lower(), f"{field.lower()}.{freq.lower()}.bin") ) def clear(self): @@ -183,18 +180,45 @@ def clear(self): def data(self) -> pd.Series: return self[:] - def extend(self, series: pd.Series) -> None: - extend_start_index = self[0][0] + len(self) if self.uri.exists() else series.index[0] - series = series.reindex(pd.RangeIndex(extend_start_index, series.index[-1] + 1)) - with self.uri.open("ab") as fp: - np.array(series.values).astype(" None: + if len(data_array) == 0: + logger.info( + "len(data_array) == 0, write" + "if you need to clear the FeatureStorage, please execute: FeatureStorage.clear" + ) + return + if not self.uri.exists(): + # write + index = 0 if index is None else index + with self.uri.open("wb") as fp: + np.hstack([index, data_array]).astype(" self.end_index: + # append + index = 0 if index is None else index + with self.uri.open("ab+") as fp: + np.hstack([[np.nan] * (index - self.end_index - 1), data_array]).astype(" None: - origin_series = self[:] - series = series.append(origin_series.loc[origin_series.index > series.index[-1]]) - series = series.reindex(pd.RangeIndex(series.index[0], series.index[-1])) - with self.uri.open("wb") as fp: - np.array(series.values).astype(" Union[int, None]: + if len(self) == 0: + return None + with open(self.uri, "rb") as fp: + index = int(np.frombuffer(fp.read(4), dtype=" Union[Tuple[int, float], pd.Series]: if not self.uri.exists(): @@ -228,18 +252,4 @@ def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Serie raise TypeError(f"type(i) = {type(i)}") def __len__(self) -> int: - return self.uri.stat().st_size // 4 - 1 if self.uri.exists() else 0 - - def __iter__(self): - if not self.uri.exists(): - return - with open(self.uri, "rb") as fp: - ref_start_index = int(np.frombuffer(fp.read(4), dtype=" pd.Series: + pass + +""" + + +class StorageMeta(type): + """unified management of raise when storage is not exists""" + + def __new__(cls, name, bases, dict): + class_obj = type.__new__(cls, name, bases, dict) + + # The calls to __iter__ and __getitem__ do not pass through __getattribute__. + # In order to throw an exception before calling __getitem__, use the metaclass + _getitem_func = getattr(class_obj, "__getitem__") + + def _getitem(obj, item): + _check_func = getattr(obj, "_check") + if callable(_check_func): + _check_func() + return _getitem_func(obj, item) + + setattr(class_obj, "__getitem__", _getitem) + return class_obj + + +class BaseStorage(metaclass=StorageMeta): + @property + def storage_name(self) -> str: + return re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] + + def check_exists(self) -> bool: + """check if storage(uri) exists, if not exists: return False""" + raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") + + def clear(self) -> None: + """clear storage""" + raise NotImplementedError("Subclass of BaseStorage must implement `clear` method") + + def __len__(self) -> 0: + return len(self.data) if self.check_exists() else 0 + + def __getitem__(self, item: Union[slice, Union[int, InstKT]]): + raise NotImplementedError( + "Subclass of BaseStorage must implement `__getitem__(i: Union[int, InstKT])`/`__getitem__(s: slice)` method" + ) + + def _check(self): + # check storage(uri) + if not self.check_exists(): + parameters_info = [f"{_k}={_v}" for _k, _v in self.__dict__.items()] + raise ValueError(f"{self.storage_name.lower()} not exists, storage parameters: {parameters_info}") + + def __getattribute__(self, item): + if item == "data": + self._check() + return super(BaseStorage, self).__getattribute__(item) + + +class CalendarStorage(BaseStorage): + """ + The behavior of CalendarStorage's methods and List's methods of the same name remain consistent + """ + + def __init__(self, freq: str, future: bool, uri: str, **kwargs): self.freq = freq self.future = future self.uri = uri @@ -28,9 +113,6 @@ def data(self) -> Iterable[CalVT]: def extend(self, iterable: Iterable[CalVT]) -> None: raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") - def clear(self) -> None: - raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method") - def index(self, value: CalVT) -> int: raise NotImplementedError("Subclass of CalendarStorage must implement `index` method") @@ -85,16 +167,9 @@ def __getitem__(self, i) -> CalVT: "Subclass of CalendarStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" ) - def __len__(self) -> int: - """x.__len__() <==> len(x)""" - raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") - def __iter__(self): - raise NotImplementedError("Subclass of CalendarStorage must implement `__iter__` method") - - -class InstrumentStorage: - def __init__(self, market: str, uri: str): +class InstrumentStorage(BaseStorage): + def __init__(self, market: str, uri: str, **kwargs): self.market = market self.uri = uri @@ -103,9 +178,6 @@ def data(self) -> Dict[InstKT, InstVT]: """get all data""" raise NotImplementedError("Subclass of InstrumentStorage must implement `data` method") - def clear(self) -> None: - raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") - def update(self, *args, **kwargs) -> None: """D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. If E present and has a .keys() method, does: for k in E: D[k] = E[k] @@ -126,17 +198,9 @@ def __getitem__(self, k: InstKT) -> InstVT: """ x.__getitem__(k) <==> x[k] """ raise NotImplementedError("Subclass of InstrumentStorage must implement `__getitem__` method") - def __len__(self) -> int: - """ Return len(self). """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `__len__` method") - def __iter__(self) -> Iterator[InstKT]: - """ Return iter(self). """ - raise NotImplementedError("Subclass of InstrumentStorage must implement `__iter__` method") - - -class FeatureStorage: - def __init__(self, instrument: str, field: str, freq: str, uri: str): +class FeatureStorage(BaseStorage): + def __init__(self, instrument: str, field: str, freq: str, uri: str, **kwargs): self.instrument = instrument self.field = field self.freq = freq @@ -147,12 +211,25 @@ def data(self) -> pd.Series: """get all data""" raise NotImplementedError("Subclass of FeatureStorage must implement `data` method") - def clear(self): - """ Remove all items from FeatureStorage. """ - raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") + @property + def start_index(self) -> Union[int, None]: + """get FeatureStorage start index + If len(self) == 0; return None + """ + raise NotImplementedError("Subclass of FeatureStorage must implement `data` method") + + @property + def end_index(self) -> Union[int, None]: + if len(self) == 0: + return None + return None if len(self) == 0 else self.start_index + len(self) - 1 + + def write(self, data_array: Union[List, np.ndarray, Tuple], index: int = None): + """Write data_array to FeatureStorage starting from index. + If index is None, append data_array to feature. + If len(data_array) == 0; return + If (index - self.end_index) >= 1, self[end_index+1: index] will be filled with np.nan - def extend(self, series: pd.Series): - """Extend feature by appending elements from the series. Examples: @@ -161,21 +238,42 @@ def extend(self, series: pd.Series): 4 5 5 6 - >>> self.extend(pd.Series({7: 8, 9:10})) + >>> self.write([6, 7], index=6) + + feature: + 3 4 + 4 5 + 5 6 + 6 6 + 7 7 + + >>> self.write([8], index=9) feature: 3 4 4 5 5 6 - 6 np.nan - 7 8 - 9 10 + 6 6 + 7 7 + 8 np.nan + 9 8 + + >>> self.write([1, np.nan], index=3) + + feature: + 3 1 + 4 np.nan + 5 6 + 6 6 + 7 7 + 8 np.nan + 9 8 """ - raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method") + raise NotImplementedError("Subclass of FeatureStorage must implement `write` method") - def rebase(self, series: pd.Series): - """Rebase feature header from the series. + def rebase(self, start_index: int = None, end_index: int = None): + """Rebase the start_index and end_index of the FeatureStorage. Examples: @@ -184,30 +282,85 @@ def rebase(self, series: pd.Series): 4 5 5 6 - >>> self.rebase(pd.Series({1: 2})) + >>> self.rebase(start_index=4) feature: - 1 2 - 2 np.nan - 3 4 4 5 5 6 - >>> self.rebase(pd.Series({5: 6, 7: 8, 9: 10})) + >>> self.rebase(start_index=3) feature: + 3 np.nan + 4 5 5 6 - 7 8 - 9 10 - >>> self.rebase(pd.Series({11: 12, 12: 13,})) + >>> self.write([3], index=3) feature: - 11 12 - 12 13 + 3 3 + 4 5 + 5 6 + + >>> self.rebase(end_index=4) + + feature: + 3 3 + 4 5 + + >>> self.write([6, 7, 8], index=4) + feature: + 3 3 + 4 6 + 5 7 + 6 8 + + >>> self.rebase(start_index=4, end_index=5) + + feature: + 4 6 + 5 7 + + """ + if start_index is None and end_index is None: + logger.warning("both start_index and end_index are None, rebase is ignored") + return + + if start_index < 0 or end_index < 0: + logger.warning("start_index or end_index cannot be less than 0") + return + if start_index > end_index: + logger.warning( + f"start_index({start_index}) > end_index({end_index}), rebase is ignored; " + f"if you need to clear the FeatureStorage, please execute: FeatureStorage.clear" + ) + return + + start_index = self.start_index if start_index is None else end_index + end_index = self.end_index if end_index is None else end_index + if start_index <= self.start_index: + self.write([np.nan] * (self.start_index - start_index), start_index) + else: + self.rewrite(self[start_index:].values, start_index) + + if end_index >= self.end_index: + self.write([np.nan] * (end_index - self.end_index)) + else: + self.rewrite(self[: end_index + 1].values, self.start_index) + + def rewrite(self, data: Union[List, np.ndarray, Tuple], index: int): + """overwrite all data in FeatureStorage with data + + Parameters + ---------- + data: Union[List, np.ndarray, Tuple] + data + index: int + data start index """ - raise NotImplementedError("Subclass of FeatureStorage must implement `rebase` method") + self.clear() + self.write(data, index) @overload def __getitem__(self, s: slice) -> pd.Series: @@ -224,11 +377,3 @@ def __getitem__(self, i) -> Union[Tuple[int, float], pd.Series]: raise NotImplementedError( "Subclass of FeatureStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" ) - - def __len__(self) -> int: - """len(feature) <==> feature.__len__() """ - raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") - - def __iter__(self) -> Iterable[Tuple[int, float]]: - """iter(feature)""" - raise NotImplementedError("Subclass of FeatureStorage must implement `__iter__` method") diff --git a/qlib/tests/__init__.py b/qlib/tests/__init__.py index f92e727875..8b53bc53a5 100644 --- a/qlib/tests/__init__.py +++ b/qlib/tests/__init__.py @@ -9,19 +9,19 @@ class TestAutoData(unittest.TestCase): _setup_kwargs = {} + provider_uri = "~/.qlib/qlib_data/cn_data_simple" # target_dir @classmethod def setUpClass(cls) -> None: # use default data - provider_uri = "~/.qlib/qlib_data/cn_data_simple" # target_dir - if not exists_qlib_data(provider_uri): - print(f"Qlib data is not found in {provider_uri}") + if not exists_qlib_data(cls.provider_uri): + print(f"Qlib data is not found in {cls.provider_uri}") GetData().qlib_data( name="qlib_data_simple", region="cn", interval="1d", - target_dir=provider_uri, + target_dir=cls.provider_uri, delete_old=False, ) - init(provider_uri=provider_uri, region=REG_CN, **cls._setup_kwargs) + init(provider_uri=cls.provider_uri, region=REG_CN, **cls._setup_kwargs) diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py index 8ce3f50812..79ad78b82e 100644 --- a/tests/storage_tests/test_storage.py +++ b/tests/storage_tests/test_storage.py @@ -2,13 +2,12 @@ # Licensed under the MIT License. -import shutil from pathlib import Path from collections.abc import Iterable import pytest import numpy as np -from qlib.tests.data import GetData +from qlib.tests import TestAutoData from qlib.data.storage.file_storage import ( FileCalendarStorage as CalendarStorage, @@ -22,25 +21,10 @@ QLIB_DIR.mkdir(exist_ok=True, parents=True) -# TODO: set value -CALENDAR_URI = QLIB_DIR.joinpath("calendars") -INSTRUMENT_URI = QLIB_DIR.joinpath("instruments") -FEATURE_URI = QLIB_DIR.joinpath("features") - - -class TestStorage: - @classmethod - def setup_class(cls): - GetData().qlib_data(name="qlib_data_simple", target_dir=QLIB_DIR, region="cn", interval="1d", delete_old=False) - - @classmethod - def teardown_class(cls): - shutil.rmtree(str(DATA_DIR.resolve())) - +class TestStorage(TestAutoData): def test_calendar_storage(self): - calendar = CalendarStorage(freq="day", future=False, uri=CALENDAR_URI) - assert isinstance(calendar, Iterable), f"{calendar.__class__.__name__} is not Iterable" + calendar = CalendarStorage(freq="day", future=False, uri=self.provider_uri) assert isinstance(calendar[:], Iterable), f"{calendar.__class__.__name__}.__getitem__(s: slice) is not Iterable" assert isinstance(calendar.data, Iterable), f"{calendar.__class__.__name__}.data is not Iterable" @@ -82,9 +66,7 @@ def test_instrument_storage(self): """ - instrument = InstrumentStorage(market="csi300", uri=INSTRUMENT_URI) - - assert isinstance(instrument, Iterable), f"{instrument.__class__.__name__} is not Iterable" + instrument = InstrumentStorage(market="csi300", uri=self.provider_uri) for inst, spans in instrument.data.items(): assert isinstance(inst, str) and isinstance( @@ -151,13 +133,12 @@ def test_feature_storage(self): """ - feature = FeatureStorage(instrument="SH600004", field="close", freq="day", uri=FEATURE_URI) + feature = FeatureStorage(instrument="SH600004", field="close", freq="day", uri=self.provider_uri) - assert isinstance(feature, Iterable), f"{feature.__class__.__name__} is not Iterable" with pytest.raises(IndexError): print(feature[0]) assert isinstance( - feature[815][1], (np.float, np.float32) + feature[815][1], (float, np.float32) ), f"{feature.__class__.__name__}.__getitem__(i: int) error" assert len(feature[815:818]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error" print(f"feature[815: 818]: {feature[815: 818]}") @@ -167,5 +148,5 @@ def test_feature_storage(self): isinstance(_item, tuple) and len(_item) == 2 ), f"{feature.__class__.__name__}.__iter__ item type error" assert isinstance(_item[0], int) and isinstance( - _item[1], (np.float, np.float32) + _item[1], (float, np.float32) ), f"{feature.__class__.__name__}.__iter__ value type error" From 4bd84a9e6769b93b347fed475804de633a800c46 Mon Sep 17 00:00:00 2001 From: zhupr Date: Fri, 21 May 2021 08:56:44 +0800 Subject: [PATCH 07/11] replace the type of numpy deprecated --- qlib/contrib/backtest/position.py | 2 +- qlib/data/dataset/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/qlib/contrib/backtest/position.py b/qlib/contrib/backtest/position.py index 6c269d505b..5a6b102b28 100644 --- a/qlib/contrib/backtest/position.py +++ b/qlib/contrib/backtest/position.py @@ -166,7 +166,7 @@ def update_weight_all(self): def save_position(self, path, last_trade_date): path = pathlib.Path(path) p = copy.deepcopy(self.position) - cash = pd.Series(dtype=np.float) + cash = pd.Series(dtype=float) cash["init_cash"] = self.init_cash cash["cash"] = p["cash"] cash["today_account_value"] = p["today_account_value"] diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 0f5d2baba0..ef7bfa67ee 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -299,7 +299,7 @@ def build_index(data: pd.DataFrame) -> dict: # get the previous index of a line given index """ # object incase of pandas converting int to flaot - idx_df = pd.Series(range(data.shape[0]), index=data.index, dtype=np.object) + idx_df = pd.Series(range(data.shape[0]), index=data.index, dtype=object) idx_df = lazy_sort_index(idx_df.unstack()) # NOTE: the correctness of `__getitem__` depends on columns sorted here idx_df = lazy_sort_index(idx_df, axis=1) From 6e20124125d400d45de3a104eff93bd92955cdbc Mon Sep 17 00:00:00 2001 From: zhupr Date: Fri, 21 May 2021 10:03:02 +0800 Subject: [PATCH 08/11] code for formatting storage.py using black(v21.5) --- qlib/data/storage/storage.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/qlib/data/storage/storage.py b/qlib/data/storage/storage.py index 8207b5c9aa..25f0ce6f13 100644 --- a/qlib/data/storage/storage.py +++ b/qlib/data/storage/storage.py @@ -124,12 +124,12 @@ def remove(self, value: CalVT) -> None: @overload def __setitem__(self, i: int, value: CalVT) -> None: - """x.__setitem__(i, o) <==> x[i] = o""" + """x.__setitem__(i, o) <==> (x[i] = o)""" ... @overload def __setitem__(self, s: slice, value: Iterable[CalVT]) -> None: - """x.__setitem__(s, o) <==> x[s] = o""" + """x.__setitem__(s, o) <==> (x[s] = o)""" ... def __setitem__(self, i, value) -> None: @@ -187,15 +187,15 @@ def update(self, *args, **kwargs) -> None: raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method") def __setitem__(self, k: InstKT, v: InstVT) -> None: - """ Set self[key] to value. """ + """Set self[key] to value.""" raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") def __delitem__(self, k: InstKT) -> None: - """ Delete self[key]. """ + """Delete self[key].""" raise NotImplementedError("Subclass of InstrumentStorage must implement `__delitem__` method") def __getitem__(self, k: InstKT) -> InstVT: - """ x.__getitem__(k) <==> x[k] """ + """x.__getitem__(k) <==> x[k]""" raise NotImplementedError("Subclass of InstrumentStorage must implement `__getitem__` method") @@ -364,7 +364,12 @@ def rewrite(self, data: Union[List, np.ndarray, Tuple], index: int): @overload def __getitem__(self, s: slice) -> pd.Series: - """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] == pd.Series(values, index=pd.RangeIndex(start, len(values))""" + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] + + Returns + ------- + pd.Series(values, index=pd.RangeIndex(start, len(values)) + """ ... @overload From a26a38461d35cb3a7ce47009ec3af0d577b36347 Mon Sep 17 00:00:00 2001 From: zhupr Date: Sat, 22 May 2021 02:03:50 +0800 Subject: [PATCH 09/11] modify exception message hint for storage.py && fix FileFeatureStorage[:] bug --- qlib/data/data.py | 2 +- qlib/data/storage/file_storage.py | 15 +++++++------- qlib/data/storage/storage.py | 31 ++++++++++++++++++++--------- tests/storage_tests/test_storage.py | 14 ++++--------- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/qlib/data/data.py b/qlib/data/data.py index e1c9692476..3a74a20277 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -663,7 +663,7 @@ def feature(self, instrument, field, start_index, end_index, freq): data = self.backend_obj(instrument=instrument, field=field, freq=freq)[start_index : end_index + 1] except Exception as e: get_module_logger("data").warning( - f"WARN: data not found for {instrument}.{field}\n\tException info: {str(e)}" + f"WARN: data not found for {instrument}.{field}\n\tFeature exception info: {str(e)}" ) data = pd.Series(dtype=np.float32) return data diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index e55105f573..90e4178ffe 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -230,20 +230,19 @@ def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Serie raise TypeError(f"type(i) = {type(i)}") with open(self.uri, "rb") as fp: - ref_start_index = int(np.frombuffer(fp.read(4), dtype=" i: - raise IndexError(f"{i}: start index is {ref_start_index}") - fp.seek(4 * (i - ref_start_index) + 4) + if self.start_index > i: + raise IndexError(f"{i}: start index is {self.start_index}") + fp.seek(4 * (i - self.start_index) + 4) return i, struct.unpack("f", fp.read(4))[0] elif isinstance(i, slice): - start_index = i.start - end_index = i.stop - 1 - si = max(ref_start_index, start_index) + start_index = self.start_index if i.start is None else i.start + end_index = self.end_index if i.stop is None else i.stop - 1 + si = max(self.start_index, start_index) if si > end_index: return pd.Series() - fp.seek(4 * (si - ref_start_index) + 4) + fp.seek(4 * (si - self.start_index) + 4) # read n bytes count = end_index - si + 1 data = np.frombuffer(fp.read(4 * count), dtype=" str: - return re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2] + return re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2].lower() + + @property + def raise_info(self): + parameters_info = [ + f"{_k}={_v}" + for _k, _v in self.__dict__.items() + if not isinstance(_v, (dict,)) or (hasattr(_v, "__len__") and len(_v) < 3) + ] + return f"{self.storage_name.lower()} not exists, storage parameters: {parameters_info}" def check_exists(self) -> bool: """check if storage(uri) exists, if not exists: return False""" @@ -84,15 +95,17 @@ def __getitem__(self, item: Union[slice, Union[int, InstKT]]): ) def _check(self): - # check storage(uri) if not self.check_exists(): - parameters_info = [f"{_k}={_v}" for _k, _v in self.__dict__.items()] - raise ValueError(f"{self.storage_name.lower()} not exists, storage parameters: {parameters_info}") + raise ValueError(self.raise_info) def __getattribute__(self, item): if item == "data": self._check() - return super(BaseStorage, self).__getattribute__(item) + try: + res = super(BaseStorage, self).__getattribute__(item) + except Exception as e: + raise ValueError(f"{self.raise_info}\n\tStorage exception info: {str(e)}") + return res class CalendarStorage(BaseStorage): diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py index 79ad78b82e..e7bac658cb 100644 --- a/tests/storage_tests/test_storage.py +++ b/tests/storage_tests/test_storage.py @@ -135,18 +135,12 @@ def test_feature_storage(self): feature = FeatureStorage(instrument="SH600004", field="close", freq="day", uri=self.provider_uri) - with pytest.raises(IndexError): + with pytest.raises(ValueError): print(feature[0]) assert isinstance( feature[815][1], (float, np.float32) ), f"{feature.__class__.__name__}.__getitem__(i: int) error" assert len(feature[815:818]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error" - print(f"feature[815: 818]: {feature[815: 818]}") - - for _item in feature: - assert ( - isinstance(_item, tuple) and len(_item) == 2 - ), f"{feature.__class__.__name__}.__iter__ item type error" - assert isinstance(_item[0], int) and isinstance( - _item[1], (float, np.float32) - ), f"{feature.__class__.__name__}.__iter__ value type error" + print(f"feature[815: 818]: \n{feature[815: 818]}") + + print(f"feature[:].tail(): \n{feature[:].tail()}") From 16eff876b4080199e048bf00848f0a4e62c8941a Mon Sep 17 00:00:00 2001 From: zhupr Date: Sat, 22 May 2021 08:30:12 +0800 Subject: [PATCH 10/11] add documentation on which storage methods are used in qlib --- qlib/data/storage/storage.py | 37 +++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/qlib/data/storage/storage.py b/qlib/data/storage/storage.py index 766af30be9..dcf6da9ed1 100644 --- a/qlib/data/storage/storage.py +++ b/qlib/data/storage/storage.py @@ -24,20 +24,43 @@ class UserCalendarStorage(CalendarStorage): @property - def data(self): - pass + def data(self) -> Iterable[CalVT]: + '''get all data''' + raise NotImplementedError("Subclass of CalendarStorage must implement `data` method") + + def check_exists(self) -> bool: + '''check if storage(uri) exists, if not exists: return False''' + raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") + class UserInstrumentStorage(InstrumentStorage): @property - def data(self): - pass + def data(self) -> Dict[InstKT, InstVT]: + '''get all data''' + raise NotImplementedError("Subclass of InstrumentStorage must implement `data` method") + + def check_exists(self) -> bool: + '''check if storage(uri) exists, if not exists: return False''' + raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") + class UserFeatureStorage(FeatureStorage): - @check_storage - def __getitem__(self, i: slice) -> pd.Series: - pass + def __getitem__(self, s: slice) -> pd.Series: + '''x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] + + Returns + ------- + pd.Series(values, index=pd.RangeIndex(start, len(values)) + ''' + raise NotImplementedError( + "Subclass of FeatureStorage must implement `__getitem__(s: slice)` method" + ) + + def check_exists(self) -> bool: + '''check if storage(uri) exists, if not exists: return False''' + raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") """ From a34d5964b43590fe06dd7133fad74ee2ec900f68 Mon Sep 17 00:00:00 2001 From: zhupr Date: Wed, 26 May 2021 01:01:36 +0800 Subject: [PATCH 11/11] remove uri parameter from storage && modify file_storage --- docs/reference/api.rst | 28 +++ qlib/data/data.py | 41 ++-- qlib/data/storage/file_storage.py | 110 ++++++--- qlib/data/storage/storage.py | 355 +++++++++++++++++----------- qlib/utils/__init__.py | 5 +- scripts/dump_bin.py | 2 +- tests/storage_tests/test_storage.py | 33 ++- 7 files changed, 373 insertions(+), 201 deletions(-) diff --git a/docs/reference/api.rst b/docs/reference/api.rst index 57f61f18b1..5e6e50b0ba 100644 --- a/docs/reference/api.rst +++ b/docs/reference/api.rst @@ -53,6 +53,34 @@ Cache .. autoclass:: qlib.data.cache.DiskDatasetCache :members: + +Storage +------------- +.. autoclass:: qlib.data.storage.storage.BaseStorage + :members: + +.. autoclass:: qlib.data.storage.storage.CalendarStorage + :members: + +.. autoclass:: qlib.data.storage.storage.InstrumentStorage + :members: + +.. autoclass:: qlib.data.storage.storage.FeatureStorage + :members: + +.. autoclass:: qlib.data.storage.file_storage.FileStorageMixin + :members: + +.. autoclass:: qlib.data.storage.file_storage.FileCalendarStorage + :members: + +.. autoclass:: qlib.data.storage.file_storage.FileInstrumentStorage + :members: + +.. autoclass:: qlib.data.storage.file_storage.FileFeatureStorage + :members: + + Dataset --------------- diff --git a/qlib/data/data.py b/qlib/data/data.py index 3a74a20277..eb7fbe0ead 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -45,12 +45,12 @@ def backend_obj(self, **kwargs): # set default storage kwargs backend_kwargs = backend.setdefault("kwargs", {}) - # default uri map - if "uri" not in backend_kwargs: + # default provider_uri map + if "provider_uri" not in backend_kwargs: # if the user has no uri configured, use: uri = uri_map[freq] freq = kwargs.get("freq", "day") - uri_map = backend_kwargs.setdefault("uri_map", {freq: C.get_data_path()}) - backend_kwargs["uri"] = uri_map[freq] + provider_uri_map = backend_kwargs.setdefault("provider_uri_map", {freq: C.get_data_path()}) + backend_kwargs["provider_uri"] = provider_uri_map[freq] backend.setdefault("kwargs", {}).update(**kwargs) return init_instance_by_config(backend) @@ -556,17 +556,21 @@ def load_calendar(self, freq, future): list of timestamps """ - backend_obj = self.backend_obj(freq=freq, future=future) - if future and not backend_obj.check_exists(): - get_module_logger("data").warning( - f"load calendar error: freq={freq}, future={future}; return current calendar!" - ) - get_module_logger("data").warning( - "You can get future calendar by referring to the following document: https://github.com/microsoft/qlib/blob/main/scripts/data_collector/contrib/README.md" - ) - backend_obj = self.backend_obj(freq=freq, future=False) + try: + backend_obj = self.backend_obj(freq=freq, future=future).data + except ValueError: + if future: + get_module_logger("data").warning( + f"load calendar error: freq={freq}, future={future}; return current calendar!" + ) + get_module_logger("data").warning( + "You can get future calendar by referring to the following document: https://github.com/microsoft/qlib/blob/main/scripts/data_collector/contrib/README.md" + ) + backend_obj = self.backend_obj(freq=freq, future=False).data + else: + raise - return [pd.Timestamp(x) for x in backend_obj.data] + return [pd.Timestamp(x) for x in backend_obj] def calendar(self, start_time=None, end_time=None, freq="day", future=False): _calendar, _calendar_index = self._get_calendar(freq, future) @@ -659,14 +663,7 @@ def feature(self, instrument, field, start_index, end_index, freq): # validate field = str(field).lower()[1:] instrument = code_to_fname(instrument) - try: - data = self.backend_obj(instrument=instrument, field=field, freq=freq)[start_index : end_index + 1] - except Exception as e: - get_module_logger("data").warning( - f"WARN: data not found for {instrument}.{field}\n\tFeature exception info: {str(e)}" - ) - data = pd.Series(dtype=np.float32) - return data + return self.backend_obj(instrument=instrument, field=field, freq=freq)[start_index : end_index + 1] class LocalExpressionProvider(ExpressionProvider): diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index 90e4178ffe..a2b145c4df 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -14,19 +14,35 @@ logger = get_module_logger("file_storage") -class FileStorage: - def check_exists(self): - return self.uri.exists() +class FileStorageMixin: + @property + def uri(self) -> Path: + _provider_uri = self.kwargs.get("provider_uri", None) + if _provider_uri is None: + raise ValueError( + f"The `provider_uri` parameter is not found in {self.__class__.__name__}, " + f'please specify `provider_uri` in the "provider\'s backend"' + ) + return Path(_provider_uri).expanduser().joinpath(f"{self.storage_name}s", self.file_name) + + def check(self): + """check self.uri + + Raises + ------- + ValueError + """ + if not self.uri.exists(): + raise ValueError(f"{self.storage_name} not exists: {self.uri}") -class FileCalendarStorage(FileStorage, CalendarStorage): - def __init__(self, freq: str, future: bool, uri: str, **kwargs): - super(FileCalendarStorage, self).__init__(freq, future, uri, **kwargs) - _file_name = f"{freq}_future.txt" if future else f"{freq}.txt" - self.uri = Path(self.uri).expanduser().joinpath("calendars", _file_name.lower()) +class FileCalendarStorage(FileStorageMixin, CalendarStorage): + def __init__(self, freq: str, future: bool, **kwargs): + super(FileCalendarStorage, self).__init__(freq, future, **kwargs) + self.file_name = f"{freq}_future.txt" if future else f"{freq}.txt".lower() - def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> Iterable[CalVT]: - if not self.check_exists(): + def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> List[CalVT]: + if not self.uri.exists(): self._write_calendar(values=[]) with self.uri.open("rb") as fp: return [ @@ -39,7 +55,8 @@ def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"): np.savetxt(fp, values, fmt="%s", encoding="utf-8") @property - def data(self) -> Iterable[CalVT]: + def data(self) -> List[CalVT]: + self.check() return self._read_calendar() def extend(self, values: Iterable[CalVT]) -> None: @@ -49,6 +66,7 @@ def clear(self) -> None: self._write_calendar(values=[]) def index(self, value: CalVT) -> int: + self.check() calendar = self._read_calendar() return int(np.argwhere(calendar == value)[0]) @@ -58,6 +76,7 @@ def insert(self, index: int, value: CalVT): self._write_calendar(values=calendar) def remove(self, value: CalVT) -> None: + self.check() index = self.index(value) calendar = self._read_calendar() calendar = np.delete(calendar, index) @@ -69,24 +88,29 @@ def __setitem__(self, i: Union[int, slice], values: Union[CalVT, Iterable[CalVT] self._write_calendar(values=calendar) def __delitem__(self, i: Union[int, slice]) -> None: + self.check() calendar = self._read_calendar() calendar = np.delete(calendar, i) self._write_calendar(values=calendar) - def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, Iterable[CalVT]]: + def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, List[CalVT]]: + self.check() return self._read_calendar()[i] + def __len__(self) -> int: + return len(self.data) + -class FileInstrumentStorage(FileStorage, InstrumentStorage): +class FileInstrumentStorage(FileStorageMixin, InstrumentStorage): INSTRUMENT_SEP = "\t" INSTRUMENT_START_FIELD = "start_datetime" INSTRUMENT_END_FIELD = "end_datetime" SYMBOL_FIELD_NAME = "instrument" - def __init__(self, market: str, uri: str, **kwargs): - super(FileInstrumentStorage, self).__init__(market, uri, **kwargs) - self.uri = Path(self.uri).expanduser().joinpath("instruments", f"{market.lower()}.txt") + def __init__(self, market: str, **kwargs): + super(FileInstrumentStorage, self).__init__(market, **kwargs) + self.file_name = f"{market.lower()}.txt" def _read_instrument(self) -> Dict[InstKT, InstVT]: if not self.uri.exists(): @@ -128,6 +152,7 @@ def clear(self) -> None: @property def data(self) -> Dict[InstKT, InstVT]: + self.check() return self._read_instrument() def __setitem__(self, k: InstKT, v: InstVT) -> None: @@ -136,11 +161,13 @@ def __setitem__(self, k: InstKT, v: InstVT) -> None: self._write_instrument(inst) def __delitem__(self, k: InstKT) -> None: + self.check() inst = self._read_instrument() del inst[k] self._write_instrument(inst) def __getitem__(self, k: InstKT) -> InstVT: + self.check() return self._read_instrument()[k] def update(self, *args, **kwargs) -> None: @@ -164,13 +191,14 @@ def update(self, *args, **kwargs) -> None: self._write_instrument(inst) + def __len__(self) -> int: + return len(self.data) -class FileFeatureStorage(FileStorage, FeatureStorage): - def __init__(self, instrument: str, field: str, freq: str, uri: str, **kwargs): - super(FileFeatureStorage, self).__init__(instrument, field, freq, uri, **kwargs) - self.uri = ( - Path(self.uri).expanduser().joinpath("features", instrument.lower(), f"{field.lower()}.{freq.lower()}.bin") - ) + +class FileFeatureStorage(FileStorageMixin, FeatureStorage): + def __init__(self, instrument: str, field: str, freq: str, **kwargs): + super(FileFeatureStorage, self).__init__(instrument, field, freq, **kwargs) + self.file_name = f"{instrument.lower()}/{field.lower()}.{freq.lower()}.bin" def clear(self): with self.uri.open("wb") as _: @@ -214,35 +242,44 @@ def write(self, data_array: Union[List, np.ndarray], index: int = None) -> None: @property def start_index(self) -> Union[int, None]: - if len(self) == 0: + if not self.uri.exists(): return None - with open(self.uri, "rb") as fp: + with self.uri.open("rb") as fp: index = int(np.frombuffer(fp.read(4), dtype=" Union[int, None]: + if not self.uri.exists(): + return None + # The next data appending index point will be `end_index + 1` + return self.start_index + len(self) - 1 + def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Series]: if not self.uri.exists(): if isinstance(i, int): return None, None elif isinstance(i, slice): - return pd.Series() + return pd.Series(dtype=np.float32) else: raise TypeError(f"type(i) = {type(i)}") - with open(self.uri, "rb") as fp: - + storage_start_index = self.start_index + storage_end_index = self.end_index + with self.uri.open("rb") as fp: if isinstance(i, int): - if self.start_index > i: - raise IndexError(f"{i}: start index is {self.start_index}") - fp.seek(4 * (i - self.start_index) + 4) + + if storage_start_index > i: + raise IndexError(f"{i}: start index is {storage_start_index}") + fp.seek(4 * (i - storage_start_index) + 4) return i, struct.unpack("f", fp.read(4))[0] elif isinstance(i, slice): - start_index = self.start_index if i.start is None else i.start - end_index = self.end_index if i.stop is None else i.stop - 1 - si = max(self.start_index, start_index) + start_index = storage_start_index if i.start is None else i.start + end_index = storage_end_index if i.stop is None else i.stop - 1 + si = max(start_index, storage_start_index) if si > end_index: - return pd.Series() - fp.seek(4 * (si - self.start_index) + 4) + return pd.Series(dtype=np.float32) + fp.seek(4 * (si - storage_start_index) + 4) # read n bytes count = end_index - si + 1 data = np.frombuffer(fp.read(4 * count), dtype=" Union[Tuple[int, float], pd.Serie raise TypeError(f"type(i) = {type(i)}") def __len__(self) -> int: - return self.uri.stat().st_size // 4 - 1 if self.check_exists() else 0 + self.check() + return self.uri.stat().st_size // 4 - 1 diff --git a/qlib/data/storage/storage.py b/qlib/data/storage/storage.py index dcf6da9ed1..8426ebe66f 100644 --- a/qlib/data/storage/storage.py +++ b/qlib/data/storage/storage.py @@ -25,24 +25,28 @@ class UserCalendarStorage(CalendarStorage): @property def data(self) -> Iterable[CalVT]: - '''get all data''' - raise NotImplementedError("Subclass of CalendarStorage must implement `data` method") + '''get all data - def check_exists(self) -> bool: - '''check if storage(uri) exists, if not exists: return False''' - raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + ''' + raise NotImplementedError("Subclass of CalendarStorage must implement `data` method") class UserInstrumentStorage(InstrumentStorage): @property def data(self) -> Dict[InstKT, InstVT]: - '''get all data''' - raise NotImplementedError("Subclass of InstrumentStorage must implement `data` method") + '''get all data - def check_exists(self) -> bool: - '''check if storage(uri) exists, if not exists: return False''' - raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + ''' + raise NotImplementedError("Subclass of InstrumentStorage must implement `data` method") class UserFeatureStorage(FeatureStorage): @@ -53,103 +57,64 @@ def __getitem__(self, s: slice) -> pd.Series: Returns ------- pd.Series(values, index=pd.RangeIndex(start, len(values)) + + Notes + ------- + if data(storage) does not exist: + if isinstance(i, int): + return (None, None) + if isinstance(i, slice): + # return empty pd.Series + return pd.Series(dtype=np.float32) ''' raise NotImplementedError( "Subclass of FeatureStorage must implement `__getitem__(s: slice)` method" ) - def check_exists(self) -> bool: - '''check if storage(uri) exists, if not exists: return False''' - raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") """ -class StorageMeta(type): - """unified management of raise when storage is not exists""" - - def __new__(cls, name, bases, dict): - class_obj = type.__new__(cls, name, bases, dict) - - # The calls to __iter__ and __getitem__ do not pass through __getattribute__. - # In order to throw an exception before calling __getitem__, use the metaclass - _getitem_func = getattr(class_obj, "__getitem__") - - def _getitem(obj, item): - getattr(obj, "_check")() - try: - res = _getitem_func(obj, item) - except Exception as e: - raise ValueError(f"{obj.raise_info}\n\tStorage exception info: {str(e)}") - return res - - setattr(class_obj, "__getitem__", _getitem) - return class_obj - - -class BaseStorage(metaclass=StorageMeta): +class BaseStorage: @property def storage_name(self) -> str: return re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2].lower() - @property - def raise_info(self): - parameters_info = [ - f"{_k}={_v}" - for _k, _v in self.__dict__.items() - if not isinstance(_v, (dict,)) or (hasattr(_v, "__len__") and len(_v) < 3) - ] - return f"{self.storage_name.lower()} not exists, storage parameters: {parameters_info}" - - def check_exists(self) -> bool: - """check if storage(uri) exists, if not exists: return False""" - raise NotImplementedError("Subclass of BaseStorage must implement `check_exists` method") - - def clear(self) -> None: - """clear storage""" - raise NotImplementedError("Subclass of BaseStorage must implement `clear` method") - - def __len__(self) -> 0: - return len(self.data) if self.check_exists() else 0 - - def __getitem__(self, item: Union[slice, Union[int, InstKT]]): - raise NotImplementedError( - "Subclass of BaseStorage must implement `__getitem__(i: Union[int, InstKT])`/`__getitem__(s: slice)` method" - ) - - def _check(self): - if not self.check_exists(): - raise ValueError(self.raise_info) - - def __getattribute__(self, item): - if item == "data": - self._check() - try: - res = super(BaseStorage, self).__getattribute__(item) - except Exception as e: - raise ValueError(f"{self.raise_info}\n\tStorage exception info: {str(e)}") - return res - class CalendarStorage(BaseStorage): """ The behavior of CalendarStorage's methods and List's methods of the same name remain consistent """ - def __init__(self, freq: str, future: bool, uri: str, **kwargs): + def __init__(self, freq: str, future: bool, **kwargs): self.freq = freq self.future = future - self.uri = uri + self.kwargs = kwargs @property def data(self) -> Iterable[CalVT]: - """get all data""" + """get all data + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + """ raise NotImplementedError("Subclass of CalendarStorage must implement `data` method") + def clear(self) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method") + def extend(self, iterable: Iterable[CalVT]) -> None: raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") def index(self, value: CalVT) -> int: + """ + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + """ raise NotImplementedError("Subclass of CalendarStorage must implement `index` method") def insert(self, index: int, value: CalVT) -> None: @@ -184,6 +149,12 @@ def __delitem__(self, i: slice) -> None: ... def __delitem__(self, i) -> None: + """ + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + """ raise NotImplementedError( "Subclass of CalendarStorage must implement `__delitem__(i: int)`/`__delitem__(s: slice)` method" ) @@ -199,26 +170,60 @@ def __getitem__(self, i: int) -> CalVT: ... def __getitem__(self, i) -> CalVT: + """ + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + + """ raise NotImplementedError( "Subclass of CalendarStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" ) + def __len__(self) -> int: + """ + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + + """ + raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") + class InstrumentStorage(BaseStorage): - def __init__(self, market: str, uri: str, **kwargs): + def __init__(self, market: str, **kwargs): self.market = market - self.uri = uri + self.kwargs = kwargs @property def data(self) -> Dict[InstKT, InstVT]: - """get all data""" + """get all data + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + """ raise NotImplementedError("Subclass of InstrumentStorage must implement `data` method") + def clear(self) -> None: + raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") + def update(self, *args, **kwargs) -> None: """D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. - If E present and has a .keys() method, does: for k in E: D[k] = E[k] - If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v - In either case, this is followed by: for k, v in F.items(): D[k] = v + + Notes + ------ + If E present and has a .keys() method, does: for k in E: D[k] = E[k] + + If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v + + In either case, this is followed by: for k, v in F.items(): D[k] = v + """ raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method") @@ -227,53 +232,96 @@ def __setitem__(self, k: InstKT, v: InstVT) -> None: raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") def __delitem__(self, k: InstKT) -> None: - """Delete self[key].""" + """Delete self[key]. + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + """ raise NotImplementedError("Subclass of InstrumentStorage must implement `__delitem__` method") def __getitem__(self, k: InstKT) -> InstVT: """x.__getitem__(k) <==> x[k]""" raise NotImplementedError("Subclass of InstrumentStorage must implement `__getitem__` method") + def __len__(self) -> int: + """ + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + + """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `__len__` method") + class FeatureStorage(BaseStorage): - def __init__(self, instrument: str, field: str, freq: str, uri: str, **kwargs): + def __init__(self, instrument: str, field: str, freq: str, **kwargs): self.instrument = instrument self.field = field self.freq = freq - self.uri = uri + self.kwargs = kwargs @property def data(self) -> pd.Series: - """get all data""" + """get all data + + Notes + ------ + if data(storage) does not exist, return empty pd.Series: `return pd.Series(dtype=np.float32)` + """ raise NotImplementedError("Subclass of FeatureStorage must implement `data` method") @property def start_index(self) -> Union[int, None]: """get FeatureStorage start index - If len(self) == 0; return None + + Notes + ----- + If the data(storage) does not exist, return None """ - raise NotImplementedError("Subclass of FeatureStorage must implement `data` method") + raise NotImplementedError("Subclass of FeatureStorage must implement `start_index` method") @property def end_index(self) -> Union[int, None]: - if len(self) == 0: - return None - return None if len(self) == 0 else self.start_index + len(self) - 1 + """get FeatureStorage end index + + Notes + ----- + The right index of the data range (both sides are closed) + + The next data appending point will be `end_index + 1` + + If the data(storage) does not exist, return None + """ + raise NotImplementedError("Subclass of FeatureStorage must implement `end_index` method") + + def clear(self) -> None: + raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") def write(self, data_array: Union[List, np.ndarray, Tuple], index: int = None): """Write data_array to FeatureStorage starting from index. - If index is None, append data_array to feature. - If len(data_array) == 0; return - If (index - self.end_index) >= 1, self[end_index+1: index] will be filled with np.nan + Notes + ------ + If index is None, append data_array to feature. - Examples: + If len(data_array) == 0; return + + If (index - self.end_index) >= 1, self[end_index+1: index] will be filled with np.nan + + Examples + --------- + .. code-block:: feature: 3 4 4 5 5 6 + >>> self.write([6, 7], index=6) feature: @@ -311,56 +359,70 @@ def write(self, data_array: Union[List, np.ndarray, Tuple], index: int = None): def rebase(self, start_index: int = None, end_index: int = None): """Rebase the start_index and end_index of the FeatureStorage. - Examples: + start_index and end_index are closed intervals: [start_index, end_index] - feature: - 3 4 - 4 5 - 5 6 + Examples + --------- - >>> self.rebase(start_index=4) + .. code-block:: - feature: - 4 5 - 5 6 + feature: + 3 4 + 4 5 + 5 6 - >>> self.rebase(start_index=3) - feature: - 3 np.nan - 4 5 - 5 6 + >>> self.rebase(start_index=4) - >>> self.write([3], index=3) + feature: + 4 5 + 5 6 - feature: - 3 3 - 4 5 - 5 6 + >>> self.rebase(start_index=3) - >>> self.rebase(end_index=4) + feature: + 3 np.nan + 4 5 + 5 6 - feature: - 3 3 - 4 5 + >>> self.write([3], index=3) - >>> self.write([6, 7, 8], index=4) + feature: + 3 3 + 4 5 + 5 6 - feature: - 3 3 - 4 6 - 5 7 - 6 8 + >>> self.rebase(end_index=4) - >>> self.rebase(start_index=4, end_index=5) + feature: + 3 3 + 4 5 - feature: - 4 6 - 5 7 + >>> self.write([6, 7, 8], index=4) + + feature: + 3 3 + 4 6 + 5 7 + 6 8 + + >>> self.rebase(start_index=4, end_index=5) + + feature: + 4 6 + 5 7 """ - if start_index is None and end_index is None: - logger.warning("both start_index and end_index are None, rebase is ignored") + storage_si = self.start_index + storage_ei = self.end_index + if storage_si is None or storage_ei is None: + raise ValueError("storage.start_index or storage.end_index is None, storage may not exist") + + start_index = storage_si if start_index is None else start_index + end_index = storage_ei if end_index is None else end_index + + if start_index is None or end_index is None: + logger.warning("both start_index and end_index are None, or storage does not exist; rebase is ignored") return if start_index < 0 or end_index < 0: @@ -373,17 +435,15 @@ def rebase(self, start_index: int = None, end_index: int = None): ) return - start_index = self.start_index if start_index is None else end_index - end_index = self.end_index if end_index is None else end_index - if start_index <= self.start_index: - self.write([np.nan] * (self.start_index - start_index), start_index) + if start_index <= storage_si: + self.write([np.nan] * (storage_si - start_index), start_index) else: self.rewrite(self[start_index:].values, start_index) if end_index >= self.end_index: self.write([np.nan] * (end_index - self.end_index)) else: - self.rewrite(self[: end_index + 1].values, self.start_index) + self.rewrite(self[: end_index + 1].values, start_index) def rewrite(self, data: Union[List, np.ndarray, Tuple], index: int): """overwrite all data in FeatureStorage with data @@ -414,7 +474,28 @@ def __getitem__(self, i: int) -> Tuple[int, float]: ... def __getitem__(self, i) -> Union[Tuple[int, float], pd.Series]: - """x.__getitem__(y) <==> x[y]""" + """x.__getitem__(y) <==> x[y] + + Notes + ------- + if data(storage) does not exist: + if isinstance(i, int): + return (None, None) + if isinstance(i, slice): + # return empty pd.Series + return pd.Series(dtype=np.float32) + """ raise NotImplementedError( "Subclass of FeatureStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" ) + + def __len__(self) -> int: + """ + + Raises + ------ + ValueError + If the data(storage) does not exist, raise ValueError + + """ + raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index 77857182d9..686f0fc00f 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -665,7 +665,10 @@ def exists_qlib_data(qlib_dir): return False # check calendar bin for _calendar in calendars_dir.iterdir(): - if not list(features_dir.rglob(f"*.{_calendar.name.split('.')[0]}.bin")): + + if ("_future" not in _calendar.name) and ( + not list(features_dir.rglob(f"*.{_calendar.name.split('.')[0]}.bin")) + ): return False # check instruments diff --git a/scripts/dump_bin.py b/scripts/dump_bin.py index 0b063fddac..b3a18cc902 100644 --- a/scripts/dump_bin.py +++ b/scripts/dump_bin.py @@ -120,7 +120,7 @@ def _get_date( else: df = file_or_df if df.empty or self.date_field_name not in df.columns.tolist(): - _calendars = pd.Series() + _calendars = pd.Series(dtype=np.float32) else: _calendars = df[self.date_field_name] diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py index e7bac658cb..aad8d11e48 100644 --- a/tests/storage_tests/test_storage.py +++ b/tests/storage_tests/test_storage.py @@ -24,7 +24,7 @@ class TestStorage(TestAutoData): def test_calendar_storage(self): - calendar = CalendarStorage(freq="day", future=False, uri=self.provider_uri) + calendar = CalendarStorage(freq="day", future=False, provider_uri=self.provider_uri) assert isinstance(calendar[:], Iterable), f"{calendar.__class__.__name__}.__getitem__(s: slice) is not Iterable" assert isinstance(calendar.data, Iterable), f"{calendar.__class__.__name__}.data is not Iterable" @@ -32,6 +32,16 @@ def test_calendar_storage(self): print(f"calendar[0]: {calendar[0]}") print(f"calendar[-1]: {calendar[-1]}") + calendar = CalendarStorage(freq="1min", future=False, provider_uri="not_found") + with pytest.raises(ValueError): + print(calendar.data) + + with pytest.raises(ValueError): + print(calendar[:]) + + with pytest.raises(ValueError): + print(calendar[0]) + def test_instrument_storage(self): """ The meaning of instrument, such as CSI500: @@ -66,7 +76,7 @@ def test_instrument_storage(self): """ - instrument = InstrumentStorage(market="csi300", uri=self.provider_uri) + instrument = InstrumentStorage(market="csi300", provider_uri=self.provider_uri) for inst, spans in instrument.data.items(): assert isinstance(inst, str) and isinstance( @@ -79,6 +89,13 @@ def test_instrument_storage(self): print(f"instrument['SH600000']: {instrument['SH600000']}") + instrument = InstrumentStorage(market="csi300", provider_uri="not_found") + with pytest.raises(ValueError): + print(instrument.data) + + with pytest.raises(ValueError): + print(instrument["sSH600000"]) + def test_feature_storage(self): """ Calendar: @@ -133,9 +150,9 @@ def test_feature_storage(self): """ - feature = FeatureStorage(instrument="SH600004", field="close", freq="day", uri=self.provider_uri) + feature = FeatureStorage(instrument="SH600004", field="close", freq="day", provider_uri=self.provider_uri) - with pytest.raises(ValueError): + with pytest.raises(IndexError): print(feature[0]) assert isinstance( feature[815][1], (float, np.float32) @@ -144,3 +161,11 @@ def test_feature_storage(self): print(f"feature[815: 818]: \n{feature[815: 818]}") print(f"feature[:].tail(): \n{feature[:].tail()}") + + feature = FeatureStorage(instrument="SH600004", field="close", freq="day", provider_uri="not_fount") + + assert feature[0] == (None, None), "FeatureStorage does not exist, feature[i] should return `(None, None)`" + assert feature[:].empty, "FeatureStorage does not exist, feature[:] should return `pd.Series(dtype=np.float32)`" + assert ( + feature.data.empty + ), "FeatureStorage does not exist, feature.data should return `pd.Series(dtype=np.float32)`"