### import modules

In [27]:
# --- Imports and Initial Setup ---
import os
from dotenv import load_dotenv
from typing import Dict, List, Optional
import time
import asyncio
import aiohttp
from web3 import Web3
import joblib
import requests
from web3.exceptions import InvalidAddress
import logging
import streamlit as st

### Error logging

In [28]:

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### Load environment variable

In [29]:

# Load .env file for local development
load_dotenv()

def get_secret(key):
    """
    Universal secret getter that works:
    - Locally with .env files
    - Locally with Streamlit secrets
    - On Streamlit Cloud with secrets
    - With system environment variables
    """
    # Method 1: Try Streamlit secrets
    try:
        return st.secrets[key]
    except:
        pass
    
    # Method 2: Try environment variables (from .env or system)
    env_value = os.getenv(key)
    if env_value:
        return env_value
    
    # Method 3: Return None if not found
    return None

### Set API

In [53]:
# Usage
WEB3_PROVIDER_URL = get_secret("WEB3_PROVIDER_URL")
COINGECKO_API_KEY = get_secret("COINGECKO_API_KEY")
ETHERSCAN_API_KEY = get_secret("ETHERSCAN_API_KEY")

# Validation
if not WEB3_PROVIDER_URL:
    st.error("Missing WEB3_PROVIDER_URL")
    st.stop()

# CoinGecko is now optional (fallback only)
if not COINGECKO_API_KEY:
    st.warning("⚠️ COINGECKO_API_KEY not found. Using 1inch only for pricing.")
    logger.warning("No CoinGecko API key found, using 1inch as primary source")


### # --- Constants and Configuration ---

In [31]:
# ERC-20 ABI: A minimal contract ABI to interact with ERC-20 tokens.
ERC20_ABI = [
    {"constant": True, "inputs": [{"name": "_owner", "type": "address"}], "name": "balanceOf", "outputs": [{"name": "balance", "type": "uint256"}], "type": "function"},
    {"constant": True, "inputs": [], "name": "decimals", "outputs": [{"name": "", "type": "uint8"}], "type": "function"},
    {"constant": True, "inputs": [], "name": "symbol", "outputs": [{"name": "", "type": "string"}], "type": "function"},
    {"constant": True, "inputs": [], "name": "name", "outputs": [{"name": "", "type": "string"}], "type": "function"}
]

# Database Cache settings
DATABASE_FOLDER = 'database'
TOKEN_DATABASE_CACHE = os.path.join(DATABASE_FOLDER, 'token_price_database.joblib')
CACHE_EXPIRATION_TIME = 30 * 60  # 30 minutes
MAX_TOKEN_PRICE = 200000.0

# # Additional constants
# WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
# USDC_ADDRESS = "0xA0b86a33E6441bF98d4C9A2cb50F79aB2e8f4C24"

### Helper functions used to Connect to the blockchain

In [32]:
# Global Web3 instance 
w3: Optional[Web3] = None
web3_initialized = False

def initialize_web3_connection() -> bool:
    """Initialize Web3 connection lazily when needed."""
    global w3, web3_initialized
    
    if web3_initialized:
        return w3 is not None
    
    try:
        # Create Web3 instance with timeout
        w3 = Web3(Web3.HTTPProvider(
            WEB3_PROVIDER_URL,
            request_kwargs={
                'timeout': 20,
                'headers': {
                    'User-Agent': 'CryptoPortfolioTracker/1.0'
                }
            }
        ))
        
        web3_initialized = True
        logger.info("Web3 instance created successfully")
        return True
        
    except Exception as e:
        logger.error(f"Failed to initialize Web3: {e}")
        w3 = None
        web3_initialized = True
        return False

def test_web3_connection() -> bool:
    """Test Web3 connection only when needed."""
    if not initialize_web3_connection():
        return False
    
    try:
        # Simple test - get latest block number
        block_number = w3.eth.block_number
        logger.info(f"Web3 connected. Latest block: {block_number}")
        return True
    except Exception as e:
        logger.error(f"Web3 connection test failed: {e}")
        return False


In [33]:
def check_api_keys() -> Dict[str, bool]:
    """Check API keys without initializing connections that might cause recursion."""
    return {
        "web3": bool(WEB3_PROVIDER_URL),  # Just check if URL exists
        "coingecko": bool(COINGECKO_API_KEY)
    }

def validate_ethereum_address(address: str) -> bool:
    """Validate if a given string is a valid Ethereum address."""
    if not address or not isinstance(address, str):
        return False
    
    try:
        # Use Web3's static method (doesn't need connection)
        return Web3.is_address(address)
    except Exception as e:
        logger.error(f"Error validating address {address}: {e}")
        return False


In [34]:
def get_eth_balance(wallet_address: str) -> float:
    """Retrieve the native ETH balance of a wallet address."""
    if not test_web3_connection():
        logger.error("Web3 connection not available")
        return 0.0
        
    try:
        if not validate_ethereum_address(wallet_address):
            logger.error(f"Invalid ETH address provided: {wallet_address}")
            return 0.0
        
        # Get balance in Wei and convert to Ether
        balance_wei = w3.eth.get_balance(Web3.to_checksum_address(wallet_address))
        return float(Web3.from_wei(balance_wei, 'ether'))
        
    except Exception as e:
        logger.error(f"Error getting ETH balance for {wallet_address}: {e}")
        return 0.0


In [35]:
get_eth_balance("0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe")

INFO:__main__:Web3 instance created successfully
INFO:__main__:Web3 connected. Latest block: 23117758


181774.35501503313

In [36]:
def get_token_info_and_balance(wallet_address: str, contract_address: str) -> Dict:
    """Fetch ERC-20 token info and balance for a wallet."""
    if not test_web3_connection():
        logger.error("Web3 connection not available")
        return {"address": contract_address, "symbol": "Error", "name": "Web3 Not Connected", "balance": 0.0, "decimals": 18}
    
    try:
        if not validate_ethereum_address(wallet_address) or not validate_ethereum_address(contract_address):
            return {"address": contract_address, "symbol": "Invalid", "name": "Invalid Address", "balance": 0.0, "decimals": 18}

        # Create contract instance
        token_contract = w3.eth.contract(
            address=Web3.to_checksum_address(contract_address), 
            abi=ERC20_ABI
        )
        
        # Get token information with individual try-catch blocks
        balance_wei = 0
        decimals = 18
        symbol = "Unknown"
        name = "Unknown Token"
        
        try:
            balance_wei = token_contract.functions.balanceOf(Web3.to_checksum_address(wallet_address)).call()
        except Exception as e:
            logger.error(f"Error getting balance for {contract_address}: {e}")
        
        try:
            decimals = token_contract.functions.decimals().call()
        except Exception as e:
            logger.error(f"Error getting decimals for {contract_address}: {e}")
            
        try:
            symbol = token_contract.functions.symbol().call()
        except Exception as e:
            logger.error(f"Error getting symbol for {contract_address}: {e}")
            
        try:
            name = token_contract.functions.name().call()
        except Exception as e:
            logger.error(f"Error getting name for {contract_address}: {e}")
            name = symbol  # Fallback to symbol

        # Convert balance from Wei-like units to readable float
        balance = balance_wei / (10 ** decimals) if decimals > 0 else 0

        return {
            "address": contract_address.lower(),
            "symbol": symbol,
            "name": name,
            "balance": float(balance),
            "decimals": decimals
        }
        
    except Exception as e:
        logger.error(f"Error getting token info for {contract_address}: {e}")
        return {
            "address": contract_address.lower(), 
            "symbol": "Error", 
            "name": "Token Read Error", 
            "balance": 0.0, 
            "decimals": 18
        }


In [37]:
get_token_info_and_balance("0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")

INFO:__main__:Web3 connected. Latest block: 23117758


{'address': '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
 'symbol': 'WETH',
 'name': 'Wrapped Ether',
 'balance': 214.1409680216,
 'decimals': 18}

In [54]:
def get_eth_price() -> Dict:
    # Check CoinGecko if available
    if COINGECKO_API_KEY:
        try:
            url = "https://pro-api.coingecko.com/api/v3/simple/price"
            params = {
                "ids": "ethereum",
                "vs_currencies": "usd",
                "include_24hr_change": "true"
            }
            headers = {"x-cg-pro-api-key": COINGECKO_API_KEY}
            
            response = requests.get(url, params=params, headers=headers, timeout=15)
            response.raise_for_status()
            data = response.json()
            
            eth_data = data.get("ethereum", {})
            price = eth_data.get("usd", 0.0)
            change = eth_data.get("usd_24h_change", 0.0)
            
            logger.info(f"Got ETH price from CoinGecko: ${price:.2f}")
            return {"price": price, "change_24h": change, "source": "coingecko"}
            
        except Exception as e:
            logger.error(f"Error getting ETH price from CoinGecko: {e}")
    
    logger.warning("Could not get ETH price from any source")
    return {"price": 0.0, "change_24h": 0.0, "source": "failed"}


In [55]:
get_eth_price()

INFO:__main__:Got ETH price from CoinGecko: $4187.59


{'price': 4187.59, 'change_24h': -0.030177936245862985, 'source': 'coingecko'}

### Get token holdings

In [None]:
async def get_token_balances_etherscan(wallet_address: str, session: aiohttp.ClientSession) -> List[Dict]:
    """Get current token balances using Etherscan API."""
    if not validate_ethereum_address(wallet_address):
        logger.error("Invalid wallet address provided to Etherscan function")
        return []
    
    url = "https://api.etherscan.io/api"
    params = {
        "module": "account",
        "action": "tokenbalance",
        "address": wallet_address,
        "tag": "latest",
        "apikey": ETHERSCAN_API_KEY
    }

In [64]:
async with aiohttp.ClientSession() as session:
    tokens = await get_token_balances_etherscan("0x226cc0Bae5251EBb637B9ecF5B1CdB99764abBCD", session)
tokens

In [42]:

async def fetch_coingecko_prices(session: aiohttp.ClientSession, tokens: List[str]) -> Dict:
    """Fetch prices from CoinGecko"""
    if not tokens or not COINGECKO_API_KEY:
        return {}

    logger.info(f"Fetching fallback prices for {len(tokens)} tokens from CoinGecko...")
    
    all_prices = {}
    headers = {"accept": "application/json", "x-cg-pro-api-key": COINGECKO_API_KEY}
    
    # Process in smaller chunks for better reliability
    chunk_size = 15
    for i in range(0, len(tokens), chunk_size):
        chunk = tokens[i:i + chunk_size]
        contract_addresses_str = ','.join(chunk)
        url = f"https://pro-api.coingecko.com/api/v3/simple/token_price/ethereum?contract_addresses={contract_addresses_str}&vs_currencies=usd&include_24hr_change=true"

        for attempt in range(2):  # Reduced attempts for fallback
            try:
                async with session.get(url, headers=headers, timeout=15) as response:
                    if response.status == 429:
                        wait_time = 2 ** attempt
                        logger.warning(f"CoinGecko rate limited, waiting {wait_time}s...")
                        await asyncio.sleep(wait_time)
                        continue
                        
                    if response.status == 200:
                        data = await response.json()
                        
                        if isinstance(data, dict):
                            for contract, price_info in data.items():
                                if contract and isinstance(price_info, dict) and "usd" in price_info:
                                    price = price_info.get("usd", 0.0)
                                    if 0 < price <= MAX_TOKEN_PRICE:
                                        all_prices[contract.lower()] = {
                                            "price": price,
                                            "change_24h": price_info.get("usd_24h_change", 0.0),
                                            "source": "coingecko"
                                        }
                                        logger.debug(f"CoinGecko: {contract} = ${price:.6f}")
                    break
                    
            except Exception as e:
                logger.error(f"CoinGecko fallback error: {e}")
                if attempt == 1:
                    break

        await asyncio.sleep(0.3)  # Rate limiting

    logger.info(f"CoinGecko fallback: Found prices for {len(all_prices)} tokens")
    return all_prices


In [43]:
async with aiohttp.ClientSession() as session:
    prices = await fetch_coingecko_prices(session, ['0xB8c77482e45F1F44dE1745F52C74426C631bDD52',
 '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',
 '0x67954768e721fad0f0f21e33e874497c73ed6a82',
 '0xb44377b74ef1773639b663d0754cb8410a847d02',
 '0xbe060eb32343208ed36c333020fc7c3d91754958',
 '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'])
prices

INFO:__main__:Fetching fallback prices for 6 tokens from CoinGecko...
ERROR:__main__:CoinGecko fallback error: Cannot connect to host pro-api.coingecko.com:443 ssl:default [getaddrinfo failed]
ERROR:__main__:CoinGecko fallback error: Cannot connect to host pro-api.coingecko.com:443 ssl:default [getaddrinfo failed]
INFO:__main__:CoinGecko fallback: Found prices for 0 tokens


{}

In [44]:

# async def fetch_token_prices(session: aiohttp.ClientSession, tokens: List[str]) -> Dict:    
#     if not tokens:
#         return {}

#     all_prices = {}
        
#     # Get CoinGecko for tokens
#     tokens_ = [token for token in tokens if token.lower() not in all_prices]
    
#     if tokens_ and COINGECKO_API_KEY:
#         logger.info(f"Step 2: Fetching {len(tokens_)} tokens from CoinGecko...")
#         coingecko_prices = await fetch_coingecko_prices(session, tokens_)
#         all_prices.update(coingecko_prices)
    
#     # Mark remaining tokens as not found
#     missing_tokens = [token for token in tokens if token.lower() not in all_prices]
#     for token in missing_tokens:
#         all_prices[token.lower()] = {
#             "price": 0.0,
#             "change_24h": 0.0,
#             "source": "not_found"
#         }
#         logger.warning(f"No price found for token {token}")

#     # Log summary
#     sources = {}
#     for price_data in all_prices.values():
#         source = price_data.get("source", "unknown")
#         sources[source] = sources.get(source, 0) + 1
    
#     logger.info(f"Price fetching complete. Sources: {dict(sources)}")
#     return all_prices


In [45]:
# async with aiohttp.ClientSession() as session:
#     fetch_token_prices = await fetch_token_prices(session, ['0x243cacb4d5ff6814ad668c3e225246efa886ad5a',
#     '0x4521c9ad6a3d4230803ab752ed238be11f8b342f'])
# fetch_token_prices

In [46]:
def save_token_database(tokens: Dict):
    """Save token price cache."""
    try:
        joblib.dump(tokens, TOKEN_DATABASE_CACHE)
        logger.info(f"Saved {len(tokens)} tokens to cache")
    except Exception as e:
        logger.error(f"Failed to save cache: {e}")

In [47]:

def load_or_create_token_database() -> Dict:
    """Load token price cache."""
    os.makedirs(DATABASE_FOLDER, exist_ok=True)
    
    try:
        tokens = joblib.load(TOKEN_DATABASE_CACHE)
        logger.info(f"Loaded {len(tokens)} tokens from cache")
        
        # Clean expired entries
        cleaned_tokens = {}
        current_time = time.time()
        
        for addr, data in tokens.items():
            if (isinstance(data, dict) and 
                "price" in data and 
                "timestamp" in data and
                current_time - data["timestamp"] <= CACHE_EXPIRATION_TIME):
                cleaned_tokens[addr] = data
        
        if len(cleaned_tokens) < len(tokens):
            logger.info(f"Cleaned {len(tokens) - len(cleaned_tokens)} expired entries")
            save_token_database(cleaned_tokens)
        
        return cleaned_tokens
        
    except FileNotFoundError:
        logger.info("No cache found, starting fresh")
        return {}
    except Exception as e:
        logger.error(f"Cache error: {e}, starting fresh")
        return {}


In [None]:
async def get_portfolio_data(wallet_address: str) -> Dict:
    """Main function to get a full portfolio breakdown: ETH balance, token balances, and their values."""
    logger.info(f"Analyzing portfolio for {wallet_address}")

    # Load the local token price cache.
    token_database = load_or_create_token_database()

    # Get ETH balance and its price.
    eth_balance = get_eth_balance(wallet_address)
    eth_price_data = get_eth_price()

    # Initialize the portfolio dictionary.
    portfolio = {
        "wallet_address": wallet_address,
        "eth_balance": eth_balance,
        "eth_price": eth_price_data["price"],
        "eth_change_24h": eth_price_data["change_24h"],
        "eth_value": eth_balance * eth_price_data["price"],
        "tokens": [],
        "total_value": eth_balance * eth_price_data["price"],
        "last_updated": time.strftime("%Y-%m-%d %H:%M:%S")
    }

    # Fetch held tokens from Alchemy API.
    async with aiohttp.ClientSession() as session:
        held_token_addresses = await get_held_tokens_alchemy(wallet_address, session)

        # Identify which token prices need to be fetched (not in cache or expired).
        missing_tokens = []
        for addr in held_token_addresses:
            addr_lower = addr.lower()
            if addr_lower not in token_database or (token_database[addr_lower].get("timestamp", 0) < time.time() - CACHE_EXPIRATION_TIME):
                missing_tokens.append(addr)
            else:
                logger.debug(f"Using cached price for {addr_lower}: ${token_database[addr_lower]['price']:.8f}")

        # Fetch prices for missing tokens and update the cache.
        if missing_tokens:
            logger.info(f"Fetching prices for {len(missing_tokens)} tokens...")
            new_prices = await fetch_coingecko_prices(session, missing_tokens)
            for addr, price_data in new_prices.items():
                token_database[addr.lower()] = {
                    "price": price_data["price"],
                    "change_24h": price_data["change_24h"],
                    "timestamp": time.time()
                }
            save_token_database(token_database)
        else:
            logger.info("All token prices are in the cache and up-to-date.")

        # Process all held tokens to get their details and calculate values.
        if held_token_addresses:
            logger.info(f"Processing {len(held_token_addresses)} tokens...")

            # Run balance and info fetching concurrently using asyncio.to_thread for blocking calls.
            token_info_tasks = [
                asyncio.to_thread(get_token_info_and_balance, wallet_address, addr)
                for addr in held_token_addresses
            ]

            token_infos = await asyncio.gather(*token_info_tasks)

            # Build the final portfolio data structure.
            for token_info in token_infos:
                if token_info["balance"] > 0:
                    addr_lower = token_info["address"].lower()
                    price_data = token_database.get(addr_lower, {"price": 0.0, "change_24h": 0.0, "timestamp": 0})

                    token_info.update({
                        "price": price_data["price"],
                        "change_24h": price_data["change_24h"],
                        "value": token_info["balance"] * price_data["price"]
                    })

                    portfolio["tokens"].append(token_info)
                    portfolio["total_value"] += token_info["value"]

    # Sort tokens by value in descending order.
    portfolio["tokens"].sort(key=lambda x: x["value"], reverse=True)

    logger.info(f"Portfolio analysis complete. Total value: ${portfolio['total_value']:,.2f}")
    return portfolio


In [49]:

# --- Main Execution ---
async def main():
    """Main execution function."""
    wallet_address = "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"

    logger.info("Starting simplified portfolio analysis (1inch primary, CoinGecko fallback)...")
    if not validate_ethereum_address(wallet_address):
        logger.error("Invalid Ethereum address")
        return

    # Get portfolio data
    portfolio = await get_portfolio_data(wallet_address)
    
    if "error" in portfolio:
        print(f"Error: {portfolio['error']}")
        return

    # Print results
    print("\n" + "="*60)
    print(f"Portfolio Summary for {portfolio['wallet_address']}")
    print("="*60)
    print(f"ETH Balance: {portfolio['eth_balance']:.4f} ETH")
    print(f"ETH Price: ${portfolio['eth_price']:,.2f} ({portfolio['eth_change_24h']:+.2f}%)")
    print(f"ETH Value: ${portfolio['eth_value']:,.2f}")
    print(f"Total Portfolio: ${portfolio['total_value']:,.2f}")
    print(f"Tokens: {len(portfolio['tokens'])}")
    # print(f"Sources: {', '.join(portfolio['price_sources_used'])}")

    if portfolio['tokens']:
        print(f"\nTop 10 Holdings:")
        print("-" * 90)
        print(f"{'Token':<20} {'Balance':<15} {'Price':<15} {'Value':<12} {'24h':<10} {'Source':<12}")
        print("-" * 90)

        for token in portfolio['tokens'][:10]:
            balance_str = f"{token['balance']:.4f}"
            price_str = f"${token['price']:.8f}" if token['price'] < 0.01 else f"${token['price']:,.4f}"
            value_str = f"${token['value']:,.2f}"
            change_str = f"{token['change_24h']:+.2f}%" if token['change_24h'] is not None and token['change_24h'] != 0 else "N/A"
            source_str = token.get('price_source', 'coingecko')[:12]

            print(f"{token['symbol']:<20} {balance_str:<15} {price_str:<15} {value_str:<12} {change_str:<10} {source_str:<12}")


In [50]:
await main()

INFO:__main__:Starting simplified portfolio analysis (1inch primary, CoinGecko fallback)...
INFO:__main__:Analyzing portfolio for 0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe using Etherscan API
INFO:__main__:Loaded 6 tokens from cache
INFO:__main__:Web3 connected. Latest block: 23117759
ERROR:__main__:Error getting ETH price from CoinGecko: HTTPSConnectionPool(host='pro-api.coingecko.com', port=443): Max retries exceeded with url: /api/v3/simple/price?ids=ethereum&vs_currencies=usd&include_24hr_change=true (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x000001453421EFB0>: Failed to resolve 'pro-api.coingecko.com' ([Errno 11002] getaddrinfo failed)"))


NameError: name 'get_token_balances_etherscan' is not defined