<a href="https://colab.research.google.com/github/mikalainis/AI-Agent-Capstone/blob/main/Trader_Unit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q google-genai ddgs alpaca-py nest_asyncio slack_sdk schedule

In [None]:
import os
import json
import logging
import sys
import time
import schedule
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional

import pandas as pd
import nest_asyncio

# --- THIRD PARTY IMPORTS ---
from google import genai
from google.genai import types
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

from alpaca.trading.client import TradingClient
from alpaca.trading.requests import MarketOrderRequest
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.data.historical import StockHistoricalDataClient, CryptoHistoricalDataClient
from alpaca.data.requests import StockBarsRequest, CryptoBarsRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import DataFeed

# Patch for Colab/Jupyter compatibility
nest_asyncio.apply()

# --- 1. LOGGING CONFIGURATION ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("DarwinianSwarm")

# --- 2. CONFIGURATION MANAGEMENT ---
@dataclass(frozen=True)
class Config:
    GOOGLE_KEY: str
    ALPACA_KEY: str
    ALPACA_SECRET: str
    SLACK_TOKEN: str
    SLACK_CHANNEL: str
    DEFAULT_QTY: float = 1.0
    IS_PAPER: bool = True

    @classmethod
    def load(cls) -> "Config":
        try:
            from google.colab import userdata
            get_secret = userdata.get
        except ImportError:
            get_secret = os.getenv

        secrets = {
            "GOOGLE_KEY": get_secret("GOOGLE_API_KEY"),
            "ALPACA_KEY": get_secret("ALPACA_API_KEY"),
            "ALPACA_SECRET": get_secret("ALPACA_SECRET"),
            "SLACK_TOKEN": get_secret("SLACK_BOT_TOKEN"),
            "SLACK_CHANNEL": "D0A1C7TBB5E", # Replace with your Channel ID
            "DEFAULT_QTY": float(os.getenv("DEFAULT_QTY", "1.0"))
        }

        # Validate Critical Keys
        missing = [k for k, v in secrets.items() if (not v or v == "YOUR_KEY") and k != "DEFAULT_QTY"]
        if missing:
            logger.critical(f"Missing Critical Secrets: {missing}")
            sys.exit(1)

        return cls(**secrets)

# --- 3. STATE MANAGEMENT ---
@dataclass
class MarketState:
    ticker: str
    news_summary: str = ""
    decision: str = "HOLD"
    confidence: int = 0
    reasoning: str = ""
    current_price: float = 0.0
    current_rsi: float = 0.0
    asset_class: str = "STOCK"
    position_qty: float = 0.0

# --- 4. CORE ANALYST AGENT ---
class DarwinianSwarm:
    CRYPTO_PAIRS = {"BTCUSD", "ETHUSD", "SOLUSD", "LINKUSD", "DOGEUSD"}

    def __init__(self, ticker: str, config: Config, existing_qty: Optional[float] = None):
        self.config = config

        # Initialize Clients
        self.gemini = genai.Client(api_key=config.GOOGLE_KEY)
        self.alpaca_trade = TradingClient(config.ALPACA_KEY, config.ALPACA_SECRET, paper=config.IS_PAPER)
        self.alpaca_stock_data = StockHistoricalDataClient(config.ALPACA_KEY, config.ALPACA_SECRET)
        self.alpaca_crypto_data = CryptoHistoricalDataClient(config.ALPACA_KEY, config.ALPACA_SECRET)
        self.slack = WebClient(token=config.SLACK_TOKEN)

        # Detect Position (if not provided by Portfolio Audit)
        if existing_qty is None:
            try:
                pos = self.alpaca_trade.get_open_position(ticker.upper())
                detected_qty = float(pos.qty)
            except Exception:
                detected_qty = 0.0
        else:
            detected_qty = existing_qty

        self.state = MarketState(ticker=ticker.upper(), position_qty=detected_qty)
        self.state.asset_class = "CRYPTO" if self.state.ticker in self.CRYPTO_PAIRS else "STOCK"

    def _notify_slack_simple(self, message: str):
        """Simple text fallback"""
        try:
            self.slack.chat_postMessage(channel=self.config.SLACK_CHANNEL, text=message)
        except SlackApiError as e:
            logger.error(f"Slack Notification Failed: {e.response['error']}")

    # --- PIPELINE STEP 1: TECHNICALS ---
    def check_technicals(self) -> bool:
        logger.info(f"[{self.state.ticker}] Step 1: Checking Technicals...")
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=45)

        try:
            if self.state.asset_class == "CRYPTO":
                req = CryptoBarsRequest(symbol_or_symbols=self.state.ticker, timeframe=TimeFrame.Hour, start=start_time, limit=200)
                bars = self.alpaca_crypto_data.get_crypto_bars(req)
            else:
                req = StockBarsRequest(symbol_or_symbols=self.state.ticker, timeframe=TimeFrame.Hour, start=start_time, limit=200, feed=DataFeed.IEX)
                bars = self.alpaca_stock_data.get_stock_bars(req)

            if not bars.data:
                logger.warning(f"No market data found for {self.state.ticker}.")
                return False

            df = bars.df
            if isinstance(df.index, pd.MultiIndex): df = df.reset_index()
            df.columns = [c.lower() for c in df.columns]

            delta = df['close'].diff()
            gain = (delta.where(delta > 0, 0)).fillna(0)
            loss = (-delta.where(delta < 0, 0)).fillna(0)
            avg_gain = gain.ewm(com=13, min_periods=14).mean()
            avg_loss = loss.ewm(com=13, min_periods=14).mean()
            rs = avg_gain / avg_loss
            df['rsi'] = 100 - (100 / (1 + rs))

            self.state.current_price = float(df['close'].iloc[-1])
            self.state.current_rsi = float(df['rsi'].iloc[-1])
            return True
        except Exception as e:
            logger.error(f"Technical Analysis Error: {e}")
            return False

    # --- PIPELINE STEP 2: NEWS ---
    def fetch_news_context(self) -> bool:
        logger.info(f"[{self.state.ticker}] Step 2: Fetching News...")
        try:
            today = datetime.now().strftime("%Y-%m-%d")
            prompt = f"Find the latest financial news and analyst ratings for {self.state.ticker} as of {today}. Summarize top 3 headlines."

            response = self.gemini.models.generate_content(
                model="gemini-2.5-flash-lite",
                contents=prompt,
                config=types.GenerateContentConfig(
                    tools=[types.Tool(google_search=types.GoogleSearch())],
                    response_mime_type="text/plain"
                )
            )
            self.state.news_summary = response.text if response.text else "No specific news found."
            return True
        except Exception as e:
            logger.error(f"News Fetch Error: {e}")
            return False

    # --- PIPELINE STEP 3: HYBRID AI ANALYSIS ---
    def analyze_sentiment(self) -> bool:
        logger.info(f"[{self.state.ticker}] Step 3: Hybrid Analysis (News + RSI)...")
        response_schema = {
            "type": "OBJECT",
            "properties": {
                "decision": {"type": "STRING", "enum": ["BUY", "SELL", "HOLD"]},
                "confidence": {"type": "INTEGER"},
                "reasoning": {"type": "STRING"}
            }
        }

        context_str = f"You currently own {self.state.position_qty} units." if self.state.position_qty > 0 else "You do NOT own this asset."

        prompt = f"""
        Act as a Quantitative Portfolio Manager.

        1. ASSET: {self.state.ticker}
        2. TECHNICAL DATA:
           - Current Price: ${self.state.current_price:.2f}
           - RSI (14-period): {self.state.current_rsi:.2f}
           (Note: RSI > 70 is Overbought, RSI < 30 is Oversold)
        3. PORTFOLIO CONTEXT: {context_str}
        4. NEWS CONTEXT: {self.state.news_summary}

        TASK: Synthesize News and Technicals to make a decision.

        OUTPUT: JSON with decision, confidence (0-100), and reasoning.
        """
        try:
            response = self.gemini.models.generate_content(
                model="gemini-2.5-flash-lite",
                contents=prompt,
                config=types.GenerateContentConfig(
                    response_mime_type="application/json",
                    response_schema=response_schema
                )
            )
            data = json.loads(response.text)
            self.state.decision = data["decision"]
            self.state.confidence = data["confidence"]
            self.state.reasoning = data["reasoning"]
            logger.info(f"   Analyst Decision: {self.state.decision} ({self.state.confidence}%)")
            return True
        except Exception as e:
            logger.error(f"Analysis Error: {e}")
            return False

    # --- EXECUTION ENGINE ---
    def execute_strategy(self, verbose: bool = False):
        # 1. HOLD Logic
        if self.state.decision == "HOLD":
            logger.info(f"[{self.state.ticker}] Strategy: HOLD.")
            if verbose:
                print(f"   üîé Monitoring {self.state.ticker} (RSI: {self.state.current_rsi:.1f}). No Action.")
            return

        if self.state.confidence < 75:
            logger.info(f"[{self.state.ticker}] Low Confidence ({self.state.confidence}%).")
            return

        # 2. RSI Safety Gate
        if self.state.decision == "BUY" and self.state.current_rsi > 75:
             logger.warning(f"üõë HARD STOP: RSI {self.state.current_rsi:.1f} is dangerously high. Trade blocked.")
             return

        # --- INTERACTIVE COLAB APPROVAL GATE ---
        print("\n" + "="*50)
        print(f"ü§ñ AI PROPOSAL: {self.state.decision} {self.state.ticker}")
        print("="*50)
        print(f"   üí∞ Price:     ${self.state.current_price:.2f}")
        print(f"   üìâ RSI:       {self.state.current_rsi:.2f}")
        print(f"   üß† Confidence: {self.state.confidence}%")
        print(f"   üìù Reasoning:  {self.state.reasoning}")
        print("-" * 50)

        # 1. Ask for Approval
        approval = input(f"üëâ Do you want to proceed with {self.state.decision}? (y/n): ").lower().strip()

        if approval != 'y':
            print("üõë Action Cancelled by User.")
            self._notify_slack_simple(f"üõë {self.state.ticker} Trade Cancelled by User.")
            return

        # 2. Ask for Quantity
        default_q = self.config.DEFAULT_QTY
        qty_input = input(f"üì¶ Enter Quantity to {self.state.decision} (Default: {default_q}): ").strip()

        try:
            final_qty = float(qty_input) if qty_input else default_q
        except ValueError:
            print("‚ö†Ô∏è Invalid number. Using default.")
            final_qty = default_q

        # 3. Sanity Check for Selling
        if self.state.decision == "SELL":
            if self.state.position_qty == 0:
                print("‚ùå You don't own this asset. Cannot sell.")
                return
            if final_qty > self.state.position_qty:
                print(f"‚ö†Ô∏è Reduced Sell Qty to Max Owned ({self.state.position_qty})")
                final_qty = self.state.position_qty

        # 4. Execute
        print(f"üöÄ Executing Order: {self.state.decision} {final_qty} {self.state.ticker}...")
        try:
            order_data = MarketOrderRequest(
                symbol=self.state.ticker,
                qty=final_qty,
                side=OrderSide.BUY if self.state.decision == "BUY" else OrderSide.SELL,
                time_in_force=TimeInForce.GTC
            )
            order = self.alpaca_trade.submit_order(order_data=order_data)
            print(f"‚úÖ Order Filled! {self.state.decision} {final_qty} {self.state.ticker} @ ~${self.state.current_price:.2f}")

            # --- 5. RICH SLACK NOTIFICATION (Summary + Analysis) ---
            action_emoji = "üü¢" if self.state.decision == "BUY" else "üî¥"

            blocks = [
                {
                    "type": "header",
                    "text": {"type": "plain_text", "text": f"{action_emoji} Trade Executed: {self.state.ticker}"}
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": f"*Action:*\n{self.state.decision} {final_qty}"},
                        {"type": "mrkdwn", "text": f"*Price:*\n${self.state.current_price:.2f}"},
                        {"type": "mrkdwn", "text": f"*RSI:*\n{self.state.current_rsi:.2f}"},
                        {"type": "mrkdwn", "text": f"*Confidence:*\n{self.state.confidence}%"}
                    ]
                },
                {
                    "type": "section",
                    "text": {"type": "mrkdwn", "text": f"*Analysis Summary:*\n{self.state.reasoning}"}
                },
                {
                    "type": "context",
                    "elements": [{"type": "mrkdwn", "text": "üöÄ Executed via DarwinianSwarm (Colab)"}]
                }
            ]

            self.slack.chat_postMessage(channel=self.config.SLACK_CHANNEL, text="Trade Executed", blocks=blocks)

        except Exception as e:
            error_msg = f"‚ùå Order Failed: {e}"
            print(error_msg)
            self._notify_slack_simple(error_msg)

    def run(self, verbose: bool = False):
        if not self.check_technicals(): return
        if not self.fetch_news_context(): return
        if not self.analyze_sentiment(): return
        self.execute_strategy(verbose=verbose)

# --- 5. PORTFOLIO MANAGER ---
class PortfolioAudit:
    def __init__(self, config: Config):
        self.config = config
        self.alpaca = TradingClient(config.ALPACA_KEY, config.ALPACA_SECRET, paper=config.IS_PAPER)

    def scan(self, verbose: bool = False):
        print(f"\nüïµÔ∏è‚Äç‚ôÇÔ∏è STARTING PORTFOLIO AUDIT")
        try:
            positions = self.alpaca.get_all_positions()
        except Exception as e:
            logger.error(f"Could not fetch positions: {e}")
            return

        if not positions:
            print("   ‚ö†Ô∏è Portfolio is empty.")
            return

        print(f"   üìâ Found {len(positions)} active positions.")
        for p in positions:
            print(f"\nüëâ Analyzing {p.symbol} (Owned: {p.qty})...")
            bot = DarwinianSwarm(ticker=p.symbol, config=self.config, existing_qty=float(p.qty))
            bot.run(verbose=verbose) # Will trigger interactive inputs if Actionable
            time.sleep(1)

        print("\n‚úÖ Portfolio Audit Complete.")

# --- MAIN EXECUTION ---
if __name__ == "__main__":
    # Load Config from Colab Userdata
    conf = Config.load()

    print("\nSelect Mode:")
    print("1. Sniper Mode (Monitor single ticker)")
    print("2. Portfolio Audit (Scan all positions)")

    mode = input("Enter choice (1 or 2): ").strip()

    if mode == "2":
        auditor = PortfolioAudit(conf)
        print("üöÄ Starting Scan...")
        auditor.scan(verbose=True)

    else:
        target = input("Enter Ticker (e.g. NVDA): ").strip().upper() or "NVDA"
        print(f"üöÄ Analyzing {target}...")
        bot = DarwinianSwarm(target, conf)
        bot.run(verbose=True)

    print("\nüèÅ Process Finished.")