In [1]:
import pandas as pd
import numpy as np

np.random.seed(42)
length = 100
hefty_length = 100_000

lows = np.add.accumulate(np.random.rand(length)-0.5)
hefty_lows = np.add.accumulate(np.random.rand(hefty_length)-0.5)

large_bars = pd.DataFrame(data={'high' : lows, \
                           'low': lows+2},
                   index=np.arange(length))

hefty_bars = pd.DataFrame(data={'high' : hefty_lows, \
                           'low': hefty_lows+2},
                   index=np.arange(hefty_length))

In [2]:

from numpy import minimum, maximum, argmin, argmax, add

def max_swing(bars, upswing=True, hard_left=True, hard_right=True):
    
    if len(bars) <= 2 + hard_left + hard_right:
        return None, None
 
    if upswing:
        mins = minimum.accumulate(bars['low']) 
        
        swing_end = (bars['high'] - mins).idxmax()
        if swing_end == 0:
            return None, None, None, None
        swing_start = (mins[:swing_end]).idxmin()
        
        start_value = mins[swing_start]
        end_value = bars.at[swing_end, 'high']
        
    else:
        maxs = maximum.accumulate(bars['high'])
        
        swing_end = (maxs - bars['low']).idxmax()
        if swing_end == 0:
            return None, None, None, None
        swing_start = (maxs[:swing_end]).idxmax()
        
        start_value = maxs[swing_start]
        end_value = bars.at[swing_end, 'low']
    
    #now get rid of duplicate indexes, this should not do any harm
    if hard_left and swing_start == bars.index[0]:
        swing_start = bars.index[1]
        
    if hard_right and swing_end == bars.index[-1]:
        swing_end = bars.index[-2]
    
    return swing_start, swing_end, start_value, end_value #bars.index[swing_start], bars.index[swing_end]

In [3]:
bars = pd.DataFrame({'high' : [ 11, 10,  9,  8, 11,  8,  9, 10,  9],
                       'low': [  6,  5,  8,  7,  6,  7,  8,  5,  4]},
                       index= ['0','1','2','3','4','5','6','7','8'])

In [4]:
print(max_swing(bars, upswing=False, hard_left=True, hard_right=True))
print(max_swing(bars))
print(max_swing(large_bars))
print(max_swing(large_bars, upswing=False, hard_left=True, hard_right=True))
print(max_swing(hefty_bars))
print(max_swing(hefty_bars, upswing=False, hard_left=True, hard_right=True))

('1', '7', 11, 4)
('1', '4', 5, 11)
(49, 55, -0.7038047803869438, -0.6049868234352309)
(3, 85, 0.6559068512657203, -1.2117654367552042)
(15854, 23640, -59.50515559703218, 40.35354685208753)
(23640, 50398, 40.35354685208753, -57.88156463066682)


In [5]:
#Small dataset 9 elements
%timeit max_swing(bars, upswing=False, hard_left=True, hard_right=True)

406 µs ± 28.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [6]:
#Large dataset 100 elements
%timeit max_swing(large_bars, upswing=False, hard_left=True, hard_right=True)

430 µs ± 58.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [7]:
#hefty dataset 100_000 elements
%timeit max_swing(hefty_bars, upswing=False, hard_left=True, hard_right=True)

1.06 ms ± 31.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


Now, version using np.ndarrays directly:

In [8]:

from numpy import minimum, maximum, argmin, argmax, add

def max_swing_array(bars, upswing=True, hard_left=True, hard_right=True):
    
    if len(bars) <= 2 + hard_left + hard_right:
        return None, None
 
    if upswing:
        mins = minimum.accumulate(bars[:,1]) # bars['low'] is bars[:,1]
        
        swing_end = (bars[:,0] - mins).argmax() # bars['high'] is bars[:,0]
        if swing_end == 0:
            return None, None, None, None
        swing_start = (mins[:swing_end]).argmin()
        
        start_value = mins[swing_start]
        end_value = bars[swing_end, 0] # bars.at[swing_end, 'high']
        
    else:
        maxs = maximum.accumulate(bars[:,0]) #bars['high']
        
        swing_end = (maxs - bars[:,1]).argmax() #bars['low']
        if swing_end == 0:
            return None, None, None, None
        swing_start = (maxs[:swing_end]).argmax()
        
        start_value = maxs[swing_start]
        end_value = bars[swing_end, 1] #bars.at[swing_end, 'low']
    
    #now get rid of duplicate indexes, this should not do any harm
    if hard_left and swing_start == 0:
        swing_start = 1
        
    if hard_right and swing_end == len(bars)-1:
        swing_end = len(bars)-2
    
    return swing_start, swing_end, start_value, end_value #now instead of indexes, we got offsets!

In [9]:
print(max_swing_array(bars.values, upswing=False, hard_left=True, hard_right=True))
print(max_swing_array(bars.values))
print(max_swing_array(large_bars.values))
print(max_swing_array(large_bars.values, upswing=False, hard_left=True, hard_right=True))
print(max_swing_array(hefty_bars.values))
print(max_swing_array(hefty_bars.values, upswing=False, hard_left=True, hard_right=True))

(1, 7, 11, 4)
(1, 4, 5, 11)
(49, 55, -0.7038047803869438, -0.6049868234352309)
(3, 85, 0.6559068512657203, -1.2117654367552042)
(15854, 23640, -59.50515559703218, 40.35354685208753)
(23640, 50398, 40.35354685208753, -57.88156463066682)


In [10]:
#9 bars
%timeit max_swing_array(bars.values, upswing=False, hard_left=True, hard_right=True)

8.8 µs ± 99 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [11]:
#100 bars
%timeit max_swing_array(large_bars.values, upswing=False, hard_left=True, hard_right=True)

9.12 µs ± 63.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [12]:
#100_000 bars
%timeit max_swing_array(hefty_bars.values, upswing=False, hard_left=True, hard_right=True)

361 µs ± 4.31 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


Now, refactor to use numba:

In [13]:
bars.head()

Unnamed: 0,high,low
0,11,6
1,10,5
2,9,8
3,8,7
4,11,6


In [14]:
from numpy import minimum, maximum, argmin, argmax, add
from numba import jit, njit

@njit(fastmath=True)
def max_swing_numba(bars, upswing=True, hard_left=True, hard_right=True):
    
    if len(bars) <= 2 + hard_left + hard_right:
        return 0, 0, 0, 0
 
    current_min = np.inf
    current_max = -np.inf
    current_min_index = 0
    current_max_index = 0
    
    global_min = np.inf
    global_max = -np.inf
    global_max_index = 0
    global_min_index = 0
    
    length = 0
    
    if upswing:
        for i in range(len(bars)):
            if current_max < bars[i,0]: #bars[i,0] stores highs
                current_max = bars[i,0]
                current_max_index = i
            
            if current_min > bars[i,1] or i== len(bars)-1: #stores lows
                if current_max - current_min > length: # assign new max values
                    length = current_max - current_min
                    global_max = current_max
                    global_min = current_min
                    length = global_max - global_min
                    global_max_index = current_max_index
                    global_min_index = current_min_index
                    
                current_min_index = i
                current_min = bars[i,1]
                current_max = bars[i,0]
                
        swing_start = global_min_index
        swing_end = global_max_index    
        start_value = global_min
        end_value = global_max   
        
    else:
        for i in range(len(bars)):
            if current_min > bars[i,1]: #bars[i,1] stores lows
                current_min = bars[i,1]
                current_min_index = i
                
            if current_max < bars[i,0] or i == len(bars)-1: #stores highs
                if current_max - current_min > length: # assign new max values
                    global_max = current_max
                    global_min = current_min
                    length = global_max - global_min
                    global_max_index = current_max_index
                    global_min_index = current_min_index
                    
                current_max_index = i
                current_max = bars[i,0]
                current_min = bars[i,1]
                
        swing_start = global_max_index
        swing_end = global_min_index
        start_value = global_max
        end_value = global_min
    #now get rid of duplicate indexes, this should not do any harm
    
    if hard_left and swing_start == 0:
        swing_start = 1
        
    if hard_right and swing_end == len(bars)-1:
        swing_end = len(bars)-2
    
    return swing_start, swing_end, start_value, end_value #now instead of indexes, we got offsets!



In [15]:
print(max_swing_numba(bars.values, upswing=False, hard_left=True, hard_right=True))
print(max_swing_numba(bars.values))
print(max_swing_numba(large_bars.values))
print(max_swing_numba(large_bars.values, upswing=False, hard_left=True, hard_right=True))
print(max_swing_numba(hefty_bars.values))
print(max_swing_numba(hefty_bars.values, upswing=False, hard_left=True, hard_right=True))

(1, 7, 11.0, 4.0)
(1, 4, 5.0, 11.0)
(49, 55, -0.7038047803869438, -0.6049868234352309)
(3, 85, 0.6559068512657203, -1.2117654367552042)
(15854, 23640, -59.50515559703218, 40.35354685208753)
(23640, 50398, 40.35354685208753, -57.88156463066682)


In [16]:
#need to access np.ndarrays directly, so that the jit compiler
tmp = bars.values
tmp2 = large_bars.values
tmp3 = hefty_bars.values

In [17]:
#9 bars
max_swing_numba(tmp, upswing=False, hard_left=True, hard_right=True)
%timeit max_swing_numba(tmp, upswing=False, hard_left=True, hard_right=True)

599 ns ± 1.55 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [18]:
#100 bars
max_swing_numba(tmp2, upswing=False, hard_left=True, hard_right=True)
%timeit max_swing_numba(tmp2, upswing=False, hard_left=True, hard_right=True)

809 ns ± 2.92 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [19]:
#100_000bars
max_swing_numba(tmp3, upswing=False, hard_left=True, hard_right=True)
%timeit max_swing_numba(tmp3, upswing=False, hard_left=True, hard_right=True)

218 µs ± 403 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)


Looks like numba provides  least overhead...
Now lets try loop with numba njit parallel optimization (this will only work for max_swing_numba, as it is compiled to native code, @jit will actually slown down the loops):

In [20]:
#@jit
def max_swing_loop():
    table = []
    for i in range(100000):
        table.append(max_swing(bars))
    return table

In [21]:
#@jit
def max_swing_array_loop():
    table = []
    for i in range(100000):
        table.append(max_swing_array(bars.values))  
    return table

In [22]:
from numba import prange
@njit

def max_swing_numba_loop():
    table = []
    for i in range(100000):
        table.append(max_swing_numba(tmp))
    return table

In [23]:
from numba import prange
@njit(parallel=True)

def max_swing_numba_loop_parallel():
    start = np.zeros((100000,1))
    end = np.zeros((100000,1))
    value_start = np.zeros((100000,1))
    value_end = np.zeros((100000,1))
    for i in prange(100000):
        start[i], end[i], value_start[i], value_end[i] = max_swing_numba(tmp)
    return start, end, value_start, value_end

In [24]:
%timeit max_swing_loop()

38.9 s ± 546 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [25]:
%timeit max_swing_array_loop()

880 ms ± 2.05 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [26]:
%timeit max_swing_numba_loop()

16.6 ms ± 193 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
%timeit max_swing_numba_loop_parallel()

932 µs ± 221 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [28]:
start, end, value_start, value_end = max_swing_numba_loop_parallel()

In [29]:
print(start[0], end[0], value_start[0], value_end[0])

[1.] [4.] [5.] [11.]


In [30]:
38.9 * 10e6 / 932

417381.974248927

Narzut zabijał ten algorytm.