# How to batch-download data in vnpy with scripts (no GUI)

### Symbol code mapping

* vnpy.trader.constant.Exchange lists all supported exchanges, e.g. SSE = "SSE"  # Shanghai Stock Exchange
* Each gateway supports a different set of Exchange values.
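
As a small illustration of how an exchange value combines with a symbol, the sketch below uses a stand-in enum (not the real vnpy.trader.constant.Exchange, although the three values shown match it) to build and split identifiers of the form 000001.SZSE, the vt_symbol format that appears in the contract table later in these notes. The helper names are hypothetical.

```python
from enum import Enum
from typing import Tuple


class Exchange(Enum):
    """Stand-in for vnpy.trader.constant.Exchange (only three members shown)."""
    SSE = "SSE"      # Shanghai Stock Exchange
    SZSE = "SZSE"    # Shenzhen Stock Exchange
    SHFE = "SHFE"    # Shanghai Futures Exchange


def make_vt_symbol(symbol: str, exchange: Exchange) -> str:
    """Join symbol and exchange as '<symbol>.<exchange value>'."""
    return f"{symbol}.{exchange.value}"


def split_vt_symbol(vt_symbol: str) -> Tuple[str, Exchange]:
    """Split on the last dot and convert the suffix back to the enum."""
    symbol, exchange_str = vt_symbol.rsplit(".", 1)
    return symbol, Exchange(exchange_str)


print(make_vt_symbol("000001", Exchange.SZSE))
print(split_vt_symbol("600000.SSE"))
```

Passing an exchange string that the enum does not define raises ValueError, which is a cheap way to catch symbols from an exchange the chosen gateway does not support.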


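### Querying all contracts

* ContractManager in vnpy.trader.widget is the GUI widget for this.
* Its show_contracts function fetches all contracts (connect to XTP/CTP first, because the contract data is returned by the corresponding gateway).
  * It calls the trader's MainEngine.get_all_contracts(self) -> List[ContractData].
  * MainEngine.process_contract_event adds contract data to self.contracts as contract events arrive.
  * gateway.py usually calls query_contract() during initialization or connection, and the results reach MainEngine's self.contracts through events.
  <!-- * Where is this event published? Possibly the gateway registers an EVENT_TIMER that fires once per second. -->
* Configuration file path: ~/.vntrader/connect_ctp.json
* When several gateways are connected, are contracts queried from all of them?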
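
The event path described above can be sketched without vnpy. The following is a simplified stand-in, not vnpy's actual EventEngine or MainEngine: a worker thread drains a queue and dispatches each event to registered handlers, and a handler playing the role of process_contract_event stores contracts in a dict keyed by vt_symbol. The class name, the event type string, and the dict-based contract records are assumptions made for illustration.

```python
import queue
import threading
from collections import defaultdict
from typing import Any, Callable, Dict, List

EVENT_CONTRACT = "eContract."  # illustrative event type string


class MiniEventEngine:
    """Simplified stand-in for an event engine: one worker thread, one queue."""

    def __init__(self) -> None:
        self._queue: "queue.Queue" = queue.Queue()
        self._handlers: Dict[str, List[Callable[[Any], None]]] = defaultdict(list)
        self._thread = threading.Thread(target=self._run, daemon=True)

    def register(self, event_type: str, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def put(self, event_type: str, data: Any) -> None:
        # Called by the producer, e.g. a gateway callback
        self._queue.put((event_type, data))

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # The sentinel lets the worker finish all queued events before exiting
        self._queue.put((None, None))
        self._thread.join()

    def _run(self) -> None:
        while True:
            event_type, data = self._queue.get()
            if event_type is None:
                break
            for handler in self._handlers[event_type]:
                handler(data)


contracts: Dict[str, dict] = {}


def process_contract_event(contract: dict) -> None:
    """Plays the role of MainEngine.process_contract_event."""
    contracts[contract["vt_symbol"]] = contract


engine = MiniEventEngine()
engine.register(EVENT_CONTRACT, process_contract_event)
engine.start()

# A gateway would push one event per contract returned by the server
for symbol, exchange in [("000001", "SZSE"), ("600000", "SSE")]:
    engine.put(EVENT_CONTRACT, {"vt_symbol": f"{symbol}.{exchange}", "symbol": symbol})

engine.stop()
print(sorted(contracts))
```

In this model a second gateway would simply push more events into the same queue, so contracts from every connected gateway would land in one dict. Whether vnpy behaves the same way has not been verified in these notes.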

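### What is an XTP/CTP gateway?
* A gateway receives an EventEngine at initialization, presumably so that things like market data subscriptions can be delivered as events. Where is event_engine actually used?
* How does a gateway pull data from the server? No thread startup is visible in the code.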


## Storage
* database: BaseDatabase = get_database() # get the configured database; the config file is also ~/.vntrader/vt_setting.json
* database.save_bar_data(data)

## Scheduled updates


In [1]:
# Script setup
# Load the packages and data structures the script needs
from datetime import datetime
import os
from typing import List

import pandas as pd

from vnpy.trader.constant import Exchange, Interval, Product
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.object import BarData, TickData, ContractData, HistoryRequest
from vnpy.trader.utility import load_json

from vnpy_scripttrader import init_cli_trading
from vnpy_ctp import CtpGateway
from vnpy_xtp import XtpGateway
from vnpy_rqdata.rqdata_gateway import RqdataGateway

# Get the data service (datafeed) instance
pd.set_option('display.width', 800)
datafeed = get_datafeed()
print(datafeed)


<vnpy_rqdata.rqdata_datafeed.RqdataDatafeed object at 0x0000028A8415AEC0>


In [None]:
# Fetch bar (K-line) history
req = HistoryRequest(
    # Contract symbol (cu888 is RQData's continuous-contract code, used here only as an example; look up the actual codes with your data provider)
    symbol="cu888",
    # Exchange where the contract trades
    exchange=Exchange.SHFE,
    # Start of the history window
    start=datetime(2019, 1, 1),
    # End of the history window
    end=datetime(2021, 1, 20),
    # Bar interval: minute, hour, and daily are available by default; choose according to your data-service permissions and needs
    interval=Interval.DAILY  # for tick-level data use interval=Interval.TICK; the data volume is much larger
)

# Fetch the bar history. Example: [BarData(gateway_name='RQ', extra=None, symbol='cu888', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2019, 1, 2, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), interval=<Interval.DAILY: 'd'>, volume=112020.0, turnover=26829419100.0, open_interest=142204.0, open_price=44760.0, high_price=44950.0, low_price=44450.0, close_price=44460.0)]
data: List[BarData] = datafeed.query_bar_history(req)

# Exchange code mapping between RQData and vnpy
* vnpy SZSE = RQData XSHE; vnpy SSE = RQData XSHG
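
The mapping above can be wrapped in a pair of small converters. This is a sketch only: the helper names are hypothetical, it covers just the two exchanges listed, and it assumes RQData stock identifiers take the form code.suffix (for example 000001.XSHE).

```python
from typing import Tuple

# vnpy exchange value -> RQData exchange suffix (only the two pairs from the notes)
VNPY_TO_RQ = {"SZSE": "XSHE", "SSE": "XSHG"}
RQ_TO_VNPY = {rq: vn for vn, rq in VNPY_TO_RQ.items()}


def to_rq_symbol(symbol: str, exchange: str) -> str:
    """Hypothetical helper: ('000001', 'SZSE') -> '000001.XSHE'."""
    return f"{symbol}.{VNPY_TO_RQ[exchange]}"


def from_rq_symbol(rq_symbol: str) -> Tuple[str, str]:
    """Hypothetical helper: '600000.XSHG' -> ('600000', 'SSE')."""
    symbol, suffix = rq_symbol.split(".")
    return symbol, RQ_TO_VNPY[suffix]


print(to_rq_symbol("000001", "SZSE"))
print(from_rq_symbol("600000.XSHG"))
```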

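# Meaning of A-share code prefixes
(1) Codes starting with 600, 601, 603, or 605 are Shanghai A-shares listed on the Shanghai Stock Exchange.
Codes starting with 688 are STAR Market stocks, 689
(2) Codes starting with 900 are Shanghai B-shares listed on the Shanghai Stock Exchange;

(3) Codes starting with 000 are Shenzhen A-shares listed on the Shenzhen Stock Exchange;

(4) Codes starting with 200 are Shenzhen B-shares listed on the Shenzhen Stock Exchange;
     001 is
(5) Codes starting with 002 are SME Board stocks listed on the Shenzhen Stock Exchange;

(6) Codes starting with 300 are ChiNext stocks listed on the Shenzhen Stock Exchange;
     301

(7) Codes starting with 730 are Shanghai new-share subscription codes; Shenzhen new-share subscription codes are the same as the Shenzhen trading codes.

(8) Rights-issue codes start with 700 in Shanghai and 080 in Shenzhen.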

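What are stocks whose codes start with 6?

Stocks starting with 6 are listed on the Shanghai Stock Exchange, and a Shanghai exchange account is required to trade them. Codes starting with 600 and 601 both belong to stocks listed on the Shanghai Stock Exchange, commonly called main-board stocks.

Stocks starting with 601 have three typical characteristics:

1. They are relatively new listings.

2. They are large-cap blue chips with large free-float and total market capitalization.

3. They have a large state-owned component, typically names beginning with "中" (China), such as 中国人寿 (China Life), 中国平安 (Ping An), 中国太保 (CPIC), and 中国银行 (Bank of China), or they are controlled by state-owned enterprises, such as 建设银行 (China Construction Bank).

Shanghai and Shenzhen codes differ as follows:

1. In Shenzhen, 002 is the SME Board, 000 is the main board, and codes starting with 3 are ChiNext. Shanghai stock codes start with 6; apart from the 688 STAR Market codes, they are main-board stocks.

2. Shanghai B-share codes start with 900, new-share subscription codes with 730, and rights-issue codes with 700.

Shenzhen B-share codes start with 200, new-share subscription codes with 00, and rights-issue codes with 080.

3. Shanghai warrants start with 580 and Shenzhen warrants with 031.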
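
The prefix rules above lend themselves to a lookup table. The sketch below is a hypothetical helper that maps the first three digits of a code to an exchange and board, using only the prefixes these notes describe; prefixes left unexplained (001, 301, 689) are deliberately omitted, so such codes return None.

```python
from typing import Optional, Tuple

# First three digits -> (vnpy exchange value, board), as listed in the notes
PREFIX_TABLE = {
    "600": ("SSE", "main board A-share"),
    "601": ("SSE", "main board A-share"),
    "603": ("SSE", "main board A-share"),
    "605": ("SSE", "main board A-share"),
    "688": ("SSE", "STAR Market"),
    "900": ("SSE", "B-share"),
    "000": ("SZSE", "main board A-share"),
    "002": ("SZSE", "SME Board"),
    "200": ("SZSE", "B-share"),
    "300": ("SZSE", "ChiNext"),
}


def classify_symbol(symbol: str) -> Optional[Tuple[str, str]]:
    """Return (exchange, board) for a 6-digit code, or None if the prefix is unknown."""
    return PREFIX_TABLE.get(symbol[:3])


for code in ["600000", "002478", "300750", "123456"]:
    print(code, classify_symbol(code))
```

Keeping the symbol as a string matters here: converting 000001 to an integer would drop the leading zeros that the prefix lookup depends on.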

In [2]:
# Fetch bar (K-line) history
req = HistoryRequest(
    # Stock symbol (look up the actual codes with your data provider)
    symbol="002478",
    # Exchange where the stock trades
    # exchange=Exchange.SSE,
    exchange=Exchange.SZSE,
    # Start of the history window
    start=datetime(2010, 1, 1),
    # End of the history window
    end=datetime(2021, 1, 20),
    # Bar interval: minute, hour, and daily are available by default; choose according to your data-service permissions and needs
    interval=Interval.DAILY  # for tick-level data use interval=Interval.TICK; the data volume is much larger
)

# Fetch the bar history. Example: [BarData(gateway_name='RQ', extra=None, symbol='cu888', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2019, 1, 2, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), interval=<Interval.DAILY: 'd'>, volume=112020.0, turnover=26829419100.0, open_interest=142204.0, open_price=44760.0, high_price=44950.0, low_price=44450.0, close_price=44460.0)]
data: List[BarData] = datafeed.query_bar_history(req)



In [3]:
len(data)

2512

# Downloading data

### Downloading data with vnpy
* get_datafeed() in vnpy/trader/datafeed.py returns a BaseDatafeed, e.g. the RQData datafeed
* vnpy.trader.setting selects the concrete datafeed; the config file is ~/.vntrader/vt_setting.json
* More data services supported by vnpy: https://www.vnpy.com/docs/cn/datafeed.html
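
For reference, a vt_setting.json that selects RQData as the datafeed and SQLite as the database might contain entries like the ones generated below. The key names are an assumption based on vnpy 3.x's default SETTINGS and should be checked against vnpy/trader/setting.py in the installed version; the credentials are placeholders, and the sketch writes to a temporary directory instead of ~/.vntrader.

```python
import json
import tempfile
from pathlib import Path

# Assumed key names; verify against the installed vnpy version
settings = {
    "datafeed.name": "rqdata",
    "datafeed.username": "YOUR_USERNAME",   # placeholder
    "datafeed.password": "YOUR_PASSWORD",   # placeholder
    "database.name": "sqlite",
    "database.database": "database.db",
}

# Written to a temp dir here; the real file lives at ~/.vntrader/vt_setting.json
path = Path(tempfile.mkdtemp()) / "vt_setting.json"
path.write_text(json.dumps(settings, indent=4), encoding="utf-8")

loaded = json.loads(path.read_text(encoding="utf-8"))
print(loaded["datafeed.name"], loaded["database.name"])
```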


In [3]:
from vnpy.trader.database import get_database, BaseDatabase

database: BaseDatabase = get_database()  # get the configured database; the config file is also ~/.vntrader/vt_setting.json
# database.save_bar_data(data)

# Query stock codes through the XTP gateway
setting = load_json("connect_xtp.json")
engine = init_cli_trading([XtpGateway])  # vnpy_scripttrader.engine.ScriptEngine, not vnpy_ctastrategy\vnpy_ctastrategy\engine.py
engine.connect_gateway(setting, "XTP")
# Contracts can only be fetched after query_contract has been called
# engine.main_engine.get_gateway('XTP').md_api.query_contract()


2023-01-09 16:16:22.519351	行情服务器登录成功
2023-01-09 16:16:22.660452	交易服务器登录成功, 会话编号：270532608


2023-01-09 16:16:23.402780	SSE合约信息查询成功
2023-01-09 16:16:24.166176	SZSE合约信息查询成功


In [4]:
# Query all contracts
data = engine.get_all_contracts(use_df=True)

In [5]:
print(data["product"].unique())
data_eq = data[data['product']==Product.EQUITY]
data_bond = data[data['product']==Product.BOND]
data_fund = data[data['product']==Product.FUND]
# data['exchange'].unique()

print(data_fund.shape, data_bond.shape, data_eq.shape)

print(data[data['symbol']=='000001'])

[<Product.BOND: '债券'> <Product.FUND: '基金'> <Product.EQUITY: '股票'>]
(4861, 19) (25080, 19) (4902, 19)
      gateway_name  symbol       exchange  name         product  size  \
18993          XTP  000001  Exchange.SZSE  平安银行  Product.EQUITY     1   

       pricetick  min_volume  stop_supported  net_position  history_data  \
18993       0.01         100           False         False         False   

       option_strike option_underlying option_type option_listed  \
18993              0                          None          None   

      option_expiry option_portfolio option_index    vt_symbol  
18993          None                                000001.SZSE  


In [19]:
# Batch download and store data

START = datetime(2005, 1, 1)
END = datetime(2023, 1, 8)

def make_request(symbol: str, exchange: Exchange) -> HistoryRequest:
    # Build a bar (K-line) history request for one symbol
    req = HistoryRequest(
        # Stock symbol (look up the actual codes with your data provider)
        symbol=symbol,
        # Exchange where the stock trades
        exchange=exchange,
        # Start and end of the history window
        start=START,
        end=END,
        # Bar interval: minute, hour, and daily are available by default; choose according to your data-service permissions and needs
        interval=Interval.DAILY  # for tick-level data use interval=Interval.TICK; the data volume is much larger
    )
    return req

# symbol01 = data[data['symbol']=='000001']
for idx, row in data_eq.iterrows():
    # print(row['symbol'], row['exchange'].value)
    # Skip symbols that already have bars stored for this window
    if database.load_bar_data(row['symbol'], row['exchange'], start=START, end=END, interval=Interval.DAILY):
        continue
    req = make_request(row['symbol'], row['exchange'])
    # Use a separate name so the contracts DataFrame `data` is not overwritten
    bars: List[BarData] = datafeed.query_bar_history(req)
    if bars:
        database.save_bar_data(bars)

In [10]:
output_dir = "d:/dataset/quant/vnpy/"
pd.to_pickle(data, os.path.join(output_dir, "all_contract.pkl"))
data.to_csv(os.path.join(output_dir, "all_contract.csv"))

pd.to_pickle(data_eq, os.path.join(output_dir, "stock_contract.pkl"))
data_eq.to_csv(os.path.join(output_dir, "stock_contract.csv"))

In [14]:
data_eq[data_eq["symbol"].str.match("^(60|000)")]

,gateway_name,symbol,exchange,name,product,size,pricetick,min_volume,stop_supported,net_position,history_data,option_strike,option_underlying,option_type,option_listed,option_expiry,option_portfolio,option_index,vt_symbol
16304,XTP,600000,Exchange.SSE,浦发银行,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,600000.SSE
16305,XTP,600004,Exchange.SSE,白云机场,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,600004.SSE
16306,XTP,600006,Exchange.SSE,东风汽车,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,600006.SSE
16307,XTP,600007,Exchange.SSE,中国国贸,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,600007.SSE
16308,XTP,600008,Exchange.SSE,首创环保,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,600008.SSE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19431,XTP,000995,Exchange.SZSE,*ST皇台,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,000995.SZSE
19432,XTP,000996,Exchange.SZSE,中国中期,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,000996.SZSE
19433,XTP,000997,Exchange.SZSE,新 大 陆,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,000997.SZSE
19434,XTP,000998,Exchange.SZSE,隆平高科,Product.EQUITY,1,0.01,100,False,False,False,0,,,,,,,000998.SZSE


2023-01-09 23:53:59.673399	行情服务器连接断开, 原因10200006
2023-01-09 23:54:09.678762	行情服务器登录失败，原因：[XTP:1]connect server failed.
2023-01-09 23:54:59.964328	交易服务器连接断开, 原因10210006
2023-01-09 23:55:09.979603	交易服务器登录失败，原因：[XTP:1]connect server failed.


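### Downloading data through the qstock API and storing it in the SQLite database used by vnpy
* Call the qstock API to download price-adjusted data: df=qs.get_data('603969', start='2015-05-25', end='2015-06-05')
* Storage in vnpy: vnpy's data structures do not allow adjusting prices on the fly, so the approach here is to store already-adjusted data in a separate database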
  * from  

## How are prices adjusted?

In [None]:
from vnpy.trader.database import get_database, BaseDatabase
from vnpy.trader.setting import SETTINGS
from vnpy.trader.constant import Interval
from vnpy.trader.object import BarData
import pandas as pd

# from tqdm import tqdm
from tqdm.notebook import tqdm, trange
import qstock as qs

pd.set_option("display.width", 800)

# SETTINGS["database.database"] = "stock_post_adjust.db" # use a separate database for adjusted data

database: BaseDatabase = get_database() # get the configured database; the config file is also ~/.vntrader/vt_setting.json

# print(SETTINGS["database.database"], )
# database.save_bar_data(data)

# Build vnpy BarData objects from qstock API results
# example: [BarData(gateway_name='RQ', extra=None, symbol='cu888', exchange=<Exchange.SHFE: 'SHFE'>, datetime=datetime.datetime(2019, 1, 2, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), interval=<Interval.DAILY: 'd'>, volume=112020.0, turnover=26829419100.0, open_interest=142204.0, open_price=44760.0, high_price=44950.0, low_price=44450.0, close_price=44460.0)]

stock_file = "d:/dataset/quant/vnpy/stock_contract.pkl"
stock_contract: pd.DataFrame = pd.read_pickle(stock_file)

major_stock_contract = stock_contract[stock_contract['symbol'].str.match("^(600|601|603|605|000)")]


def df_to_barData(df: pd.DataFrame, symbol, exchange) -> list[BarData]:
    bar_list: list[BarData] = []
    for idx, row in df.iterrows():
        bar_data: BarData = BarData('QS', symbol=symbol, exchange=exchange, open_interest=0,
                                    interval=Interval.DAILY,
                                    open_price=row['open'],
                                    close_price=row['close'],
                                    high_price=row['high'],
                                    low_price=row['low'],
                                    turnover=row['turnover'],
                                    volume=row['volume'],
                                    datetime=idx.to_pydatetime(),
                                    )
        bar_list.append(bar_data)
    return bar_list

except_stocks = []

# pbar = tqdm(total=len(major_stock_contract), ncols = 800)

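for idx, row in tqdm(major_stock_contract.iterrows(), total=len(major_stock_contract), ncols=80):

    vt_symbol = row['vt_symbol']
    symbol = row['symbol']
    exchange = row['exchange']
    # fqt: price adjustment type. 0: unadjusted, 1: forward-adjusted, 2: backward-adjusted; the default is forward-adjusted
    try:
        df = qs.get_data(symbol, start='20050101', end='20230111', fqt=2)  # the format differs from vnpy's RQData output
        data = df_to_barData(df, symbol, exchange)
        database.save_bar_data(data)
    except Exception:
        # Record the failure so the symbol can be retried later
        except_stocks.append(symbol)
        print(f"{symbol} exception")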
    

In [None]:
# Re-run the downloads that failed (e.g. timed out)

for idx, row in tqdm(major_stock_contract.iterrows(), total=len(major_stock_contract), ncols=80):
    vt_symbol = row['vt_symbol']
    symbol = row['symbol']
    exchange = row['exchange']
    if symbol not in except_stocks:
        continue
    # fqt: price adjustment type. 0: unadjusted, 1: forward-adjusted, 2: backward-adjusted; the default is forward-adjusted
    try:
        df = qs.get_data(symbol, start='20050101', end='20230111', fqt=2)  # the format differs from vnpy's RQData output
        data = df_to_barData(df, symbol, exchange)
        database.save_bar_data(data)
        except_stocks.remove(symbol)
    except Exception:
        # The symbol is already in except_stocks, so it is not appended again
        print(f"{symbol} exception")

print(f"not downloaded num: {len(except_stocks)}")

In [6]:
except_stocks

[]
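
The two cells above amount to a retry loop run by hand. A generic version might look like the sketch below, where fetch_and_save is a hypothetical callable standing in for the qs.get_data, df_to_barData, and database.save_bar_data sequence; the round limit and the delay are arbitrary choices, and the demo uses a fake downloader that fails once.

```python
import time
from typing import Callable, Dict, Iterable, List


def download_with_retry(
    symbols: Iterable[str],
    fetch_and_save: Callable[[str], None],
    max_rounds: int = 3,
    delay_seconds: float = 0.0,
) -> List[str]:
    """Try every symbol, then retry only the failures for up to max_rounds rounds.

    Returns the symbols that still failed after the last round.
    """
    pending = list(symbols)
    for round_no in range(max_rounds):
        failed = []
        for symbol in pending:
            try:
                fetch_and_save(symbol)
            except Exception as exc:
                print(f"round {round_no + 1}: {symbol} failed: {exc}")
                failed.append(symbol)
        pending = failed
        if not pending:
            break
        time.sleep(delay_seconds)
    return pending


# Demo with a fake downloader that times out once for one symbol
attempts: Dict[str, int] = {}


def flaky(symbol: str) -> None:
    attempts[symbol] = attempts.get(symbol, 0) + 1
    if symbol == "600000" and attempts[symbol] == 1:
        raise TimeoutError("simulated timeout")


remaining = download_with_retry(["000001", "600000"], flaky)
print(remaining)
```

Because each round rebuilds the failure list from scratch, a symbol can never be recorded twice, which is the pitfall of appending to a shared list inside the retry cell.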

# Reading data for a given stock and time range in vnpy
* Tracing engine.load_data() shows how it is done
 
## Reading data downloaded with qstock

In [4]:
from vnpy.trader.database import get_database, BaseDatabase
from vnpy.trader.setting import SETTINGS
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData
import pandas as pd
from datetime import datetime
from typing import List

# from tqdm import tqdm
from tqdm.notebook import tqdm, trange
import qstock as qs

pd.set_option("display.width", 800)
start_day = datetime(2015, 6, 1)
end_day = datetime(2015, 6, 5)
symbol = '000558'
exchange = Exchange.SZSE

# SETTINGS["database.database"] = "stock_post_adjust.db" # use a separate database for adjusted data

database: BaseDatabase = get_database() # get the configured database; the config file is also ~/.vntrader/vt_setting.json

datas: List[BarData] = database.load_bar_data(symbol=symbol, exchange=exchange, interval=Interval.DAILY, start=start_day, end=end_day)
print(datas)


[BarData(gateway_name='DB', extra=None, symbol='000558', exchange=<Exchange.SZSE: 'SZSE'>, datetime=datetime.datetime(2015, 6, 1, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), interval=<Interval.DAILY: 'd'>, volume=120940.0, turnover=347111568.0, open_interest=0.0, open_price=139.17, high_price=147.68, low_price=135.13, close_price=147.68), BarData(gateway_name='DB', extra=None, symbol='000558', exchange=<Exchange.SZSE: 'SZSE'>, datetime=datetime.datetime(2015, 6, 2, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), interval=<Interval.DAILY: 'd'>, volume=81685.0, turnover=263880741.0, open_interest=0.0, open_price=155.15, high_price=162.46, low_price=155.1, close_price=162.46), BarData(gateway_name='DB', extra=None, symbol='000558', exchange=<Exchange.SZSE: 'SZSE'>, datetime=datetime.datetime(2015, 6, 3, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Shanghai')), interval=<Interval.DAILY: 'd'>, volume=112181.0, turnover=401353552.0, open_interest=0.0, open_price=178.74, high_pric

In [18]:
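# The simplest approach is to use the qstock API directly
import qstock as qs

# fqt: price adjustment type. 0: unadjusted, 1: forward-adjusted, 2: backward-adjusted; the default is forward-adjusted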
df=qs.get_data('603611',start='2015-02-01',end='2015-02-11', fqt=2)
df['up'] = df['close'].pct_change()
df



date,name,code,open,high,low,close,volume,turnover,turnover_rate,up
2015-02-02,诺力股份,603611,35.21,35.21,35.21,35.21,338,1190098.0,0.17,
2015-02-03,诺力股份,603611,38.73,38.73,38.73,38.73,614,2378022.0,0.31,0.099972
2015-02-04,诺力股份,603611,42.6,42.6,42.6,42.6,885,3770057.0,0.44,0.099923
2015-02-05,诺力股份,603611,46.86,46.86,46.86,46.86,3712,17394432.0,1.86,0.1
2015-02-06,诺力股份,603611,51.0,51.0,42.17,42.17,46188,210670677.0,23.09,-0.100085
2015-02-09,诺力股份,603611,37.95,38.3,37.95,37.95,20394,77400118.0,10.2,-0.100071
2015-02-10,诺力股份,603611,35.0,35.99,34.37,35.54,63825,223585658.0,31.91,-0.063505
2015-02-11,诺力股份,603611,35.54,36.43,35.28,35.69,38739,138639992.0,19.37,0.004221
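
The up column is the day-over-day change of close, that is close[t] / close[t-1] - 1, with no value for the first row. As a check, the plain-Python sketch below recomputes it from the close prices in the table above without pandas.

```python
# Close prices copied from the table above, 2015-02-02 through 2015-02-11
closes = [35.21, 38.73, 42.6, 46.86, 42.17, 37.95, 35.54, 35.69]

# Same definition as pandas' pct_change(): current / previous - 1
changes = [curr / prev - 1 for prev, curr in zip(closes, closes[1:])]

for value in changes:
    print(f"{value:+.6f}")
```

The first three changes are close to +10% and the next two close to -10%, which matches the rows where open, high, low, and close are nearly or exactly identical.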


# TODOs

## How does vnpy receive pushed tick data?
