In [None]:
import os, sys
import yfinance as yf
# import ta
import pandas as pd
import numpy as np
import vectorbt as vbt
from datetime import date, timedelta, datetime
from itertools import product
from IPython.display import clear_output
import matplotlib.pyplot as plt
from csv import writer
import warnings
# import talib as ta
import math
import re
from warnings import simplefilter
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)




In [5]:
import pandas as pd
import numpy as np
from typing import List, Tuple
import random

class TradingStrategy:
  def __init__(self, data: pd.DataFrame, initial_capital: float = 20000,
               n_positions: int = 20, stop_loss_pct: float = 0.05):
      self.data = data
      self.initial_capital = initial_capital
      self.current_capital = initial_capital
      self.n_positions = n_positions
      self.stop_loss_pct = stop_loss_pct
      self.portfolio_values = []
      self.trade_history = []
      
  def calculate_signals(self, start_idx: int) -> List[str]:
      """Calculate EMA signals for all stocks"""
      # Make sure we have enough data for EMA calculation
      if start_idx < 50:  # Need at least 50 days for EMA50
          return []
          
      # Calculate EMAs using data up to start_idx
      data_slice = self.data.iloc[max(0, start_idx-50):start_idx+1]
      ema10 = data_slice.ewm(span=10, adjust=False).mean()
      ema50 = data_slice.ewm(span=50, adjust=False).mean()
      
      # Compare EMAs at the current point
      buy_signals = []
      for col in self.data.columns:
          if ema10.iloc[-1][col] > ema50.iloc[-1][col]:
              buy_signals.append(col)
      
      return buy_signals
  
  def execute_trades(self, start_date_idx: int, holding_period: int) -> Tuple[float, List]:
      """Execute trades for one period"""
      buy_signals = self.calculate_signals(start_date_idx)
      
      if not buy_signals:  # If no signals, return immediately
          return start_date_idx, []
          
      if len(buy_signals) < self.n_positions:
          selected_stocks = buy_signals
      else:
          selected_stocks = random.sample(buy_signals, self.n_positions)
          
      position_size = self.current_capital / len(selected_stocks)
      positions = []
      
      for stock in selected_stocks:
          entry_price = self.data.iloc[start_date_idx][stock]
          stop_loss = entry_price * (1 - self.stop_loss_pct)
          shares = position_size / entry_price
          positions.append({
              'stock': stock,
              'shares': shares,
              'entry_price': entry_price,
              'stop_loss': stop_loss
          })
          
      period_values = []
      closed_positions = []
      
      end_idx = min(start_date_idx + holding_period, len(self.data))
      
      for day in range(start_date_idx + 1, end_idx):
          daily_value = 0
          
          for pos in positions[:]:
              current_price = self.data.iloc[day][pos['stock']]
              
              if current_price <= pos['stop_loss']:
                  value = pos['shares'] * current_price
                  daily_value += value
                  closed_positions.append({
                      'stock': pos['stock'],
                      'entry_price': pos['entry_price'],
                      'exit_price': current_price,
                      'pnl': (current_price - pos['entry_price']) * pos['shares']
                  })
                  positions.remove(pos)
              else:
                  daily_value += pos['shares'] * current_price
                  
          if daily_value > 0:  # Only append if we have active positions
              period_values.append(daily_value)
          
          if not positions:  # All positions closed
              break
              
      # Close remaining positions at end of period
      final_day = min(start_date_idx + holding_period - 1, len(self.data) - 1)
      for pos in positions:
          exit_price = self.data.iloc[final_day][pos['stock']]
          closed_positions.append({
              'stock': pos['stock'],
              'entry_price': pos['entry_price'],
              'exit_price': exit_price,
              'pnl': (exit_price - pos['entry_price']) * pos['shares']
          })
          
      # Update capital
      total_pnl = sum(pos['pnl'] for pos in closed_positions)
      self.current_capital += total_pnl
      
      if period_values:
          self.portfolio_values.extend(period_values)
      
      return final_day, closed_positions

  def run_strategy(self, start_idx: int, n_periods: int, holding_period: int) -> dict:
      """Run the complete trading strategy"""
      self.current_capital = self.initial_capital
      self.portfolio_values = []
      current_idx = start_idx
      
      for _ in range(n_periods):
          if current_idx >= len(self.data) - holding_period:
              break
          next_idx, trades = self.execute_trades(current_idx, holding_period)
          current_idx = next_idx + 1
          self.trade_history.extend(trades)
          
      # Calculate metrics
      if not self.portfolio_values:
          return {
              'PNL': self.current_capital - self.initial_capital,
              'CAGR': 0,
              'Max_Drawdown': 0,
              'CALMAR': 0
          }
          
      portfolio_values = np.array(self.portfolio_values)
      total_days = len(portfolio_values)
      
      # Calculate metrics
      pnl = self.current_capital - self.initial_capital
      
      # Calculate CAGR
      years = total_days / 252  # Assuming 252 trading days per year
      cagr = (self.current_capital / self.initial_capital) ** (1/years) - 1 if years > 0 else 0
      
      # Calculate Max Drawdown
      peak = portfolio_values[0]
      max_drawdown = 0
      
      for value in portfolio_values:
          if value > peak:
              peak = value
          drawdown = (peak - value) / peak
          max_drawdown = max(max_drawdown, drawdown)
          
      # Calculate CALMAR
      calmar = abs(cagr / max_drawdown) if max_drawdown != 0 else 0
      
      return {
          'PNL': pnl,
          'CAGR': cagr,
          'Max_Drawdown': max_drawdown,
          'CALMAR': calmar
      }

# Example usage:


In [None]:
def download_data(ticker, interval='1d'):
    # dt_range = pd.date_range('2019-12-31', '2020-12-31', freq='6m')
    # define start and end date
    # end_date = datetime.now()-timedelta(days=30)
    # start_date = end_date - timedelta(days=390)

    end_date = datetime.now() #- timedelta(days=30)
    start_date = end_date - timedelta(days=1855)

    # extract data from yahoo finanace\
    btc_price = vbt.YFData.download(
    ticker,
    interval=interval,
    start = start_date,
    end = end_date,
    missing_index='drop').get("Close")
    df = pd.DataFrame(btc_price)
    df.rename(columns={"Close":ticker}, inplace=True)
    df.head()
    df.index=df.index.strftime("%Y-%m-%d")
    df.index = pd.to_datetime(df.index)
    return df


ticker = ["NVDA","AAPL","MSFT","AMZN","GOOGL","GOOG","META","TSLA","AVGO","COST","NFLX","ASML","TMUS","AMD","CSCO","PEP","ADBE","LIN","AZN","TXN","QCOM	","INTU","ISRG","AMGN	","CMCSA","PDD","BKNG","AMAT","ARM","HON","VRTX","PANW","ADP","MU","GILD","ADI","SBUX","MELI","INTC","LRCX","KLAC","MDLZ","ABNB","REGN","CTAS","SNPS","CDNS","PYPL","CRWD","MRVL","MAR","CEG","CSX","ORLY","DASH","WDAY","CHTR","ADSK","FTNT","TTD","ROP","PCAR","NXPI","TEAM","FANG","MNST","CPRT","PAYX","AEP","ODFL","ROST","FAST","KDP","DDOG","EA","BKR","KHC","MCHP","VRSK","GEHC","CTSH","LULU","EXC","XEL","CCEP","IDXX","ON","CSGP","ZS","TTWO","ANSS","CDW","DXCM","BIIB","GFS","ILMN","MDB","WBD","MRNA","DLTR","WBA"]
ticker = [_t.strip() for _t in ticker if _t.strip() not in ["ARM","ABNB","CEG","DASH","GEHC","GFS"]]

def print_exception_detail(err, *args, **kwargs):
    exc_type, exc_obj, exc_tb = sys.exc_info()
    if len(args):
        for _arg in args:
            print(_arg)
    print(f"Error Message: {err}")
    print(f"Error at line number: {exc_tb.tb_lineno}")


df = pd.DataFrame()
for _t in ticker:
    try:
        if df.empty:
            df=download_data(_t)
        else:
            df1 = pd.DataFrame()
            df1 = download_data(_t)
            # per_chg = len(df.index.difference(df2.index)) #/len(df.index))*100
            # print(per_chg)
            # if per_chg<518:
            # df = pd.merge(df, df1, left_index=True, right_index=True, how='inner')  # Only keep matching rows
            df = df.join(df1, how='outer')
    except Exception as err:
        print_exception_detail(err)

In [19]:
strategy = TradingStrategy(df)
results = strategy.run_strategy(
  start_idx=50,  # Starting index (need at least 50 days for EMA50)
  n_periods=12,  # Number of periods
  holding_period=20  # Days to hold positions
)

print(f"PNL: ${results['PNL']:.2f}")
print(f"CAGR: {results['CAGR']:.2%}")
print(f"Max Drawdown: {results['Max_Drawdown']:.2%}")
print(f"CALMAR Ratio: {results['CALMAR']:.2f}")

PNL: $9187.63
CAGR: 55.11%
Max Drawdown: 95.85%
CALMAR Ratio: 0.57
