In [None]:
import pandas
import ib_insync as ibi

from datetime import datetime

In [None]:
!python --version

In [None]:
from eventkit import Event

class EventContext:
    """
    Holder for the two handles an event-driven run needs: one for the
    broker API and one for the data source.
    TBD: Make it a singleton
    """

    def __init__(self, broker_handle: ibi.IB, data_handle: ibi.IB)-> None:
        """
        Store the broker and data handles.

        Parameters
        ----------
        broker_handle: ibi.IB
            Broker API handle.
        data_handle: ibi.IB
            Data source handle.
        """
        self._broker_handle, self._data_handle = broker_handle, data_handle

    def get_broker(self)-> ibi.IB:
        """Return the broker API handle given at construction."""
        return self._broker_handle

    def get_data(self)-> ibi.IB:
        """Return the data source handle given at construction."""
        return self._data_handle
    
class BrokerEventRelay:
    """
    Class for broker event relay.

    Converts a broker bar update into a TimeDouble and forwards it to a
    single listener through an internal eventkit Event.

    TBD: Allow for different brokers (create derived classes)
    TBD: Add different relay member functions (open, high, low, close, volume)
    """
    # Public field name -> attribute name on the IB real time bar.
    # 'open' is stored as 'open_' on ibi.RealTimeBar.
    _BAR_FIELDS = {
        'open': 'open_',
        'high': 'high',
        'low': 'low',
        'close': 'close',
        'volume': 'volume',
    }

    def __init__(self,
                 listener: 'DataSlot',
                 data_name: str,
                 field_name: str = 'close',
                 )-> None:
        """
        Initialize broker event relay.

        The listener annotation is a quoted forward reference because
        DataSlot is defined after this class; an unquoted name would be
        evaluated when this method is defined and raise NameError.

        Parameters
        ----------
        listener: DataSlot
            Listener data slot; its on_event method is connected to the relay.
        data_name: str
            Name of the input data.
        field_name: str
            Data field name (open, high, low, close, volume).
        """
        self._relay_event = Event()
        self._field_name = field_name
        self._relay_event += listener.on_event
        self._data_name = data_name

    def ib_live_bar(self,
                    bars: ibi.RealTimeBarList,
                    has_new_bar: bool)-> None:
        """
        Translate IB real time bar event into price update.

        Does nothing unless has_new_bar is true. Otherwise the configured
        field of the last bar is wrapped in a TimeDouble, stamped with that
        bar's time, and emitted to the listener.

        Parameters
        ----------
        bars: ibi.RealTimeBarList
            IB RealTimeBarList.
        has_new_bar: bool
            Whether there is new bar.

        Raises
        ------
        TypeError
            If the configured field name is not a supported bar field.
        """
        if not has_new_bar:
            return
        try:
            attribute_name = self._BAR_FIELDS[self._field_name]
        except (KeyError, TypeError):
            # TypeError covers an unhashable field name; both cases are reported alike.
            raise TypeError('BrokerEventRelay.ib_live_bar: Unsupported data field type.') from None
        latest_bar = bars[-1]
        relay_data = TimeDouble(self._data_name, latest_bar.time, getattr(latest_bar, attribute_name))
        self._relay_event.emit(relay_data)

In [None]:
from typing import List
from abc import ABC, abstractmethod

class TimeDouble:
    """
    A named numerical observation together with its timestamp.
    """

    def __init__(self, data_name: str, timestamp: datetime, value: float)-> None:
        """
        Store the name, time and value of one data point.

        Parameters
        ----------
        data_name: str
            Name of the data represented.
        timestamp: datetime
            Time of the realization of the data point.
        value: float
            Numerical value of the data.
        """
        self._value = value
        self._timestamp = timestamp
        self._data_name = data_name

    def get_name(self)-> str:
        """Return the name of the data represented."""
        return self._data_name

    def get_time(self)-> datetime:
        """Return the timestamp of the data point."""
        return self._timestamp

    def get_value(self)-> float:
        """Return the numerical value of the data point."""
        return self._value

In [None]:
from collections import deque
from datetime import timedelta

class SignalBase(ABC):
    """
    Base class for signal.

    For every subscribed input name it keeps two parallel deques (values
    and timestamps) holding at most warmup_length recent points. Derived
    classes implement calculate_signal on top of that storage.
    """
    def __init__(self,
                input_data_array: List[str],
                warmup_length: int,
                signal_name: str)-> None:
        """
        Initialize signal base.

        Parameters
        ----------
        input_data_array: List[str]
            List of event subscription that is required for signal calculation.
        warmup_length: int
            Number of data points to 'burn'; also the maximum number of
            points kept per input.
        signal_name: str
            Name of the signal.
        """
        self._input_data_array = input_data_array
        self._signal_name = signal_name
        self._warmup_length = warmup_length
        self._initialize_data_time_storage(input_data_array)

    def _initialize_data_time_storage(self, input_data_array: List[str])-> None:
        """
        Initialize storage.

        Creates one empty value deque and one empty time deque per input name.

        Parameters
        ----------
        input_data_array: List[str]
            List of event subscription that is required for signal calculation.
        """
        self._data_storage = {input_name: deque() for input_name in input_data_array}
        self._time_storage = {input_name: deque() for input_name in input_data_array}

    def check_all_received(self, data_name: str)-> bool:
        """
        Tell whether every input has caught up with the given input.

        Parameters
        ----------
        data_name: str
            Name of the input that has just been updated.

        Returns
        -------
        bool
            True when there is at most one input, or when every input has at
            least one point and none of them lags the latest timestamp of
            data_name by more than one microsecond. False otherwise.
        """
        # With a single (or no) incoming data stream there is nothing to compare.
        if len(self._input_data_array) <= 1:
            return True

        reference_times = self.get_time_by_name(data_name)
        if not reference_times:
            return False
        latest_time = reference_times[-1]
        tolerance = timedelta(microseconds = 1)

        # Note: the attribute is _input_data_array (single underscore). A double
        # underscore here would be name-mangled and raise AttributeError.
        for input_name in self._input_data_array:
            input_times = self.get_time_by_name(input_name)
            if not input_times:
                return False
            if latest_time - input_times[-1] > tolerance:
                return False
        return True

    def update_data(self, new_data: TimeDouble)-> None:
        """
        Update data storage

        Appends the new point to the storage of its input and drops the
        oldest point once more than warmup_length points are held.

        Parameters
        ----------
        new_data: TimeDouble
            Incoming new data.

        Raises
        ------
        KeyError
            If the data name is not one of the inputs given at construction.
        """
        input_name = new_data.get_name()
        values = self._data_storage[input_name]
        times = self._time_storage[input_name]
        # Extend storage
        values.append(new_data.get_value())
        times.append(new_data.get_time())
        # Remove oldest if warming up is complete
        if len(values) > self._warmup_length:
            values.popleft()
            times.popleft()

    def get_data_by_name(self, data_name: str)-> deque:
        """Return the value deque stored for data_name."""
        return self._data_storage[data_name]

    def get_time_by_name(self, data_name: str)-> deque:
        """Return the timestamp deque stored for data_name."""
        return self._time_storage[data_name]

    def get_warmup_length(self)-> int:
        """Return the warmup length given at construction."""
        return self._warmup_length

    def get_signal_name(self)-> str:
        """Return the name of the signal."""
        return self._signal_name

    @abstractmethod
    def calculate_signal(self)-> float:
        """
        Virtual method to be implemented by the derived class.
        """
        pass
    
class MASignal(SignalBase):
    """
    Moving average signal.

    Averages the points currently stored for the first input name.
    """
    def __init__(self,
                input_data_array: List[str],
                warmup_length: int,
                signal_name: str)-> None:
        """
        Initialize moving average signal.

        Parameters
        ----------
        input_data_array: List[str]
            Names of the input data; only the first one is used in the average.
        warmup_length: int
            Number of data points passed to the base class as warmup length.
        signal_name: str
            Name of the signal.
        """
        super().__init__(input_data_array, warmup_length, signal_name)

    def calculate_signal(self)-> float:
        """
        Compute the moving average.

        Returns
        -------
        float
            Arithmetic mean of the values stored for the first input.

        Raises
        ------
        ZeroDivisionError
            If no value is stored for the first input.
        """
        prices = self.get_data_by_name(self._input_data_array[0])
        average = sum(prices) / len(prices)
        ### DEBUG
        print(average)
        return average

In [None]:
class DataSlot:
    """
    Class for data slot.

    Receives data points for one input, stores them in the parent signal
    and publishes the signal value on a persistent event once the signal
    can be calculated.
    """
    def __init__(self,
                data_name: str,
                parent_signal: SignalBase)-> None:
        """
        Initialize data slot.

        Parameters
        ----------
        data_name: str
            Name of the input data.
        parent_signal: SignalBase
            Signal that listens to this data.
        """
        self._data_name = data_name
        self._parent_signal = parent_signal
        # One event for the lifetime of the slot, so that listeners connected
        # through get_signal_event() actually receive the published signals.
        # An event created per emission can never have any listener attached.
        self._signal_event = Event(name = parent_signal.get_signal_name())

    def get_signal_event(self)-> Event:
        """
        Return the event on which calculated signals are published.

        Listeners connected to it receive one TimeDouble per calculated signal.
        """
        return self._signal_event

    def on_event(self, new_data: TimeDouble)-> None:
        """
        Perform the action upon getting an event.

        When there is an event (arrival of data) we want to
        - Update data storage
        - Calculate signal if warming up is complete

        Parameters
        ----------
        new_data: TimeDouble
            Incoming new data.
        """
        signal = self._parent_signal
        input_name = new_data.get_name()

        # 1. Update data storage
        signal.update_data(new_data)

        # 2. If warming up is complete and all data arrived, calculate and publish signal
        warmed_up = len(signal.get_data_by_name(input_name)) == signal.get_warmup_length()
        if not (warmed_up and signal.check_all_received(input_name)):
            return
        signal_value = signal.calculate_signal()
        latest_timestamp = signal.get_time_by_name(input_name)[-1]
        self._signal_event.emit(TimeDouble(signal.get_signal_name(), latest_timestamp, signal_value))

In [None]:
# Start the ib_insync event loop integration before connecting from the notebook.
ibi.util.startLoop()

# Connection
ib = ibi.IB()
ib.connect(host='127.0.0.1', port=4002, clientId=13)

In [None]:
# The same connection serves as both broker handle and data handle.
my_ec = EventContext(ib, ib)

# Subscription
my_contract = ibi.Forex('EURUSD')
live_bars = ib.reqRealTimeBars(my_contract, 5, 'MIDPOINT', False)

# Create signal
input_data_name = 'EURUSD_close'
my_ma_signal = MASignal([input_data_name], 3, "EURUSD_MA")
my_eurusd_data = DataSlot(input_data_name, my_ma_signal)
my_ber = BrokerEventRelay(my_eurusd_data, input_data_name)
# Every bar update is passed to the relay, which forwards it to the data slot.
live_bars.updateEvent += my_ber.ib_live_bar

# Let the event loop run for 40 seconds, then cancel the subscription.
# The cancel call must receive the list returned by reqRealTimeBars.
ib.sleep(40)
ib.cancelRealTimeBars(live_bars)

In [None]:
# Close the connection opened with ib.connect above.
ib.disconnect()