In [1]:
import numpy as np
import pandas as pd

import sys
print(f"Python version: {sys.version}")
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")

Python version: 3.9.12 (main, Apr  5 2022, 01:53:17) 
[Clang 12.0.0 ]
pandas version: 1.4.2
numpy version: 1.22.3


In [216]:
data = pd.read_csv("data/EXP1.csv")
data.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2001-01-01,37.25,38.25,35.75,37.5,1234000
1,2001-01-02,39.5,40.12,39.25,40.0,1567000
2,2001-01-03,37.5,38.5,37.0,37.25,1456000
3,2001-01-04,37.0,37.5,36.5,37.0,1789000
4,2001-01-05,37.0,40.25,37.0,39.0,2345000


In [None]:
data.describe()

In [None]:
data.info()

In [3]:
price_data = data[['High','Low','Close']]

In [85]:
price_data.head()

Unnamed: 0,High,Low,Close
0,38.25,35.75,37.5
1,40.12,39.25,40.0
2,38.5,37.0,37.25
3,37.5,36.5,37.0
4,40.25,37.0,39.0


Two methods for populating charts:

- **High and Low** price: whenever H and L are available
- **Last/Close** price: for illiquid assets (or indices). Also for intraday trading using realtime data

Scale types:

- Constant size boxes
- Variable size boxes

## pnf functions

- init_pnf() initializes the first column
- update_pnf() deals with the rest of the chart

In [4]:
def init_pnf(scale,
             high,
             low,
             close,
             reversal_size,
             box_range=[]):
    '''
    returns status as num value and box_range as array
    '''
    
    if len(box_range) == 0:
        box_range = scale[np.logical_and(scale>=low, scale<=high)]
    else:
        # simplify logic using max()
        if high > box_range.max():
            box_range = scale[np.logical_and(scale>=box_range.min(), scale<=high)]
        if low < box_range.min():
            box_range = scale[np.logical_and(scale<=box_range.max(), scale>=low)]
        
    # check definition of mid_price
    mid_price = box_range.min() + (box_range.max() - box_range.min())/2

    if len(box_range) >= reversal_size and close > mid_price:
        status = 1
    elif len(box_range) >= reversal_size and close < mid_price:
        status = -1
    else:
        status = 0
        
    return status, box_range

In [231]:
def update_pnf(scale,
               high,
               low,
               status,
               reversal_size,
               box_low,
               box_high):
    '''
    updates the chart once the trend status is defined
    returns status and box_range for the day
    '''
    box_range = scale[np.logical_and(scale>=box_low, scale<=box_high)] # needed in case we return the current range
    box_reverse = []
    # new temporary box range with extensions on both sides:
    box_range_new = scale[np.logical_and(scale>=min(low, box_low), scale<=max(high, box_high))]
    box_high_new = box_range_new.max()
    box_low_new = box_range_new.min()
    
    if status == 1:
        # check for upper extensions, else for reversals
        if box_high_new > box_high:
            box_range = scale[np.logical_and(scale>=box_low, scale<=box_high_new)]
            # check for reversal
        elif low < box_high:
            box_reverse = scale[np.logical_and(scale>=low, scale<=box_high)][:-1]            
            # research condition
            # change status and new range
                  
    if status == -1:
        if box_low_new < box_low:
            box_range = scale[np.logical_and(scale>=box_low_new, scale<=box_high)]
        elif high > box_low:
            box_reverse = scale[np.logical_and(scale>=box_low, scale<=high)][1:]

    # Check reversal and reverse status if needed:      
    if len(box_reverse) >= reversal_size:
        status *= -1 # reverse status
        box_range = box_reverse # update box_range
    
    return status, box_range

In [374]:
def generate_column_range(scale, range_low, range_high):
    col_range = scale[np.logical_and(scale>=range_low, scale<=range_high)]
    return col_range  

In [340]:
def chart_grid(scale, columns):
    '''
    Generates a blank monospace-font grid that can be used as a canvas for the PnF chart
    - scale: a np array
    - columns: a list of tuples (int, np.array)
    '''
    hpad = 2
    marker = {0:'*', 1:'X', -1:'O'}
    grid = ""

    for level in np.flip(scale):
        line_price = level
        line = f'{line_price}' + '.'*hpad
        for col in columns:
            line += marker[col[0]] if line_price in col[1] else '.'
        line += '.'*hpad + f'{line_price}\n'
        grid += line
    
    return grid

## Testing our functions

In [182]:
# testing init_pnf() on the 1st line of data:

box_size = 1
reversal_size = 3
scale = np.arange(start=32, stop=46, step = box_size)

high = 38.25
low = 35.75
close = 37.5

status, box_range = init_pnf(scale, high, low, close, reversal_size)

In [184]:
status

1

In [183]:
box_range

array([36, 37, 38])

In [126]:
status

1

In [180]:
# testing init_pnf() on subsequent lines of data (when status==0)
# define: box_range array, status


high = 39.25
low = 35.01
close = 38

init_pnf(scale, high, low, close, reversal_size, box_range)

(0, array([36, 37, 38, 39, 40]))

In [87]:
# testing reversal logic

box_range = np.array([36, 37, 38])
status = 1
scale = scale
reversal_size = 3

high = 40.5
low = 35.5
box_low = box_range.min()
box_high = box_range.max()

box_reverse = []
# this can be simplified into one line using max()
# new range with extensions on both sides:
# if high > box_high:
#     box_range_new = scale[np.logical_and(scale>=box_low, scale<=high)]
# if low < box_low:
#     box_range_new = scale[np.logical_and(scale<=box_high, scale>=low)]
box_range_new = scale[np.logical_and(scale>=min(low, box_low), scale<=max(high, box_high))]

# we actually need only box_high_new and box_low_new
# box_high_new = box_range_new.max()
# box_low_new = box_range_new.min()

if status == 1:
    # we 1st check for extensions on the upside
    if box_range_new.max() > box_high:
        # extend range up:
        box_range = scale[np.logical_and(scale>=box_low, scale<=box_range_new.max())]
    # if no extension, check for reversal:
    elif low < box_high:
        box_reverse = scale[np.logical_and(scale>=low, scale<=box_high)][:-1]

if status == -1:
    # we 1st check for extensions on the downside
    if box_range_new.min() < box_low:
        # extend range down:
        box_range = scale[np.logical_and(scale>=box_range_new.min(), scale<=box_high)]
    # if no extension, check for reversal:
    elif high > box_low:
        box_reverse = scale[np.logical_and(scale>=box_low, scale<=high)][1:]

if len(box_reverse) >= reversal_size:
    status *= -1 # reverse status
    box_range = box_reverse
    
print(box_range, box_range_new, box_reverse, status)

[36 37 38 39 40] [36 37 38 39 40] [] 1


#### tests for update_pnf() go here

In [161]:
# test 1
trend_status = np.array([1, 0, 0])
box_range = np.array([36, 37, 38])

reversal_size = 3

high = 40.12
low = 39.25

start = 1
status = trend_status[0]
box_h = box_range.max()
box_l = box_range.min()

# print(scale, high, low, status, reversal_size, box_l, box_h)
update_pnf(scale, high, low, status, reversal_size, box_l, box_h)

[36 37 38] [36 37 38 39 40]


(1, array([36, 37, 38, 39, 40]))

In [221]:
# test 2
trend_status = np.array([1, 1, 0])
box_range = np.array([36, 37, 38, 39, 40])

reversal_size = 3

high = 38.5
low = 37

status = trend_status[1]
box_h = box_range.max()
box_l = box_range.min()

# print(scale, high, low, status, reversal_size, box_l, box_h)
update_pnf(scale, high, low, status, reversal_size, box_l, box_h)

[36 37 38 39 40] [36 37 38 39 40]


(-1, array([37, 38, 39]))

In [164]:
# test 3
trend_status = np.array([1, 1, -1])
box_range = np.array([37, 38, 39])

reversal_size = 3

high = 40.25
low = 37

start = 1
status = trend_status[2]
box_h = box_range.max()
box_l = box_range.min()

# print(scale, high, low, status, reversal_size, box_l, box_h)
update_pnf(scale, high, low, status, reversal_size, box_l, box_h)

[37 38 39] [37 38 39 40]


(1, array([38, 39, 40]))

## Applying the functions

In [207]:
price_data

Unnamed: 0,High,Low,Close
0,38.25,35.75,37.5
1,40.12,39.25,40.0
2,38.5,37.0,37.25
3,37.5,36.5,37.0
4,40.25,37.0,39.0


In [242]:
# initialise status and box arrays:
trend_status = np.zeros(len(price_data))
box_low = np.zeros(len(price_data))
box_high = np.zeros(len(price_data))

scale = np.arange(32,43)
box_size = 1
reversal_size = 3

print(f'Trend status: {trend_status}\nBox Low: {box_low}\nBox High: {box_high}\nScale: {scale}')

Trend status: [0. 0. 0. 0. 0.]
Box Low: [0. 0. 0. 0. 0.]
Box High: [0. 0. 0. 0. 0.]
Scale: [32 33 34 35 36 37 38 39 40 41 42]


In [243]:
# Initialise the chart until a status (+/-)1 is reached
box_range = []
for index, row in enumerate(price_data.iterrows()):
    high = row[1]['High']
    low = row[1]['Low']
    close = row[1]['Close']
    status, box_range = init_pnf(scale, high, low, close, reversal_size, box_range)
    trend_status[index] = status
    box_low[index] = box_range.min()
    box_high[index] = box_range.max()
    if status != 0:
        break

# status can still be 0! create an example for testing
print(f'Index: {index}\nTrend status: {trend_status}\nBox Low: {box_low}\nBox High: {box_high}\nRange: {scale[np.logical_and(scale>=box_low[index], scale<=box_high[index])]}')

Index: 0
Trend status: [1. 0. 0. 0. 0.]
Box Low: [36.  0.  0.  0.  0.]
Box High: [38.  0.  0.  0.  0.]
Range: [36 37 38]


In [227]:
# Check if there are more lines of data to process
print(index + 1 < len(price_data))

True


In [190]:
# Next, we need to process the prices after index:
start = index + 1
price_data.loc[start:]

Unnamed: 0,High,Low,Close
1,40.12,39.25,40.0
2,38.5,37.0,37.25
3,37.5,36.5,37.0
4,40.25,37.0,39.0


In [191]:
for index, row in enumerate(price_data.loc[start:].iterrows()):
    high = row[1]['High']
    low = row[1]['Low']
    print(high, low)

40.12 39.25
38.5 37.0
37.5 36.5
40.25 37.0


In [244]:
# Process the remaining lines in price_data:
for index, row in enumerate(price_data.loc[start:].iterrows()):
    high = row[1]['High']
    low = row[1]['Low']
    status = trend_status[index+start - 1]
    box_l = box_low[index+start - 1]
    box_h = box_high[index+start - 1]
    status, box_range = update_pnf(scale, high, low, status, reversal_size, box_l, box_h)
    trend_status[index+start] = status
    box_low[index+start] = box_range.min()
    box_high[index+start] = box_range.max()
    print(f'Day: {index+start+1}, Trend status: {status}, High and Low: ', high, low, box_low, box_high)

# status can still be 0! create an example for testing
# print(index, trend_status, box_low, box_high)

Day: 2, Trend status: 1.0 40.12 39.25 [36. 36.  0.  0.  0.] [38. 40.  0.  0.  0.]
Day: 3, Trend status: -1.0 38.5 37.0 [36. 36. 37.  0.  0.] [38. 40. 39.  0.  0.]
Day: 4, Trend status: -1.0 37.5 36.5 [36. 36. 37. 37.  0.] [38. 40. 39. 39.  0.]
Day: 5, Trend status: 1.0 40.25 37.0 [36. 36. 37. 37. 38.] [38. 40. 39. 39. 40.]


In [245]:
print(trend_status, box_low, box_high)

[ 1.  1. -1. -1.  1.] [36. 36. 37. 37. 38.] [38. 40. 39. 39. 40.]


## Printing the PnF chart

In [345]:
pnf_data = pd.DataFrame({'trend_status': trend_status, 'range_low': box_low, 'range_high': box_high})
pnf_data

Unnamed: 0,trend_status,range_low,range_high
0,1.0,36.0,38.0
1,1.0,36.0,40.0
2,-1.0,37.0,39.0
3,-1.0,37.0,39.0
4,1.0,38.0,40.0


In [346]:
(np.diff(np.sign(trend_status)) != 0)

array([False,  True, False,  True])

In [255]:
# We plot a column each time the bool array is True (note: it's shifted up), then for the last row
# TODO generate columns using np.diff

In [371]:
pnf_data_A = pnf_data[:-1].copy()
pnf_data_B = pnf_data[-1:].copy()
pnf_data_A

Unnamed: 0,trend_status,range_low,range_high
0,1.0,36.0,38.0
1,1.0,36.0,40.0
2,-1.0,37.0,39.0
3,-1.0,37.0,39.0


In [372]:
pnf_data_B

Unnamed: 0,trend_status,range_low,range_high
4,1.0,38.0,40.0


In [373]:
pnf_data_A['change'] = (np.diff(np.sign(trend_status)) != 0)
pnf_data_A

Unnamed: 0,trend_status,range_low,range_high,change
0,1.0,36.0,38.0,False
1,1.0,36.0,40.0,True
2,-1.0,37.0,39.0,False
3,-1.0,37.0,39.0,True


In [398]:
ranges = []
trends = []
scale = scale

for row in pnf_data_A.iterrows():
    row = row[1]
    if row['change']:
        col_range = generate_column_range(scale, row['range_low'], row['range_high'])
        ranges.append(col_range)
        trends.append(row['trend_status'])

col_range = generate_column_range(scale, pnf_data_B['range_low'].array[0], pnf_data_B['range_high'].array[0])
ranges.append(col_range)
trends.append(pnf_data_B['trend_status'].array[0])

columns = list(zip(trends, ranges))

print(columns)

[(1.0, array([36, 37, 38, 39, 40])), (-1.0, array([37, 38, 39])), (1.0, array([38, 39, 40]))]


In [397]:
print(chart_grid(scale, columns))

42.......42
41.......41
40..X.X..40
39..XOX..39
38..XOX..38
37..XO...37
36..X....36
35.......35
34.......34
33.......33
32.......32



In [399]:
# test code
# We pass to the plot function a list of range arrays, with a list of trend statuses of the same length

ranges = []
trends = []
scale = scale

r1 = scale[np.logical_and(scale>=36, scale<=40)]
ranges.append(r1)
trends.append(1)

r2 = scale[np.logical_and(scale>=37, scale<=39)]
ranges.append(r2)
trends.append(-1)

r3 = scale[np.logical_and(scale>=38, scale<=40)]
ranges.append(r3)
trends.append(1)

columns =list(zip(trends, ranges))

print(scale, columns)
print(chart_grid(scale, columns))

[32 33 34 35 36 37 38 39 40 41 42] [(1, array([36, 37, 38, 39, 40])), (-1, array([37, 38, 39])), (1, array([38, 39, 40]))]
42.......42
41.......41
40..X.X..40
39..XOX..39
38..XOX..38
37..XO...37
36..X....36
35.......35
34.......34
33.......33
32.......32

