Skip to content

Commit

Permalink
pr update
Browse files Browse the repository at this point in the history
  • Loading branch information
Lin-Dongzhao committed Mar 17, 2024
1 parent cd87697 commit 7c0b56e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 61 deletions.
115 changes: 56 additions & 59 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,20 +631,16 @@ def update_futures_trading_parameters(path, end_date):


class AutomaticUpdateBundle(object):
def __init__(self, path, filename, rqdata_api, fields):
# type: (str, str, function, List[str]) -> None
def __init__(self, path, filename, rqdata_api, fields, end_date):
# type: (str, str, Callable, List[str], datetime.date) -> None
self._file = os.path.join(path, filename)
self._trading_dates = None
self._filename = filename
self._rqdata_api = rqdata_api
self._fields = fields
self._env = None
try:
import rqdatac
self.rqdata_init = True
except ImportError:
system_log.info(_("RQData is not installed, relevant data cannot be updated automatically, and some functions will be limited."))
self.rqdata_init = False
self._end_date = end_date
self._updated = []
self._env = Environment.get_instance()

def get_data(self, instrument, dt):
# type: (Instrument, datetime.datetime) -> numpy.ndarray or None
Expand All @@ -662,11 +658,12 @@ def get_data(self, instrument, dt):
@lru_cache(128)
def _get_data_all_time(self, instrument):
# type: (Instrument) -> numpy.ndarray or None
self._auto_update_task(instrument)
if instrument.order_book_id not in self._updated:
self._auto_update_task(instrument)
self._updated.append(instrument.order_book_id)
with h5py.File(self._file, "r") as h5:
try:
data = h5[instrument.order_book_id][:]
except KeyError:
data = h5[instrument.order_book_id][:]
if len(data) == 0:
return None
return data

Expand All @@ -677,56 +674,56 @@ def _auto_update_task(self, instrument):
:param instrument: 合约对象
:type instrument: `Instrument`
"""
self._env = Environment.get_instance()
order_book_id = instrument.order_book_id
start_date = START_DATE
if not os.path.exists(self._file):
h5 = h5py.File(self._file, "w")
try:
df = self._get_df(instrument, start_date)
if not (df is None or df.empty):
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records())
finally:
h5.close()
else:
try:
h5 = h5py.File(self._file, "a")
except OSError as e:
raise OSError(_("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(self._file))) from e
try:
if order_book_id in h5:
try:
h5 = h5py.File(self._file, "a")
except OSError as e:
raise OSError(_("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(self._file))) from e
try:
if order_book_id in h5:
if len(h5[order_book_id][:]) != 0:
last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date()
if last_date >= self._end_date:
return
start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date()
if start_date > datetime.date.today():
if start_date > self._end_date:
return
df = self._get_df(instrument, start_date)
if not (df is None or df.empty):
if order_book_id in h5:
data = np.array(
[tuple(i) for i in chain(h5[order_book_id][:], df.loc[order_book_id].to_records())],
dtype=h5[order_book_id].dtype
)
del h5[order_book_id]
h5.create_dataset(order_book_id, data=data)
else:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records())
finally:
h5.close()

def _get_df(self, instrument, start_date):
# type: (Instrument, datetime.date) -> Dataframe
df = self._rqdata_api(instrument.order_book_id, start_date, datetime.date.today(), self._fields)
arr = self._get_array(instrument, start_date)
if arr is None:
arr = np.array([])
if order_book_id not in h5:
h5.create_dataset(order_book_id, data=arr)
else:
if order_book_id in h5:
data = np.array(
[tuple(i) for i in chain(h5[order_book_id][:], arr)],
dtype=h5[order_book_id].dtype)
del h5[order_book_id]
h5.create_dataset(order_book_id, data=data)
else:
h5.create_dataset(order_book_id, data=arr)
finally:
h5.close()

def _get_array(self, instrument, start_date):
# type: (Instrument, datetime.date) -> numpy.array
df = self._rqdata_api(instrument.order_book_id, start_date, self._end_date, self._fields)
if not (df is None or df.empty):
df = df[self._fields] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement'
df.reset_index(inplace=True)
df["order_book_id"] = instrument.order_book_id
time_parameter = list(set(['datetime', 'date', 'trading_date']).intersection(set(df.columns)))[0]
if time_parameter == "datetime" and instrument.type != INSTRUMENT_TYPE.CS:
# 股票无夜盘,且在市时间较长,考虑省略获取交易日的操作
df[time_parameter] = df[time_parameter].map(self._env.data_proxy._data_source.get_future_trading_date)
df['trading_dt'] = df[time_parameter].map(convert_date_to_date_int)
del df[time_parameter]
df.set_index(["order_book_id", "trading_dt"], inplace=True)
return df
df = df[self._fields].loc[instrument.order_book_id] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement'
record = df.iloc[0: 1].to_records()
dtype = [('trading_dt', 'int')]
for field in self._fields:
dtype.append((field, record.dtype[field]))
update_list = []
for index, row in df.iterrows():
values = [row[field] for field in self._fields]
update_data = tuple([convert_date_to_date_int(
self._env.data_proxy._data_source.get_future_trading_date(index)
)] + values)
update_list.append(update_data)
arr = np.array(update_list, dtype=dtype)
return arr
return None

4 changes: 4 additions & 0 deletions rqalpha/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, config):
self._frontend_validators = {} # type: Dict[str, List]
self._default_frontend_validators = []
self._transaction_cost_decider_dict = {}
self.rqdatac_init = False

# Environment.event_bus used in StrategyUniverse()
from rqalpha.core.strategy_universe import StrategyUniverse
Expand Down Expand Up @@ -103,6 +104,9 @@ def set_event_source(self, event_source):
def set_broker(self, broker):
self.broker = broker

def set_rqdatac_init(self, result):
self.rqdatac_init = result

def add_frontend_validator(self, validator, instrument_type=None):
if instrument_type:
self._frontend_validators.setdefault(instrument_type, []).append(validator)
Expand Down
5 changes: 3 additions & 2 deletions rqalpha/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_strategy_apis():
return {n: getattr(api, n) for n in api.__all__}


def init_rqdatac(rqdatac_uri):
def init_rqdatac(rqdatac_uri, env):
if rqdatac_uri in ["disabled", "DISABLED"]:
return

Expand All @@ -122,6 +122,7 @@ def init_rqdatac(rqdatac_uri):
init_rqdatac_env(rqdatac_uri)
try:
rqdatac.init()
env.set_rqdatac_init(result=True)
except Exception as e:
system_log.warn(_('rqdatac init failed, some apis will not function properly: {}').format(str(e)))

Expand All @@ -136,7 +137,7 @@ def run(config, source_code=None, user_funcs=None):
# avoid register handlers everytime
# when running in ipython
set_loggers(config)
init_rqdatac(getattr(config.base, 'rqdatac_uri', None))
init_rqdatac(getattr(config.base, 'rqdatac_uri', None), env)
system_log.debug("\n" + pformat(config.convert_to_dict()))

env.set_strategy_loader(init_strategy_loader(env, source_code, user_funcs, config))
Expand Down

0 comments on commit 7c0b56e

Please sign in to comment.