## Data Processing Module (import, resample)

In [13]:
import pandas as pd
import numpy as np
import logging
import os, os.path
import datetime
import pytz

from abc import ABC, abstractmethod
from pytz import timezone
from queue import Queue, Empty
from typing import Any, Dict

from constants import *
from event import MarketEvent

In [2]:
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(levelname)-8s line %(lineno)s] %(funcName)18s: %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S')

Introducing a __DataHandler__ class. This ABC is an interface for all data handlers (both live and historic) from different sources.

In [3]:
class DataHandler(ABC):
    """
    DataHandler is an abstract base class providing an interface for
    all inherited data handlers (both live and historic).
    """
    
    @abstractmethod
    def all_bars(self, symbol:str) -> tpFrame:
        """
        Returns all bars for a given symbol.
        
        Parameters:
        symbol - a Ticker name 
        
        Returns: 
        DataFrame for the symbol
        datetime -> OHLCV
        """
        raise NotImplementedError("Should implement get_all_bars(symbol)")
    
    @abstractmethod
    def latest_bars(self, symbol:str, n:int = 1) -> (tpFrame, bool):
        """
        Returns the DataFrame with last N bars for a given symbol 
        (or fewer if less bars are available) 
        
        Parameters:
        symbol - a Ticker name 
        n - a number of bars
        
        Returns: 
        DataFrame for the symbol
        datetime -> OHLCV
        True - if enough bars was available
        False - if fewer bars was available
        """
        raise NotImplementedError("Should implement get_latest_bars(symbol, n)")

    @abstractmethod
    def update_bars(self) -> bool:
        """
        Updates a datetime, thus shift to the next latest bar.
        
        Returns: 
        True - if shift was successful
        False - if no more bars exist
        """
        raise NotImplementedError("Should implement update_bars()")

Introducing a __CSVDataHandler__ class. 

Functionality:

 1. Reads data for multiple symbols.
 2. Resample data according to the given timeframe.
 3. Implements DataHandler interface.
 
Sample data can be downloaded from <a href=https://www.finam.ru/>Finam</a> (multiple files for a one ticker are supported), and <a href=https://www.alphavantage.co/>Alpha Vantage</a>.

In [28]:
class CSVDataHandler(DataHandler):
    """
    CSVDataHandler is designed to read CSV files for
    each requested symbol from disk and implement the DataHandler
    interface. 
    """

    def __init__(self, csv_dir: str, system_tf: Timeframe, 
                 symbol_dict: Dict[str, Dict[str, Any]], events: Queue = None) -> None:
        """
        Initialises the data handler by requesting the location 
        of the CSV files and a list of symbols.

        It is assumed that all files are in the form 'symbol<...>.csv', 
        where symbol is in the symbol_dict dictionary: symbol -> parameters.

        Parameters:
        csv_dir - Directory path to the CSV files (the 'data' folder).
        system_tf - Timeframe for a system 'heartbeat' and resampling (e.g. 5 minutes, day).
        symbol_dict - A dictionary of symbols with parameters.
        events - The Event Queue. If None (as by default), then only all_bars() is available.
        """
        
        self._csv_dir = csv_dir
        self._system_tf = system_tf
        self._symbol_dict = symbol_dict
        self._events = events
        
        self._symbol_data = {} # Dictionary: symbol -> DataFrame of bars
        # {'APPL':0} -> {'AAPL':1} -> ... {'APPL':None}
        self._latest_idx = {} # Dictionary: symbol -> index of the 'last' bar
        
        self.continue_backtest = True # whether there are more bars

        self._convert_csv_files()
    
    def _read_csv(self, file: str, joint: bool) -> tpFrame:
        """
        Calls pandas read_csv function with suitable parameters.
        
        Parameters:
        file - full file name.
        joint - if True, date and time are in the same column.
        
        Returns:
        DataFrame
        """
        
        n = []
        if joint:
            n=['datetime','open','high','low','close','volume']
        else:
            n=['date', 'time','open','high','low','close','volume']
        return pd.io.parsers.read_csv(file, header=None, skiprows=1, names=n)
    
    def _load_csv(self, symbol: str, joint_dt: bool, exact_name:str = None) -> tpFrame:
        """
        Load data from the files
        
        Parameters:
        symbol - the Ticker name.
        joint_dt - if True, Date and Time are in the same column.
        exact_name - exact file name; if None, then looking for all files 'symbol[...].csv'.
        
        Returns: 
        DataFrame for the symbol
        """
        
        df = pd.DataFrame()
        if exact_name:
            # load data from the exact file
            try:
                file = os.path.join(self._csv_dir, self._symbol_dict[symbol]['file'])
                df = self._read_csv(file, joint_dt)
            except KeyError:
                logging.error("No file name is given")
                raise
        else: # combine all files like 'symbol...csv'
            frames = []
            for root, dirs, files in os.walk(self._csv_dir):
                for file in files:
                    if file.startswith(symbol) and 'checkpoint' not in file:
                        frames.append(self._read_csv(os.path.join(root, file), joint_dt))
            df = pd.concat(frames)        
        return df
    
    def _convert_files(self, symbol: str, dt_format: str, joint_dt: bool, reindex: bool) -> tpFrame:
        """
        Import files, downloaded from sample sources.
        
        Parameters:
        symbol - the Ticker name.
        dt_format - string of a DateTime format.
        joint_dt - if True, Date and Time are in the same column.
        reindex - if True, then revert index from the last to the first.
        
        Returns: 
        DataFrame for the symbol
        """
        
        df = pd.DataFrame()
        try:
            df = self._load_csv(symbol, joint_dt, exact_name = self._symbol_dict[symbol]['exn'])
        except KeyError:
            logging.debug("No exact name is given. Take all files %s<...>.csv" % symbol)
            df = self._load_csv(symbol, joint_dt)
       
        # set date + time as index
        try:
            if not joint_dt:
                df['datetime'] = df['date'].astype(str) + df['time'].astype(str)
            df['datetime'] = pd.to_datetime(df['datetime'], format=dt_format)
        except ValueError:
            logging.debug("No time column is given")
            # '%Y-%m-%d %H:%M:%S' extract only '%Y-%m-%d'
            df['datetime'] = pd.to_datetime(df['date'], format=dt_format[0:dt_format.index('d') + 1])
        df = df.set_index('datetime')
        
        if not joint_dt:
            df.drop('date', axis = 1, inplace=True)
            df.drop('time', axis = 1, inplace=True)
        
        # reindex from the last to the first
        if reindex: 
            df = df.reindex(index=df.index[::-1])
            
        return df
    
    def _resample_symbol_data(self, df: tpFrame) -> tpFrame:
        """
        Resample the DataFrame according to the selected timeframe.
        If timeframe is less, than already given to the system, 
        does nothing.
        
        Parameters:
        df - Dataframe.
        
        Returns: 
        modified Dataframe.
        """
        
        df_new = pd.DataFrame({'open': df.open.resample(self._system_tf, label='right', closed='right').first().dropna(),
                               'high': df.high.resample(self._system_tf, label='right', closed='right').max().dropna(),
                               'low': df.low.resample(self._system_tf, label='right', closed='right').min().dropna(),
                               'close': df.close.resample(self._system_tf, label='right', closed='right').last().dropna(),
                               'volume': df.volume.resample(self._system_tf, label='right', closed='right').sum(
                                   min_count = 1).dropna().astype(float)
                              })
        #df_new = df_new.apply(pd.to_numeric, downcast='float')
        return df_new   
    
    def _convert_csv_files(self) -> bool:
        """
        Opens the CSV files from the data directory, converting
        them into pandas DataFrames under a symbol in the _symbol_data 
        dictionary.
        
        Returns:
        True - if all tickers were loaded
        False - otherwise
        """    
        
        result = True
        for s in self._symbol_dict.keys():
            df = None
            if self._symbol_dict[s]['src'] is 'av': # file is from Alpha Vantage
                df = self._convert_files(s, AV_DATETIME, True, True)
            elif self._symbol_dict[s]['src'] is 'finam': # file is from Finam
                df = self._convert_files(s, FINAM_DATETIME, False, False)
            # elif ... another source
            
            if df is None:
                result = False
                logging.error("Error converting files for the symbol %s" % s)
            else:
                df = self._resample_symbol_data(df)
                # if timeframe is minutes or hours then set up localization
                if ('T' or 'H') in self._system_tf:
                    df = df.tz_localize(timezone(self._symbol_dict[s]['tz']))

                self._symbol_data[s] = df
                self._latest_idx[s] = 0
                # save maximum index for this symbol
                self._symbol_dict[s]['len'] = len(list(df.index))
        return result
    
    def all_bars(self, symbol):
        return self._symbol_data[symbol]
  
    def latest_bars(self, symbol:str, n:int = 1) -> (tpFrame, bool):
        """
        Returns the DataFrame with last n bars for a given symbol 
        (or fewer if less bars are available) 
        """  
        
        try:
            bars = self._symbol_data[symbol]
            idx = self._latest_idx[symbol]
            df = bars.iloc[idx-n:idx]
            return df, (df.shape[0] == n)       
        except KeyError:
            logging.error("Symbol %s is not available in the data set." % s)            
      
    def _next_datetime(self) -> tpDateTime:
        """
        Return next minimum datetime from the data.
        """
        dt = None
        for s in self._symbol_dict.keys():
            if self._latest_idx[s] is not None:
                idx = self._latest_idx[s]
                idx_dt = self._symbol_data[s].index[idx]
                if dt is None or (idx_dt < dt): 
                    dt = idx_dt
        return dt
    
    def update_bars(self) -> bool:
        """
        Updates a datetime, thus shift to the next latest bar.
        """
        
        for s in self._symbol_dict.keys():
            if self._latest_idx[s] == self._symbol_dict[s]['len']:
                self._latest_idx[s] = None
                  
        self._system_dt = self._next_datetime()
        
        # find symbols, which have updated time in the data
        result = False
        upd_symbols = []
        for s in self._symbol_dict.keys():
            if self._latest_idx[s] is not None:
                result = True
                if self._system_dt in self._symbol_data[s].index:
                    self._latest_idx[s] += 1  
                    upd_symbols.append(s)
                    
        if self._events is not None:
            self._events.put(MarketEvent(upd_symbols, self._system_dt))               
        
        self.continue_backtest = result
        return result

### Example calls of CSVDataHandler

In [15]:
bars = CSVDataHandler('data', TIMEFRAME_DAY,
                      {'UVXY':
                       {'src': 'av', 'tz': 'US/Eastern', 'exn':True, 'file':'UVXY_daily.csv'}, 
                       'IMOEX':
                       {'src': 'finam', 'tz': 'Europe/Moscow'}
                      })

[Tue, 11 Feb 2020 14:21:44 DEBUG    line 104]     _convert_files: No exact name is given. Take all files IMOEX<...>.csv
[Tue, 11 Feb 2020 14:21:44 DEBUG    line 113]     _convert_files: No time column is given


In [16]:
df = bars.all_bars('UVXY')
df.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2011-10-04,40.45,40.8,34.3,34.3,11420.0
2011-10-05,32.91,33.15,30.23,30.23,3400.0
2011-10-06,30.19,31.3,29.0,29.0,34458.0
2011-10-07,28.43,30.96,28.08,29.42,13601.0
2011-10-10,27.58,27.6,25.99,25.99,28700.0


In [17]:
df.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 2049 entries, 2011-10-04 to 2019-11-22
Data columns (total 5 columns):
open      2049 non-null float64
high      2049 non-null float64
low       2049 non-null float64
close     2049 non-null float64
volume    2049 non-null float64
dtypes: float64(5)
memory usage: 96.0 KB


In [18]:
df = bars.all_bars('IMOEX')
df.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2000-01-05,173.0,173.0,173.0,173.0,0.0
2000-01-06,186.26,186.26,186.26,186.26,0.0
2000-01-10,200.81,200.81,200.81,200.81,0.0
2000-01-11,199.57,199.57,199.57,199.57,0.0
2000-01-12,196.88,196.88,196.88,196.88,0.0


In [19]:
df.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 4972 entries, 2000-01-05 to 2019-11-22
Data columns (total 5 columns):
open      4972 non-null float64
high      4972 non-null float64
low       4972 non-null float64
close     4972 non-null float64
volume    4972 non-null float64
dtypes: float64(5)
memory usage: 233.1 KB


In [20]:
bars = CSVDataHandler('data', TIMEFRAME_MIN5,
                      {'SPFB':
                       {'src': 'finam', 'tz': 'Europe/Moscow', 'exn': False}
                      })

In [21]:
df = bars.all_bars('SPFB')
df.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2009-07-16 18:10:00+04:00,63.73,63.73,63.73,63.73,100.0
2009-07-16 18:45:00+04:00,64.09,64.14,64.09,64.14,4.0
2009-07-16 18:50:00+04:00,64.07,64.11,64.07,64.11,3.0
2009-07-16 18:55:00+04:00,64.19,64.19,64.19,64.19,2.0
2009-07-16 19:00:00+04:00,64.25,64.25,64.25,64.25,1.0


Compare to the __original data__

|DATE|TIME|OPEN|HIGH|LOW|CLOSE|VOL|
|----|----|----|----|---|-----|---|
|20090716|180700|63.730|63.730|63.730|63.730|100|
|20090716|184300|64.090|64.090|64.090|64.090|1|
|20090716|184400|64.140|64.140|64.140|64.140|3|
|20090716|184800|64.070|64.070|64.070|64.070|1|
|20090716|184900|64.110|64.110|64.110|64.110|2|
|20090716|185500|64.190|64.190|64.190|64.190|2|
|20090716|190000|64.250|64.250|64.250|64.250|1|
|20090716|192200|64.100|64.100|64.100|64.100|4|

With events queue:

In [22]:
events = Queue()

In [23]:
bars = CSVDataHandler('data', TIMEFRAME_HOUR,
                      {'UVXY':
                       {'src': 'av', 'tz': 'US/Eastern', 'exn':True, 'file':'UVXY.csv'}
                      }, events)

In [24]:
df = bars.all_bars('UVXY')
df.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-11-08 10:00:00,18.5703,18.79,18.2074,18.72,2626038.0
2019-11-08 11:00:00,18.7007,18.8063,18.26,18.39,3566914.0
2019-11-08 12:00:00,18.39,18.42,18.16,18.18,1866990.0
2019-11-08 13:00:00,18.185,18.185,18.0,18.04,898833.0
2019-11-08 14:00:00,18.04,18.1,17.99,18.08,756785.0


In [29]:
bars = CSVDataHandler('data', TIMEFRAME_MIN5,
                      {'UVXY':
                       {'src': 'av', 'tz': 'US/Eastern', 'exn':True, 'file':'UVXY_daily.csv'}
                      }, events)

In [30]:
df = bars.all_bars('UVXY')
df.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2011-10-04 00:00:00-04:00,40.45,40.8,34.3,34.3,11420.0
2011-10-05 00:00:00-04:00,32.91,33.15,30.23,30.23,3400.0
2011-10-06 00:00:00-04:00,30.19,31.3,29.0,29.0,34458.0
2011-10-07 00:00:00-04:00,28.43,30.96,28.08,29.42,13601.0
2011-10-10 00:00:00-04:00,27.58,27.6,25.99,25.99,28700.0


In [41]:
bars = CSVDataHandler('data', TIMEFRAME_MIN5,
                      {'SPFB':
                       {'src': 'finam', 'tz': 'Europe/Moscow', 'exn': False}
                      }, events)

In [42]:
df = bars.all_bars('SPFB')
df.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2009-07-16 18:10:00+04:00,63.73,63.73,63.73,63.73,100.0
2009-07-16 18:45:00+04:00,64.09,64.14,64.09,64.14,4.0
2009-07-16 18:50:00+04:00,64.07,64.11,64.07,64.11,3.0
2009-07-16 18:55:00+04:00,64.19,64.19,64.19,64.19,2.0
2009-07-16 19:00:00+04:00,64.25,64.25,64.25,64.25,1.0


In [43]:
df.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 390596 entries, 2009-07-16 18:10:00+04:00 to 2019-08-30 23:50:00+03:00
Data columns (total 5 columns):
open      390596 non-null float64
high      390596 non-null float64
low       390596 non-null float64
close     390596 non-null float64
volume    390596 non-null float64
dtypes: float64(5)
memory usage: 17.9 MB


In [49]:
n = 0
logging.info("Start!")
while True:
    if bars.continue_backtest:
        bars.update_bars()
        n += 1
    else:
        break
logging.info("Finish!")
logging.info("Events generated: %s" % n)

[Tue, 11 Feb 2020 14:41:03 INFO     line 2]           <module>: Start!
[Tue, 11 Feb 2020 14:41:03 INFO     line 9]           <module>: Finish!
[Tue, 11 Feb 2020 14:41:03 INFO     line 10]           <module>: Events generated: 0


### Memory usage enhancement
__NOT USED SO FAR__ due to problems with conversions

In [58]:
df.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 390596 entries, 2009-07-16 18:10:00+04:00 to 2019-08-30 23:50:00+03:00
Data columns (total 5 columns):
open      390596 non-null float64
high      390596 non-null float64
low       390596 non-null float64
close     390596 non-null float64
volume    390596 non-null float64
dtypes: float64(5)
memory usage: 27.9 MB


In [48]:
bars._events.qsize()

783244

In [50]:
def mem_usage(pandas_obj):
    if isinstance(pandas_obj, tpFrame):
        usage_b = pandas_obj.memory_usage(deep=True).sum()
    else: # tpSeries
        usage_b = pandas_obj.memory_usage(deep=True)
    usage_mb = usage_b / 1024 ** 2 # convert to MBs
    return "{:03.2f} MB".format(usage_mb)

In [51]:
converted_float = df.apply(pd.to_numeric, downcast='float')

print(mem_usage(df))
print(mem_usage(converted_float))

compare_floats = pd.concat([df.dtypes,converted_float.dtypes], axis=1)
compare_floats.columns = ['before','after']
compare_floats.apply(pd.Series.value_counts)

27.88 MB
20.43 MB


Unnamed: 0,before,after
float32,,5.0
float64,5.0,


In [54]:
converted_float.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 390596 entries, 2009-07-16 18:10:00+04:00 to 2019-08-30 23:50:00+03:00
Data columns (total 5 columns):
open      390596 non-null float32
high      390596 non-null float32
low       390596 non-null float32
close     390596 non-null float32
volume    390596 non-null float32
dtypes: float32(5)
memory usage: 20.4 MB


__HERE__ is the problem (compare to the original data)

In [55]:
converted_float.head()

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2009-07-16 18:10:00+04:00,63.73,63.73,63.73,63.73,100.0
2009-07-16 18:45:00+04:00,64.089996,64.139999,64.089996,64.139999,4.0
2009-07-16 18:50:00+04:00,64.07,64.110001,64.07,64.110001,3.0
2009-07-16 18:55:00+04:00,64.190002,64.190002,64.190002,64.190002,2.0
2009-07-16 19:00:00+04:00,64.25,64.25,64.25,64.25,1.0
