Skip to content

Commit

Permalink
#
Browse files Browse the repository at this point in the history
  • Loading branch information
yutiansut committed Jul 12, 2019
1 parent e38fb9b commit 4376a2b
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 0 deletions.
5 changes: 5 additions & 0 deletions QARMC/__init__.py
@@ -0,0 +1,5 @@
__version__ = '0.0.1'
__author__ = 'yutiansut'

from QARMC.collectors.ctpbeecollector import QARMC_CtpBeeCollector
from QARMC.collectors.wscollector import QARMC_WsCollector
Empty file added QARMC/collectors/__init__.py
Empty file.
121 changes: 121 additions & 0 deletions QARMC/collectors/ctpbeecollector.py
@@ -0,0 +1,121 @@
import datetime
import json

from QAPUBSUB import consumer, producer
from QAREALTIME.setting import (market_data_ip, market_data_password,
market_data_user)
from QUANTAXIS.QAUtil.QALogs import QA_util_log_info


class QARMC_CtpBeeCollector():
def __init__(self, code):
self.data = {}
self.min5_data = {}
self.pro = producer.publisher(exchange='1min_{}'.format(
code), user=market_data_user, password=market_data_password, host=market_data_ip)
self.pro_realtimemin = producer.publisher(exchange='realtime_min_{}'.format(
code), user=market_data_user, password=market_data_password, host=market_data_ip)
self.is_send = False
self.last_volume = 0
self.c = consumer.subscriber_routing(
exchange='CTPX', routing_key=code, user=market_data_user, password=market_data_password, host=market_data_ip)

def create_new(self, new_tick):
"""
{'gateway_name': 'ctp', 'symbol': 'au2004', 'exchange': 'SHFE',
'datetime': '2019-07-02 23:40:19.500000', 'name': '黄金2004',
'volume': 918, 'last_price': 318.35, 'last_volume': 0,
'limit_up': 325.95, 'limit_down': 300.9, 'open_interest':4940.0,
'average_price': 315256.2091503268, 'preSettlementPrice': 313.45,
'open_price': 314.0, 'high_price': 318.35, 'low_price': 313.9,
'pre_close': 314.05, 'bid_price_1': 318.25, 'bid_price_2': 0, 'bid_price_3': 0,
'bid_price_4': 0, 'bid_price_5': 0, 'ask_price_1': 318.45, 'ask_price_2': 0,
'ask_price_3': 0, 'ask_price_4': 0, 'ask_price_5': 0, 'bid_volume_1': 6,
'bid_volume_2': 0, 'bid_volume_3': 0, 'bid_volume_4': 0, 'bid_volume_5': 0,
'ask_volume_1': 3, 'ask_volume_2': 0, 'ask_volume_3': 0, 'ask_volume_4': 0,
'ask_volume_5': 0, 'vt_symbol': 'au2004.SHFE'}
"""
# time = '{}-{}-{} '.format(new_tick['ActionDay'][0:4], new_tick['ActionDay'][4:6], new_tick['ActionDay']
# [6:8]) + new_tick['datetime'] + str('%.6f' % (new_tick['UpdateMillisec']/1000000))[1:]
self.data[new_tick['symbol']] = {'open': new_tick['last_price'],
'high': new_tick['last_price'],
'low': new_tick['last_price'],
'close': new_tick['last_price'],
'code': str(new_tick['symbol']).upper(),
'datetime': new_tick['datetime'],
'volume': new_tick['volume']-self.last_volume}

def update_bar(self, new_tick):

time = new_tick['datetime']
old_data = self.data[new_tick['symbol']]
old_data['close'] = new_tick['last_price']
old_data['high'] = old_data['high'] if old_data['high'] > new_tick['last_price'] else new_tick['last_price']
old_data['low'] = old_data['low'] if old_data['low'] < new_tick['last_price'] else new_tick['last_price']
old_data['datetime'] = time
old_data['volume'] = new_tick['volume'] - self.last_volume
self.data[new_tick['symbol']] = old_data
return old_data

def publish_bar(self, symbol):
QA_util_log_info('=================================')
QA_util_log_info('publish')
QA_util_log_info('=================================')
self.pro.pub(json.dumps(self.data[symbol]))
self.is_send = True

def upcoming_data(self, new_tick):
# print(new_tick)
curtime = new_tick['datetime']
time = curtime
try:
if new_tick['datetime'][17:19] == '00' and len(new_tick['datetime']) == 19:
print(True)
old_data = self.update_bar(new_tick)
self.last_volume = new_tick['volume']
self.publish_bar(new_tick['symbol'])
self.pro_realtimemin.pub(json.dumps(old_data))
self.data[new_tick['symbol']] = {}
self.data[new_tick['symbol']]['datetime'] = time

elif new_tick['datetime'][17:19] == '00' and len(new_tick['datetime']) > 19:
if self.is_send:
self.is_send = False
else:
self.publish_bar(new_tick['symbol'])

QA_util_log_info('xxx')
self.create_new(new_tick)
self.pro_realtimemin.pub(json.dumps(
self.data[new_tick['symbol']]))
QA_util_log_info(self.data)
else:
try:
self.update_bar(new_tick)
except:
self.create_new(new_tick)
self.pro_realtimemin.pub(json.dumps(
self.data[new_tick['symbol']]))
except Exception as e:
print(e)

def callback(self, a, b, c, body):
self.upcoming_data(json.loads(body))

def start(self):
self.c.callback = self.callback
while True:
self.c.start()


if __name__ == '__main__':

import click
@click.command()
@click.option('--code', default='au1910')
def handler(code):
r = QARMC_CtpBeeCollector(code)

r.start()

handler()
85 changes: 85 additions & 0 deletions QARMC/collectors/wscollector.py
@@ -0,0 +1,85 @@
#
from QAPUBSUB.producer import publisher_routing
from QAPUBSUB.consumer import subscriber
from QUANTAXIS.QAEngine import QA_Thread
from QA_OTGBroker import on_pong, on_message, on_error, subscribe_quote, on_close, login, peek
import websocket
import threading
import click
import time
import json
import pymongo
from QARMC.util import fix_dict


class QARMC_WsCollector(QA_Thread):
def __init__(self):

super().__init__()
self.ws = websocket.WebSocketApp('wss://openmd.shinnytech.com/t/md/front/mobile',
on_pong=on_pong,
on_message=self.on_message,
on_error=on_error,
on_close=on_close)

def _onopen(ws):
def run():
ws.send(peek())
threading.Thread(target=run, daemon=False).start()

self.quoteclient = pymongo.MongoClient().QAREALTIME.realtimeQuote
self.ws.on_open = _onopen
self.data = {}
self.subscribe_list = ['SHFE.rb1910', 'DCE.j1909']
self.sub = subscriber(exchange='QAQuote')
self.sub.callback = self.callback
threading.Thread(target=self.ws.run_forever,
name='market_websock', daemon=False).start()
threading.Thread(target=self.sub.start,
name='market_subscriber', daemon=True).start()

def on_message(self, message):
print(message)
message = json.loads(message)
if 'data' in message.keys():
data = message['data'][0]
if 'quotes' in data.keys():
data = data['quotes']
for items in data.keys():
try:
item = items.replace('.', '_')
if item not in self.data.keys():
self.data[item] = data[items]
else:
for keys in data[items].keys():
self.data[item][keys] = data[items][keys]
self.data[item]['instrument_id'] = item
self.quoteclient.update_one({'instrument_id': item},
{'$set': self.data[item]}, upsert=True)
except Exception as e:
print(e)

self.ws.send(peek())

def callback(self, a, b, c, data):
data = json.loads(data)
if data['topic'] == 'subscribe':
new_ins = data['code'].replace('_', '.').split(',')
import copy

old = len(self.subscribe_list)
self.subscribe_list.extend(new_ins)
self.subscribe_list = list(
set(self.subscribe_list))
if old < len(self.subscribe_list):
self.ws.send(subscribe_quote(','.join(self.subscribe_list)))

def run(self):
time.sleep(2)
self.ws.send(subscribe_quote('SHFE.rb1910,DCE.j1909'))
while True:
time.sleep(1)


if __name__ == "__main__":
QARMC_WsCollector().start()
45 changes: 45 additions & 0 deletions QARMC/util.py
@@ -0,0 +1,45 @@
import copy
def fix_dict(datax, ignore_duplicate_key=False):
"""
Removes dots "." from keys, as mongo doesn't like that.
If the key is already there without the dot, the dot-value get's lost.
This modifies the existing dict!
:param ignore_duplicate_key: True: if the replacement key is already in the dict, now the dot-key value will be ignored.
False: raise ValueError in that case.
"""
#datax = copy.deepcopy(data)

if isinstance(datax, (list, tuple)):
list2 = list()
for e in datax:
list2.append(fix_dict(e))
# end if
return list2
if isinstance(datax, dict):
# end if
for key, value in datax.items():
value = fix_dict(value)
old_key = key
if "." in key:
key = old_key.replace(".", "_")
#if key not in datax:
datax[key] = value
# else:
# error_msg = "Dict key {key} containing a \".\" was ignored, as {replacement} already exists".format(
# key=old_key, replacement=key)
# # if force:
# import warnings
# warnings.warn(error_msg, category=RuntimeWarning)
# # else:
# # raise ValueError(error_msg)
# # end if
# end if
del datax[old_key]
# end if
datax[key] = value
# end for
return datax
# end if
return datax
# end def
3 changes: 3 additions & 0 deletions requirements.txt
@@ -0,0 +1,3 @@
quantaxis
quantaxis_pubsub
quantaxis_otgbroker
83 changes: 83 additions & 0 deletions setup.py
@@ -0,0 +1,83 @@

import codecs
import io
import os
import re
import sys
import webbrowser
import platform
import configparser
try:
from setuptools import setup
except:
from distutils.core import setup
"""
"""

if sys.version_info.major != 3 or sys.version_info.minor not in [4, 5, 6, 7, 8]:
print('wrong version, should be 3.4/3.5/3.6/3.7/3.8 version')
sys.exit()

with io.open('QARMC/__init__.py', 'rt', encoding='utf8') as f:
context = f.read()
VERSION = re.search(r'__version__ = \'(.*?)\'', context).group(1)
AUTHOR = re.search(r'__author__ = \'(.*?)\'', context).group(1)



def read(fname):

return codecs.open(os.path.join(os.path.dirname(__file__), fname)).read()


NAME = "qarealtime_collector"
"""
"""
PACKAGES = ["QARMC", "QARMC.collectors"]
"""
"""

DESCRIPTION = "QARMC: QUANTAXIS REALTIME MARKETDATA COLLECTORS"



KEYWORDS = ["quantaxis", "quant", "finance", "Backtest", 'Framework']
"""
"""

AUTHOR_EMAIL = "yutiansut@qq.com"

URL = "https://github.com/yutiansut/QAREALTIMEMARKETCOLLECTOR"


LICENSE = "MIT"




setup(
name=NAME,
version=VERSION,
description=DESCRIPTION,
long_description=DESCRIPTION,
classifiers=[
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Intended Audience :: Developers',
'Operating System :: OS Independent',
],
install_requires=['quantaxis','quantaxis_pubsub'],
entry_points={
'console_scripts': [
]
},
# install_requires=requirements,
keywords=KEYWORDS,
author=AUTHOR,
author_email=AUTHOR_EMAIL,
url=URL,
license=LICENSE,
packages=PACKAGES,
include_package_data=True,
zip_safe=True
)

0 comments on commit 4376a2b

Please sign in to comment.