# Library & Config Import

In [1]:
# Standard library
import json
import os
import sys
from collections import deque
from datetime import date, datetime
from urllib.parse import quote

# Third-party
import boto3
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import polars as pl

current_dir = os.getcwd()
# Parent of jupyter_notebook = aws_lambda_architecture
module_dir = os.path.abspath(os.path.join(current_dir, ".."))
if module_dir not in sys.path:
    sys.path.insert(0, module_dir)

# Project-local (importable only after the sys.path tweak above)
from shared.analytics_core.inputs import load_ohlcv_by_timeframe, load_ohlcv_from_rds, load_ohlcv_from_s3
from shared.analytics_core.backtester import Backtester, BacktestResult
from shared.analytics_core.strategies.library import GoldenCrossStrategy, VegasChannelStrategy
from shared.analytics_core.executor import MultiTimeframeExecutor
from shared.analytics_core.indicators.technicals import calculate_all_indicators

def get_secret(secret_name, region_name="ca-west-1"):
    """Fetch a JSON secret from AWS Secrets Manager and return it parsed.

    Parameters
    ----------
    secret_name : str
        Name (or ARN) of the secret to read.
    region_name : str, default "ca-west-1"
        Region hosting the secret. Previously hard-coded; the default keeps
        existing calls unchanged.

    Returns
    -------
    dict
        ``json.loads`` of the secret's ``SecretString`` field.

    Errors raised by boto3 (missing secret, no permission, ...) propagate unchanged.
    """
    client = boto3.Session().client(
        service_name="secretsmanager",
        region_name=region_name,
    )
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])
# Database credentials come from Secrets Manager; nothing is hard-coded in the notebook.
rds_secret = get_secret("dev-batch-postgres-credentials")

RDS_HOST = rds_secret['host']
RDS_PORT = rds_secret['port']
RDS_USER = rds_secret['username']
RDS_PASSWORD = rds_secret['password']
RDS_DB = rds_secret['dbname']
# Percent-encode user and password: generated passwords can contain URL-reserved
# characters (':', '#', '%', '?', ...) that would otherwise corrupt the DSN.
RDS_CONNECTION_STRING = (
    f"postgresql://{quote(str(RDS_USER), safe='')}:{quote(str(RDS_PASSWORD), safe='')}"
    f"@{RDS_HOST}:{RDS_PORT}/{RDS_DB}"
)

# S3 credentials from the default boto3 credential chain (used by polars' S3 reader).
# Defined unconditionally so later cells fail with a clear message rather than a NameError.
s3_region = "ca-west-1"
S3ACCESSKEY = S3SECRETKEY = S3SESSIONTOKEN = None
session = boto3.Session()
creds = session.get_credentials()
if creds:
    frozen_creds = creds.get_frozen_credentials()
    S3ACCESSKEY = frozen_creds.access_key
    S3SECRETKEY = frozen_creds.secret_key
    # Required alongside the key pair when the chain yields temporary credentials.
    S3SESSIONTOKEN = frozen_creds.token
else:
    print("No AWS credentials found in the boto3 credential chain; S3 reads will fail.")

# Data Loading

In [31]:
# Sample read of the silver 3-day layer (year=2024, month=01) straight from S3.
s3_path = "s3://dev-condvest-datalake/silver/silver_3d/year=2024/month=01/*parquet"

# Resolve credentials from the boto3 default chain and hand them to polars explicitly.
# The session token is included so temporary credentials (SSO / assumed role) work too.
# A single explicit read is enough; there is no need to also read via env-var credentials.
_aws_creds = boto3.Session().get_credentials()
if _aws_creds is None:
    raise RuntimeError("No AWS credentials found in the boto3 credential chain.")
_frozen_creds = _aws_creds.get_frozen_credentials()
s3_storage_options = {
    "aws_access_key_id": _frozen_creds.access_key,
    "aws_secret_access_key": _frozen_creds.secret_key,
    "aws_region": "ca-west-1",
}
if _frozen_creds.token:
    s3_storage_options["aws_session_token"] = _frozen_creds.token

df_explicit = pl.read_parquet(s3_path, storage_options=s3_storage_options)
df_explicit['ts'].dtype

Datetime(time_unit='us', time_zone='Etc/UTC')

In [29]:
# Candlestick for one symbol from the S3 sample. Local names deliberately avoid the
# builtins `open`, `min` and `max`: rebinding them at notebook level breaks every later
# cell that calls min()/max() in the same kernel.
symbol = 'AMD'
symbol_bars = (
    df_explicit
    .filter(pl.col('symbol') == symbol)
    .sort('ts')
    .to_pandas()
)

fig = go.Figure(go.Candlestick(
    x=symbol_bars['ts'],
    open=symbol_bars['open'],
    high=symbol_bars['high'],
    low=symbol_bars['low'],
    close=symbol_bars['close'],
    name=symbol,
))
fig.update_layout(
    title=f'{symbol} candlestick (S3 silver_3d sample)',
    xaxis_title='Date',
    yaxis_title='Price',
)
fig.show()


In [2]:
# Load OHLCV bars for SYMBOL. RDS (`raw_ohlcv`) returns daily bars, which the next
# cell resamples to 3-day bars. Set OHLCV_SOURCE = "s3" to read the silver 3d layer
# instead (it is already 3-day, so skip the resample cell in that case).
SYMBOL = "ORCL"
START_DATE = date(2020, 1, 1)
END_DATE = date(2026, 2, 7)
S3_BUCKET = "dev-condvest-datalake"
OHLCV_SOURCE = "rds"  # "rds" or "s3"

if OHLCV_SOURCE == "rds":
    df_ohlcv = load_ohlcv_from_rds(SYMBOL, RDS_CONNECTION_STRING, start_date=START_DATE, end_date=END_DATE, table_name="raw_ohlcv")
elif OHLCV_SOURCE == "s3":
    df_ohlcv = load_ohlcv_from_s3(S3_BUCKET, SYMBOL, timeframe="3d", start_date=START_DATE, end_date=END_DATE)
else:
    raise ValueError(f"Unknown OHLCV_SOURCE: {OHLCV_SOURCE!r} (expected 'rds' or 's3')")

print(f"Loaded {df_ohlcv.height} rows for {SYMBOL}")
df_ohlcv.head()


Loaded 1346 rows for ORCL


symbol,open,high,low,close,volume,timestamp,interval
str,"decimal[*,2]","decimal[*,2]","decimal[*,2]","decimal[*,2]",i64,datetime[μs],str
"""ORCL""",58.13,58.26,55.86,56.6,32560400,2020-09-21 04:00:00,"""1d"""
"""ORCL""",56.19,56.53,55.33,56.41,13306000,2020-09-22 04:00:00,"""1d"""
"""ORCL""",55.94,56.29,54.81,54.87,15513500,2020-09-23 04:00:00,"""1d"""
"""ORCL""",54.66,55.74,54.24,55.18,9560500,2020-09-24 04:00:00,"""1d"""
"""ORCL""",55.16,55.83,54.87,55.65,9371900,2020-09-25 04:00:00,"""1d"""


In [3]:
# Resample the daily bars to 3-day bars.
# NOTE: windows are calendar-day windows as laid out by polars, not trading sessions,
# so a 3-day bar may contain 1-3 sessions and can be labelled on a weekend date.
# This cell overwrites df_ohlcv: re-run the loading cell before re-running it.
RESAMPLE_EVERY = "3d"
df_ohlcv = (
    df_ohlcv
    # group_by_dynamic needs an ascending index, and first()/last() below must see the
    # bars in time order for the bar's open/close to be correct.
    .sort("timestamp")
    .group_by_dynamic("timestamp", every=RESAMPLE_EVERY)
    .agg(
        pl.col("open").first().alias("open"),
        pl.col("high").max().alias("high"),
        pl.col("low").min().alias("low"),
        pl.col("close").last().alias("close"),
        pl.col("volume").sum().alias("volume"),
        pl.col("symbol").first().alias("symbol"),
    )
)


# Backtester

## Vegas Channel

### Strategy Sandbox Testing

In [4]:
# Run the Vegas Channel strategy and backtest it; signals are kept in memory (no MongoDB).
INITIAL_CAPITAL = 10000.0

if df_ohlcv is not None and not df_ohlcv.is_empty():
    df_pl = df_ohlcv
    if "date" not in df_pl.columns and "timestamp" in df_pl.columns:
        df_pl = df_pl.rename({"timestamp": "date"})
    df_pl = calculate_all_indicators(df_pl)
    strategy = VegasChannelStrategy()
    df_with_signals = strategy.run(df_pl)
    backtester = Backtester(initial_capital=INITIAL_CAPITAL)
    result = backtester.run(strategy, df_with_signals)
    signals_in_memory = strategy.get_signals(df_with_signals)
    return_pct = (result.final_capital / result.initial_capital - 1) * 100
    print(f"Backtest: {result.total_trades} trades, win_rate={result.win_rate:.2%}, return_pct={return_pct:.2f}%")
    # A bare expression inside an if-block is never rendered by Jupyter; show it explicitly.
    display(signals_in_memory)
else:
    signals_in_memory = pl.DataFrame()
    # Clear any result from an earlier run so the cells below cannot show stale trades.
    result = None
    print("Skipped: no OHLCV data.")


Backtest: 4 trades, win_rate=75.00%, return_pct=158.56%


### Trades & Price Chart

In [5]:
# Tabulate the closed trades of the backtest above (`result`) for the chart in the next cell.
TRADE_COLUMNS = ["entry_date", "exit_date", "entry_price", "exit_price", "pnl"]
trade_records = [
    {
        "entry_date": t.entry_date,
        "exit_date": t.exit_date,
        "entry_price": t.entry_price,
        "exit_price": t.exit_price,
        "pnl": t.pnl,
    }
    for t in result.trades
]
# Explicit columns keep the frame (and the date conversions below) valid with zero trades.
trades_df = pd.DataFrame(trade_records, columns=TRADE_COLUMNS)
trades_df["entry_date"] = pd.to_datetime(trades_df["entry_date"])
trades_df["exit_date"] = pd.to_datetime(trades_df["exit_date"])
# Equity curve as reported by the backtester; kept for later use, not plotted here.
equity_curve = result.equity_curve
display(trades_df)


  entry_date  exit_date  entry_price  exit_price    pnl
0 2020-12-16 2022-02-12        60.79       75.18  14.39
1 2022-08-14 2022-08-26        76.36       71.69  -4.67
2 2022-11-12 2025-03-28        75.80      140.07  64.27
3 2025-05-12 2025-11-23       162.60      195.95  33.35


In [6]:
# Vegas Channel chart: candles, the four channel EMAs, and the backtest's entry/exit markers.
# A dedicated name avoids clobbering the notebook-level `df`.
chart_df = df_pl.to_pandas()
symbol_name = SYMBOL

fig = go.Figure(go.Candlestick(
    x=chart_df['date'],
    open=chart_df['open'],
    high=chart_df['high'],
    low=chart_df['low'],
    close=chart_df['close'],
    name=symbol_name,  # unnamed, the legend entry reads "trace 0"
))

for period in (8, 13, 144, 169):
    fig.add_trace(go.Scatter(x=chart_df['date'], y=chart_df[f'ema_{period}'], mode='lines', name=f'EMA{period}'))

# Buy/sell markers at the executed trade prices
fig.add_trace(go.Scatter(x=trades_df['entry_date'], y=trades_df['entry_price'], mode='markers', name='Buy', marker=dict(color='green', size=10, symbol='triangle-up')))
fig.add_trace(go.Scatter(x=trades_df['exit_date'], y=trades_df['exit_price'], mode='markers', name='Sell', marker=dict(color='red', size=10, symbol='triangle-down')))

fig.update_layout(
    title=f'{symbol_name} Price with EMA8, EMA13, EMA144, EMA169 and Buy/Sell Arrows',
    xaxis_title='Date',
    yaxis_title='Price',
    legend_title='Legend',
    yaxis_fixedrange=False,
)
fig.show()

## Golden Cross

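### Strategy Sandbox Testing

> **Note:** The saved outputs in this section were produced on daily bars. The trade timestamps carry 04:00/05:00 offsets, unlike the 3-day bars used for Vegas Channel above, so they most likely come from an out-of-order run that skipped the resample cell. Restart the kernel and run all cells before comparing the two strategies.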

In [13]:
# Run the Golden Cross strategy and backtest it; signals are kept in memory (no MongoDB).
INITIAL_CAPITAL = 10000.0

if df_ohlcv is not None and not df_ohlcv.is_empty():
    df_pl = df_ohlcv
    if "date" not in df_pl.columns and "timestamp" in df_pl.columns:
        df_pl = df_pl.rename({"timestamp": "date"})
    df_pl = calculate_all_indicators(df_pl)
    strategy = GoldenCrossStrategy()
    df_with_signals = strategy.run(df_pl)
    backtester = Backtester(initial_capital=INITIAL_CAPITAL)
    result = backtester.run(strategy, df_with_signals)
    signals_in_memory = strategy.get_signals(df_with_signals)
    return_pct = (result.final_capital / result.initial_capital - 1) * 100
    print(f"Backtest: {result.total_trades} trades, win_rate={result.win_rate:.2%}, return_pct={return_pct:.2f}%")
    # A bare expression inside an if-block is never rendered by Jupyter; show it explicitly.
    display(signals_in_memory)
else:
    signals_in_memory = pl.DataFrame()
    # Clear any result from an earlier run so the cells below cannot show stale trades.
    result = None
    print("Skipped: no OHLCV data.")


Backtest: 41 trades, win_rate=46.34%, return_pct=139.95%


### Trades & Price Chart

In [14]:
# Tabulate the closed trades of the backtest above (`result`) for the chart in the next cell.
TRADE_COLUMNS = ["entry_date", "exit_date", "entry_price", "exit_price", "pnl"]
trade_records = [
    {
        "entry_date": t.entry_date,
        "exit_date": t.exit_date,
        "entry_price": t.entry_price,
        "exit_price": t.exit_price,
        "pnl": t.pnl,
    }
    for t in result.trades
]
# Explicit columns keep the frame (and the date conversions below) valid with zero trades.
trades_df = pd.DataFrame(trade_records, columns=TRADE_COLUMNS)
trades_df["entry_date"] = pd.to_datetime(trades_df["entry_date"])
trades_df["exit_date"] = pd.to_datetime(trades_df["exit_date"])
# Equity curve as reported by the backtester; kept for later use, not plotted here.
equity_curve = result.equity_curve
display(trades_df)


            entry_date           exit_date  entry_price  exit_price    pnl
0  2020-10-07 04:00:00 2020-10-20 04:00:00        56.61       55.83  -0.78
1  2020-11-11 05:00:00 2021-01-04 05:00:00        53.45       59.57   6.12
2  2021-02-04 05:00:00 2021-02-17 05:00:00        59.42       58.24  -1.18
3  2021-02-22 05:00:00 2021-03-16 04:00:00        60.45       62.70   2.25
4  2021-03-29 04:00:00 2021-04-22 04:00:00        66.72       70.53   3.81
5  2021-05-06 04:00:00 2021-05-11 04:00:00        75.11       73.18  -1.93
6  2021-06-04 04:00:00 2021-06-15 04:00:00        78.08       76.90  -1.18
7  2021-07-02 04:00:00 2021-07-28 04:00:00        77.07       82.43   5.36
8  2021-09-24 04:00:00 2021-10-28 04:00:00        85.04       91.30   6.26
9  2021-12-10 05:00:00 2021-12-21 05:00:00        97.37       86.82 -10.55
10 2022-01-12 05:00:00 2022-01-18 05:00:00        84.10       81.47  -2.63
11 2022-02-02 05:00:00 2022-02-17 05:00:00        78.56       71.93  -6.63
12 2022-03-01 05:00:00 20

In [15]:
# Golden Cross chart: candles, the 20/50/200 simple moving averages, and entry/exit markers.
# A dedicated name avoids clobbering the notebook-level `df`.
chart_df = df_pl.to_pandas()
symbol_name = SYMBOL

fig = go.Figure(go.Candlestick(
    x=chart_df['date'],
    open=chart_df['open'],
    high=chart_df['high'],
    low=chart_df['low'],
    close=chart_df['close'],
    name=symbol_name,  # unnamed, the legend entry reads "trace 0"
))

for period in (20, 50, 200):
    fig.add_trace(go.Scatter(x=chart_df['date'], y=chart_df[f'sma_{period}'], mode='lines', name=f'MA{period}'))

# Buy/sell markers at the executed trade prices
fig.add_trace(go.Scatter(x=trades_df['entry_date'], y=trades_df['entry_price'], mode='markers', name='Buy', marker=dict(color='green', size=10, symbol='triangle-up')))
fig.add_trace(go.Scatter(x=trades_df['exit_date'], y=trades_df['exit_price'], mode='markers', name='Sell', marker=dict(color='red', size=10, symbol='triangle-down')))

fig.update_layout(
    title=f'{symbol_name} Price with MA20, MA50, MA200 and Buy/Sell Arrows',
    xaxis_title='Date',
    yaxis_title='Price',
    legend_title='Legend',
    yaxis_fixedrange=False,
)
fig.show()

# StockScanner

# Archive

## Alerts Labelling EDA


### Alerts Labelling

In [None]:
class AddAlert:
    """Attach rule-based EMA alerts to one symbol's rows for a single interval.

    ``df_dict`` is a list of row dicts. The methods read ``date``, ``interval``,
    ``open``, ``low``, ``close`` and the EMA fields ``8ema``, ``13ema``, ``144ema``
    and ``169ema`` (NaN where an EMA is not yet defined — presumably the warm-up
    period; confirm against the producer). Alerts are written in place into
    ``row['alerts']`` under ``169ema_touched``, ``velocity_alert`` and
    ``momentum_alert``.
    """

    def __init__(self, df_dict, interval, symbol):
        # Keep only the rows of the requested interval (same dict objects, mutated in place).
        self.df_dict = [entry for entry in df_dict if entry['interval'] == interval]
        self.window = 30  # Number of days to compare the stock price
        self.interval = interval
        self.symbol = symbol
        self.dict = {}
        self.rolling_window = 50  # length of the rolling touch-count windows

    def velocity_accel_decel_alert(self, base_window=28):
        """Write ``momentum_alert`` entries ('accelerated' / 'decelerated').

        Acceleration: the bar opens and closes above the short EMAs, the short EMAs
        are at/above the long EMAs, and the preceding ``obs_window`` rows were mostly
        weak/lost velocity. Deceleration is the mirror image: close below the short
        EMAs, short EMAs at/below the long EMAs, and the preceding rows were mostly
        ``velocity_maintained``.

        Reads ``row['alerts']['velocity_alert']`` of earlier rows, so
        ``velocity_alert_dict`` must run first. A repeated status is only re-emitted
        after ``alert_duration`` reaches 30 qualifying bars.
        """
        previous_velocity_status = None
        alert_duration = 0
        # Look-back length per interval; unknown intervals use base_window // 4.
        window_dict = {
            1: base_window,
            3: base_window - 8,
            5: base_window - 8,
            8: base_window - 14,
            13: base_window - 14,
        }
        weak_types = ('velocity_loss', 'velocity_weak', 'velocity_negotiating')

        for i, entry in enumerate(self.df_dict):
            entry.setdefault('alerts', {})
            obs_window = window_dict.get(entry['interval'], base_window // 4)

            # Not enough history yet to judge a change in velocity.
            if i < obs_window:
                continue

            previous_window = self.df_dict[i - obs_window:i]
            previous_types = [
                row['alerts']['velocity_alert']['alert_type']
                for row in previous_window
                if 'velocity_alert' in row['alerts']
            ]
            count_velocity_loss = sum(t in weak_types for t in previous_types)
            count_velocity_maintained = sum(t == 'velocity_maintained' for t in previous_types)

            short_term_max = max(entry['8ema'], entry['13ema'])
            lng_term_max = max(entry['169ema'], entry['144ema'])
            short_term_min = min(entry['8ema'], entry['13ema'])
            lng_term_min = min(entry['169ema'], entry['144ema'])

            # Without long-term EMAs, compare EMA8 against EMA13 instead.
            if np.isnan(entry['144ema']):
                lng_term_max = entry['13ema']
                lng_term_min = entry['13ema']
                short_term_max = entry['8ema']
                short_term_min = entry['8ema']

            # Repeat suppression is handled by previous_velocity_status / alert_duration below.
            current_velocity_status = None
            if lng_term_max <= short_term_max < entry['open'] < entry['close']:
                # Breakout after a look-back dominated by weak/lost velocity.
                if count_velocity_loss > count_velocity_maintained:
                    alert_duration += 1
                    current_velocity_status = 'accelerated'
            elif entry['close'] < short_term_min <= lng_term_min:
                # Breakdown after a look-back dominated by maintained velocity
                # (mirror of the acceleration guard above).
                if count_velocity_maintained > count_velocity_loss:
                    alert_duration += 1
                    current_velocity_status = 'decelerated'

            allow_new_alert = (current_velocity_status != previous_velocity_status
                               or alert_duration >= 30)

            if current_velocity_status and allow_new_alert:
                entry['alerts']['momentum_alert'] = {
                    "date": entry['date'],
                    "alert_type": current_velocity_status,
                    "details": f"Velocity status changed: {current_velocity_status}"
                }
                previous_velocity_status = current_velocity_status
                alert_duration = 0
            elif alert_duration >= 30:
                # Long streak without a new alert: forget the last status.
                alert_duration = 0
                previous_velocity_status = None

    def velocity_alert_dict(self, window=3):
        """Label every row after the first ``window`` with a ``velocity_alert``.

        Labels, checked in order:
        - 'velocity_maintained': close above all four EMAs and EMA8/13 above EMA144/169.
        - 'velocity_weak': close at/above EMA169 but below EMA13.
        - 'velocity_loss': close below both EMA13 and EMA169.
        - 'velocity_negotiating': anything else.
        When ``144ema`` is NaN the long-term checks fall back to the short-term EMAs.
        """
        for i in range(window, len(self.df_dict)):
            entry = self.df_dict[i]
            entry.setdefault('alerts', {})

            # Closing price above both EMAs of each pair
            above_13ema = entry['close'] > max(entry['13ema'], entry['8ema'])
            above_169ema = entry['close'] > max(entry['169ema'], entry['144ema'])

            # Closing price below EMA13 / EMA169
            below_13ema = entry['close'] < entry['13ema']
            below_169ema = entry['close'] < entry['169ema']

            # Long-term and short-term EMA bounds
            lng_term_max = max(entry['144ema'], entry['169ema'])
            lng_term_min = min(entry['144ema'], entry['169ema'])
            short_term_max = max(entry['13ema'], entry['8ema'])
            short_term_min = min(entry['13ema'], entry['8ema'])

            # Without EMA144, fall back to the short-term EMAs
            if np.isnan(entry['144ema']):
                above_169ema = above_13ema
                below_169ema = below_13ema
                lng_term_max = lng_term_min = entry['13ema']
                short_term_max = short_term_min = entry['8ema']

            between_13_169ema = entry['13ema'] >= entry['close'] >= entry['169ema']
            in_up_trend = short_term_min > lng_term_max

            if above_13ema and above_169ema and in_up_trend:
                current_alert_type = 'velocity_maintained'
            elif between_13_169ema and below_13ema:
                current_alert_type = 'velocity_weak'
            elif below_169ema and below_13ema:
                current_alert_type = 'velocity_loss'
            else:
                current_alert_type = 'velocity_negotiating'

            # Every processed row gets a label (it overwrites any previous one).
            entry['alerts']['velocity_alert'] = {
                "date": entry['date'],
                "alert_type": current_alert_type,
                "details": f"Velocity alert triggered: {current_alert_type}"}

    @staticmethod
    def _touch_bounds(entry, tolerance_dict):
        """Return (lower, upper) of the widened EMA touch band, or None if no EMA is usable."""
        def _has(key):
            return not np.isnan(entry.get(key, np.nan))

        if _has('144ema') and _has('169ema'):
            low_ref = min(entry['144ema'], entry['169ema'])
            high_ref = max(entry['144ema'], entry['169ema'])
        elif _has('144ema'):
            low_ref = high_ref = entry['144ema']
        elif _has('13ema') and _has('8ema'):
            low_ref = min(entry['13ema'], entry['8ema'])
            high_ref = max(entry['13ema'], entry['8ema'])
        elif _has('8ema'):
            low_ref = high_ref = entry['8ema']
        else:
            return None
        tolerance = tolerance_dict[entry['interval']]
        return low_ref * (1 - tolerance), high_ref * (1 + tolerance)

    def add_169ema_touch_alert(self):
        """Flag rows whose low (or EMA8/EMA13) falls inside the long-term EMA band.

        The band is [min, max] of EMA144/EMA169 widened by an interval-specific
        tolerance, falling back to EMA144 alone, then EMA8/EMA13, then EMA8 alone.
        Rows with no usable EMA are never flagged. A touch is classified as
        'support', 'resistance' or 'neutral'. A same-type alert within the previous
        3 rows suppresses the new one. ``count`` is a rolling tally of recent
        touches of that type.
        """
        touch_sup_count_window = deque(maxlen=self.rolling_window)
        touch_res_count_window = deque(maxlen=self.rolling_window)
        recent_touch_count = 0
        # Band tolerance (fraction of price) per interval.
        tolerance_dict = {1: 0.002, 3: 0.02, 5: 0.05, 8: 0.07, 13: 0.1}

        for i, entry in enumerate(self.df_dict):
            alert_type = 'neutral'
            bounds = self._touch_bounds(entry, tolerance_dict)
            touched = bounds is not None and any(
                bounds[0] <= value <= bounds[1]
                for value in (entry['low'], entry.get('13ema', 0), entry.get('8ema', 0))
            )
            if not touched:
                # No touch: pad both windows so they keep advancing.
                touch_sup_count_window.append(0)
                touch_res_count_window.append(0)
                continue

            lng_term_tunnel_max = max(entry.get('169ema', 0), entry.get('144ema', 0))
            lng_term_tunnel_min = min(entry.get('169ema', 0), entry.get('144ema', 0))
            short_term_tunnel_max = max(entry.get('13ema', 0), entry.get('8ema', 0))
            short_term_tunnel_min = min(entry.get('13ema', 0), entry.get('8ema', 0))
            candle_min = min(entry['close'], entry['open'])
            candle_max = max(entry['close'], entry['open'])

            # Without long EMAs, compare the candle body against the short EMAs instead.
            if np.isnan(entry.get('169ema', np.nan)) and np.isnan(entry.get('144ema', np.nan)):
                lng_term_tunnel_max, short_term_tunnel_max = short_term_tunnel_max, candle_max
                lng_term_tunnel_min, short_term_tunnel_min = short_term_tunnel_min, candle_min

            if short_term_tunnel_min > lng_term_tunnel_max and candle_min > lng_term_tunnel_min:
                alert_type = 'support'  # Touched from above
            elif short_term_tunnel_max < lng_term_tunnel_max and entry['close'] < lng_term_tunnel_max:
                alert_type = 'resistance'  # Touched from below

            # Skip if one of the previous 3 rows already carries a touch alert of this type.
            if i >= 3 and any(
                self.df_dict[j].get('alerts', {}).get('169ema_touched', {}).get('type') == alert_type
                for j in range(i - 3, i)
            ):
                continue

            if alert_type == 'support':
                touch_sup_count_window.append(1)
                recent_touch_count = sum(touch_sup_count_window)
            elif alert_type == 'resistance':
                touch_res_count_window.append(1)
                recent_touch_count = sum(touch_res_count_window)

            entry.setdefault('alerts', {})['169ema_touched'] = {
                "date": entry['date'],
                "type": alert_type,  # 'support', 'resistance' or 'neutral'
                "count": recent_touch_count,
                "169ema": entry.get('169ema', np.nan),
                "details": f"Close price touched EMA169 from {alert_type} within {tolerance_dict[entry['interval']] * 100}% tolerance"
            }

    def filter_data(self):
        """Return ``{symbol: [{'date', 'interval', 'alerts'}, ...]}`` for the filtered rows."""
        return {
            self.symbol: [
                {
                    "date": record['date'],
                    'interval': record['interval'],
                    'alerts': record.get('alerts', {}),
                }
                for record in self.df_dict
            ]
        }

    def apply(self):
        """Run all alert passes in dependency order and return the annotated rows.

        ``velocity_accel_decel_alert`` consumes the ``velocity_alert`` labels written
        by ``velocity_alert_dict``, so the order matters. Call ``filter_data()``
        separately for the compact ``{symbol: [...]}`` view.
        """
        self.add_169ema_touch_alert()
        self.velocity_alert_dict()
        self.velocity_accel_decel_alert()
        return self.df_dict

In [None]:
class AddStructuralArea:
    """Annotate rows with rolling price-structure levels.

    For every row that has at least ``window`` predecessors, statistics are computed
    over the previous ``window`` rows (the current row is excluded) and stored in
    place under ``row['structural_area']``.
    """

    def __init__(self, df_dict, interval=1):
        # Keep only rows of the requested interval (same dict objects, mutated in place).
        self.df_dict = [entry for entry in df_dict if entry['interval'] == interval]
        self.window = 30

    def _ensure_structural_area(self, entry):
        entry.setdefault('structural_area', {})

    def _get_window_data(self, i):
        """Closes, max high and min low of the ``window`` rows preceding row ``i``."""
        window = self.df_dict[i - self.window:i]
        return {
            'close': [entry['close'] for entry in window],
            'high': max(entry['high'] for entry in window),
            'low': min(entry['low'] for entry in window)
        }

    def kernel_density_estimation(self):
        """Store the two most populated close-price bins of the look-back window.

        NOTE: despite the name this is a 20-bin histogram, not a smoothed KDE; the
        method name and the ``kernel_density_estimation`` key are kept for callers.
        ``bottom``/``top`` are the lower/upper edges of the bin.
        """
        for i, entry in enumerate(self.df_dict):
            if i < self.window:
                continue

            self._ensure_structural_area(entry)
            close_prices = np.array(self._get_window_data(i)['close'])

            hist_counts, bin_edges = np.histogram(close_prices, bins=20)
            max_bin_idx = int(np.argmax(hist_counts))

            # Second peak: the most populated bin other than the main one, on either side.
            other_counts = hist_counts.copy()
            other_counts[max_bin_idx] = -1
            sec_bin_idx = int(np.argmax(other_counts))

            # bin_edges ascend, so a bin's left edge is its bottom and its right edge its top.
            main_bottom, main_top = bin_edges[max_bin_idx], bin_edges[max_bin_idx + 1]
            second_bottom, second_top = bin_edges[sec_bin_idx], bin_edges[sec_bin_idx + 1]

            entry['structural_area']['kernel_density_estimation'] = {
                "date": entry['date'],
                "top": main_top,
                "bottom": main_bottom,
                "second_top": second_top,
                "second_bottom": second_bottom,
                "details": f"Most frequent price bin between {main_bottom} and {main_top}.\nSecond most frequent price bin between {second_bottom} and {second_top}."
            }

    def fibonacci_retracement(self):
        """Store Fibonacci levels of the look-back range.

        Levels are measured upward from the window low: ``low + (high - low) * ratio``.
        NOTE(review): classic retracements are often measured down from the high;
        confirm which convention consumers expect.
        """
        fibs = {
            'fib_236': 0.236,
            'fib_382': 0.382,
            'fib_500': 0.500,
            'fib_618': 0.618,
            'fib_786': 0.786,
            'fib_1236': 1.236,
            'fib_1382': 1.382
        }
        for i, entry in enumerate(self.df_dict):
            if i < self.window:
                continue

            self._ensure_structural_area(entry)
            window_data = self._get_window_data(i)
            diff = window_data['high'] - window_data['low']
            levels = {key: window_data['low'] + (diff * ratio) for key, ratio in fibs.items()}

            # round() keeps the percentages readable (0.236 * 100 is 23.599999999999998).
            entry['structural_area']['fibonacci_retracement'] = {
                "date": entry['date'],
                **levels,
                "details": "Fibonacci retracement levels: " +
                        ", ".join(f"{round(ratio * 100, 1)}%: {levels[key]}"
                                  for key, ratio in fibs.items())
            }

    def apply(self):
        """Run both annotations and return the (mutated) filtered rows."""
        self.kernel_density_estimation()
        self.fibonacci_retracement()
        return self.df_dict


In [None]:
class DataVisualizer:
    """Fetch processed price/EMA documents from MongoDB and chart them with plotly.

    visualize_alerts draws candlesticks annotated with momentum or 169-EMA
    touch alerts; visualize_structural_area draws candlesticks with the
    Fibonacci and KDE price levels computed by AddStructuralArea.
    """

    # Intervals (days) charted by visualize_alerts and visualize_structural_area.
    INTERVALS = [1, 3, 5, 8, 13]
    # Keys written under structural_area['kernel_density_estimation'].
    _KDE_KEYS = ('top', 'bottom', 'second_top', 'second_bottom')
    # Keys written under structural_area['fibonacci_retracement'], in plotting order.
    _FIB_KEYS = ('fib_236', 'fib_382', 'fib_500', 'fib_618', 'fib_786', 'fib_1236', 'fib_1382')

    def __init__(self, mongo_config):
        """Initialize the DataVisualizer from a MongoDB configuration.

        Parameters:
        mongo_config (dict): must provide 'connection_string', 'db' and
            'processed_collection_name'.
        """
        self.mongo_config = mongo_config

        self.host = self.mongo_config['connection_string']
        self.db_name = self.mongo_config['db']
        self.collection_name = self.mongo_config['processed_collection_name']

    def fetch_and_prepare_data(self, symbol):
        """
        Fetch all interval documents for a given symbol from the processed collection.

        Parameters:
        symbol (str): The stock symbol to fetch data for.

        Returns:
        list: dicts with date, symbol, OHLC, interval and the 8/13/144/169 EMAs,
        sorted by interval then date; an empty list when nothing matches.
        """
        client = MongoClient(self.host)
        try:
            lst = list(
                client[self.db_name][self.collection_name]
                .find(
                    {'symbol': symbol, 'instrument': {'$in': ['equity', 'crypto', 'commodity', 'bond', 'sector']}},
                    {
                        '_id': 0,
                        'date': 1,
                        'symbol': 1,
                        'close': 1,
                        'open': 1,
                        'low': 1,
                        'high': 1,
                        'interval': 1,
                        '13ema': 1,
                        '8ema': 1,
                        '169ema': 1,
                        '144ema': 1
                    }
                )
                .sort([('interval', ASCENDING), ('date', ASCENDING)])
            )
        finally:
            # Close the client so each call does not leave a connection pool open.
            client.close()

        if not lst:
            print(f"No data found for symbol: {symbol}")
            return []

        return lst

    def plot_candlestick_with_alerts(self, df, alert_df, alert, title=None, save_path=None):
        """Plot candlesticks with the 8/13/144/169 EMAs and one annotation per alert row.

        Parameters:
        df (pd.DataFrame): needs 'date', 'open', 'high', 'low', 'close' and the EMA columns.
        alert_df (pd.DataFrame): rows to annotate; needs 'date', 'close' and column ``alert``.
        alert (str): column of alert_df whose value is used as the annotation text.
        title (str, optional): figure title.
        save_path (str, optional): when given, the figure is also written there with
            fig.write_image (requires plotly's static image export engine).
        """
        fig = go.Figure(data=[go.Candlestick(x=df['date'],
                        open=df['open'], high=df['high'],
                        low=df['low'], close=df['close'],
                        name='Candlestick')])

        # Plot the EMAs as lines
        fig.add_scatter(x=df['date'], y=df['13ema'], mode='lines', name='13EMA')
        fig.add_scatter(x=df['date'], y=df['8ema'], mode='lines', name='8EMA')
        fig.add_scatter(x=df['date'], y=df['169ema'], mode='lines', name='169EMA')
        fig.add_scatter(x=df['date'], y=df['144ema'], mode='lines', name='144EMA')

        # Annotate each alert at its closing price
        for _, row in alert_df.iterrows():
            fig.add_annotation(
                x=row['date'],
                y=row['close'],
                text=row[alert],
                showarrow=True,
                arrowhead=1
            )

        fig.update_layout(
            title=title,
            xaxis_title="Date",
            yaxis_title="Price",
            xaxis_rangeslider_visible=False
        )

        if save_path:
            fig.write_image(save_path)

        fig.show()

    def plot_candlestick_with_structural_area(self, df, price_levels, dense_trading_area, selected_date):
        """Plot candlesticks with Fibonacci levels, dense-trading-area lines and a date marker.

        Parameters:
        df (pd.DataFrame): needs 'date', 'open', 'high', 'low', 'close'.
        price_levels (list): Fibonacci levels drawn as solid lines; NaN/non-numeric entries are skipped.
        dense_trading_area (list): KDE bin edges drawn as dashed lines; NaN/non-numeric entries are skipped.
        selected_date (str | pd.Timestamp): drawn as a vertical line; other types are ignored.
        """
        fig = go.Figure(data=[go.Candlestick(x=df['date'],
                        open=df['open'], high=df['high'],
                        low=df['low'], close=df['close'],
                        name='Candlestick')])

        # Semi-transparent red, green, blue, orange, purple; cycled when there are more levels
        colors = ['rgba(255,0,0,0.3)', 'rgba(0,255,0,0.3)', 'rgba(0,0,255,0.3)',
                'rgba(255,165,0,0.3)', 'rgba(128,0,128,0.3)']
        for i, level in enumerate(price_levels):
            if pd.notna(level) and isinstance(level, (int, float)):
                fig.add_hline(
                    y=float(level),
                    line_dash='solid',
                    line_color=colors[i % len(colors)],
                    annotation_text=f'fibonacci {float(level):.2f}',
                    annotation_position='top right'
                )

        # Dense trading areas as dashed horizontal lines
        for area in dense_trading_area:
            if pd.notna(area) and isinstance(area, (int, float)):
                fig.add_hline(
                    y=float(area),
                    line_dash="dash",
                    line_color="rgba(255,0,0,0.3)",
                    annotation_text=f'Dense Trading Area {float(area):.2f}',
                    annotation_position='top right'
                )

        # Vertical marker at the selected date
        if isinstance(selected_date, (str, pd.Timestamp)):
            selected_date = pd.to_datetime(selected_date)

            fig.add_shape(
                type="line",
                x0=selected_date,
                x1=selected_date,
                y0=0,
                y1=1,
                yref="paper",
                line=dict(color="black", width=1, dash="solid"),
            )

            fig.add_annotation(
                x=selected_date,
                y=1,
                yref="paper",
                text=f'Selected Date: {selected_date.strftime("%Y-%m-%d")}',
                showarrow=False,
                yshift=10
            )

        fig.update_layout(title='Candlestick with Price Levels', xaxis_title='Date', yaxis_title='Price', xaxis_rangeslider_visible=False)
        fig.show()

    def visualize_alerts(self, selected_symbol, alert_type, anchor_date='1990-01-01', save_plots=True):
        """
        Analyze and plot alerts for a given symbol and alert type.

        Parameters:
        -----------
        selected_symbol : str
            The stock symbol to analyze
        alert_type : str
            Type of alert to analyze ('momentum_alert' or '169ema_touched')
        anchor_date : str, optional
            Only rows dated strictly after this are plotted, default '1990-01-01'
        save_plots : bool, optional
            When True (default), each chart is also written as a PNG under
            ../eda_viz/<symbol>_<alert_name>/ (plotly's write_image needs a static
            image export engine installed).
        """
        df_dict = self.fetch_and_prepare_data(selected_symbol)

        is_momentum = alert_type == 'momentum_alert'
        alert_name = 'momentum_alerts' if is_momentum else 'support_resistance_alerts'
        alert_desc = 'Momentum' if is_momentum else 'Support/Resistance'
        # Momentum alerts keep their label under 'alert_type'; EMA-touch alerts under 'type'.
        label_key = 'alert_type' if is_momentum else 'type'
        # The two alert labels annotated on the chart for this alert family.
        labels = ('accelerated', 'decelerated') if is_momentum else ('support', 'resistance')

        output_dir = os.path.join('..', 'eda_viz', f'{selected_symbol}_{alert_name}')
        if save_plots:
            os.makedirs(output_dir, exist_ok=True)

        for interval in self.INTERVALS:
            print(f"\nAnalyzing {interval}-day interval data")

            fil_df_dict = [entry for entry in df_dict if entry['interval'] == interval]
            if not fil_df_dict:
                print(f"No data found for interval {interval}")
                continue

            alert_dict = AddAlert(fil_df_dict, interval, selected_symbol).apply()

            # Lift the alert label to a top-level field so it becomes a DataFrame column
            for entry in alert_dict:
                if 'alerts' in entry and alert_type in entry['alerts']:
                    entry[alert_type] = entry['alerts'][alert_type][label_key]

            alert_dict_df = pd.DataFrame(alert_dict)
            if alert_type not in alert_dict_df.columns:
                # No row carried this alert, so there is nothing to annotate.
                print(f"No {alert_type} alerts found for {interval}-day interval")
                continue
            alert_dict_df = alert_dict_df[alert_dict_df['date'] > anchor_date]

            # Date filter was already applied above; keep only the two labels of interest
            non_null_alerts = alert_dict_df[alert_dict_df[alert_type].notna()]
            selected_alerts = pd.concat(
                [non_null_alerts[non_null_alerts[alert_type] == label] for label in labels]
            )

            title = f"{selected_symbol} {interval}-day Interval - {alert_desc} Alerts"
            filename = f"{selected_symbol}_{interval}day_{alert_name}.png"
            save_path = os.path.join(output_dir, filename) if save_plots else None

            print(f"Plotting {alert_desc.lower()} alerts for {interval}-day interval")
            self.plot_candlestick_with_alerts(
                alert_dict_df,
                selected_alerts,
                alert_type,
                title=title,
                save_path=save_path
            )

    @classmethod
    def _flatten_structural_area(cls, entry):
        """Copy KDE and Fibonacci levels from entry['structural_area'] onto the entry itself.

        Missing sections or keys become None (NaN in a DataFrame) instead of raising.
        Returns the same, mutated entry.
        """
        area = entry.get('structural_area') or {}
        kde = area.get('kernel_density_estimation') or {}
        fib = area.get('fibonacci_retracement') or {}
        for key in cls._KDE_KEYS:
            entry[key] = kde.get(key)
        for key in cls._FIB_KEYS:
            entry[key] = fib.get(key)
        return entry

    def visualize_structural_area(self, selected_symbol, selected_date='2024-12-01'):
        """Plot the latest KDE and Fibonacci levels up to ``selected_date`` for each interval.

        Candlesticks cover the full history; the price levels are taken from the
        last row dated on or before ``selected_date``.
        """
        df_dict = self.fetch_and_prepare_data(selected_symbol)
        cutoff = pd.to_datetime(selected_date)

        for interval in self.INTERVALS:
            print(f"\nAnalyzing {interval}-day interval data")

            fil_df_dict = [entry for entry in df_dict
                        if entry['interval'] == interval
                        and entry['symbol'] == selected_symbol]

            if not fil_df_dict:
                print(f"No data found for interval {interval}")
                continue

            struct_area_dict = AddStructuralArea(fil_df_dict, interval).apply()
            for entry in struct_area_dict:
                self._flatten_structural_area(entry)

            struct_area_df = pd.DataFrame(struct_area_dict).drop(columns=['structural_area'], errors='ignore')
            struct_area_df_filtered = struct_area_df[struct_area_df['date'] <= cutoff]

            if not struct_area_df_filtered.empty and struct_area_df_filtered['top'].notna().any():
                print(f"Plotting structural areas for {interval}-day interval up to {selected_date}")
                fib_levels = [struct_area_df_filtered[key].iloc[-1] for key in self._FIB_KEYS]
                dense_areas = [struct_area_df_filtered[key].iloc[-1] for key in self._KDE_KEYS]
                # Full data for candlesticks, filtered data for the levels
                self.plot_candlestick_with_structural_area(
                    struct_area_df,
                    fib_levels,
                    dense_areas,
                    selected_date
                )
            else:
                print(f"No valid structural area data found for interval {interval} up to {selected_date}")

In [None]:
# Production MongoDB settings, reused by the visualizer and the stock-filtering cells below
mongo_config = load_client_config('mongodb', 'production')

# Visualizer bound to the processed-data collection named in mongo_config
data_visualizer = DataVisualizer(mongo_config=mongo_config)

### Vegas Channels Alerts Visualization

In [None]:
# Momentum (accelerated / decelerated) alerts for crude oil futures, one chart per interval
data_visualizer.visualize_alerts(selected_symbol='CL=F', alert_type='momentum_alert')

### Vegas Channels Support/Resistance Alerts Visualization

In [None]:
# 169-EMA support/resistance touches for AMD, one chart per interval
data_visualizer.visualize_alerts(selected_symbol='AMD', alert_type='169ema_touched')

### Structural Area Visualization

In [None]:
# Build a fresh visualizer (so a re-executed DataVisualizer class cell takes effect),
# then chart CL=F price levels as of 2024-12-23
data_visualizer = DataVisualizer(mongo_config=mongo_config)
data_visualizer.visualize_structural_area(selected_symbol='CL=F', selected_date='2024-12-23')

## Stock Filtering

### Step 1. Fetch Alert Data

In [None]:
# Connect to MongoDB with the settings loaded above
mongo_client = MongoClient(mongo_config['connection_string'])

# Load the processed data from the database
# NOTE(review): this reads the whole processed collection and processed_df is not used by
# the cells visible here — confirm it is still needed before running on a large database.
processed_df = pd.DataFrame(list(mongo_client[mongo_config['db']][mongo_config['processed_collection_name']].find({})))

# Load equity alert documents dated 2020-01-01 or later
alert_dict = list(mongo_client[mongo_config['db']][mongo_config['alert_collection_name']['long_term']]\
    .find({'instrument': {'$in': ['equity']},
        'date': {'$gte': pd.to_datetime('2020-01-01')}}))

# Flatten the nested alert / structural-area fields onto each document
for row in alert_dict:
    if 'alerts' in row and 'momentum_alert' in row['alerts']:
        row['momentum_alert'] = row['alerts']['momentum_alert']['alert_type']

    if 'alerts' in row and 'velocity_alert' in row['alerts']:
        row['velocity_alert'] = row['alerts']['velocity_alert']['alert_type']

    # A 169-EMA touch takes precedence over a 13-EMA touch when both are present
    if 'alerts' in row and '169ema_touched' in row['alerts']:
        row['touch_type'] = row['alerts']['169ema_touched']['type']
        row['count'] = row['alerts']['169ema_touched']['count']

    elif 'alerts' in row and '13ema_touched' in row['alerts']:
        row['touch_type'] = row['alerts']['13ema_touched']['type']
        row['count'] = row['alerts']['13ema_touched']['count']

    else:
        row['touch_type'] = np.nan

    if 'structural_area' in row and isinstance(row['structural_area'], dict):
        if 'kernel_density_estimation' in row['structural_area']:
            kde = row['structural_area']['kernel_density_estimation']
            row['top'] = kde.get('top')
            row['bottom'] = kde.get('bottom')
            row['second_top'] = kde.get('second_top')
            row['second_bottom'] = kde.get('second_bottom')

        if 'fibonacci_retracement' in row['structural_area']:
            fib = row['structural_area']['fibonacci_retracement']
            row['fib_236'] = fib.get('fib_236')
            row['fib_382'] = fib.get('fib_382')
            row['fib_500'] = fib.get('fib_500')
            row['fib_618'] = fib.get('fib_618')
            row['fib_786'] = fib.get('fib_786')

# Convert to a DataFrame and drop the raw nested/id columns. errors='ignore' keeps the cell
# working when the query returns no documents (or none carry an 'alerts' field).
alert_df = pd.DataFrame(alert_dict).drop(columns=['alerts', '_id'], errors='ignore')
alert_df.head()

### Step 2. One-Hot Encode the Alert Data

In [None]:
# Initialize the OneHotEncoder (dense output so it can be wrapped in a DataFrame)
ohe = OneHotEncoder(sparse_output=False)

columns_to_encode = ['touch_type', 'momentum_alert']
df_to_be_encoded = alert_df.loc[:, columns_to_encode]

encoded_array = ohe.fit_transform(df_to_be_encoded)
# Reuse alert_df's index so the concat below aligns row-for-row
encoded_df = pd.DataFrame(encoded_array, columns=ohe.get_feature_names_out(), index=alert_df.index)

# Replace the raw categorical columns with their one-hot encoding. alert_df itself is not
# mutated, so re-running this cell does not fail on already-dropped columns.
encoded_alert_df = pd.concat([alert_df.drop(columns=columns_to_encode), encoded_df], axis=1)
encoded_alert_df.head()

### Step 3. Apply Interval Weighting on Encoded Alert Data

* Alerts on higher intervals carry a more convincing signal
* Higher-interval alerts last longer
* Higher-interval alerts dominate the equity movement on lower intervals
    - They help filter out false signals on lower intervals
    - Higher-interval signals are more stable in the long run

In [None]:
# Distinct intervals in ascending order. unique() returns values in order of first
# appearance (the alert query is not sorted), so sort before enumerating; otherwise a
# higher interval could receive a lower weight.
distinct_intervals = np.sort(encoded_alert_df['interval'].unique())
# Linear weighting: the smallest interval gets weight 1, the next 2, and so on
interval_weights = {interval: weight for weight, interval in enumerate(distinct_intervals, start=1)}
encoded_alert_df['interval_weight'] = encoded_alert_df['interval'].map(interval_weights)

evaluate_summary = encoded_alert_df.copy()
# Weight each one-hot alert column by its row's interval weight
alert_types = ['touch_type_resistance','touch_type_support', 'momentum_alert_accelerated','momentum_alert_decelerated']
for alert in alert_types:
    evaluate_summary[f'weighted_{alert}'] = evaluate_summary[alert] * evaluate_summary['interval_weight']

# Drop the raw one-hot and helper columns. The '*_nan' and 'touch_type_neutral' columns only
# exist when that category occurred in the data, so missing ones are ignored.
evaluate_summary = evaluate_summary.drop(
    columns=alert_types + ['touch_type_nan', 'momentum_alert_nan', 'touch_type_neutral', 'interval_weight'],
    errors='ignore'
)

In [None]:
# Display the interval-weighted alert summary built in the previous cell
evaluate_summary

### Step 4. Data Analysis on Encoded Alert Data

* Analyze all stocks daily
* Group the data by the critical alerts
    - Acceleration / Deceleration
    - Support / Resistance touches
* Sort the grouped alert data by momentum alerts, touch alerts, and touch counts
    - Sort intervals in ascending order because we prefer potential over stability

In [None]:
# Keep alerts dated 2020-01-01 or later
eval_sum = evaluate_summary[evaluate_summary['date'] >= '2020-01-01']

# Sum the weighted alerts per (symbol, interval), then rank: interval ascending, then most
# acceleration, least deceleration, most support touches and most touches first
results = (
    eval_sum[['symbol',
              'interval',
              'weighted_momentum_alert_accelerated',
              'weighted_momentum_alert_decelerated',
              'weighted_touch_type_resistance',
              'weighted_touch_type_support',
              'count']]
    .groupby(['symbol', 'interval'])
    .sum()
    .sort_values(
        ['interval',
         'weighted_momentum_alert_accelerated',
         'weighted_momentum_alert_decelerated',
         'weighted_touch_type_support',
         'count'],
        ascending=[True, False, True, False, False]
    )
    .reset_index()
)

# Accelerating stocks: weighted acceleration with (almost) no deceleration, per interval band
short_acc_equ = results.loc[
    (results['weighted_momentum_alert_accelerated'] > 1)
    & (results['weighted_momentum_alert_decelerated'] < 1)
    & (results['interval'] <= 3),
    'symbol'
]
lng_acc_equ = results.loc[
    (results['weighted_momentum_alert_accelerated'] > 1)
    & (results['weighted_momentum_alert_decelerated'] < 1)
    & (results['interval'] == 5),
    'symbol'
]
ext_lng_acc_equ = results.loc[
    (results['weighted_momentum_alert_accelerated'] > 0)
    & (results['weighted_momentum_alert_decelerated'] < 1)
    & (results['interval'] == 13),
    'symbol'
]

# Main-force accumulating stocks: repeated support touches, no resistance, more than two touches
short_main_acc_equ = results.loc[
    (results['weighted_touch_type_support'] > 1)
    & (results['weighted_touch_type_resistance'] < 1)
    & (results['count'] > 2)
    & (results['interval'] <= 3),
    'symbol'
]
lng_main_acc_equ = results.loc[
    (results['weighted_touch_type_support'] > 1)
    & (results['weighted_touch_type_resistance'] < 1)
    & (results['count'] > 2)
    & (results['interval'] == 5),
    'symbol'
]
ext_lng_main_acc_equ = results.loc[
    (results['weighted_touch_type_support'] > 1)
    & (results['weighted_touch_type_resistance'] < 1)
    & (results['count'] > 2)
    & (results['interval'] == 13),
    'symbol'
]

# NOTE(review): eval_sum is not sorted by date here, so iloc[0] is simply the first row
# returned by the query — confirm whether the earliest or latest date is intended.
stock_dict = {
    'date': str(eval_sum['date'].iloc[0]),
    'accelerating': short_acc_equ.tolist(),
    'main_accumulating': short_main_acc_equ.tolist(),
    'long_accelerating': lng_acc_equ.tolist(),
    'long_main_accumulating': lng_main_acc_equ.tolist(),
    'ext_long_accelerating': ext_lng_acc_equ.tolist(),
    'ext_accumulating': ext_lng_main_acc_equ.tolist(),
}


### Finalized Stock Picking Class

In [None]:
class Pick_Stock:
    """A class to pick stocks based on various criteria and alerts.
    
    Attributes:
        data (pd.DataFrame): DataFrame containing stock data and alerts. Initialized as None and populated in run().
        distinct_intervals (np.array): Array of unique intervals from data. Populated after data is loaded.
        stock_candidates (dict): Dictionary to store candidate stocks.
        interval_weights (dict): Weights assigned to different intervals.
    """

    def __init__(self, mongo_config, instrument, start_date=None, sandbox_mode=False):
        """Connect to the long-term alert and candidate collections and create the encoder.

        Parameters:
        mongo_config (dict): needs 'connection_string', 'db',
            'alert_collection_name'['long_term'] and 'candidates_collection_name'['long_term'].
        instrument (str): value matched against the 'instrument' field of alert documents.
        start_date (str, optional): first date to load; only honoured when sandbox_mode is
            True, and falls back to '2020-01-01' when omitted.
        sandbox_mode (bool): when False, data is always loaded from '2020-01-01'.
        """
        self.mongo_config = mongo_config

        # Sandbox mode may override the start date. Without a fallback, start_date=None
        # would turn the query filter into {'$gte': None}, which matches no real dates.
        default_start = '2020-01-01'
        self.sandbox_mode = sandbox_mode
        self.start_date = (start_date or default_start) if self.sandbox_mode else default_start
        self.instrument = instrument

        # Connect to MongoDB
        self.client = MongoClient(self.mongo_config['connection_string'])
        self.db = self.client[self.mongo_config['db']]
        self.collection = self.db[self.mongo_config['alert_collection_name']['long_term']]
        self.candidate_collection_name = self.mongo_config['candidates_collection_name']['long_term']
        self.candidate_collection = self.db[self.candidate_collection_name]
        # Dense one-hot encoder used by get_stock_dataframe
        self.ohe = OneHotEncoder(sparse_output=False)
    
    def get_stock_dataframe(self):
        # Fetch the data from MongoDB and convert to DataFrame
        alert_dict = list(self.collection.find(
            {
                'date': {'$gte': pd.to_datetime(self.start_date)},
                'instrument': self.instrument
                },
            {'_id': 0},
            sort=[('date', 1)]
        ))
        # Extract the alerts from the alert_dict and process fields
        for row in alert_dict:
            if 'alerts' in row and 'momentum_alert' in row['alerts']:
                row['momentum_alert'] = row['alerts']['momentum_alert']['alert_type']

            if 'alerts' in row and 'velocity_alert' in row['alerts']:
                row['velocity_alert'] = row['alerts']['velocity_alert']['alert_type']

            if 'alerts' in row and '169ema_touched' in row['alerts']:
                row['touch_type'] = row['alerts']['169ema_touched']['type']
                row['count'] = row['alerts']['169ema_touched']['count']

            elif 'alerts' in row and '13ema_touched' in row['alerts']:
                row['touch_type'] = row['alerts']['13ema_touched']['type']
                row['count'] = row['alerts']['13ema_touched']['count']

            elif 'alerts' in row and 'velocity_alert' in row['alerts']:
                row['velocity_alert'] = row['alerts']['velocity_alert']['alert_type']

            else:
                row['touch_type'] = np.nan
                
            # Extract the structural area data
            if 'structural_area' in row and isinstance(row['structural_area'], dict):
                if 'kernel_density_estimation' in row['structural_area']:
                    kde = row['structural_area']['kernel_density_estimation']
                    row['top'] = kde.get('top')
                    row['bottom'] = kde.get('bottom') 
                    row['second_top'] = kde.get('second_top')
                    row['second_bottom'] = kde.get('second_bottom')
            
                if 'fibonacci_retracement' in row['structural_area']:
                    fib = row['structural_area']['fibonacci_retracement']
                    row['fib_236'] = fib.get('fib_236')
                    row['fib_382'] = fib.get('fib_382')
                    row['fib_500'] = fib.get('fib_500')
                    row['fib_618'] = fib.get('fib_618')
                    row['fib_786'] = fib.get('fib_786')
            
        # Convert the alert_dict to a DataFrame and process it
        data = pd.DataFrame(alert_dict).drop(columns=['alerts'], errors='ignore')

        # Prepare the data for encoding
        alert_columns = ['touch_type', 'momentum_alert', 'velocity_alert']
        encoded_arr = self.ohe.fit_transform(data[alert_columns])
        encoded_df = pd.DataFrame(encoded_arr, columns=self.ohe.get_feature_names_out())

        # Concat the encoded DataFrame with the original DataFrame
        data = pd.concat([data.drop(columns=alert_columns), encoded_df], axis=1)

        return data

    def create_time_series_collection(self, collection_name, keep_duration=None):
        if collection_name not in self.db.list_collection_names():
            self.db.create_collection(
                collection_name,
                timeseries={
                    "timeField": "date" if 'datastream' not in collection_name else "datetime",
                    "metaField": "symbol",
                    "granularity": "hours" if 'datastream' not in collection_name else "minutes"
                },
                expireAfterSeconds=keep_duration
            )
            print(f"Time Series Collection {collection_name} created successfully")

    def insert_candidates(self, candidates):
        """
        Insert candidate stock data into a MongoDB time series collection, setting up auto-expiry for 7 days.
        Only insert data for dates newer than the latest existing date.
        """
        # Create the time series collection if it doesn't exist
        keep_duration = 157788000 # 5 years
        self.create_time_series_collection(self.candidate_collection_name, keep_duration=keep_duration)

        # Get latest date from collection
        latest_record = self.candidate_collection.find_one({'instrument': self.instrument}, sort=[("date", -1)])
        latest_date = latest_record['date'] if latest_record else None

        # Prepare data for insertion, filtering for dates newer than latest
        documents = []
        for date_str, values in candidates.items():
            date_obj = pd.to_datetime(date_str)
            
            # Skip if date is not newer than latest
            if latest_date and date_obj <= latest_date:
                continue
                
            documents.append({
                'date': date_obj,
                'instrument': self.instrument,
                'accelerating': values['accelerating'], 
                'accumulating': values['accumulating'],
                'long_accelerating': values['long_accelerating'],
                'long_accumulating': values['long_accumulating'],
                'velocity_maintained': values['velocity_maintained']
            })

        # Insert documents into MongoDB
        if documents:
            try:
                self.candidate_collection.insert_many(documents, ordered=False)
                print("New candidate stocks inserted successfully!")
            except Exception as e:
                logging.error(f"Error inserting documents: {e}")
        else:
            print("No new data to insert. All candidate data is up to date.")
        
    def evaluate_micro_interval_stocks(self, data):
        # Group by symbol and apply interval-based weighting for velocity alerts
        eval_sum = data.copy()

        # Apply Linear Weighting on Alert Data
        eval_sum['interval_weight'] = eval_sum['interval'].map(self.interval_weights)
        
        # Calculate weighted values for each alert type
        alerts_cols = ['touch_type_resistance', 'touch_type_support', 'momentum_alert_accelerated', 'momentum_alert_decelerated']
        for alert in alerts_cols:
            eval_sum[f'weighted_{alert}'] = eval_sum[alert] * eval_sum['interval_weight'] 
        
        # Drop the columns that are not needed
        eval_sum = eval_sum.drop(
            columns=alerts_cols + ['touch_type_nan', 'momentum_alert_nan', 'touch_type_neutral', 'interval_weight'])
        
        # Data Analysis
        results = eval_sum \
                    .loc[:, ['symbol',
                            'interval',
                            'weighted_momentum_alert_accelerated',
                            'weighted_momentum_alert_decelerated',
                            'weighted_touch_type_resistance',
                            'weighted_touch_type_support',
                            'count'
                            ]] \
            .groupby(['symbol', 'interval']) \
            .sum() \
            .sort_values(['interval', 
                        'weighted_momentum_alert_accelerated',
                        'weighted_momentum_alert_decelerated',
                        'weighted_touch_type_support',
                        'count'],
                        
                        ascending=[True, False, True, False, False]).reset_index()

        # Store the accelerating stock
        short_acc_equ = results[(results['weighted_momentum_alert_accelerated'] > 1) \
                                & (results['weighted_momentum_alert_decelerated'] < 1) \
                                & (results['interval'] <= 3)].loc[:, 'symbol']
        
        lng_acc_equ = results[(results['weighted_momentum_alert_accelerated'] > 1) \
                            & (results['weighted_momentum_alert_decelerated'] < 1) \
                            & (results['interval'] == 5)].loc[:, 'symbol']
        
        # Store the main force accumulating stock
        short_main_acc_equ = results[(results['weighted_touch_type_support'] > 1) \
                                    & (results['weighted_touch_type_resistance'] < 1) \
                                    & (results['count'] >= 2) \
                                    & (results['interval'] <= 3)].loc[:, 'symbol']
        
        lng_main_acc_equ = results[(results['weighted_touch_type_support'] > 1) \
                                & (results['weighted_touch_type_resistance'] < 1) \
                                & (results['count'] >= 1) \
                                & (results['interval'] == 5)].loc[:, 'symbol']

        # Create dictionary of results
        stock_dict = {
            'accelerating': short_acc_equ.tolist(),
            'accumulating': short_main_acc_equ.tolist(),
            'long_accelerating': lng_acc_equ.tolist(),
            'long_accumulating': lng_main_acc_equ.tolist()
        }
        
        return stock_dict
    
    def evaluate_macro_interval_stocks(self, data):
        """Pick symbols whose velocity stayed "maintained" on macro intervals (>= 8) for one day.

        Each velocity-alert indicator column is multiplied by an interval weight (the 1-based
        position of the interval in ``self.distinct_intervals``) and summed per
        (symbol, interval). Symbols with a positive maintained score and zero weak/loss scores
        are returned.

        Args:
            data: one day's rows with ``symbol``, ``interval`` and the
                ``velocity_alert_velocity_{maintained,weak,loss}`` columns.

        Returns:
            dict: ``{'velocity_maintained': [symbol, ...]}``, ordered by descending maintained
            score (a symbol appears once per qualifying interval).
        """
        # NOTE(review): previously this method derived its weights by inverting
        # self.interval_weights in place, so consecutive calls alternated between
        # {interval: rank} and {rank: interval}, and every second day got all-NaN weights.
        # The toggle itself is preserved only because evaluate_micro_interval_stocks (defined
        # earlier in this class) may read self.interval_weights; confirm and then remove it.
        self.interval_weights = {v: k for k, v in self.interval_weights.items()}

        # interval -> weight, rebuilt from the stable interval list (same 1-based ranks as run()).
        interval_to_weight = {interval: rank for rank, interval in enumerate(self.distinct_intervals, 1)}

        eval_sum = data.copy()
        eval_sum['interval_weight'] = eval_sum['interval'].map(interval_to_weight)

        weighted_cols = []
        for state in ('maintained', 'weak', 'loss'):
            weighted_col = f'weighted_velocity_{state}'
            eval_sum[weighted_col] = eval_sum[f'velocity_alert_velocity_{state}'] * eval_sum['interval_weight']
            weighted_cols.append(weighted_col)

        # Sum the weighted indicators per (symbol, interval), strongest "maintained" first.
        results = (
            eval_sum.loc[:, ['symbol', 'interval', *weighted_cols]]
            .groupby(['symbol', 'interval'])
            .sum()
            .sort_values(weighted_cols, ascending=[False, True, True])
            .reset_index()
        )

        # Keep stocks with maintained velocity and no weak/loss alerts on the higher intervals (>= 8).
        is_maintained = (
            (results['weighted_velocity_maintained'] > 0)
            & (results['weighted_velocity_weak'] == 0)
            & (results['weighted_velocity_loss'] == 0)
            & (results['interval'] >= 8)
        )
        return {'velocity_maintained': results.loc[is_maintained, 'symbol'].tolist()}
    
    def run(self):
        """Generate per-day stock candidates from the micro and macro interval evaluations.

        Loads the data with ``get_stock_dataframe``, stores the distinct intervals and their
        1-based ranks on ``self``, and then processes the dates one after another. For each
        date, rows with ``interval <= 5`` go to the micro evaluator and rows with
        ``interval >= 8`` go to the macro evaluator.

        Returns:
            pandas.DataFrame indexed by ``str(date)``: the micro evaluator's columns followed
            by the macro evaluator's columns.
        """
        self.data = self.get_stock_dataframe()
        self.distinct_intervals = self.data['interval'].unique()
        self.interval_weights = {rank: interval for rank, interval in enumerate(self.distinct_intervals, 1)}
        days = self.data.groupby('date')

        micro_by_day = {}
        macro_by_day = {}
        print('generating stock candidates for each day...')

        # Dates are evaluated sequentially; micro runs before macro for each day.
        for day, day_rows in days:
            day_key = str(day)
            micro_by_day[day_key] = self.evaluate_micro_interval_stocks(day_rows.loc[day_rows['interval'] <= 5])
            macro_by_day[day_key] = self.evaluate_macro_interval_stocks(day_rows.loc[day_rows['interval'] >= 8])

        # NOTE: persisting candidates to MongoDB (self.insert_candidates) is currently disabled.
        per_interval_frames = [
            pd.DataFrame.from_dict(by_day, orient='index')
            for by_day in (micro_by_day, macro_by_day)
        ]
        return pd.concat(per_interval_frames, axis=1)



In [None]:
# Build the per-day candidate table. `stock_candidates_df` is also read as a global by
# LongTermTradingStrategy below, so keep this name.
pick_stock_instance = Pick_Stock(mongo_config, instrument='crypto', sandbox_mode=True, start_date='2020-01-01')
stock_candidates_df = pick_stock_instance.run()


def _distinct_pick_total(column):
    """Sum, over all days, of the number of distinct symbols listed in ``column``."""
    return stock_candidates_df[column].apply(lambda picks: len(set(picks))).sum()


# Summary of the stock_candidates_df
accelerating_count = _distinct_pick_total('accelerating')
accumulating_count = _distinct_pick_total('accumulating')
long_accelerating_count = _distinct_pick_total('long_accelerating')
long_accumulating_count = _distinct_pick_total('long_accumulating')
velocity_maintained_count = _distinct_pick_total('velocity_maintained')

# Date range covered by the candidates
date_range = stock_candidates_df.index.unique()
print(f'The date range of the stock_candidates_df is from {date_range.min()} to {date_range.max()}')

for label, total in [
    ('accelerating', accelerating_count),
    ('accumulating', accumulating_count),
    ('long accelerating', long_accelerating_count),
    ('long accumulating', long_accumulating_count),
    ('velocity maintained', velocity_maintained_count),
]:
    print(f'The number of {label} stocks is {total}')

stock_candidates_df

## Strategy

### Trading Strategy Design


In [None]:
class LongTermTradingStrategy:
    """Long-only, one-position-at-a-time backtest driven by daily stock-candidate lists.

    Dates in ``self.stock_candidates`` (index = date, columns = signal names, cells = lists of
    symbols) are processed in order.

    With no open position, the first symbol of the first non-empty ``buy_signals`` column is
    bought at the open of the next available candle. An open position is closed at the close
    of the candle after the evaluation date in either of two cases:

    * a ``sell_signals`` value appears in that date's interval-1 alert rows, or
    * dynamic profit protection is active and the return (profit / cost) falls
      ``PROFIT_DECLINE_LIMIT`` below its peak.

    Dynamic profit protection switches on once the return reaches ``PROFIT_PROTECTION_TRIGGER``.
    """

    # Unrealized return (profit / cost) at which dynamic profit protection switches on.
    PROFIT_PROTECTION_TRIGGER = 0.3
    # Drop of the return from its peak (in units of cost) that closes a protected position.
    PROFIT_DECLINE_LIMIT = 0.5

    def __init__(self, mongo_config,
                instrument,
                start_date=None,
                end_date=None,
                sandbox_mode=False,
                initial_capital=10000,
                buy_signals = None,
                sell_signals = None,
                verbose=False,
                stock_candidates=None):
        """Set up state, open the MongoDB connections and load prices and alerts.

        Args:
            mongo_config: dict with ``connection_string``, ``db``, ``processed_collection_name``
                and ``alert_collection_name['long_term']``.
            instrument: instrument filter for the queries; ``'crypto'`` also selects crypto fees.
            start_date, end_date: backtest window, honoured only when ``sandbox_mode`` is True
                (otherwise 2020-01-01 .. 2024-12-31 is used).
            sandbox_mode: see above.
            initial_capital: starting cash.
            buy_signals: candidate columns to buy from, in priority order.
            sell_signals: alert values that trigger an exit.
            verbose: print debug messages.
            stock_candidates: per-day candidate DataFrame. Defaults to the notebook-level
                ``stock_candidates_df`` for backward compatibility.
        """
        # Initialize trade tracking state
        self._init_trade_state(instrument, buy_signals, sell_signals)

        # Initialize configuration parameters
        self._init_config(mongo_config, instrument, start_date, end_date, sandbox_mode, initial_capital, verbose)

        # Initialize MongoDB connections, the candidate table, and load data
        self._init_mongo_connections()
        self.stock_candidates = stock_candidates_df if stock_candidates is None else stock_candidates
        self._load_historical_data()
        self._load_alert_data()

    def _init_trade_state(self, instrument, buy_signals, sell_signals):
        """Initialize trade tracking variables, fees and signal lists."""
        self.protected = False
        self.peak_profit_pct = None
        self.peak_profit = None
        self.trades = []
        self.complete_trades = {}
        self.current_trade = {}
        self.dynamic_protection = False
        self.buy_fee = 0.0025 if instrument == 'crypto' else 0.002
        self.sell_fee = 0.0075 if instrument == 'crypto' else 0.002
        self.dynamic_asset_control = False
        # Normalize to lists so the signal loops never iterate over None.
        self.buy_signals = list(buy_signals) if buy_signals is not None else []
        self.sell_signals = list(sell_signals) if sell_signals is not None else []

    def _init_config(self, mongo_config, instrument, start_date, end_date, sandbox_mode, initial_capital, verbose):
        """Initialize configuration parameters (dates are fixed unless in sandbox mode)."""
        self.start_date = start_date if sandbox_mode else '2020-01-01'
        self.end_date = end_date if sandbox_mode else '2024-12-31'
        self.instrument = instrument
        self.capital = initial_capital
        self.initial_capital = initial_capital
        self.daily_capital = initial_capital
        self.mongo_config = mongo_config
        self.verbose = verbose

    def _init_mongo_connections(self):
        """Initialize the MongoDB client and the price/alert collections."""
        self.client = MongoClient(self.mongo_config['connection_string'])
        self.db = self.client[self.mongo_config['db']]
        self.data_collection = self.db[self.mongo_config['processed_collection_name']]
        self.alert_collection = self.db[self.mongo_config['alert_collection_name']['long_term']]

    def _load_historical_data(self):
        """Load interval-1 price rows in the backtest window, sorted by date then symbol."""
        self.df = pd.DataFrame(list(self.data_collection.find(
            {
                'date': {'$gte': pd.to_datetime(self.start_date), '$lte': pd.to_datetime(self.end_date)},
                'instrument': self.instrument,
                'interval': 1
            },
            {'_id': 0}
        ))).sort_values(by=['date', 'symbol'])

    def _load_alert_data(self):
        """Load and flatten alert data from MongoDB."""
        self.alert_df = self.get_alert_dataframe()

    def get_alert_dataframe(self):
        """Fetch alert documents in the window and flatten their nested ``alerts`` dict."""
        data_dict = list(self.alert_collection.find(
            {
                'date': {'$gte': pd.to_datetime(self.start_date), '$lte': pd.to_datetime(self.end_date)},
                'instrument': self.instrument
            },
            {'_id': 0}
        ))

        for row in data_dict:
            self._process_alert_row(row)

        # errors='ignore': an empty query result has no 'alerts' column to drop.
        return pd.DataFrame(data_dict).drop(columns=['alerts'], errors='ignore')

    def _process_alert_row(self, row):
        """Add flat momentum/velocity/touch columns to one alert document (in place)."""
        # Initialize default values
        row.update({
            'momentum_alert': np.nan,
            'velocity_alert': np.nan,
            'touch_type': np.nan,
            'count': 0.0
        })

        alerts = row.get('alerts')
        if not isinstance(alerts, dict):
            return

        self._process_momentum_velocity_alerts(row, alerts)
        self._process_ema_touch_alerts(row, alerts)

    def _process_momentum_velocity_alerts(self, row, alerts):
        """Copy the momentum and velocity ``alert_type`` values onto ``row``."""
        if isinstance(alerts.get('momentum_alert'), dict):
            row['momentum_alert'] = alerts['momentum_alert'].get('alert_type')

        if isinstance(alerts.get('velocity_alert'), dict):
            row['velocity_alert'] = alerts['velocity_alert'].get('alert_type')

    def _process_ema_touch_alerts(self, row, alerts):
        """Copy EMA touch type/count onto ``row``; the 169 EMA takes precedence over the 13 EMA."""
        for touch_key in ('169ema_touched', '13ema_touched'):
            touch_data = alerts.get(touch_key)
            if isinstance(touch_data, dict):
                row['touch_type'] = touch_data.get('type')
                row['count'] = touch_data.get('count', 0.0)
                break

    def execute_trades(self):
        """Walk the candidate dates in order, recording capital and managing the position."""
        for idx, date in enumerate(self.stock_candidates.index.unique()):
            self.complete_trades[date] = self.daily_capital
            self.manage_trade(pd.to_datetime(date), idx)

    # Backward-compatible alias for the original (misspelled) method name.
    excute_trades = execute_trades

    def manage_trade(self, date, idx):
        """Manage the open position, or look for a new one when flat."""
        if self.current_trade:
            self._manage_existing_trade(date, idx)
        else:
            self._look_for_new_trade(date, idx)

    def _manage_existing_trade(self, date, idx):
        """Apply the trading rules to the open position for ``date``."""
        stock = self.current_trade["symbol"]

        # The position is only entered on the candle after the signal date; nothing to check
        # until we are strictly past the entry date.
        if date <= self.current_trade["entry_date"]:
            self._log(f"Skipping sell check - {date} is not after purchase date for {stock}")
            return

        tracked_data = self._get_tracked_data(stock, date)
        if tracked_data is None:
            self._log(f"No price data found for {stock} on or before {date}")
            return
        if tracked_data['alert'].empty:
            self._log(f"No alert data found for {stock} on {date}")
            return

        self._apply_trading_rules(tracked_data, idx)

    def _get_tracked_data(self, stock, date):
        """Return the price row for ``date``, the next available price row, and ``date``'s alerts.

        If ``date`` has no price row, the last earlier row is used for both prices. If there
        is no later row, the current row is reused as the next row. Returns None when the
        symbol has no price data on or before ``date``.
        """
        symbol_rows = self.df[self.df['symbol'] == stock].sort_values('date')
        alert = self.alert_df[(self.alert_df['symbol'] == stock) &
                              (self.alert_df['date'] == date) &
                              (self.alert_df['interval'] == 1)]

        current_day_data = symbol_rows[symbol_rows['date'] == date]
        if current_day_data.empty:
            previous_day_data = symbol_rows[symbol_rows['date'] < date]
            if previous_day_data.empty:
                return None
            last_row = previous_day_data.iloc[-1]
            return {'price': last_row, 'price_next_day': last_row, 'alert': alert}

        current_row = current_day_data.iloc[0]
        next_day_data = symbol_rows[symbol_rows['date'] > date]
        next_row = next_day_data.iloc[0] if not next_day_data.empty else current_row
        return {'price': current_row, 'price_next_day': next_row, 'alert': alert}

    def _apply_trading_rules(self, tracked_data, idx):
        """Apply profit protection / exit rules, then mark the position to market."""
        self._log("Applying trading rules")
        if self.dynamic_protection:
            # Scenario 2: Dynamically manage the profit
            self._log(f"Processing date: {tracked_data['price']['date']}")
            self._manage_dynamic_protection(tracked_data)
        else:
            # Scenario 1: Check if we need to activate profit protection
            self._check_profit_protection(tracked_data)

        # Scenario 3: plain sell signals, only while a position is still open and unprotected
        if self.current_trade and not self.protected and not self.dynamic_protection:
            self._check_exit_signals(tracked_data, idx)

        if self.current_trade:
            self.track_daily_capital(tracked_data)

    def track_daily_capital(self, tracked_data):
        """Record the marked-to-market account value for the tracked date."""
        current_price = tracked_data['price']['close']
        entry_price = self.current_trade['entry_price']
        position_size = self.current_trade['position_size']
        profit_loss = (current_price - entry_price) * position_size
        self.daily_capital = self.current_trade['initial_capital'] + profit_loss

        current_date = tracked_data['price']['date'].strftime('%Y-%m-%d 00:00:00')
        self.complete_trades[current_date] = self.daily_capital

    def _find_sell_signal(self, alert_data):
        """Return the first configured sell signal present in ``alert_data``, else None."""
        alert_values = alert_data.values
        for signal in self.sell_signals:
            if signal in alert_values:
                return signal
        return None

    def _check_exit_signals(self, tracked_data, idx):
        """Close the position at the next candle if a sell signal is present."""
        signal = self._find_sell_signal(tracked_data['alert'])
        if signal is not None:
            self._log(f"Sell signal detected, closing position for {self.current_trade['symbol']} at {tracked_data['price']['date']}")
            self.track_profit_loss(tracked_data['price_next_day'], signal, sell=True)

    def _profit_pct(self, profit):
        """Return ``profit`` as a fraction of the open position's cost (0 for zero cost)."""
        cost = self.current_trade['cost']
        return 0 if cost == 0 else profit / cost

    def _check_profit_protection(self, tracked_data):
        """Activate dynamic protection once the return reaches PROFIT_PROTECTION_TRIGGER."""
        tracked_profit_loss = self.track_profit_loss(tracked_data['price'], sell=False)
        tracked_profit_pct = self._profit_pct(tracked_profit_loss)

        self._log(f"Symbol: {self.current_trade['symbol']} | Date: {tracked_data['price']['date']} | Tracked profit loss: {tracked_profit_loss} | Tracked profit pct: {tracked_profit_pct}")
        if tracked_profit_pct >= self.PROFIT_PROTECTION_TRIGGER:
            self._activate_profit_protection(tracked_profit_loss, tracked_profit_pct)
            self._log("Profit protection activated")
        else:
            self._log("Profit protection not required")

    def _activate_profit_protection(self, profit, profit_pct):
        """Switch on dynamic protection and seed the peak values."""
        self.dynamic_protection = True
        self.peak_profit = profit
        self.peak_profit_pct = profit_pct

    def _manage_dynamic_protection(self, tracked_data):
        """Track the peak return and exit on a large decline or a sell signal."""
        self._log("Dynamic protection activated")
        current_profit = self.track_profit_loss(tracked_data['price'], sell=False)
        # Same profit / cost scale that seeded peak_profit_pct, so the two are comparable.
        current_profit_pct = self._profit_pct(current_profit)

        self._log(f"Current profit: {current_profit} | Current profit pct: {current_profit_pct} in date: {tracked_data['price']['date']}")

        self._update_peak_profits(current_profit, current_profit_pct)
        self._log(f"Peak profit: {self.peak_profit} | Peak profit pct: {self.peak_profit_pct} in date: {tracked_data['price']['date']}")

        if self._check_profit_decline(tracked_data, current_profit_pct):
            self._log(f"Profit decline detected, closing position in date: {tracked_data['price']['date']}")
            return

        if self._check_velocity_signals(tracked_data):
            self._log(f"Velocity signal detected, closing position in date: {tracked_data['price']['date']}")
            return

    def _update_peak_profits(self, current_profit, current_profit_pct):
        """Update the peak values when the current profit is a new high."""
        if current_profit > self.peak_profit:
            self.peak_profit = current_profit
            self.peak_profit_pct = current_profit_pct

    def _check_profit_decline(self, tracked_data, current_profit_pct):
        """Close the position when the return fell PROFIT_DECLINE_LIMIT below its peak."""
        if self.peak_profit_pct - current_profit_pct >= self.PROFIT_DECLINE_LIMIT:
            self.track_profit_loss(tracked_data['price_next_day'], exit_reason='profit_protection', sell=True)
            return True
        return False

    def _check_velocity_signals(self, tracked_data):
        """Close a protected position when a sell signal is present."""
        signal = self._find_sell_signal(tracked_data['alert'])
        if signal is None:
            return False
        self.track_profit_loss(tracked_data['price_next_day'], exit_reason=signal, sell=True)
        return True

    def _reset_protection(self):
        """Clear all profit-protection state (called whenever a position is closed)."""
        self.protected = False
        self.dynamic_protection = False
        self.peak_profit = 0
        self.peak_profit_pct = 0

    def _look_for_new_trade(self, date, idx):
        """Open a position in the first candidate of the highest-priority buy signal."""
        cur_stock_pick = self.stock_candidates.iloc[idx]
        stock, alert = self.find_alert(cur_stock_pick, desired_alerts=self.buy_signals)
        if stock is None:
            return
        self._log(f"Looking for new trade for {stock} on {date}")
        self._open_new_position(stock, date, alert)

    def _open_new_position(self, stock, date, alert):
        """Buy ``stock`` at the open of the first candle after ``date``."""
        # Step 1: Entry at the next available candle's open (not calendar date + 1,
        # which never exists for a Friday signal on non-24/7 markets)
        next_day_data = self.df[(self.df['symbol'] == stock) & (self.df['date'] > date)].sort_values('date')
        if next_day_data.empty:
            return
        entry_row = next_day_data.iloc[0]
        entry_date = entry_row['date']
        avg_cost = entry_row['open']
        if not avg_cost > 0:  # also rejects NaN
            self._log(f"Invalid open price for {stock} on {entry_date}")
            return

        # Step 2: Calculate fees and available capital
        portion = self.dynamic_portion(self.capital) if self.dynamic_asset_control else 1
        trading_capital = self.capital * portion
        fees = trading_capital * self.buy_fee
        post_fees_capital = trading_capital - fees

        # Step 3: Position size in whole units.
        # NOTE(review): the old expression `size if size < 0 else int(size)` always truncated
        # (a size is never negative); confirm whether fractional crypto units were intended.
        position_size = int(post_fees_capital / avg_cost)

        # Step 4: Calculate actual cost
        total_cost = avg_cost * position_size

        # Step 5: Verify we can buy at least one unit with the post-fee capital
        if position_size <= 0 or total_cost > post_fees_capital:
            self._log(f"Insufficient capital for trade in {stock}")
            return

        # Step 6: Cash left after paying both the buy fee and the position
        remaining_capital = self.capital - fees - total_cost

        self._log(f"Next day data for {stock} on {entry_date}")
        self._log(f"Position size: {position_size:.8f} shares at ${avg_cost:.2f}")
        self._log(f"Cost: ${total_cost:.8f}")
        self._log(f"Fees: ${fees:.8f}")
        self._log(f"Remaining capital: ${remaining_capital:.8f}")
        self.current_trade = {
            # Account value right after entry: cash + position at cost
            "initial_capital": remaining_capital + total_cost,
            "entry_date": entry_date,
            "symbol": stock,
            "entry_price": avg_cost,
            "position_size": position_size,
            "fees": fees,
            "cost": total_cost,
            "remaining_capital": remaining_capital,
            "entry_reason": alert
        }

        self.capital = remaining_capital

        self._log(f"Opened new position for {stock} on {entry_date}")
        self._log(f"Remaining capital: ${self.capital:.2f}")

    def dynamic_portion(self, available_capital):
        """Return the fraction of capital to commit: 100% up to 10k, 50% up to 100k,
        25% up to 1M, 10% above."""
        if available_capital <= 10000:
            return 1
        if available_capital <= 100000:
            return 0.5
        if available_capital <= 1000000:
            return 0.25
        return 0.1

    def track_profit_loss(self, tracked_data, exit_reason=None, sell=False):
        """Return the unrealized P&L at ``tracked_data['close']``.

        When ``sell`` is True, the position is closed at that price instead and None is returned.
        """
        exit_price = tracked_data['close']
        profit_loss = (exit_price - self.current_trade['entry_price']) * self.current_trade['position_size']

        if sell:
            self._close_position(tracked_data, exit_price, profit_loss, exit_reason)
            return None

        return profit_loss

    def _close_position(self, tracked_data, exit_price, profit_loss, exit_reason):
        """Close the open position, record the trade and reset protection state."""
        cost = self.current_trade["cost"]

        # Step 1. Sale proceeds before fees (cost + profit or loss)
        proceeds = cost + profit_loss

        # Step 2. Cash after the sell fee
        self.capital = self.current_trade['remaining_capital'] + proceeds - proceeds * self.sell_fee

        # Step 3. Return on cost (0 for a zero-cost position)
        profit_rate = self._profit_pct(profit_loss)

        self.trades.append({
            "symbol": self.current_trade["symbol"],
            "entry_price": self.current_trade["entry_price"],
            "entry_date": self.current_trade["entry_date"],
            "exit_price": exit_price,
            "exit_date": tracked_data['date'],
            "position_size": self.current_trade["position_size"],
            "cost": f"{cost:.2f}",
            "profit/loss%": f"{profit_rate * 100:.2f}%",
            "profit/loss": f"{profit_loss:.2f}",
            "remaining_capital": f"{self.current_trade['remaining_capital']:.2f}",
            "total_capital": f"{self.capital:.2f}",
            "entry_reason": self.current_trade["entry_reason"],
            "exit_reason": exit_reason,
            "holding_period_days": (tracked_data['date'] - self.current_trade["entry_date"]).days
        })

        # Reset trade state; later flat days report the realized capital.
        self.current_trade = {}
        self.daily_capital = self.capital
        self._reset_protection()

    def find_alert(self, stock_data, desired_alerts):
        """Return ``(first_symbol, alert)`` for the first alert column with picks, else ``(None, None)``.

        Cells that are not sequences (e.g. NaN from a misaligned concat) are treated as empty.
        """
        for alert in desired_alerts or []:
            picks = stock_data[alert]
            if isinstance(picks, (list, tuple, np.ndarray)) and len(picks) > 0:
                return picks[0], alert
        return None, None

    def run(self):
        """Run the backtest; return ``(trades DataFrame, daily capital DataFrame)``."""
        self.execute_trades()
        trade_df = self.get_trades()
        capital_df = self.get_daily_return()
        return trade_df, capital_df

    def get_daily_return(self):
        """Return the recorded daily account values as a ``date`` / ``long_capital`` DataFrame."""
        return pd.DataFrame(self.complete_trades.items(), columns=['date', 'long_capital'])

    def get_trades(self):
        """Return completed trades as a DataFrame."""
        return pd.DataFrame(self.trades)

    def _log(self, message):
        """Print ``message`` when verbose mode is enabled."""
        if self.verbose:
            print(message)


In [None]:
# Backtest the long-term strategy on the crypto candidates for 2020-2024.
# (`captial_df` keeps its original spelling because later cells may reference it.)
strategy_params = dict(
    mongo_config=mongo_config,
    instrument='crypto',
    start_date='2020-01-01',
    end_date='2024-12-31',
    sandbox_mode=True,
    buy_signals=['long_accelerating'],
    sell_signals=['velocity_loss'],
    verbose=False,
)
strategy = LongTermTradingStrategy(**strategy_params)

trade_df, captial_df = strategy.run()
captial_df

In [None]:
# Completed trades from the backtest above (one row per closed position)
trade_df

### Complete Trade History

In [None]:
class CompleteTradeHistory:
    """Expand closed trades into a day-by-day, mark-to-market trade history.

    Each trade row is replaced by one row per price bar of its symbol in the window
    ``(entry_date, exit_date]``, valued at that bar's close. Trades with no bar in that
    window are kept unchanged.
    """

    def __init__(self, mongo_config, instrument):
        """Load all raw price rows for ``instrument`` from the ``<warehouse_interval>_data`` collection.

        Args:
            mongo_config: dict with ``connection_string``, ``db`` and ``warehouse_interval``.
            instrument: value matched against the documents' ``instrument`` field.
        """
        self.mongo_client = MongoClient(mongo_config['connection_string'])
        self.mongo_db = self.mongo_client[mongo_config['db']]
        self.raw_data_collection = f"{mongo_config['warehouse_interval']}_data"
        self.raw_data = pd.DataFrame(list(self.mongo_db[self.raw_data_collection].find({'instrument': instrument})))

    def _live_update_rows(self, trade_data, holding_symbol_data):
        """Build one mark-to-market row per bar in ``holding_symbol_data`` for one trade.

        Args:
            trade_data: a closed-trade row. ``cost`` and ``remaining_capital`` may be
                formatted strings.
            holding_symbol_data: price rows with ``date`` and ``close`` columns.

        Returns:
            pandas.DataFrame whose column names match the closed-trade rows. It is empty when
            ``holding_symbol_data`` is empty.
        """
        cost = float(trade_data['cost'])
        remaining_capital = float(trade_data['remaining_capital'])
        rows = []
        for holding_timestamp, current_price in zip(holding_symbol_data['date'], holding_symbol_data['close']):
            # Step 1. Current profit/loss at this bar's close
            cur_profit_loss = (current_price - trade_data['entry_price']) * trade_data['position_size']
            # Step 2. Return on cost; zero-cost (zero-size) trades report 0
            cur_profit_loss_pct = cur_profit_loss / cost if cost != 0 else 0
            # Step 3. Account value = cash + position at cost + unrealized P&L
            current_capital = remaining_capital + cur_profit_loss + cost

            rows.append({
                "symbol": trade_data["symbol"],
                "entry_price": trade_data["entry_price"],
                "entry_date": trade_data["entry_date"],
                "exit_price": current_price,
                "exit_date": holding_timestamp,
                "position_size": trade_data["position_size"],
                "cost": trade_data["cost"],
                # Same column names/formatting as the closed-trade rows, so they share columns.
                "profit/loss%": f"{cur_profit_loss_pct * 100:.2f}%",
                "profit/loss": f"{cur_profit_loss:.2f}",
                "remaining_capital": trade_data["remaining_capital"],
                "total_capital": f"{current_capital:.2f}",
                "holding_period_days": (holding_timestamp - trade_data["entry_date"]).days
            })
        return pd.DataFrame(rows)

    def _position_live_update(self, trade_data, holding_symbol_data, all_trade_data):
        """Splice the live rows for ``trade_data`` into ``all_trade_data``.

        The rows replace the first row with the same symbol and entry date, or are appended at
        the end when there is no such row. ``all_trade_data`` is returned unchanged when there
        are no live rows.
        """
        live_update_data = self._live_update_rows(trade_data, holding_symbol_data)
        if live_update_data.empty:
            return all_trade_data

        is_match = ((all_trade_data['symbol'] == trade_data['symbol']) &
                    (all_trade_data['entry_date'] == trade_data['entry_date']))
        # Positional (not label) lookup, so any index works with iloc below.
        positions = is_match.to_numpy().nonzero()[0]
        if len(positions) > 0:
            pos = positions[0]
            pieces = [all_trade_data.iloc[:pos], live_update_data, all_trade_data.iloc[pos + 1:]]
        else:
            pieces = [all_trade_data, live_update_data]
        return pd.concat(pieces, ignore_index=True)

    def generate_complete_history(self, results_df):
        """Return ``results_df`` with every trade expanded into per-bar mark-to-market rows.

        Args:
            results_df: closed trades, e.g. from ``LongTermTradingStrategy.get_trades()``.

        Returns:
            A new DataFrame with a fresh RangeIndex. The column order follows ``results_df``.
        """
        if results_df.empty or self.raw_data.empty:
            return results_df.copy()

        pieces = []
        for pos in range(len(results_df)):
            trade_data = results_df.iloc[pos]
            holding_symbol_data = self.raw_data[
                (self.raw_data['symbol'] == trade_data['symbol']) &
                (self.raw_data['date'] > trade_data['entry_date']) &
                (self.raw_data['date'] <= trade_data['exit_date'])
            ].sort_values('date')
            live_rows = self._live_update_rows(trade_data, holding_symbol_data)
            # Keep the closed-trade row when no bar falls inside the holding window.
            pieces.append(live_rows if not live_rows.empty else results_df.iloc[[pos]])

        # Single concat (instead of re-splicing per trade) keeps this linear in the trade count.
        history = pd.concat(pieces, ignore_index=True)
        extra_columns = [c for c in history.columns if c not in results_df.columns]
        return history.reindex(columns=list(results_df.columns) + extra_columns)

In [None]:
# Expand each closed trade into per-day mark-to-market rows.
history_builder = CompleteTradeHistory(mongo_config, instrument='crypto')
complete_trade_history = history_builder.generate_complete_history(trade_df)
complete_trade_history

### Backtest Evaluation

A class for backtesting and analyzing the performance of every combination of buy and sell signals.

Key Features:
- Loads historical stock data, alerts, and stock candidates from MongoDB
- Implements a dynamic profit protection mechanism that triggers when profits reach 30%
- Tracks and analyzes stock performance based on velocity alerts
- Records trade results including entry/exit dates, profits/losses, and exit reasons

Testing Logic:
1. **Entry**: Test every signal in the `buy_signals` list
2. **Exit Conditions**: Test every signal in the `sell_signals` list

Usage:
- Study the dual-tunnel strategy's performance, assuming a client may start using the system at any point during the year
- Use the results to estimate the potential returns and risks of the dual-tunnel strategy
- Study the reasons for entries and exits to improve the strategy


In [None]:
import numpy as np
import pandas as pd

class StrategyEDA:
    """Visual EDA of a backtest: equity curve vs. a benchmark plus a trade-return histogram.

    The benchmark is the NASDAQ Composite (``^IXIC``) for ``instrument='equity'`` and
    ``BTC`` for ``instrument='crypto'``. Both are read from MongoDB and rebased to the
    same starting capital as the strategy curve.

    Args:
        mongo_config: dict providing ``connection_string``, ``db`` and
            ``warehouse_interval``. Benchmark prices are read from the
            ``<warehouse_interval>_data`` collection.
        start_date: first day of the analysis window (anything ``pd.to_datetime`` accepts).
        end_date: last day of the analysis window.
        buy_signals: buy-signal names; only used to build the figure title.
        sell_signals: sell-signal names; only used to build the figure title.
        instrument: ``'equity'`` or ``'crypto'``.
    """

    # Starting capital assumed for both the strategy curve and the benchmark.
    INITIAL_CAPITAL = 10000

    def __init__(self, mongo_config, start_date, end_date, buy_signals, sell_signals, instrument):
        self.mongo_config = mongo_config
        self.start_date = pd.to_datetime(start_date)
        self.end_date = pd.to_datetime(end_date)
        self.instrument = instrument
        self.title = self.process_title(buy_signals, sell_signals)

    def process_title(self, buy_signals, sell_signals):
        """Return the figure title, e.g. ``'crypto Buy: a, b | Sell: c'``."""
        buy_signals = ', '.join(buy_signals)
        sell_signals = ', '.join(sell_signals)
        return f'{self.instrument} Buy: {buy_signals} | Sell: {sell_signals}'

    def _get_benchmark_return_data(self, symbol):
        """Load ``symbol`` closes within [start_date, end_date] and rebase them to INITIAL_CAPITAL.

        Returns:
            DataFrame indexed by ascending ``date`` with ``close``, ``return`` (pct change
            from the previous row) and ``total_capital`` columns. The first day is dropped
            because it has no previous close to compute a return from.

        Raises:
            ValueError: if no rows are found for ``symbol`` in the window.
        """
        cfg = self.mongo_config
        client = MongoClient(cfg['connection_string'])
        try:
            collection = client[cfg['db']][cfg['warehouse_interval'] + '_data']
            records = list(collection.find(
                {'symbol': symbol, 'date': {'$gte': self.start_date, '$lte': self.end_date}},
                {'date': 1, 'close': 1, '_id': 0},
            ))
        finally:
            # One client per call; release its connection pool once the rows are read.
            client.close()

        if not records:
            raise ValueError(f"No {symbol} data between {self.start_date} and {self.end_date}")

        data = pd.DataFrame(records)
        # find() has no ordering guarantee; pct_change and date-label slicing need ascending dates.
        data = data.sort_values('date', kind='mergesort').reset_index(drop=True)
        data['return'] = data['close'].pct_change()
        data = data.dropna().copy()
        data['total_capital'] = self.INITIAL_CAPITAL * (1 + data['return']).cumprod()
        return data.set_index('date')

    def get_nasdaq_return_data(self):
        """NASDAQ Composite (``^IXIC``) benchmark series; see ``_get_benchmark_return_data``."""
        return self._get_benchmark_return_data('^IXIC')

    def get_bitcoin_return_data(self):
        """Bitcoin (``BTC``) benchmark series; see ``_get_benchmark_return_data``."""
        return self._get_benchmark_return_data('BTC')

    def _build_capital_curve(self, results_df):
        """Daily, linearly interpolated strategy capital from start_date to end_date.

        The curve is seeded with INITIAL_CAPITAL on ``start_date``. Each trade contributes
        its ``total_capital`` on its (day-normalized) ``exit_date``.

        Returns:
            DataFrame indexed by calendar day with ``total_capital`` and
            ``monthly_return`` (30-day pct change) columns.
        """
        trades = results_df[['exit_date', 'total_capital']].copy()
        trades['exit_date'] = pd.to_datetime(trades['exit_date']).dt.normalize()
        trades['total_capital'] = trades['total_capital'].astype(float)
        # Stable sort keeps the recorded order of trades that close on the same day.
        trades = trades.sort_values('exit_date', kind='mergesort')

        seed = pd.DataFrame({'exit_date': [self.start_date],
                             'total_capital': [float(self.INITIAL_CAPITAL)]})
        curve = pd.concat([seed, trades], ignore_index=True).set_index('exit_date')
        # Several trades can close on the same day and reindex() rejects duplicate
        # labels, so keep the last recorded capital of each day.
        curve = curve[~curve.index.duplicated(keep='last')]

        daily_index = pd.date_range(start=self.start_date, end=self.end_date, freq='D')
        curve = curve.reindex(daily_index)
        curve['total_capital'] = curve['total_capital'].interpolate()
        curve['monthly_return'] = curve['total_capital'].pct_change(periods=30)
        return curve

    def plot_trading_analysis(self, results_df):
        """Show the strategy equity curve vs. its benchmark and a histogram of trade returns.

        Args:
            results_df: one row per closed trade with ``exit_date``, ``total_capital``
                (portfolio value after the trade) and ``profit/loss%`` (e.g. ``'12.5%'``
                or a number). The frame is not modified.

        Raises:
            ValueError: if ``self.instrument`` is neither ``'equity'`` nor ``'crypto'``.
        """
        import plotly.graph_objects as go
        from plotly.subplots import make_subplots

        # Resolve the benchmark first so an unsupported instrument fails before any work.
        if self.instrument == 'equity':
            benchmark = self.get_nasdaq_return_data()
            benchmark_name, benchmark_color = 'NASDAQ', 'rgba(85, 85, 85, 0.5)'  # Dark gray
        elif self.instrument == 'crypto':
            benchmark = self.get_bitcoin_return_data()
            benchmark_name, benchmark_color = 'Bitcoin', 'rgba(247, 147, 26, 0.5)'  # Bitcoin orange
        else:
            raise ValueError(f"Invalid instrument: {self.instrument}")
        benchmark = benchmark[self.start_date:self.end_date]

        # Local series instead of adding a column to the caller's DataFrame.
        returns_pct = results_df['profit/loss%'].astype(str).str.rstrip('%').astype(float)
        curve = self._build_capital_curve(results_df)

        fig = make_subplots(rows=2, cols=1,
                            row_heights=[0.7, 0.3],
                            subplot_titles=('Portfolio Value Over Time',
                                            'Distribution of Returns'))

        # Strategy line, then the benchmark.
        fig.add_trace(
            go.Scatter(x=curve.index,
                       y=curve['total_capital'],
                       name='Strategy',
                       line=dict(color='blue')),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=benchmark.index,
                       y=benchmark['total_capital'],
                       name=benchmark_name,
                       line=dict(color=benchmark_color)),
            row=1, col=1
        )

        # Reference line at the initial capital.
        fig.add_hline(y=self.INITIAL_CAPITAL, line_dash="dash", line_color="gray",
                      annotation_text="Initial Principle",
                      row=1, col=1)

        # Shade calendar year 2022 (treated here as a bear market).
        fig.add_vrect(
            x0="2022-01-01", x1="2022-12-31",
            fillcolor="red", opacity=0.1,
            layer="below", line_width=0,
            row=1, col=1
        )

        # Final values, placed one year after the start near the top of the strategy curve.
        strategy_final = curve['total_capital'].iloc[-1]
        benchmark_final = benchmark['total_capital'].iloc[-1]
        max_y = curve['total_capital'].max()
        fig.add_annotation(
            text=f'Strategy Final: ${strategy_final:,.2f} ({benchmark_name} Final: ${benchmark_final:,.2f})',
            x=self.start_date + pd.Timedelta(days=365),
            y=max_y * 0.9,
            showarrow=False,
            row=1, col=1
        )

        # 30-day return labels every 30 days, drawn 10% above the curve.
        for pos in range(30, len(curve), 30):
            monthly_return = curve['monthly_return'].iloc[pos]
            if pd.isna(monthly_return):
                continue
            current_value = curve['total_capital'].iloc[pos]
            fig.add_annotation(
                text=f'{monthly_return:.3%}',
                x=curve.index[pos],
                y=current_value * 1.1,
                showarrow=False,
                font=dict(color='green' if monthly_return > 0 else 'red', size=10),
                textangle=90,
                row=1,
                col=1
            )

        # Distribution of per-trade returns with a zero reference line.
        fig.add_trace(
            go.Histogram(x=returns_pct,
                         name='Returns Distribution',
                         nbinsx=180),
            row=2, col=1
        )
        fig.add_vline(x=0, line_dash="dash", line_color="red", row=2, col=1)

        fig.update_layout(
            height=800,
            width=1200,
            showlegend=True,
            title_text=self.title,
            xaxis=dict(
                range=[self.start_date, self.end_date]
            ),
            yaxis=dict(type='linear')  # Linear y-axis for the portfolio value
        )

        fig.update_xaxes(title_text="Date", row=1, col=1)
        fig.update_xaxes(title_text="Return %", row=2, col=1)
        fig.update_yaxes(title_text="Total Amount ($)", row=1, col=1)
        fig.update_yaxes(title_text="Count", row=2, col=1)

        fig.show()

In [None]:
class StrategyAnalytics:
    """Summary statistics for a backtest.

    Two inputs are used throughout:

    * ``results_df``: one row per closed trade with ``exit_date``, ``total_capital`` and
      ``profit/loss%``. The latter is percent strings such as ``'12.5%'``; plain numbers
      are also accepted.
    * ``captial_df``: portfolio value over time with a ``date`` column and a
      ``long_capital`` (``trade_type='bull'``) or ``short_capital`` column.

    Args:
        verbose: print a formatted summary from ``run``.
        instrument: ``'equity'`` or ``'crypto'``.
        trade_type: ``'bull'`` selects ``long_capital``; anything else selects ``short_capital``.
    """

    # Annual risk-free rate used by sharpe_ratio.
    RISK_FREE_RATE = 0.03

    def __init__(self, verbose=True, instrument='crypto', trade_type='bull'):
        self.verbose = verbose
        self.instrument = instrument
        self.trade_type = trade_type

    def _capital_col(self):
        """Name of the capital column for this trade type."""
        return 'long_capital' if self.trade_type == 'bull' else 'short_capital'

    @staticmethod
    def _pct_to_float(series):
        """Convert a ``'12.5%'``-style (or already numeric) column to floats in percent units."""
        return series.astype(str).str.rstrip('%').astype(float)

    def _base_case_analysis(self, bull_result_df, crypto_df):
        """Buy-and-hold baseline per traded symbol: close-to-close ratio and max drawdown.

        ``crypto_df`` is looked up per symbol with ``xs(symbol, axis=0)``; presumably
        its index has a ``symbol`` level (verify against caller).
        """
        base_case_dict = {}
        symbol_list = bull_result_df['symbol'].unique()
        for symbol in symbol_list:
            symbol_df = crypto_df.xs(symbol, axis=0)
            # Ratio of last close to first close (1.0 == flat)
            return_rate = symbol_df.iloc[-1]['close'] / symbol_df.iloc[0]['close']
            # Most negative fall from the running peak, as a fraction
            max_drawdown = symbol_df['close'].div(symbol_df['close'].cummax()).sub(1).min()
            base_case_dict[symbol] = {'return_rate': return_rate, 'max_drawdown': max_drawdown}
        base_case_df = pd.DataFrame(base_case_dict)
        return base_case_df

    def monthly_return_rate(self, capital_df):
        """Month-over-month return of the portfolio.

        Keeps the last row of each calendar month (after sorting by date) and computes
        the percentage change between consecutive month-end values.

        Returns:
            DataFrame with ``date`` (last date seen in the month), the capital column and
            ``monthly_return_rate`` (NaN for the first month).
        """
        capital_col = self._capital_col()
        X = capital_df[['date', capital_col]].copy()
        X['date'] = pd.to_datetime(X['date'])
        X = X.sort_values('date', kind='mergesort')
        # Group by calendar month; grouping by the exact date would yield daily returns.
        month_key = X['date'].dt.to_period('M').rename('month')
        month_end = X.groupby(month_key).last().reset_index(drop=True)
        month_end['monthly_return_rate'] = month_end[capital_col].pct_change()
        return month_end

    def sharpe_ratio(self, captial_df):
        """Annualized Sharpe-style ratio of the capital curve.

        Numerator: compound annual growth rate over the elapsed time, minus
        RISK_FREE_RATE. Denominator: standard deviation of row-to-row returns times
        sqrt(365). Returns 0 when that deviation is exactly 0.
        """
        X = captial_df.copy()
        X['date'] = pd.to_datetime(X['date'])
        # pct_change and first/last capital need chronological order; stable keeps same-day order.
        X = X.sort_values('date', kind='mergesort')
        capital = X[self._capital_col()]
        daily_return = capital.pct_change().fillna(0)

        # Annualize over elapsed time. Counting distinct calendar years would, e.g., treat
        # Jul-2020..Jun-2021 as two years.
        elapsed_days = (X['date'].iloc[-1] - X['date'].iloc[0]).days
        years = elapsed_days / 365.25 if elapsed_days > 0 else 1
        mean_annualized_return = (capital.iloc[-1] / capital.iloc[0]) ** (1 / years) - 1

        daily_returns = daily_return.replace([np.inf, -np.inf], np.nan).dropna()
        # NOTE(review): sqrt(365) assumes one row per calendar day; confirm for equity,
        # where rows may be trading days only.
        std_annualized_return = daily_returns.std() * np.sqrt(365)

        if std_annualized_return == 0:
            return 0

        return (mean_annualized_return - self.RISK_FREE_RATE) / std_annualized_return

    def max_drawdown(self, captial_df):
        """Largest peak-to-trough fall of the capital curve, in percent (0 to 100).

        Rows are put in chronological order first. The running peak only considers
        values up to each point, so a later high never hides an earlier fall.
        """
        X = captial_df.copy()
        X['date'] = pd.to_datetime(X['date'])
        X = X.sort_values('date')
        capital = X[self._capital_col()]
        running_max = capital.expanding().max()
        drawdown = ((capital - running_max) / running_max) * -100
        return drawdown.max()

    @staticmethod
    def win_rate(results_df):
        """Share of winning trades among trades that won or lost (break-even ignored); 0 if none."""
        pnl = StrategyAnalytics._pct_to_float(results_df['profit/loss%'])
        wins = int((pnl > 0).sum())
        losses = int((pnl < 0).sum())
        total_trades = wins + losses
        if total_trades == 0:
            return 0
        return wins / total_trades

    @staticmethod
    def max_profit(results_df):
        """Best single-trade return, in percent."""
        return StrategyAnalytics._pct_to_float(results_df['profit/loss%']).max()

    @staticmethod
    def max_loss(results_df):
        """Worst single-trade return, in percent."""
        return StrategyAnalytics._pct_to_float(results_df['profit/loss%']).min()

    @staticmethod
    def median_return(results_df):
        """Median per-trade return, in percent."""
        return StrategyAnalytics._pct_to_float(results_df['profit/loss%']).median()

    @staticmethod
    def mean_return(results_df):
        """Mean per-trade return, in percent."""
        return StrategyAnalytics._pct_to_float(results_df['profit/loss%']).mean()

    @staticmethod
    def std_return(results_df):
        """Sample standard deviation of per-trade returns, in percent."""
        return StrategyAnalytics._pct_to_float(results_df['profit/loss%']).std()

    @staticmethod
    def final_capital(results_df):
        """``total_capital`` of the last row of ``results_df``, as a float."""
        return results_df['total_capital'].astype(float).iloc[-1]

    def annualized_return(self, results_df):
        """Annualized return per calendar year of trade exits.

        Within each exit year, trade returns are compounded. The result is then scaled
        from the part of the year elapsed by the last exit (its day-of-year) to the full
        calendar year (365 or 366 days).

        Returns:
            DataFrame with columns ``year`` and ``annualized_return`` (a fraction).
        """
        X = results_df.copy()
        X['profit/loss%'] = self._pct_to_float(X['profit/loss%'])
        X['exit_date'] = pd.to_datetime(X['exit_date'])
        X = X.set_index('exit_date')

        rows = []
        for year, group in X.groupby(X.index.year):
            cumulative_return = np.prod(1 + (group['profit/loss%'] / 100)) - 1
            days_elapsed = group.index.dayofyear.max()  # always >= 1
            # Both sides of the exponent are calendar days. Using 252 trading days over a
            # calendar day-of-year count would shrink even a full year's return.
            days_in_year = pd.Timestamp(year=int(year), month=12, day=31).dayofyear
            annualized = (1 + cumulative_return) ** (days_in_year / days_elapsed) - 1
            rows.append([year, annualized])

        return pd.DataFrame(rows, columns=['year', 'annualized_return'])

    def run(self, results_df, captial_df):
        """Compute all summary metrics, optionally print them, and return them as a dict.

        Keys: sharpe_ratio, max_drawdown (percent), final_capital, median_return,
        mean_return, std_return, max_profit, max_loss (percent), and annualized_return
        (DataFrame per year).
        """
        X = results_df.copy()
        Y = captial_df.copy()
        metrics = {
            'sharpe_ratio': self.sharpe_ratio(Y),
            'max_drawdown': self.max_drawdown(Y),
            'final_capital': self.final_capital(X),
            'median_return': self.median_return(X),
            'mean_return': self.mean_return(X),
            'std_return': self.std_return(X),
            'max_profit': self.max_profit(X),
            'max_loss': self.max_loss(X),
            'annualized_return': self.annualized_return(X)
        }

        if self.verbose:
            print("\n" + "="*50)
            print("TRADING STRATEGY PERFORMANCE SUMMARY")
            print("="*50 + "\n")

            print("YEARLY RETURNS")
            print("-"*20)
            print(metrics['annualized_return'])

            print("\n")

            print("KEY STATISTICS")
            print("-"*20)
            print(f"Final Capital: ${metrics['final_capital']:,.2f}")
            print(f"Sharpe Ratio: {metrics['sharpe_ratio']:.2f}")
            print(f"Maximum Drawdown: {metrics['max_drawdown']:.2f}%")
            print(f"Maximum Profit: {metrics['max_profit']:.2f}%")
            print(f"Maximum Loss: {metrics['max_loss']:.2f}%")
            print("\n")

            print("RETURN METRICS")
            print("-"*20)
            print(f"Mean Return: {metrics['mean_return']:.2f}%")
            print(f"Median Return: {metrics['median_return']:.2f}%")
            print(f"Return Std Dev: {metrics['std_return']:.2f}%")
            print("\n" + "="*50)

        return metrics


### Equity Trading Backtest

In [None]:
from itertools import combinations, product

output_dir = os.path.join('..', 'data', 'strategies_combination_results')
os.makedirs(output_dir, exist_ok=True)

# Stock candidates for this instrument (sandbox run starting 2020-01-01).
instrument = 'equity'
pick_stock_instance = Pick_Stock(mongo_config, instrument=instrument, sandbox_mode=True, start_date='2020-01-01')
stock_candidates_df = pick_stock_instance.run()

# Signal universe to search over.
buy_signals = ['accumulating', 'long_accumulating', 'accelerating', 'long_accelerating', 'velocity_maintained']
sell_signals = ['velocity_loss', 'velocity_weak']

# Every non-empty subset of each signal list (31 buy x 3 sell = 93 strategies).
max_signals = len(buy_signals)
all_buy_combinations = [
    list(subset)
    for size in range(1, max_signals + 1)
    for subset in combinations(buy_signals, size)
]
all_sell_combinations = [
    list(subset)
    for size in range(1, len(sell_signals) + 1)
    for subset in combinations(sell_signals, size)
]

strategy_results = {}
for buy_combination, sell_combination in product(all_buy_combinations, all_sell_combinations):
    trading_strategy = LongTermTradingStrategy(
        mongo_config=mongo_config,
        start_date='2020-01-01',
        end_date='2024-12-31',
        sandbox_mode=True,
        instrument=instrument,
        buy_signals=buy_combination,
        sell_signals=sell_combination,
        verbose=False
    )
    print(f'Running {buy_combination} | {sell_combination}')
    results_df, captial_df = trading_strategy.run()
    results_df['total_capital'] = results_df['total_capital'].astype(float)

    # Keep only combinations whose last recorded capital exceeds 50,000 (no other filter is applied).
    if not (results_df['total_capital'].iloc[-1] > 50000):
        continue

    results_df.to_csv(os.path.join(output_dir, f'results_{instrument}_{buy_combination}_{sell_combination}.csv'), index=False)

    strategy_analytics = StrategyAnalytics(instrument=instrument)
    analytics_results = strategy_analytics.run(results_df, captial_df)

    eda_analysis = StrategyEDA(mongo_config=mongo_config, start_date='2020-01-01', end_date='2024-12-31', buy_signals=buy_combination, sell_signals=sell_combination, instrument=instrument)
    eda_analysis.plot_trading_analysis(results_df)

    combination_key = f"Buy: {buy_combination} | Sell: {sell_combination}"
    strategy_results[combination_key] = analytics_results
    

In [None]:
# Re-run one promising equity combination and plot it.
# The instrument is spelled out rather than read from the global `instrument`, which the
# crypto section below rebinds to 'crypto'; re-running this cell after that section would
# otherwise backtest crypto instead of equity.
buy = ['long_accumulating', 'accelerating', 'velocity_maintained']
sell = ['velocity_weak']
trading_strategy = LongTermTradingStrategy(
    mongo_config=mongo_config,
    start_date='2020-01-01',
    end_date='2024-12-31',
    sandbox_mode=True,
    instrument='equity',
    buy_signals=buy,
    sell_signals=sell,
    verbose=False
)
results_df, captial_df = trading_strategy.run()
eda_analysis = StrategyEDA(mongo_config=mongo_config, start_date='2020-01-01', end_date='2024-12-31', buy_signals=buy, sell_signals=sell, instrument='equity')
eda_analysis.plot_trading_analysis(results_df)

In [None]:
# Find the best equity combinations by Sharpe ratio, final capital and max drawdown.
# StrategyAnalytics reports max_drawdown already in percent (0-100), so it is formatted
# as `:.2f` followed by '%'; `:.2%` would multiply it by 100 a second time.
metrics = {
    'sharpe_ratio': {'best': float('-inf'), 'combination': None},
    'final_capital': {'best': float('-inf'), 'combination': None},
    'max_drawdown': {'best': float('inf'), 'combination': None}  # For drawdown, lower is better
}

for combination, results in strategy_results.items():
    # Strict comparisons: on ties, the first combination seen wins.
    if results['sharpe_ratio'] > metrics['sharpe_ratio']['best']:
        metrics['sharpe_ratio']['best'] = results['sharpe_ratio']
        metrics['sharpe_ratio']['combination'] = combination
    if results['final_capital'] > metrics['final_capital']['best']:
        metrics['final_capital']['best'] = results['final_capital']
        metrics['final_capital']['combination'] = combination
    if results['max_drawdown'] < metrics['max_drawdown']['best']:
        metrics['max_drawdown']['best'] = results['max_drawdown']
        metrics['max_drawdown']['combination'] = combination

# Print results for each metric
print("Best combinations by metric:")
print(f"\nBest by Sharpe Ratio ({metrics['sharpe_ratio']['best']:.4f}):")
print(f"Combination: {metrics['sharpe_ratio']['combination']}")

print(f"\nBest by Final Capital (${metrics['final_capital']['best']:,.2f}):")
print(f"Combination: {metrics['final_capital']['combination']}")

print(f"\nBest by Max Drawdown ({metrics['max_drawdown']['best']:.2f}%):")
print(f"Combination: {metrics['max_drawdown']['combination']}")

print("\nCombinations with Sharpe ratio > 1:")
for combination, results in strategy_results.items():
    if results['sharpe_ratio'] > 1:
        print(f"\nCombination: {combination}")
        print(f"Sharpe Ratio: {results['sharpe_ratio']:.4f}")

### Crypto Trading Backtest

In [None]:
from itertools import combinations, product
mongo_config = load_client_config('mongodb', 'production')
url = mongo_config['connection_string']
mongo_client = MongoClient(url)
# NOTE(review): not used below in this cell.
all_sandbox_collections = mongo_client['data_lake']['sandbox_results']

output_dir = os.path.join('..', 'data', 'strategies_combination_results')
os.makedirs(output_dir, exist_ok=True)

# Stock candidates for this instrument (sandbox run starting 2020-01-01).
instrument = 'crypto'
pick_stock_instance = Pick_Stock(mongo_config, instrument=instrument, sandbox_mode=True, start_date='2020-01-01')
stock_candidates_df = pick_stock_instance.run()

# Signal universe to search over.
buy_signals = ['accumulating', 'long_accumulating', 'accelerating', 'long_accelerating', 'velocity_maintained']
sell_signals = ['velocity_loss', 'velocity_weak']

# Every non-empty subset of each signal list (31 buy x 3 sell = 93 strategies).
max_signals = len(buy_signals)
all_buy_combinations = [
    list(subset)
    for size in range(1, max_signals + 1)
    for subset in combinations(buy_signals, size)
]
all_sell_combinations = [
    list(subset)
    for size in range(1, len(sell_signals) + 1)
    for subset in combinations(sell_signals, size)
]

crypto_strategy_results = {}
for buy_combination, sell_combination in product(all_buy_combinations, all_sell_combinations):
    trading_strategy = LongTermTradingStrategy(
        mongo_config=mongo_config,
        start_date='2020-01-01',
        end_date='2024-12-31',
        sandbox_mode=True,
        instrument=instrument,
        buy_signals=buy_combination,
        sell_signals=sell_combination,
        verbose=False
    )

    results_df, captial_df = trading_strategy.run()
    results_df['total_capital'] = results_df['total_capital'].astype(float)

    # Analytics run for every combination (StrategyAnalytics is verbose by default, so each prints a summary).
    strategy_analytics = StrategyAnalytics(instrument=instrument)
    analytics_results = strategy_analytics.run(results_df.drop(columns=['exit_reason', 'entry_reason']), captial_df)

    # Keep only combinations ending above 300,000 with max drawdown under 50% and Sharpe ratio above 1.
    if not ((results_df['total_capital'].iloc[-1] > 300000)
            and (analytics_results['max_drawdown'] < 50)
            and (analytics_results['sharpe_ratio'] > 1)):
        continue

    # Tag every row with its signal combination (string form of each list) before persisting.
    results_df['buy_signals'] = f"{buy_combination}"
    results_df['sell_signals'] = f"{sell_combination}"
    captial_df['buy_signals'] = f"{buy_combination}"
    captial_df['sell_signals'] = f"{sell_combination}"

    mongo_client['data_lake']['sandbox_trade_history'].insert_many(results_df.to_dict('records'))
    mongo_client['data_lake']['sandbox_daily_capital'].insert_many(captial_df.to_dict('records'))

    eda_analysis = StrategyEDA(mongo_config=mongo_config, start_date='2020-01-01', end_date='2024-12-31', buy_signals=buy_combination, sell_signals=sell_combination, instrument=instrument)
    eda_analysis.plot_trading_analysis(results_df)

    combination_key = f"Buy: {buy_combination} | Sell: {sell_combination}"
    crypto_strategy_results[combination_key] = analytics_results


In [None]:
# Find the best crypto combinations by Sharpe ratio, final capital and max drawdown.
# StrategyAnalytics reports max_drawdown already in percent (0-100), so it is formatted
# as `:.2f` followed by '%'; `:.2%` would multiply it by 100 a second time.
crypto_metrics = {
    'sharpe_ratio': {'best': float('-inf'), 'combination': None},
    'final_capital': {'best': float('-inf'), 'combination': None},
    'max_drawdown': {'best': float('inf'), 'combination': None}  # For drawdown, lower is better
}

for combination, results in crypto_strategy_results.items():
    # Strict comparisons: on ties, the first combination seen wins.
    if results['sharpe_ratio'] > crypto_metrics['sharpe_ratio']['best']:
        crypto_metrics['sharpe_ratio']['best'] = results['sharpe_ratio']
        crypto_metrics['sharpe_ratio']['combination'] = combination
    if results['final_capital'] > crypto_metrics['final_capital']['best']:
        crypto_metrics['final_capital']['best'] = results['final_capital']
        crypto_metrics['final_capital']['combination'] = combination
    if results['max_drawdown'] < crypto_metrics['max_drawdown']['best']:
        crypto_metrics['max_drawdown']['best'] = results['max_drawdown']
        crypto_metrics['max_drawdown']['combination'] = combination

# Print results for each metric
print("Best combinations by metric:")
print(f"\nBest by Sharpe Ratio ({crypto_metrics['sharpe_ratio']['best']:.4f}):")
print(f"Combination: {crypto_metrics['sharpe_ratio']['combination']}")

print(f"\nBest by Final Capital (${crypto_metrics['final_capital']['best']:,.2f}):")
print(f"Combination: {crypto_metrics['final_capital']['combination']}")

print(f"\nBest by Max Drawdown ({crypto_metrics['max_drawdown']['best']:.2f}%):")
print(f"Combination: {crypto_metrics['max_drawdown']['combination']}")

# Print combinations with Sharpe ratio > 1, sorted by Sharpe ratio
print("\nCombinations with Sharpe ratio > 1 (sorted by Sharpe ratio):")
sorted_results = sorted(
    [(combination, results) for combination, results in crypto_strategy_results.items() if results['sharpe_ratio'] > 1],
    key=lambda x: x[1]['sharpe_ratio'],
    reverse=True
)
for combination, results in sorted_results:
    print(f"\nCombination: {combination}")
    print(f"Sharpe Ratio: {results['sharpe_ratio']:.4f} | Final Capital: ${results['final_capital']:,.2f} | Max Drawdown: {results['max_drawdown']:.2f}%")

### Monte Carlo Simulation

In [None]:
# Load the per-trade results of one signal combination for the Monte Carlo study.
mongo_config = load_client_config('mongodb', 'production')

# Use the same config key and location as the crypto backtest cell. That cell connects via
# 'connection_string' and writes trades (tagged with the string form of their
# 'buy_signals'/'sell_signals' lists) to data_lake.sandbox_trade_history.
client = MongoClient(mongo_config['connection_string'])
collection = client['data_lake']['sandbox_trade_history']

back_test_results = pd.DataFrame(list(collection.find({'buy_signals': "['long_accelerating']",
                                                        'sell_signals': "['velocity_loss']"})))
assert not back_test_results.empty, "No backtest trades found for this signal combination"

# Per-trade returns as fractions (e.g. '12.5%' -> 0.125).
gain_loss_pct = back_test_results['profit/loss%'].str.rstrip('%').astype(float) / 100

trade_history = gain_loss_pct.to_numpy()
print(trade_history)

In [None]:
# Randomize trade sequences: resample the historical trades with replacement.
# Seed the legacy global RNG so this and the following simulation cells are reproducible.
MONTE_CARLO_SEED = 42
np.random.seed(MONTE_CARLO_SEED)

num_simulations = 1000
num_trades = len(trade_history)

simulated_equity_curves = []
for _ in range(num_simulations):
    randomized_trades = np.random.choice(trade_history, size=num_trades, replace=True)
    # Non-compounded curve: running sum of per-trade fractional returns
    # (i.e. a fixed stake per trade, profits not reinvested).
    equity_curve = np.cumsum(randomized_trades)
    simulated_equity_curves.append(equity_curve)


In [None]:
# Perturb every historical trade with Gaussian noise (original trade order kept) and accumulate.
# NOTE(review): trade_history holds fractional returns (divided by 100 when loaded), so a
# standard deviation of 0.5 means +/-50 percentage points per trade; confirm this scale is intended.
noise_std_dev = 0.5
simulated_equity_curves_with_noise = [
    np.cumsum(trade_history + np.random.normal(0, noise_std_dev, num_trades))
    for _ in range(num_simulations)
]

In [None]:
# Stress scenario: scale every trade's gain or loss by `extreme_factor`, then resample the
# scaled trades with replacement and accumulate them.
extreme_factor = 1.5
extreme_trades = trade_history * extreme_factor

simulated_extreme_curves = [
    np.cumsum(np.random.choice(extreme_trades, size=num_trades, replace=True))
    for _ in range(num_simulations)
]


In [None]:
# One row per simulation, one column per trade number.
simulated_equity_curves_df, simulated_equity_curves_with_noise_df, simulated_extreme_curves_df = (
    pd.DataFrame(curves)
    for curves in (simulated_equity_curves, simulated_equity_curves_with_noise, simulated_extreme_curves)
)

# Summarise each scenario: mean final equity, worst drawdown and a 5th-95th percentile band.
def analyze_results(simulated_df):
    """Summarise a set of simulated equity curves.

    Args:
        simulated_df: one row per simulation, one column per trade. Values are the
            running sum of returns after each trade; each curve implicitly starts at 0
            before its first trade.

    Returns:
        tuple ``(avg_final_equity, worst_drawdown, confidence_interval)``:

        * ``avg_final_equity``: mean of the last column.
        * ``worst_drawdown``: largest peak-to-trough fall across all simulations,
          in the same units as the curves.
        * ``confidence_interval``: array of the 5th and 95th percentiles of final equity.
    """
    final_equity_values = simulated_df.iloc[:, -1]

    # Every curve starts at 0 before its first trade, so the running peak is at least 0.
    # Without the clip, losses on the opening trades would not count as drawdown.
    running_peak = simulated_df.cummax(axis=1).clip(lower=0)
    max_drawdowns = (running_peak - simulated_df).max(axis=1)

    avg_final_equity = final_equity_values.mean()
    worst_drawdown = max_drawdowns.max()
    confidence_interval = np.percentile(final_equity_values, [5, 95])

    return avg_final_equity, worst_drawdown, confidence_interval

# Summarise every scenario with analyze_results and print the (mean, worst drawdown, 5-95% band) tuples.
results_randomized, results_with_noise, results_extreme = (
    analyze_results(scenario_df)
    for scenario_df in (simulated_equity_curves_df,
                        simulated_equity_curves_with_noise_df,
                        simulated_extreme_curves_df)
)

for label, summary in (("Randomized Trades:", results_randomized),
                       ("With Noise:", results_with_noise),
                       ("Extreme Conditions:", results_extreme)):
    print(label, summary)


In [None]:
# Plot a sample of the randomized-trade equity curves.
# Only the first few simulations are drawn: plotting all of them (1,000 by default) is slow
# and unreadable, and the chart is titled as a sample.
num_sample_curves = min(10, len(simulated_equity_curves_df))
plt.figure(figsize=(10, 6))
for i in range(num_sample_curves):
    plt.plot(simulated_equity_curves_df.iloc[i], alpha=0.5)
plt.title("Sample Monte Carlo Simulated Equity Curves (Randomized Trades)")
plt.xlabel("Trade Number")
plt.ylabel("Equity")
plt.show()

# Overlay the final-equity distributions of all three scenarios.
plt.figure(figsize=(8, 5))
plt.hist(simulated_equity_curves_df.iloc[:, -1], bins=30, alpha=0.7, label="Randomized")
plt.hist(simulated_equity_curves_with_noise_df.iloc[:, -1], bins=30, alpha=0.7, label="With Noise")
plt.hist(simulated_extreme_curves_df.iloc[:, -1], bins=30, alpha=0.7, label="Extreme")
plt.title("Distribution of Final Equity Values")
plt.xlabel("Final Equity")
plt.ylabel("Frequency")
plt.legend()
plt.show()


## Macro Economic Analysis

### Correlation Analysis

In [None]:
from pymongo import DESCENDING
import pandas as pd
import pymongo
from pymongo import MongoClient
import yfinance as yf

class MacroEconomicAnalysis:
    """Correlation analysis of daily returns across macro instruments.

    Loads daily (``interval == 1``) close prices for every symbol of the
    ``index``, ``commodity``, ``bond`` and ``sector`` instruments from a
    MongoDB collection. It then computes a Pearson correlation matrix of
    their returns for several look-back windows ending at ``current_date``.
    """

    # Instrument categories whose symbols are included in the analysis.
    TARGET_INSTRUMENTS = ('index', 'commodity', 'bond', 'sector')

    def __init__(self, mongo_url='localhost', db_name='data_lake', collection_name='long_term_alert',
                 current_date='2024-11-22'):
        """Store connection settings and the look-back windows.

        Args:
            mongo_url: host / connection string passed to ``MongoClient``.
            db_name: database containing the price collection.
            collection_name: collection containing the price documents.
            current_date: end date of every look-back window (anything
                ``pd.Timestamp`` accepts). Defaults to the date that was
                previously hard-coded here.
        """
        self.mongo_url = mongo_url
        self.db_name = db_name
        self.collection_name = collection_name
        self.current_date = pd.Timestamp(current_date)
        self.time_periods = {
            'last_2_weeks': pd.Timedelta(days=14),
            'last_month': pd.Timedelta(days=30),
            'last_3_months': pd.Timedelta(days=90),
        }

    def load_data(self):
        """Fetch daily close prices for every target symbol and look-back window.

        Returns:
            dict: ``{instrument: {symbol: {period_name: [record, ...]}}}``.
            Each record is a ``{'date': ..., 'close': ...}`` document, and the
            records are sorted oldest-first.
        """
        mongo_client = MongoClient(self.mongo_url)
        try:
            collection = mongo_client[self.db_name][self.collection_name]
            # Exclusive upper bound that still includes any time on current_date,
            # so every window really ends at current_date.
            window_end = self.current_date + pd.Timedelta(days=1)

            results = {}
            for instrument in self.TARGET_INSTRUMENTS:
                symbols = collection.distinct('symbol', {'instrument': instrument})
                results[instrument] = {}

                for symbol in symbols:
                    results[instrument][symbol] = {}
                    for period_name, period_delta in self.time_periods.items():
                        start_date = self.current_date - period_delta
                        cursor = collection.find(
                            {
                                'symbol': symbol,
                                'interval': 1,
                                'date': {'$gte': start_date, '$lt': window_end},
                            },
                            # Keep 'date' so returns can be aligned across symbols by date.
                            {'date': 1, 'close': 1, '_id': 0},
                        ).sort('date', pymongo.ASCENDING)  # chronological, for pct_change
                        results[instrument][symbol][period_name] = list(cursor)

            return results
        finally:
            # Release the client's connections even if a query fails.
            mongo_client.close()

    @staticmethod
    def _symbol_returns(records):
        """Turn one symbol's price records into a chronological return series.

        The series is indexed by date when the records carry a 'date'.
        Returns None when there is no usable close-price data.
        """
        prices = pd.DataFrame(records)
        if prices.empty or 'close' not in prices:
            return None
        if 'date' in prices:
            # Sort by date so returns are computed forward in time whatever the input order.
            prices = prices.drop_duplicates('date').set_index('date').sort_index()
        return prices['close'].astype(float).pct_change(fill_method=None)

    def compute_correlations(self, data):
        """Compute a Pearson correlation matrix of returns for each look-back window.

        Args:
            data: nested dict as returned by ``load_data``.

        Returns:
            dict: ``{period_name: DataFrame}``, where each DataFrame is a
            symbol-by-symbol correlation matrix.
        """
        correlations = {}

        for period in self.time_periods:
            returns_by_symbol = {}
            for instrument_data in data.values():
                for symbol, period_records in instrument_data.items():
                    returns = self._symbol_returns(period_records.get(period, []))
                    if returns is not None:
                        returns_by_symbol[symbol] = returns

            # Building the frame from a dict of Series aligns every symbol on date
            # (outer join). DataFrame.corr then only uses dates present for both
            # symbols of each pair, instead of pairing rows by position.
            period_df = pd.DataFrame(returns_by_symbol)
            correlations[period] = period_df.corr(method='pearson')

        return correlations

    def run(self):
        """Load prices from MongoDB and return the per-period correlation matrices."""
        data = self.load_data()
        return self.compute_correlations(data)
    
# Run the correlation analysis against the default MongoDB settings.
macro_economic_analysis = MacroEconomicAnalysis(
    mongo_url='localhost',
    db_name='data_lake',
    collection_name='long_term_alert',
)
correlations = macro_economic_analysis.run()


## Fundamentals Analysis

In [None]:
from polygon import RESTClient
import yfinance as yf 
# Polygon API client, used for the related-companies lookup
api_key = load_client_config(tool='websocket', mode='production')['api_key']
client = RESTClient(api_key)

# Fetch related companies for the reference ticker
ticker = "AAPL"

# Start empty so the fundamentals loop below simply does nothing (instead of
# raising NameError) when the related-companies request fails.
tickers = []
try:
    related_companies = client.get_related_companies(ticker)
    tickers = [company.ticker for company in related_companies]
except Exception as e:
    print(f"Error fetching related companies: {e}")


# Fetch fundamentals: trailing EPS and price / trailing EPS for each related ticker.
# A separate loop variable keeps `ticker` bound to the reference ticker.
for related_ticker in tickers:
    try:
        yf_ticker = yf.Ticker(related_ticker)
        price_history = yf_ticker.history(period='1d')
        eps = yf_ticker.info.get('trailingEps')
        # A price / EPS ratio is undefined without a price or with missing / zero EPS.
        if price_history.empty or not eps:
            print(f"{related_ticker}: skipped (missing price or EPS)")
            continue
        current_price = price_history['Close'].iloc[-1]
        price_to_eps = current_price / eps
        print(f"{related_ticker}: {eps} | {price_to_eps}")
    except Exception as e:
        # Best effort: one failing ticker should not abort the loop, but the
        # failure is reported rather than silently dropped.
        print(f"{related_ticker}: error fetching fundamentals: {e}")

In [None]:
import pandas_market_calendars as mcal
import pytz
from datetime import datetime

# Get NYSE calendar
nyse = mcal.get_calendar('NYSE')

# Today's date in New York time, the zone the NYSE schedule is converted to below
ny_tz = pytz.timezone('America/New_York')
today = datetime.now(ny_tz).strftime('%Y-%m-%d')

# Market schedule for today only; it has no rows when NYSE has no session
# (weekends and exchange holidays).
schedule = nyse.schedule(start_date=today, end_date=today)

if schedule.empty:
    market_open = market_close = None
    print(f"NYSE has no trading session on {today}")
else:
    # Named to avoid clobbering the boto3 `session` created earlier in the notebook.
    trading_day = schedule.iloc[0]
    market_open = trading_day['market_open'].tz_convert('America/New_York')
    market_close = trading_day['market_close'].tz_convert('America/New_York')
    print(market_open)
    print(market_close)
