In [1]:
import traceback
import socket
from opendatatools import futures
from datetime import datetime, timedelta
from typing import Optional, Sequence, List
from rpc.client import RpcClient
from rpc.utility import INTERVAL_ADJUSTMENT_MAP
from rpc.utility import get_duration, extract_vt_symbol, to_rq_symbol, handle_df, load_json


def get_server_setting():
    """Load the server connection settings.

    Returns:
        Whatever ``load_json`` yields for ``setting.json``. Other cells in
        this notebook index the result by the keys ``is_at_home``,
        ``host_home``, ``host_cloud``, ``port`` and ``authkey``.
    """
    return load_json("setting.json")


def init_client(host: str, port: int, authkey: bytes):
    """Build an ``RpcClient`` for the given endpoint and connect it.

    Args:
        host: Address of the RPC server.
        port: Port of the RPC server.
        authkey: Authentication key passed through to ``RpcClient``.

    Returns:
        The ``RpcClient`` instance after ``connect()`` has been called on it.
    """
    rpc_client = RpcClient(host, port, authkey)
    rpc_client.connect()
    return rpc_client


def get_update_symbol(client: RpcClient) -> List:
    """Ask the server which contracts need updating and echo the answer.

    Args:
        client: Connected RPC client; its ``get_update_symbol`` method is
            called to obtain the list.

    Returns:
        The value returned by ``client.get_update_symbol()``, unchanged.
    """
    pending = client.get_update_symbol()
    # Banner and list go on separate lines, exactly as two print() calls would.
    print("待更新的合约列表获取成功：", pending, sep="\n")
    return pending


def query_by_symbol(vt_symbol: str, source_interval: str) -> List[dict]:
    """Fetch recent bars for one contract from opendatatools as a list of records.

    opendatatools uses the same symbol conversion rules as rqdata, so the
    vt symbol is converted with ``to_rq_symbol``. The service only serves
    roughly the most recent 30 days, so no start/end dates are passed.

    Args:
        vt_symbol: Contract identifier accepted by ``extract_vt_symbol``
            (the notebook uses values such as ``'rb2105.SHFE'``).
        source_interval: Bar interval understood by ``futures.get_kline``
            and ``handle_df`` (the notebook uses ``"60m"``).

    Returns:
        ``handle_df``'s output converted with ``to_dict(orient="records")``,
        i.e. one dict per row.

    Raises:
        ValueError: If ``futures.get_kline`` returns no frame. The message
            it returned alongside is included in the error.
    """
    symbol, exchange = extract_vt_symbol(vt_symbol)
    rq_symbol = to_rq_symbol(symbol, exchange)

    df, msg = futures.get_kline(source_interval, rq_symbol)
    if df is None:
        # Surface the provider's own message instead of failing later with
        # an opaque "'NoneType' object is not subscriptable".
        raise ValueError(
            f"failed to fetch {source_interval} kline for {vt_symbol}: {msg}"
        )

    # Work on a copy so the frame handed back by the library is not mutated.
    bars = df.copy()
    bars['datetime'] = bars['datetime'].map(
        lambda text: datetime.strptime(text, '%Y-%m-%d %H:%M:%S'))
    bars = bars.set_index('datetime', drop=True)
    # Reverse the row order before post-processing
    # (presumably the feed is newest-first -- TODO confirm).
    bars = bars[::-1].copy()

    bars = handle_df(bars, source_interval)
    return bars.to_dict(orient="records")


def save_all_data(client: RpcClient, source_interval: str, symbols: Optional[Sequence[str]] = None):
    """Download bars for each contract and store them through the RPC client.

    Args:
        client: Connected RPC client used for ``save_to_database``.
        source_interval: Bar interval forwarded to ``query_by_symbol`` and
            ``save_to_database``.
        symbols: Contracts to process. When ``None``, the list is obtained
            from the server via ``get_update_symbol(client)``.
    """
    targets = get_update_symbol(client) if symbols is None else symbols
    for vt_symbol in targets:
        records = query_by_symbol(vt_symbol, source_interval)
        client.save_to_database(records, vt_symbol, source_interval)
        print(f"{vt_symbol}合约数据保存成功")
        

def delete_all_bar(client: RpcClient, source_interval: str, symbols: Optional[Sequence[str]] = None):
    """Remove stored data for each contract through the RPC client.

    Different data sources produce a different number of bars, so the old
    data has to be deleted when switching sources.

    Args:
        client: Connected RPC client; ``clean_data_by_symbol`` is called on it.
        source_interval: Currently unused. The per-interval
            ``delete_bar_data`` call that consumed it is disabled.
        symbols: Contracts to process. When ``None``, the list is obtained
            from the server via ``get_update_symbol(client)``.
    """
    targets = symbols if symbols is not None else get_update_symbol(client)
    for vt_symbol in targets:
        # Only the symbol part is needed; the exchange is ignored here.
        symbol, _exchange = extract_vt_symbol(vt_symbol)
        client.clean_data_by_symbol(symbol)
        print(f"{vt_symbol}合约数据删除成功")


# Connection parameters shared by the cells below.
setting = get_server_setting()

# When the "is_at_home" flag is set, a hard-coded address replaces the
# configured "host_home" value.
if setting['is_at_home']:
    host_home = '192.168.0.107'
else:
    host_home = setting['host_home']

host_tencent = setting['host_cloud']
port = setting['port']
# The key is stored as text in the settings and passed to the client as bytes.
authkey = setting['authkey'].encode('ascii')
source_interval = "60m"

In [2]:
def update_to_2server():
    """Refresh bar data on the home server, then on the cloud server.

    For each host, a client is connected, the server is asked which
    contracts need updating, and bars for those contracts are downloaded
    and saved at ``source_interval``. The connection is closed even when
    the update fails part-way, so a failed run does not leak it.

    Returns:
        ``(client_home, client_tencent)``; both have already been closed.

    Raises:
        Whatever ``init_client``, ``get_update_symbol`` or ``save_all_data``
        raise. A failure on the home server aborts before the cloud server
        is attempted.
    """
    clients = []
    for host in (host_home, host_tencent):
        # init_client either returns a connected client or raises, so there
        # is nothing to None-check here.
        client = init_client(host, port, authkey)
        try:
            symbols = get_update_symbol(client)
            save_all_data(client, source_interval, symbols)
        finally:
            client.close()
        clients.append(client)

    client_home, client_tencent = clients
    return client_home, client_tencent


client_h, client_t = update_to_2server()

连接成功
待更新的合约列表获取成功：
['sp2105.SHFE', 'AP105.CZCE', 'jd2105.DCE', 'pp2105.DCE', 'a2105.DCE', 'bu2106.SHFE', 'TA105.CZCE', 'ru2105.SHFE', 'MA105.CZCE', 'p2105.DCE', 'CF105.CZCE', 'ZC105.CZCE', 'cu2105.SHFE', 'rb2105.SHFE', 'SR105.CZCE', 'ag2106.SHFE', 'SM105.CZCE']
sp2105.SHFE合约数据保存成功
AP105.CZCE合约数据保存成功
jd2105.DCE合约数据保存成功
pp2105.DCE合约数据保存成功
a2105.DCE合约数据保存成功
bu2106.SHFE合约数据保存成功
TA105.CZCE合约数据保存成功
ru2105.SHFE合约数据保存成功
MA105.CZCE合约数据保存成功
p2105.DCE合约数据保存成功
CF105.CZCE合约数据保存成功
ZC105.CZCE合约数据保存成功
cu2105.SHFE合约数据保存成功
rb2105.SHFE合约数据保存成功
SR105.CZCE合约数据保存成功
ag2106.SHFE合约数据保存成功
SM105.CZCE合约数据保存成功
连接关闭
连接成功
待更新的合约列表获取成功：
['rb2105.SHFE', 'SM105.CZCE', 'p2105.DCE', 'pp2105.DCE', 'MA105.CZCE', 'RM105.CZCE', 'bu2106.SHFE', 'jd2105.DCE', 'ZC105.CZCE']
rb2105.SHFE合约数据保存成功
SM105.CZCE合约数据保存成功
p2105.DCE合约数据保存成功
pp2105.DCE合约数据保存成功
MA105.CZCE合约数据保存成功
RM105.CZCE合约数据保存成功
bu2106.SHFE合约数据保存成功
jd2105.DCE合约数据保存成功
ZC105.CZCE合约数据保存成功
连接关闭


In [None]:
def delete_server_bar():
    """Delete stored data for a fixed set of contracts on both servers.

    The home server is processed first, then the cloud server. Each
    connection is closed even when the deletion raises.

    Returns:
        ``(client_home, client_tencent)``; both have already been closed.

    Raises:
        Whatever ``init_client`` or ``delete_all_bar`` raise. A failure on
        the home server aborts before the cloud server is attempted.
    """
    # Contracts to clean; edit this list (or fetch it from the server with
    # get_update_symbol) before running the cell.
    symbols = ['ni2105.SHFE', 'zn2105.SHFE']

    clients = []
    for host in (host_home, host_tencent):
        client = init_client(host, port, authkey)
        try:
            delete_all_bar(client, source_interval, symbols)
        finally:
            client.close()
        clients.append(client)

    client_home, client_tencent = clients
    return client_home, client_tencent


delete_server_bar()

In [None]:
def test_server():
    """Connectivity check: connect to both servers and list pending contracts.

    Nothing is downloaded or saved. For each server the list of contracts
    awaiting update is fetched (and printed by ``get_update_symbol``), then
    the connection is closed, also when the request fails.

    Returns:
        ``(client_home, client_tencent)``; both have already been closed.
    """
    # The home address is fixed here on purpose: the original code read
    # setting['host_home'] and immediately overwrote it with this literal.
    host_home = '192.168.0.107'
    host_tencent = setting['host_cloud']
    port = setting['port']
    authkey = setting['authkey'].encode('ascii')

    clients = []
    for host in (host_home, host_tencent):
        client = init_client(host, port, authkey)
        try:
            # Called for its printed output; the result is not needed.
            get_update_symbol(client)
        finally:
            client.close()
        clients.append(client)

    client_home, client_tencent = clients
    return client_home, client_tencent


client_h, client_t = test_server()

In [None]:
# Ad-hoc check: fetch 60m bars for a single contract; as the last expression
# of the cell, the returned records are displayed as the cell output.
query_by_symbol('rb2005.SHFE', '60m')