In [687]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
import os.path
from sklearn.model_selection import train_test_split
import plotly.graph_objects as go
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

import warnings
# Suppress every Python warning for the rest of the session.
# NOTE(review): this also hides pandas warnings about assigning into DataFrame slices - confirm that is intended.
warnings.filterwarnings('ignore')
# Global constants: numeric codes for the three trading signals.
# The signal columns and the back-test further down use the same values (1 / -1 / 0) written as literals.
Buy = 1
Sell = -1
Neutral = 0

In [688]:

def get_data(ticker,start_date,end_date):
    """Return price data for ``ticker``, cached as a CSV file under ``../data/``.

    On a cache miss the data is fetched with ``yf.download`` and written to
    ``../data/<ticker>_<start_date>_<end_date>.csv``. The frame is then always
    read back from that file, so the first call and later cached calls return
    the same layout.

    Parameters
    ----------
    ticker : str
        Symbol passed to ``yf.download``; also part of the cache file name.
    start_date, end_date : str
        Passed to ``yf.download`` as ``start`` / ``end``; also part of the
        cache file name.

    Returns
    -------
    pandas.DataFrame
        Contents of the cached CSV with the ``Date`` column converted to
        datetime.

    Raises
    ------
    ValueError
        If the download returns no rows. Nothing is cached in that case, so a
        later call retries instead of re-reading an empty file.
    """
    fname = "../data/"+ticker+"_"+ start_date+ "_" + end_date +".csv"
    if not os.path.isfile(fname):
        df = yf.download(ticker ,start=start_date, end=end_date)
        if df.empty:
            # Writing an empty frame would poison the cache for every later call.
            raise ValueError(
                "no data downloaded for " + ticker + " between " + start_date + " and " + end_date
            )
        # to_csv does not create missing folders; make sure the cache directory exists.
        os.makedirs(os.path.dirname(fname), exist_ok=True)
        df.to_csv(fname)

    df = pd.read_csv(fname)
    df['Date'] = pd.to_datetime(df['Date'])
    return df
    

In [689]:
# Function to calculate Weighted Moving Average (WMA)
def calculate_wma(prices, window):
    """Linearly weighted moving average over a trailing window.

    Inside each window the oldest value gets weight 1 and the newest gets
    weight ``window``; the weighted sum is divided by the sum of the weights.
    Positions that do not yet have a full window are NaN.
    """
    weights = np.arange(1, window + 1)

    def _weighted_mean(chunk):
        # raw=True passes each window as a plain ndarray
        return np.dot(chunk, weights) / weights.sum()

    return prices.rolling(window).apply(_weighted_mean, raw=True)

# Function to calculate Relative Strength Index (RSI)
def calculate_rsi(prices, window):
    """Relative Strength Index based on simple rolling means.

    Gains and losses are the positive and negative parts of the one-step price
    differences (the undefined first difference counts as zero for both). The
    result is ``100 - 100 / (1 + mean_gain / mean_loss)`` over the trailing
    ``window`` observations.
    """
    changes = prices.diff()
    ups = changes.where(changes > 0, 0)
    downs = -changes.where(changes < 0, 0)
    relative_strength = ups.rolling(window).mean() / downs.rolling(window).mean()
    return 100 - (100 / (1 + relative_strength))

def calculate_fibonacci_levels(prices, window):
    """Rolling Fibonacci retracement levels.

    For each trailing window the high-low range is measured, and three levels
    are returned at 23.6 %, 38.2 % and 61.8 % of that range below the rolling
    high, in that order.
    """
    rolling_prices = prices.rolling(window)
    top = rolling_prices.max()
    span = top - rolling_prices.min()
    return tuple(top - span * ratio for ratio in (0.236, 0.382, 0.618))

def calculate_bollinger_bands(prices, window, num_of_std=2):
    """Upper and lower Bollinger bands.

    The bands sit ``num_of_std`` rolling standard deviations above and below
    the rolling mean of ``prices``. Returns ``(upper, lower)``.
    """
    rolling_prices = prices.rolling(window=window)
    centre = rolling_prices.mean()
    half_width = rolling_prices.std() * num_of_std
    return centre + half_width, centre - half_width

def calculate_percentage_change(prices, window):
    """Percentage change of each price relative to the price ``window`` steps earlier.

    Parameters
    ----------
    prices : sequence of numbers (pandas Series, list, ndarray, ...)
        Values are read by position, so a Series with a non-default index
        (for example after filtering rows) is handled correctly.
    window : int
        Look-back distance in rows; must not be negative.

    Returns
    -------
    list
        Same length as ``prices``. The first ``window`` entries are ``pd.NA``
        because there is no earlier price to compare with; the rest are
        ``(price[i] - price[i - window]) / price[i - window] * 100``.

    Raises
    ------
    ValueError
        If ``window`` is negative.
    """
    if window < 0:
        raise ValueError("window must be non-negative")

    # np.asarray drops any pandas index so that every access below is positional.
    values = np.asarray(prices)
    total = len(values)
    comparable = max(total - window, 0)

    warm_up = [pd.NA] * min(window, total)
    changes = [
        (end_price - start_price) / start_price * 100
        for start_price, end_price in zip(values[:comparable], values[window:])
    ]
    return warm_up + changes

In [690]:
def apply_techinal_indicators(df, window=14, pc_window=28):
    """Add the technical-indicator columns derived from ``df['Adj Close']``.

    The frame is modified in place and also returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``'Adj Close'`` column.
    window : int, default 14
        Look-back used for the weighted moving average, RSI and Bollinger bands.
    pc_window : int, default 28
        Look-back used for the percentage change.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the added columns ``WMA``, ``RSI``, ``BUB`` (upper band),
        ``BLB`` (lower band) and ``PC``.
    """
    close = df['Adj Close']

    df['WMA'] = calculate_wma(close, window)
    df['RSI'] = calculate_rsi(close, window)
    df['BUB'], df['BLB'] = calculate_bollinger_bands(close, window)
    df['PC'] = calculate_percentage_change(close, pc_window)
    return df

#level1,level2,level3 = calculate_fibonacci_levels(new_df['Adj Close'], 14)
#new_df['Fib 23.6%'] = level1
#new_df['Fib 38.2%'] = level2
#new_df['Fib 61.8%'] = level3


In [691]:
# Turn each indicator column into a buy (1) / sell (-1) / neutral (0) signal column
def calcualte_signal_by_techinal_indicators(df, rsi_oversold=30, rsi_overbought=70,
                                            pc_buy_below=-5, pc_sell_above=15):
    """Add one signal column per indicator to ``df`` and return it.

    Every signal is 1 (buy), -1 (sell) or 0 (neutral). The frame is modified
    in place.

    Rules
    -----
    * ``WMA Signal``: buy when the price is below the WMA, sell when above.
    * ``RSI Signal``: buy when RSI < ``rsi_oversold``, sell when RSI > ``rsi_overbought``.
    * ``PC Signal``: buy when PC < ``pc_buy_below``, sell when PC > ``pc_sell_above``.
    * ``BB Signal``: buy when the price is below the lower band (``BLB``),
      sell when it is above the upper band (``BUB``).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``Adj Close``, ``WMA``, ``RSI``, ``PC``, ``BLB`` and ``BUB``.
    rsi_oversold, rsi_overbought : number, defaults 30 and 70
    pc_buy_below, pc_sell_above : number, defaults -5 and 15
    """
    def _three_way(buy_mask, sell_mask):
        # the buy condition wins when both masks are true, as in a nested np.where
        return np.where(buy_mask, 1, np.where(sell_mask, -1, 0))

    close = df['Adj Close']

    df['WMA Signal'] = _three_way(close < df['WMA'], close > df['WMA'])
    df['RSI Signal'] = _three_way(df['RSI'] < rsi_oversold, df['RSI'] > rsi_overbought)
    df['PC Signal'] = _three_way(df['PC'] < pc_buy_below, df['PC'] > pc_sell_above)
    df['BB Signal'] = _three_way(close < df['BLB'], close > df['BUB'])
    return df

##bb, parabolic sar

#new_df['All Signal'] = np.where((new_df['WMA Signal'] == 'Buy') & (new_df['RSI Signal'] == 'Buy') & (new_df['Fib Signal'] == 'Buy'), 'Buy',
#                                np.where((new_df['WMA Signal'] == 'Sell') & (new_df['RSI Signal'] == 'Sell') & (new_df['Fib Signal'] == 'Sell'), 'Sell', 'Neutral'))

#new_df['All Signal'] = np.where((new_df['WMA Signal'] == 'Buy') & (new_df['RSI Signal'] == 'Buy') & (new_df['Fib Signal'] == 'Buy'), 'Buy',
#                                np.where((new_df['WMA Signal'] == 'Sell') & (new_df['RSI Signal'] == 'Sell') & (new_df['Fib Signal'] == 'Sell'), 'Sell', 'Neutral'))


In [692]:
def calculate_profit(adj_close,signal, initial_cash=10000):
    """Back-test an all-in / all-out strategy driven by a signal series.

    Walking through the prices in order: a signal of 1 converts all available
    cash into stock at that price, a signal of -1 converts all held stock back
    into cash, and any other value leaves the position unchanged.

    Parameters
    ----------
    adj_close : sequence of prices (pandas Series, list, ndarray, ...)
    signal : sequence of signal codes, read by position alongside ``adj_close``
    initial_cash : number, default 10000

    Returns
    -------
    tuple
        ``(profit, portfolio_value, profit_percentage)`` where
        ``portfolio_value`` is a list holding the portfolio value after each
        step, ``profit`` is the final value minus ``initial_cash``, and
        ``profit_percentage`` is that profit as a percentage of
        ``initial_cash``. For empty input the result is ``(0.0, [], 0.0)``.

    Raises
    ------
    IndexError
        If ``signal`` is shorter than ``adj_close``.
    """
    # Plain arrays give positional access whatever index the inputs carry.
    prices = np.asarray(adj_close)
    signals = np.asarray(signal)

    cash = initial_cash
    stock = 0
    portfolio_value = []
    for i, price in enumerate(prices):
        action = signals[i]
        if action == 1 and cash > 0:
            # Buy as much stock as the available cash allows
            stock = cash / price
            cash = 0
        elif action == -1 and stock > 0:
            # Sell the whole position
            cash = stock * price
            stock = 0
        # Value of the portfolio after this step
        portfolio_value.append(cash + stock * price)

    # With no prices nothing was traded, so the final value is the starting cash.
    final_value = portfolio_value[-1] if portfolio_value else cash
    # np.float64 keeps `.round()` available to callers even when no arithmetic with prices happened.
    profit = np.float64(final_value - initial_cash)
    profit_percentage = np.float64((profit / initial_cash) * 100)
    return profit, portfolio_value, profit_percentage




In [693]:
def plot(ticker, signal, df):
    """Show an interactive Plotly chart of the price with buy/sell markers.

    ``signal`` names the signal column in ``df`` (1 = buy, -1 = sell). For
    "WMA Signal", "RSI Signal" and "BB Signal" the matching indicator
    column(s) are drawn as orange lines on top of the adjusted close.
    """
    overlay_columns = {
        "WMA Signal": ('WMA',),
        "RSI Signal": ('RSI',),
        "BB Signal": ('BUB', 'BLB'),
    }
    dates = df['Date']

    fig = go.Figure()

    # Price line first, then any indicator overlays for the chosen signal
    fig.add_trace(go.Scatter(x=dates, y=df['Adj Close'], mode='lines', name='Adj Close', line=dict(color='blue')))
    for column in overlay_columns.get(signal, ()):
        fig.add_trace(go.Scatter(x=dates, y=df[column], mode='lines', name=column, line=dict(color='orange')))

    # Buy markers, then sell markers
    marker_specs = (
        (1, 'Buy Signal', 'triangle-up', 'green'),
        (-1, 'Sell Signal', 'triangle-down', 'red'),
    )
    for code, label, symbol, colour in marker_specs:
        hits = df[df[signal] == code]
        fig.add_trace(go.Scatter(x=hits['Date'], y=hits['Adj Close'], mode='markers', name=label,
                                 marker=dict(symbol=symbol, size=10, color=colour)))

    layout = dict(
        title=f'{ticker} Stock Analysis {signal}',
        xaxis_title='Date',
        yaxis_title='Price',
        legend=dict(x=0, y=1),
        xaxis=dict(rangeslider=dict(visible=True)),
        template='plotly_dark',
        height=600,
        width=1000,
    )
    fig.update_layout(**layout)

    fig.show()

def plot2(title, x,y,x_buy,y_buy,x_sell,y_sell):
    """Show an interactive Plotly chart from pre-computed coordinates.

    ``x`` / ``y`` give the price line; ``x_buy`` / ``y_buy`` and
    ``x_sell`` / ``y_sell`` give the positions of the buy and sell markers.
    """
    price_line = go.Scatter(x=x, y=y, mode='lines', name='Adj Close', line=dict(color='blue'))
    buy_markers = go.Scatter(x=x_buy, y=y_buy, mode='markers', name='Buy Signal',
                             marker=dict(symbol='triangle-up', size=10, color='green'))
    sell_markers = go.Scatter(x=x_sell, y=y_sell, mode='markers', name='Sell Signal',
                              marker=dict(symbol='triangle-down', size=10, color='red'))

    fig = go.Figure()
    for trace in (price_line, buy_markers, sell_markers):
        fig.add_trace(trace)

    layout = dict(
        title=f'{title} Stock Analysis',
        xaxis_title='Date',
        yaxis_title='Price',
        legend=dict(x=0, y=1),
        xaxis=dict(rangeslider=dict(visible=True)),
        template='plotly_dark',
        height=600,
        width=1000,
    )
    fig.update_layout(**layout)

    fig.show()

In [694]:

def plot_profit(ticker,signal,df,entry_date,exit_date):
    """Draw a static matplotlib chart of the price and signals between two dates.

    Rows of ``df`` whose ``Date`` lies in ``[entry_date, exit_date]`` are
    plotted. ``signal`` names the signal column (1 = buy, -1 = sell); for
    "WMA Signal", "RSI Signal" and "BB Signal" the matching indicator
    column(s) are drawn in orange.
    """
    in_window = (df['Date'] >= entry_date) & (df['Date'] <= exit_date)
    df = df[in_window]

    plt.figure(figsize=(20, 7))

    # Price line
    plt.plot(df['Date'], df['Adj Close'], label='Adj Close', color='blue')

    # Indicator overlays for the chosen signal
    overlay_columns = {
        "WMA Signal": ('WMA',),
        "RSI Signal": ('RSI',),
        "BB Signal": ('BUB', 'BLB'),
    }
    for column in overlay_columns.get(signal, ()):
        plt.plot(df['Date'], df[column], label=column, color='orange')

    # Buy markers, then sell markers
    marker_specs = (
        (1, 'Buy Signal', '^', 'green'),
        (-1, 'Sell Signal', 'v', 'red'),
    )
    for code, label, shape, colour in marker_specs:
        hits = df[df[signal] == code]
        plt.scatter(hits['Date'], hits['Adj Close'], label=label, marker=shape, color=colour, alpha=1)

    plt.title(f'{ticker} Stock Analysis {signal}')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.show()

In [695]:
# Back-test every indicator signal on the given frame: print each profit, then plot each signal
def calculate_all_profit(ticker,df):
    """Back-test and chart each indicator signal in ``df``.

    For the PC, WMA, RSI and BB signal columns (in that order) the profit of
    following the signal is printed; afterwards one chart per signal is shown.
    """
    signal_columns = ("PC Signal", "WMA Signal", "RSI Signal", "BB Signal")

    # All profit lines are printed before the first chart appears
    for column in signal_columns:
        profit, _, profit_percentage = calculate_profit(df["Adj Close"], df[column])
        print(f"{column} : Final Profit: ${profit:.2f}, {profit_percentage.round(2)}%")

    for column in signal_columns:
        plot(ticker, column, df)
    

In [696]:
def MLLogistic(df_train):
    """Fit a logistic-regression classifier that predicts the ``WMA Signal`` column.

    The features are ``Adj Close`` and ``WMA``. The rows are split 70/30 with
    a fixed random seed, the features are standardised with a scaler fitted on
    the training part only, and the confusion matrix and classification report
    for the held-out part are printed.

    Parameters
    ----------
    df_train : pandas.DataFrame
        Must contain ``Adj Close``, ``WMA`` and ``WMA Signal``.

    Returns
    -------
    tuple
        ``(model, scaler)``: the fitted ``LogisticRegression`` and the fitted
        ``StandardScaler``. New data must be transformed with this scaler
        before calling ``model.predict``.
    """
    feature_list = ['Adj Close','WMA']
    features = df_train[feature_list]
    target = df_train['WMA Signal']  # the WMA-based signal is the label to learn

    # NOTE(review): train_test_split shuffles rows by default, so train and test
    # rows are interleaved in time - confirm this is acceptable for this data.
    X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.3, random_state=42)

    # Fit the scaler on the training rows only, then reuse it for the test rows
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train the logistic regression model
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train_scaled, y_train)

    # Evaluate on the held-out rows
    y_pred = model.predict(X_test_scaled)
    print(confusion_matrix(y_test, y_pred))
    print(classification_report(y_test, y_pred))
    return model, scaler
   
    
    


In [697]:
def MLLogisticTest(model,scaler,df):
    """Predict signals for ``df`` with an already fitted model and scaler.

    The ``Adj Close`` and ``WMA`` columns are passed through
    ``scaler.transform`` and the result is given to ``model.predict``, whose
    output is returned unchanged.
    """
    feature_columns = ['Adj Close','WMA']
    scaled_features = scaler.transform(df[feature_columns])
    return model.predict(scaled_features)



In [698]:
##### main #####

sd = '1887-12-31' #'2014-01-01'
ed = '1999-12-31'#'2023-12-31'
ticker_symbol = "AAPL"

# .copy() gives the indicator functions an independent frame to add columns to,
# instead of a column slice of the downloaded data.
new_df = get_data(ticker_symbol,sd,ed)[['Date', 'Adj Close']].copy()
new_df = apply_techinal_indicators(new_df)
# Drop the leading rows where the rolling indicators are not defined yet
new_df = new_df.dropna()
new_df = calcualte_signal_by_techinal_indicators(new_df)
model,scaler = MLLogistic(new_df)

# Back-test window: the 5 * 365 days that end on the end date
exit_date = pd.to_datetime(ed)
entry_date = exit_date - pd.Timedelta(days=365*5)
# .copy() so the prediction column below is written to its own frame, not to a slice of new_df
df_filter = new_df[(new_df['Date'] >= entry_date) & (new_df['Date'] <= exit_date)].copy()

#calculate_all_profit(ticker_symbol,df_filter)

# NOTE: df_filter is a subset of new_df, which the model was trained on, so
# this is not an out-of-sample evaluation.
df_filter["Logistic WMA Signal"] =  MLLogisticTest(model,scaler,df_filter)

ml_wam_profit, ml_wam_portfolio_value, ml_wam_profit_percentage = calculate_profit(df_filter["Adj Close"],df_filter["Logistic WMA Signal"])
print(f"Logistic WMA Signal : Final Profit: ${ml_wam_profit:.2f}, {ml_wam_profit_percentage.round(2)}%")
plot(ticker_symbol,"Logistic WMA Signal",df_filter)

wam_profit, wam_portfolio_value, wam_profit_percentage = calculate_profit(df_filter["Adj Close"],df_filter["WMA Signal"])
print(f"WMA Signal : Final Profit: ${wam_profit:.2f}, {wam_profit_percentage.round(2)}%")
plot(ticker_symbol,"WMA Signal",df_filter)

[[692  63]
 [  0 682]]
              precision    recall  f1-score   support

          -1       1.00      0.92      0.96       755
           1       0.92      1.00      0.96       682

    accuracy                           0.96      1437
   macro avg       0.96      0.96      0.96      1437
weighted avg       0.96      0.96      0.96      1437

Logistic WMA Signal : Final Profit: $4758.75, 47.59%


WMA Signal : Final Profit: $5115.00, 51.15%
