In [51]:
import numpy as np
import pandas as pd
import ta
from vnstock3 import Vnstock 
import ta

In [52]:
# Shared indicator settings read by the indicator/strategy functions in this notebook.
# Window passed to ta.momentum.RSIIndicator.
RSI_PERIOD = 14
# RSI threshold that the buy rules compare against.
RSI_OVERSOLD = 30
# NOTE(review): not referenced by any cell visible in this notebook.
RSI_OVERBOUGHT = 70
# Fast, slow and signal windows passed to ta.trend.MACD.
MACD_FAST_PERIOD = 12
MACD_SLOW_PERIOD = 26
MACD_SIGNAL_PERIOD = 9

In [53]:
def calculate_indicators_macd(df):
    """Add RSI and MACD columns, plus their previous-row values, to ``df``.

    Columns written: ``RSI``, ``Previous_RSI``, ``MACD``, ``Signal_Line``,
    ``Previous_MACD`` and ``Previous_Signal_Line``. Each ``Previous_*``
    column is the matching series shifted down by one row, with every NaN
    replaced by 0.

    Parameters
    ----------
    df : pandas.DataFrame
        Price history with a ``close`` column. It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object. An empty frame is returned untouched.
    """
    if df.empty:
        return df

    df['RSI'] = ta.momentum.RSIIndicator(df['close'], RSI_PERIOD).rsi()

    macd = ta.trend.MACD(
        df['close'],
        window_slow=MACD_SLOW_PERIOD,
        window_fast=MACD_FAST_PERIOD,
        window_sign=MACD_SIGNAL_PERIOD,
    )
    df['MACD'] = macd.macd()
    df['Signal_Line'] = macd.macd_signal()

    # Assign the filled series back to the frame. The former
    # ``df[col].fillna(0, inplace=True)`` acts on a temporary object: pandas
    # 2.x warns about it and with Copy-on-Write it leaves ``df`` unchanged.
    df['Previous_RSI'] = df['RSI'].shift(1).fillna(0)
    df['Previous_MACD'] = df['MACD'].shift(1).fillna(0)
    df['Previous_Signal_Line'] = df['Signal_Line'].shift(1).fillna(0)

    return df

In [54]:
def trading_strategy_test_macd(df, day_range=(5, 30), step=5):
    """Back-test the MACD-crossover buy rule over several holding periods.

    A row is a buy signal when MACD was below its signal line on the previous
    row, is at or above it on the current row, and RSI is above
    ``RSI_OVERSOLD``. Each signal is closed ``days`` rows later and counts as
    a win when that later close is strictly higher than the entry close.
    Signals with fewer than ``days`` rows after them are ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``close``, ``RSI``, ``MACD``, ``Signal_Line``,
        ``Previous_MACD`` and ``Previous_Signal_Line``. A boolean
        ``Buy_Signal`` column is written to it.
    day_range : tuple of int
        Inclusive (first, last) holding period, counted in rows.
    step : int
        Increment between successive holding periods.

    Returns
    -------
    dict
        Maps each holding period to its win rate in percent. The value is 0
        when no trade could be completed for that period.
    """
    holding_periods = range(day_range[0], day_range[1] + 1, step)
    if not holding_periods:
        return {}

    # The signal does not depend on the holding period, so build it once
    # instead of once per loop iteration.
    df['Buy_Signal'] = (
        (df['Previous_MACD'] < df['Previous_Signal_Line'])
        & (df['MACD'] >= df['Signal_Line'])
        & (df['RSI'] > RSI_OVERSOLD)
    )

    closes = df['close'].to_numpy()
    entries = np.flatnonzero(df['Buy_Signal'].to_numpy())

    results = {}
    for days in holding_periods:
        # Keep only signals whose exit row still lies inside the data.
        completed = entries[entries + days < len(closes)]
        total_trades = int(completed.size)
        wins = int(np.count_nonzero(closes[completed + days] > closes[completed]))
        results[days] = (wins / total_trades * 100) if total_trades != 0 else 0

    return results

In [55]:
# Date window passed as start/end to quote.history() for every ticker.
start_date = '2017-02-01'
end_date = '2019-01-01'

In [56]:
# Two 15-ticker groups. NOTE(review): neither list is referenced by the
# cells visible in this notebook.
ESG = [
    'BID', 'BMP', 'BVH', 'CTD', 'DHG',
    'DPM', 'FPT', 'HSG', 'MBB', 'NT2',
    'REE', 'SBT', 'VCB', 'VIC', 'VNM',
]
non_ESG = [
    'MWG', 'CTG', 'SSI', 'HPG', 'NVL',
    'GMD', 'KDC', 'STB', 'ROS', 'GAS',
    'PLX', 'MSN', 'SAB', 'VJC', 'CII',
]

In [57]:
# Tickers iterated by the back-test loops, listed alphabetically.
VN30 = [
    'BID', 'BMP', 'BVH', 'CII', 'CTD', 'CTG',
    'DHG', 'DPM', 'FPT', 'GAS', 'GMD', 'HPG',
    'HSG', 'KDC', 'MBB', 'MSN', 'MWG', 'NT2',
    'NVL', 'PLX', 'REE', 'ROS', 'SAB', 'SBT',
    'SSI', 'STB', 'VCB', 'VIC', 'VJC', 'VNM',
]

In [58]:
def company_winrate(symbol, start_date, end_date):
    """Download price history for ``symbol`` and back-test the MACD strategy.

    Parameters
    ----------
    symbol : str
        Ticker passed to ``Vnstock().stock(..., source='VCI')``.
    start_date, end_date : str
        Date bounds forwarded to ``quote.history(start=..., end=...)``.

    Returns
    -------
    dict
        The result of ``trading_strategy_test_macd``: holding period mapped
        to win rate in percent. An empty dict when no price rows came back.
    """
    vn = Vnstock()
    df = vn.stock(symbol=symbol, source='VCI').quote.history(start=start_date, end=end_date)

    if df.empty:
        # Return the same type as the normal path. The former ``[0, 0]`` list
        # cannot be mixed with the other tickers' dicts when the collected
        # results are passed to ``pd.DataFrame``.
        return {}

    df = calculate_indicators_macd(df)
    return trading_strategy_test_macd(df)

In [59]:
# Collect the MACD-strategy win rates for every ticker in VN30.
# Best-effort: a ticker whose download or back-test raises is reported and
# skipped, so it is simply missing from winrate_macd.
winrate_macd = {}
for company in VN30:
    try:
        company_winr = company_winrate(company, start_date=start_date, end_date=end_date)
        winrate_macd[company] = company_winr

    except Exception as e:
        print(f"Error occurred for {company}: {e}")



In [60]:
winrate_macd

{'BID': {5: 73.33333333333333,
  10: 60.0,
  15: 66.66666666666666,
  20: 66.66666666666666,
  25: 53.333333333333336,
  30: 66.66666666666666},
 'BMP': {5: 60.0,
  10: 60.0,
  15: 53.333333333333336,
  20: 46.666666666666664,
  25: 33.33333333333333,
  30: 50.0},
 'BVH': {5: 52.63157894736842,
  10: 42.10526315789473,
  15: 47.368421052631575,
  20: 44.44444444444444,
  25: 38.88888888888889,
  30: 52.94117647058824},
 'CII': {5: 47.05882352941176,
  10: 47.05882352941176,
  15: 47.05882352941176,
  20: 41.17647058823529,
  25: 23.52941176470588,
  30: 29.411764705882355},
 'CTD': {5: 37.5, 10: 50.0, 15: 43.75, 20: 62.5, 25: 56.25, 30: 56.25},
 'CTG': {5: 64.70588235294117,
  10: 47.05882352941176,
  15: 64.70588235294117,
  20: 58.82352941176471,
  25: 58.82352941176471,
  30: 56.25},
 'DHG': {5: 58.82352941176471,
  10: 58.82352941176471,
  15: 58.82352941176471,
  20: 47.05882352941176,
  25: 47.05882352941176,
  30: 29.411764705882355},
 'DPM': {5: 37.5, 10: 43.75, 15: 56.25, 20: 

In [61]:
# Rebinds the name: the ticker -> result dict becomes a DataFrame.
winrate_macd = pd.DataFrame(winrate_macd)

In [62]:
# Writes the table to macd.xlsx (relative path, so it lands in the working directory).
winrate_macd.to_excel('macd.xlsx')

In [63]:
def calculate_indicators_bb(df, window=15, window_dev=2):
    """Add RSI and Bollinger-band columns to ``df``.

    Columns written: ``RSI``, ``Bollinger_high``, ``Bollinger_low`` and
    ``Previous_RSI``. ``Previous_RSI`` is ``RSI`` shifted down by one row,
    with every NaN replaced by 0.

    Parameters
    ----------
    df : pandas.DataFrame
        Price history with a ``close`` column. It is modified in place.
    window : int, optional
        Window forwarded to both Bollinger-band calls. Defaults to 15, the
        value that used to be hard-coded.
    window_dev : int or float, optional
        ``window_dev`` forwarded to both Bollinger-band calls. Defaults to 2,
        the value that used to be hard-coded.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object. An empty frame is returned untouched.
    """
    if df.empty:
        return df

    close = df['close']
    df['RSI'] = ta.momentum.RSIIndicator(close, RSI_PERIOD).rsi()
    df['Bollinger_high'] = ta.volatility.bollinger_hband(close, window=window, window_dev=window_dev)
    df['Bollinger_low'] = ta.volatility.bollinger_lband(close, window=window, window_dev=window_dev)

    # Assign the filled series back to the frame. The former
    # ``df[col].fillna(0, inplace=True)`` acts on a temporary object: pandas
    # 2.x warns about it and with Copy-on-Write it leaves ``df`` unchanged.
    df['Previous_RSI'] = df['RSI'].shift(1).fillna(0)
    return df

In [64]:
def trading_strategy_test_bb(df, day_range=(5, 30), step=5):
    """Back-test the Bollinger-band buy rule over several holding periods.

    A row is a buy signal when the close is at or below ``Bollinger_low`` and
    RSI is below ``RSI_OVERSOLD``. Each signal is closed ``days`` rows later
    and counts as a win when that later close is strictly higher than the
    entry close. Signals with fewer than ``days`` rows after them are ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``close``, ``Bollinger_low`` and ``RSI``. A boolean
        ``Buy_Signal`` column is written to it.
    day_range : tuple of int
        Inclusive (first, last) holding period, counted in rows.
    step : int
        Increment between successive holding periods.

    Returns
    -------
    dict
        Maps each holding period to its win rate in percent. The value is 0
        when no trade could be completed for that period.
    """
    holding_periods = range(day_range[0], day_range[1] + 1, step)
    if not holding_periods:
        return {}

    # The signal does not depend on the holding period, so build it once
    # instead of once per loop iteration.
    df['Buy_Signal'] = (df['close'] <= df['Bollinger_low']) & (df['RSI'] < RSI_OVERSOLD)

    closes = df['close'].to_numpy()
    entries = np.flatnonzero(df['Buy_Signal'].to_numpy())

    results = {}
    for days in holding_periods:
        # Keep only signals whose exit row still lies inside the data.
        completed = entries[entries + days < len(closes)]
        total_trades = int(completed.size)
        wins = int(np.count_nonzero(closes[completed + days] > closes[completed]))
        results[days] = (wins / total_trades * 100) if total_trades != 0 else 0

    return results

In [65]:
def company_winrate_bb(symbol, start_date, end_date):
    """Download price history for ``symbol`` and back-test the Bollinger strategy.

    Parameters
    ----------
    symbol : str
        Ticker passed to ``Vnstock().stock(..., source='VCI')``.
    start_date, end_date : str
        Date bounds forwarded to ``quote.history(start=..., end=...)``.

    Returns
    -------
    dict
        The result of ``trading_strategy_test_bb``: holding period mapped to
        win rate in percent. An empty dict when no price rows came back.
    """
    vn = Vnstock()
    df = vn.stock(symbol=symbol, source='VCI').quote.history(start=start_date, end=end_date)

    if df.empty:
        # Return the same type as the normal path. The former ``[0, 0]`` list
        # cannot be mixed with the other tickers' dicts when the collected
        # results are passed to ``pd.DataFrame``.
        return {}

    df = calculate_indicators_bb(df)
    return trading_strategy_test_bb(df)

In [66]:
# Collect the Bollinger-strategy win rates for every ticker in VN30.
# Best-effort: a ticker whose download or back-test raises is reported and
# skipped, so it is simply missing from winrate_bb.
winrate_bb = {}
for company in VN30:
    try:
        company_winr = company_winrate_bb(company, start_date=start_date, end_date=end_date)
        winrate_bb[company] = company_winr

    except Exception as e:
        print(f"Error occurred for {company}: {e}")



In [67]:
# Rebinds the name: the ticker -> result dict becomes a DataFrame.
winrate_bb = pd.DataFrame(winrate_bb)

In [68]:
# Writes the table to bb.xlsx (relative path, so it lands in the working directory).
winrate_bb.to_excel('bb.xlsx')

In [69]:
# Row offset used for the OBV difference: df['OBV'].diff(periods=OBV_PERIOD).
OBV_PERIOD = 5

In [70]:
def calculate_indicators_obv(df):
    """Add RSI and on-balance-volume columns to ``df``.

    Columns written: ``RSI``, ``OBV``, ``OBV_Slope`` (``OBV`` minus its value
    ``OBV_PERIOD`` rows earlier) and ``Previous_RSI`` (``RSI`` shifted down by
    one row, with every NaN replaced by 0).

    Parameters
    ----------
    df : pandas.DataFrame
        Price history with ``close`` and ``volume`` columns. It is modified
        in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object. An empty frame is returned untouched.
    """
    if df.empty:
        return df

    df['RSI'] = ta.momentum.RSIIndicator(df['close'], RSI_PERIOD).rsi()
    df['OBV'] = ta.volume.OnBalanceVolumeIndicator(df['close'], df['volume']).on_balance_volume()
    df['OBV_Slope'] = df['OBV'].diff(periods=OBV_PERIOD)

    # Assign the filled series back to the frame. The former
    # ``df[col].fillna(0, inplace=True)`` acts on a temporary object: pandas
    # 2.x warns about it and with Copy-on-Write it leaves ``df`` unchanged.
    df['Previous_RSI'] = df['RSI'].shift(1).fillna(0)

    return df

In [71]:
def trading_strategy_test_obv(df, day_range=(5, 30), step=5):
    """Back-test the RSI-recovery + OBV buy rule over several holding periods.

    A row is a buy signal when ``Previous_RSI`` is below ``RSI_OVERSOLD``,
    ``RSI`` is at or above it, and ``OBV_Slope`` is positive. Each signal is
    closed ``days`` rows later and counts as a win when that later close is
    strictly higher than the entry close. Signals with fewer than ``days``
    rows after them are ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``close``, ``RSI``, ``Previous_RSI`` and ``OBV_Slope``.
        A boolean ``Buy_Signal`` column is written to it.
    day_range : tuple of int
        Inclusive (first, last) holding period, counted in rows.
    step : int
        Increment between successive holding periods.

    Returns
    -------
    dict
        Maps each holding period to its win rate in percent. The value is 0
        when no trade could be completed for that period.
    """
    holding_periods = range(day_range[0], day_range[1] + 1, step)
    if not holding_periods:
        return {}

    # The signal does not depend on the holding period, so build it once
    # instead of once per loop iteration.
    df['Buy_Signal'] = (
        (df['Previous_RSI'] < RSI_OVERSOLD)
        & (df['RSI'] >= RSI_OVERSOLD)
        & (df['OBV_Slope'] > 0)
    )

    closes = df['close'].to_numpy()
    entries = np.flatnonzero(df['Buy_Signal'].to_numpy())

    results = {}
    for days in holding_periods:
        # Keep only signals whose exit row still lies inside the data.
        completed = entries[entries + days < len(closes)]
        total_trades = int(completed.size)
        wins = int(np.count_nonzero(closes[completed + days] > closes[completed]))
        results[days] = (wins / total_trades * 100) if total_trades != 0 else 0

    return results

In [72]:
def company_winrate_obv(symbol, start_date, end_date):
    """Download price history for ``symbol`` and back-test the OBV strategy.

    Parameters
    ----------
    symbol : str
        Ticker passed to ``Vnstock().stock(..., source='VCI')``.
    start_date, end_date : str
        Date bounds forwarded to ``quote.history(start=..., end=...)``.

    Returns
    -------
    dict
        The result of ``trading_strategy_test_obv``: holding period mapped to
        win rate in percent. An empty dict when no price rows came back.
    """
    vn = Vnstock()
    df = vn.stock(symbol=symbol, source='VCI').quote.history(start=start_date, end=end_date)

    if df.empty:
        # Return the same type as the normal path. The former ``[0, 0]`` list
        # cannot be mixed with the other tickers' dicts when the collected
        # results are passed to ``pd.DataFrame``.
        return {}

    df = calculate_indicators_obv(df)
    return trading_strategy_test_obv(df)

In [73]:
# Collect the OBV-strategy win rates for every ticker in VN30.
# Best-effort: a ticker whose download or back-test raises is reported and
# skipped, so it is simply missing from winrate_obv.
winrate_obv = {}
for company in VN30:
    try:
        company_winr = company_winrate_obv(company, start_date=start_date, end_date=end_date)
        winrate_obv[company] = company_winr

    except Exception as e:
        print(f"Error occurred for {company}: {e}")



In [74]:
# Rebinds the name: the ticker -> result dict becomes a DataFrame, which is
# then written to obv.xlsx (relative path, so it lands in the working directory).
winrate_obv = pd.DataFrame(winrate_obv)
winrate_obv.to_excel('obv.xlsx')