In [21]:
import gzip
import pandas as pd
import time
import os
from utils import *
from orderbook import OrderBook
from features import FeatureGenerator

In [23]:
def _pair_row(secc, timestamp, bids, asks) -> list:
    """Build one ``[SECCODE, TIMESTAMP, BID, ASK]`` feature row.

    ``bids`` and ``asks`` are mappings (the per-period feature dicts of a
    FeatureGenerator). Their values are shallow-copied into new lists, so the
    recorded row does not change when the mapping is later reassigned.
    """
    return [secc, timestamp, list(bids.values()), list(asks.values())]


def make_orderbooks(order_log: list):
    """
    Replay an order log and record order-book features for every security.

    One ``OrderBook`` and one ``FeatureGenerator`` are kept per seccode in
    ``feature_seccodes``. Rows are processed in the given order. A feature
    snapshot is recorded only right after a row was handled by the POST
    branch (i.e. after ``spectrum.update_post`` ran).

    Relies on module-level names not defined here (``feature_seccodes``,
    ``instruments_info``, ``Action``, ``dropwhile_``) -- presumably provided
    by ``from utils import *``.

    Parameters
    ----------
    order_log : list
        Order-log rows, each indexable by 'SECCODE', 'TIME', 'BUYSELL',
        'VOLUME', 'PRICE', 'ACTION', 'ORDERNO' and 'NO'.

    Returns
    -------
    tuple
        ``(order_books, df_spec, df_vwap, df_bid_ask_spread, df_aggressive_band,
        df_aggressive_time, df_makers_band, df_makers_time,
        df_aggressive_normalized, df_makers_normalized, elapsed_seconds)``

        - ``order_books`` maps seccode -> OrderBook.
        - ``df_spec`` has columns SECCODE, TIMESTAMP, BID_ASK.
        - ``df_bid_ask_spread`` has columns SECCODE, TIMESTAMP, SPREAD.
        - Every other frame has columns SECCODE, TIMESTAMP, BID, ASK.
        - ``elapsed_seconds`` is the ``time.time()`` difference covering the
          replay and DataFrame construction. Creating the books and
          generators is excluded.
    """
    # One order book and one feature generator per security.
    order_books = {secc: OrderBook(secc) for secc in feature_seccodes}
    spectrums = {
        secc: FeatureGenerator(seccode=secc, px_step=instruments_info[secc]['PRICE_STEP'])
        for secc in feature_seccodes
    }

    start = time.time()

    # Feature rows accumulated during the replay; turned into DataFrames at the end.
    list_spec = []
    list_vwap = []
    list_aggressive_band = []
    list_aggressive_time = []
    list_makers_band = []
    list_makers_time = []
    list_bid_ask_spread = []
    list_aggressive_normalized = []
    list_makers_normalized = []

    # Aggressor volume per seccode, stored as (time, volume) pairs. The idea is
    # to keep history only for the largest period (FeatureGenerator.PERIODS[-1])
    # and derive the sub-periods from it.
    volume_bids = {secc: [] for secc in feature_seccodes}
    volume_asks = {secc: [] for secc in feature_seccodes}

    # Maker posts per seccode, stored as (time, volume, price) triples and
    # pruned the same way.
    bids_makers = {secc: [] for secc in feature_seccodes}
    asks_makers = {secc: [] for secc in feature_seccodes}

    col_names = ['SECCODE', 'TIMESTAMP', 'BID_ASK']
    feature_names = ['SECCODE', 'TIMESTAMP', 'BID', 'ASK']
    aggressors_names = ['SECCODE', 'TIMESTAMP', 'BID', 'ASK']
    makers_names = ['SECCODE', 'TIMESTAMP', 'BID', 'ASK']
    bid_ask_spread_names = ['SECCODE', 'TIMESTAMP', 'SPREAD']

    for row_log in order_log:
        secc = row_log['SECCODE']

        # Ignore rows at or after the instrument's SCHEDULE time.
        if instruments_info[secc]['SCHEDULE'] <= row_log['TIME']:
            continue

        is_ask = row_log['BUYSELL'] == 'S'
        # NOTE(review): `% 1e8 / 1e6` keeps only the last 8 digits of TIME.
        # If TIME is HHMMSS followed by 6 sub-second digits, this gives seconds
        # within the current minute and wraps at every minute boundary. That
        # would break the age checks below -- confirm the TIME format.
        current_time = row_log['TIME'] % int(1E8) / 1E6  # In seconds
        current_volume = row_log['VOLUME']
        current_price = row_log['PRICE']

        order_book = order_books[secc]
        spectrum = spectrums[secc]
        # Set to True only by the POST branch below; gates the feature snapshot.
        correct = False

        # CATCHING AGGRESSORS:
        # - a sell priced at/below the best bid (or at price 0), or
        # - a buy priced at/above the best ask (or at price 0)
        # has its volume recorded, and the row is NOT applied to the order book.
        # NOTE(review): this check runs for every ACTION (post, revoke and
        # match), so a single aggressive order may be counted more than once.
        # Confirm this matches the "trades made by aggressors" intent.
        if is_ask:
            if spectrum.best_bid > 0 and \
                    (current_price <= spectrum.best_bid or current_price == 0):
                volume_asks[secc].append((current_time, current_volume))
                continue
        else:
            # Presumably int(1e19) is the "no ask yet" sentinel -- TODO confirm.
            if spectrum.best_ask < int(1e19) and \
                    (current_price >= spectrum.best_ask or current_price == 0):
                volume_bids[secc].append((current_time, current_volume))
                continue

        def is_stale(entry):
            # History entries start with their time. An entry is stale once it
            # is at least the longest feature period older than this row.
            return current_time - entry[0] >= FeatureGenerator.PERIODS[-1]

        dropwhile_(volume_bids[secc], is_stale)
        dropwhile_(volume_asks[secc], is_stale)

        if row_log['ACTION'] == Action.POST:
            # Prune maker history, then record this post as maker activity.
            dropwhile_(bids_makers[secc], is_stale)
            dropwhile_(asks_makers[secc], is_stale)

            if is_ask:
                asks_makers[secc].append((current_time, current_volume, current_price))
            else:
                bids_makers[secc].append((current_time, current_volume, current_price))

            order_book.add_entry(entry=row_log,
                                 ask=is_ask)
            # A zero-volume entry stamped with the current time is appended to
            # a *copy* of each history; the stored lists are left untouched.
            # Presumably it anchors "now" for the period windows -- TODO confirm.
            spectrum.update_post(order_book=order_book, new_price=current_price,
                                 volume=current_volume, ask=is_ask,
                                 aggressive_bids=volume_bids[secc] + [(current_time, 0)],
                                 aggressive_asks=volume_asks[secc] + [(current_time, 0)],
                                 bids_makers=bids_makers[secc] + [(current_time, 0, current_price)],
                                 asks_makers=asks_makers[secc] + [(current_time, 0, current_price)])
            correct = True

        elif row_log['ACTION'] == Action.REVOKE:
            order_book.revoke(orderno=row_log['ORDERNO'], volume=current_volume,
                              ask=is_ask, row_numb=row_log['NO'])
            # NOTE(review): `correct` is always False here (it is reset above
            # and only the POST branch sets it), so update_revoke never runs.
            # Kept as-is to preserve the current features; confirm the intent.
            if correct:
                spectrum.update_revoke(order_book=order_book, new_price=current_price,
                                       volume=current_volume, ask=is_ask,
                                       aggressive_bids=volume_bids[secc] + [(current_time, 0)],
                                       aggressive_asks=volume_asks[secc] + [(current_time, 0)])

        elif row_log['ACTION'] == Action.MATCH:
            order_book.match(orderno=row_log['ORDERNO'], volume=current_volume,
                             ask=is_ask, row_numb=row_log['NO'])
            # NOTE(review): unreachable for the same reason as in the REVOKE
            # branch; it would also only ever update the ask side.
            if correct:
                if is_ask:
                    spectrum.update_match(order_book=order_book, new_price=current_price,
                                          volume=current_volume, ask=is_ask,
                                          aggressive_bids=volume_bids[secc] + [(current_time, 0)],
                                          aggressive_asks=volume_asks[secc] + [(current_time, 0)])

        if correct:
            # Snapshot every feature family for this row.
            timestamp = row_log['TIME']

            # Spectrum: normalized bid levels followed by normalized ask levels.
            spectrum_values = spectrum.bids_normalized.copy()
            spectrum_values.extend(spectrum.asks_normalized)
            list_spec.append([secc, timestamp, spectrum_values])

            # VWAPs per period.
            list_vwap.append(_pair_row(secc, timestamp,
                                       spectrum.VWAP_bids, spectrum.VWAP_asks))

            # Aggressors normalized by band, then by period.
            list_aggressive_band.append(_pair_row(secc, timestamp,
                                                  spectrum.aggressive_bids_normalized_band,
                                                  spectrum.aggressive_asks_normalized_band))
            list_aggressive_time.append(_pair_row(secc, timestamp,
                                                  spectrum.aggressive_bids_normalized_time,
                                                  spectrum.aggressive_asks_normalized_time))

            # Makers normalized by band, by period, and "as in lectures".
            list_makers_band.append(_pair_row(secc, timestamp,
                                              spectrum.bids_makers_normalized_band,
                                              spectrum.asks_makers_normalized_band))
            list_makers_time.append(_pair_row(secc, timestamp,
                                              spectrum.bids_makers_normalized_time,
                                              spectrum.asks_makers_normalized_time))
            list_makers_normalized.append(_pair_row(secc, timestamp,
                                                    spectrum.bids_makers_normalized,
                                                    spectrum.asks_makers_normalized))

            # Aggressors normalized "as in lectures".
            list_aggressive_normalized.append(_pair_row(secc, timestamp,
                                                        spectrum.aggressive_bids_normalized,
                                                        spectrum.aggressive_asks_normalized))

            # Bid-ask spread.
            list_bid_ask_spread.append([secc, timestamp, spectrum.bid_ask_spread])

    # Spectrum
    df_spec = pd.DataFrame(list_spec, columns=col_names)
    # VWAPs
    df_vwap = pd.DataFrame(list_vwap, columns=feature_names)
    # Aggressive trades
    df_aggressive_band = pd.DataFrame(list_aggressive_band, columns=aggressors_names)
    df_aggressive_time = pd.DataFrame(list_aggressive_time, columns=aggressors_names)
    df_aggressive_normalized = pd.DataFrame(list_aggressive_normalized, columns=aggressors_names)
    # Makers
    df_makers_band = pd.DataFrame(list_makers_band, columns=makers_names)
    df_makers_time = pd.DataFrame(list_makers_time, columns=makers_names)
    df_makers_normalized = pd.DataFrame(list_makers_normalized, columns=makers_names)
    # Bid-ask spread
    df_bid_ask_spread = pd.DataFrame(list_bid_ask_spread, columns=bid_ask_spread_names)

    end = time.time()

    return (order_books, df_spec, df_vwap, df_bid_ask_spread, df_aggressive_band,
            df_aggressive_time, df_makers_band, df_makers_time,
            df_aggressive_normalized, df_makers_normalized, end - start)

## Single day: replay the first 50,000 rows of one order log as a quick check

In [24]:
# Load one trading day's order log. The directory value ends in two
# backslashes because a raw string literal cannot end in a single backslash.
WORKING_DIR = r"D:\Data\MOEX-FX\2018-03\\"
orderlog_filename = f"{WORKING_DIR}OrderLog20180330.txt"

# Read, keep only the securities we build features for, then preprocess.
order_log = preprocess_orderlog(
    filter(read_orderlog(orderlog_filename),
           lambda row: row['SECCODE'] in feature_seccodes)
)

In [25]:
# Replay only the head of the day's log as a quick check. make_orderbooks
# returns the elapsed wall-clock seconds as its last element.
SINGLE_DAY_ROWS = 50_000

(order_books, df_spec, df_vwap, df_spread, df_aggressive_band, df_aggressive_time,
 df_makers_band, df_makers_time, df_aggressive_normalized, df_makers_normalized,
 exec_time) = make_orderbooks(order_log[:SINGLE_DAY_ROWS])

In [26]:
exec_time  # seconds measured inside make_orderbooks (time.time() difference), shown as the cell output

27.55943489074707

In [28]:
def _ensure_dir(path: str) -> None:
    """Create ``path`` (with any missing parents), or announce that it already exists.

    Real failures (e.g. permission errors) propagate instead of being
    reported as "already exists".
    """
    if os.path.isdir(path):
        print(f"{path} dir already exists. It may be overwritten.")
    else:
        os.makedirs(path)


def _save_per_security(frames: dict, orderlog_name: str, save_to: str, seccodes) -> None:
    """Write each frame's rows for every seccode to its own CSV, sorted by TIMESTAMP.

    ``frames`` maps a tag to a DataFrame with SECCODE and TIMESTAMP columns.
    The output name is the lower-cased order-log file name without its
    extension, with 'orderlog' replaced by the tag, plus ``_<seccode>.csv``.
    """
    stem = os.path.splitext(orderlog_name.lower())[0]
    for secc in seccodes:
        for tag, frame in frames.items():
            save_name = stem.replace('orderlog', tag) + '_' + secc + '.csv'
            per_secc = frame[frame['SECCODE'] == secc].sort_values(by=['TIMESTAMP'])
            per_secc.to_csv(os.path.join(save_to, save_name))


def _process_orderlog(working_dir: str, filename: str, save_to: str, max_rows) -> None:
    """Read, filter, preprocess and replay one order log, then save its features.

    Kept as a separate function so the large intermediates (order log, books,
    frames) are released when it returns, before the next file is read.
    """
    order_log = read_orderlog(os.path.join(working_dir, filename))
    order_log = filter(order_log, lambda row: row['SECCODE'] in feature_seccodes)
    order_log = preprocess_orderlog(order_log)

    (_order_books, df_spec, df_vwap, df_spread, df_aggressive_band, df_aggressive_time,
     df_makers_band, df_makers_time, df_aggressive_normalized, df_makers_normalized,
     _exec_time) = make_orderbooks(order_log[:max_rows])

    # Insertion order fixes the order in which files are written per seccode.
    frames = {
        'spectrum': df_spec,
        'vwap': df_vwap,
        'aggressive_band': df_aggressive_band,
        'aggressive_time': df_aggressive_time,
        'makers_band': df_makers_band,
        'makers_time': df_makers_time,
        'makers_normalized': df_makers_normalized,
        'aggressive_normalized': df_aggressive_normalized,
        'spread': df_spread,
    }
    _save_per_security(frames, filename, save_to, feature_seccodes)


def generate_spectrums(moex_dir: str, months: list, save_dir: str,
                       max_rows=200000) -> None:
    """
    Generate per-security feature CSVs for every order log of the given months.

    For each month, every entry of ``moex_dir/<month>`` whose name contains
    'orderlog' (case-insensitive) is processed:

    - it is read, restricted to ``feature_seccodes``, preprocessed and
      replayed with ``make_orderbooks``;
    - each resulting feature frame is split by SECCODE, sorted by TIMESTAMP,
      and written to ``save_dir/features/<month>/<tag><rest>_<SECCODE>.csv``.
      Here ``<tag>`` replaces 'orderlog' in the lower-cased file name (spectrum,
      vwap, aggressive_band, ...) and the extension is dropped.

    Parameters
    ----------
    moex_dir : str
        Root directory holding one sub-directory per month.
    months : list
        Month sub-directory names, e.g. '2018-03'.
    save_dir : str
        Directory under which 'features' is created. Missing parents are
        created as well.
    max_rows : int or None, default 200000
        Only the first ``max_rows`` preprocessed rows of each order log are
        replayed. The default keeps the previous hard-coded limit; pass None to
        replay the whole log.
    """
    spectrums_dir = os.path.join(save_dir, 'features')
    _ensure_dir(spectrums_dir)

    for month in months:
        working_dir = os.path.join(moex_dir, month)
        save_to = os.path.join(spectrums_dir, month)
        _ensure_dir(save_to)

        print(f"processing {month}")
        for filename in os.listdir(working_dir):
            if 'orderlog' in filename.lower():
                print(f"processing {filename}")
                _process_orderlog(working_dir, filename, save_to, max_rows)

        print('\n')

## All files: generate feature CSVs for every order log of the selected months

In [31]:
# Write per-security feature CSVs under ./features/<month>/ for each listed month.
# NOTE: the saved run below was interrupted (KeyboardInterrupt) during the second file.
generate_spectrums(
    moex_dir=r"D:\Data\MOEX-FX\\",
    months=['2018-03', '2018-04', '2018-05'],
    save_dir='.',
)

.\features dir already exists. It may be overwritten.
.\features\2018-03 dir already exists. It may be overwritten.
processing 2018-03
processing OrderLog20180301.txt
processing OrderLog20180302.txt


KeyboardInterrupt: 