In [None]:
import os
import yaml
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from typing import Dict, Any
from google_sheet_api import GoogleSheetsUploader

# ----------------------------- Configuration and Directory Setup -----------------------------
# Every location below is resolved against the working directory of the
# running process, so the script must be launched from the project folder.
BASE_DIR = os.getcwd()
CONFIG_PATH, PLOTS_DIR, CREDENTIAL_PATH = (
    os.path.join(BASE_DIR, leaf_name)
    for leaf_name in ("config.yaml", "plots", "credential_google_sheets.json")
)

# ----------------------------- Utility: Clean Up Existing Plot -----------------------------
def cleanup_existing_plots(plot_name: str) -> None:
    """Make sure the plots directory exists and delete a stale plot file.

    Args:
        plot_name: File name (not a full path) of the plot inside ``PLOTS_DIR``.
    """
    os.makedirs(PLOTS_DIR, exist_ok=True)
    plot_path = os.path.join(PLOTS_DIR, plot_name)
    # Remove-and-ignore rather than exists()-then-remove: the file may vanish
    # between the check and the delete, which would raise on the old code path.
    try:
        os.remove(plot_path)
    except FileNotFoundError:
        pass

# ----------------------------- Yahoo Finance Data Fetcher -----------------------------
class YahooFinanceDataFetcher:
    """Fetch and normalise Yahoo Finance price history for configured symbols.

    The YAML config is read once at construction. Keys consulted:
    ``symbols_yfinance`` (mapping of display symbol -> Yahoo ticker),
    ``symbol_coefficients``, ``daily_period`` (default ``"10y"``),
    ``daily_interval`` (default ``"1d"``) and ``std_multiplier``
    (default ``1.97``).
    """

    def __init__(self, config_file: str) -> None:
        """Load settings from the YAML file at *config_file*.

        Raises:
            OSError: If the config file cannot be opened.
        """
        with open(config_file, "r", encoding="utf-8") as file:
            # safe_load returns None for an empty document; fall back to an
            # empty mapping so the .get() calls below keep working.
            self.config: Dict[str, Any] = yaml.safe_load(file) or {}

        # "or {}" also covers keys that are present but left empty in the YAML.
        self.symbol_map = self.config.get("symbols_yfinance") or {}
        self.coeff_map = self.config.get("symbol_coefficients") or {}
        self.daily_period = self.config.get("daily_period", "10y")
        self.daily_interval = self.config.get("daily_interval", "1d")
        self.std_multiplier = float(self.config.get("std_multiplier", 1.97))

    def fetch_data(self, ticker: str) -> pd.DataFrame:
        """Download history for *ticker*; return an empty frame on any failure."""
        try:
            data = yf.download(ticker, period=self.daily_period, interval=self.daily_interval, progress=False)
        except Exception as e:
            # Best-effort boundary: one failing ticker must not abort the run.
            print(f"❌ Error fetching '{ticker}': {e}")
            return pd.DataFrame()

        if data is None:
            # Defensive: treat a missing result exactly like an empty one.
            data = pd.DataFrame()
        if data.empty:
            print(f"⚠️ No data for '{ticker}'.")
        return data

    def clean_data(self, data: pd.DataFrame, symbol: str) -> pd.DataFrame:
        """Flatten a raw download into ``Symbol``, ``Datetime`` and price columns.

        Args:
            data: Raw frame whose index holds the timestamps (index named
                ``Date``, ``datetime`` or ``Datetime``).
            symbol: Label written into the ``Symbol`` column.

        Returns:
            A frame with ``Symbol``, ``Datetime`` (formatted as
            ``%Y-%m-%d %H:%M:%S`` strings in UTC) and whichever of
            Open/High/Low/Close/Volume are present. An empty frame is
            returned when the input is empty or has no timestamp column.
        """
        if data.empty:
            return data

        data = data.reset_index()
        if isinstance(data.columns, pd.MultiIndex):
            # Drop the second header level so columns are plain field names.
            data.columns = data.columns.droplevel(1)

        data = data.rename(columns={"Date": "Datetime", "datetime": "Datetime"})
        if "Datetime" not in data.columns:
            # Without a timestamp column nothing downstream can work; report
            # it instead of failing with a bare KeyError.
            print(f"⚠️ No date column found for '{symbol}'.")
            return pd.DataFrame()

        data["Datetime"] = pd.to_datetime(data["Datetime"], errors="coerce", utc=True)
        data["Datetime"] = data["Datetime"].dt.strftime("%Y-%m-%d %H:%M:%S")

        numeric_cols = ["Open", "High", "Low", "Close", "Volume"]
        present_numeric = [col for col in numeric_cols if col in data.columns]
        for col in present_numeric:
            data[col] = pd.to_numeric(data[col], errors="coerce")

        data["Symbol"] = symbol
        return data[["Symbol", "Datetime"] + present_numeric]

    def process_all_symbols(self) -> Dict[str, pd.DataFrame]:
        """Fetch and clean every configured symbol.

        Returns:
            Mapping of symbol -> cleaned frame. Symbols with no usable data
            are omitted, so the mapping may be empty.
        """
        symbol_data = {}
        for symbol, ticker in self.symbol_map.items():
            print(f"📈 Fetching {symbol} ({ticker})...")
            raw_data = self.fetch_data(ticker)
            if raw_data.empty:
                continue
            cleaned = self.clean_data(raw_data, symbol)
            # clean_data may reject a frame (no timestamp column); skip it so
            # callers only ever see frames with the expected columns.
            if not cleaned.empty:
                symbol_data[symbol] = cleaned
        if not symbol_data:
            print("⚠️ No data fetched for any symbols.")
        return symbol_data

# ----------------------------- Exploratory Data Analysis (EDA) -----------------------------
def perform_eda(df: pd.DataFrame) -> pd.DataFrame:
    """Summarise the date coverage of each symbol.

    Args:
        df: Frame with at least ``Symbol`` and ``Datetime`` columns. It is
            not modified.

    Returns:
        A frame indexed by ``Symbol`` with ``Start_Date``, ``End_Date`` and
        ``Duration_Days``; an empty frame when *df* is empty.
    """
    if df.empty:
        print("⚠️ DataFrame is empty. Skipping EDA summary.")
        return pd.DataFrame()

    # Parse into a copy: assigning into df itself would silently change the
    # caller's Datetime column from strings to timestamps.
    parsed = df.assign(Datetime=pd.to_datetime(df["Datetime"], errors="coerce"))
    summary = parsed.groupby("Symbol")["Datetime"].agg(Start_Date="min", End_Date="max")
    summary["Duration_Days"] = (summary["End_Date"] - summary["Start_Date"]).dt.days
    return summary

# ----------------------------- Plotting Price Gain Analysis -----------------------------
def plot_price_gain(data, symbol, avg, std, upper_1std, lower_1std, upper_custom_std, lower_custom_std, std_multiplier):
    """Scatter the gain percentages over time, overlay threshold lines, save a JPEG.

    Args:
        data: Frame with ``Date`` and ``Price_Gain_Percentage`` columns.
        symbol: Used in the title and as the output file stem.
        avg, upper_1std, lower_1std, upper_custom_std, lower_custom_std:
            Levels drawn as dashed horizontal lines.
        std: Accepted for signature compatibility; not used by the plot.
        std_multiplier: Only used to label the custom-band lines.

    The image is written to ``PLOTS_DIR/<symbol>.jpg``, replacing any
    previous file of that name.
    """
    gain_column = 'Price_Gain_Percentage'

    sns.set_theme(style="whitegrid")
    plt.figure(figsize=(14, 8))

    # Non-negative and negative gains are separate series so each gets its
    # own colour and legend entry.
    gains = data[gain_column]
    for subset, legend_text, colour in (
        (data[gains >= 0], 'Gain ≥ 0%', 'green'),
        (data[gains < 0], 'Gain < 0%', 'red'),
    ):
        sns.scatterplot(data=subset, x='Date', y=gain_column,
                        label=legend_text, color=colour, alpha=0.6, s=10)

    # Parallel sequences keep the drawing (and therefore legend) order fixed.
    line_names = ("Avg Gain", "+1 Std", "-1 Std", f"+{std_multiplier} Std", f"-{std_multiplier} Std")
    line_levels = (avg, upper_1std, lower_1std, upper_custom_std, lower_custom_std)
    line_colours = ('blue', 'purple', 'orange', 'darkgreen', 'darkred')
    for name, level, colour in zip(line_names, line_levels, line_colours):
        plt.axhline(level, color=colour, linestyle='--', linewidth=1.2, label=f"{name}: {level}%")

    plt.xlabel('Date', fontsize=12)
    plt.ylabel('Price Gain Percentage (%)', fontsize=12)
    plt.title(f'{symbol} - 365-Day Price Gain % Over Time', fontsize=16, weight='bold')
    plt.legend(loc='upper center')
    plt.tight_layout()

    output_name = f"{symbol}.jpg"
    cleanup_existing_plots(output_name)
    destination = os.path.join(PLOTS_DIR, output_name)
    print(f"🖼️ Saving plot -> {destination}")
    plt.savefig(destination, format='jpg', dpi=300)
    plt.close()

# ----------------------------- 365-Day Price Gain Calculation -----------------------------
def analyze_365_day_gain(data: pd.DataFrame, symbol: str, std_multiplier: float):
    """Compute the forward 365-row price gain for one symbol and plot it.

    Args:
        data: Frame with a ``Datetime`` column and a ``Close`` (preferred) or
            ``Price`` column. It is copied, never modified.
        symbol: Label used in messages and in the plot.
        std_multiplier: Width of the custom band, in standard deviations.

    Returns:
        ``(annotated, avg, upper_custom_std, lower_custom_std, latest_date,
        latest_price)``. ``annotated`` keeps only rows that have a gain value.
        When the symbol is skipped (no price column, all prices null, or not
        enough history) every element after the frame is ``None``.

    NOTE(review): the horizon is ``shift(-365)``, i.e. 365 *rows* ahead. On
    data with one row per trading day that is longer than 365 calendar days —
    confirm this is the intended horizon.
    """
    data = data.copy()
    data['Date'] = pd.to_datetime(data['Datetime'], errors='coerce')
    data.sort_values('Date', inplace=True)

    if 'Close' in data.columns:
        data['Price'] = data['Close']
    elif 'Price' not in data.columns:
        print(f"⚠️ Skipping {symbol}: Missing 'Close' or 'Price' column.")
        return data, None, None, None, None, None

    if data['Price'].isnull().all():
        print(f"⚠️ Skipping {symbol}: 'Price' column is entirely null.")
        return data, None, None, None, None, None

    # Capture the latest quote before rows without a forward price are dropped.
    latest_row = data.dropna(subset=['Price']).iloc[-1]
    latest_date = latest_row['Date'].date()
    latest_price = round(latest_row['Price'], 2)

    data['Price_365_Days_Later'] = data['Price'].shift(-365)
    data['Price_Gain_Percentage'] = ((data['Price_365_Days_Later'] - data['Price']) / data['Price']) * 100
    data.dropna(subset=['Price_Gain_Percentage'], inplace=True)

    if data.empty:
        # Too little history: statistics would all be NaN and the plot empty,
        # and later steps cannot take the minimum of an empty gain column.
        print(f"⚠️ Skipping {symbol}: not enough history to compute a 365-row gain.")
        return data, None, None, None, None, None

    data['Price_Gain_Percentage'] = data['Price_Gain_Percentage'].round(2)

    avg = round(data['Price_Gain_Percentage'].mean(), 2)
    std = round(data['Price_Gain_Percentage'].std(), 2)
    upper_1std = round(avg + std, 2)
    lower_1std = round(avg - std, 2)
    upper_custom_std = round(avg + std_multiplier * std, 2)
    lower_custom_std = round(avg - std_multiplier * std, 2)

    plot_price_gain(data, symbol, avg, std, upper_1std, lower_1std, upper_custom_std, lower_custom_std, std_multiplier)

    return data, avg, upper_custom_std, lower_custom_std, latest_date, latest_price

# ----------------------------- Negative Gain Date Distribution Analysis -----------------------------
def analyze_negative_gain_distribution(symbol_data: Dict[str, pd.DataFrame]):
    """Report the single worst gain of every analysed symbol.

    Args:
        symbol_data: Mapping of symbol -> frame with ``Date`` and
            ``Price_Gain_Percentage`` columns. Frames without the gain
            column, or without any non-null gain, are skipped.

    Returns:
        A frame with ``Symbol``, ``Worst Gain (%)`` and ``Date of Worst Gain``
        sorted from worst to best; empty (but with those columns) when no
        symbol qualifies.
    """
    print("\n📉 Analyzing negative gain distributions across symbols...")
    summary_columns = ["Symbol", "Worst Gain (%)", "Date of Worst Gain"]
    worst_rows = []

    for symbol, df in symbol_data.items():
        if 'Price_Gain_Percentage' not in df.columns:
            continue
        gains = df['Price_Gain_Percentage'].dropna()
        if gains.empty:
            # idxmin has no answer for an empty or all-null column.
            continue
        worst = df.loc[gains.idxmin()]
        worst_rows.append({
            "Symbol": symbol,
            "Worst Gain (%)": worst['Price_Gain_Percentage'],
            "Date of Worst Gain": worst['Date'].date()
        })

    # Passing the columns explicitly keeps sort_values valid when nothing was
    # collected (a column-less frame would raise KeyError).
    worst_df = pd.DataFrame(worst_rows, columns=summary_columns).sort_values("Worst Gain (%)")
    print("\n🔍 Worst Gain Summary:")
    print(worst_df)
    return worst_df

# ----------------------------- Main Execution -----------------------------
if __name__ == "__main__":
    # Pipeline: fetch -> coverage summary -> per-symbol gain analysis ->
    # worst-gain report -> upload of the overview table.
    fetcher = YahooFinanceDataFetcher(CONFIG_PATH)
    symbol_data = fetcher.process_all_symbols()

    # pd.concat raises ValueError on an empty collection, which happens
    # whenever no symbol could be fetched.
    if symbol_data:
        full_df = pd.concat(symbol_data.values(), ignore_index=True)
    else:
        full_df = pd.DataFrame()
    eda_summary = perform_eda(full_df)
    print("\n📊 EDA Summary:")
    print(eda_summary)

    final_summary = []
    analyzed_data = {}
    std_multiplier = fetcher.std_multiplier

    for symbol, df in symbol_data.items():
        annotated_df, avg, upper_custom_std, lower_custom_std, latest_dt, latest_price = analyze_365_day_gain(
            df, symbol, std_multiplier)

        # The analysis signals "skipped" by returning None for its statistics.
        if avg is None or latest_dt is None:
            continue

        analyzed_data[symbol] = annotated_df
        coeff = fetcher.coeff_map.get(symbol)
        max_price = round(df["Close"].max(), 2) if "Close" in df.columns else None

        final_summary.append({
            "Symbol": symbol,
            "Date": latest_dt,
            "Price": latest_price,
            "Max Price": max_price,
            # Reported as a positive number: the sign-flipped lower band.
            "Std": lower_custom_std * -1,
            "Coefficient": coeff
        })

    final_df = pd.DataFrame(final_summary)
    print("\n✅ Final Summary:")
    print(final_df)

    # Nothing to rank when every symbol was skipped.
    if analyzed_data:
        analyze_negative_gain_distribution(analyzed_data)

    if final_df.empty:
        # Do not push an empty table over the existing sheet contents.
        print("\n⚠️ Final summary is empty. Skipping Google Sheets upload.")
    else:
        try:
            print("\n📤 Uploading to Google Sheets...")
            uploader = GoogleSheetsUploader(CREDENTIAL_PATH, "Financial Report - Indonesia")
            uploader.upload_dataframe(final_df, "Overview")
            print("✅ Upload successful!")
        except Exception as e:
            # Top-level boundary: report the failure, keep the printed results.
            print(f"❌ Upload failed: {e}")


📈 Fetching AUS200 (^AXJO)...
