## Main idea

Combine 
- fundamental,
- technical (including weekly RSI, negatively divergent higher high/lower low), and
- financial statements analysis

to find stocks that are attractive to buy long term (as a buy and hold). Ideal holding period is 1 to 5 years.

Goal is to hold these medium/long term and not worry so much about the allocation, then when the stocks look less attractive, then sell them (or a portion).

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from groq import Groq
import re
from datetime import datetime, timedelta
from typing import Dict, Tuple, Optional, Any

In [None]:
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv('groq_api.env')

In [None]:
# Initialize Groq client
groq_api_key = os.getenv('GROQ_API_KEY')
groq_client = Groq(api_key=groq_api_key)

## Data collection

### TODO
Extend the function below to get all, or top 100 sp500 stocks, plus any that we currently own

In [None]:
def get_top_sp500_stocks(n=20):
    # Declare top 10 stocks in S&P 500 (for demonstration)
    tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'GOOG', 'META', 'TSLA', 'BRK.B', 'JPM', 'V', 'DASH', 'NFLX', 'DIS', 'SBUX', 'BABA', 'NVDA', 'BIDU', 'XOM', 'PEG', 'CEG', 'BWXT', 'NEM', 'GFI', 'HMY', 'CVX', 'AVGO', 'HD', 'PG', 'WMT', 'JNJ', 'ABBV']
    
    # Get market cap for each stock
    market_caps = {}
    for ticker in tickers:
        stock = yf.Ticker(ticker)
        market_caps[ticker] = stock.info.get('marketCap', 0)
    
    # Sort by market cap and get top n
    top_stocks = sorted(market_caps.items(), key=lambda x: x[1], reverse=True)[:n]
    return [stock[0] for stock in top_stocks]

In [None]:
top_stocks = get_top_sp500_stocks(6)
pd.DataFrame(top_stocks, columns=['Ticker']).to_csv('top_20_stocks.csv', index=False)

## Retrieve financial data

Retrieve both historical price data and income statements for each stock over the past five years.

In [None]:
def get_financial_data(ticker, start_date, end_date):
    stock = yf.Ticker(ticker)
    
    # Get historical price data
    price_data = stock.history(start=start_date, end=end_date)
    
    # Get income statement
    income_statement = stock.financials
    
    return {
        "price_data": price_data,
        "income_statement": income_statement
    }

In [None]:
start_date = datetime.now() - timedelta(days=5*365)
end_date = datetime.now()

all_data = {}
for ticker in top_stocks:
    all_data[ticker] = get_financial_data(ticker, start_date, end_date)

In [None]:
all_data

In [None]:
def format_income_statement_for_llm(income_statement_column):
    formatted_text = ""
    for index, value in income_statement_column.items():
        formatted_value = f"{value:,.2f}" if isinstance(value, (int, float)) else str(value)
        formatted_text += f"{index}: {formatted_value}\n"
    return formatted_text.strip()

In [None]:
# Example usage
for ticker, data in all_data.items():
    current_year = data['income_statement'].columns[0]
    formatted_statement = format_income_statement_for_llm(data['income_statement'][current_year])
    print(f"Formatted Income Statement for {ticker}:\n{formatted_statement}\n")

## Prompt for income statement evaluation

### TODO

Update the prompt to figure out which financial metrics should be used out of the ones available in yfinance

In [None]:
def create_prompt_for_income_statement_v0(current_year_income_statement, previous_year_income_statement):
    prompt = f"""
Evaluate the following income statements for the current year and the previous year. Provide a score between 0 and 10 for each criterion, where 0 is very poor and 10 is excellent. Consider criteria such as revenue growth, profitability, operating efficiency, and earnings quality. Additionally, provide an overall score based on the average of the criteria scores.

Income Statement for the Current Year:
{current_year_income_statement}

Income Statement for the Previous Year:
{previous_year_income_statement}

Criteria for Evaluation:
1. Revenue Growth: Analyze the growth in revenue compared to the previous year.
2. Gross Profit Margin: Calculate as Gross Profit / Total Revenue.
3. Operating Margin: Calculate as Operating Income / Total Revenue.
4. Net Profit Margin: Calculate as Net Income / Total Revenue.
5. EPS Growth: Compare EPS to the previous year.
6. Operating Efficiency: Consider Operating Expense relative to Total Revenue.
7. Interest Coverage Ratio: Calculate as EBIT / Interest Expense.

Provide the score for each criterion and an overall score. Include explanations for each score.
    """
    return prompt

def create_prompt_for_income_statement_v1(current_year_income_statement, previous_year_income_statement):
    prompt = f"""
Evaluate the following income statements for the current year and the previous year. Provide a score between 0 and 10 for each criterion, where 0 is very poor and 10 is excellent. Consider criteria such as revenue growth, profitability, operating efficiency, and earnings quality. Additionally, provide an overall score based on the average of the criteria scores.

Income Statement for the Current Year:
{current_year_income_statement}

Income Statement for the Previous Year:
{previous_year_income_statement}

Criteria for Evaluation:
1. Revenue Growth: Analyze the growth in revenue compared to the previous year.
2. Gross Profit Margin: Calculate as Gross Profit / Total Revenue.
3. Operating Margin: Calculate as Operating Income / Total Revenue.
4. Net Profit Margin: Calculate as Net Income / Total Revenue.
5. EPS Growth: Analyze the growth in Earnings Per Share (EPS) compared to the previous year.
6. Operating Efficiency: Consider Operating Expense relative to Total Revenue.
7. Interest Coverage Ratio: Calculate as Earnings Before Interest and Taxes (EBIT) / Interest Expense.

Example Format:
1. Revenue Growth: 8.5
2. Gross Profit Margin: 7.0
3. Operating Margin: 6.5
4. Net Profit Margin: 7.0
5. EPS Growth: 8.0
6. Operating Efficiency: 7.5
7. Interest Coverage Ratio: 9.0
Overall Score: 7.5

Provide the score for each criterion and an overall score. Include explanations for each score.
    """
    return prompt

def create_prompt_for_income_statement(current_year_income_statement: Dict, previous_year_income_statement: Dict) -> str:
    prompt = f"""
Analyze the following income statements and provide a structured evaluation. Follow this EXACT format:

CRITERION SCORES
Revenue Growth: [score 0-10]
REASON: [brief explanation]

Gross Profit Margin: [score 0-10]
REASON: [brief explanation]

Operating Margin: [score 0-10]
REASON: [brief explanation]

Net Profit Margin: [score 0-10]
REASON: [brief explanation]

EPS Growth: [score 0-10]
REASON: [brief explanation]

Operating Efficiency: [score 0-10]
REASON: [brief explanation]

Interest Coverage: [score 0-10]
REASON: [brief explanation]

OVERALL SCORE: [average of above scores, rounded to 2 decimal places]

SUMMARY: [brief overall analysis]

Current Year Income Statement:
{current_year_income_statement}

Previous Year Income Statement:
{previous_year_income_statement}

Important:
- Always provide numerical scores for ALL criteria
- Scores must be between 0 and 10
- If exact calculation isn't possible, estimate based on available data
- Format must match the template exactly
- Always include the OVERALL SCORE as a number
"""
    return prompt

In [None]:
def evaluate_income_statements_llm_v0(current_year_income_statement, previous_year_income_statement):
    prompt = create_prompt_for_income_statement(current_year_income_statement, previous_year_income_statement)
    response = groq_client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model="llama-3.1-8b-instant",
        temperature=0.2,
        max_tokens=1000
    )
    analysis = response.choices[0].message.content.strip()
    score = re.search(r"Overall Score: (\d+\.\d+)", analysis)
    return float(score.group(1)) if score else None

def evaluate_income_statements_llm(current_year_income_statement: Dict, 
                                 previous_year_income_statement: Dict,
                                 max_retries: int = 2) -> Optional[float]:
    """
    Evaluate income statements with retry logic for failed parsing
    """
    for attempt in range(max_retries):
        try:
            prompt = create_prompt_for_income_statement(
                current_year_income_statement, 
                previous_year_income_statement
            )
            
            response = groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama-3.1-8b-instant",
                temperature=0.2 - (attempt * 0.1),  # Reduce temperature on retries
                max_tokens=1000
            )
            
            result = process_llm_output(response.choices[0].message.content.strip())
            
            if result['overall_score'] is not None:
                return result['overall_score']
                
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {str(e)}")
            
        if attempt < max_retries - 1:
            print(f"Retrying evaluation... (Attempt {attempt + 2}/{max_retries})")
    
    return None

In [None]:
def process_llm_output_v0(llm_output):
    # Extract individual criterion scores
    criterion_scores = re.findall(r"(\d+\.\s*[\w\s]+):\s*(\d+(?:\.\d+)?)", llm_output)
    
    # Extract overall score
    overall_score = re.search(r"Overall Score: (\d+(?:\.\d+)?)", llm_output)
    
    return {
        'criterion_scores': dict(criterion_scores),
        'overall_score': float(overall_score.group(1)) if overall_score else None,
        'full_analysis': llm_output
    }

def process_llm_output_v1(llm_output):
    # Extract individual criterion scores
    criterion_scores = re.findall(r"(\d+\.\s*[\w\s]+):\s*(\d+(?:\.\d+)?)", llm_output)
    
    # Create a dictionary to store the scores
    criterion_scores_dict = {score[0].strip(): float(score[1]) for score in criterion_scores}
    
    # Extract overall score
    overall_score = re.search(r"Overall Score:\s*(\d+(?:\.\d+)?)", llm_output)
    
    return {
        'criterion_scores': criterion_scores_dict,
        'overall_score': float(overall_score.group(1)) if overall_score else None,
        'full_analysis': llm_output
    }

def process_llm_output(llm_output: str) -> Dict[str, Any]:
    # Define all expected criteria
    expected_criteria = [
        'Revenue Growth',
        'Gross Profit Margin',
        'Operating Margin',
        'Net Profit Margin',
        'EPS Growth',
        'Operating Efficiency',
        'Interest Coverage'
    ]
    
    # Dictionary to store scores
    scores = {}
    
    # Process each criterion
    for criterion in expected_criteria:
        # Look for scores in format "Criterion: [score]" or "Criterion: score"
        pattern = rf"{criterion}:\s*(?:\[)?(\d+(?:\.\d+)?)(?:\])?"
        match = re.search(pattern, llm_output)
        if match:
            scores[criterion] = float(match.group(1))
    
    # Extract overall score
    overall_pattern = r"OVERALL SCORE:\s*(?:\[)?(\d+(?:\.\d+)?)(?:\])?"
    overall_match = re.search(overall_pattern, llm_output)
    overall_score = float(overall_match.group(1)) if overall_match else None
    
    # If we're missing any scores, calculate the overall score ourselves
    if overall_score is None and scores:
        overall_score = round(sum(scores.values()) / len(scores), 2)
    
    # Extract summary if present
    summary_pattern = r"SUMMARY:\s*(.+?)(?=\n|$)"
    summary_match = re.search(summary_pattern, llm_output)
    summary = summary_match.group(1) if summary_match else ""
    
    return {
        'criterion_scores': scores,
        'overall_score': overall_score,
        'summary': summary,
        'full_analysis': llm_output
    }

## LLM stock evaluation

### TODO 

Add technical analysis, any other components to create a composite score (e.g. using combined or weighted ranking) for the overall scores

In [None]:
def evaluate_stock_v0(ticker, start_date, end_date):
    data = get_financial_data(ticker, start_date, end_date)
    income_statement = data['income_statement']
    
    scores = []
    for i in range(len(income_statement.columns) - 1):
        current_year = format_income_statement_for_llm(income_statement.iloc[:, i])
        previous_year = format_income_statement_for_llm(income_statement.iloc[:, i+1])
        score = evaluate_income_statements_llm(current_year, previous_year)
        scores.append((income_statement.columns[i].year, score))
    
    return pd.DataFrame(scores, columns=['Year', 'Score'])

def evaluate_stock(ticker: str, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Evaluate stock with improved error handling and logging
    """
    data = get_financial_data(ticker, start_date, end_date)
    income_statement = data['income_statement']
    scores = []
    
    for i in range(len(income_statement.columns) - 1):
        try:
            current_year = format_income_statement_for_llm(income_statement.iloc[:, i])
            previous_year = format_income_statement_for_llm(income_statement.iloc[:, i+1])
            
            score = evaluate_income_statements_llm(current_year, previous_year)
            
            if score is not None:
                scores.append({
                    'Year': income_statement.columns[i].year,
                    'Score': score
                })
            else:
                print(f"Warning: Could not calculate score for {ticker} in {income_statement.columns[i].year}")
                
        except Exception as e:
            print(f"Error processing {ticker} for {income_statement.columns[i].year}: {str(e)}")
    
    return pd.DataFrame(scores)

Calculate scores for all of our stocks

In [None]:
all_scores = []
for ticker in top_stocks:
    print(f"Evaluating {ticker}...")
    scores = evaluate_stock(ticker, start_date, end_date)
    scores['Ticker'] = ticker
    all_scores.append(scores)

all_scores = pd.concat(all_scores)

In [None]:
all_scores.head(10)

## Visualizing scores

In [None]:
# Pivot the table
pivoted_scores = all_scores.pivot(index='Year', columns='Ticker', values='Score')

# Sort the index (Year) in descending order to have the most recent year first
pivoted_scores = pivoted_scores.sort_index(ascending=False)

# Keep only the last 3 years
pivoted_scores = pivoted_scores.head(3)

# Reset the index to make 'Year' a regular column
pivoted_scores = pivoted_scores.reset_index()

# Save the pivoted DataFrame to a CSV file
pivoted_scores.to_csv('pivoted_stock_scores.csv', index=False)

print(pivoted_scores)

# Fundamental analysis

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional
import requests
from bs4 import BeautifulSoup
import datetime
from yahooquery import Ticker

In [None]:
class FundamentalAnalyzer:
    def __init__(self, ticker: str):
        self.ticker = ticker
        self.yf_ticker = yf.Ticker(ticker)
        self.yq_ticker = Ticker(ticker)
        
    def get_all_financial_data(self) -> Dict[str, Any]:
        """Get comprehensive financial data from multiple sources"""
        try:
            return {
                'income_statement': self.yf_ticker.financials,
                'balance_sheet': self.yf_ticker.balance_sheet,
                'cash_flow': self.yf_ticker.cashflow,
                'info': self.yf_ticker.info,
                'institutional_holders': self.yf_ticker.institutional_holders,
                'recommendations': self.yf_ticker.recommendations,
                'sustainability': self.yf_ticker.sustainability,
                'calendar': self.yf_ticker.calendar
            }
        except Exception as e:
            print(f"Error fetching financial data: {str(e)}")
            return {}

    def calculate_financial_ratios(self, data: Dict[str, pd.DataFrame]) -> Dict[str, float]:
        """Calculate key financial ratios"""
        try:
            latest_income = data['income_statement'].iloc[:, 0]  # Most recent year
            latest_balance = data['balance_sheet'].iloc[:, 0]
            latest_cash_flow = data['cash_flow'].iloc[:, 0]
            
            ratios = {
                # Profitability Ratios
                'gross_margin': (latest_income['Gross Profit'] / latest_income['Total Revenue']) * 100,
                'operating_margin': (latest_income['Operating Income'] / latest_income['Total Revenue']) * 100,
                'net_margin': (latest_income['Net Income'] / latest_income['Total Revenue']) * 100,
                'roe': (latest_income['Net Income'] / latest_balance['Total Stockholder Equity']) * 100,
                
                # Liquidity Ratios
                'current_ratio': latest_balance['Total Current Assets'] / latest_balance['Total Current Liabilities'],
                'quick_ratio': (latest_balance['Total Current Assets'] - latest_balance['Inventory']) / 
                              latest_balance['Total Current Liabilities'],
                
                # Efficiency Ratios
                'asset_turnover': latest_income['Total Revenue'] / latest_balance['Total Assets'],
                'inventory_turnover': latest_income['Total Revenue'] / latest_balance.get('Inventory', 1),
                
                # Leverage Ratios
                'debt_to_equity': latest_balance['Total Liabilities'] / latest_balance['Total Stockholder Equity'],
                'debt_to_assets': latest_balance['Total Liabilities'] / latest_balance['Total Assets'],
                
                # Cash Flow Ratios
                'operating_cash_flow_ratio': latest_cash_flow['Operating Cash Flow'] / latest_balance['Total Current Liabilities'],
                'cash_flow_coverage': latest_cash_flow['Operating Cash Flow'] / latest_balance['Total Liabilities']
            }
            
            return {k: round(float(v), 2) for k, v in ratios.items() if not pd.isna(v)}
        except Exception as e:
            print(f"Error calculating ratios: {str(e)}")
            return {}

    def get_market_data(self) -> Dict[str, Any]:
        """Get market-related data"""
        try:
            info = self.yf_ticker.info
            return {
                'market_cap': info.get('marketCap'),
                'enterprise_value': info.get('enterpriseValue'),
                'pe_ratio': info.get('trailingPE'),
                'forward_pe': info.get('forwardPE'),
                'price_to_book': info.get('priceToBook'),
                'enterprise_to_ebitda': info.get('enterpriseToEbitda'),
                'beta': info.get('beta'),
                'dividend_yield': info.get('dividendYield', 0) * 100,
                'fifty_two_week_high': info.get('fiftyTwoWeekHigh'),
                'fifty_two_week_low': info.get('fiftyTwoWeekLow')
            }
        except Exception as e:
            print(f"Error fetching market data: {str(e)}")
            return {}

    def get_peer_comparison(self) -> Dict[str, pd.DataFrame]:
        """Get peer comparison data"""
        try:
            # Get peers from Yahoo Finance
            peers = self.yq_ticker.get_peer_companies()
            if not isinstance(peers, list):
                return {}
            
            peer_data = {}
            for peer in peers[:5]:  # Limit to 5 peers for efficiency
                peer_ticker = yf.Ticker(peer)
                peer_data[peer] = {
                    'Market Cap': peer_ticker.info.get('marketCap'),
                    'P/E Ratio': peer_ticker.info.get('trailingPE'),
                    'Revenue Growth': peer_ticker.info.get('revenueGrowth'),
                    'Profit Margin': peer_ticker.info.get('profitMargin'),
                    'ROE': peer_ticker.info.get('returnOnEquity')
                }
            
            return {'peer_comparison': pd.DataFrame(peer_data).T}
        except Exception as e:
            print(f"Error in peer comparison: {str(e)}")
            return {}

    def analyze_trend(self, data: pd.DataFrame, periods: int = 4) -> Dict[str, float]:
        """Analyze trends in financial metrics"""
        try:
            trends = {}
            for column in data.columns:
                values = data[column].values[:periods]
                if len(values) >= 2:
                    trend = np.polyfit(range(len(values)), values, 1)[0]
                    trends[f'{column}_trend'] = trend
            return trends
        except Exception as e:
            print(f"Error analyzing trends: {str(e)}")
            return {}

    def get_comprehensive_analysis(self) -> Dict[str, Any]:
        """Get comprehensive fundamental analysis"""
        # Get all financial data
        financial_data = self.get_all_financial_data()
        if not financial_data:
            return {'error': 'Failed to fetch financial data'}

        # Calculate all components
        analysis = {
            'financial_ratios': self.calculate_financial_ratios(financial_data),
            'market_data': self.get_market_data(),
            'peer_comparison': self.get_peer_comparison(),
            'financial_trends': {
                'income_trends': self.analyze_trend(financial_data['income_statement']),
                'balance_trends': self.analyze_trend(financial_data['balance_sheet']),
                'cash_flow_trends': self.analyze_trend(financial_data['cash_flow'])
            },
            'esg_data': self.yf_ticker.sustainability,
            'raw_financial_data': financial_data
        }

        return analysis

def format_analysis_report(analysis: Dict[str, Any]) -> str:
    """Format the analysis results into a readable report"""
    report = []
    
    # Financial Health Score (simple version)
    def calculate_health_score(ratios: Dict[str, float]) -> float:
        if not ratios:
            return 0
        
        weights = {
            'current_ratio': 0.15,
            'debt_to_equity': 0.15,
            'operating_margin': 0.2,
            'roe': 0.2,
            'net_margin': 0.15,
            'asset_turnover': 0.15
        }
        
        score = 0
        for metric, weight in weights.items():
            if metric in ratios:
                # Normalize the ratio and add to score
                if metric in ['current_ratio', 'roe', 'operating_margin', 'net_margin', 'asset_turnover']:
                    score += min(max(ratios[metric], 0), 10) * weight
                else:  # For metrics where lower is better
                    score += (10 - min(max(ratios[metric], 0), 10)) * weight
        
        return round(score, 2)

    # Add sections to report
    if 'financial_ratios' in analysis:
        report.append("Financial Health Score: " + 
                     str(calculate_health_score(analysis['financial_ratios'])) + " / 10")
        
        report.append("\nKey Financial Ratios:")
        for ratio, value in analysis['financial_ratios'].items():
            report.append(f"{ratio.replace('_', ' ').title()}: {value}")

    if 'market_data' in analysis:
        report.append("\nMarket Metrics:")
        for metric, value in analysis['market_data'].items():
            if value is not None:
                if 'market_cap' in metric or 'enterprise_value' in metric:
                    value = f"${value:,.0f}"
                elif isinstance(value, float):
                    value = f"{value:.2f}"
                report.append(f"{metric.replace('_', ' ').title()}: {value}")

    if 'peer_comparison' in analysis and 'peer_comparison' in analysis['peer_comparison']:
        report.append("\nPeer Comparison Summary:")
        peer_df = analysis['peer_comparison']['peer_comparison']
        report.append(str(peer_df.describe()))

    return "\n".join(report)

# Example usage:
def analyze_stock(ticker: str) -> str:
    analyzer = FundamentalAnalyzer(ticker)
    analysis = analyzer.get_comprehensive_analysis()
    return format_analysis_report(analysis)

# Technical analysis

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass

In [None]:
@dataclass
class TechnicalSignal:
    date: pd.Timestamp
    signal_type: str
    strength: float
    reason: str

class TechnicalAnalyzer:
    def __init__(self, ticker: str, period: str = "2y", interval: str = "1wk"):
        """
        Initialize with default parameters for technical analysis
        period: valid periods: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max
        interval: valid intervals: 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo
        """
        self.ticker = ticker
        self.data = self._get_price_data(ticker, period, interval)
        
    def _get_price_data(self, ticker: str, period: str, interval: str) -> pd.DataFrame:
        """Fetch and prepare price data"""
        stock = yf.Ticker(ticker)
        df = stock.history(period=period, interval=interval)
        return df
    
    def calculate_rsi(self, window: int = 14) -> pd.Series:
        """Calculate Relative Strength Index"""
        delta = self.data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))
    
    def calculate_macd(self, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:
        """Calculate MACD, Signal line, and MACD histogram"""
        exp1 = self.data['Close'].ewm(span=fast, adjust=False).mean()
        exp2 = self.data['Close'].ewm(span=slow, adjust=False).mean()
        macd = exp1 - exp2
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        histogram = macd - signal_line
        return macd, signal_line, histogram
    
    def calculate_moving_averages(self) -> Dict[str, pd.Series]:
        """Calculate various moving averages"""
        return {
            'SMA20': self.data['Close'].rolling(window=20).mean(),
            'SMA50': self.data['Close'].rolling(window=50).mean(),
            'SMA200': self.data['Close'].rolling(window=200).mean(),
            'EMA20': self.data['Close'].ewm(span=20, adjust=False).mean(),
            'EMA50': self.data['Close'].ewm(span=50, adjust=False).mean()
        }
    
    def detect_divergence_v0(self, price: pd.Series, indicator: pd.Series, window: int = 20) -> List[TechnicalSignal]:
        """
        Detect regular and hidden divergence patterns
        Returns list of divergence signals
        """
        signals = []
        
        # Get local maxima and minima
        def get_extrema(series: pd.Series, window: int) -> Tuple[pd.Series, pd.Series]:
            maxima = pd.Series(index=series.index, dtype=float)
            minima = pd.Series(index=series.index, dtype=float)
            
            for i in range(window, len(series) - window):
                if all(series.iloc[i] > series.iloc[i-window:i]) and \
                   all(series.iloc[i] > series.iloc[i+1:i+window+1]):
                    maxima.iloc[i] = series.iloc[i]
                if all(series.iloc[i] < series.iloc[i-window:i]) and \
                   all(series.iloc[i] < series.iloc[i+1:i+window+1]):
                    minima.iloc[i] = series.iloc[i]
                    
            return maxima.dropna(), minima.dropna()
        
        price_maxima, price_minima = get_extrema(price, window)
        ind_maxima, ind_minima = get_extrema(indicator, window)
        
        # Detect bearish regular divergence (higher highs in price, lower highs in indicator)
        for i in range(1, len(price_maxima)):
            if price_maxima.iloc[i] > price_maxima.iloc[i-1] and \
               ind_maxima.iloc[i] < ind_maxima.iloc[i-1]:
                signals.append(TechnicalSignal(
                    date=price_maxima.index[i],
                    signal_type='bearish_divergence',
                    strength=0.8,
                    reason='Bearish regular divergence: Higher price high with lower indicator high'
                ))
        
        # Detect bullish regular divergence (lower lows in price, higher lows in indicator)
        for i in range(1, len(price_minima)):
            if price_minima.iloc[i] < price_minima.iloc[i-1] and \
               ind_minima.iloc[i] > ind_minima.iloc[i-1]:
                signals.append(TechnicalSignal(
                    date=price_minima.index[i],
                    signal_type='bullish_divergence',
                    strength=0.8,
                    reason='Bullish regular divergence: Lower price low with higher indicator low'
                ))
                
        return signals

    def detect_divergence(self, price: pd.Series, indicator: pd.Series, window: int = 20) -> List[TechnicalSignal]:
        signals = []
        
        # Get local maxima and minima
        def get_extrema(series: pd.Series, window: int) -> Tuple[pd.Series, pd.Series]:
            maxima = pd.Series(index=series.index, dtype=float)
            minima = pd.Series(index=series.index, dtype=float)
            
            for i in range(window, len(series) - window):
                if all(series.iloc[i] > series.iloc[i-window:i]) and \
                   all(series.iloc[i] > series.iloc[i+1:i+window+1]):
                    maxima.iloc[i] = series.iloc[i]
                if all(series.iloc[i] < series.iloc[i-window:i]) and \
                   all(series.iloc[i] < series.iloc[i+1:i+window+1]):
                    minima.iloc[i] = series.iloc[i]
                    
            return maxima.dropna(), minima.dropna()
        
        price_maxima, price_minima = get_extrema(price, window)
        ind_maxima, ind_minima = get_extrema(indicator, window)
        
        # Debugging: Print lengths
        print(f"Length of price_minima: {len(price_minima)}")
        print(f"Length of ind_minima: {len(ind_minima)}")
        
        # Reindex ind_minima to match price_minima
        ind_minima = ind_minima.reindex(price_minima.index, fill_value=np.nan)
        
        # Detect bearish regular divergence (higher highs in price, lower highs in indicator)
        for i in range(1, len(price_maxima)):
            if pd.isna(ind_maxima.iloc[i]) or pd.isna(ind_maxima.iloc[i-1]):
                continue
            if price_maxima.iloc[i] > price_maxima.iloc[i-1] and \
               ind_maxima.iloc[i] < ind_maxima.iloc[i-1]:
                signals.append(TechnicalSignal(
                    date=price_maxima.index[i],
                    signal_type='bearish_divergence',
                    strength=0.8,
                    reason='Bearish regular divergence: Higher price high with lower indicator high'
                ))
        
        # Detect bullish regular divergence (lower lows in price, higher lows in indicator)
        for i in range(1, len(price_minima)):
            if pd.isna(ind_minima.iloc[i]) or pd.isna(ind_minima.iloc[i-1]):
                continue
            if price_minima.iloc[i] < price_minima.iloc[i-1] and \
               ind_minima.iloc[i] > ind_minima.iloc[i-1]:
                signals.append(TechnicalSignal(
                    date=price_minima.index[i],
                    signal_type='bullish_divergence',
                    strength=0.8,
                    reason='Bullish regular divergence: Lower price low with higher indicator low'
                ))
                
        return signals
    
    def calculate_volume_profile(self, bins: int = 50) -> pd.DataFrame:
        """Calculate volume profile for price levels"""
        price_bins = pd.cut(self.data['Close'], bins=bins)
        return self.data['Volume'].groupby(price_bins).sum().sort_index()
    
    def detect_support_resistance(self, window: int = 20, threshold: float = 0.1) -> Tuple[List[float], List[float]]:
        """Detect support and resistance levels using price action and volume"""
        volume_profile = self.calculate_volume_profile()
        high_volume_levels = volume_profile[volume_profile > volume_profile.quantile(1 - threshold)].index
        
        support_levels = []
        resistance_levels = []
        
        for level in high_volume_levels:
            price_level = level.mid
            # Check if price has reversed near this level multiple times
            touches = sum((self.data['Low'] <= price_level * 1.01) & 
                         (self.data['Low'] >= price_level * 0.99))
            if touches >= 3:
                support_levels.append(price_level)
                
            touches = sum((self.data['High'] <= price_level * 1.01) & 
                         (self.data['High'] >= price_level * 0.99))
            if touches >= 3:
                resistance_levels.append(price_level)
                
        return support_levels, resistance_levels
    
    def analyze_price_action(self) -> Dict[str, Any]:
        """Perform comprehensive price action analysis"""
        # Calculate indicators
        rsi = self.calculate_rsi()
        macd, signal, histogram = self.calculate_macd()
        mas = self.calculate_moving_averages()
        
        # Detect divergences
        rsi_divergences = self.detect_divergence(self.data['Close'], rsi)
        macd_divergences = self.detect_divergence(self.data['Close'], macd)
        
        # Get support/resistance levels
        support, resistance = self.detect_support_resistance()
        
        # Generate signals based on multiple indicators
        signals = []
        
        # Check for oversold/overbought conditions with confirmation
        for i in range(len(self.data)):
            if i < 2:
                continue
                
            # Oversold conditions (potential buy)
            if (rsi.iloc[i] < 30 and  # RSI oversold
                histogram.iloc[i] > histogram.iloc[i-1] and  # MACD histogram turning up
                self.data['Close'].iloc[i] > mas['SMA20'].iloc[i]):  # Price above short-term MA
                
                signals.append(TechnicalSignal(
                    date=self.data.index[i],
                    signal_type='buy',
                    strength=0.7,
                    reason='Oversold conditions with positive momentum'
                ))
                
            # Overbought conditions (potential sell)
            if (rsi.iloc[i] > 70 and  # RSI overbought
                histogram.iloc[i] < histogram.iloc[i-1] and  # MACD histogram turning down
                self.data['Close'].iloc[i] < mas['SMA20'].iloc[i]):  # Price below short-term MA
                
                signals.append(TechnicalSignal(
                    date=self.data.index[i],
                    signal_type='sell',
                    strength=0.7,
                    reason='Overbought conditions with negative momentum'
                ))
        
        return {
            'indicators': {
                'rsi': rsi,
                'macd': macd,
                'macd_signal': signal,
                'macd_histogram': histogram,
                'moving_averages': mas
            },
            'signals': signals + rsi_divergences + macd_divergences,
            'levels': {
                'support': support,
                'resistance': resistance
            }
        }

def generate_analysis_report(ticker: str) -> str:
    """Generate a readable technical analysis report"""
    analyzer = TechnicalAnalyzer(ticker)
    analysis = analyzer.analyze_price_action()
    
    report = [f"Technical Analysis Report for {ticker}\n"]
    
    # Add current indicator values
    current_rsi = analysis['indicators']['rsi'].iloc[-1]
    current_macd = analysis['indicators']['macd'].iloc[-1]
    report.append(f"Current RSI: {current_rsi:.2f}")
    report.append(f"Current MACD: {current_macd:.2f}")
    
    # Add recent signals
    recent_signals = [s for s in analysis['signals'] 
                     if s.date >= analyzer.data.index[-10]]  # Last 10 periods
    
    if recent_signals:
        report.append("\nRecent Signals:")
        for signal in recent_signals:
            report.append(f"{signal.date.date()}: {signal.signal_type} "
                        f"(Strength: {signal.strength:.1f})")
            report.append(f"Reason: {signal.reason}")
    
    # Add support/resistance levels
    report.append("\nKey Price Levels:")
    report.append(f"Support levels: {', '.join(f'${x:.2f}' for x in analysis['levels']['support'])}")
    report.append(f"Resistance levels: {', '.join(f'${x:.2f}' for x in analysis['levels']['resistance'])}")
    
    return "\n".join(report)

# Example usage:
# report = generate_analysis_report('AAPL')
# print(report)

In [None]:
report = generate_analysis_report('AAPL')
print(report)

### Testing `TechnicalAnalyzer`

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime

# Sample mock data for yfinance
# mock_history_data = pd.DataFrame({
#     'Close': [100, 101, 102, 103, 104, 105, 106, 107, 108, 109],
#     'Open': [99, 100, 101, 102, 103, 104, 105, 106, 107, 108],
#     'High': [101, 102, 103, 104, 105, 106, 107, 108, 109, 110],
#     'Low': [98, 99, 100, 101, 102, 103, 104, 105, 106, 107],
#     'Volume': [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]
# }, index=pd.date_range(start='2023-01-01', periods=10))

# Generate mock data for 20 periods
start_date = '2023-01-01'
periods = 20

# Create a date range for 20 periods
date_range = pd.date_range(start=start_date, periods=periods)

# Generate mock data
mock_history_data = pd.DataFrame({
    'Close': np.linspace(100, 119, periods),  # Linear increase from 100 to 119
    'Open': np.linspace(99, 118, periods),    # Linear increase from 99 to 118
    'High': np.linspace(101, 120, periods),   # Linear increase from 101 to 120
    'Low': np.linspace(98, 117, periods),     # Linear increase from 98 to 117
    'Volume': np.arange(1000, 1000 + 1000 * periods, 1000)  # Volume increases by 1000 each period
}, index=date_range)

print(mock_history_data)

In [None]:
from unittest.mock import patch

# Mock the yfinance.Ticker object
with patch('yfinance.Ticker') as mock_ticker:
    mock_ticker.return_value.history.return_value = mock_history_data
    
    # Initialize the TechnicalAnalyzer with the mocked data
    analyzer = TechnicalAnalyzer(ticker='AAPL')

In [None]:
rsi = analyzer.calculate_rsi(window=14)
print("RSI:")
print(rsi)

In [None]:
macd, signal, histogram = analyzer.calculate_macd()
print("MACD:")
print(macd)
print("Signal Line:")
print(signal)
print("MACD Histogram:")
print(histogram)

In [None]:
moving_averages = analyzer.calculate_moving_averages()
print("Moving Averages:")
for key, value in moving_averages.items():
    print(f"{key}:")
    print(value)

In [None]:
price = analyzer.data['Close']
indicator = analyzer.calculate_rsi()
signals = analyzer.detect_divergence(price, indicator)
print("Divergence Signals:")
for signal in signals:
    print(signal)

In [None]:
volume_profile = analyzer.calculate_volume_profile(bins=50)
print("Volume Profile:")
print(volume_profile)

In [None]:
support, resistance = analyzer.detect_support_resistance()
print("Support Levels:")
print(support)
print("Resistance Levels:")
print(resistance)

In [None]:
analysis = analyzer.analyze_price_action()
print("Price Action Analysis:")
print("Indicators:")
print(analysis['indicators'])
print("Signals:")
print(analysis['signals'])
print("Support and Resistance Levels:")
print(analysis['levels'])

## Combined model

In [None]:
class StockEvaluator:
    def __init__(self, tickers: List[str], fundamental_weight: float = 0.6, technical_weight: float = 0.4):
        self.tickers = tickers
        self.fundamental_weight = fundamental_weight
        self.technical_weight = technical_weight

    def evaluate_fundamentals(self, ticker: str) -> float:
        analyzer = FundamentalAnalyzer(ticker)
        analysis = analyzer.get_comprehensive_analysis()
        if 'financial_ratios' in analysis:
            return self.calculate_fundamental_score(analysis['financial_ratios'])
        return 0.0

    def calculate_fundamental_score(self, ratios: Dict[str, float]) -> float:
        weights = {
            'current_ratio': 0.15,
            'debt_to_equity': 0.15,
            'operating_margin': 0.2,
            'roe': 0.2,
            'net_margin': 0.15,
            'asset_turnover': 0.15
        }
        
        score = 0
        for metric, weight in weights.items():
            if metric in ratios:
                if metric in ['current_ratio', 'roe', 'operating_margin', 'net_margin', 'asset_turnover']:
                    score += min(max(ratios[metric], 0), 10) * weight
                else:
                    score += (10 - min(max(ratios[metric], 0), 10)) * weight
        
        return round(score, 2)

    def evaluate_technicals(self, ticker: str) -> float:
        analyzer = TechnicalAnalyzer(ticker)
        analysis = analyzer.analyze_price_action()
        return self.calculate_technical_score(analysis)

    def calculate_technical_score(self, analysis: Dict[str, Any]) -> float:
        score = 0
        signals = analysis['signals']
        if signals:
            for signal in signals:
                if signal.signal_type == 'buy':
                    score += signal.strength
                elif signal.signal_type == 'sell':
                    score -= signal.strength
        return max(0, min(10, score))

    def evaluate_stocks(self) -> pd.DataFrame:
        results = []
        for ticker in self.tickers:
            fundamental_score = self.evaluate_fundamentals(ticker)
            technical_score = self.evaluate_technicals(ticker)
            combined_score = (fundamental_score * self.fundamental_weight) + (technical_score * self.technical_weight)
            results.append({
                'Ticker': ticker,
                'Fundamental Score': fundamental_score,
                'Technical Score': technical_score,
                'Combined Score': combined_score
            })
        return pd.DataFrame(results)

    def generate_recommendations(self, df: pd.DataFrame) -> pd.DataFrame:
        df['Recommendation'] = np.where(df['Combined Score'] >= 7, 'Buy', 
                                       np.where(df['Combined Score'] <= 4, 'Sell', 'Hold'))
        return df.sort_values(by='Combined Score', ascending=False)

# Example usage
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'META', 'TSLA']
evaluator = StockEvaluator(tickers)
results = evaluator.evaluate_stocks()
recommendations = evaluator.generate_recommendations(results)
print(recommendations)

In [None]:
class StockEvaluator:
    def __init__(self, tickers: List[str], fundamental_weight: float = 0.6, technical_weight: float = 0.4):
        self.tickers = tickers
        self.fundamental_weight = fundamental_weight
        self.technical_weight = technical_weight

    def evaluate_fundamentals(self, ticker: str, date: str) -> float:
        """
        Evaluate fundamental score for a specific date.
        """
        analyzer = FundamentalAnalyzer(ticker)
        analysis = analyzer.get_comprehensive_analysis()
        if 'financial_ratios' in analysis:
            return self.calculate_fundamental_score(analysis['financial_ratios'])
        return 0.0

    def evaluate_technicals(self, ticker: str, date: str) -> float:
        """
        Evaluate technical score for a specific date.
        """
        analyzer = TechnicalAnalyzer(ticker, period="2y", interval="1d")
        analysis = analyzer.analyze_price_action()
        # Filter signals up to the specific date
        signals_up_to_date = [s for s in analysis['signals'] if s.date <= pd.Timestamp(date)]
        return self.calculate_technical_score({'signals': signals_up_to_date})

    def evaluate_stock_on_date(self, ticker: str, date: str) -> Dict[str, float]:
        """
        Evaluate a stock on a specific date.
        """
        fundamental_score = self.evaluate_fundamentals(ticker, date)
        technical_score = self.evaluate_technicals(ticker, date)
        combined_score = (fundamental_score * self.fundamental_weight) + (technical_score * self.technical_weight)
        return {
            'Ticker': ticker,
            'Date': date,
            'Fundamental Score': fundamental_score,
            'Technical Score': technical_score,
            'Combined Score': combined_score
        }

# Example usage
evaluator = StockEvaluator(['AAPL'])
specific_date = '2023-10-01'
result = evaluator.evaluate_stock_on_date('AAPL', specific_date)
print(result)

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import requests
from bs4 import BeautifulSoup
import datetime
from yahooquery import Ticker
from dataclasses import dataclass

class FundamentalAnalyzer:
    def __init__(self, ticker: str):
        self.ticker = ticker
        self.yf_ticker = yf.Ticker(ticker)
        self.yq_ticker = Ticker(ticker)
        
    def get_all_financial_data(self) -> Dict[str, Any]:
        """Get comprehensive financial data from multiple sources"""
        try:
            return {
                'income_statement': self.yf_ticker.financials,
                'balance_sheet': self.yf_ticker.balance_sheet,
                'cash_flow': self.yf_ticker.cashflow,
                'info': self.yf_ticker.info,
                'institutional_holders': self.yf_ticker.institutional_holders,
                'recommendations': self.yf_ticker.recommendations,
                'sustainability': self.yf_ticker.sustainability,
                'calendar': self.yf_ticker.calendar
            }
        except Exception as e:
            print(f"Error fetching financial data: {str(e)}")
            return {}

    def calculate_financial_ratios(self, data: Dict[str, pd.DataFrame]) -> Dict[str, float]:
        """Calculate key financial ratios"""
        try:
            latest_income = data['income_statement'].iloc[:, 0]  # Most recent year
            latest_balance = data['balance_sheet'].iloc[:, 0]
            latest_cash_flow = data['cash_flow'].iloc[:, 0]
            
            ratios = {
                # Profitability Ratios
                'gross_margin': (latest_income['Gross Profit'] / latest_income['Total Revenue']) * 100,
                'operating_margin': (latest_income['Operating Income'] / latest_income['Total Revenue']) * 100,
                'net_margin': (latest_income['Net Income'] / latest_income['Total Revenue']) * 100,
                'roe': (latest_income['Net Income'] / latest_balance['Stockholders Equity']) * 100,
                
                # Liquidity Ratios
                'current_ratio': latest_balance['Total Current Assets'] / latest_balance['Total Current Liabilities'],
                'quick_ratio': (latest_balance['Total Current Assets'] - latest_balance['Inventory']) / 
                              latest_balance['Total Current Liabilities'],
                
                # Efficiency Ratios
                'asset_turnover': latest_income['Total Revenue'] / latest_balance['Total Assets'],
                'inventory_turnover': latest_income['Total Revenue'] / latest_balance.get('Inventory', 1),
                
                # Leverage Ratios
                'debt_to_equity': latest_balance['Total Liabilities'] / latest_balance['Stockholders Equity'],
                'debt_to_assets': latest_balance['Total Liabilities'] / latest_balance['Total Assets'],
                
                # Cash Flow Ratios
                'operating_cash_flow_ratio': latest_cash_flow['Operating Cash Flow'] / latest_balance['Total Current Liabilities'],
                'cash_flow_coverage': latest_cash_flow['Operating Cash Flow'] / latest_balance['Total Liabilities']
            }
            
            return {k: round(float(v), 2) for k, v in ratios.items() if not pd.isna(v)}
        except Exception as e:
            print(f"Error calculating ratios: {str(e)}")
            return {}

    def get_market_data(self) -> Dict[str, Any]:
        """Get market-related data"""
        try:
            info = self.yf_ticker.info
            return {
                'market_cap': info.get('marketCap'),
                'enterprise_value': info.get('enterpriseValue'),
                'pe_ratio': info.get('trailingPE'),
                'forward_pe': info.get('forwardPE'),
                'price_to_book': info.get('priceToBook'),
                'enterprise_to_ebitda': info.get('enterpriseToEbitda'),
                'beta': info.get('beta'),
                'dividend_yield': info.get('dividendYield', 0) * 100,
                'fifty_two_week_high': info.get('fiftyTwoWeekHigh'),
                'fifty_two_week_low': info.get('fiftyTwoWeekLow')
            }
        except Exception as e:
            print(f"Error fetching market data: {str(e)}")
            return {}

    def get_peer_comparison(self) -> Dict[str, pd.DataFrame]:
        """Get peer comparison data"""
        try:
            # Get peers from Yahoo Finance info
            info = self.yf_ticker.info
            peers = info.get('peerComponents', [])
            if not isinstance(peers, list):
                return {}
            
            peer_data = {}
            for peer in peers[:5]:  # Limit to 5 peers for efficiency
                peer_ticker = yf.Ticker(peer)
                peer_data[peer] = {
                    'Market Cap': peer_ticker.info.get('marketCap'),
                    'P/E Ratio': peer_ticker.info.get('trailingPE'),
                    'Revenue Growth': peer_ticker.info.get('revenueGrowth'),
                    'Profit Margin': peer_ticker.info.get('profitMargin'),
                    'ROE': peer_ticker.info.get('returnOnEquity')
                }
            
            return {'peer_comparison': pd.DataFrame(peer_data).T}
        except Exception as e:
            print(f"Error in peer comparison: {str(e)}")
            return {}

    def analyze_trend(self, data: pd.DataFrame, periods: int = 4) -> Dict[str, float]:
        """Analyze trends in financial metrics"""
        try:
            trends = {}
            for column in data.columns:
                values = pd.to_numeric(data[column].values[:periods], errors='coerce')
                if len(values) >= 2 and not np.isnan(values).any():
                    trend = np.polyfit(range(len(values)), values, 1)[0]
                    trends[f'{column}_trend'] = trend
            return trends
        except Exception as e:
            print(f"Error analyzing trends: {str(e)}")
            return {}

    def get_comprehensive_analysis(self) -> Dict[str, Any]:
        """Get comprehensive fundamental analysis"""
        # Get all financial data
        financial_data = self.get_all_financial_data()
        if not financial_data:
            return {'error': 'Failed to fetch financial data'}

        # Calculate all components
        analysis = {
            'financial_ratios': self.calculate_financial_ratios(financial_data),
            'market_data': self.get_market_data(),
            'peer_comparison': self.get_peer_comparison(),
            'financial_trends': {
                'income_trends': self.analyze_trend(financial_data['income_statement']),
                'balance_trends': self.analyze_trend(financial_data['balance_sheet']),
                'cash_flow_trends': self.analyze_trend(financial_data['cash_flow'])
            },
            'esg_data': self.yf_ticker.sustainability,
            'raw_financial_data': financial_data
        }

        return analysis

def format_analysis_report(analysis: Dict[str, Any]) -> str:
    """Format the analysis results into a readable report"""
    report = []
    
    # Financial Health Score (simple version)
    def calculate_health_score(ratios: Dict[str, float]) -> float:
        if not ratios:
            return 0
        
        weights = {
            'current_ratio': 0.15,
            'debt_to_equity': 0.15,
            'operating_margin': 0.2,
            'roe': 0.2,
            'net_margin': 0.15,
            'asset_turnover': 0.15
        }
        
        score = 0
        for metric, weight in weights.items():
            if metric in ratios:
                # Normalize the ratio and add to score
                if metric in ['current_ratio', 'roe', 'operating_margin', 'net_margin', 'asset_turnover']:
                    score += min(max(ratios[metric], 0), 10) * weight
                else:  # For metrics where lower is better
                    score += (10 - min(max(ratios[metric], 0), 10)) * weight
        
        return round(score, 2)

    # Add sections to report
    if 'financial_ratios' in analysis:
        report.append("Financial Health Score: " + 
                     str(calculate_health_score(analysis['financial_ratios'])) + " / 10")
        
        report.append("\nKey Financial Ratios:")
        for ratio, value in analysis['financial_ratios'].items():
            report.append(f"{ratio.replace('_', ' ').title()}: {value}")

    if 'market_data' in analysis:
        report.append("\nMarket Metrics:")
        for metric, value in analysis['market_data'].items():
            if value is not None:
                if 'market_cap' in metric or 'enterprise_value' in metric:
                    value = f"${value:,.0f}"
                elif isinstance(value, float):
                    value = f"{value:.2f}"
                report.append(f"{metric.replace('_', ' ').title()}: {value}")

    if 'peer_comparison' in analysis and 'peer_comparison' in analysis['peer_comparison']:
        report.append("\nPeer Comparison Summary:")
        peer_df = analysis['peer_comparison']['peer_comparison']
        report.append(str(peer_df.describe()))

    return "\n".join(report)

@dataclass
class TechnicalSignal:
    date: pd.Timestamp
    signal_type: str
    strength: float
    reason: str

class TechnicalAnalyzer:
    def __init__(self, ticker: str, period: str = "2y", interval: str = "1wk"):
        """
        Initialize with default parameters for technical analysis
        period: valid periods: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max
        interval: valid intervals: 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo
        """
        self.ticker = ticker
        self.data = self._get_price_data(ticker, period, interval)
        
    def _get_price_data(self, ticker: str, period: str, interval: str) -> pd.DataFrame:
        """Fetch and prepare price data"""
        stock = yf.Ticker(ticker)
        df = stock.history(period=period, interval=interval)
        return df
    
    def calculate_rsi(self, window: int = 14) -> pd.Series:
        """Calculate Relative Strength Index"""
        delta = self.data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))
    
    def calculate_macd(self, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:
        """Calculate MACD, Signal line, and MACD histogram"""
        exp1 = self.data['Close'].ewm(span=fast, adjust=False).mean()
        exp2 = self.data['Close'].ewm(span=slow, adjust=False).mean()
        macd = exp1 - exp2
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        histogram = macd - signal_line
        return macd, signal_line, histogram
    
    def calculate_moving_averages(self) -> Dict[str, pd.Series]:
        """Calculate various moving averages"""
        return {
            'SMA20': self.data['Close'].rolling(window=20).mean(),
            'SMA50': self.data['Close'].rolling(window=50).mean(),
            'SMA200': self.data['Close'].rolling(window=200).mean(),
            'EMA20': self.data['Close'].ewm(span=20, adjust=False).mean(),
            'EMA50': self.data['Close'].ewm(span=50, adjust=False).mean()
        }
    
    def detect_divergence(self, price: pd.Series, indicator: pd.Series, window: int = 20) -> List[TechnicalSignal]:
        signals = []
        
        # Get local maxima and minima
        def get_extrema(series: pd.Series, window: int) -> Tuple[pd.Series, pd.Series]:
            maxima = pd.Series(index=series.index, dtype=float)
            minima = pd.Series(index=series.index, dtype=float)
            
            for i in range(window, len(series) - window):
                if all(series.iloc[i] > series.iloc[i-window:i]) and \
                   all(series.iloc[i] > series.iloc[i+1:i+window+1]):
                    maxima.iloc[i] = series.iloc[i]
                if all(series.iloc[i] < series.iloc[i-window:i]) and \
                   all(series.iloc[i] < series.iloc[i+1:i+window+1]):
                    minima.iloc[i] = series.iloc[i]
                    
            return maxima.dropna(), minima.dropna()
        
        price_maxima, price_minima = get_extrema(price, window)
        ind_maxima, ind_minima = get_extrema(indicator, window)
        
        # Debugging: Print lengths
        print(f"Length of price_minima: {len(price_minima)}")
        print(f"Length of ind_minima: {len(ind_minima)}")
        
        # Reindex ind_minima to match price_minima
        ind_minima = ind_minima.reindex(price_minima.index, fill_value=np.nan)
        
        # Detect bearish regular divergence (higher highs in price, lower highs in indicator)
        for i in range(1, len(price_maxima)):
            if pd.isna(ind_maxima.iloc[i]) or pd.isna(ind_maxima.iloc[i-1]):
                continue
            if price_maxima.iloc[i] > price_maxima.iloc[i-1] and \
               ind_maxima.iloc[i] < ind_maxima.iloc[i-1]:
                signals.append(TechnicalSignal(
                    date=price_maxima.index[i],
                    signal_type='bearish_divergence',
                    strength=0.8,
                    reason='Bearish regular divergence: Higher price high with lower indicator high'
                ))
        
        # Detect bullish regular divergence (lower lows in price, higher lows in indicator)
        for i in range(1, len(price_minima)):
            if pd.isna(ind_minima.iloc[i]) or pd.isna(ind_minima.iloc[i-1]):
                continue
            if price_minima.iloc[i] < price_minima.iloc[i-1] and \
               ind_minima.iloc[i] > ind_minima.iloc[i-1]:
                signals.append(TechnicalSignal(
                    date=price_minima.index[i],
                    signal_type='bullish_divergence',
                    strength=0.8,
                    reason='Bullish regular divergence: Lower price low with higher indicator low'
                ))
                
        return signals
    
    def calculate_volume_profile(self, bins: int = 50) -> pd.DataFrame:
        """Calculate volume profile for price levels"""
        price_bins = pd.cut(self.data['Close'], bins=bins)
        return self.data['Volume'].groupby(price_bins, observed=False).sum().sort_index()
    
    def detect_support_resistance(self, window: int = 20, threshold: float = 0.1) -> Tuple[List[float], List[float]]:
        """Detect support and resistance levels using price action and volume"""
        volume_profile = self.calculate_volume_profile()
        high_volume_levels = volume_profile[volume_profile > volume_profile.quantile(1 - threshold)].index
        
        support_levels = []
        resistance_levels = []
        
        for level in high_volume_levels:
            price_level = level.mid
            # Check if price has reversed near this level multiple times
            touches = sum((self.data['Low'] <= price_level * 1.01) & 
                         (self.data['Low'] >= price_level * 0.99))
            if touches >= 3:
                support_levels.append(price_level)
                
            touches = sum((self.data['High'] <= price_level * 1.01) & 
                         (self.data['High'] >= price_level * 0.99))
            if touches >= 3:
                resistance_levels.append(price_level)
                
        return support_levels, resistance_levels
    
    def analyze_price_action(self) -> Dict[str, Any]:
        """Perform comprehensive price action analysis"""
        # Calculate indicators
        rsi = self.calculate_rsi()
        macd, signal, histogram = self.calculate_macd()
        mas = self.calculate_moving_averages()
        
        # Detect divergences
        rsi_divergences = self.detect_divergence(self.data['Close'], rsi)
        macd_divergences = self.detect_divergence(self.data['Close'], macd)
        
        # Get support/resistance levels
        support, resistance = self.detect_support_resistance()
        
        # Generate signals based on multiple indicators
        signals = []
        
        # Check for oversold/overbought conditions with confirmation
        for i in range(len(self.data)):
            if i < 2:
                continue
                
            # Oversold conditions (potential buy)
            if (rsi.iloc[i] < 30 and  # RSI oversold
                histogram.iloc[i] > histogram.iloc[i-1] and  # MACD histogram turning up
                self.data['Close'].iloc[i] > mas['SMA20'].iloc[i]):  # Price above short-term MA
                
                signals.append(TechnicalSignal(
                    date=self.data.index[i],
                    signal_type='buy',
                    strength=0.7,
                    reason='Oversold conditions with positive momentum'
                ))
                
            # Overbought conditions (potential sell)
            if (rsi.iloc[i] > 70 and  # RSI overbought
                histogram.iloc[i] < histogram.iloc[i-1] and  # MACD histogram turning down
                self.data['Close'].iloc[i] < mas['SMA20'].iloc[i]):  # Price below short-term MA
                
                signals.append(TechnicalSignal(
                    date=self.data.index[i],
                    signal_type='sell',
                    strength=0.7,
                    reason='Overbought conditions with negative momentum'
                ))
        
        return {
            'indicators': {
                'rsi': rsi,
                'macd': macd,
                'macd_signal': signal,
                'macd_histogram': histogram,
                'moving_averages': mas
            },
            'signals': signals + rsi_divergences + macd_divergences,
            'levels': {
                'support': support,
                'resistance': resistance
            }
        }

def generate_analysis_report(ticker: str) -> str:
    """Generate a readable technical analysis report"""
    analyzer = TechnicalAnalyzer(ticker)
    analysis = analyzer.analyze_price_action()
    
    report = [f"Technical Analysis Report for {ticker}\n"]
    
    # Add current indicator values
    current_rsi = analysis['indicators']['rsi'].iloc[-1]
    current_macd = analysis['indicators']['macd'].iloc[-1]
    report.append(f"Current RSI: {current_rsi:.2f}")
    report.append(f"Current MACD: {current_macd:.2f}")
    
    # Add recent signals
    recent_signals = [s for s in analysis['signals'] 
                     if s.date >= analyzer.data.index[-10]]  # Last 10 periods
    
    if recent_signals:
        report.append("\nRecent Signals:")
        for signal in recent_signals:
            report.append(f"{signal.date.date()}: {signal.signal_type} "
                        f"(Strength: {signal.strength:.1f})")
            report.append(f"Reason: {signal.reason}")
    
    # Add support/resistance levels
    report.append("\nKey Price Levels:")
    report.append(f"Support levels: {', '.join(f'${x:.2f}' for x in analysis['levels']['support'])}")
    report.append(f"Resistance levels: {', '.join(f'${x:.2f}' for x in analysis['levels']['resistance'])}")
    
    return "\n".join(report)

class StockEvaluator:
    def __init__(self, tickers: List[str], fundamental_weight: float = 0.6, technical_weight: float = 0.4):
        self.tickers = tickers
        self.fundamental_weight = fundamental_weight
        self.technical_weight = technical_weight

    def evaluate_fundamentals(self, ticker: str) -> float:
        analyzer = FundamentalAnalyzer(ticker)
        analysis = analyzer.get_comprehensive_analysis()
        if 'financial_ratios' in analysis:
            return self.calculate_fundamental_score(analysis['financial_ratios'])
        return 0.0

    def calculate_fundamental_score(self, ratios: Dict[str, float]) -> float:
        weights = {
            'current_ratio': 0.15,
            'debt_to_equity': 0.15,
            'operating_margin': 0.2,
            'roe': 0.2,
            'net_margin': 0.15,
            'asset_turnover': 0.15
        }
        
        score = 0
        for metric, weight in weights.items():
            if metric in ratios:
                if metric in ['current_ratio', 'roe', 'operating_margin', 'net_margin', 'asset_turnover']:
                    score += min(max(ratios[metric], 0), 10) * weight
                else:
                    score += (10 - min(max(ratios[metric], 0), 10)) * weight
        
        return round(score, 2)

    def evaluate_technicals(self, ticker: str) -> float:
        analyzer = TechnicalAnalyzer(ticker)
        analysis = analyzer.analyze_price_action()
        return self.calculate_technical_score(analysis)

    def calculate_technical_score(self, analysis: Dict[str, Any]) -> float:
        score = 0
        signals = analysis['signals']
        if signals:
            for signal in signals:
                if signal.signal_type == 'buy':
                    score += signal.strength
                elif signal.signal_type == 'sell':
                    score -= signal.strength
        return max(0, min(10, score))

    def evaluate_stocks(self) -> pd.DataFrame:
        results = []
        for ticker in self.tickers:
            fundamental_score = self.evaluate_fundamentals(ticker)
            technical_score = self.evaluate_technicals(ticker)
            combined_score = (fundamental_score * self.fundamental_weight) + (technical_score * self.technical_weight)
            results.append({
                'Ticker': ticker,
                'Fundamental Score': fundamental_score,
                'Technical Score': technical_score,
                'Combined Score': combined_score
            })
        return pd.DataFrame(results)

    def generate_recommendations(self, df: pd.DataFrame) -> pd.DataFrame:
        df['Recommendation'] = np.where(df['Combined Score'] >= 7, 'Buy', 
                                       np.where(df['Combined Score'] <= 4, 'Sell', 'Hold'))
        return df.sort_values(by='Combined Score', ascending=False)

# Example usage
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'META', 'TSLA']
evaluator = StockEvaluator(tickers)
results = evaluator.evaluate_stocks()
recommendations = evaluator.generate_recommendations(results)
print(recommendations)

## Backtesting

In [None]:
class Backtester:
    def __init__(self, tickers: List[str], start_date: str, end_date: str):
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        self.evaluator = StockEvaluator(tickers)

    def run_backtest(self) -> pd.DataFrame:
        """
        Run backtest over the specified date range.
        """
        date_range = pd.date_range(start=self.start_date, end=self.end_date, freq='M')  # Monthly evaluation
        results = []

        for date in date_range:
            date_str = date.strftime('%Y-%m-%d')
            for ticker in self.tickers:
                try:
                    evaluation = self.evaluator.evaluate_stock_on_date(ticker, date_str)
                    results.append(evaluation)
                except Exception as e:
                    print(f"Error evaluating {ticker} on {date_str}: {str(e)}")

        return pd.DataFrame(results)

    def simulate_portfolio(self, backtest_results: pd.DataFrame, initial_capital: float = 10000) -> pd.DataFrame:
        """
        Simulate portfolio performance based on backtest results.
        """
        portfolio = {ticker: 0 for ticker in self.tickers}
        cash = initial_capital
        portfolio_value = []

        for date, group in backtest_results.groupby('Date'):
            # Rebalance portfolio based on recommendations
            for _, row in group.iterrows():
                ticker = row['Ticker']
                recommendation = 'Buy' if row['Combined Score'] >= 7 else ('Sell' if row['Combined Score'] <= 4 else 'Hold')
                price = yf.Ticker(ticker).history(start=date, end=date)['Close'].iloc[0]

                if recommendation == 'Buy' and cash >= price:
                    shares_to_buy = cash // price
                    portfolio[ticker] += shares_to_buy
                    cash -= shares_to_buy * price
                elif recommendation == 'Sell' and portfolio[ticker] > 0:
                    cash += portfolio[ticker] * price
                    portfolio[ticker] = 0

            # Calculate portfolio value
            total_value = cash
            for ticker, shares in portfolio.items():
                price = yf.Ticker(ticker).history(start=date, end=date)['Close'].iloc[0]
                total_value += shares * price
            portfolio_value.append({'Date': date, 'Portfolio Value': total_value})

        return pd.DataFrame(portfolio_value)

# Example usage
backtester = Backtester(tickers=['AAPL', 'MSFT'], start_date='2022-01-01', end_date='2023-10-01')
backtest_results = backtester.run_backtest()
portfolio_performance = backtester.simulate_portfolio(backtest_results)
print(portfolio_performance)

# Plot portfolio performance
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(portfolio_performance['Date'], portfolio_performance['Portfolio Value'], label='Portfolio Value')
plt.title('Portfolio Performance Over Time')
plt.xlabel('Date')
plt.ylabel('Value ($)')
plt.legend()
plt.show()