<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [4]:
import numpy as np
from numba import njit, int64, float64, types
from numba.experimental import jitclass
from numba.types import Array, string, boolean

# Synthetic inputs: three standard-normal series drawn one after another
# from NumPy's seeded global generator, so every run sees the same values.
N_SAMPLES = 1_000_000
np.random.seed(42)
x = np.random.randn(N_SAMPLES)
y = np.random.randn(N_SAMPLES)
z = np.random.randn(N_SAMPLES)

# Example of "human-readable" signals.
# A single rule is (series_a, series_b, direction, acc); several rules are
# joined by an operator string such as 'or_' placed between them.
entry_sig_ = ((x, y, 'crossup', False),)
exit_sig_ = ((x, z, 'crossup', False), 'or_', (x, y, 'crossdown', False))

# The same signals rewritten as homogeneous tuples: every element has the
# layout (rule, operator_joining_it_to_the_next_rule, memory_key).
# 'NOP' fills the operator slot of the last rule, which has no successor.
entry_sig = (((x, y, 'crossup', False), 'NOP', '1'),)  # from entry_sig_
exit_sig = (
    ((x, z, 'crossup', False), 'or_', '2'),
    ((x, y, 'crossdown', False), 'NOP', '3'),
)  # from exit_sig_

@njit
def cross(x, y, i):
    '''
    Detect a sign change of (x - y) between samples i-1 and i.

    x,y: np.array - the two series being compared
    i: int - point in time
    Returns: 1 when x - y has strictly opposite signs at i-1 and i,
             otherwise 0. If the series are exactly equal at either
             sample the product is zero and no cross is reported.
             At i == 0 there is no previous sample, so the result is 0.
    '''
    # Without this guard x[i - 1] would be x[-1], i.e. the LAST element,
    # and the first sample would be compared against the end of the series.
    if i == 0:
        return 0
    if (x[i - 1] - y[i - 1]) * (x[i] - y[i]) < 0:
        return 1
    return 0


# Key / value numba types of the jitclass dictionary: string -> int64.
kv_ty = (types.string, types.int64)

# jitclass field specification: a single attribute `memory`, typed as a
# numba dictionary built from the key and value types above.
spec = [('memory', types.DictType(kv_ty[0], kv_ty[1]))]

@njit
def single_signal(x, y, how, acc, i):
    '''
    Evaluate one directional crossing rule at a single point in time.

    x,y: np.array - series compared at index i
    how: str - 'crossdown' (x is below y at i) or 'crossup' (x is above y at i)
    acc: not read by this function; it occupies the fourth slot of a rule
         tuple so the tuple can be unpacked straight into this call
    i: int - point in time
    Returns: 1 if cross(x, y, i) fires and the direction matches `how`,
             otherwise 0
    '''
    if not cross(x, y, i):
        return 0
    ends_below = x[i] < y[i]
    ends_above = x[i] > y[i]
    if (ends_below and how == 'crossdown') or (ends_above and how == "crossup"):
        return 1
    return 0

In [6]:
# Peek at the third field (the memory key) of the first exit rule.
exit_sig[0][2]

'2'

In [None]:
@jitclass(spec)
class MultiSig:
    '''
    Evaluates chains of signal rules and counts, per rule, how many times
    the rule has fired. Counters live in self.memory, keyed by str() of
    each rule's third field.
    '''
    def __init__(self, entry, exit):
        '''
        Create one zeroed counter for every rule in entry and in exit.
        entry, exit: tuples of (signal_args, operator, memory_key) rules
        '''
        counters = {}
        for rule in entry:
            counters[str(rule[2])] = 0

        for rule in exit:
            counters[str(rule[2])] = 0

        self.memory = counters

    def reduce_sig(self, sig, i):
        '''
        Evaluate a chain of rules at time i and fold them into one value.
        sig: homogeneous tuple of (signal_args, operator, memory_key) rules;
             signal_args is unpacked into single_signal
        i: int - point in time
        Returns: combined value of the whole chain

        Rules are folded left to right. The operator stored on a rule joins
        it with the NEXT rule: 'or_' applies |, any other value applies &.
        The operator of the last rule is never read. Every rule that fires
        gets its counter incremented, whatever the combined result is.
        '''
        n_rules = len(sig)
        head = sig[0]
        result = single_signal(*head[0], i)
        if result:
            self.update_memory(head[2])
        pending_op = head[1]
        for idx in range(1, n_rules):
            rule = sig[idx]
            hit = single_signal(*rule[0], i)
            if hit:
                self.update_memory(rule[2])
            if pending_op == 'or_':
                result = result | hit
            else:
                result = result & hit
            pending_op = rule[1]
        return result

    def update_memory(self, key):
        '''
        Add one hit to the counter stored under str(key).
        '''
        name = str(key)
        self.memory[name] = self.memory[name] + 1

    def reset(self):
        '''
        Replace the memory with a fresh dictionary holding the same keys,
        all set back to 0.
        '''
        zeroed = {}
        for name in self.memory.keys():
            zeroed[name] = 0
        self.memory = zeroed

    def query_memory(self, key):
        '''
        Return the number of hits recorded under str(key).
        '''
        name = str(key)
        return self.memory[name]