In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import requests
import time
import os
import json

class NSEIndexAnalyzer:
    """Download Indian equity index history from Yahoo Finance and export a
    table of technical indicators to an Excel workbook.

    Instance attributes (all set in ``__init__``):
        base_dir: root output folder, ``'index_analysis'``.
        results_dir: ``<base_dir>/results``; created when the object is built.
        equity_market_indices: list of ``{'name': ..., 'ticker': ...}`` dicts
            describing the indices to analyze.
    """

    def __init__(self):
        self.base_dir = 'index_analysis'
        self.results_dir = os.path.join(self.base_dir, 'results')
        os.makedirs(self.results_dir, exist_ok=True)

        # Indices to analyze with their Yahoo Finance tickers.
        self.equity_market_indices = [
            {'name': 'NIFTY 50', 'ticker': '^NSEI'},
            {'name': 'NIFTY BANK', 'ticker': '^NSEBANK'},
            {'name': 'NIFTY IT', 'ticker': '^CNXIT'},
            {'name': 'NIFTY PHARMA', 'ticker': '^CNXPHARMA'},
            {'name': 'NIFTY AUTO', 'ticker': '^CNXAUTO'},
            # NOTE(review): Yahoo returned no data for this ticker in the
            # recorded run of this notebook -- verify the symbol.
            {'name': 'NIFTY FINANCIAL SERVICES', 'ticker': '^CNXFINANCE'},
            {'name': 'NIFTY FMCG', 'ticker': '^CNXFMCG'},
            {'name': 'NIFTY METAL', 'ticker': '^CNXMETAL'},
            {'name': 'NIFTY REALTY', 'ticker': '^CNXREALTY'},
            {'name': 'NIFTY MEDIA', 'ticker': '^CNXMEDIA'},
            {'name': 'NIFTY PSU BANK', 'ticker': '^CNXPSUBANK'},
            # NOTE(review): no data in the recorded run -- verify the symbol.
            {'name': 'NIFTY MIDCAP 50', 'ticker': '^NIFMDCP50'},
            # NOTE(review): no data in the recorded run -- verify the symbol.
            {'name': 'NIFTY SMALLCAP 50', 'ticker': '^NIFSMCP50'},
        ]

    def calculate_rsi(self, prices_tuple, periods=14):
        """Return the latest RSI using Wilder's smoothing.

        Args:
            prices_tuple: sequence of prices, oldest first. NaN entries are
                dropped before the calculation so a single missing close does
                not turn the whole result into NaN.
            periods (int): look-back length (default 14).

        Returns:
            float | None: RSI in [0, 100]; 100.0 when the smoothed loss is
            zero; None when fewer than ``periods + 1`` valid prices are
            available, ``periods`` is below 1, or the input cannot be
            processed (the error is printed).
        """
        try:
            if periods < 1:
                return None

            prices = pd.Series(prices_tuple, dtype='float64').dropna().to_numpy()
            if len(prices) < periods + 1:
                return None

            # Day-over-day changes split into non-negative gain/loss magnitudes.
            delta = np.diff(prices)
            gains = np.where(delta > 0, delta, 0.0)
            losses = np.where(delta < 0, -delta, 0.0)

            # Seed with the simple average of the first `periods` changes ...
            avg_gain = gains[:periods].mean()
            avg_loss = losses[:periods].mean()

            # ... then apply Wilder's recursive smoothing to the remainder.
            for gain, loss in zip(gains[periods:], losses[periods:]):
                avg_gain = (avg_gain * (periods - 1) + gain) / periods
                avg_loss = (avg_loss * (periods - 1) + loss) / periods

            if avg_loss == 0:
                return 100.0

            rs = avg_gain / avg_loss
            return float(100 - (100 / (1 + rs)))

        except Exception as e:
            print(f"Error calculating RSI: {e}")
            return None

    def calculate_tema(self, prices, period=50):
        """
        Calculate the latest Triple Exponential Moving Average (TEMA).

        Args:
            prices (pd.Series or sequence): Price series, oldest first.
                NaN entries are dropped before the calculation.
            period (int): Span used for each of the three EMAs.

        Returns:
            float | None: TEMA value, or None when fewer than ``period``
            valid prices are available or the calculation fails (the error
            is printed).
        """
        try:
            series = pd.Series(prices, dtype='float64').dropna()
            if len(series) < period:
                return None

            ema1 = series.ewm(span=period, adjust=False).mean()
            ema2 = ema1.ewm(span=period, adjust=False).mean()
            ema3 = ema2.ewm(span=period, adjust=False).mean()

            # TEMA = 3 * EMA1 - 3 * EMA2 + EMA3
            tema = 3 * ema1.iloc[-1] - 3 * ema2.iloc[-1] + ema3.iloc[-1]
            return float(tema)

        except Exception as e:
            print(f"Error calculating TEMA: {e}")
            return None

    @staticmethod
    def _round_or_na(value, digits=2):
        """Round ``value`` to ``digits`` places, or return 'N/A' for None/NaN."""
        if value is None or pd.isna(value):
            return 'N/A'
        return round(float(value), digits)

    def _build_index_record(self, index_info, hist_data):
        """Compute the report row for one index from its OHLCV history.

        Args:
            index_info (dict): needs the keys 'name' and 'ticker'.
            hist_data (pd.DataFrame): rows ordered oldest to newest with
                'Close', 'High' and 'Low' columns ('Volume' is optional).

        Returns:
            dict | None: one report row, or None when no row has a valid close.
        """
        # Rows without a close cannot contribute to any metric.
        hist_data = hist_data.loc[hist_data['Close'].notna()]
        if hist_data.empty:
            return None

        close = hist_data['Close']
        n_rows = len(close)
        latest_price = close.iloc[-1]

        latest_rsi = self.calculate_rsi(tuple(close.values))
        tema_50 = self.calculate_tema(close)

        # Moving averages are reported only when a full window of data exists,
        # matching the rule calculate_tema applies.
        ema_50 = close.ewm(span=50, adjust=False).mean().iloc[-1] if n_rows >= 50 else None
        sma_20 = close.iloc[-20:].mean() if n_rows >= 20 else None
        sma_50 = close.iloc[-50:].mean() if n_rows >= 50 else None

        daily_returns = close.pct_change()
        # 20-day standard deviation of returns, annualized with 252 trading days.
        volatility_20 = daily_returns.rolling(window=20).std().iloc[-1] * (252 ** 0.5)

        # Restrict the 52-week extremes to the trailing year; the frame may
        # hold the full history when the caller fell back to period="max".
        if isinstance(hist_data.index, pd.DatetimeIndex):
            cutoff = hist_data.index[-1] - pd.DateOffset(years=1)
            last_year = hist_data.loc[hist_data.index >= cutoff]
        else:
            last_year = hist_data.iloc[-252:]  # ~ one year of trading days
        high_52w = last_year['High'].max()
        low_52w = last_year['Low'].min()
        pct_from_high = (latest_price - high_52w) / high_52w * 100

        fmt = self._round_or_na
        have_both = tema_50 is not None and ema_50 is not None
        return {
            'Index_Name': index_info['name'],
            'Ticker': index_info['ticker'],
            'Latest_Price': fmt(latest_price),
            'RSI_14': fmt(latest_rsi),
            'EMA_50': fmt(ema_50),
            'TEMA_50': fmt(tema_50),
            'TEMA_50-EMA_50': fmt(tema_50 - ema_50) if have_both else 'N/A',
            'SMA_20': fmt(sma_20),
            'SMA_50': fmt(sma_50),
            'Volatility_20D': fmt(volatility_20 * 100),
            '52W_High': fmt(high_52w),
            '52W_Low': fmt(low_52w),
            'Pct_From_52W_High': fmt(pct_from_high),
            'Volume': hist_data['Volume'].iloc[-1] if 'Volume' in hist_data else 'N/A',
            'Daily_Return': fmt(daily_returns.iloc[-1] * 100),
        }

    def get_index_data(self, index_info):
        """Fetch one index's daily history and return its report row.

        Args:
            index_info (dict): entry with the keys 'name' and 'ticker'.

        Returns:
            dict | None: the report row, or None when no data is available or
            an error occurs (a message is printed in both cases).
        """
        try:
            time.sleep(1)  # Rate limiting
            ticker_symbol = index_info['ticker']
            ticker = yf.Ticker(ticker_symbol)

            # Prefer one year of daily bars; fall back to the full history.
            hist_data = ticker.history(period="1y", interval="1d")
            if hist_data.empty:
                hist_data = ticker.history(period="max", interval="1d")

            index_data = None if hist_data.empty else self._build_index_record(index_info, hist_data)
            if index_data is None:
                print(f"No data available for {index_info['name']} ({ticker_symbol})")
            return index_data

        except Exception as e:
            # .get() keeps the handler itself from raising on a malformed entry.
            print(f"Error processing {index_info.get('name')} ({index_info.get('ticker')}): {e}")
            return None

    def analyze_indices(self):
        """Analyze every configured index and write the Excel report.

        The workbook is saved under ``results_dir`` with a timestamped name.
        RSI cells below 30 are shaded green and cells above 70 red. Nothing is
        written when no index returned data.
        """
        # Timestamp for unique filename
        current_date = datetime.now().strftime('%Y%m%d_%H%M%S')
        excel_file = os.path.join(self.results_dir, f'indian_indices_analysis_{current_date}.xlsx')

        results = []
        for index_info in self.equity_market_indices:
            print(f"Processing {index_info['name']} ({index_info['ticker']})...")
            index_data = self.get_index_data(index_info)
            if index_data:
                results.append(index_data)

        if not results:
            print("No index data was successfully retrieved.")
            return

        df = pd.DataFrame(results)

        with pd.ExcelWriter(excel_file, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name='Indian_Indices', index=False)

            workbook = writer.book
            worksheet = writer.sheets['Indian_Indices']

            green_format = workbook.add_format({'bg_color': '#C6EFCE', 'font_color': '#006100'})
            red_format = workbook.add_format({'bg_color': '#FFC7CE', 'font_color': '#9C0006'})

            # XlsxWriter rows/columns are zero-indexed and the sheet is written
            # without an index column, so the DataFrame position is already the
            # worksheet column. Row 0 is the header; data fills 1..len(results).
            rsi_col = df.columns.get_loc('RSI_14')
            first_row, last_row = 1, len(results)
            worksheet.conditional_format(first_row, rsi_col, last_row, rsi_col,
                                         {'type': 'cell', 'criteria': 'less than', 'value': 30, 'format': green_format})
            worksheet.conditional_format(first_row, rsi_col, last_row, rsi_col,
                                         {'type': 'cell', 'criteria': 'greater than', 'value': 70, 'format': red_format})

        print(f"\nAnalysis complete. Results saved to {excel_file}")

# Entry point: build the analyzer (this creates the results directory) and run
# the full analysis, which prints progress and writes the Excel report.
if __name__ == "__main__":
    analyzer = NSEIndexAnalyzer()
    analyzer.analyze_indices()

Processing NIFTY 50 (^NSEI)...
Processing NIFTY BANK (^NSEBANK)...
Processing NIFTY IT (^CNXIT)...
Processing NIFTY PHARMA (^CNXPHARMA)...
Processing NIFTY AUTO (^CNXAUTO)...
Processing NIFTY FINANCIAL SERVICES (^CNXFINANCE)...


$^CNXFINANCE: possibly delisted; no price data found  (period=1y) (Yahoo error = "No data found, symbol may be delisted")
$^CNXFINANCE: possibly delisted; no timezone found


No data available for NIFTY FINANCIAL SERVICES (^CNXFINANCE)
Processing NIFTY FMCG (^CNXFMCG)...
Processing NIFTY METAL (^CNXMETAL)...
Processing NIFTY REALTY (^CNXREALTY)...
Processing NIFTY MEDIA (^CNXMEDIA)...
Processing NIFTY PSU BANK (^CNXPSUBANK)...
Processing NIFTY MIDCAP 50 (^NIFMDCP50)...


$^NIFMDCP50: possibly delisted; no price data found  (period=1y) (Yahoo error = "No data found, symbol may be delisted")
$^NIFMDCP50: possibly delisted; no timezone found


No data available for NIFTY MIDCAP 50 (^NIFMDCP50)
Processing NIFTY SMALLCAP 50 (^NIFSMCP50)...


$^NIFSMCP50: possibly delisted; no price data found  (period=1y) (Yahoo error = "No data found, symbol may be delisted")
$^NIFSMCP50: possibly delisted; no timezone found


No data available for NIFTY SMALLCAP 50 (^NIFSMCP50)

Analysis complete. Results saved to index_analysis\results\indian_indices_analysis_20250226_143655.xlsx


In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import requests
import time
import os
import json
import random

class NSEIndexAnalyzer:
    """Download Indian equity index history from Yahoo Finance (with retry and
    back-off) and export a table of technical indicators to an Excel workbook.

    Instance attributes (all set in ``__init__``):
        base_dir: root output folder, ``'index_analysis'``.
        results_dir: ``<base_dir>/results``; created when the object is built.
        equity_market_indices: list of ``{'name': ..., 'ticker': ...}`` dicts
            describing the indices to analyze.
    """

    def __init__(self):
        self.base_dir = 'index_analysis'
        self.results_dir = os.path.join(self.base_dir, 'results')
        os.makedirs(self.results_dir, exist_ok=True)

        # Indices to analyze with their Yahoo Finance tickers.
        self.equity_market_indices = [
            {'name': 'NIFTY 50', 'ticker': '^NSEI'},
            {'name': 'NIFTY BANK', 'ticker': '^NSEBANK'},
            {'name': 'NIFTY IT', 'ticker': '^CNXIT'},
            {'name': 'NIFTY PHARMA', 'ticker': '^CNXPHARMA'},
            {'name': 'NIFTY AUTO', 'ticker': '^CNXAUTO'},
            # NOTE(review): Yahoo returned no data for this ticker in an
            # earlier recorded run of this notebook -- verify the symbol.
            {'name': 'NIFTY FINANCIAL SERVICES', 'ticker': '^CNXFINANCE'},
            {'name': 'NIFTY FMCG', 'ticker': '^CNXFMCG'},
            {'name': 'NIFTY METAL', 'ticker': '^CNXMETAL'},
            {'name': 'NIFTY REALTY', 'ticker': '^CNXREALTY'},
            {'name': 'NIFTY MEDIA', 'ticker': '^CNXMEDIA'},
            {'name': 'NIFTY PSU BANK', 'ticker': '^CNXPSUBANK'},
            # NOTE(review): no data in an earlier recorded run -- verify.
            {'name': 'NIFTY MIDCAP 50', 'ticker': '^NIFMDCP50'},
            # NOTE(review): no data in an earlier recorded run -- verify.
            {'name': 'NIFTY SMALLCAP 50', 'ticker': '^NIFSMCP50'},
        ]

    def calculate_rsi(self, prices_tuple, periods=14):
        """Return the latest RSI using Wilder's smoothing.

        Args:
            prices_tuple: sequence of prices, oldest first. NaN entries are
                dropped before the calculation so a single missing close does
                not turn the whole result into NaN.
            periods (int): look-back length (default 14).

        Returns:
            float | None: RSI in [0, 100]; 100.0 when the smoothed loss is
            zero; None when fewer than ``periods + 1`` valid prices are
            available, ``periods`` is below 1, or the input cannot be
            processed (the error is printed).
        """
        try:
            if periods < 1:
                return None

            prices = pd.Series(prices_tuple, dtype='float64').dropna().to_numpy()
            if len(prices) < periods + 1:
                return None

            # Day-over-day changes split into non-negative gain/loss magnitudes.
            delta = np.diff(prices)
            gains = np.where(delta > 0, delta, 0.0)
            losses = np.where(delta < 0, -delta, 0.0)

            # Seed with the simple average of the first `periods` changes ...
            avg_gain = gains[:periods].mean()
            avg_loss = losses[:periods].mean()

            # ... then apply Wilder's recursive smoothing to the remainder.
            for gain, loss in zip(gains[periods:], losses[periods:]):
                avg_gain = (avg_gain * (periods - 1) + gain) / periods
                avg_loss = (avg_loss * (periods - 1) + loss) / periods

            if avg_loss == 0:
                return 100.0

            rs = avg_gain / avg_loss
            return float(100 - (100 / (1 + rs)))

        except Exception as e:
            print(f"Error calculating RSI: {e}")
            return None

    def calculate_tema(self, prices, period=50):
        """
        Calculate the latest Triple Exponential Moving Average (TEMA).

        Args:
            prices (pd.Series or sequence): Price series, oldest first.
                NaN entries are dropped before the calculation.
            period (int): Span used for each of the three EMAs.

        Returns:
            float | None: TEMA value, or None when fewer than ``period``
            valid prices are available or the calculation fails (the error
            is printed).
        """
        try:
            series = pd.Series(prices, dtype='float64').dropna()
            if len(series) < period:
                return None

            ema1 = series.ewm(span=period, adjust=False).mean()
            ema2 = ema1.ewm(span=period, adjust=False).mean()
            ema3 = ema2.ewm(span=period, adjust=False).mean()

            # TEMA = 3 * EMA1 - 3 * EMA2 + EMA3
            tema = 3 * ema1.iloc[-1] - 3 * ema2.iloc[-1] + ema3.iloc[-1]
            return float(tema)

        except Exception as e:
            print(f"Error calculating TEMA: {e}")
            return None

    @staticmethod
    def _round_or_na(value, digits=2):
        """Round ``value`` to ``digits`` places, or return 'N/A' for None/NaN."""
        if value is None or pd.isna(value):
            return 'N/A'
        return round(float(value), digits)

    def _build_index_record(self, index_info, hist_data):
        """Compute the report row for one index from its OHLCV history.

        Args:
            index_info (dict): needs the keys 'name' and 'ticker'.
            hist_data (pd.DataFrame): rows ordered oldest to newest with
                'Close', 'High' and 'Low' columns ('Volume' is optional).

        Returns:
            dict | None: one report row, or None when no row has a valid close.
        """
        # Rows without a close cannot contribute to any metric.
        hist_data = hist_data.loc[hist_data['Close'].notna()]
        if hist_data.empty:
            return None

        close = hist_data['Close']
        n_rows = len(close)
        latest_price = close.iloc[-1]

        latest_rsi = self.calculate_rsi(tuple(close.values))
        tema_50 = self.calculate_tema(close)

        # Moving averages are reported only when a full window of data exists,
        # matching the rule calculate_tema applies.
        ema_50 = close.ewm(span=50, adjust=False).mean().iloc[-1] if n_rows >= 50 else None
        sma_20 = close.iloc[-20:].mean() if n_rows >= 20 else None
        sma_50 = close.iloc[-50:].mean() if n_rows >= 50 else None

        daily_returns = close.pct_change()
        # 20-day standard deviation of returns, annualized with 252 trading days.
        volatility_20 = daily_returns.rolling(window=20).std().iloc[-1] * (252 ** 0.5)

        # Restrict the 52-week extremes to the trailing year; the frame may
        # hold the full history when the caller fell back to period="max".
        if isinstance(hist_data.index, pd.DatetimeIndex):
            cutoff = hist_data.index[-1] - pd.DateOffset(years=1)
            last_year = hist_data.loc[hist_data.index >= cutoff]
        else:
            last_year = hist_data.iloc[-252:]  # ~ one year of trading days
        high_52w = last_year['High'].max()
        low_52w = last_year['Low'].min()
        pct_from_high = (latest_price - high_52w) / high_52w * 100

        fmt = self._round_or_na
        have_both = tema_50 is not None and ema_50 is not None
        return {
            'Index_Name': index_info['name'],
            'Ticker': index_info['ticker'],
            'Latest_Price': fmt(latest_price),
            'RSI_14': fmt(latest_rsi),
            'EMA_50': fmt(ema_50),
            'TEMA_50': fmt(tema_50),
            'TEMA_50-EMA_50': fmt(tema_50 - ema_50) if have_both else 'N/A',
            'SMA_20': fmt(sma_20),
            'SMA_50': fmt(sma_50),
            'Volatility_20D': fmt(volatility_20 * 100),
            '52W_High': fmt(high_52w),
            '52W_Low': fmt(low_52w),
            'Pct_From_52W_High': fmt(pct_from_high),
            'Volume': hist_data['Volume'].iloc[-1] if 'Volume' in hist_data else 'N/A',
            'Daily_Return': fmt(daily_returns.iloc[-1] * 100),
        }

    def get_index_data(self, index_info, max_retries=3):
        """Fetch one index's daily history (with retries) and return its report row.

        Only the download is retried. The metric computation is deterministic,
        so it runs once after a successful fetch.

        Args:
            index_info (dict): entry with the keys 'name' and 'ticker'.
            max_retries (int): maximum number of download attempts.

        Returns:
            dict | None: the report row, or None when every attempt failed,
            no data is available, or the computation raised (a message is
            printed in each case).
        """
        hist_data = None
        for attempt in range(max_retries):
            try:
                # Randomized 3-6 second delay before each request to avoid rate limiting.
                delay = 3 + random.random() * 3
                print(f"Waiting {delay:.1f} seconds before requesting data...")
                time.sleep(delay)

                ticker = yf.Ticker(index_info['ticker'])

                # Prefer one year of daily bars; fall back to the full history.
                hist_data = ticker.history(period="1y", interval="1d")
                if hist_data.empty:
                    hist_data = ticker.history(period="max", interval="1d")
                break

            except Exception as e:
                hist_data = None
                error_msg = str(e)
                print(f"Attempt {attempt+1}/{max_retries} - Error processing {index_info['name']} ({index_info['ticker']}): {error_msg}")

                if attempt == max_retries - 1:
                    break  # nothing left to retry, so do not sleep again

                if "Too Many Requests" in error_msg or "Rate limited" in error_msg:
                    # Exponential backoff - wait longer with each retry
                    wait_time = (2 ** attempt) * 10 + random.random() * 5
                    print(f"Rate limited. Waiting {wait_time:.1f} seconds before retrying...")
                    time.sleep(wait_time)
                else:
                    # For other errors, wait a shorter time
                    time.sleep(2)

        if hist_data is None:
            print(f"Failed to retrieve data for {index_info['name']} after {max_retries} attempts")
            return None

        try:
            index_data = None if hist_data.empty else self._build_index_record(index_info, hist_data)
        except Exception as e:
            print(f"Error processing {index_info['name']} ({index_info['ticker']}): {e}")
            return None

        if index_data is None:
            print(f"No data available for {index_info['name']} ({index_info['ticker']})")
        return index_data

    def analyze_indices(self):
        """Analyze every configured index and write the Excel report.

        The workbook is saved under ``results_dir`` with a timestamped name.
        RSI cells below 30 are shaded green and cells above 70 red. Nothing is
        written when no index returned data.
        """
        # Timestamp for unique filename
        current_date = datetime.now().strftime('%Y%m%d_%H%M%S')
        excel_file = os.path.join(self.results_dir, f'indian_indices_analysis_{current_date}.xlsx')

        results = []
        for index_info in self.equity_market_indices:
            print(f"Processing {index_info['name']} ({index_info['ticker']})...")
            index_data = self.get_index_data(index_info)
            if index_data:
                results.append(index_data)

        if not results:
            print("No index data was successfully retrieved.")
            return

        df = pd.DataFrame(results)

        with pd.ExcelWriter(excel_file, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name='Indian_Indices', index=False)

            workbook = writer.book
            worksheet = writer.sheets['Indian_Indices']

            green_format = workbook.add_format({'bg_color': '#C6EFCE', 'font_color': '#006100'})
            red_format = workbook.add_format({'bg_color': '#FFC7CE', 'font_color': '#9C0006'})

            # XlsxWriter rows/columns are zero-indexed and the sheet is written
            # without an index column, so the DataFrame position is already the
            # worksheet column. Row 0 is the header; data fills 1..len(results).
            rsi_col = df.columns.get_loc('RSI_14')
            first_row, last_row = 1, len(results)
            worksheet.conditional_format(first_row, rsi_col, last_row, rsi_col,
                                         {'type': 'cell', 'criteria': 'less than', 'value': 30, 'format': green_format})
            worksheet.conditional_format(first_row, rsi_col, last_row, rsi_col,
                                         {'type': 'cell', 'criteria': 'greater than', 'value': 70, 'format': red_format})

        print(f"\nAnalysis complete. Results saved to {excel_file}")

# Entry point: build the analyzer (this creates the results directory) and run
# the full analysis, which prints progress and writes the Excel report.
if __name__ == "__main__":
    analyzer = NSEIndexAnalyzer()
    analyzer.analyze_indices()

Processing NIFTY 50 (^NSEI)...
Waiting 4.7 seconds before requesting data...
Processing NIFTY BANK (^NSEBANK)...
Waiting 5.0 seconds before requesting data...
Processing NIFTY IT (^CNXIT)...
Waiting 4.5 seconds before requesting data...


KeyboardInterrupt: 

In [None]:
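# Alpha Vantage API key: do not store the key in the notebook. Load it at runtime,
# e.g. os.environ["ALPHAVANTAGE_API_KEY"]. The key that was pasted here in plain
# text has been redacted and should be revoked and reissued.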