# Strategy - Key Levels Detection (Close price)

In [1]:
import warnings
warnings.filterwarnings('ignore')

import time
from datetime import date, datetime
from datetime import timedelta
from dateutil.relativedelta import relativedelta

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pandas_ta as ta
from ps_candlestick import get_type_candlestick
from ps_candlestick import has_reversal_pattern

## Load data from csv file

In [2]:
import os
from pathlib import Path
notebook_path = os.getcwd()
algo_dir = Path(notebook_path).parent.parent.parent
csv_file = str(algo_dir) + '/vn-stock-data/VN30ps/VN30F1M_5minutes.csv'
is_file = os.path.isfile(csv_file)
if is_file:
    dataset = pd.read_csv(csv_file, index_col='Date', parse_dates=True)
else:
    dataset = pd.read_csv("https://raw.githubusercontent.com/zuongthaotn/vn-stock-data/main/VN30ps/VN30F1M_5minutes.csv", index_col='Date', parse_dates=True)

In [3]:
htd = dataset.copy()
htd['max_5bars_prev'] = htd['Close'].rolling(5).max()
htd['max_5bars_next'] = htd['Close'].shift(-4).rolling(5).max()
htd['min_5bars_prev'] = htd['Close'].rolling(5).min()
htd['min_5bars_next'] = htd['Close'].shift(-4).rolling(5).min()
htd['is_keylevel'] = htd.apply(lambda r: True if ((r['Close'] == r['max_5bars_prev'] and r['Close'] == r['max_5bars_next']) or (r['Close'] == r['min_5bars_prev'] and r['Close'] == r['min_5bars_next'])) else False, axis=1)

In [4]:
has_keylevel = htd[htd['is_keylevel'] == True]
has_keylevel['count'] = '1'
has_keylevel['keylevels'] = has_keylevel['Close']

In [5]:
def cal_keylevels(tick):
    if tick.empty:
        return ''
    keylevels = []
    tick = tick.sort_values(ascending=False)
    i = 0
    while i < len(tick):
        if i == 0 or i == len(tick)-1:
            keylevels.append(tick[i])
        else:
            if tick[i-1] > tick[i] + 2 or tick[i] > tick[i+1] + 2:
                keylevels.append(tick[i])
        i = i+1
    return ", ".join(map(str, keylevels))

has_keylevel_day = has_keylevel.resample("D").agg({
        'count': 'count',
        'keylevels': cal_keylevels
        #'keylevels': lambda x: ", ".join(map(str, x))
    })
has_keylevel_day = has_keylevel_day[has_keylevel_day['count'] != 0]

In [6]:
has_keylevel_day['keylevels'] = has_keylevel_day['keylevels'].shift(1)
join_data = has_keylevel_day[['keylevels']]

In [7]:
data = dataset.copy()
data['prev_Close'] = data['Close'].shift(1)
data["ma_line"] = data["Close"].rolling(20).mean()
data['above_ma'] = data.apply(lambda r: 1 if r['Close'] > r['ma_line'] else 0, axis=1)
data['below_ma'] = data.apply(lambda r: 1 if r['Close'] < r['ma_line'] else 0, axis=1)
data['total_above_ma'] = data['above_ma'].rolling(150).sum()
data['total_below_ma'] = data['below_ma'].rolling(150).sum()
data['trend'] = data.apply(lambda r: 'switch' if r['total_above_ma'] == r['total_below_ma'] else ('up' if r['total_above_ma'] > r['total_below_ma'] else 'down'), axis=1)
data = data.assign(time_d=pd.PeriodIndex(data.index, freq='1D').to_timestamp())
data = pd.merge(data, join_data, left_on="time_d", right_index=True, how="left")

In [8]:
def trade_simulation(prepared_data):
    _trades = pd.DataFrame(columns=("EntryTime", "EntryPrice", "ExitTime", "ExitPrice", "Type", "Profit"))
    has_open_deal = False
    type_open_deal = ''
    # Stoploss at 20 pips
    sl_step = 2
    # Takeprofit at 60 pips(R/R = 1/3)
    tp_step = 6
    for i, row in prepared_data.iterrows():
        if 100*row.name.hour + row.name.minute < 1420:
            if has_open_deal:
                if type_open_deal == 'short':
                    # Stoploss
                    if row['High'] > entry_price + sl_step:
                        profit = -sl_step
                        exit_price = entry_price + sl_step
                        exit_time = row.name
                        _trades.loc[len(_trades)] = [entry_time, entry_price, exit_time, exit_price, "Short", profit]
                        has_open_deal = False
                        type_open_deal = ''
                    else:
                        # Takeprofit
                        if row['Low'] < entry_price - tp_step:
                            profit = tp_step
                            exit_price = entry_price - tp_step
                            exit_time = row.name
                            _trades.loc[len(_trades)] = [entry_time, entry_price, exit_time, exit_price, "Short", profit]
                            has_open_deal = False
                            type_open_deal = ''
                elif type_open_deal == 'long':
                    # Stoploss
                    if row['Low'] < entry_price - sl_step:
                        profit = -sl_step
                        exit_price = entry_price - sl_step
                        exit_time = row.name
                        _trades.loc[len(_trades)] = [entry_time, entry_price, exit_time, exit_price, "Long", profit]
                        has_open_deal = False
                        type_open_deal = ''
                    else:
                        # Takeprofit
                        if row['High'] > entry_price + tp_step:
                            profit = tp_step
                            exit_price = entry_price + tp_step
                            exit_time = row.name
                            _trades.loc[len(_trades)] = [entry_time, entry_price, exit_time, exit_price, "Long", profit]
                            has_open_deal = False
                            type_open_deal = ''
                        #
            if not has_open_deal:
                if row['signal'] == 'short':
                    # Open short deal
                    entry_price = row['Close']
                    entry_time = row.name
                    has_open_deal = True
                    type_open_deal = 'short'
                elif row['signal'] == 'long':
                    # Open short deal
                    entry_price = row['Close']
                    entry_time = row.name
                    has_open_deal = True
                    type_open_deal = 'long'
        else:
            if 100*row.name.hour + row.name.minute == 1425:
                if has_open_deal:
                    # close open deal at 2:25PM (dataframe)
                    if type_open_deal == 'short':
                        profit = entry_price - row['Close']
                        if profit < -sl_step:
                            profit = -sl_step
                        exit_price = row['Close']
                        exit_time = row.name
                        has_open_deal = False
                        type_open_deal = ''
                        _trades.loc[len(_trades)] = [entry_time, entry_price, exit_time, exit_price, "Short", profit]
                    elif type_open_deal == 'long':
                        profit = row['Close'] - entry_price
                        if profit < -sl_step:
                            profit = -sl_step
                        exit_price = row['Close']
                        exit_time = row.name
                        has_open_deal = False
                        type_open_deal = ''
                        _trades.loc[len(_trades)] = [entry_time, entry_price, exit_time, exit_price, "Long", profit]
    return _trades

## Strategy 06

In [9]:
data7 = data.copy()
def cal_signal_06(row):
    signal = ''
    if row['Open'] > row['Close'] and row['cross_shift'] == 'cross_down':
        # Cross key_level and black candlestick
        signal = 'short'
    elif row['Open'] < row['Close']  and row['cross_shift'] == 'cross_down':
        # Cross key_level and white candlestick
        signal = 'long'
    return signal
def cal_cross(row):
    cross = ''
    if row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        for keylevel in keylevels:
            if row['Open'] > keylevel > row['Close']:
                cross = 'cross_down'
            elif row['Open'] < keylevel < row['Close']:
                cross = 'cross_up'
    return cross
data7['cross'] = data7.apply(lambda r: cal_cross(r), axis=1)
data7['cross_shift'] = data7['cross'].shift(1)
data7['signal'] = data7.apply(lambda r: cal_signal_06(r), axis=1)
trade06_result = trade_simulation(data7)

In [10]:
trade06_result.Profit.sum()

467.00000000000045

In [11]:
trade06_result

Unnamed: 0,EntryTime,EntryPrice,ExitTime,ExitPrice,Type,Profit
0,2018-08-15 10:45:00,959.7,2018-08-15 14:10:00,953.7,Short,6.0
1,2018-08-15 14:10:00,954.1,2018-08-15 14:25:00,951.2,Short,2.9
2,2018-08-20 09:35:00,944.5,2018-08-20 10:10:00,946.5,Short,-2.0
3,2018-08-20 10:40:00,946.0,2018-08-20 11:00:00,948.0,Short,-2.0
4,2018-08-20 14:00:00,946.4,2018-08-20 14:25:00,944.7,Short,1.7
...,...,...,...,...,...,...
2260,2024-07-08 13:05:00,1308.1,2024-07-08 14:25:00,1312.0,Long,3.9
2261,2024-07-09 09:05:00,1314.8,2024-07-09 09:30:00,1308.8,Short,6.0
2262,2024-07-09 09:30:00,1308.2,2024-07-09 10:40:00,1310.2,Short,-2.0
2263,2024-07-09 10:50:00,1307.9,2024-07-09 11:00:00,1309.9,Short,-2.0


In [12]:
data7[data7.index > '2024-07-09 00:00:00']

Unnamed: 0_level_0,Open,High,Low,Close,Volume,prev_Close,ma_line,above_ma,below_ma,total_above_ma,total_below_ma,trend,time_d,keylevels,cross,cross_shift,signal
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
2024-07-09 09:00:00,1316.2,1316.9,1314.5,1315.1,5509,1314.0,1310.21,1,0,79.0,71.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",cross_down,,
2024-07-09 09:05:00,1315.0,1315.3,1314.6,1314.8,1898,1315.1,1310.545,1,0,79.0,71.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",,cross_down,short
2024-07-09 09:10:00,1314.8,1315.0,1314.5,1314.6,1279,1314.8,1310.85,1,0,79.0,71.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",,,
2024-07-09 09:15:00,1314.7,1314.7,1313.7,1314.2,2914,1314.6,1311.2,1,0,79.0,71.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",,,
2024-07-09 09:20:00,1314.3,1314.3,1313.2,1313.8,2476,1314.2,1311.475,1,0,79.0,71.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",,,
2024-07-09 09:25:00,1313.8,1313.8,1311.5,1311.5,4818,1313.8,1311.605,0,1,78.0,72.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",cross_down,,
2024-07-09 09:30:00,1311.3,1311.3,1307.4,1308.2,11630,1311.5,1311.545,0,1,77.0,73.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",cross_down,cross_down,short
2024-07-09 09:35:00,1308.0,1308.3,1306.6,1307.7,6973,1308.2,1311.49,0,1,76.0,74.0,up,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",,cross_down,short
2024-07-09 09:40:00,1307.9,1307.9,1305.9,1306.5,7154,1307.7,1311.3,0,1,75.0,75.0,switch,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",,,
2024-07-09 09:45:00,1306.6,1306.8,1304.7,1304.8,6854,1306.5,1311.025,0,1,74.0,76.0,down,2024-07-09,"1316.4, 1315.8, 1312.8, 1311.4, 1309.2, 1306.0",cross_down,,


## Strategy 05

In [13]:
data6 = data.copy()
def cal_signal_05(row):
    signal = ''
    if row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        max_keylevel = keylevels[0]
        min_keylevel = keylevels[-1]
        if row['High'] > max_keylevel > row['Close']:
            # Short right when cross the highest keylevel
            signal = 'short'
        elif row['Low'] < min_keylevel < row['Close']:
            # Long right when cross the Lowest keylevel
            signal = 'long'
        elif row['trend'] == 'down' and row['Open'] > row['Close']:
            for keylevel in keylevels:
                if row['prev_Close'] > keylevel > row['Close']:
                    # Cross key_level and black candlestick
                    signal = 'short'
        elif row['trend'] == 'up' and row['Open'] < row['Close']:
            for keylevel in keylevels:
                if row['prev_Close'] < keylevel < row['Close']:
                    # Cross key_level and white candlestick
                    signal = 'long'
    return signal
data6['signal'] = data6.apply(lambda r: cal_signal_05(r), axis=1)
trade05_result = trade_simulation(data6)

In [14]:
trade05_result.Profit.sum()

1577.5000000000011

In [15]:
trading_fee = len(trade05_result) * 0.3
trading_fee

876.3

In [16]:
# win rate
len(trade05_result[trade05_result.Profit > 0]) / (len(trade05_result[trade05_result.Profit < 0]) + len(trade05_result[trade05_result.Profit > 0]))

0.35015447991761073

In [14]:
trade05_result[trade05_result.EntryTime > '2024-07-01 00:00:00']

Unnamed: 0,EntryTime,EntryPrice,ExitTime,ExitPrice,Type,Profit
2901,2024-07-01 09:25:00,1275.5,2024-07-01 09:40:00,1273.5,Long,-2.0
2902,2024-07-01 09:40:00,1274.5,2024-07-01 10:00:00,1272.5,Long,-2.0
2903,2024-07-01 10:15:00,1274.4,2024-07-01 14:00:00,1280.4,Long,6.0
2904,2024-07-03 09:10:00,1297.8,2024-07-03 13:20:00,1299.8,Short,-2.0
2905,2024-07-04 09:10:00,1307.9,2024-07-04 09:40:00,1309.9,Short,-2.0
2906,2024-07-04 10:30:00,1308.2,2024-07-04 13:10:00,1310.2,Short,-2.0
2907,2024-07-04 13:20:00,1307.8,2024-07-04 14:25:00,1308.0,Short,-0.2
2908,2024-07-05 09:15:00,1310.6,2024-07-05 09:30:00,1308.6,Long,-2.0
2909,2024-07-05 10:00:00,1311.1,2024-07-05 11:00:00,1309.1,Long,-2.0
2910,2024-07-05 13:20:00,1310.1,2024-07-05 13:40:00,1308.1,Long,-2.0


## Strategy 04

In [109]:
data5 = data.copy()
def cal_signal_04(row):
    signal = ''
    if row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        max_keylevel = keylevels[0]
        min_keylevel = keylevels[-1]
        if row['High'] > max_keylevel > row['Close'] and 100*row.name.hour + row.name.minute < 1130:
            # Short right when cross the highest keylevel
            signal = 'short'
        elif row['Low'] < min_keylevel < row['Close'] and 100*row.name.hour + row.name.minute < 1130:
            # Long right when cross the Lowest keylevel
            signal = 'long'
        elif row['trend'] == 'down' and row['Open'] > row['Close']:
            for keylevel in keylevels:
                if row['prev_Close'] > keylevel > row['Close']  and row['Close'] > keylevel - 1.5:
                    # Cross key_level and black candlestick
                    signal = 'short'
        elif row['trend'] == 'up' and row['Open'] < row['Close']:
            for keylevel in keylevels:
                if row['prev_Close'] < keylevel < row['Close'] and row['Close'] < keylevel + 1.5:
                    # Cross key_level and white candlestick
                    signal = 'long'
    return signal
data5['signal'] = data5.apply(lambda r: cal_signal_04(r), axis=1)
trade04_result = trade_simulation(data5)

In [110]:
trade04_result.Profit.sum()

899.4000000000001

In [108]:
trade04_result

Unnamed: 0,EntryTime,EntryPrice,ExitTime,ExitPrice,Type,Profit
0,2018-08-15 09:50:00,961.1,2018-08-15 13:10:00,955.1,Short,6.0
1,2018-08-15 13:45:00,957.0,2018-08-15 13:50:00,959.0,Short,-2.0
2,2018-08-15 14:05:00,956.2,2018-08-15 14:25:00,951.2,Short,5.0
3,2018-08-20 09:00:00,945.8,2018-08-20 10:15:00,947.8,Short,-2.0
4,2018-08-20 10:35:00,945.8,2018-08-20 11:00:00,947.8,Short,-2.0
...,...,...,...,...,...,...
2418,2024-07-09 09:00:00,1315.1,2024-07-09 09:30:00,1309.1,Short,6.0
2419,2024-07-09 09:40:00,1306.5,2024-07-09 11:10:00,1312.5,Long,6.0
2420,2024-07-09 11:10:00,1312.6,2024-07-09 13:05:00,1318.6,Long,6.0
2421,2024-07-09 13:35:00,1316.0,2024-07-09 13:50:00,1318.0,Short,-2.0


## Strategy 03

In [99]:
data4 = data.copy()
def cal_signal_03(row):
    signal = ''
    if row['trend'] == 'down' and row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        max_keylevel = keylevels[0]
        if row['Open'] > row['Close']:
            if row['High'] > max_keylevel > row['Close']:
                # Short right when cross the highest keylevel
                signal = 'short'
            else:
                for keylevel in keylevels:
                    if row['prev_Close'] > keylevel > row['Close']  and row['Close'] > keylevel - 1.5:
                        # Cross key_level and black candlestick
                        signal = 'short'
    elif row['trend'] == 'up' and row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        min_keylevel = keylevels[-1]
        if row['Open'] < row['Close']:
            if row['Low'] < min_keylevel < row['Close']:
                # Long right when cross the Lowest keylevel
                signal = 'long'
            else:
                for keylevel in keylevels:
                    if row['prev_Close'] < keylevel < row['Close'] and row['Close'] < keylevel + 1.5:
                        # Cross key_level and white candlestick
                        signal = 'long'
    return signal
data4['signal'] = data4.apply(lambda r: cal_signal_03(r), axis=1)
trade03_result = trade_simulation(data4)

In [100]:
trade03_result.Profit.sum()

659.5999999999998

In [105]:
# win rate
len(trade02_result[trade02_result.Profit > 0]) / (len(trade02_result[trade02_result.Profit < 0]) + len(trade02_result[trade02_result.Profit > 0]))

0.3330526315789474

## Strategy 02

In [96]:
data3 = data.copy()
def cal_signal_02(row):
    signal = ''
    if row['trend'] == 'down' and row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        max_keylevel = keylevels[0]
        if row['Open'] > row['Close']:
            if row['High'] > max_keylevel > row['Close']:
                # Short right when cross the highest keylevel
                signal = 'short'
            else:
                for keylevel in keylevels:
                    if row['prev_Close'] > keylevel > row['Close']:
                        # Cross key_level and black candlestick
                        signal = 'short'
    elif row['trend'] == 'up' and row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        min_keylevel = keylevels[-1]
        if row['Open'] < row['Close']:
            if row['Low'] < min_keylevel < row['Close']:
                # Long right when cross the Lowest keylevel
                signal = 'long'
            else:
                for keylevel in keylevels:
                    if row['prev_Close'] < keylevel < row['Close']:
                        # Cross key_level and white candlestick
                        signal = 'long'
    return signal
data3['signal'] = data3.apply(lambda r: cal_signal_02(r), axis=1)
trade02_result = trade_simulation(data3)

In [97]:
trade02_result.Profit.sum()

1005.0000000000009

In [98]:
# win rate
len(trade02_result[trade02_result.Profit > 0]) / (len(trade02_result[trade02_result.Profit < 0]) + len(trade02_result[trade02_result.Profit > 0]))

0.3330526315789474

In [95]:
trade02_result

Unnamed: 0,EntryTime,EntryPrice,ExitTime,ExitPrice,Type,Profit
0,2018-08-15 09:55:00,960.9,2018-08-15 13:10:00,954.9,Short,6.0
1,2018-08-15 13:45:00,957.0,2018-08-15 13:50:00,959.0,Short,-2.0
2,2018-08-15 14:05:00,956.2,2018-08-15 14:25:00,951.2,Short,5.0
3,2018-08-20 09:00:00,945.8,2018-08-20 10:15:00,947.8,Short,-2.0
4,2018-08-20 10:35:00,945.8,2018-08-20 11:00:00,947.8,Short,-2.0
...,...,...,...,...,...,...
1864,2024-07-05 13:20:00,1310.1,2024-07-05 13:40:00,1308.1,Long,-2.0
1865,2024-07-08 09:50:00,1314.2,2024-07-08 10:00:00,1312.2,Long,-2.0
1866,2024-07-08 11:25:00,1308.3,2024-07-08 14:25:00,1312.0,Long,3.7
1867,2024-07-09 09:55:00,1306.2,2024-07-09 11:10:00,1312.2,Long,6.0


## Strategy 01

In [13]:
data2 = data.copy()

In [14]:
def cal_signal_01(row):
    signal = ''
    if row['trend'] == 'down' and row['keylevels'] != None:
        keylevels = str(row['keylevels']).split(", ")
        keylevels = [float(i) for i in keylevels]
        max_keylevel = keylevels[0]
        if row['Open'] > row['Close'] and row['High'] > max_keylevel > row['Close']:
            signal = 'short'
        current_price = row['Close']
        for keylevel in keylevels:
            if row['Open'] > row['Close'] and row['High'] > float(keylevel) > row['Close']:
                # Cross key_level and black candlestick
                if row['cs_reversal'] == 1:
                    signal = 'short'
    elif row['trend'] == 'up' and row['keylevels'] != None:
        signal = ''
        # keylevels = str(row['keylevels']).split(", ")
        # current_price = row['Close']
        # for keylevel in keylevels:
        #     if row['Open'] < row['Close'] and row['Low'] < float(keylevel) < row['Close']:
        #         signal = 'long'
    return signal
data2['cs'] = data2.apply(lambda r: get_type_candlestick(r), axis=1)
data2['cs_shift_1'] = data2['cs'].shift(1)
data2['cs_shift_2'] = data2['cs'].shift(2)
data2['cs_reversal'] = data2.apply(lambda r: has_reversal_pattern(r), axis=1)
data2['signal'] = data2.apply(lambda r: cal_signal_01(r), axis=1)

In [15]:
trade01_result = trade_simulation(data2)
trade01_result.Profit.sum()

194.50000000000068