# Parallel multiprocessing data acquisition test

We are also going to build a prediction model on market book data, which we will gather in real time with the lowest latency possible. To find the lowest-latency setup, we compare several approaches.

**This process consists of 3 basic tasks:**
1. get data as a Python dictionary,
2. convert it to a `pandas.DataFrame`,
3. save the data to a file with a pandas method.

The sequential approach, `sequential`, is the baseline: a single process performs all three tasks one after another.
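The three tasks can be sketched in one function. This is only an illustration: `get_data` is a hypothetical stand-in that fabricates a 40-row dictionary, whereas the real scripts request the market book from the terminal, and the column names are assumptions.

```python
import os
import tempfile
import time

import pandas as pd


def get_data(n_rows=40):
    """Hypothetical stand-in for the market book request; returns a dict of columns."""
    now = time.time()
    return {
        "price": [100.0 + i for i in range(n_rows)],
        "volume": [i % 5 + 1 for i in range(n_rows)],
        "time_before": [now] * n_rows,
    }


def sequential_step(path):
    """Run the three tasks once, one after another, in a single process."""
    data = get_data()              # 1. get data as a dictionary
    df = pd.DataFrame(data)        # 2. convert it to a DataFrame
    df.to_csv(path, index=False)   # 3. save it with the pandas method
    return df


out_dir = tempfile.mkdtemp()
csv_path = os.path.join(out_dir, "0.csv")
chunk = sequential_step(csv_path)
print(chunk.shape)
```

In this layout the next request cannot start until the CSV write has finished, which is the cost the multiprocessing variants try to hide.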

The multiprocessing approaches are different combinations of what each process does and how the processes communicate.

All methods are in `'scripts/'`.  
**Methods to test:**
1. `mp_simple_pipe` - 2 processes connected with `Pipe()`
    * Process 1: Get data, save to df
    * Communication: Pipe()
    * Process 2: save df to csv
2. `mp_simple_queue` - 2 processes connected with `Queue()`
    * Process 1: Get data, save to df
    * Communication: Queue()
    * Process 2: save df to csv
3. `mp_complex_pipe` - The get-data process is connected by pipes to 2 parallel save-to-df processes. Those are connected by another pipe to the save-to-csv process.
    * Process 1: Get data
        * Communication: Pipe()
        * Process 2_0: save to df
        * Communication: Pipe()
        * Process 2_1: save to df
    * Communication: Pipe()
    * Process 3: save df to csv
4. `mp_complex_queue` - The get-data process adds data to a queue that feeds 2 save-to-df processes. Those are connected to the save-to-csv process by another queue.
    * Process 1: Get data
        * Communication: Queue()
        * Process 2_0: save to df
        * Process 2_1: save to df
    * Communication: Queue()
    * Process 3: save df to csv
5. `mp_complex_2_queues` - The get-data process dumps data into 2 queues, alternating between them in order to minimize queue blocking. A single process then saves to df and to csv.
    * Process 1: Get data
    * Communication: Queue() x2
    * Process 2: save to df,save df to csv
6. `mp_complex_2_queues_to_1` - The get-data process alternates between 2 queues as before. A conveyor process moves data from both onto a third queue, which feeds a single save-to-df and save-to-csv process.
    * Process 1: Get data
    * Communication: Queue()
    * Communication: Queue()
    * Process 2: conveyor 2 queues to 1
    * Communication: Queue()    
    * Process 3: save to df, save df to csv
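As a rough illustration of the simplest layout in the list (`mp_simple_queue`), the sketch below wires a producer and a consumer together with `multiprocessing.Queue`. It is not the code from `'scripts/'`: `get_data`, the `STOP` sentinel, and `run_demo` are hypothetical names, and the real methods run until they are terminated instead of stopping after a fixed number of chunks.

```python
import multiprocessing as mp
import os

import pandas as pd

STOP = None  # sentinel that tells the consumer to finish


def get_data(chunk_id, n_rows=40):
    """Hypothetical data source; the real scripts query the market book instead."""
    return {"chunk": [chunk_id] * n_rows,
            "price": [100.0 + i for i in range(n_rows)]}


def producer(q, n_chunks):
    """Process 1: get data, convert it to a DataFrame, put it on the queue."""
    for chunk_id in range(n_chunks):
        q.put(pd.DataFrame(get_data(chunk_id)))
    q.put(STOP)


def consumer(q, out_dir):
    """Process 2: take DataFrames off the queue and save each one to csv."""
    file_id = 0
    while True:
        df = q.get()
        if df is STOP:
            break
        df.to_csv(os.path.join(out_dir, f"{file_id}.csv"), index=False)
        file_id += 1
    return file_id


def run_demo(out_dir, n_chunks=3):
    """Connect the two processes with a multiprocessing.Queue and wait for both."""
    q = mp.Queue()
    p1 = mp.Process(target=producer, args=(q, n_chunks))
    p2 = mp.Process(target=consumer, args=(q, out_dir))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```

To try it, save the block as a script and call `run_demo(some_directory)` under an `if __name__ == "__main__":` guard, which `multiprocessing` needs on Windows. Swapping the queue for the two ends of `mp.Pipe()` (`send`/`recv` instead of `put`/`get`) gives the `mp_simple_pipe` shape.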

**File size**  
Every method is tested with three `memory_threshold` values that control the csv file size: 1e+06, 1e+07, and 1e+08 bytes (around 1.5, 15, and 90 MB).
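One way such a threshold could be applied is sketched below. It assumes the threshold is compared with the DataFrame's in-memory size as reported by `memory_usage`; the actual scripts may measure size differently, and `should_flush` is a hypothetical helper.

```python
import pandas as pd


def should_flush(df, memory_threshold):
    """Return True once the buffered DataFrame reaches the threshold (in bytes)."""
    return df.memory_usage(index=True, deep=True).sum() >= memory_threshold


# Hypothetical buffer: 40-row chunks are appended until the threshold is hit.
chunk = pd.DataFrame({"price": [100.0] * 40, "volume": [1.0] * 40})
buffer = chunk.copy()
n_chunks = 1
while not should_flush(buffer, memory_threshold=1e4):
    buffer = pd.concat([buffer, chunk], ignore_index=True)
    n_chunks += 1

# At this point the buffer would be written to csv and reset.
print(n_chunks, int(buffer.memory_usage(deep=True).sum()))
```

A small threshold means frequent, short writes; a large one means rare, long writes and a growing buffer in between.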

**Comparison**  
We measure the time between market book data chunks, where 1 data acquisition equals 1 chunk (40 rows). The methods are compared on mean, std, several quantiles, min, and max. We look for stability (how far the outliers are: q99, std, and max) and for the lowest typical latency (mean and median).
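The comparison statistics can be illustrated on a handful of made-up chunk timestamps. The values are hypothetical; only the `diff` and `describe` calls mirror what the analysis does later.

```python
import pandas as pd

# Hypothetical acquisition times of consecutive chunks (one timestamp per chunk).
chunk_times = pd.Series(pd.to_datetime([
    "2022-01-01 10:00:00.000",
    "2022-01-01 10:00:00.002",
    "2022-01-01 10:00:00.004",
    "2022-01-01 10:00:00.005",
    "2022-01-01 10:00:00.020",
]))

# Gap between consecutive chunks, in milliseconds.
gaps_ms = chunk_times.diff().dropna().dt.total_seconds() * 1000

# mean/50% describe the typical latency; std, 99% and max describe stability.
stats = gaps_ms.describe(percentiles=[.25, .5, .75, .99])
print(stats)
```

Here a single 15 ms gap barely moves the median but dominates `max` and `std`, which is why both groups of statistics are reported.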

**Data generation for analysis**  
Here we analyze data already generated by `'scripts/mp_latency_test.py'`. How long each method gathers data is set by `test_time` in that script. Expect a different number of chunks (rows and files) per method, because the processes are terminated abruptly after `test_time` seconds.

## Import

In [1]:
import datetime as dt
import time

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import MetaTrader5 as mt5


from os import listdir
from os.path import isfile, join


# from pandas.plotting import register_matplotlib_converters
# register_matplotlib_converters()

print("MetaTrader5 package author: ",mt5.__author__)
print("MetaTrader5 package version: ",mt5.__version__)

MetaTrader5 package author:  MetaQuotes Ltd.
MetaTrader5 package version:  5.0.37


**Ticker**

https://www.moex.com/ru/derivatives/equity/indices/  
Site listing the current RTS futures ticker.

In [2]:
# ticker = 'RIU2'

In [3]:
%%html
<style>
table {float:left}
</style>

In [4]:
# # terminal init
# if not mt5.initialize():
#     print("initialize() failed, error code =",mt5.last_error())
#     quit()
# else:
#     print(mt5.last_error())

## Functions

In [5]:

def generate_paths():
    '''
    Creates dictionary with name of the method as key and path to csv files as value.
    '''
    memory_thresholds = [1e+6,1e+7,1e+8]
    names = ['sequential',
             'mp_simple_pipe',
             'mp_simple_queue',
             'mp_complex_pipe',
             'mp_complex_queue',
             'mp_complex_2_queues',
             'mp_complex_2_queues_to_1']
    result = {}
    for name in names:
        for memory_threshold in memory_thresholds:
            result[f'{name}_{memory_threshold:.1e}'] = join('data',name,f'{memory_threshold:.1e}')
    return result
# generate_paths()
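For reference, this is how a single key and path come out of the formatting used in `generate_paths()`. The snippet reuses only the f-string format and `join` from the function above.

```python
from os.path import join

memory_threshold = 1e+6
name = "sequential"

# '.1e' renders the threshold in scientific notation with one decimal digit.
key = f"{name}_{memory_threshold:.1e}"
path = join("data", name, f"{memory_threshold:.1e}")

print(key)   # sequential_1.0e+06
print(path)  # separator depends on the operating system
```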

In [6]:
def file_number(str_):
    '''
    Key for ordering csv files numerically, by file name.
    '''
    # normalize separators so the key works with both '\\' and '/' paths
    return int(str_.replace('\\','/').split('/')[-1][:-4])
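A numeric key matters because plain string sorting puts `10.csv` before `2.csv`. A minimal illustration with made-up file names:

```python
files = ["10.csv", "2.csv", "1.csv"]

# Default sort compares characters, so '10.csv' lands before '2.csv'.
lexicographic = sorted(files)
print(lexicographic)  # ['1.csv', '10.csv', '2.csv']

# Sorting by the integer in the file name restores the write order.
by_number = sorted(files, key=lambda f: int(f[:-4]))
print(by_number)      # ['1.csv', '2.csv', '10.csv']
```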

In [7]:
# FUNCTION TO GET PATHS TO CSVs

def get_paths_to_csvs(path_to_csvs,sort=True):
    '''
    Returns list with paths to csv files and orders them numerically if sort=True
    '''
    result = [join(path_to_csvs, f) for f in listdir(path_to_csvs) if isfile(join(path_to_csvs, f))]
    if sort:
        result.sort(key=file_number) # function above
    return result

# test_path = 'data/sequential/'
# get_paths_to_csvs(test_path)

In [24]:
# FUNCTION TO GET INFO ABOUT FILES COUNT, IS TIME ORDERED OR NOT AND ROW COUNT
def df_info(all_files,name):
    '''
    Receives a list with csvs and a name of a method.
    
    Returns df with method name, number of files, is time order kept by the method or not (as 0 and 1) and row count.
    '''
    df_list = []

    for filename in all_files:
        df = pd.read_csv(filename)
        df_list.append(df)

    df = pd.concat(df_list, axis=0, ignore_index=True)
    df['time_after'] = pd.to_datetime(df['time_after'])
    df['time_before'] = pd.to_datetime(df['time_before'])
    df['delta'] = df['time_after'] - df['time_before']
    df['time_mean'] = df['time_before'] + (df['delta']/2)
    
    min_,max_ = df['time_mean'].min(),df['time_mean'].max()
    
    result = pd.DataFrame({'files_n':len(all_files),
                           'time_order':1,
                           'row_count':df.shape[0],
                           'time_delta': max_ - min_
                          },
                         index=[name])    
    
    df_index_before = df.index.copy()
    df = df.sort_values('time_mean',kind='stable')
    
    if df_index_before.equals(df.index):
        # print('Time order is kept')
        pass
    else:
        # print('Time order is invalid')
        result['time_order'] = 0
    
    return result
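The time-order check in `df_info` relies on a stable sort: if sorting by time leaves the index unchanged, the rows were already in order. A small sketch on made-up timestamps, with `is_time_ordered` as a hypothetical helper name:

```python
import pandas as pd


def is_time_ordered(times):
    """True if a stable sort by time leaves the row order unchanged."""
    s = pd.Series(pd.to_datetime(times))
    return s.index.equals(s.sort_values(kind="stable").index)


in_order = ["2022-01-01 10:00:00.001",
            "2022-01-01 10:00:00.001",   # ties keep their position in a stable sort
            "2022-01-01 10:00:00.003"]
shuffled = ["2022-01-01 10:00:00.003",
            "2022-01-01 10:00:00.001"]

print(is_time_ordered(in_order), is_time_ordered(shuffled))
```

The stable sort is what keeps the many equal timestamps inside one chunk from being reported as a reordering. `Series.is_monotonic_increasing` should give the same answer without sorting.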

In [9]:
# FUNCTION TO CALCULATE LATENCY DISTRIBUTION
def concate_files(all_files):
    '''
    Receives a list with csvs.
    
    Returns the concatenated df with extra time columns, notably diff: the difference between consecutive time_mean values.
    '''
    df_list = []

    for filename in all_files:
        df = pd.read_csv(filename)
        df_list.append(df)

    df = pd.concat(df_list, axis=0, ignore_index=True)
    df['time_after'] = pd.to_datetime(df['time_after'])
    df['time_before'] = pd.to_datetime(df['time_before'])
    df['delta'] = df['time_after'] - df['time_before']
    df['time_mean'] = df['time_before'] + (df['delta']/2)
    
    # df_index_before = df.index.copy()
    df = df.sort_values('time_mean',kind='stable')
    
    # if df_index_before.equals(df.index):
    #     # print('Time order is kept')
    # else:
    #     # print('Time order is invalid')
    
    df['diff'] = df['time_mean'].diff()
    return df

def calculate_diff_components(df,delete_zero_diffs=True):
    '''
    Receives df with diff column.
    Returns another df with diff components (this is what we need to compare).
    
    delete_zero_diffs is preferable: all rows of a chunk arrive at the same time,
    so each chunk adds about 40 rows with diff = 0.
    '''
    dt_components = df['diff'].dt.components
    
    if delete_zero_diffs:
        dt_components['bool_sum'] = (dt_components == 0).sum(axis=1)
        dt_components = dt_components[dt_components['bool_sum'] != 7]
        dt_components = dt_components.drop(columns='bool_sum')
        
    return dt_components

def concate_and_diffs(all_files,delete_zero_diffs=True):
    '''
    Receives list with csv files,
    Returns a combination of previous 2 functions
    '''
    df = concate_files(all_files)
    result = calculate_diff_components(df,delete_zero_diffs)
    return result
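To make the shape of the result concrete, here is what `.dt.components` produces for a few made-up diffs, and how rows with all seven components equal to zero can be dropped. The filter below uses `any` instead of the boolean sum in the function above, but selects the same rows.

```python
import pandas as pd

# Hypothetical diffs in milliseconds: two zero gaps (same chunk) and two real gaps.
diffs = pd.Series(pd.to_timedelta([0, 2, 0, 1005], unit="ms"))

# One integer column per time unit.
components = diffs.dt.components
print(list(components.columns))

# Keep a row if at least one of its 7 components is non-zero.
non_zero = components[(components != 0).any(axis=1)]
print(non_zero[["seconds", "milliseconds"]])
```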


In [10]:
def methods_info():
    '''
    Generate methods' info with files_n, time_order is kept or not and row count using df_info().
    '''
    methods_info = pd.DataFrame()
    for name,path in generate_paths().items():
        try:
            result = df_info(get_paths_to_csvs(path),name)
        except Exception as e:
            result = pd.DataFrame({'files_n':0,
                                   'time_order': np.nan},index=[name])
            # print(e)
        methods_info = pd.concat([methods_info,result])
    return methods_info

In [11]:
def save_diff_files_to_csv():
    '''
    Saves diff dfs to the data/diffs folder, one csv per method name.
    '''
    for name,path in generate_paths().items():
        try:
            concate_and_diffs(get_paths_to_csvs(path)).to_csv(f'data/diffs/{name}.csv')
            print(name)
        except Exception:
            # skip methods whose csv files are missing or unreadable
            continue


In [12]:
def create_dict_with_methods_diff():
    '''
    Creates a dict with method name as a key and diff df as a value.
    '''
    method_diff_time_components_dict = {}
    for path in get_paths_to_csvs(join('data','diffs'),sort=False):
        # file name without folder and '.csv'; handles both '\\' and '/' separators
        name = path.replace('\\','/').split('/')[-1][:-4]
        method_diff_time_components_dict[name] = pd.read_csv(path)

    return method_diff_time_components_dict

In [13]:
def create_list_with_diff(method_diff_time_components_dict,return_df=False):
    '''
    Receives dictionary with method name as key and df with diffs as value
    Returns list with df diffs
    Or one concated df if return_df == True
    '''
    df_list = []
    for name, df in method_diff_time_components_dict.items():
        df['name'] = name
        df_list.append(df)
    if return_df:
        return pd.concat(df_list)
    else:
        return df_list


In [14]:
def diff_describe(df_list):
    '''
    Receives list with diffs dfs.
    Returns df with statistics for each method.
    '''
    df_diff_dist = pd.DataFrame()
    for df in df_list:
        # plt.figure(figsize=(12,8))
        # sns.histplot(data=df,x='milliseconds')
        # plt.plot()
        temp_ = pd.DataFrame()
        for col in df.columns.drop(['Unnamed: 0','name']):
            desc = df[col].describe([.25,.5,.75,.99]).reset_index()
            desc['name'] = col
            # desc.index = [str(x)+'_'+col for x in desc.index]
            desc.index = pd.MultiIndex.from_frame(desc[['name','index']])
            desc = desc.drop(columns=['index','name'])
            # display(desc)
            desc.columns = [df['name'][0]]

            temp_ = pd.concat([temp_,desc],axis=0)
        # display(temp_)
        # break
        df_diff_dist = pd.concat([df_diff_dist,temp_.T])
        # break
    return df_diff_dist
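For a single component, a similar per-method summary can be obtained more directly with `groupby(...).describe(...)` on the long-format frame (the one `create_list_with_diff(..., return_df=True)` returns). The sketch below uses made-up data and is an alternative, not the function above.

```python
import pandas as pd

# Hypothetical long-format diffs for two methods, in milliseconds.
df = pd.DataFrame({
    "name": ["method_a"] * 4 + ["method_b"] * 4,
    "milliseconds": [1, 2, 2, 3, 1, 1, 1, 9],
})

# One row per method, one column per statistic.
summary = df.groupby("name")["milliseconds"].describe(percentiles=[.25, .5, .75, .99])
print(summary[["mean", "std", "99%", "max"]])
```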



In [15]:
def all_together(fresh_data=False):
    methods_info_df = methods_info()

    if fresh_data:
        save_diff_files_to_csv()

    df_dict = create_dict_with_methods_diff()
    df_list = create_list_with_diff(df_dict)
    df_diff_dist = diff_describe(df_list)
    return methods_info_df, df_diff_dist

## Analysis

In [25]:
methods_info_df, df_diff_dist = all_together()
a_list = []
for name,data in df_diff_dist.drop(columns=['days','hours','microseconds','nanoseconds'],level=0).groupby(level=0,axis=1,sort=False):
    a_list.append(data)

No objects to concatenate
No objects to concatenate
No objects to concatenate
No objects to concatenate
No objects to concatenate
No objects to concatenate
No objects to concatenate
No objects to concatenate


In [26]:
methods_info_df

| method | files_n | time_order | row_count | time_delta |
|---|---|---|---|---|
| sequential_1.0e+06 | 412 | 1.0 | 7365312.0 | 0 days 00:09:54.085247 |
| sequential_1.0e+07 | 27 | 1.0 | 4779904.0 | 0 days 00:09:59.990272500 |
| sequential_1.0e+08 | 1 | 1.0 | 1251392.0 | 0 days 00:09:59.983577500 |
| mp_simple_pipe_1.0e+06 | 464 | 1.0 | 8300032.0 | 0 days 00:09:58.792146 |
| mp_simple_pipe_1.0e+07 | 27 | 1.0 | 4821984.0 | 0 days 00:09:45.614413500 |
| mp_simple_pipe_1.0e+08 | 0 | | | NaT |
| mp_simple_queue_1.0e+06 | 461 | 1.0 | 8246368.0 | 0 days 00:09:58.685000 |
| mp_simple_queue_1.0e+07 | 27 | 1.0 | 4821984.0 | 0 days 00:09:46.823001 |
| mp_simple_queue_1.0e+08 | 0 | | | NaT |
| mp_complex_pipe_1.0e+06 | 604 | 0.0 | 10804352.0 | 0 days 00:09:58.260001 |


* Not all methods keep time order, and that's expected.
* The test time is not long enough to produce a 100 MB csv (threshold `1e+08`).
* Why is the row count so different between thresholds within one method?
* Time delta (calculated as `datetime.max - datetime.min`) looks strange for complex queue and the methods below it. Why?
* `mp_complex_2_queues` doesn't work.

In [27]:
a_list[0]

`minutes` component:

| method | count | mean | std | min | 25% | 50% | 75% | 99% | max |
|---|---|---|---|---|---|---|---|---|---|
| mp_complex_2_queues_to_1_1.0e+06 | 81613.0 | 1.2e-05 | 0.0035 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| mp_complex_2_queues_to_1_1.0e+07 | 61390.0 | 1.6e-05 | 0.004036 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| mp_complex_pipe_1.0e+06 | 337635.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_complex_pipe_1.0e+07 | 167429.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_complex_queue_1.0e+06 | 81613.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_complex_queue_1.0e+07 | 66971.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simpe_queue_1.0e+06 | 257698.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simpe_queue_1.0e+07 | 150686.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simple_pipe_1.0e+06 | 259375.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simple_pipe_1.0e+07 | 150686.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |


* `2_queues_to_1` shows a delay of at least 1 minute. Why?

In [28]:
a_list[1]

`seconds` component:

| method | count | mean | std | min | 25% | 50% | 75% | 99% | max |
|---|---|---|---|---|---|---|---|---|---|
| mp_complex_2_queues_to_1_1.0e+06 | 81613.0 | 3.7e-05 | 0.010501 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 3.0 |
| mp_complex_2_queues_to_1_1.0e+07 | 61390.0 | 0.000163 | 0.04036 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 10.0 |
| mp_complex_pipe_1.0e+06 | 337635.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_complex_pipe_1.0e+07 | 167429.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_complex_queue_1.0e+06 | 81613.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_complex_queue_1.0e+07 | 66971.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simpe_queue_1.0e+06 | 257698.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simpe_queue_1.0e+07 | 150686.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simple_pipe_1.0e+06 | 259375.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mp_simple_pipe_1.0e+07 | 150686.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |


* Are these delays on the network side? The sequential method should not have delays this long.

In [91]:
a_list[2]

`milliseconds` component:

| method | count | mean | std | min | 25% | 50% | 75% | 99% | max |
|---|---|---|---|---|---|---|---|---|---|
| mp_complex_2_queues_to_1_1.0e+06 | 81613.0 | 0.876968 | 3.041656 | 0.0 | 1.0 | 1.0 | 1.0 | 2.0 | 547.0 |
| mp_complex_2_queues_to_1_1.0e+07 | 61390.0 | 0.793158 | 4.153593 | 0.0 | 0.0 | 1.0 | 1.0 | 2.0 | 921.0 |
| mp_complex_pipe_1.0e+06 | 337635.0 | 1.342088 | 0.637725 | 0.0 | 1.0 | 1.0 | 2.0 | 2.0 | 122.0 |
| mp_complex_pipe_1.0e+07 | 167429.0 | 3.06957 | 1.766654 | 0.0 | 2.0 | 3.0 | 5.0 | 6.0 | 78.0 |
| mp_complex_queue_1.0e+06 | 81613.0 | 0.834071 | 1.615541 | 0.0 | 1.0 | 1.0 | 1.0 | 1.0 | 180.0 |
| mp_complex_queue_1.0e+07 | 66971.0 | 0.846322 | 1.447216 | 0.0 | 1.0 | 1.0 | 1.0 | 2.0 | 146.0 |
| mp_simpe_queue_1.0e+06 | 257698.0 | 1.899274 | 0.463997 | 1.0 | 2.0 | 2.0 | 2.0 | 3.0 | 13.0 |
| mp_simpe_queue_1.0e+07 | 150686.0 | 3.494591 | 1.588832 | 1.0 | 2.0 | 3.0 | 5.0 | 6.0 | 40.0 |
| mp_simple_pipe_1.0e+06 | 259375.0 | 1.90533 | 0.464051 | 1.0 | 2.0 | 2.0 | 2.0 | 3.0 | 16.0 |
| mp_simple_pipe_1.0e+07 | 150686.0 | 3.473183 | 1.679443 | 1.0 | 2.0 | 3.0 | 5.0 | 7.0 | 51.0 |


* `mp_simple_queue_1.0e+06` (listed as `mp_simpe_queue_1.0e+06` in the tables) seems to be the most stable, with the lowest max and std.
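When reading the three tables, keep in mind that each one shows a single component of the gap, so one long stall is split across the minutes, seconds, and milliseconds tables. A sketch of how a made-up gap decomposes, and how `total_seconds()` gives one comparable number instead:

```python
import pandas as pd

# Hypothetical gap of 1 minute, 3 seconds and 547 milliseconds.
gap = pd.Series(pd.to_timedelta([63547], unit="ms"))

# The components view reports each unit separately.
parts = gap.dt.components[["minutes", "seconds", "milliseconds"]].iloc[0].tolist()
print(parts)

# total_seconds() keeps the whole duration in one number.
total_ms = gap.dt.total_seconds() * 1000
print(round(total_ms.iloc[0]))
```

Running the statistics on total milliseconds would avoid having to combine the three tables by eye; this is a suggestion, not something the notebook currently does.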

In [92]:
# temp_ = a_list[2].copy()

# for col in temp_.columns:
#     temp_[col] = temp_[col] / temp_[col].min()

# temp_ = temp_.replace([np.inf, -np.inf], np.nan)    
# temp_['sum'] = temp_.sum(axis=1)
# temp_

## Conclusion 

`mp_simple_queue` with a file size threshold of `1.0e+06` is the most stable, with the lowest max and std. We will use it to gather market book data.