From b50d8c900e22121eaea0d48e295d5a38df19a807 Mon Sep 17 00:00:00 2001 From: "lin.dongzhao" <542698096@qq.com> Date: Mon, 13 May 2024 16:18:22 +0800 Subject: [PATCH] =?UTF-8?q?futures=5Ftrading=5Fparameters=E8=BF=81?= =?UTF-8?q?=E7=A7=BB=E8=87=B3mod-ricequant-data?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 2 +- rqalpha/data/base_data_source/data_source.py | 37 +-- rqalpha/data/base_data_source/storages.py | 68 +---- rqalpha/data/bundle.py | 259 +++---------------- setup.cfg | 2 +- 5 files changed, 46 insertions(+), 322 deletions(-) diff --git a/requirements.txt b/requirements.txt index 987ed4688..4c41ab10c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ simplejson >=3.10.0 dill ==0.2.5 PyYAML >=3.12 tabulate -rqrisk >=1.0.8 +rqrisk >=1.0.9 h5py matplotlib >=1.5.1 ; python_version >= '3.6' matplotlib >=1.5.1,<=3.0.3 ; python_version == '3.5' diff --git a/rqalpha/data/base_data_source/data_source.py b/rqalpha/data/base_data_source/data_source.py index 33755cba9..4da9a001c 100644 --- a/rqalpha/data/base_data_source/data_source.py +++ b/rqalpha/data/base_data_source/data_source.py @@ -24,23 +24,21 @@ import pandas as pd import six from rqalpha.utils.i18n import gettext as _ -from rqalpha.const import INSTRUMENT_TYPE, TRADING_CALENDAR_TYPE, DEFAULT_ACCOUNT_TYPE +from rqalpha.const import INSTRUMENT_TYPE, TRADING_CALENDAR_TYPE from rqalpha.interface import AbstractDataSource from rqalpha.model.instrument import Instrument from rqalpha.utils.datetime_func import (convert_date_to_int, convert_int_to_date, convert_int_to_datetime) -from rqalpha.utils.exception import RQInvalidArgument, RQDatacVersionTooLow +from rqalpha.utils.exception import RQInvalidArgument from rqalpha.utils.functools import lru_cache from rqalpha.utils.typing import DateLike from rqalpha.environment import Environment -from rqalpha.data.bundle import update_futures_trading_parameters -from rqalpha.utils.logger import user_system_log from rqalpha.data.base_data_source.adjust import FIELDS_REQUIRE_ADJUSTMENT, adjust_bars from rqalpha.data.base_data_source.storage_interface import (AbstractCalendarStore, AbstractDateSet, AbstractDayBarStore, AbstractDividendStore, AbstractInstrumentStore) from rqalpha.data.base_data_source.storages import (DateSet, DayBarStore, DividendStore, ExchangeTradingCalendarStore, FutureDayBarStore, - FutureInfoStore, FuturesTradingParametersStore,InstrumentStore, + FutureInfoStore,InstrumentStore, ShareTransformationStore, SimpleFactorStore, YieldCurveStore, FuturesTradingParameters) @@ -72,8 +70,7 @@ class BaseDataSource(AbstractDataSource): INSTRUMENT_TYPE.PUBLIC_FUND, ) - def __init__(self, path, custom_future_info, futures_time_series_trading_parameters=False, end_date=None): - # type: (str, dict, bool, date) -> None + def __init__(self, path: str, custom_future_info: dict, *args, **kwargs) -> None: if not os.path.exists(path): raise RuntimeError('bundle path {} not exist'.format(os.path.abspath(path))) @@ -89,7 +86,6 @@ def _p(name): INSTRUMENT_TYPE.LOF: funds_day_bar_store } # type: Dict[INSTRUMENT_TYPE, AbstractDayBarStore] - self._futures_trading_parameters_store = None self._future_info_store = FutureInfoStore(_p("future_info.json"), custom_future_info) self._instruments_stores = {} # type: Dict[INSTRUMENT_TYPE, AbstractInstrumentStore] @@ -133,20 +129,6 @@ def _p(name): self._suspend_days = [DateSet(_p('suspended_days.h5'))] # type: List[AbstractDateSet] self._st_stock_days = DateSet(_p('st_stock_days.h5')) - if futures_time_series_trading_parameters: - try: - import rqdatac - except ImportError: - user_system_log.warn(_("RQDatac is not installed, \"config.base.futures_time_series_trading_parameters\" will be disabled.")) - else: - try: - update_futures_trading_parameters(path, end_date) - except (rqdatac.share.errors.PermissionDenied, RQDatacVersionTooLow): - user_system_log.warn(_("RQDatac does not have permission to obtain futures histrical trading parameters, \"config.base.futures_time_series_trading_parameters\" will be disabled.")) - else: - file = os.path.join(path, "futures_trading_parameters.h5") - self._futures_trading_parameters_store = FuturesTradingParametersStore(file, custom_future_info) - def register_day_bar_store(self, instrument_type, store): # type: (INSTRUMENT_TYPE, AbstractDayBarStore) -> None self._day_bars[instrument_type] = store @@ -382,15 +364,8 @@ def get_yield_curve(self, start_date, end_date, tenor=None): return self._yield_curve.get_yield_curve(start_date, end_date, tenor=tenor) @lru_cache(1024) - def get_futures_trading_parameters(self, instrument, dt): - # type: (Instrument, datetime.date) -> FuturesTradingParameters - if self._futures_trading_parameters_store: - trading_parameters = self._futures_trading_parameters_store.get_futures_trading_parameters(instrument, dt) - if trading_parameters is None: - return self._future_info_store.get_future_info(instrument.order_book_id, instrument.underlying_symbol) - return trading_parameters - else: - return self._future_info_store.get_future_info(instrument.order_book_id, instrument.underlying_symbol) + def get_futures_trading_parameters(self, instrument: Instrument, dt: datetime.date) -> FuturesTradingParameters: + return self._future_info_store.get_future_info(instrument.order_book_id, instrument.underlying_symbol) def get_merge_ticks(self, order_book_id_list, trading_date, last_dt=None): raise NotImplementedError diff --git a/rqalpha/data/base_data_source/storages.py b/rqalpha/data/base_data_source/storages.py index 6091b5470..ddccf0dfa 100644 --- a/rqalpha/data/base_data_source/storages.py +++ b/rqalpha/data/base_data_source/storages.py @@ -78,7 +78,7 @@ def __init__(self, f, custom_future_info): } self._custom_data = custom_future_info if "margin_rate" not in self._default_data[next(iter(self._default_data))]: - raise RuntimeError(_("Your bundle data is too old, please use 'rqalpha update-bundle' or 'rqalpha download-bundle' to update it to lastest before using")) + raise RuntimeError(_("The bundle data you are using is too old, please update it to lastest before using")) @classmethod def _process_future_info_item(cls, item): @@ -247,72 +247,6 @@ class FutureDayBarStore(DayBarStore): DEFAULT_DTYPE = np.dtype(DayBarStore.DEFAULT_DTYPE.descr + [("open_interest", ' FuturesTradingParameters or None - dt = convert_date_to_date_int(dt) - if dt < self.FUTURES_TRADING_PARAMETERS_START_DATE: - return None - order_book_id = instrument.order_book_id - underlying_symbol = instrument.underlying_symbol - data = self.get_futures_trading_parameters_all_time(order_book_id) - if data is None: - return None - else: - arr = data[data['datetime'] == dt] - if len(arr) == 0: - if dt >= convert_date_to_date_int(instrument.listed_date) and dt <= convert_date_to_date_int(instrument.de_listed_date): - user_system_log.info("Historical futures trading parameters are abnormal, the lastst parameters will be used for calculations.\nPlease contract RiceQuant to repair: 0755-26569969") - return None - custom_info = self._custom_data.get(order_book_id) or self._custom_data.get(underlying_symbol) - if custom_info: - arr[0] = self.set_custom_info(arr[0], custom_info) - futures_trading_parameters = self._to_namedtuple(arr[0]) - return futures_trading_parameters - - @lru_cache(1024) - def get_futures_trading_parameters_all_time(self, order_book_id): - # type: (str) -> numpy.ndarray or None - with h5_file(self._path) as h5: - try: - data = h5[order_book_id][:] - except KeyError: - return None - return data - - def set_custom_info(self, arr, custom_info): - for field in custom_info: - if field == "commission_type": - if custom_info[field] == COMMISSION_TYPE.BY_MONEY: - value = 0 - elif custom_info[field] == COMMISSION_TYPE.BY_VOLUME: - value = 1 - else: - value = custom_info[field] - arr[field] = value - return arr - - def _to_namedtuple(self, arr): - # type: (numpy.void) -> FuturesTradingParameters - dic = dict(zip(arr.dtype.names, arr)) - del dic['datetime'] - dic["commission_type"] = self.COMMISSION_TYPE_MAP[dic['commission_type']] - futures_trading_parameters = FuturesTradingParameters(**dic) - return futures_trading_parameters - - class DividendStore(AbstractDividendStore): def __init__(self, path): self._path = path diff --git a/rqalpha/data/bundle.py b/rqalpha/data/bundle.py index 415b32f9f..f98ea35b2 100644 --- a/rqalpha/data/bundle.py +++ b/rqalpha/data/bundle.py @@ -18,20 +18,15 @@ import pickle import re from itertools import chain -from typing import Callable, Optional, List +from typing import Callable, Optional, Union, List +from filelock import FileLock, Timeout import h5py import numpy as np -import pandas as pd from rqalpha.apis.api_rqdatac import rqdatac -from rqalpha.utils.concurrent import (ProgressedProcessPoolExecutor, - ProgressedTask) -from rqalpha.utils.datetime_func import (convert_date_to_date_int, - convert_date_to_int,) -from rqalpha.utils.exception import RQDatacVersionTooLow +from rqalpha.utils.concurrent import ProgressedProcessPoolExecutor, ProgressedTask +from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int from rqalpha.utils.i18n import gettext as _ -from rqalpha.utils.logger import system_log -from rqalpha.const import TRADING_CALENDAR_TYPE from rqalpha.utils.functools import lru_cache from rqalpha.environment import Environment from rqalpha.model.instrument import Instrument @@ -445,195 +440,8 @@ def update_bundle(path, create, enable_compression=False, concurrency=1): executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs) -FUTURES_TRADING_PARAMETERS_FIELDS = ["long_margin_ratio", "short_margin_ratio", "commission_type", "open_commission", "close_commission", "close_commission_today"] -TRADING_PARAMETERS_START_DATE = 20100401 -FUTURES_TRADING_PARAMETERS_FILE = "futures_trading_parameters.h5" - - -class FuturesTradingParametersTask(object): - def __init__(self, order_book_ids, underlying_symbols): - self._order_book_ids = order_book_ids - self._underlying_symbols = underlying_symbols - - def __call__(self, path, fields, end_date): - if rqdatac.__version__ < '2.11.12': - raise RQDatacVersionTooLow(_("RQAlpha already supports backtesting using futures historical margins and rates, please upgrade RQDatac to version 2.11.12 and above to use it")) - - if not os.path.exists(path): - self.generate_futures_trading_parameters(path, fields, end_date) - else: - self.update_futures_trading_parameters(path, fields, end_date) - - def generate_futures_trading_parameters(self, path, fields, end_date, recreate_futures_list=None): - # type: (str, list, datetime.date, list) -> None - if not recreate_futures_list: - system_log.info(_("Futures historical trading parameters data is being updated, please wait......")) - order_book_ids = self._order_book_ids - if recreate_futures_list: - order_book_ids = recreate_futures_list - df = rqdatac.futures.get_trading_parameters(order_book_ids, TRADING_PARAMETERS_START_DATE, end_date, fields) - if not (df is None or df.empty): - df.dropna(axis=0, how="all") - df.reset_index(inplace=True) - df['datetime'] = df['trading_date'].map(convert_date_to_date_int) - del df["trading_date"] - df['commission_type'] = df['commission_type'].map(self.set_commission_type) - df.rename(columns={ - 'close_commission': "close_commission_ratio", - 'close_commission_today': "close_commission_today_ratio", - 'open_commission': 'open_commission_ratio' - }, inplace=True) - df.set_index(["order_book_id", "datetime"], inplace=True) - df.sort_index(inplace=True) - with h5py.File(path, "w") as h5: - for order_book_id in df.index.levels[0]: - h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records()) - # 更新期货连续合约的历史交易参数数据(当函数执行目的为补充上次未正常更新的数据时,不需要执行此段逻辑) - if recreate_futures_list is None: - with h5py.File(path, "a") as h5: - df = rqdatac.all_instruments("Future") - for underlying_symbol in self._underlying_symbols: - futures_continuous_contract = df[(df['underlying_symbol'] == underlying_symbol) & (df["listed_date"] == '0000-00-00')].order_book_id.tolist() - s = rqdatac.futures.get_dominant(underlying_symbol, TRADING_PARAMETERS_START_DATE, end_date) - if (s is None or s.empty): - continue - s = s.to_frame().reset_index() - s['date'] = s['date'].map(convert_date_to_date_int) - s.set_index(['date'], inplace=True) - trading_parameters_list = [] - for date in s.index: - try: - data = h5[s['dominant'][date]][:] - except KeyError: - continue - trading_parameters = data[data['datetime'] == date] - if len(trading_parameters) != 0: - trading_parameters_list.append(trading_parameters[0]) - data = np.array(trading_parameters_list) - for order_book_id in futures_continuous_contract: - h5.create_dataset(order_book_id, data=data) - - def update_futures_trading_parameters(self, path, fields, end_date): - # type: (str, list, datetime.date) -> None - try: - h5 = h5py.File(path, "a") - h5.close() - except OSError as e: - raise OSError(_("File {} update failed, if it is using, please update later, or you can delete then update again".format(path))) from e - last_date = self.get_h5_last_date(path) - recreate_futures_list = self.get_recreate_futures_list(path, last_date) - if recreate_futures_list: - self.generate_futures_trading_parameters(path, fields, last_date, recreate_futures_list=recreate_futures_list) - if end_date > last_date: - if rqdatac.get_previous_trading_date(end_date) == last_date: - return - else: - system_log.info(_("Futures historical trading parameters data is being updated, please wait......")) - start_date = rqdatac.get_next_trading_date(last_date) - df = rqdatac.futures.get_trading_parameters(self._order_book_ids, start_date, end_date, fields) - if not(df is None or df.empty): - df = df.dropna(axis=0, how="all") - df.reset_index(inplace=True) - df['datetime'] = df['trading_date'].map(convert_date_to_date_int) - del [df['trading_date']] - df['commission_type'] = df['commission_type'].map(self.set_commission_type) - df.rename(columns={ - 'close_commission': "close_commission_ratio", - 'close_commission_today': "close_commission_today_ratio", - 'open_commission': 'open_commission_ratio' - }, inplace=True) - df.set_index(['order_book_id', 'datetime'], inplace=True) - with h5py.File(path, "a") as h5: - for order_book_id in df.index.levels[0]: - if order_book_id in h5: - data = np.array( - [tuple(i) for i in chain(h5[order_book_id][:], df.loc[order_book_id].to_records())], - dtype=h5[order_book_id].dtype - ) - del h5[order_book_id] - h5.create_dataset(order_book_id, data=data) - else: - h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records()) - # 更新期货连续合约历史交易参数 - with h5py.File(path, "a") as h5: - df = rqdatac.all_instruments("Future") - for underlying_symbol in self._underlying_symbols: - futures_continuous_contract = df[(df['underlying_symbol'] == underlying_symbol) & (df["listed_date"] == '0000-00-00')].order_book_id.tolist() - s = rqdatac.futures.get_dominant(underlying_symbol, start_date, end_date) - if (s is None or s.empty): - continue - s = s.to_frame().reset_index() - s['date'] = s['date'].map(convert_date_to_date_int) - s.set_index(['date'], inplace=True) - trading_parameters_list = [] - for date in s.index: - try: - data = h5[s['dominant'][date]][:] - except KeyError: - continue - trading_parameters = data[data['datetime'] == date] - if len(trading_parameters) != 0: - trading_parameters_list.append(trading_parameters[0]) - for order_book_id in futures_continuous_contract: - if order_book_id in h5: - data = np.array( - [tuple(i) for i in chain(h5[order_book_id][:], trading_parameters_list)], - dtype=h5[order_book_id].dtype - ) - del h5[order_book_id] - h5.create_dataset(order_book_id, data=data) - else: - h5.create_dataset(order_book_id, data=np.array(trading_parameters)) - - def set_commission_type(self, commission_type): - if commission_type == "by_money": - commission_type = 0 - elif commission_type == "by_volume": - commission_type = 1 - return commission_type - - def get_h5_last_date(self, path): - last_date = TRADING_PARAMETERS_START_DATE - with h5py.File(path, "r") as h5: - for key in h5.keys(): - if int(h5[key]['datetime'][-1]) > last_date: - last_date = h5[key]['datetime'][-1] - last_date = datetime.datetime.strptime(str(last_date), "%Y%m%d").date() - return last_date - - def get_recreate_futures_list(self, path, h5_last_date): - # type: (str, datetime.date) -> list - """ - 用户在运行策略的过程中可能中断进程,进而可能导致在创建 h5 文件时,部分合约没有成功 download - 通过该函数,获取在上一次更新中因为异常而没有更新的合约 - """ - recreate_futures_list = [] - df = rqdatac.all_instruments("Future") - last_update_futures_list = df[(df['de_listed_date'] >= str(TRADING_PARAMETERS_START_DATE)) & (df['listed_date'] <= h5_last_date.strftime("%Y%m%d"))].order_book_id.to_list() - with h5py.File(path, "r") as h5: - h5_order_book_ids = h5.keys() - for order_book_id in last_update_futures_list: - if order_book_id in h5_order_book_ids: - continue - else: - recreate_futures_list.append(order_book_id) - return recreate_futures_list - - -def update_futures_trading_parameters(path, end_date): - # type: (str, datetime.date) -> None - df = rqdatac.all_instruments("Future") - order_book_ids = (df[df['de_listed_date'] >= str(TRADING_PARAMETERS_START_DATE)]).order_book_id.tolist() - underlying_symbols = list(set((df[df['de_listed_date'] >= str(TRADING_PARAMETERS_START_DATE)]).underlying_symbol.tolist())) - FuturesTradingParametersTask(order_book_ids, underlying_symbols)( - os.path.join(path, FUTURES_TRADING_PARAMETERS_FILE), - FUTURES_TRADING_PARAMETERS_FIELDS, - end_date - ) - - class AutomaticUpdateBundle(object): - def __init__(self, path: str, filename: str, api: Callable, fields: List[str], end_date: datetime.date) -> None: + def __init__(self, path: str, filename: str, api: Callable, fields: List[str], end_date: datetime.date, start_date: Union[int, datetime.date] = START_DATE) -> None: if not os.path.exists(path): os.makedirs(path) self._file = os.path.join(path, filename) @@ -641,9 +449,11 @@ def __init__(self, path: str, filename: str, api: Callable, fields: List[str], e self._filename = filename self._api = api self._fields = fields + self._start_date = start_date self._end_date = end_date self.updated = [] self._env = Environment.get_instance() + self._file_lock = FileLock(self._file + ".lock") def get_data(self, instrument: Instrument, dt: datetime.date) -> Optional[np.ndarray]: dt = convert_date_to_date_int(dt) @@ -676,34 +486,39 @@ def _auto_update_task(self, instrument: Instrument) -> None: :type instrument: `Instrument` """ order_book_id = instrument.order_book_id - start_date = START_DATE + start_date = self._start_date try: - h5 = h5py.File(self._file, "a") - if order_book_id in h5 and 'trading_dt' in h5[order_book_id].dtype.names: - if len(h5[order_book_id][:]) != 0: - last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date() - if last_date >= self._end_date: - return - start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date() - if start_date > self._end_date: - return - arr = self._get_array(instrument, start_date) - if arr is None: - if order_book_id not in h5: - arr = np.array([]) - h5.create_dataset(order_book_id, data=arr) - else: - if order_book_id in h5: - data = np.array( - [tuple(i) for i in chain(h5[order_book_id][:], arr)], - dtype=h5[order_book_id].dtype) - del h5[order_book_id] - h5.create_dataset(order_book_id, data=data) + with self._file_lock.acquire(): + h5 = h5py.File(self._file, "a") + if order_book_id in h5 and 'trading_dt' in h5[order_book_id].dtype.names: + # 需要兼容此前的旧版数据,对字段名进行更新 + if len(h5[order_book_id][:]) != 0: + last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date() + if last_date >= self._end_date: + return + start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date() + if start_date > self._end_date: + return + arr = self._get_array(instrument, start_date) + if arr is None: + if order_book_id not in h5: + arr = np.array([]) + h5.create_dataset(order_book_id, data=arr) else: - h5.create_dataset(order_book_id, data=arr) - except OSError as e: + if order_book_id in h5: + if 'trading_dt' in h5[order_book_id].dtype.names: + data = np.array( + [tuple(i) for i in chain(h5[order_book_id][:], arr)], + dtype=h5[order_book_id].dtype) + else: + data = arr + del h5[order_book_id] + h5.create_dataset(order_book_id, data=data) + else: + h5.create_dataset(order_book_id, data=arr) + except (OSError, Timeout) as e: raise OSError(_("File {} update failed, if it is using, please update later, " - "or you can delete then update again".format(self._file))) from e + "or you can delete then update again".format(self._file))) from e finally: h5.close() diff --git a/setup.cfg b/setup.cfg index d84ade053..4b0fb6881 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,7 +5,7 @@ [metadata] name = rqalpha -version = 5.3.11 +version = 5.4.0 [versioneer] VCS = git