Skip to content

Commit

Permalink
Merge 1525ecf into 5c144af
Browse files Browse the repository at this point in the history
  • Loading branch information
Lin-Dongzhao committed Dec 27, 2023
2 parents 5c144af + 1525ecf commit ac5e2dc
Show file tree
Hide file tree
Showing 18 changed files with 365 additions and 48 deletions.
32 changes: 27 additions & 5 deletions rqalpha/data/base_data_source/data_source.py
Expand Up @@ -32,14 +32,15 @@
from rqalpha.utils.functools import lru_cache
from rqalpha.utils.typing import DateLike
from rqalpha.environment import Environment
from rqalpha.data.bundle import update_futures_trading_parameters

from rqalpha.data.base_data_source.adjust import FIELDS_REQUIRE_ADJUSTMENT, adjust_bars
from rqalpha.data.base_data_source.storage_interface import (AbstractCalendarStore, AbstractDateSet,
AbstractDayBarStore, AbstractDividendStore,
AbstractInstrumentStore)
from rqalpha.data.base_data_source.storages import (DateSet, DayBarStore, DividendStore,
ExchangeTradingCalendarStore, FutureDayBarStore,
FutureInfoStore, InstrumentStore,
FutureInfoStore, FuturesTradingParametersStore,InstrumentStore,
ShareTransformationStore, SimpleFactorStore,
YieldCurveStore)

Expand Down Expand Up @@ -71,7 +72,7 @@ class BaseDataSource(AbstractDataSource):
INSTRUMENT_TYPE.PUBLIC_FUND,
)

def __init__(self, path, custom_future_info):
def __init__(self, path, custom_future_info, trading_parameters_update_args):
if not os.path.exists(path):
raise RuntimeError('bundle path {} not exist'.format(os.path.abspath(path)))

Expand All @@ -86,20 +87,32 @@ def _p(name):
INSTRUMENT_TYPE.ETF: funds_day_bar_store,
INSTRUMENT_TYPE.LOF: funds_day_bar_store
} # type: Dict[INSTRUMENT_TYPE, AbstractDayBarStore]


self._futures_trading_parameters_store = None
if trading_parameters_update_args:
if update_futures_trading_parameters(path, trading_parameters_update_args):
self._futures_trading_parameters_store = FuturesTradingParametersStore(_p("futures_trading_parameters.h5"))
self._future_info_store = FutureInfoStore(_p("future_info.json"), custom_future_info)

self._instruments_stores = {} # type: Dict[INSTRUMENT_TYPE, AbstractInstrumentStore]
self._ins_id_or_sym_type_map = {} # type: Dict[str, INSTRUMENT_TYPE]
instruments = []

with open(_p('instruments.pk'), 'rb') as f:
for i in pickle.load(f):
if i["type"] == "Future" and Instrument.is_future_continuous_contract(i["order_book_id"]):
i["listed_date"] = datetime(1990, 1, 1)
instruments.append(Instrument(i, lambda i: self._future_info_store.get_future_info(i)["tick_size"]))
instruments.append(Instrument(
i,
lambda i: self._future_info_store.get_future_info(i).tick_size,
lambda i, dt: self.get_futures_trading_parameters(i, dt).long_margin_ratio,
lambda i, dt: self.get_futures_trading_parameters(i, dt).short_margin_ratio
))
for ins_type in self.DEFAULT_INS_TYPES:
self.register_instruments_store(InstrumentStore(instruments, ins_type))

if "margin_rate" not in self._future_info_store._default_data[list(self._future_info_store._default_data.keys())[0]]:
self._future_info_store.data_compatible(self._instruments_stores[INSTRUMENT_TYPE.FUTURE])
dividend_store = DividendStore(_p('dividends.h5'))
self._dividends = {
INSTRUMENT_TYPE.CS: dividend_store,
Expand Down Expand Up @@ -362,6 +375,15 @@ def get_yield_curve(self, start_date, end_date, tenor=None):
def get_commission_info(self, instrument):
return self._future_info_store.get_future_info(instrument)

def get_futures_trading_parameters(self, instrument, dt):
if self._futures_trading_parameters_store:
trading_parameters = self._futures_trading_parameters_store.get_futures_trading_parameters(instrument, dt)
if trading_parameters == None:
return self.get_commission_info(instrument)
return trading_parameters
else:
return self.get_commission_info(instrument)

def get_merge_ticks(self, order_book_id_list, trading_date, last_dt=None):
raise NotImplementedError

Expand Down
122 changes: 110 additions & 12 deletions rqalpha/data/base_data_source/storages.py
Expand Up @@ -29,6 +29,7 @@
import numpy as np
import pandas
from methodtools import lru_cache
import collections

from rqalpha.const import COMMISSION_TYPE, INSTRUMENT_TYPE
from rqalpha.model.instrument import Instrument
Expand Down Expand Up @@ -56,6 +57,18 @@ class FutureInfoStore(object):
"by_money": COMMISSION_TYPE.BY_MONEY
}

FuturesInfo = collections.namedtuple("FuturesInfo", [
"order_book_id",
"underlying_symbol",
"close_commission_ratio",
"close_commission_today_ratio",
"commission_type",
"open_commission_ratio",
"long_margin_ratio",
"short_margin_ratio",
"tick_size"
])

def __init__(self, f, custom_future_info):
with open(f, "r") as json_file:
self._default_data = {
Expand All @@ -66,25 +79,50 @@ def __init__(self, f, custom_future_info):
self._custom_data = custom_future_info
self._future_info = {}

def data_compatible(self, futures_instruments_store):
"""
RQAlpha==5.3.5 后, margin_rate调整为从 future_info.json 获取,当用户的 bundle 数据未更新时,调用该函数进行兼容
"""
hard_code = {"TC": 0.05, "ER": 0.05, "WS": 0.05, "RO": 0.05, "ME": 0.06, "WT": 0.05}
for id_or_syms in list(self._default_data.keys()):
if len(id_or_syms) <= 2:
if id_or_syms in hard_code.keys():
self._default_data[id_or_syms]["margin_rate"] = hard_code[id_or_syms]
else:
order_book_id = futures_instruments_store._instruments[id_or_syms + "88"].trading_code
self._default_data[id_or_syms]["margin_rate"] = futures_instruments_store._instruments[order_book_id].margin_rate
else:
self._default_data[id_or_syms]["margin_rate"] = futures_instruments_store._instruments[id_or_syms].margin_rate

@classmethod
def _process_future_info_item(cls, item):
item["commission_type"] = cls.COMMISSION_TYPE_MAP[item["commission_type"]]
return item

@lru_cache(1024)
def get_future_info(self, instrument):
# type: (Instrument) -> Dict[str, float]
order_book_id = instrument.order_book_id
try:
return self._future_info[order_book_id]
except KeyError:
custom_info = self._custom_data.get(order_book_id) or self._custom_data.get(instrument.underlying_symbol)
info = self._default_data.get(order_book_id) or self._default_data.get(instrument.underlying_symbol)
if custom_info:
info = copy(info) or {}
info.update(custom_info)
elif not info:
raise NotImplementedError(_("unsupported future instrument {}").format(order_book_id))
return self._future_info.setdefault(order_book_id, info)
custom_info = self._custom_data.get(order_book_id) or self._custom_data.get(instrument.underlying_symbol)
info = self._default_data.get(order_book_id) or self._default_data.get(instrument.underlying_symbol)
if custom_info:
info = copy(info) or {}
info.update(custom_info)
elif not info:
raise NotImplementedError(_("unsupported future instrument {}").format(order_book_id))
info = self.to_namedtuple(info)
return info

def to_namedtuple(self, info):
if info.get("order_book_id"):
info_data_list = [info["order_book_id"], None]
else:
info_data_list = [None, info["underlying_symbol"]]
for field in self.FuturesInfo._fields:
if (field in ["order_book_id", "underlying_symbol"]): continue
if (field in ["long_margin_ratio", "short_margin_ratio"]): field = "margin_rate"
info_data_list.append(info[field])
info = self.FuturesInfo._make(info_data_list)
return info


class InstrumentStore(AbstractInstrumentStore):
Expand Down Expand Up @@ -208,6 +246,66 @@ class FutureDayBarStore(DayBarStore):
DEFAULT_DTYPE = np.dtype(DayBarStore.DEFAULT_DTYPE.descr + [("open_interest", '<f8')])


class FuturesTradingParametersStore(object):
COMMISSION_TYPE_MAP = {
0: COMMISSION_TYPE.BY_MONEY,
1: COMMISSION_TYPE.BY_VOLUME
}

FuturesTradingParameters = collections.namedtuple("FuturesTradingParameters", [
"order_book_id",
"commission_type",
"long_margin_ratio",
"short_margin_ratio",
"close_commission_ratio",
"close_commission_today_ratio",
"open_commission_ratio"
])

# 历史期货交易参数的数据在2010年4月之后才有
FUTURES_TRADING_PARAMETERS_START_DATE = 20100401

def __init__(self, path):
self._path = path
self._order_book_id = None
self.futures_trading_parameters = None
self.arr = None

@lru_cache(1024)
def get_futures_trading_parameters(self, instrument, dt):
# type: (Instrument, datetime.datetime) -> FuturesTradingParameters
dt = convert_date_to_date_int(dt)
if dt < self.FUTURES_TRADING_PARAMETERS_START_DATE:
return None
self._order_book_id = instrument.order_book_id
dt = dt * 1000000
with h5_file(self._path) as h5:
data = h5[self._order_book_id][:]
self.arr = data[data['datetime'] == dt]
if len(self.arr) == 0:
if dt in range(
convert_date_to_date_int(instrument.listed_date),
convert_date_to_date_int(instrument.de_listed_date)
):
raise
else:
return None
self.to_namedtuple()
return self.futures_trading_parameters

def to_namedtuple(self):
parameter_data_list = []
for field in self.FuturesTradingParameters._fields:
if field == "order_book_id":
parameter_data = self._order_book_id
elif field == "commission_type":
parameter_data = self.COMMISSION_TYPE_MAP[int(self.arr[field][0])]
else:
parameter_data = float(self.arr[field][0])
parameter_data_list.append(parameter_data)
self.futures_trading_parameters = self.FuturesTradingParameters._make(parameter_data_list)


class DividendStore(AbstractDividendStore):
def __init__(self, path):
self._path = path
Expand Down

0 comments on commit ac5e2dc

Please sign in to comment.