## Notebook 2: Solana Client Setup and Wallet Analysis
### This notebook handles all Solana blockchain interactions

In [3]:
# Import dependencies and setup
import requests
import asyncio
import aiohttp
import json
import time
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Tuple, Set, Iterable, Optional, Any, Callable
import pandas as pd
import os
import logging
from collections import defaultdict
import inspect
from dataclasses import dataclass, asdict
from database import DatabaseManager
import sqlite3
import psycopg2
import os
import asyncio
from pathlib import Path
import joblib
from dune_client.client import DuneClient
from dune_client.query import QueryBase
from dateutil.relativedelta import relativedelta


logger = logging.getLogger(__name__)

# Solana specific imports
from dotenv import load_dotenv
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Commitment
from solana.rpc.types import TokenAccountOpts
from solders.pubkey import Pubkey, Pubkey as PublicKey

In [4]:
# Load environment variables from .env file
load_dotenv()

API_KEY = os.getenv("HELIUS_API_KEY")
BASE_URL = f"https://mainnet.helius-rpc.com/?api-key={API_KEY}"
birdeye_key = os.getenv("BIRDEYE_API_KEY")
DUNE_API_KEY = os.getenv("DUNE_API_KEY")
query_id = 5668844

# Check if the API key is loaded successfully
print("Helius API Key loaded successfully:", API_KEY is not None)
print("Bird eye API Key loaded successfully:", birdeye_key is not None)
print("Dune API Key loaded successfully:", DUNE_API_KEY is not None)

BIRDEYE_URL = "https://public-api.birdeye.so/defi/v2/tokens/new_listing?meme_platform_enabled=true"


Helius API Key loaded successfully: True
Bird eye API Key loaded successfully: True
Dune API Key loaded successfully: True


In [5]:
# Define time range (past 14 days, excluding today)
start_time = int((datetime.now() - relativedelta(days=14)).timestamp())
end_time = int((datetime.now() - relativedelta(days=1)).timestamp())

In [6]:
class SolanaAlphaClient:
    """Async Solana client for alpha detection and wallet analysis"""

    def __init__(self, rpc_url: str = BASE_URL):
        self.rpc_url = rpc_url
        self.headers = {"Content-Type": "application/json"}

    async def make_rpc_call(self, method: str, params: List[Any]) -> Dict[str, Any]:
        """Async helper to make RPC calls"""
        payload = {
            "jsonrpc": "2.0",
            "id": "1",
            "method": method,
            "params": params
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(self.rpc_url, json=payload, headers=self.headers, timeout=10) as response:
                    response.raise_for_status()
                    return await response.json()
            except aiohttp.ClientError as e:
                print(f"❌ RPC call error: {e}")
                return {"error": str(e)}

    async def test_connection(self) -> bool:
        """Async test for Solana RPC health"""
        response = await self.make_rpc_call("getHealth", [])
        if response.get("result") == "ok":
            print("✅ Solana RPC connection successful")
            return True
        else:
            print(f"❌ Solana RPC connection failed: {response.get('error', 'Unknown error')}")
            return False

    async def get_account_info(self, wallet_address: str) -> Dict[str, Any]:
        """Async fetch for wallet account info"""
        response = await self.make_rpc_call("getAccountInfo", [wallet_address])
        if "error" in response:
            print(f"❌ Error getting account info for {wallet_address}: {response['error']}")
            return {"balance": 0, "exists": False, "error": response["error"]}

        result = response.get("result", {})
        account_info = result.get("value")

        if not account_info:
            return {"balance": 0, "exists": False}

        balance_sol = account_info["lamports"] / 1_000_000_000

        return {
            "balance": balance_sol,
            "exists": True,
            "owner": account_info["owner"],
            "executable": account_info.get("executable", False),
            "lamports": account_info["lamports"]
        }



In [7]:
async def test_solana_setup():
    """Test the async Solana client setup"""
    print("🧪 Testing Solana client setup...")

    client = SolanaAlphaClient(BASE_URL)

    connection_ok = await client.test_connection()

    if not connection_ok:
        print("❌ Connection test failed")
    else:
        print("✅ Connection test successful")
        test_wallet = "Fiiu1ZnaEwVcvcTxazkR14A1Va6K6VbJfoEiNVMbfTw5"
        account_info = await client.get_account_info(test_wallet)
        print(f"📊 Test wallet info: {account_info}")

# Run the test
await test_solana_setup()


🧪 Testing Solana client setup...
✅ Solana RPC connection successful
✅ Connection test successful
📊 Test wallet info: {'balance': 0.012267414, 'exists': True, 'owner': '11111111111111111111111111111111', 'executable': False, 'lamports': 12267414}


In [8]:
class TransactionAnalyzer:
    """Analyze Solana transactions for wallet behavior patterns"""

    def __init__(self, client: SolanaAlphaClient):
        self.client = client

    async def get_wallet_transactions(self, wallet_address: str, limit: int = 100) -> List[Dict[str, Optional[any]]]:
        """Fetch recent successful transactions for a wallet using RPC call"""
        try:
            params = [
                wallet_address,
                {
                    "limit": limit,
                    "commitment": "confirmed"
                }
            ]
            response = await self.client.make_rpc_call("getSignaturesForAddress", params)

            if "error" in response:
                print(f"❌ RPC error for {wallet_address}: {response['error']}")
                return []

            signature_list = response.get("result", [])
            if not signature_list:
                print(f"⚠️ No transactions found for {wallet_address[:8]}")
                return []

            return self._parse_successful_transactions(signature_list, wallet_address)

        except Exception as e:
            print(f"❌ Failed to fetch transactions for {wallet_address}: {e}")
            return []

    def _parse_successful_transactions(self, signature_list: List[Dict], wallet_address: str) -> List[Dict]:
        """Extract successful transaction data"""
        transactions = [
            {
                "signature": sig["signature"],
                "slot": sig["slot"],
                "block_time": sig.get("blockTime"),
                "timestamp": datetime.fromtimestamp(sig["blockTime"]) if sig.get("blockTime") else None
            }
            for sig in signature_list if sig.get("err") is None
        ]

        print(f"📥 Parsed {len(transactions)} successful transactions for {wallet_address[:8]}")
        return transactions

    async def analyze_wallet_performance(self, wallet_address: str, days: int = 30) -> Dict[str, any]:
        """Analyze wallet activity over a given time window"""
        print(f"🔍 Starting performance analysis for {wallet_address[:8]} over {days} days...")

        transactions = await self.get_wallet_transactions(wallet_address, limit=200)

        if not transactions:
            return self._empty_analysis(wallet_address, days)

        recent_transactions = self._filter_recent_transactions(transactions, days)

        return self._build_analysis_report(wallet_address, transactions, recent_transactions, days)

    def _filter_recent_transactions(self, transactions: List[Dict], days: int) -> List[Dict]:
        """Filter transactions within the specified time window"""
        cutoff = datetime.now() - timedelta(days=days)
        return [tx for tx in transactions if tx["timestamp"] and tx["timestamp"] > cutoff]

    def _build_analysis_report(
        self,
        wallet_address: str,
        all_tx: List[Dict],
        recent_tx: List[Dict],
        days: int
    ) -> Dict[str, any]:
        """Generate performance metrics"""
        return {
            "wallet": wallet_address,
            "total_transactions": len(all_tx),
            "recent_transactions": len(recent_tx),
            "analysis_period_days": days,
            "first_transaction": all_tx[-1]["timestamp"] if all_tx else None,
            "last_transaction": all_tx[0]["timestamp"] if all_tx else None,
            "activity_score": round(len(recent_tx) / days, 2),
            "signatures": [tx["signature"] for tx in recent_tx[:5]]
        }

    def _empty_analysis(self, wallet_address: str, days: int) -> Dict[str, any]:
        """Return default analysis when no transactions are found"""
        return {
            "wallet": wallet_address,
            "total_transactions": 0,
            "recent_transactions": 0,
            "analysis_period_days": days,
            "error": "No transactions found"
        }



In [9]:
TEST_WALLET = "Fiiu1ZnaEwVcvcTxazkR14A1Va6K6VbJfoEiNVMbfTw5"
async def run_transaction_tests():
    client = SolanaAlphaClient(BASE_URL)
    analyzer = TransactionAnalyzer(client)

    print("\n🔧 Testing get_wallet_transactions...")
    transactions = await analyzer.get_wallet_transactions(TEST_WALLET, limit=50)
    print(f"✅ Retrieved {len(transactions)} transactions")

    print("\n📊 Testing analyze_wallet_performance...")
    performance = await analyzer.analyze_wallet_performance(TEST_WALLET, days=30)
    print("✅ Performance Analysis:")
    for key, value in performance.items():
        print(f"  {key}: {value}")

# Run the test
await run_transaction_tests()



🔧 Testing get_wallet_transactions...
📥 Parsed 48 successful transactions for Fiiu1Zna
✅ Retrieved 48 transactions

📊 Testing analyze_wallet_performance...
🔍 Starting performance analysis for Fiiu1Zna over 30 days...
📥 Parsed 198 successful transactions for Fiiu1Zna
✅ Performance Analysis:
  wallet: Fiiu1ZnaEwVcvcTxazkR14A1Va6K6VbJfoEiNVMbfTw5
  total_transactions: 198
  recent_transactions: 2
  analysis_period_days: 30
  first_transaction: 2024-12-20 21:25:14
  last_transaction: 2025-08-09 10:17:36
  activity_score: 0.07
  signatures: ['5JsZS6ZW3QVzmDQH8kkAeRxAjZM9ojqZZco8mVjGEPmiPJafQRbibSDvwHzKFw3TU6AKD6HgroRDdS7XdUzqrXSU', 'haATLavQqPywf5foexKZkYfAnCotDBY7Hmfc2QBcnsgDULSDrPZKcQUgwBYjjpNs2HzimfTwKHpt4qPLguagQbk']


In [10]:
class WalletPerformanceTracker:
    """Track and score wallet performance for alpha detection.

    Notes:
      - `analyzer` is expected to expose `analyze_wallet_performance(wallet_address)` which may be async or sync.
      - `token_discovery` is expected to expose:
           - async get_token_holders(token_address, ...) -> List[dict] where each dict has keys:
             'wallet' (str), 'balance_raw' (int), optionally 'balance' (float), 'balance_formatted' (str)
           - async analyze_token_distribution(token_address, ...) -> dict (optionally contains 'supply_raw' and 'decimals')
    """

    def __init__(
        self,
        analyzer: Any,
        token_discovery: Any,
        *,
        concurrency: int = 10,
        min_score: float = 70.0,
        top_k_holders_per_token: Optional[int] = None,
        min_balance_raw: int = 0,
        min_holding_percentage: float = 0.0,
        score_weights: Optional[Dict[str, float]] = None,
        analyzer_timeout: float = 30.0,
    ):
        self.analyzer = analyzer
        self.token_discovery = token_discovery
        self.concurrency = max(1, int(concurrency))
        self.min_score_default = float(min_score)
        self.top_k_holders_per_token = top_k_holders_per_token
        self.min_balance_raw = int(min_balance_raw)
        self.min_holding_percentage = float(min_holding_percentage)
        self.score_weights = score_weights or {
            "activity": 30.0,
            "volume": 40.0,
            "recency": 30.0,
        }
        self.analyzer_timeout = float(analyzer_timeout)
        self._wallet_analysis_cache: Dict[str, Dict[str, Any]] = {}

    def calculate_wallet_score(self, wallet_analysis: Dict[str, Any]) -> float:
        """Calculate a performance score for a wallet (0-100) using configurable weights."""
        if not wallet_analysis or wallet_analysis.get("error"):
            return 0.0

        activity = float(wallet_analysis.get("activity_score", 0.0))
        total_tx = float(wallet_analysis.get("total_transactions", 0))
        recent_tx = int(wallet_analysis.get("recent_transactions", 0))

        activity_score = min(activity * self.score_weights["activity"], self.score_weights["activity"])
        volume_score = min((total_tx / 10.0), self.score_weights["volume"])
        recency_score = self.score_weights["recency"] if recent_tx > 0 else 0.0

        total = activity_score + volume_score + recency_score
        total = min(total, sum(self.score_weights.values()))
        weight_sum = sum(self.score_weights.values())
        score_0_100 = (total / weight_sum) * 100.0 if weight_sum > 0 else 0.0
        return float(min(max(score_0_100, 0.0), 100.0))

    async def _maybe_async_call(self, func: Callable, *args, **kwargs):
        """Call func; handle both async and sync functions. Return its result."""
        if inspect.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def _analyze_wallet_with_timeout(self, wallet_address: str) -> Dict[str, Any]:
        """Run analyzer.analyze_wallet_performance with timeout, using cache if available."""
        if wallet_address in self._wallet_analysis_cache:
            return self._wallet_analysis_cache[wallet_address]

        try:
            coro = self._maybe_async_call(self.analyzer.analyze_wallet_performance, wallet_address)
            result = await asyncio.wait_for(coro, timeout=self.analyzer_timeout)
            self._wallet_analysis_cache[wallet_address] = result if isinstance(result, dict) else {"result": result}
            return self._wallet_analysis_cache[wallet_address]
        except asyncio.TimeoutError:
            logger.warning("Analyzer timed out for wallet %s", wallet_address)
            result = {"error": "analyzer_timeout"}
            self._wallet_analysis_cache[wallet_address] = result
            return result
        except Exception as e:
            logger.exception("Analyzer exception for wallet %s: %s", wallet_address, e)
            result = {"error": f"analyzer_exception: {e}"}
            self._wallet_analysis_cache[wallet_address] = result
            return result

    async def discover_successful_wallets(
        self,
        token_list: List[str],
        *,
        min_score: Optional[float] = None,
        concurrency: Optional[int] = None,
        fetch_supply_for_percentage: bool = True,
        holders_limit_per_token: Optional[int] = None,
        skip_on_error_tokens: bool = True,
    ) -> List[Dict[str, Any]]:
        min_score = float(min_score if min_score is not None else self.min_score_default)
        concurrency = int(concurrency or self.concurrency)
        holders_limit_per_token = (
            holders_limit_per_token if holders_limit_per_token is not None else self.top_k_holders_per_token
        )

        sem = asyncio.Semaphore(concurrency)
        discovered: List[Dict[str, Any]] = []

        async def _analyze_single_holder(holder: Dict[str, Any], token_addr: str):
            async with sem:
                wallet_address = holder.get("wallet") or holder.get("owner") or holder.get("address")
                if not wallet_address:
                    return None

                try:
                    raw = int(holder.get("balance_raw", holder.get("balance", 0) or 0))
                except Exception:
                    raw = 0
                if raw < self.min_balance_raw:
                    return None

                pct = holder.get("percentage")
                if pct is not None and pct < self.min_holding_percentage:
                    return None

                analysis = await self._analyze_wallet_with_timeout(wallet_address)
                score = self.calculate_wallet_score(analysis)

                if score >= min_score:
                    return {
                        "address": wallet_address,
                        "score": float(score),
                        "analysis": analysis,
                        "discovered_via_token": token_addr,
                        "holding_percentage": float(pct) if pct is not None else None,
                        "balance_raw": int(raw),
                        "balance": holder.get("balance"),
                    }
                return None

        for token_addr in token_list:
            logger.info("Starting token analysis for %s", token_addr)
            supply_raw = None
            dist = None
            try:
                if fetch_supply_for_percentage and hasattr(self.token_discovery, "analyze_token_distribution"):
                    dist = await self.token_discovery.analyze_token_distribution(token_addr, limit=1000, top_n=1)
                    if dist and dist.get("found_on_chain") and "supply_raw" in dist:
                        supply_raw = int(dist.get("supply_raw", 0))
            except Exception:
                logger.exception("Error fetching supply via analyze_token_distribution for %s", token_addr)
                if skip_on_error_tokens:
                    continue

            holders = None
            try:
                if holders_limit_per_token is not None and hasattr(self.token_discovery, "get_token_holders"):
                    holders = await self.token_discovery.get_token_holders(
                        token_addr,
                        limit=1000,
                        max_pages=None,
                        decimals=None,
                    )
                    if holders_limit_per_token:
                        holders = holders[: holders_limit_per_token]
                else:
                    if dist:
                        holders = dist.get("holders_top_n") or []
                    else:
                        holders = await self.token_discovery.get_token_holders(token_addr)
            except Exception:
                logger.exception("Error fetching holders for %s", token_addr)
                if skip_on_error_tokens:
                    continue
                else:
                    raise

            if supply_raw and holders:
                for h in holders:
                    try:
                        raw = int(h.get("balance_raw", h.get("balance", 0) or 0))
                        h.setdefault("percentage", (raw / supply_raw * 100.0) if supply_raw > 0 else 0.0)
                    except Exception:
                        h.setdefault("percentage", None)

            tasks = [asyncio.create_task(_analyze_single_holder(h, token_addr)) for h in holders]
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=False)
                for r in results:
                    if r:
                        discovered.append(r)

        unique: Dict[str, Dict[str, Any]] = {}
        for w in discovered:
            addr = w["address"]
            if addr not in unique or w["score"] > unique[addr]["score"]:
                unique[addr] = w

        sorted_wallets = sorted(unique.values(), key=lambda x: x["score"], reverse=True)
        logger.info("Discovered %d wallets with score >= %s", len(sorted_wallets), min_score)
        return sorted_wallets


In [11]:
class HolderAggregator:
    """Robust holder aggregation service for Solana tokens."""

    def __init__(self, client: "SolanaAlphaClient"):
        self.client = client

    async def get_token_holders(
        self,
        token_mint: str,
        *,
        sleep_between: float = 0.15,
        limit: int = 1000,
        max_pages: Optional[int] = None,
        decimals: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch token holders from Helius getTokenAccounts with pagination."""
        page = 1
        owner_balances = defaultdict(int)
        owner_token_account_counts = defaultdict(int)

        while True:
            payload_params = {
                "mint": token_mint,
                "page": page,
                "limit": limit,
                "displayOptions": {},
            }

            data = await self.client.make_rpc_call("getTokenAccounts", payload_params)
            token_accounts = data.get("result", {}).get("token_accounts", [])

            if not token_accounts:
                break

            for ta in token_accounts:
                owner = ta.get("owner") or ta.get("address")
                amt_raw = ta.get("amount", 0)

                # Handle nested account shapes
                if "account" in ta and isinstance(ta["account"], dict):
                    acct = ta["account"]
                    owner = owner or acct.get("owner")
                    amt_raw = acct.get("amount", 0)

                # Normalize amount
                if isinstance(amt_raw, dict):
                    amt_raw = int(float(amt_raw.get("amount") or amt_raw.get("uiAmount", 0)))
                else:
                    try:
                        amt_raw = int(amt_raw)
                    except Exception:
                        amt_raw = int(float(amt_raw)) if amt_raw else 0

                if owner:
                    owner_balances[owner] += amt_raw
                    owner_token_account_counts[owner] += 1

            page += 1
            if max_pages and page > max_pages:
                break
            await asyncio.sleep(sleep_between)

        # Build holders list
        holders = []
        for owner, raw in owner_balances.items():
            human_balance = raw / (10 ** decimals) if decimals else None
            holders.append({
                "wallet": owner,
                "balance_raw": raw,
                "balance": human_balance,
                "balance_formatted": f"{human_balance:,.{decimals}f}" if human_balance is not None else str(raw),
                "num_token_accounts": owner_token_account_counts[owner],
            })

        holders.sort(key=lambda x: x["balance_raw"], reverse=True)
        return holders

    def analyze_holders(self, holders: List[Dict[str, Any]], top_n_for_concentration: int = 10) -> Dict[str, Any]:
        """
        Return basic holder analytics:
          - total_holders
          - total_balance_raw
          - concentration_metrics: top_n sum/raw & percentage, gini_like
        This method is defensive and always fills the same keys for callers.
        """
        out: Dict[str, Any] = {"total_holders": 0, "total_balance_raw": 0, "concentration_metrics": {}}
        if not holders:
            return out

        total = sum(int(h.get("balance_raw", 0)) for h in holders)
        out["total_holders"] = len(holders)
        out["total_balance_raw"] = int(total)

        top_n = holders[:top_n_for_concentration]
        top_sum = sum(int(h.get("balance_raw", 0)) for h in top_n)
        top_pct = (top_sum / total * 100.0) if total > 0 else 0.0

        # Put both specific and convenience keys to match older tests/consumers
        out["concentration_metrics"][f"top_{top_n_for_concentration}_sum_raw"] = int(top_sum)
        out["concentration_metrics"][f"top_{top_n_for_concentration}_percentage"] = float(top_pct)
        # convenience alias for top_10_percentage (backwards compat) when top_n == 10, otherwise keep a top_10 entry too
        if top_n_for_concentration == 10:
            out["concentration_metrics"]["top_10_percentage"] = float(top_pct)
        else:
            # compute top 10 percentage as well if possible
            top_10 = holders[:10]
            top_10_sum = sum(int(h.get("balance_raw", 0)) for h in top_10)
            out["concentration_metrics"]["top_10_percentage"] = float((top_10_sum / total * 100.0) if total > 0 else 0.0)

        # gini-like measure (not normalized to classic Gini, but a useful inequality indicator)
        vals = sorted([int(h.get("balance_raw", 0)) for h in holders])
        n = len(vals)
        if n > 1 and total > 0:
            cum = 0
            for i, v in enumerate(vals, start=1):
                cum += (2 * i - n - 1) * v
            gini = cum / (n * total)
            out["concentration_metrics"]["gini_like"] = float(gini)
        else:
            out["concentration_metrics"]["gini_like"] = 0.0

        return out


In [12]:
client = SolanaAlphaClient(BASE_URL)   
holder_agg = HolderAggregator(client)
summary = await holder_agg.get_token_holders("BusxEFRTayALb5nYBdXdy1iZGq9GgoqLMpRVGQB3FeYt", max_pages=2)
print("holders:", len(summary))
analysis = holder_agg.analyze_holders(summary, top_n_for_concentration=10)
print("analysis:", analysis)


holders: 13
analysis: {'total_holders': 13, 'total_balance_raw': 84469108852, 'concentration_metrics': {'top_10_sum_raw': 84245728991, 'top_10_percentage': 99.73554845784938, 'gini_like': 0.7356593936164546}}


In [13]:
@dataclass
class TradingStart:
    mint: Optional[str]
    block_time: Optional[int]
    program_id: Optional[str]
    detected_via: Optional[str] = None
    extra: Optional[Dict[str, Any]] = None


class TokenDiscovery:
    """
    TokenDiscovery fetches new token launches from:
      - Birdeye (liquidity added today)
      - Dune (tokens whose first trade was yesterday)

    This version includes robust Dune caching.
    """

    def __init__(
        self,
        client: Optional[Any] = None,
        *,
        birdeye_api_key: Optional[str] = None,
        dune_api_key: Optional[str] = None,
        dune_query_id: Optional[int] = None,
        dune_cache_file: str = "./data/dune_recent.pkl",
        debug: bool = False,
    ):
        self.client = client
        self.debug = bool(debug)

        # Birdeye setup (unchanged)
        self.birdeye_key = birdeye_api_key or globals().get("birdeye_key")
        self.birdeye_url = globals().get("BIRDEYE_URL")

        # Dune setup
        self.dune_api_key = dune_api_key or os.getenv("DUNE_API_KEY")
        self.dune_query_id = dune_query_id
        self.dune_client = DuneClient(self.dune_api_key) if self.dune_api_key else None

        # Dune cache path
        self.dune_cache_file = dune_cache_file

        if self.debug:
            if not self.birdeye_key:
                print("⚠️ BIRDEYE_API_KEY not set")
            if not self.dune_api_key:
                print("⚠️ DUNE_API_KEY not set")
            print(f"TokenDiscovery initialized. Dune cache: {self.dune_cache_file}")

    # ---------- Small utility to unwrap dune client responses ----------
    def _rows_from_dune_payload(self, payload: Any) -> List[Dict[str, Any]]:
        """
        Extract a list-of-dicts 'rows' from dune_client.get_latest_result return value.
        Handles ResultsResponse-like objects, dicts, or direct lists.
        """
        # If it's None -> empty
        if payload is None:
            return []

        # If it has attribute 'result' that contains 'rows' (ResultsResponse-like)
        if hasattr(payload, "result"):
            try:
                result_obj = getattr(payload, "result")
                # result_obj may be a dict-like or object; try both
                if isinstance(result_obj, dict):
                    rows = result_obj.get("rows", [])
                else:
                    # object with attribute rows
                    rows = getattr(result_obj, "rows", [])
                if isinstance(rows, list):
                    return rows
            except Exception:
                pass

        # If payload is a dict
        if isinstance(payload, dict):
            # common shapes: {"result": {"rows": [...]}} or {"rows": [...]}
            rows = payload.get("result", {}).get("rows") if payload.get("result") else None
            if rows is None:
                rows = payload.get("rows", None)
            if isinstance(rows, list):
                return rows
            # Some payloads directly carry list
            if isinstance(payload.get("data", None), list):
                return payload["data"]

        # If payload itself is a list of rows
        if isinstance(payload, list):
            return payload

        # Fallback: try to get attribute 'rows' directly
        if hasattr(payload, "rows"):
            rows = getattr(payload, "rows")
            if isinstance(rows, list):
                return rows

        # nothing matched
        return []

    # ---------- Dune: fetch latest rows (no caching) ----------
    def fetch_dune_latest_rows(self) -> List[Dict[str, Any]]:
        """
        Fetch latest result for the configured query_id from Dune and return rows list.
        Does not touch cache.
        """
        if not self.dune_client or not self.dune_query_id:
            raise RuntimeError("Dune client or query_id not configured.")

        if self.debug:
            print(f"[Dune] fetching latest result for query {self.dune_query_id}...")

        payload = self.dune_client.get_latest_result(self.dune_query_id)
        rows = self._rows_from_dune_payload(payload)

        if self.debug:
            print(f"[Dune] extracted {len(rows)} rows from payload")

        return rows

    # ---------- Dune: cached access, returns List[TradingStart] ----------
    def get_tokens_launched_yesterday_cached(
        self,
        cache_max_age_days: int = 7
    ) -> List[TradingStart]:
        """
        Return tokens whose first trade was yesterday using cached Dune results.
        Cache is stored at self.dune_cache_file. If cache doesn't contain yesterday's data,
        fetch fresh from Dune and update cache.

        Returns a list of TradingStart objects (so it can be concatenated with Birdeye results).
        """
        cache_path = self.dune_cache_file

        # helper to convert rows -> list[TradingStart] filtered to yesterday
        def rows_to_trading_starts(rows: List[Dict[str, Any]]) -> List[TradingStart]:
            if not rows:
                return []
            df = pd.DataFrame(rows)
            # accept both column names from you earlier variants
            candidate_date_col = None
            for col in ("first_buy_date", "first_buy_date_utc", "block_date"):
                if col in df.columns:
                    candidate_date_col = col
                    break
            # minted column
            candidate_mint_col = None
            for col in ("mint_address", "mint", "token_bought_mint_address"):
                if col in df.columns:
                    candidate_mint_col = col
                    break

            if candidate_date_col is None or candidate_mint_col is None:
                # if columns not found, return empty list
                if self.debug:
                    print("[Dune] expected columns not present in rows -> returning []")
                return []

            df[candidate_date_col] = pd.to_datetime(df[candidate_date_col], errors="coerce")
            yesterday = (datetime.now(timezone.utc).date() - timedelta(days=1))
            filtered = df[df[candidate_date_col].dt.date == yesterday]

            out: List[TradingStart] = []
            for _, row in filtered.iterrows():
                ts = None
                try:
                    # convert to UTC timestamp
                    dt = pd.to_datetime(row[candidate_date_col])
                    if pd.isna(dt):
                        continue
                    # ensure tz-aware UTC
                    if dt.tzinfo is None:
                        dt = dt.tz_localize("UTC")
                    dt_utc = dt.tz_convert("UTC")
                    ts = int(dt_utc.timestamp())
                except Exception:
                    # fallback: skip row
                    continue

                out.append(
                    TradingStart(
                        mint=row[candidate_mint_col],
                        block_time=ts,
                        program_id="dune",
                        detected_via="dune",
                        extra={candidate_date_col: str(row[candidate_date_col])}
                    )
                )
            return out

        # 1) Try cache
        if os.path.exists(cache_path):
            try:
                cache_obj = joblib.load(cache_path)
                # cache_obj expected shape: {"rows": [...], "fetched_at": "ISO"}
                if isinstance(cache_obj, dict) and "rows" in cache_obj and "fetched_at" in cache_obj:
                    fetched_at = None
                    try:
                        fetched_at = datetime.fromisoformat(cache_obj["fetched_at"])
                        if fetched_at.tzinfo is None:
                            fetched_at = fetched_at.replace(tzinfo=timezone.utc)
                    except Exception:
                        fetched_at = None

                    rows = cache_obj.get("rows", [])
                    starts = rows_to_trading_starts(rows)

                    # If cache already contains yesterday's rows, return them immediately
                    if starts:
                        if self.debug:
                            print(f"[Dune/cache] using cached data fetched_at={cache_obj.get('fetched_at')}, found {len(starts)} yesterday tokens")
                        return starts

                    # If cache is recent (within cache_max_age_days) but doesn't have yesterday tokens,
                    # treat as stale and continue to fetch fresh below
                    if fetched_at:
                        age_days = (datetime.now(timezone.utc) - fetched_at).days
                        if age_days <= cache_max_age_days:
                            if self.debug:
                                print(f"[Dune/cache] cached but no yesterday rows; cache age {age_days}d <= {cache_max_age_days}d -> will fetch fresh")
                        else:
                            if self.debug:
                                print(f"[Dune/cache] cache older than {cache_max_age_days} days (age {age_days}d) -> fetching fresh")
                else:
                    if self.debug:
                        print("[Dune/cache] cache file format not recognized, ignoring")
            except Exception as e:
                if self.debug:
                    print(f"[Dune/cache] error reading cache file: {e} -- will fetch fresh")

        # 2) Fetch fresh rows from Dune
        rows = []
        try:
            rows = self.fetch_dune_latest_rows()
        except Exception as e:
            if self.debug:
                print(f"[Dune] error fetching latest rows: {e}")
            # If fetch failed but cache existed above we already tried to return any useful cached starts.
            return []

        # Save cache (rows + timestamp ISO)
        try:
            joblib.dump({"rows": rows, "fetched_at": datetime.now(timezone.utc).isoformat()}, cache_path)
            if self.debug:
                print(f"[Dune/cache] wrote {len(rows)} rows to cache {cache_path}")
        except Exception as e:
            if self.debug:
                print(f"[Dune/cache] failed to write cache: {e}")

        # Convert to TradingStart objects filtered to yesterday
        starts = rows_to_trading_starts(rows)
        if self.debug:
            print(f"[Dune] found {len(starts)} yesterday tokens after fresh fetch")
        return starts

    # ---------- Existing Birdeye methods (unchanged) ----------
    async def _fetch_birdeye_items(self, limit: int = 200, timeout: int = 15) -> List[Dict[str, Any]]:
        if not self.birdeye_key or not self.birdeye_url:
            raise RuntimeError("Birdeye API key or URL not set.")
        url = f"{self.birdeye_url}&limit={int(limit)}" if "?" in self.birdeye_url else f"{self.birdeye_url}?limit={int(limit)}"
        headers = {
            "accept": "application/json",
            "x-chain": "solana",
            "X-API-KEY": self.birdeye_key,
        }
        async with aiohttp.ClientSession() as sess:
            async with sess.get(url, headers=headers, timeout=timeout) as resp:
                resp.raise_for_status()
                data = await resp.json()
        items = []
        if isinstance(data, dict):
            if "data" in data:
                d = data["data"]
                if isinstance(d, dict) and "items" in d:
                    items = d["items"] or []
                elif isinstance(d, list):
                    items = d
            elif "items" in data:
                items = data["items"] or []
        elif isinstance(data, list):
            items = data
        return [it for it in items if isinstance(it, dict)]

    @staticmethod
    def _parse_liquidity_added_at(val: Any) -> Optional[int]:
        if not val:
            return None
        if isinstance(val, (int, float)):
            return int(val)
        try:
            dt = datetime.fromisoformat(str(val))
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return int(dt.astimezone(timezone.utc).timestamp())
        except Exception:
            try:
                s = str(val).rstrip("Z")
                dt = datetime.strptime(s, "%Y-%m-%dT%H:%M:%S")
                dt = dt.replace(tzinfo=timezone.utc)
                return int(dt.timestamp())
            except Exception:
                return None

    @staticmethod
    def _utc_day_bounds_for_date(dt: Optional[datetime] = None) -> Tuple[int, int]:
        d = (dt or datetime.now(timezone.utc)).astimezone(timezone.utc)
        start = datetime(d.year, d.month, d.day, 0, 0, 0, tzinfo=timezone.utc)
        end = start + timedelta(days=1) - timedelta(seconds=1)
        return int(start.timestamp()), int(end.timestamp())

    async def get_tokens_with_liquidity_today(self, limit: int = 500) -> List[TradingStart]:
        items = await self._fetch_birdeye_items(limit=limit)
        start_ts, end_ts = self._utc_day_bounds_for_date(datetime.now(timezone.utc))
        out: List[TradingStart] = []
        for it in items:
            la = it.get("liquidityAddedAt") or it.get("liquidity_added_at") or it.get("first_listed_at")
            ts = self._parse_liquidity_added_at(la)
            if ts and start_ts <= ts <= end_ts:
                out.append(
                    TradingStart(
                        mint=it.get("address"),
                        block_time=ts,
                        program_id="birdeye",
                        detected_via="birdeye",
                        extra={
                            "symbol": it.get("symbol"),
                            "name": it.get("name"),
                            "decimals": it.get("decimals"),
                            "liquidity": it.get("liquidity"),
                            "logoURI": it.get("logoURI"),
                            "source": it.get("source"),
                        },
                    )
                )
        return out


In [14]:
# Instantiate
td = TokenDiscovery(debug=True) 

# tokens with liquidity added today (UTC)
today_tokens = await td.get_tokens_with_liquidity_today(limit=20)
print("today:", len(today_tokens))
for t in today_tokens[:20]:
    print(t.mint, t.extra.get("symbol"), datetime.fromtimestamp(t.block_time, timezone.utc).isoformat(), t.extra.get("liquidity"))



TokenDiscovery initialized. Dune cache: ./data/dune_recent.pkl
today: 20
7ieoyr4sajqqJ7GmmuobyibnFgqgnWQdL5zheoVGHFB1 NVIDIA🔥 2025-08-26T20:35:19+00:00 82767.09169172078
mQR6PW18n7yuHkHCB5258wdXuHnG1e1WEY2UWMxpump j*b 2025-08-26T20:35:18+00:00 6263.927567960085
Fpkyd1UnUtGFEzPBf1C3QPzUtrn13HuSCGzREJtLpump money 2025-08-26T20:35:13+00:00 6311.124115433562
4Wbbi8NN3ACFsKswkLWVkAes4BnFtq2NnqnXzH7JELLY JCAT 2025-08-26T20:35:12+00:00 0
EGWnXZYCkhZbFHzLNbdBZGdeFDi89wZzSMuftfuCpump pablo 2025-08-26T20:35:06+00:00 6166.976176913179
EtZGpKic1yePwPjm1Y3hmeDfX4JMV8m43TFsfBYNpump yah 2025-08-26T20:35:05+00:00 5507.77337300237
BGeYeaDWUW8ApTnV1W5afBfeXnNQ2wuwCdQNGeK4pump JAWBREAKER 2025-08-26T20:35:04+00:00 5920.800655628733
DRenw8frnHhsh68yQe4Bo5mUjZDVzFZkSKKDNnKLHYgr thing 2025-08-26T20:35:02+00:00 97967.16977615582
AqL6UNjLUJKw23EiK4dBztXoTq9uWFTGJmHL2nz4qJSy TRUMP 2025-08-26T20:34:59+00:00 236.14911217644325
GnvHgNbeFuhFGTCVz2f5Hqzkq8sMprsfsWXHFo8YjmLt NVIDIA🔥 2025-08-26T20:34:58+00:00 393.5270

In [15]:
class JobLibTokenUpdater:
    """Token data storage using joblib for persistence."""
    
    def __init__(self, data_dir: str = "./data/token_data", expiry_hours: int = 24, debug: bool = False):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.tokens_file = self.data_dir / "tokens.pkl"
        self.expiry_hours = expiry_hours
        self.debug = debug

    async def save_trading_starts_async(self, trading_starts: List[TradingStart], skip_existing: bool = True) -> Dict[str, int]:
        """Save trading starts to joblib file."""
        if not trading_starts:
            return {"saved": 0, "skipped": 0, "errors": 0}

        # Load existing data
        existing_tokens = []
        if self.tokens_file.exists():
            try:
                existing_tokens = joblib.load(self.tokens_file)
                if not isinstance(existing_tokens, list):
                    existing_tokens = []
            except Exception as e:
                if self.debug:
                    print(f"Error loading existing tokens: {e}")
                existing_tokens = []

        # Convert existing to dict for fast lookup
        existing_mints = {token.mint for token in existing_tokens if hasattr(token, 'mint')} if skip_existing else set()
        
        saved = 0
        skipped = 0
        errors = 0
        
        for start in trading_starts:
            try:
                if skip_existing and start.mint in existing_mints:
                    skipped += 1
                    continue
                
                existing_tokens.append(start)
                saved += 1
                
            except Exception as e:
                if self.debug:
                    print(f"Error saving token {start.mint}: {e}")
                errors += 1

        # Save updated list
        try:
            joblib.dump(existing_tokens, self.tokens_file)
            if self.debug:
                print(f"Saved {len(existing_tokens)} total tokens to {self.tokens_file}")
        except Exception as e:
            if self.debug:
                print(f"Error saving tokens file: {e}")
            errors += len(trading_starts)

        return {"saved": saved, "skipped": skipped, "errors": errors}

    async def cleanup_old_tokens_async(self) -> int:
        """Remove tokens older than expiry_hours."""
        if not self.tokens_file.exists():
            return 0

        try:
            tokens = joblib.load(self.tokens_file)
            if not isinstance(tokens, list):
                return 0

            cutoff_time = time.time() - (self.expiry_hours * 3600)
            filtered_tokens = [
                token for token in tokens 
                if hasattr(token, 'block_time') and token.block_time and token.block_time > cutoff_time
            ]

            deleted_count = len(tokens) - len(filtered_tokens)
            
            if deleted_count > 0:
                joblib.dump(filtered_tokens, self.tokens_file)
                if self.debug:
                    print(f"Cleaned up {deleted_count} old tokens")

            return deleted_count

        except Exception as e:
            if self.debug:
                print(f"Error during cleanup: {e}")
            return 0

    async def get_tracked_tokens_async(self, limit: Optional[int] = None) -> List[TradingStart]:
        """Get all tracked tokens."""
        if not self.tokens_file.exists():
            return []

        try:
            tokens = joblib.load(self.tokens_file)
            if not isinstance(tokens, list):
                return []

            # Sort by block_time descending (newest first)
            sorted_tokens = sorted(
                tokens,
                key=lambda x: x.block_time if hasattr(x, 'block_time') and x.block_time else 0,
                reverse=True
            )

            if limit:
                sorted_tokens = sorted_tokens[:limit]

            return sorted_tokens

        except Exception as e:
            if self.debug:
                print(f"Error loading tokens: {e}")
            return []

In [None]:
# Main execution function
async def main():
    """Main function to run token discovery and storage."""
    
    # Initialize components
    client = SolanaAlphaClient(BASE_URL)
    
    # Test connection
    connection_ok = await client.test_connection()
    if not connection_ok:
        print("Failed to connect to Solana RPC")
        return

    # Initialize token updater
    updater = JobLibTokenUpdater(
        data_dir="./data/token_data",
        expiry_hours=24,
        debug=True
    )

    # Initialize TokenDiscovery
    td = TokenDiscovery(
        client=client,
        birdeye_api_key=birdeye_key,
        dune_api_key=DUNE_API_KEY,
        dune_query_id=query_id,
        dune_cache_file="./data/dune_recent.pkl",
        debug=True
    )

    try:
        # 1) Get yesterday's tokens from Dune (cached)
        print("Fetching tokens from Dune (yesterday)...")
        dune_starts = td.get_tokens_launched_yesterday_cached()
        print(f"Fetched from Dune: {len(dune_starts)} tokens")

        # 2) Get today's tokens from Birdeye
        print("Fetching tokens from Birdeye (today)...")
        today_starts = await td.get_tokens_with_liquidity_today(limit=20)
        print(f"Fetched from Birdeye: {len(today_starts)} tokens")

        # 3) Save both to storage
        combined = dune_starts + today_starts
        saved_stats = await updater.save_trading_starts_async(combined, skip_existing=True)
        print(f"Save results: {saved_stats}")

        # 4) Cleanup old tokens
        deleted_count = await updater.cleanup_old_tokens_async()
        print(f"Cleaned up {deleted_count} old tokens")

        # 5) Show current tracked tokens
        stored_tokens = await updater.get_tracked_tokens_async(limit=10)
        print(f"\nCurrently tracking {len(stored_tokens)} tokens (showing first 10):")
        for token in stored_tokens[:10]:
            dt_str = datetime.fromtimestamp(token.block_time, timezone.utc).strftime("%Y-%m-%d %H:%M:%S") if token.block_time else "Unknown"
            symbol = token.extra.get("symbol", "Unknown") if token.extra else "Unknown"
            print(f"  {token.mint} | {symbol} | {dt_str} | via {token.detected_via}")

    except Exception as e:
        print(f"Error during execution: {e}")
        import traceback
        traceback.print_exc()

In [17]:
# Run the main function
await main()

✅ Solana RPC connection successful
TokenDiscovery initialized. Dune cache: ./data/dune_recent.pkl
Fetching tokens from Dune (yesterday)...
[Dune/cache] cached but no yesterday rows; cache age 0d <= 7d -> will fetch fresh
[Dune] fetching latest result for query 5668844...
[Dune] extracted 3 rows from payload
[Dune/cache] wrote 3 rows to cache ./data/dune_recent.pkl
[Dune] found 3 yesterday tokens after fresh fetch
Fetched from Dune: 3 tokens
Fetching tokens from Birdeye (today)...
Fetched from Birdeye: 20 tokens
Saved 63 total tokens to data\token_data\tokens.pkl
Save results: {'saved': 23, 'skipped': 0, 'errors': 0}
Cleaned up 23 old tokens
Cleaned up 23 old tokens

Currently tracking 10 tokens (showing first 10):
  6Egn3FVPA6WpANr2uWoehdxstgyyLDFDpDc3yJUMgray | STINK | 2025-08-26 20:35:58 | via birdeye
  68pRHfBuged76wNnDGC49JnBcXBXGLJTjf47ix6Nqgn9 | TGH | 2025-08-26 20:35:57 | via birdeye
  4LUEVB7SDPdFnA7QNdPBBNkwk51tjVspDwShUhgyuv3a | CLIPPY AI  | 2025-08-26 20:35:55 | via birdeye


In [3]:
with open(f"./data/token_data/dune_cache/dune_cache_20250909.pkl", "rb") as f:
    tokens = joblib.load(f)
    print(f"Total tokens stored: {len(tokens)}")
    for t in tokens[:5]:
        print(t)

FileNotFoundError: [Errno 2] No such file or directory: './data/token_data/dune_cache/dune_cache_20250909.pkl'

In [None]:
from pathlib import Path
import joblib
import pandas as pd

file_path = Path(r"C:\Users\HP USER\Documents\Data Analyst\degen smart\data\dune_cache\dune_cache_20250916.pkl")
if file_path.exists():
    with open(file_path, "rb") as f:
        tokens = joblib.load(f)
        print(f"Total tokens stored: {len(tokens)}")
        # for t in tokens[:5]:
            # print(t)
else:
    print(f"File not found: {file_path}")
    # Try reading from the relative path as fallback
    alt_path = Path(r"data/dune_cache/dune_cache_20250909.pkl")
    if alt_path.exists():
        with open(alt_path, "rb") as f:
            tokens = joblib.load(f)
            print(f"Total tokens stored: {len(tokens)}")
            for t in tokens[:5]:
                print(t)
    else:
        print(f"File not found at fallback path: {alt_path}")

Total tokens stored: 0


In [4]:
tokens_df = pd.DataFrame(tokens)

In [None]:
tokens_df

Unnamed: 0,token_to_top_holders,saved_at,day
6MTUuKfBunWm2pbZQBhXbWWhutuPspfbcv7SY6dtHQU3,"[4LFKWTftGU2XnhD8FYKtshx1sE3bThK5yaJ6JzNXSSZC,...",2025-09-16T22:57:39.895543+00:00,20250916
GJpbPgniZCzRnDX5mcy623zHVyFAbPGYqJnBrVqh7JGc,"[CGQ7wxM8Lvcjqn4J2fGVybxe2B2vkSHZqXfooSrRvw7X,...",2025-09-16T22:57:39.895543+00:00,20250916
EW4DUDr7sjP5vvgwkbVNjbyhtZTQnXJjUrGphGg4q6Ge,"[EcPD5Eu1r3jfKgMY3Te1ZrSJ1BpB8gmVGLooajuUcphs,...",2025-09-16T22:57:39.895543+00:00,20250916
75gp7J5Wiyrc4rqaQrYrJPdQTgZn49MZh8pP21CQjY77,"[29nJ7QDvJxiePXCt5fqNgKXs9LMgxtMTj42pSpTbh99i,...",2025-09-16T22:57:39.895543+00:00,20250916
HMRrJMjErvLLVFvmbEaMXJrxV3ptVwUSR1sVzeYu4Hc6,"[92JcB53LWjDQibn3MCzzHsp1WQrF5hjjB91anHKPW5t8,...",2025-09-16T22:57:39.895543+00:00,20250916
...,...,...,...
CwSYzhxB2tQuitPtLP42Dm5nFXUgzTRRNYbC3ew4pump,"[67hWpmWAokVjJDfXAgsY4y9aWx1cxyhp5of4aRyXPUY1,...",2025-09-16T22:57:39.895543+00:00,20250916
3WN36y3qwuazyMRHBjnUG6BP9jWQi1JccEN6NxaDpump,"[9ocU6giTSu96wEBLa3WKnaYddUb81xz8FSaXhY4q2XVL,...",2025-09-16T22:57:39.895543+00:00,20250916
38sV3hNV5jsRwzKTCbDoke8d9u4GMvqeYRkVdvGFpump,"[FiZ8BfXxvE36UjFMyy32q3TUxeu1aY1d9m4TLnt3Vtvz,...",2025-09-16T22:57:39.895543+00:00,20250916
FZUK99Z47fujJfBhuqQ2jGpA3p762THi4WmnvByQpump,"[3hSnXV64MFBPgLsJui68sXo8qTxGMEMuUBrDP2US46Lo,...",2025-09-16T22:57:39.895543+00:00,20250916


: 

In [None]:
import joblib
import json

# Load the .pkl file
data = joblib.load(r"C:\Users\HP USER\Documents\Data Analyst\degen smart\data\overlap_results.pkl")


In [51]:

# Save as JSON
with open("overlap_results.json", "w") as f:
    json.dump(data, f, indent=4, default=str)  # `default=str` handles non-serializable objects like datetime


In [57]:
from typing import Dict, Any, Optional, Tuple
import logging


In [54]:
def load_latest_tokens_from_overlap() -> Dict[str, Dict[str, Any]]:
    try:
        # data = joblib.load(OVERLAP_FILE)
        logging.info("DEBUG: Loaded data keys: %s", list(data.keys())[:5])  # Log first 5 token IDs
        latest_tokens = {}
        for token_id, history in data.items():
            logging.info("DEBUG: Token %s history length: %d", token_id, len(history))
            if not history:
                continue
            latest_check = history[-1]
            result = latest_check.get("result", {})
            logging.info("DEBUG: Sample check for %s: %s", token_id, result)

            norm = {
                "grade": result.get("grade", "NONE"),
                "token_metadata": {
                    "mint": token_id,
                    "name": result.get("token_metadata", {}).get("name"),
                    "symbol": result.get("token_metadata", {}).get("symbol", "")
                },
                "overlap_percentage": result.get("overlap_percentage", 0.0),
                "concentration": result.get("concentration", 0.0),
                "checked_at": result.get("checked_at")
            }
            latest_tokens[token_id] = norm
        return latest_tokens
    except Exception as e:
        logging.exception("Failed to load overlap_results.pkl: %s", e)
        return {}


In [58]:
alert = load_latest_tokens_from_overlap()
print(f"Loaded {len(alert)} tokens from overlap results")


Loaded 333 tokens from overlap results


In [59]:
alert

{'6cw1abXZKcYVYEuJHhw2AcmTTqLjo3wKqVBZUd9eqao2': {'grade': 'NONE',
  'token_metadata': {'mint': '6cw1abXZKcYVYEuJHhw2AcmTTqLjo3wKqVBZUd9eqao2',
   'name': 'MOVIES',
   'symbol': ''},
  'overlap_percentage': 0.0,
  'concentration': 0.0,
  'checked_at': '2025-09-09T16:23:34.439067+00:00'},
 '3iZR1YX5t1fw4vU6Bvjr6easkAeSJCvnfxp9YNTCpump': {'grade': 'NONE',
  'token_metadata': {'mint': '3iZR1YX5t1fw4vU6Bvjr6easkAeSJCvnfxp9YNTCpump',
   'name': 'PROJECT',
   'symbol': ''},
  'overlap_percentage': 0.0,
  'concentration': 0.0,
  'checked_at': '2025-09-09T16:23:34.929180+00:00'},
 'BWXrr5dCPtNjYK7T54G1nxU441rdYyVn3vCfMKJnpump': {'grade': 'NONE',
  'token_metadata': {'mint': 'BWXrr5dCPtNjYK7T54G1nxU441rdYyVn3vCfMKJnpump',
   'name': 'ElonEra',
   'symbol': ''},
  'overlap_percentage': 0.0,
  'concentration': 0.0,
  'checked_at': '2025-09-09T16:23:36.627213+00:00'},
 '8guLySesX4yAwNHXkq77f58HZ2dFYnpKwcDP9BRYpump': {'grade': 'NONE',
  'token_metadata': {'mint': '8guLySesX4yAwNHXkq77f58HZ2dFYnpKwc

In [None]:
async def background_loop(app: Application):
    logging.info("Background alert loop started...")
    alerts_state = safe_load(ALERTS_STATE_FILE, {})  # Enhanced structure
    first_run = True
    while True:
        try:
            tokens = load_latest_tokens_from_overlap()
            if first_run:
                logging.info("DEBUG: Loaded %d tokens from overlap_results.pkl", len(tokens))
                sample_items = list(tokens.items())[:3]
                for tid, info in sample_items:
                    logging.info("DEBUG sample token: %s grade=%s checked_at=%s", tid, info.get("grade"), info.get("checked_at"))
                first_run = False

            for token_id, token in tokens.items():
                grade = token.get("grade")
                if not grade:
                    continue

                # Get current alert state or create new one
                current_state = alerts_state.get(token_id, {})
                last_grade = current_state.get("last_grade")

                if grade != last_grade:
                    logging.info("New/changed grade for %s: %s -> %s", token_id, last_grade, grade)

                    # For first-time alerts, capture initial market cap/FDV
                    if last_grade is None:
                        mc, fdv, _ = fetch_marketcap_and_fdv(token_id)

                        # Create new alert state entry
                        alerts_state[token_id] = {
                            "last_grade": grade,
                            "initial_marketcap": mc,
                            "initial_fdv": fdv,
                            "first_alert_at": datetime.utcnow().isoformat() + "Z"
                        }

                        logging.info("Captured initial market data for %s: MC=%s, FDV=%s",
                                   token_id, format_marketcap_display(mc), format_marketcap_display(fdv))
                    else:
                        # Update existing state
                        alerts_state[token_id]["last_grade"] = grade

                    # Send alert with historical market cap data
                    await send_alert_to_subscribers(
                        app,
                        token,
                        grade,
                        previous_grade=last_grade,
                        initial_mc=current_state.get("initial_marketcap"),
                        initial_fdv=current_state.get("initial_fdv"),
                        first_alert_at=current_state.get("first_alert_at")
                    )

            # Persist alerts_state and push to Supabase
            safe_save(ALERTS_STATE_FILE, alerts_state)
            upload_bot_data_to_supabase()

        except Exception as e:
            logging.exception("Error in background loop: %s", e)

        await asyncio.sleep(POLL_INTERVAL_SECS)
