<a href="https://colab.research.google.com/github/rahulombale/swing_strategy_rahul/blob/main/Moving_Average_Strategy_with_S3_Integration_(Telegram_Report_with_Plots_Zip).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Moving Average Strategy Daily Opportunity Scanner with AWS S3 Integration

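This script scans for V20 setups (runs of consecutive green candles whose
low-to-high range exceeds 20%), using a 200-period simple moving average only
as an additional filter status for stocks from the V200 list. It reads stock
data and company lists from AWS S3 and saves analysis results
back to S3, organized by date. It uses a specified AWS CLI profile for authentication.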
It generates plots for identified opportunities, uploads them individually to S3
into source-specific folders (V40, V40Next, V200), and then compiles all plots
into a zip file with the same folder structure to be sent via Telegram
along with the CSV report.
"""

# --- 1. SETUP AND IMPORTS ---
import pandas as pd
import plotly.graph_objects as go
from datetime import datetime
import os
import warnings
import numpy as np
import boto3
from botocore.exceptions import ClientError
import io
import logging
import requests
import zipfile
from typing import Optional, List

# NOTE: this silences every warning category for the whole process.
warnings.filterwarnings('ignore')

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- 2. CONFIGURATION ---

# AWS S3 Constants
AWS_BUCKET_NAME = 'stock-data-rahulombale'  # <--- IMPORTANT: REPLACE WITH YOUR BUCKET NAME
AWS_REGION = 'ap-south-1'  # <--- IMPORTANT: REPLACE WITH YOUR AWS REGION
AWS_PROFILE_NAME = 'stock_data_upload_profile' # <--- IMPORTANT: REPLACE WITH YOUR AWS CLI PROFILE NAME

# S3 Base Prefixes
S3_STOCK_DATA_PREFIX = 'stock_data/' # For historical stock data files (input)
S3_COMPANY_LIST_PREFIX = 'company_lists/' # For company token list CSVs (input)
S3_V20_ANALYSIS_OUTPUT_PREFIX = 'analysis/v20_strategy/' # For V20 strategy results (output)

# Dynamic Date for S3 paths
# Take ONE snapshot of the current time and derive every date string from it.
# Calling datetime.today() separately for each string could yield mismatched
# year/month/day parts if the script happens to start right around midnight.
_RUN_DATE = datetime.today()
TODAY_DATE_STR = _RUN_DATE.strftime('%Y-%m-%d') # e.g., '2025-06-27'
TODAY_YEAR_STR = _RUN_DATE.strftime('%Y')
TODAY_MONTH_STR = _RUN_DATE.strftime('%m')
TODAY_DAY_STR = _RUN_DATE.strftime('%d')

# Full S3 paths based on today's date for output
S3_OUTPUT_BASE_PATH = f"{S3_V20_ANALYSIS_OUTPUT_PREFIX}{TODAY_DATE_STR}/"
S3_OUTPUT_CSV_KEY = f"{S3_OUTPUT_BASE_PATH}v20_opportunities_{TODAY_DATE_STR}.csv"

# Input Data Path (assuming daily stock data is stored by date)
S3_INPUT_STOCK_DATA_PATH = f"{S3_STOCK_DATA_PREFIX}{TODAY_YEAR_STR}/{TODAY_MONTH_STR}/{TODAY_DAY_STR}/"


# Strategy Parameters
SMA_PERIOD = 200
MISSED_TRIGGER_THRESHOLD = 1.10 # 10% above the buy level
TICKER_COLUMN_NAME = 'ticker'

# Stock list files (read from S3_COMPANY_LIST_PREFIX)
STOCK_LIST_FILES = [
    'v40_token.csv',
    'v40next_token.csv',
    'v200_token.csv'
]
# Maps each stock list file to the short source label used in reports and folder names.
SOURCE_FILE_MAPPING = {
    'v40_token.csv': 'V40',
    'v40next_token.csv': 'V40Next',
    'v200_token.csv': 'V200'
}

# --- Telegram Bot Configuration ---
# IMPORTANT: Provide your actual Bot Token and Chat ID, preferably through the
# TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID environment variables so the secret is
# not stored in the source. The placeholder defaults below keep the previous
# behaviour (Telegram sending disabled) when the variables are not set.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', 'YOUR_TELEGRAM_BOT_TOKEN')  # e.g., '1234567890:ABC-DEF1234ghIkl-zyx57W2v1u123ew11'
TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', 'YOUR_TELEGRAM_CHAT_ID')    # e.g., '-123456789' for a group, '123456789' for a user

if TELEGRAM_BOT_TOKEN == 'YOUR_TELEGRAM_BOT_TOKEN' or TELEGRAM_CHAT_ID == 'YOUR_TELEGRAM_CHAT_ID':
    logger.warning("WARNING: Telegram bot token or chat ID not configured. Telegram reports will not be sent.")

# --- 3. S3 MANAGER CLASS ---
class S3Manager:
    """Small wrapper around a boto3 S3 client that is bound to one bucket."""

    def __init__(self, bucket_name: str, region_name: str, profile_name: Optional[str] = None):
        """
        Creates the underlying S3 client.

        Args:
            bucket_name: Bucket that every method of this instance operates on.
            region_name: AWS region passed to boto3 when creating the client.
            profile_name: (Optional) Named AWS profile. When falsy, the client
                          is created through boto3's default credential chain.
        """
        self.bucket_name = bucket_name

        if not profile_name:
            # No profile requested: let boto3 resolve credentials on its own.
            self.s3_client = boto3.client('s3', region_name=region_name)
            logger.info(f"S3Manager initialized for bucket '{bucket_name}' in region '{region_name}' using default credentials.")
            return

        profile_session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.s3_client = profile_session.client('s3')
        logger.info(f"S3Manager initialized for bucket '{bucket_name}' in region '{region_name}' using profile '{profile_name}'.")

    def download_csv_to_dataframe(self, s3_key: str) -> Optional[pd.DataFrame]:
        """
        Fetches a CSV object from the bucket and parses it with pandas.

        Args:
            s3_key: Full object key of the CSV file inside the bucket.

        Returns:
            The parsed DataFrame, or None when the object is missing or any
            error occurs while downloading or parsing (the error is logged).
        """
        try:
            s3_object = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
            raw_bytes = s3_object['Body'].read()
            parsed = pd.read_csv(io.BytesIO(raw_bytes))
            logger.info(f"Successfully downloaded s3://{self.bucket_name}/{s3_key}")
            return parsed
        except ClientError as e:
            error_code = e.response['Error']['Code']
            # A missing key is an expected situation and only worth a warning.
            if error_code != 'NoSuchKey':
                logger.error(f"Error downloading from S3 ({s3_key}): {e}")
            else:
                logger.warning(f"Object not found in S3: s3://{self.bucket_name}/{s3_key}")
        except Exception as e:
            logger.error(f"Failed to read CSV from S3 ({s3_key}): {e}")
        return None

    def upload_dataframe_to_s3_csv(self, df: pd.DataFrame, s3_key: str) -> None:
        """
        Serializes a DataFrame to CSV (without the index) and stores it in the bucket.

        Empty DataFrames are not uploaded; a warning is logged instead.

        Args:
            df: DataFrame to serialize.
            s3_key: Full object key under which the CSV is stored.

        Raises:
            ClientError: Re-raised after logging when the S3 call fails.
            Exception: Any other serialization/upload error, re-raised after logging.
        """
        if df.empty:
            logger.warning(f"Attempting to upload an empty DataFrame to s3://{self.bucket_name}/{s3_key}. Skipping.")
            return

        try:
            # Without a target buffer, to_csv returns the CSV text directly.
            csv_text = df.to_csv(index=False)
            self.s3_client.put_object(Bucket=self.bucket_name, Key=s3_key, Body=csv_text)
            logger.info(f"Successfully uploaded DataFrame to s3://{self.bucket_name}/{s3_key}")
        except ClientError as e:
            logger.error(f"Error uploading DataFrame to S3 ({s3_key}): {e}")
            raise
        except Exception as e:
            logger.error(f"Failed to convert DataFrame to CSV or upload to S3 ({s3_key}): {e}")
            raise

    def upload_image_to_s3(self, image_buffer: io.BytesIO, s3_key: str) -> None:
        """
        Stores the full content of an in-memory PNG buffer in the bucket.

        The buffer is rewound to position 0 as a side effect.

        Args:
            image_buffer: BytesIO holding the image bytes.
            s3_key: Full object key under which the image is stored.

        Raises:
            ClientError: Re-raised after logging when the S3 call fails.
            Exception: Any other upload error, re-raised after logging.
        """
        try:
            image_buffer.seek(0)  # Leave the buffer rewound for later readers
            png_bytes = image_buffer.getvalue()
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=png_bytes,
                ContentType='image/png',
            )
            logger.info(f"Successfully uploaded image to s3://{self.bucket_name}/{s3_key}")
        except ClientError as e:
            logger.error(f"Error uploading image to S3 ({s3_key}): {e}")
            raise
        except Exception as e:
            logger.error(f"Failed to upload image to S3 ({s3_key}): {e}")
            raise

# --- 4. CORE FUNCTIONS (MODIFIED FOR S3) ---

def load_data(s3_manager: S3Manager, s3_key: str, stock_name_for_print: str) -> Optional[pd.DataFrame]:
    """
    Downloads one stock's CSV from S3 and prepares it for scanning.

    Preparation consists of parsing the 'timestamp' column, using it as a
    timezone-naive, sorted index, and adding an ``SMA_<SMA_PERIOD>`` column
    with the rolling mean of 'close'.

    Args:
        s3_manager: S3Manager used to download the CSV.
        s3_key: Object key of the stock's CSV file.
        stock_name_for_print: Name used only in log messages.

    Returns:
        The prepared DataFrame, or None when the file is missing, empty, or
        cannot be preprocessed (the reason is logged).
    """
    logger.info(f"  - Loading data for {stock_name_for_print} from s3://{s3_manager.bucket_name}/{s3_key}")
    price_frame = s3_manager.download_csv_to_dataframe(s3_key)

    # Guard clauses: nothing to preprocess without data.
    if price_frame is None:
        logger.warning(f"  - WARNING: Data file not found or could not be loaded for {stock_name_for_print} at {s3_key}. Skipping.")
        return None
    if price_frame.empty:
        logger.warning(f"  - WARNING: Loaded data for {stock_name_for_print} from {s3_key} is empty. Skipping.")
        return None

    try:
        price_frame['timestamp'] = pd.to_datetime(price_frame['timestamp'])
        price_frame.set_index('timestamp', inplace=True)
        # Drop any timezone information so the index is timezone-naive.
        naive_index = price_frame.index.tz_localize(None)
        price_frame.index = naive_index
        price_frame.sort_index(inplace=True)
        closing_prices = price_frame['close']
        price_frame[f'SMA_{SMA_PERIOD}'] = closing_prices.rolling(window=SMA_PERIOD).mean()
    except Exception as e:
        logger.error(f"  - ERROR: Could not preprocess file for {stock_name_for_print}. Reason: {e}")
        return None

    return price_frame

def find_v20_setups(df: pd.DataFrame, pct_change_threshold: float = 0.20) -> List[dict]:
    """
    Identifies all qualifying v20 setups in the historical data.

    A setup is a maximal run of consecutive green candles (close > open) whose
    first candle immediately follows a red candle (close <= open) and whose
    range, ``(highest high - lowest low) / lowest low``, is strictly greater
    than ``pct_change_threshold``. A green run that begins on the very first
    row is never reported because there is no previous candle to check.

    Args:
        df: Candle data with 'open', 'close', 'high' and 'low' columns, scanned
            in its current row order.
        pct_change_threshold: Minimum relative low-to-high range of the run
            (0.20 means 20%).

    Returns:
        A list of dicts, in scan order, with the keys 'start_date' and
        'end_date' (index labels of the first/last candle of the run),
        'lowest_low', 'highest_high', and 'scan_from_index' (positional index
        of the first row after the run). Empty when nothing qualifies.
    """
    row_count = len(df)
    if row_count < 2:
        # A setup needs a previous candle, so fewer than two rows cannot qualify.
        return []

    # Classify every candle once instead of doing per-row pandas lookups in the
    # loop. Both masks are needed: a candle with missing prices is neither.
    is_green = (df['close'] > df['open']).to_numpy()
    is_red = (df['close'] <= df['open']).to_numpy()

    setups = []
    i = 1  # Row 0 has no previous candle to compare against
    while i < row_count:
        if not (is_red[i - 1] and is_green[i]):
            i += 1
            continue

        # Extend the run over all directly following green candles.
        end_index = i
        while end_index + 1 < row_count and is_green[end_index + 1]:
            end_index += 1

        segment = df.iloc[i:end_index + 1]
        lowest_low = segment['low'].min()
        highest_high = segment['high'].max()

        # A zero low would turn the ratio into a division by zero (inf) and
        # report a setup with a meaningless buy level of 0, so it is skipped.
        if lowest_low > 0 and (highest_high - lowest_low) / lowest_low > pct_change_threshold:
            setups.append({
                'start_date': segment.index[0],
                'end_date': segment.index[-1],
                'lowest_low': lowest_low,
                'highest_high': highest_high,
                'scan_from_index': end_index + 1
            })

        # Continue after the run; its candles cannot start another setup.
        i = end_index + 1
    return setups

# Global list to store plots to be zipped
# Each entry is a (BytesIO image buffer, path inside the zip) tuple. It collects
# all plots generated across different stock scans so they can be zipped
# together at the end.
_plots_for_zipping = []

def plot_opportunity(df: pd.DataFrame, opportunity: dict, s3_manager: S3Manager, source_file_name: str) -> None:
    """
    Generates a chart using Plotly and saves it to S3, and collects it for zipping.

    The chart shows the candles, the SMA line, dashed buy-level and sell-target
    lines, a shaded setup period and a text box with the opportunity details.
    The PNG is uploaded under
    ``S3_OUTPUT_BASE_PATH/<category>/<source folder>/<file name>`` and queued in
    ``_plots_for_zipping`` under the matching ``<category>/<source folder>/<file name>``
    path, so the zip mirrors the S3 folder layout. Errors during image export
    or upload are logged and swallowed; in that case the plot is not queued.
    Errors while building the figure are not caught here.

    Args:
        df: DataFrame containing stock data and SMAs.
        opportunity: Dictionary containing opportunity details (Stock, Signal Type, etc.).
        s3_manager: An instance of S3Manager for S3 interactions.
        source_file_name: The original company list filename (e.g., 'v40_token.csv')
                          used to determine the source-specific folder.
    """
    stock_name = opportunity['Stock']
    signal_type = opportunity['Signal Type']
    buy_level = opportunity['Buy Level']
    sell_target = opportunity['Sell Target']
    setup_start = opportunity['Setup Start']
    setup_end = opportunity['Setup End']

    # Determine the source type folder (e.g., 'V40', 'V40Next')
    source_type_folder_name = SOURCE_FILE_MAPPING.get(source_file_name, 'UnknownSource')

    # Determine category folder, file name and box color based on signal type
    if 'IMMEDIATE_TRIGGER' in signal_type or 'MISSED_TRIGGER_NEARBY' in signal_type:
        metadata_text_box_color = "#b6d8a8" # Greenish for buy related signals
        category_folder_name = "buyplots"
        plot_file_name = f"{stock_name}_{signal_type.lower()}.png" # e.g., AAPL_immediate_trigger.png
    else: # Fallback for any other unexpected signal type
        metadata_text_box_color = "#cccccc"
        category_folder_name = "other_plots" # Separate folder for other types
        plot_file_name = f"{stock_name}_signal.png"

    # S3 key: s3_bucket/analysis/v20_strategy/{date}/{category}/{v40 or v40next or v200}/{file}
    s3_plot_key_prefix = f"{S3_OUTPUT_BASE_PATH}{category_folder_name}/{source_type_folder_name}/"
    s3_full_plot_key = f"{s3_plot_key_prefix}{plot_file_name}"

    # Path of the plot *inside the zip file*. It uses the same category folder as
    # the S3 key so that fallback plots do not end up under 'buyplots/' in the zip.
    plot_path_in_zip = f"{category_folder_name}/{source_type_folder_name}/{plot_file_name}"

    # Create Plotly figure
    fig = go.Figure(data=[
        go.Candlestick(
            x=df.index,
            open=df['open'],
            high=df['high'],
            low=df['low'],
            close=df['close'],
            name='Candles'
        ),
        go.Scatter(x=df.index, y=df[f'SMA_{SMA_PERIOD}'], line=dict(color='blue', width=2), name=f'SMA {SMA_PERIOD}')
    ])

    # Add buy level and sell target lines spanning the whole date range
    fig.add_shape(
        type="line",
        x0=df.index[0], y0=buy_level,
        x1=df.index[-1], y1=buy_level,
        line=dict(color='green', dash='dash', width=2),
        name='Buy Level'
    )
    fig.add_shape(
        type="line",
        x0=df.index[0], y0=sell_target,
        x1=df.index[-1], y1=sell_target,
        line=dict(color='red', dash='dash', width=2),
        name='Sell Target'
    )

    # Highlight the setup period
    fig.add_vrect(
        x0=setup_start, x1=setup_end,
        fillcolor="LightSalmon", opacity=0.2, line_width=0,
        annotation_text="V20 Setup", annotation_position="top left"
    )

    fig.update_layout(
        title={
            'text': f"{stock_name} - V20 Setup",
            'y':0.9, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top'
        },
        xaxis_title="Date",
        yaxis_title="Price",
        xaxis_rangeslider_visible=False,
        template="plotly_white",
        height=600, width=1000 # Adjust size for plots
    )

    # Add metadata text as annotation
    distance_pct = (opportunity['Current Close'] - opportunity['Buy Level']) / opportunity['Buy Level'] * 100
    metadata_text = (
        f"Stock List: {opportunity['Source File']}<br>"
        f"Signal Type: {opportunity['Signal Type']}<br>"
        f"Setup Date Range: {opportunity['Setup Start']} to {opportunity['Setup End']}<br>"
        f"Buy Level: {opportunity['Buy Level']:.2f}<br>"
        f"Sell Target: {opportunity['Sell Target']:.2f}<br>"
        f"Current Close: {opportunity['Current Close']:.2f}<br>"
        f"Distance from Buy Level: {distance_pct:.2f}%<br>"
        f"Potential Upside: {opportunity['Potential Upside %']:.2f}%<br>"
        f"SMA Filter: {opportunity['SMA Filter Status (for V200)']}"
    )

    fig.add_annotation(
        xref="paper", yref="paper",
        x=0.02, y=0.98,
        text=metadata_text,
        showarrow=False,
        font=dict(size=12, color="black"),
        bgcolor=metadata_text_box_color,
        bordercolor="black",
        borderwidth=1,
        borderpad=4,
        align="left",
        valign="top"
    )

    try:
        # Convert Plotly figure to bytes directly (requires kaleido)
        img_bytes = fig.to_image(format="png")
        img_buffer = io.BytesIO(img_bytes)

        # Upload individual plot to S3 (raises on failure, which skips the zip entry below)
        s3_manager.upload_image_to_s3(img_buffer, s3_full_plot_key)

        # Add to global list for zipping later, with its path inside the zip
        _plots_for_zipping.append((img_buffer, plot_path_in_zip))

    except ImportError:
        # NOTE(review): depending on the Plotly version a missing kaleido may be
        # reported with a different exception type; the generic handler below
        # then logs it instead.
        logger.error("Plotly image export requires 'kaleido'. Please install it: pip install kaleido")
    except Exception as e:
        logger.error(f"Could not save or upload image for {stock_name}. Error: {e}")


def scan_for_opportunities(df: pd.DataFrame, stock_name: str, source_file_name: str, s3_manager: S3Manager) -> List[dict]:
    """
    Scans a single stock for current V20 trading opportunities.

    Only the most recent setup returned by ``find_v20_setups`` is considered.
    Its lowest low is the buy level and its highest high the sell target. The
    last candle of ``df`` is then classified as:

    * ``IMMEDIATE_TRIGGER``: its low is at or below the buy level.
    * ``MISSED_TRIGGER_NEARBY``: some candle after the setup touched the buy
      level and the last close is above the buy level but at most
      ``MISSED_TRIGGER_THRESHOLD`` times the buy level.

    For stocks from the V200 list the SMA filter status is recorded on the
    opportunity (it does not remove the opportunity). A plot is generated for
    the opportunity that was found.

    Args:
        df: DataFrame containing stock data.
        stock_name: The ticker symbol of the stock.
        source_file_name: The name of the company list file (e.g., 'v40_token.csv').
        s3_manager: An instance of S3Manager for S3 interactions (used for plotting).

    Returns:
        A list of dictionaries, each representing an opportunity. With the
        current rules it holds at most one entry.
    """
    opportunities = []

    # We only care about the most recent valid setup
    setups = find_v20_setups(df)
    if not setups:
        return []

    latest_setup = setups[-1]
    buy_level = latest_setup['lowest_low']
    sell_target = latest_setup['highest_high']

    # Analyze data from the point the setup was confirmed
    scan_df = df.iloc[latest_setup['scan_from_index']:]
    if scan_df.empty:
        # The setup run reaches the last candle; nothing after it to evaluate.
        return []

    # Check the entire history after the setup for a trigger
    triggered_candles = scan_df[scan_df['low'] <= buy_level]

    current_candle = df.iloc[-1]
    current_close = current_candle['close']
    current_low = current_candle['low']

    metadata = {
        'Stock': stock_name,
        'Source File': SOURCE_FILE_MAPPING.get(source_file_name),
        'Setup Start': latest_setup['start_date'].date(),
        'Setup End': latest_setup['end_date'].date(),
        'Buy Level': round(buy_level, 2),
        'Sell Target': round(sell_target, 2),
        'Current Close': round(current_close, 2),
        'Distance From Buy Level %': round(((current_close - buy_level) / buy_level) * 100, 2),
        'Potential Upside %': round(((sell_target - current_close) / current_close) * 100, 2),
        'SMA Filter Status (for V200)': 'N/A'
    }

    # Condition 1: Immediate Trigger Today
    if current_low <= buy_level:
        metadata['Signal Type'] = 'IMMEDIATE_TRIGGER'
        opportunities.append(metadata)

    # Condition 2: Missed Trigger but still nearby
    elif not triggered_candles.empty:
        # Check if the current price is within the 10% threshold
        if buy_level < current_close <= buy_level * MISSED_TRIGGER_THRESHOLD:
            metadata['Signal Type'] = 'MISSED_TRIGGER_NEARBY'
            opportunities.append(metadata)

    # Apply V200 SMA Filter to any found opportunities
    if opportunities and SOURCE_FILE_MAPPING.get(source_file_name) == 'V200':
        sma_value = current_candle.get(f'SMA_{SMA_PERIOD}') # Use .get() to avoid KeyError if SMA not calculated
        if sma_value is None or pd.isna(sma_value) or buy_level >= sma_value:
            # Format the SMA on its own: applying ':.2f' to the "N/A" placeholder
            # string raises ValueError when the SMA column is missing.
            sma_text = "N/A" if sma_value is None else f"{sma_value:.2f}"
            opportunities[0]['SMA Filter Status (for V200)'] = f'Not Met (Buy Level {buy_level:.2f} >= SMA {sma_text})'
        else:
            opportunities[0]['SMA Filter Status (for V200)'] = 'Met'

    # Plot only if an opportunity is found
    if opportunities:
        plot_opportunity(df, opportunities[0], s3_manager, source_file_name) # Pass s3_manager and source_file_name for plot organization

    return opportunities

def send_telegram_document(file_content: io.BytesIO, file_name: str, caption: str, content_type: str = 'text/csv', timeout: float = 30.0) -> bool:
    """
    Sends a document (e.g., CSV file, zip file) to a Telegram chat.

    The request goes to the Bot API ``sendDocument`` endpoint using the
    module-level ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_CHAT_ID``. Nothing is
    sent when either value is empty or still a placeholder.

    Args:
        file_content: A BytesIO object containing the file's content.
        file_name: The name of the file to send (e.g., 'report.csv').
        caption: A text caption for the document.
        content_type: The MIME type of the file (e.g., 'text/csv', 'application/zip').
        timeout: Seconds to wait for the Telegram API before giving up, so a
                 stalled connection cannot hang the script indefinitely.

    Returns:
        True if the document was sent successfully, False otherwise. Errors
        are logged, never raised.
    """
    token_missing = not TELEGRAM_BOT_TOKEN or TELEGRAM_BOT_TOKEN == 'YOUR_TELEGRAM_BOT_TOKEN'
    chat_missing = not TELEGRAM_CHAT_ID or TELEGRAM_CHAT_ID == 'YOUR_TELEGRAM_CHAT_ID'
    if token_missing or chat_missing:
        logger.error("Telegram bot token or chat ID not configured. Skipping Telegram document send.")
        return False

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"

    files = {
        'document': (file_name, file_content.getvalue(), content_type)
    }
    data = {
        'chat_id': TELEGRAM_CHAT_ID,
        'caption': caption
    }

    try:
        response = requests.post(url, data=data, files=files, timeout=timeout)
        response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
        logger.info(f"Telegram document '{file_name}' sent successfully.")
        return True
    except requests.exceptions.RequestException as e:
        # The exception text contains the request URL, which embeds the bot
        # token; redact it so the secret does not end up in the logs.
        safe_error = str(e).replace(TELEGRAM_BOT_TOKEN, '<redacted>')
        logger.error(f"Failed to send Telegram document '{file_name}': {safe_error}")
        if hasattr(e, 'response') and e.response is not None:
            logger.error(f"Telegram API Response: {e.response.text}")
        return False
    except Exception as e:
        safe_error = str(e).replace(TELEGRAM_BOT_TOKEN, '<redacted>')
        logger.error(f"An unexpected error occurred while sending Telegram document: {safe_error}")
        return False


# --- 5. MAIN EXECUTION BLOCK ---

if __name__ == '__main__':
    # Script flow: load the stock lists from S3, scan every unique ticker,
    # upload the CSV report to S3 and send the report and plots via Telegram.

    # Clear the global list of plots at the beginning of execution
    # (e.g. when the script is re-run in the same interpreter session).
    _plots_for_zipping = []

    all_opportunities = []

    # Initialize S3Manager with the specified profile
    s3_manager = S3Manager(AWS_BUCKET_NAME, AWS_REGION, AWS_PROFILE_NAME)

    logger.info("--- Starting V20 Daily Opportunity Scanner ---")
    logger.info(f"Reading input data from S3 path: s3://{AWS_BUCKET_NAME}/{S3_INPUT_STOCK_DATA_PATH}")
    logger.info(f"Saving results to S3 path: s3://{AWS_BUCKET_NAME}/{S3_OUTPUT_BASE_PATH}")

    # Consolidate all tickers from all files into one list (read from S3)
    all_tickers_to_scan = {} # Using dict to store ticker and its source file
    for list_file_name in STOCK_LIST_FILES:
        stock_list_s3_key = f"{S3_COMPANY_LIST_PREFIX}{list_file_name}"
        logger.info(f"  - Attempting to load stock list from s3://{AWS_BUCKET_NAME}/{stock_list_s3_key}")

        stocks_df = s3_manager.download_csv_to_dataframe(stock_list_s3_key)

        if stocks_df is None or stocks_df.empty:
            logger.warning(f"  - WARNING: Stock list file not found or empty at s3://{AWS_BUCKET_NAME}/{stock_list_s3_key}. Skipping this list.")
            continue # Skip to the next list file if this one fails

        if TICKER_COLUMN_NAME not in stocks_df.columns:
            # A malformed list should not abort the whole run with a KeyError.
            logger.warning(f"  - WARNING: Column '{TICKER_COLUMN_NAME}' not found in s3://{AWS_BUCKET_NAME}/{stock_list_s3_key}. Skipping this list.")
            continue

        # A ticker present in several lists keeps the first list it was seen in.
        for ticker in stocks_df[TICKER_COLUMN_NAME].dropna().unique():
            if ticker not in all_tickers_to_scan:
                all_tickers_to_scan[ticker] = list_file_name

    if not all_tickers_to_scan:
        logger.critical("FATAL ERROR: No unique stocks found to scan from any company list. Exiting.")
        # SystemExit works everywhere; the bare exit() helper is only provided
        # by the site module and is not guaranteed to exist.
        raise SystemExit(1) # Exit if no essential stock list can be loaded

    logger.info(f"Found a total of {len(all_tickers_to_scan)} unique stocks to scan.")

    # Loop through the consolidated list of tickers
    for i, (ticker, source_file) in enumerate(all_tickers_to_scan.items()):
        logger.info(f"  [{i+1}/{len(all_tickers_to_scan)}] Analyzing: {ticker}")

        # Construct S3 key for individual stock data
        data_s3_key = f"{S3_INPUT_STOCK_DATA_PATH}{ticker}.csv"
        df = load_data(s3_manager, data_s3_key, ticker)

        if df is not None and not df.empty:
            # Pass s3_manager AND source_file_name to scan_for_opportunities
            opportunities = scan_for_opportunities(df, ticker, source_file, s3_manager)
            if opportunities:
                all_opportunities.extend(opportunities)

    logger.info("\n--- Scan Complete ---")

    # --- 6. FINAL REPORTING (MODIFIED FOR S3 & TELEGRAM) ---
    if all_opportunities:
        report_df = pd.DataFrame(all_opportunities)
        # Reorder columns for better readability
        cols_order = ['Stock', 'Source File', 'Signal Type', 'Current Close',
                      'Buy Level', 'Distance From Buy Level %',
                      'Sell Target', 'Potential Upside %', 'SMA Filter Status (for V200)',
                      'Setup Start', 'Setup End']
        report_df = report_df.reindex(columns=cols_order)

        # Sort by Signal Type and Distance for priority.
        # The key gives IMMEDIATE_TRIGGER the lowest value (0) and unknown types
        # the highest (99), so it must be sorted ascending to list the
        # highest-priority signals first.
        report_df['sort_key'] = report_df['Signal Type'].apply(lambda x: {'IMMEDIATE_TRIGGER':0, 'MISSED_TRIGGER_NEARBY':1}.get(x, 99))
        report_df = report_df.sort_values(by=['sort_key', 'Distance From Buy Level %'], ascending=[True, True])
        report_df = report_df.drop(columns=['sort_key'])

        # Save report CSV to S3 (raises if the upload fails)
        s3_manager.upload_dataframe_to_s3_csv(report_df, S3_OUTPUT_CSV_KEY)

        logger.info(f"\nSUCCESS: Found {len(report_df)} opportunities.")
        logger.info("Detailed report saved to S3 at:")
        logger.info(f"s3://{AWS_BUCKET_NAME}/{S3_OUTPUT_BASE_PATH}")
        logger.info("\n" + "="*28 + " OPPORTUNITY SUMMARY " + "="*28)
        logger.info("\n" + report_df.to_string())

        # --- Send CSV report via Telegram ---
        telegram_csv_buffer = io.BytesIO()
        report_df.to_csv(telegram_csv_buffer, index=False)
        telegram_csv_buffer.seek(0) # Rewind the buffer

        csv_caption = f"V20 Daily Opportunity Report for {TODAY_DATE_STR}\n\n" \
                      f"Found {len(report_df)} opportunities.\n" \
                      f"See attached CSV for full details."

        send_telegram_document(
            file_content=telegram_csv_buffer,
            file_name=f"v20_opportunities_{TODAY_DATE_STR}.csv",
            caption=csv_caption,
            content_type='text/csv'
        )

        # --- Send plots as a zip file via Telegram ---
        if _plots_for_zipping:
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
                for plot_img_buffer, plot_path_in_zip in _plots_for_zipping:
                    # Rewind the image buffer before writing to zip
                    plot_img_buffer.seek(0)
                    # Add the plot to the zip file with its intended internal path
                    zf.writestr(plot_path_in_zip, plot_img_buffer.getvalue())
            zip_buffer.seek(0) # Rewind the zip buffer

            zip_file_name = f"v20_plots_{TODAY_DATE_STR}.zip"
            zip_caption = f"V20 Daily Opportunity Plots for {TODAY_DATE_STR}\n\n" \
                          f"Attached are {len(_plots_for_zipping)} plots for the identified opportunities, organized by source list."

            send_telegram_document(
                file_content=zip_buffer,
                file_name=zip_file_name,
                caption=zip_caption,
                content_type='application/zip'
            )
        else:
            logger.info("No plots were generated to send via Telegram.")

    else:
        logger.info("\nNo V20 opportunities found for the given date.")