In [1]:
import datetime
import glob
import logging
import os
import pandas as pd
import time
from vnpy.trader.constant import (Exchange, Interval)
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData)

In [2]:
# Folder of daily-bar CSVs for the continuous futures. Every "*.csv" in it is
# globbed below, the file name (minus ".csv") is used as the symbol root, and
# its "symbol", "date" and "multiply_ratio" columns drive the 1-minute stitching.
day_bar_continuous_future_folder = r"D:\data\iqfeed\future\future_data_day_continuous_vnpy"

In [3]:
# Paths of all daily continuous-future CSVs found in the folder above.
day_bar_csv_pattern = os.path.join(day_bar_continuous_future_folder, "*.csv")
day_bar_data_path_list = glob.glob(day_bar_csv_pattern)

In [4]:
# Symbol table; the database import cell looks up "symbol_exchange" by
# "symbol_ib_roots" in this frame.
symbol_info_csv_path = r"E:\project\trade_engine\iqfeed_data_processor\iqfeed_csv_to_vnpy_mongodb\futures_to_import.csv"
symbol_info_frame = pd.read_csv(symbol_info_csv_path)

In [5]:
# Display the column names of the symbol table as a quick schema check.
symbol_info_frame.columns

Index(['symbol_roots', 'symbol_ib_roots', 'symbol_exchange'], dtype='object')

In [6]:
# Folder of 1-minute CSVs, one per contract; files are looked up below as
# "<symbol>.csv" using the "symbol" column of the daily continuous file.
vnpy_1min_data_folder = r"D:\data\iqfeed\future\future_data_1min_vnpy"

In [7]:
# Output folder for the stitched continuous 1-minute CSVs ("<symbol root>.csv").
# The database import cell later reads every "*.csv" from this same folder.
vnpy_1min_continuous_data_folder = r"D:\data\iqfeed\future\future_data_1min_continuous_vnpy"

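Process every symbol root and generate its continuous 1-minute data: for each day in the daily continuous file, take that day's 1-minute bars from the contract listed for that day and divide the prices by the day's `multiply_ratio`.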

In [72]:
# Build one continuous 1-minute CSV per symbol root.
#
# Each daily continuous file lists, per trading day, which contract ("symbol")
# was active and the "multiply_ratio" used when the daily series was stitched.
# For every such day the matching 1-minute bars of that contract are taken,
# relabelled with the root symbol, price-adjusted by the ratio, and finally all
# days are concatenated and written to vnpy_1min_continuous_data_folder.
day_bar_date_format = '%Y-%m-%d'
minute_bar_datetime_format = '%Y%m%d %H:%M:%S'
price_columns = ['open', 'high', 'low', 'close']
# Make sure all data columns are floats
float_columns = price_columns + ['volume']

for path in day_bar_data_path_list:
    csv_name = os.path.basename(path)
    ib_symbol_root = os.path.splitext(csv_name)[0]
    day_bar_frame = pd.read_csv(path)
    print("processing symbol: {}".format(ib_symbol_root))

    # Parse the whole date column once instead of once per row.
    day_bar_dates = pd.to_datetime(day_bar_frame["date"], format=day_bar_date_format).dt.date

    current_symbol = ""
    date_series = None
    vnpy_1min_frame = None
    sub_frame_list = []
    total_days = len(day_bar_frame)

    day_schedule = zip(day_bar_frame["symbol"], day_bar_dates, day_bar_frame["multiply_ratio"])
    for counter, (ib_symbol, current_date, multiply_ratio) in enumerate(day_schedule, start=1):
        # Only reload the contract's 1-minute file when the active contract changes.
        if current_symbol != ib_symbol:
            current_symbol = ib_symbol
            vnpy_1min_frame = pd.read_csv(os.path.join(vnpy_1min_data_folder, ib_symbol + ".csv"))
            date_series = pd.to_datetime(
                vnpy_1min_frame['datetime'], format=minute_bar_datetime_format
            ).dt.date

        # Filter to the bars of the current day. The explicit copy keeps the
        # edits below from touching (or warning about) the contract frame that
        # is reused for the following days.
        current_1min_frame = vnpy_1min_frame.loc[date_series == current_date].copy()

        # Change symbol to be the continuous future symbol and remember which
        # contract and ratio the bars came from.
        current_1min_frame["symbol"] = ib_symbol_root
        current_1min_frame["actual_symbol"] = ib_symbol
        current_1min_frame["multiply_ratio"] = multiply_ratio

        # Divide the prices by the ratio defined when combining the continuous
        # future. Plain column assignment replaces the column (and its dtype);
        # `.loc[:, col] = ...` may write in place and keep the old dtype in
        # newer pandas versions.
        for column in price_columns:
            current_1min_frame[column] = current_1min_frame[column] / multiply_ratio
        for col in float_columns:
            current_1min_frame[col] = current_1min_frame[col].astype('float')

        sub_frame_list.append(current_1min_frame)
        if counter % 200 == 0:
            print("process for current symbol: {}/{}".format(counter, total_days))

    # pd.concat raises on an empty list; an empty daily file should not abort
    # the remaining symbols.
    if not sub_frame_list:
        print("no day bars found for symbol: {}, nothing written".format(ib_symbol_root))
        continue

    vnpy_1min_continuous_frame = pd.concat(sub_frame_list)
    vnpy_1min_continuous_frame.to_csv(
        os.path.join(vnpy_1min_continuous_data_folder, ib_symbol_root + ".csv"), index=False
    )

processing symbol: CC
process for current symbol: 200/2973
process for current symbol: 400/2973
process for current symbol: 600/2973
process for current symbol: 800/2973
process for current symbol: 1000/2973
process for current symbol: 1200/2973
process for current symbol: 1400/2973
process for current symbol: 1600/2973
process for current symbol: 1800/2973
process for current symbol: 2000/2973
process for current symbol: 2200/2973
process for current symbol: 2400/2973
process for current symbol: 2600/2973
process for current symbol: 2800/2973
processing symbol: CT
process for current symbol: 200/3507
process for current symbol: 400/3507
process for current symbol: 600/3507
process for current symbol: 800/3507
process for current symbol: 1000/3507
process for current symbol: 1200/3507
process for current symbol: 1400/3507
process for current symbol: 1600/3507
process for current symbol: 1800/3507
process for current symbol: 2000/3507
process for current symbol: 2200/3507
process for cu

process for current symbol: 3000/3277
process for current symbol: 3200/3277
processing symbol: RTY
process for current symbol: 200/3566
process for current symbol: 400/3566
process for current symbol: 600/3566
process for current symbol: 800/3566
process for current symbol: 1000/3566
process for current symbol: 1200/3566
process for current symbol: 1400/3566
process for current symbol: 1600/3566
process for current symbol: 1800/3566
process for current symbol: 2000/3566
process for current symbol: 2200/3566
process for current symbol: 2400/3566
process for current symbol: 2600/3566
process for current symbol: 2800/3566
process for current symbol: 3000/3566
process for current symbol: 3200/3566
process for current symbol: 3400/3566
processing symbol: SB
process for current symbol: 200/2980
process for current symbol: 400/2980
process for current symbol: 600/2980
process for current symbol: 800/2980
process for current symbol: 1000/2980
process for current symbol: 1200/2980
process for c

Save the continuous futures data to vn.py's MongoDB database

In [9]:
# Import every continuous 1-minute CSV into the vn.py database, one symbol
# root at a time, logging progress to the console and to a timestamped file.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("mongo_saver")
# Re-running this cell would otherwise stack another FileHandler on the same
# named logger and duplicate every line; drop and close the old ones first.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()
currentDT = datetime.datetime.now()
handler = logging.FileHandler("vnpy_data_to_mongo" + currentDT.strftime("%Y_%m_%d_%H_%M_%S") + ".log")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)


def move_df_to_mongodb(import_data: pd.DataFrame, collection_name: str):
    """Convert each row of ``import_data`` to a BarData and save them in one call.

    Args:
        import_data: frame with the columns symbol, exchange, datetime,
            interval, volume, open, high, low, close and open_interest.
        collection_name: forwarded unchanged as the second argument of
            ``database_manager.save_bar_data``.
            NOTE(review): confirm the installed vnpy build accepts this argument.

    An empty frame is reported and skipped instead of failing on the
    missing last bar.
    """
    bars = [
        BarData(
            symbol=row.symbol,
            exchange=row.exchange,
            datetime=row.datetime,
            interval=row.interval,
            volume=row.volume,
            open_price=row.open,
            high_price=row.high,
            low_price=row.low,
            close_price=row.close,
            open_interest=row.open_interest,
            gateway_name="DB",
        )
        for row in import_data.itertuples()
    ]

    if not bars:
        print("Insert Bar: 0 from None - None")
        return

    # do some statistics
    count = len(bars)
    start = bars[0].datetime
    end = bars[-1].datetime

    # insert into database
    database_manager.save_bar_data(bars, collection_name)
    print(f"Insert Bar: {count} from {start} - {end}")


future_data_path_list = glob.glob(os.path.join(vnpy_1min_continuous_data_folder, "*.csv"))
for path in future_data_path_list:
    csv_name = os.path.basename(path)
    ib_symbol_root = os.path.splitext(csv_name)[0]
    symbol = ib_symbol_root
    try:
        start_time = time.time()
        logger.info("saving {} starts".format(symbol))

        # Resolved inside the try block so that a symbol missing from the
        # symbol table is logged as a failure instead of aborting the loop.
        exchange_matches = symbol_info_frame.loc[
            symbol_info_frame["symbol_ib_roots"] == symbol, "symbol_exchange"
        ]
        if exchange_matches.empty:
            raise ValueError("no exchange configured for symbol {}".format(symbol))
        exchange = exchange_matches.iloc[0]

        imported_data = pd.read_csv(path)
        imported_data.loc[:, "exchange"] = Exchange(exchange)
        imported_data.loc[:, "interval"] = Interval.MINUTE
        datetime_format = '%Y%m%d %H:%M:%S'
        imported_data['datetime'] = pd.to_datetime(imported_data['datetime'], format=datetime_format)

        # Plain column assignment so the float dtype really replaces the
        # original one; `.loc[:, col] = ...` may keep the old dtype in newer
        # pandas versions.
        float_columns = ['open', 'high', 'low', 'close', 'volume', 'open_interest']
        for col in float_columns:
            imported_data[col] = imported_data[col].astype('float').round(4)

        move_df_to_mongodb(imported_data, symbol)
        end_time = time.time()
        logger.info("time used is {}".format(end_time - start_time))
        logger.info("saving {} successful".format(symbol))
    except Exception:
        # Keep going with the next symbol, but record the traceback so the
        # cause of the failure is not lost.
        logger.exception("saving {} failed".format(symbol))

2020-05-25 17:38:24,188 - mongo_saver - INFO - saving CC starts
2020-05-25 18:18:34,311 - mongo_saver - INFO - time used is 2410.122610807419
2020-05-25 18:18:34,314 - mongo_saver - INFO - saving CC successful
2020-05-25 18:18:34,317 - mongo_saver - INFO - saving CT starts


Insert Bar: 1225809 from 2008-04-22 02:31:00 - 2020-02-07 13:30:00


2020-05-25 18:56:41,726 - mongo_saver - INFO - time used is 2287.409572124481
2020-05-25 18:56:41,728 - mongo_saver - INFO - saving CT successful


Insert Bar: 1257541 from 2008-04-16 02:31:00 - 2020-02-12 23:51:00
