In [30]:
# Imports

from ibpythonic import ibConnection, message
from ibapi.contract import Contract

from time import sleep, strftime
from datetime import datetime
import pickle
import os
from multiprocessing import Pool

import workers

import pandas as pd
import numpy as np

from tinydb import TinyDB, Query

from tqdm import tqdm_notebook as tqdm

In [31]:
# Create contract object

def makeStkContract(contractTuple):
    newContract = Contract()
    newContract.symbol = contractTuple[0]
    newContract.secType = contractTuple[1]
    newContract.exchange = contractTuple[2]
    newContract.currency = contractTuple[3]
    return newContract

In [32]:
# Control variables declaration

bar_data_list = []
busy = False
active_contract_id = 0

In [33]:
# Historical data handler

def my_hist_data_handler(msg):
    global bar_data_list

    # If message does not belong to current contract, leave this function
    if int(msg.reqId) != active_contract_id:
        print('Error. Msg does not belong to active contract')
        return
    
    bar_data_list.append(msg.bar)


In [39]:
# End of historical data handler

def my_hist_data_end_handler(msg):
    print('my_hist_data_end_handler: ' + str(msg))
    global bar_data_list
    global busy
    global active_contract_id
    

    ####### Write data to pickle file
    if False:
        # Merge and write received data to pickle file
        contracts_db = TinyDB('contracts_db.json')
        my_query = Query()
        result = contracts_db.search(my_query.Id == active_contract_id)
        current_contract = result[0]
        sy = current_contract['Symbol']
        ex = current_contract['Exchange']

        # textfile = open('data/' + sy + '_' + ex + '.txt', 'w+')
        # textfile.write("\n".join(bar_data_list))
        # textfile.close()

        with open('data/pickles/' + sy + '_' + ex, 'wb') as fp:
            pickle.dump(bar_data_list, fp)

        # write finished info to contracts database
        timestamp_now = datetime.now()
        string_now = timestamp_now.strftime('%Y-%m-%d')
        contracts_db.update({'Status': string_now}, my_query.Id == active_contract_id)
        contracts_db.close()

        bar_data_list = []
        active_contract_id = []
        busy = False    
    

    ####### Append data to existing or new CSV file
    if True:
        # If message does not belong to current contract, leave this function
        if int(msg.reqId) != active_contract_id:
            print('Error. Msg does not belong to active contract')
            return
        
        # Merge collected data into dataframe
        df_new = pd.DataFrame(columns=('date', 'open', 'high', 'low', 'close', 'volume'))
        for bar in bar_data_list:
            # dashed_date = bar.date[:4] + '-' + bar.date[4:6] + '-' + bar.date[6:]
            row = {'date': bar.date, 'open': bar.open, 'high': bar.high, 'low': bar.low, 'close': bar.close, 'volume': bar.volume}
            df_new = df_new.append(row, ignore_index=True)
        # df_new.loc[:, 'date'] = pd.to_datetime(df_new.date, format='%Y%m%d') #'%Y%m%d  %H:%M:%S'
        df_new = df_new.set_index('date')
        
        # Check if there is already existing data
        print('active_contract_id: ' + str(active_contract_id))
        contracts_db = TinyDB('contracts_db.json')
        my_query = Query()
        result = contracts_db.search(my_query.Id == active_contract_id)
        contracts_db.close()
        print('result: ' + str(result))
        current_contract = result[0]        
        contract_status = current_contract['Status']
        contract_symbol = current_contract['Symbol']
        contract_exchange = current_contract['Exchange']
        filename = '/Users/martin/Google Drive/data/screener/' + contract_symbol + '_' + contract_exchange + '.csv'
        print('data_end: ' + contract_symbol + '_' + contract_exchange)
        
        # Append to existing file or create new file
        if False: # contract_status.startswith('data_ends:') or contract_status.startswith('2019'):
            print('data_end: if')
            df_old = pd.read_csv(filename, index_col='date')
            df_combined = df_new.combine_first(df_old)
            os.remove(filename)
            df_combined.to_csv(filename)
        else:
            print('data_end: else')
            if os.path.exists(filename):
                os.remove(filename)
            df_new.to_csv(filename)

        # write finished info to contracts database
        timestamp_now = datetime.now()
        string_now = timestamp_now.strftime('%Y-%m-%d')
        contracts_db = TinyDB('contracts_db.json')
        my_query = Query()
        contracts_db.update({'Status': 'data_ends:'+string_now}, my_query.Id == active_contract_id)
        contracts_db.close()

        bar_data_list = []
        active_contract_id = 0
        busy = False    
    
    
    ####### Write data to new CSV file
    if False:
        # Merge collected data into dataframe
        df = pd.DataFrame(columns=('date', 'open', 'high', 'low', 'close', 'volume'))
        for bar in bar_data_list:
            row = {'date': bar.date, 'open': bar.open, 'high': bar.high, 'low': bar.low, 'close': bar.close, 'volume': bar.volume}
            df = df.append(row, ignore_index=True)
        df.loc[:, 'date'] = pd.to_datetime(df.date, format='%Y%m%d') #'%Y%m%d  %H:%M:%S'
        df = df.set_index('date')

        # Write dataframe to csv file
        contracts_db = TinyDB('contracts_db.json')
        my_query = Query()
        result = symbol_db.search(my_query.Id == active_contract_id)
        contracts_db.close()

        current_contract = result[0]
        sy = current_contract['Symbol']
        ex = current_contract['Exchange']
        df.to_csv('data/' + sy + '_' + ex + '.csv') 


In [40]:
# Error handler

def my_error_handler(msg):
    print('my_error_handler: ' + str(msg))
    
    global bar_data_list
    global busy
    global active_contract_id
    
    # If message does not belong to current contract, leave this function
    if int(msg.id) != active_contract_id:
        print('Error. Msg does not belong to active contract')
        return
        
    # write msg info to contracts database
    contracts_db = TinyDB('contracts_db.json')
    my_query = Query()
    # todo: clean message
    contracts_db.update({'Status': 'error:'+msg.errorMsg}, my_query.Id == active_contract_id)
    contracts_db.close()
    
    bar_data_list = []
    active_contract_id = 0
    busy = False

In [41]:
# Data retrieval constants

endtime = strftime('%Y%m%d %H:%M:%S')
start_id = 0
end_id = 10

In [42]:
# Create connection object

con = ibConnection(port=7497)
con.register(my_hist_data_handler, message.historicalData)
con.register(my_hist_data_end_handler, message.historicalDataEnd)
con.register(my_error_handler, message.error)

True

In [43]:
# Main function

con.connect()
    
for index in tqdm(range(start_id, end_id)):
    global busy
    global active_contract_id
    
    # Get contract data from contracts db
    contracts_db = TinyDB('contracts_db.json')
    my_query = Query()
    result = contracts_db.search(my_query.Id == index)
    contracts_db.close()
    current_contract = result[0]
    print(current_contract)
    contract_status = current_contract['Status']       
    
    # Check if CSV file is already present    
    if False: # contract_status.startswith('data_ends:') or contract_status.startswith('2019'):
        if contract_status.startswith('data_ends:'):
            start_date = (contract_status.split(':'))[1]
        else:
            start_date = contract_status
        end_date = datetime.today().strftime('%Y-%m-%d')
        ndays = np.busday_count(start_date, end_date)
        ndays += 4
        req_string = str(ndays) + ' D'  
    else:
        req_string = "10 Y"
    print(req_string)

    # todo: stop on certain error status, eg no connection
    
    # Create contract and request data
    contractTuple = (current_contract['Symbol'], 'STK', current_contract['Exchange'], current_contract['Currency'])
    stkContract = makeStkContract(contractTuple)
    active_contract_id = index
    busy = True
    con.reqHistoricalData(active_contract_id,stkContract,endtime,req_string,'1 day','MIDPOINT',1,1, keepUpToDate=False, chartOptions=[])
    
    # Wait for data download and storage
    while busy is True:
        sleep(0.2)
    sleep(0.2)
    print('-------------------------')
con.disconnect()

my_error_handler: <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfarm.nj>
Error. Msg does not belong to active contract
my_error_handler: <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:eufarm>
Error. Msg does not belong to active contract
my_error_handler: <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:cashfarm>
Error. Msg does not belong to active contract
my_error_handler: <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfarm>
Error. Msg does not belong to active contract
my_error_handler: <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:euhmds>
Error. Msg does not belong to active contract
my_error_handler: <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ushmds>
Error. Msg does not belong to active contract
my_error_handler: <error id=-1, errorCode=2158, errorMsg=Sec-def data farm connection is OK:secdefnj>
Error. Msg does not belong 

HBox(children=(IntProgress(value=0, max=10), HTML(value='')))

{'Id': 0, 'Symbol': 'BNQF', 'Name': 'Collateralized ETC on RICI Enhanced Gas Oil TR Index', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': 'data_ends:2019-10-05'}
10 Y
my_hist_data_end_handler: <historicalDataEnd reqId=0, start=20091005  18:44:23, end=20191005  18:44:23>
active_contract_id: 0
result: [{'Id': 0, 'Symbol': 'BNQF', 'Name': 'Collateralized ETC on RICI Enhanced Gas Oil TR Index', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': 'data_ends:2019-10-05'}]
data_end: BNQF_FWB
data_end: else
-------------------------
{'Id': 1, 'Symbol': 'CNB', 'Name': 'Lyxor Euro Corporate Bond Ex Financials UCITS ETF', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': 'data_ends:2019-10-05'}
10 Y
my_hist_data_end_handler: <historicalDataEnd reqId=1, start=20091005  18:44:23, end=20191005  18:44:23>
active_contract_id: 1
result: [{'Id': 1, 'Symbol': 'CNB', 'Name': 'Lyxor Euro Corporate Bond Ex Financials UCITS ETF', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': 'data_ends:2019-10-05'}]
data_end: 

True

In [11]:
# Qurey data from contracts database

contracts_db = TinyDB('contracts_db.json')
my_query = Query()
result = contracts_db.search(my_query.Id <= 203)
for item in result:
    print(item)
contracts_db.close()

{'Id': 0, 'Symbol': 'BNQF', 'Name': 'Collateralized ETC on RICI Enhanced Gas Oil TR Index', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': 'Market data farm connection is OK:usfarm.nj'}
{'Id': 1, 'Symbol': 'CNB', 'Name': 'Lyxor Euro Corporate Bond Ex Financials UCITS ETF', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': '2019-09-29'}
{'Id': 2, 'Symbol': 'UEF5', 'Name': 'UBS ETF - MSCI Emerging Markets Socially Responsible UCITS ETF', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': '2019-09-29'}
{'Id': 3, 'Symbol': 'GOMA', 'Name': 'BNP Paribas Easy Barclays Euro Government Inflation Linked All Maturities ETF', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': '2019-09-29'}
{'Id': 4, 'Symbol': 'X0BM', 'Name': 'Coba ETC -2 x Platinum Daily Short Index', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': '2019-09-29'}
{'Id': 5, 'Symbol': 'FLOT', 'Name': 'Lyxor Barclays Floating Rate Euro 0-7Y UCITS ETF', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': '2019-09-29'}
{'Id': 6, 'Symbol': 'X0E2', 

In [10]:
# Read all contracts data to dataframe

contracts_db = TinyDB('contracts_db.json')
data_list = str(contracts_db.all())
contracts_db.close()

data_list = data_list.replace("'", '"')
df = pd.read_json(data_list, orient='records')
df

ValueError: Unexpected character found when decoding object value

In [None]:
 # Manual disconnection of connection object
    
con.disconnect()

In [27]:
# Create CSV files from pickles

file_list = []
for filename in os.listdir('data/pickles/'):
    file_list.append(filename)
    
if __name__ ==  '__main__': 
    with Pool(4) as p:
        p.map(workers.pick_to_csv, file_list)

UnpicklingError: invalid load key, '\x00'.

# Enhancements

### Speedup
- download next contract as soon as data is in file

### Extra features for downloading
- start time for downloading is end time of data. last candle of data is considered invalid
- check for splits/mergers
- Stream logger for status messages

### Symbol in multiple exchanges
- if symbol is multiple times in list
    - load data from primary exchange if ppossible
    - if primary is not available, load from exchange with highest average (price-)volume

### contracts database
- pull contracts from ib webpage
- pull contracts data from ib
- abillity to create own univestes (=contracts lists) based on contracts data
- Ignore Messages with error id = -1 (see source)
    <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfarm.nj>
    <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:eufarm>
    <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:cashfarm>
    <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfarm>
    <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:euhmds>
    <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ushmds>
    <error id=1, errorCode=162, errorMsg=Historical Market Data Service error message:HMDS query returned no data: 0I9M@LSE Ask>

### Storage of price data
- store price data in database? sqlite, as handeled in one file?
- cloud storage of data

### Misc
- 

In [14]:
contracts_db = TinyDB('contracts_db.json')
my_query = Query()
result = contracts_db.search(my_query.Id == 0)
contracts_db.close()


current_contract = result[0]
print(current_contract)

{'Id': 0, 'Symbol': 'BNQF', 'Name': 'Collateralized ETC on RICI Enhanced Gas Oil TR Index', 'Currency': 'EUR', 'Exchange': 'FWB', 'Status': 'error:Sec-def data farm connection is OK:secdefnj'}
