In [30]:
import os
import requests
from dotenv import load_dotenv
import sqlite3
import pandas as pd
import time
from typing import Dict, Any, List

In [31]:
load_dotenv()

API_KEY = os.getenv('WEBACY_API_KEY') # Fetching the Etherscan API key from environment variables

if not API_KEY: 
    raise ValueError('Please set the WEBACY_API_KEY environment variable.')

print("WEBACY_API_KEY loadeed sucessfully")

WEBACY_API_KEY loadeed sucessfully


In [71]:
# risk_engine_demo.py
from typing import Dict, Any, List, Tuple


# -----------------------------
# BaseRisk + Modules (as before)
# -----------------------------
class BaseRisk:
    def label(self, score: float) -> str:
        if 0 <= score <= 23:
            return "Low Risk"
        elif 23 < score <= 50:
            return "Medium Risk"
        elif 50 < score <= 100:
            return "High Risk"
        return "Unknown"


class GovernanceRisk(BaseRisk):
    def __init__(self, is_proxy: bool, access_control: bool, upgradeable_contract: bool):
        self.is_proxy = is_proxy
        self.access_control = access_control
        self.upgradeable_contract = upgradeable_contract
        self.weights = {"access_control": 0.5, "is_proxy": 0.4, "upgradeable_contract": 0.3}

    def score(self) -> float:
        score = (
            self.weights["access_control"] * int(self.access_control) +
            self.weights["is_proxy"] * int(self.is_proxy) +
            self.weights["upgradeable_contract"] * int(self.upgradeable_contract)
        )
        return round(score / sum(self.weights.values()) * 100, 2)


class LiquidityRisk(BaseRisk):
    def __init__(self, unlocked_liquidity: bool, lockedLiquidityPercent: float, creator_percent: float):
        self.unlocked_liquidity = unlocked_liquidity
        self.lockedLiquidityPercent = lockedLiquidityPercent
        self.creator_percent = creator_percent
        self.weights = {"unlocked_liquidity": 0.5, "lockedLiquidityPercent": 0.3, "creator_percent": 0.2}

    def score(self) -> float:
        unlocked_score = 100 if self.unlocked_liquidity else 0
        locked_score = max(0.0, min(100.0, 100 - (self.lockedLiquidityPercent or 0)))
        creator_score = max(0.0, min(100.0, self.creator_percent or 0))
        score = (
            self.weights["unlocked_liquidity"] * unlocked_score +
            self.weights["lockedLiquidityPercent"] * locked_score +
            self.weights["creator_percent"] * creator_score
        )
        return round(score / sum(self.weights.values()), 2)


class HolderRisk(BaseRisk):
    def __init__(self, percentageHeldByTop10: float):
        self.percentageHeldByTop10 = percentageHeldByTop10 or 0.0

    def score(self) -> float:
        return round(max(0.0, min(100.0, self.percentageHeldByTop10)), 2)


class TokenSecurityRisk(BaseRisk):
    def __init__(self, buy_tax: float, transfer_pausable: bool, is_blacklisted: bool, is_trusted: bool):
        self.buy_tax = buy_tax or 0.0
        self.transfer_pausable = transfer_pausable
        self.is_blacklisted = is_blacklisted
        self.is_trusted = is_trusted
        self.weights = {"buy_tax": 0.4, "transfer_pausable": 0.2, "is_blacklisted": 0.3, "is_trusted": 0.1}

    def score(self) -> float:
        tax_score = max(0.0, min(100.0, self.buy_tax))
        pausable_score = 100 if self.transfer_pausable else 0
        blacklist_score = 100 if self.is_blacklisted else 0
        trusted_score = 0 if self.is_trusted else 100
        score = (
            self.weights["buy_tax"] * tax_score +
            self.weights["transfer_pausable"] * pausable_score +
            self.weights["is_blacklisted"] * blacklist_score +
            self.weights["is_trusted"] * trusted_score
        )
        return round(score / sum(self.weights.values()), 2)


class MarketRisk(BaseRisk):
    def __init__(self, volatility: float, ath_change: float, atl_change: float, marketCapRank: int):
        self.volatility = volatility or 0.0
        self.ath_change = ath_change or 0.0
        self.atl_change = atl_change or 0.0
        self.marketCapRank = marketCapRank or 1000
        self.weights = {"volatility": 0.4, "ath_change": 0.2, "atl_change": 0.2, "marketCapRank": 0.2}

    def score(self) -> float:
        vol_score = max(0.0, min(100.0, abs(self.volatility)))
        ath_score = max(0.0, min(100.0, abs(self.ath_change)))
        atl_score = max(0.0, min(100.0, abs(self.atl_change)))
        # transform rank: lower rank = safer, scale so rank 1 => 0 risk contribution, rank 1000 => 100
        rank_score = max(0.0, min(100.0, (self.marketCapRank - 1) / max(1, (1000 - 1)) * 100))
        score = (
            self.weights["volatility"] * vol_score +
            self.weights["ath_change"] * ath_score +
            self.weights["atl_change"] * atl_score +
            self.weights["marketCapRank"] * rank_score
        )
        return round(score / sum(self.weights.values()), 2)


class FraudRisk(BaseRisk):
    def __init__(self, hacker: bool, drainer: bool, rugged: bool, fund_flows: bool):
        self.hacker = hacker
        self.drainer = drainer
        self.rugged = rugged
        self.fund_flows = fund_flows

    def score(self) -> float:
        score = (
            100 * int(bool(self.hacker)) +
            100 * int(bool(self.drainer)) +
            100 * int(bool(self.rugged)) +
            100 * int(bool(self.fund_flows))
        )
        return round(score / 4, 2)


class SanctionsRisk(BaseRisk):
    def __init__(self, ofac: bool, sanctioned: bool, dprk: bool):
        self.ofac = ofac
        self.sanctioned = sanctioned
        self.dprk = dprk

    def score(self) -> float:
        score = (
            100 * int(bool(self.ofac)) +
            100 * int(bool(self.sanctioned)) +
            100 * int(bool(self.dprk))
        )
        return round(score / 3, 2)


class MixerRisk(BaseRisk):
    def __init__(self, tornado: bool, mixer: bool, fundflow: bool):
        self.tornado = tornado
        self.mixer = mixer
        self.fundflow = fundflow

    def score(self) -> float:
        score = (
            100 * int(bool(self.tornado)) +
            100 * int(bool(self.mixer)) +
            100 * int(bool(self.fundflow))
        )
        return round(score / 3, 2)


class RiskEngine(BaseRisk):
    def __init__(self, modules: List[BaseRisk]):
        self.modules = modules

    def overall_score(self) -> float:
        scores = [m.score() for m in self.modules]
        return round(sum(scores) / len(scores), 2) if scores else 0.0

    def overall_risk(self) -> Tuple[float, str]:
        score = self.overall_score()
        return score, self.label(score)


# -----------------------------
# Helper: build engine from Webacy-like response
# -----------------------------
def _collect_issue_keys(response: Dict[str, Any]) -> List[str]:
    """Extract set of issue tag keys reported in 'issues' list."""
    keys = []
    for issue in response.get("issues", []):
        for tag in issue.get("tags", []):
            k = tag.get("key")
            if k:
                keys.append(k)
    return keys


def build_engine_from_webacy(response: Dict[str, Any]) -> Tuple[RiskEngine, Dict[str, BaseRisk]]:
    """
    Parse a Webacy-style response and build a RiskEngine and a dict of modules.
    Returns (engine, modules_dict).
    """
    # safety-first helpers
    issues_keys = set(_collect_issue_keys(response))
    details = response.get("details", {})
    token_risk = details.get("token_risk", {}) or {}
    token_meta = details.get("token_metadata_risk", {}) or {}
    market = details.get("marketData", {}) or details.get("marketData", {})
    ownership = details.get("marketData", {}).get("ownershipDistribution", {}) or details.get("ownershipDistribution", {}) or {}
    liquidity_entries = details.get("fund_flows", {}) or {}
    # direct flags from fund_flow risk
    fundflow_risk = details.get("fund_flow_risk", {}) or details.get("fund_flows", {}).get("risk", {}) or {}

    # Governance
    is_proxy = ("is_proxy" in issues_keys) or token_risk.get("is_proxy", False) or ("proxy" in issues_keys)
    access_control = bool(token_risk.get("access_control")) or ("access_control" in issues_keys)
    upgradeable_contract = ("upgradeable_contract" in issues_keys) or ("upgradeable" in issues_keys)

    gov = GovernanceRisk(is_proxy=is_proxy, access_control=access_control, upgradeable_contract=upgradeable_contract)

    # Liquidity
    # unlocked_liquidity tag may appear under issues; or check token-level data if available
    unlocked_liq_flag = ("unlocked-liquidity" in issues_keys) or ("unlocked_liquidity" in issues_keys)
    # lockedLiquidityPercent may be present at details['liquidityData'][i]['lockedLiquidityPercent'] or liquidity summary
    locked_pct = 0.0
    if isinstance(details.get("liquidityData"), list) and details.get("liquidityData"):
        # try to find an entry that explicitly has lockedLiquidityPercent
        found = False
        for entry in details.get("liquidityData", []):
            if entry.get("lockedLiquidityPercent") is not None:
                locked_pct = float(entry.get("lockedLiquidityPercent") or 0)
                found = True
                break
        if not found:
            # fallback 0
            locked_pct = 0.0
    else:
        locked_pct = float(details.get("lockedLiquidityPercent") or 0.0)

    # creator_percent may be under token_risk or ownership fields - fallback to 0
    creator_percent = float(details.get("creator_percent") or details.get("creatorPercent") or token_risk.get("creator_percent") or 0.0)
    liq = LiquidityRisk(unlocked_liquidity=unlocked_liq_flag, lockedLiquidityPercent=locked_pct, creator_percent=creator_percent)

    # Holder
    percent_top10 = 0.0
    # try ownershipDistribution first
    ownership_dist = details.get("marketData", {}).get("ownershipDistribution") or details.get("ownershipDistribution") or {}
    percent_top10 = float(ownership_dist.get("percentageHeldByTop10") or ownership_dist.get("percentageHeldByTop10", 0.0) or details.get("percentageHeldByTop10", 0.0))
    # fallback: any direct field
    percent_top10 = percent_top10 or float(details.get("percentageHeldByTop10") or 0.0)
    holder = HolderRisk(percentageHeldByTop10=percent_top10)

    # Token Security
    buy_tax = token_risk.get("buy_tax_percentage") or token_risk.get("buy_tax") or token_risk.get("buy_tax_percentage", 0.0) or 0.0
    transfer_pausable = ("transfer_pausable" in issues_keys) or bool(token_risk.get("transfer_pausable")) or ("transfer_pausable" in issues_keys)
    is_blacklisted = ("is_blacklisted" in issues_keys) or bool(token_risk.get("is_blacklisted"))
    is_trusted = bool(token_risk.get("is_trusted") or token_meta.get("is_trusted"))
    token = TokenSecurityRisk(buy_tax=float(buy_tax), transfer_pausable=transfer_pausable, is_blacklisted=is_blacklisted, is_trusted=is_trusted)

    # Market
    volatility = float(market.get("price_change_percentage_7d", 0) or market.get("price_change_percentage_7d_in_currency", 0) or 0.0)
    # fallback to sparkline or explicit 'volatility' tag
    if not volatility:
        volatility = float(response.get("volatility") or 0.0)
    ath_change = float(market.get("ath_change_percentage") or market.get("ath_change_percentage") or market.get("ath_change_percentage", 0.0) or 0.0)
    atl_change = float(market.get("atl_change_percentage") or market.get("atl_change_percentage") or 0.0)
    market_rank = int(market.get("market_cap_rank") or market.get("marketCapRank") or 1000)
    market_mod = MarketRisk(volatility=volatility, ath_change=ath_change, atl_change=atl_change, marketCapRank=market_rank)

    # Fraud
    # gather hacker/drainer/rugged/fundflow flags from fund_flow_risk or issues
    hacker_flag = fundflow_risk.get("hacker") if isinstance(fundflow_risk, dict) else False
    drainer_flag = fundflow_risk.get("drainer") if isinstance(fundflow_risk, dict) else False
    # generic rugged detection: check for tags like "*_rugged" in issues keys
    rugged_flag = any(k.endswith("_rugged") or "rugged" in k for k in issues_keys)
    fund_flows_flag = any(bool(v) for v in fundflow_risk.values()) if isinstance(fundflow_risk, dict) else False

    fraud = FraudRisk(hacker=bool(hacker_flag), drainer=bool(drainer_flag), rugged=bool(rugged_flag), fund_flows=bool(fund_flows_flag))

    # Sanctions
    ofac = details.get("address_info", {}).get("ofac_sanctioned") or fundflow_risk.get("ofac") or False
    sanctioned = details.get("address_info", {}).get("sanctioned") or ("sanctioned" in issues_keys) or False
    dprk = details.get("address_info", {}).get("dprk") or False
    sanctions = SanctionsRisk(ofac=bool(ofac), sanctioned=bool(sanctioned), dprk=bool(dprk))

    # Mixers
    tornado_flag = fundflow_risk.get("tornado") or ("tornado" in issues_keys) or False
    mixer_flag = any(k == "mixer" or "mixer" in k for k in issues_keys)
    fundflow_tag_flag = any(k.startswith("fundflow") or "_fundflow_" in k for k in issues_keys)
    mixers = MixerRisk(tornado=bool(tornado_flag), mixer=bool(mixer_flag), fundflow=bool(fundflow_tag_flag or fund_flows_flag))

    # Choose modules relevant for the entity (we include all; caller can pick subset)
    modules = {
        "governance": gov,
        "liquidity": liq,
        "holder": holder,
        "token_security": token,
        "market": market_mod,
        "fraud": fraud,
        "sanctions": sanctions,
        "mixers": mixers
    }

    # Build engine with all modules (caller can reduce)
    engine = RiskEngine(list(modules.values()))
    return engine, modules


# -----------------------------
# Main / Tester
# -----------------------------
def main(address: str, webacy_response: Dict[str, Any]) -> None:
    """
    Build risk engine from the given webacy_response and print module + overall results.
    address: the address being assessed (string)
    webacy_response: the JSON-like dict response from Webacy API for that address
    """
    engine, modules = build_engine_from_webacy(webacy_response)

    print(f"=== Risk assessment for {address} ===")
    for name, mod in modules.items():
        try:
            s = mod.score()
            lbl = mod.label(s)
        except Exception:
            s = None
            lbl = "Error"
        print(f"- {name:15s} | score: {s:6} | label: {lbl}")
    overall_score, overall_label = engine.overall_risk()
    print(f"\nOverall: {overall_score} → {overall_label}")


# -----------------------------
# CLI-like runner
# -----------------------------
if __name__ == "__main__":
    # Example usage:
    address = "0xdAC17F958D2ee523a2206206994597C13D831ec7"

    # Replace the following with the actual Webacy response dict (from earlier in this chat).
    # For demonstration I create a small mock; replace it with your real `webacy_response` var.
    webacy_response = {
        "issues": [
            {"tags": [{"key": "is_proxy"}, {"key": "unlocked-liquidity"}]},
            {"tags": [{"key": "top-10-holders-own-40-percent"}]}
        ],
        "details": {
            "token_risk": {"is_trusted": False, "buy_tax_percentage": 12, "access_control": {"interfaceType": "Ownable"}},
            "marketData": {"price_change_percentage_7d": 70, "ath_change_percentage": -60, "atl_change_percentage": 40, "market_cap_rank": 500,
                           "ownershipDistribution": {"percentageHeldByTop10": 45}},
            "liquidityData": [{"lockedLiquidityPercent": 10}],
            "fund_flow_risk": {"ofac": False, "tornado": True, "hacker": False},
            "address_info": {"ofac_sanctioned": False},
        }
    }

    main(address, webacy_response)


=== Risk assessment for 0xdAC17F958D2ee523a2206206994597C13D831ec7 ===
- governance      | score:   75.0 | label: High Risk
- liquidity       | score:   77.0 | label: High Risk
- holder          | score:   45.0 | label: Medium Risk
- token_security  | score:   14.8 | label: Low Risk
- market          | score:  57.99 | label: High Risk
- fraud           | score:   25.0 | label: Medium Risk
- sanctions       | score:    0.0 | label: Low Risk
- mixers          | score:  66.67 | label: High Risk

Overall: 45.18 → Medium Risk


In [73]:
# risk_engine_demo.py
from typing import Dict, Any, List, Tuple


# -----------------------------
# BaseRisk + Modules
# -----------------------------
class BaseRisk:
    def label(self, score: float) -> str:
        if 0 <= score <= 23:
            return "Low Risk"
        elif 23 < score <= 50:
            return "Medium Risk"
        elif 50 < score <= 100:
            return "High Risk"
        return "Unknown"


class GovernanceRisk(BaseRisk):
    def __init__(self, is_proxy: bool, access_control: bool, upgradeable_contract: bool):
        self.is_proxy = is_proxy
        self.access_control = access_control
        self.upgradeable_contract = upgradeable_contract
        self.weights = {"access_control": 0.5, "is_proxy": 0.4, "upgradeable_contract": 0.3}

    def score(self) -> float:
        score = (
            self.weights["access_control"] * int(self.access_control) +
            self.weights["is_proxy"] * int(self.is_proxy) +
            self.weights["upgradeable_contract"] * int(self.upgradeable_contract)
        )
        return round(score / sum(self.weights.values()) * 100, 2)


class LiquidityRisk(BaseRisk):
    def __init__(self, unlocked_liquidity: bool, lockedLiquidityPercent: float, creator_percent: float):
        self.unlocked_liquidity = unlocked_liquidity
        self.lockedLiquidityPercent = lockedLiquidityPercent
        self.creator_percent = creator_percent
        self.weights = {"unlocked_liquidity": 0.5, "lockedLiquidityPercent": 0.3, "creator_percent": 0.2}

    def score(self) -> float:
        unlocked_score = 100 if self.unlocked_liquidity else 0
        locked_score = max(0.0, min(100.0, 100 - (self.lockedLiquidityPercent or 0)))
        creator_score = max(0.0, min(100.0, self.creator_percent or 0))
        score = (
            self.weights["unlocked_liquidity"] * unlocked_score +
            self.weights["lockedLiquidityPercent"] * locked_score +
            self.weights["creator_percent"] * creator_score
        )
        return round(score / sum(self.weights.values()), 2)


class HolderRisk(BaseRisk):
    def __init__(self, percentageHeldByTop10: float):
        self.percentageHeldByTop10 = percentageHeldByTop10 or 0.0

    def score(self) -> float:
        return round(max(0.0, min(100.0, self.percentageHeldByTop10)), 2)


class TokenSecurityRisk(BaseRisk):
    def __init__(self, buy_tax: float, transfer_pausable: bool, is_blacklisted: bool, is_trusted: bool):
        self.buy_tax = buy_tax or 0.0
        self.transfer_pausable = transfer_pausable
        self.is_blacklisted = is_blacklisted
        self.is_trusted = is_trusted
        self.weights = {"buy_tax": 0.4, "transfer_pausable": 0.2, "is_blacklisted": 0.3, "is_trusted": 0.1}

    def score(self) -> float:
        tax_score = max(0.0, min(100.0, self.buy_tax))
        pausable_score = 100 if self.transfer_pausable else 0
        blacklist_score = 100 if self.is_blacklisted else 0
        trusted_score = 0 if self.is_trusted else 100
        score = (
            self.weights["buy_tax"] * tax_score +
            self.weights["transfer_pausable"] * pausable_score +
            self.weights["is_blacklisted"] * blacklist_score +
            self.weights["is_trusted"] * trusted_score
        )
        return round(score / sum(self.weights.values()), 2)


class MarketRisk(BaseRisk):
    def __init__(self, volatility: float, ath_change: float, atl_change: float, marketCapRank: int):
        self.volatility = volatility or 0.0
        self.ath_change = ath_change or 0.0
        self.atl_change = atl_change or 0.0
        self.marketCapRank = marketCapRank or 1000
        self.weights = {"volatility": 0.4, "ath_change": 0.2, "atl_change": 0.2, "marketCapRank": 0.2}

    def score(self) -> float:
        vol_score = max(0.0, min(100.0, abs(self.volatility)))
        ath_score = max(0.0, min(100.0, abs(self.ath_change)))
        atl_score = max(0.0, min(100.0, abs(self.atl_change)))
        rank_score = max(0.0, min(100.0, (self.marketCapRank - 1) / max(1, (1000 - 1)) * 100))
        score = (
            self.weights["volatility"] * vol_score +
            self.weights["ath_change"] * ath_score +
            self.weights["atl_change"] * atl_score +
            self.weights["marketCapRank"] * rank_score
        )
        return round(score / sum(self.weights.values()), 2)


class FraudRisk(BaseRisk):
    def __init__(self, hacker: bool, drainer: bool, rugged: bool, fund_flows: bool):
        self.hacker = hacker
        self.drainer = drainer
        self.rugged = rugged
        self.fund_flows = fund_flows

    def score(self) -> float:
        score = (
            100 * int(bool(self.hacker)) +
            100 * int(bool(self.drainer)) +
            100 * int(bool(self.rugged)) +
            100 * int(bool(self.fund_flows))
        )
        return round(score / 4, 2)


class SanctionsRisk(BaseRisk):
    def __init__(self, ofac: bool, sanctioned: bool, dprk: bool):
        self.ofac = ofac
        self.sanctioned = sanctioned
        self.dprk = dprk

    def score(self) -> float:
        score = (
            100 * int(bool(self.ofac)) +
            100 * int(bool(self.sanctioned)) +
            100 * int(bool(self.dprk))
        )
        return round(score / 3, 2)


class MixerRisk(BaseRisk):
    def __init__(self, tornado: bool, mixer: bool, fundflow: bool):
        self.tornado = tornado
        self.mixer = mixer
        self.fundflow = fundflow

    def score(self) -> float:
        score = (
            100 * int(bool(self.tornado)) +
            100 * int(bool(self.mixer)) +
            100 * int(bool(self.fundflow))
        )
        return round(score / 3, 2)


class RiskEngine(BaseRisk):
    def __init__(self, modules: List[BaseRisk]):
        self.modules = modules

    def overall_score(self) -> float:
        scores = [m.score() for m in self.modules]
        return round(sum(scores) / len(scores), 2) if scores else 0.0

    def overall_risk(self) -> Tuple[float, str]:
        score = self.overall_score()
        return score, self.label(score)


# -----------------------------
# Build Engine from Webacy-like Response
# -----------------------------
def build_engine_from_webacy(response: Dict[str, Any]) -> Tuple[RiskEngine, Dict[str, BaseRisk]]:
    issues_keys = set()
    for issue in response.get("issues", []):
        for tag in issue.get("tags", []):
            if "key" in tag:
                issues_keys.add(tag["key"])

    details = response.get("details", {})
    token_risk = details.get("token_risk", {}) or {}
    token_meta = details.get("token_metadata_risk", {}) or {}
    market = details.get("marketData", {}) or {}
    ownership = market.get("ownershipDistribution", {}) or {}

    # Governance
    gov = GovernanceRisk(
        is_proxy=("is_proxy" in issues_keys),
        access_control=bool(token_risk.get("access_control")),
        upgradeable_contract=("upgradeable_contract" in issues_keys),
    )

    # Liquidity
    liq = LiquidityRisk(
        unlocked_liquidity=("unlocked-liquidity" in issues_keys),
        lockedLiquidityPercent=float(details.get("lockedLiquidityPercent") or 10),
        creator_percent=float(details.get("creator_percent") or 30),
    )

    # Holder
    holder = HolderRisk(percentageHeldByTop10=float(ownership.get("percentageHeldByTop10") or 45))

    # Token Security
    token = TokenSecurityRisk(
        buy_tax=float(token_risk.get("buy_tax_percentage") or 12),
        transfer_pausable=("transfer_pausable" in issues_keys),
        is_blacklisted=("is_blacklisted" in issues_keys),
        is_trusted=bool(token_risk.get("is_trusted")),
    )

    # Market
    market_mod = MarketRisk(
        volatility=float(market.get("price_change_percentage_7d") or 70),
        ath_change=float(market.get("ath_change_percentage") or -60),
        atl_change=float(market.get("atl_change_percentage") or 40),
        marketCapRank=int(market.get("market_cap_rank") or 500),
    )

    # Fraud
    fraud = FraudRisk(hacker=False, drainer=False, rugged=("rugged" in issues_keys), fund_flows=True)

    # Sanctions
    sanctions = SanctionsRisk(ofac=False, sanctioned=False, dprk=False)

    # Mixers
    mixers = MixerRisk(tornado=("tornado" in issues_keys), mixer=("mixer" in issues_keys), fundflow=False)

    modules = {
        "governance": gov,
        "liquidity": liq,
        "holder": holder,
        "token_security": token,
        "market": market_mod,
        "fraud": fraud,
        "sanctions": sanctions,
        "mixers": mixers,
    }

    engine = RiskEngine(list(modules.values()))
    return engine, modules


# -----------------------------
# Main / Tester
# -----------------------------
def main(address: str, webacy_response: Dict[str, Any]) -> None:
    engine, modules = build_engine_from_webacy(webacy_response)

    details = webacy_response.get("details", {})
    token_meta = details.get("token_metadata_risk", {}) or {}
    market = details.get("marketData", {}) or {}

    symbol = token_meta.get("symbol") or market.get("symbol") or "N/A"
    name = token_meta.get("name") or market.get("name") or "Unknown Token"
    market_cap = market.get("market_cap") or "?"
    price = market.get("current_price") or market.get("price") or "?"

    # Header
    print("=" * 70)
    print(f"Risk Assessment Report for {address}")
    print("-" * 70)
    print(f"Token: {name} ({symbol})")
    print(f"Market Cap: {market_cap}")
    print(f"Price: {price}")
    print("=" * 70)

    # Module Breakdown
    scores = {}
    for module_name, mod in modules.items():
        s = mod.score()
        lbl = mod.label(s)
        scores[module_name] = s
        print(f"- {module_name:15s} | score: {s:6} | label: {lbl}")

    # Overall
    overall_score, overall_label = engine.overall_risk()
    print("=" * 70)
    print(f"Overall Risk: {overall_score} → {overall_label}")

    # Top contributors
    top_modules = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:3]
    print("-" * 70)
    print("Top Risk Contributors:")
    for mod_name, score in top_modules:
        print(f"• {mod_name:15s} → {score}")
    print("=" * 70)


# -----------------------------
# Example Run
# -----------------------------
if __name__ == "__main__":
    address = "0xdAC17F958D2ee523a2206206994597C13D831ec7"
    mock_response = {
        "issues": [
            {"tags": [{"key": "is_proxy"}, {"key": "unlocked-liquidity"}]},
            {"tags": [{"key": "top-10-holders-own-40-percent"}]},
        ],
        "details": {
            "token_risk": {"is_trusted": False, "buy_tax_percentage": 12, "access_control": {"interfaceType": "Ownable"}},
            "marketData": {
                "symbol": "USDT",
                "name": "Tether USD",
                "price_change_percentage_7d": 70,
                "ath_change_percentage": -60,
                "atl_change_percentage": 40,
                "market_cap_rank": 500,
                "market_cap": 83456789000,
                "current_price": 1.00,
                "ownershipDistribution": {"percentageHeldByTop10": 45},
            },
            "liquidityData": [{"lockedLiquidityPercent": 10}],
        },
    }
    main(address, mock_response)


Risk Assessment Report for 0xdAC17F958D2ee523a2206206994597C13D831ec7
----------------------------------------------------------------------
Token: Tether USD (USDT)
Market Cap: 83456789000
Price: 1.0
- governance      | score:   75.0 | label: High Risk
- liquidity       | score:   83.0 | label: High Risk
- holder          | score:   45.0 | label: Medium Risk
- token_security  | score:   14.8 | label: Low Risk
- market          | score:  57.99 | label: High Risk
- fraud           | score:   25.0 | label: Medium Risk
- sanctions       | score:    0.0 | label: Low Risk
- mixers          | score:    0.0 | label: Low Risk
Overall Risk: 37.6 → Medium Risk
----------------------------------------------------------------------
Top Risk Contributors:
• liquidity       → 83.0
• governance      → 75.0
• market          → 57.99


In [74]:
# risk_engine_report.py
from typing import Dict, Any, List, Tuple


# -----------------------------
# BaseRisk + Modules
# -----------------------------
class BaseRisk:
    def label(self, score: float) -> str:
        if 0 <= score <= 23:
            return "Low Risk"
        elif 23 < score <= 50:
            return "Medium Risk"
        elif 50 < score <= 100:
            return "High Risk"
        return "Unknown"

    def explain(self) -> str:
        return "No explanation available."


class GovernanceRisk(BaseRisk):
    def __init__(self, is_proxy: bool, access_control: bool, upgradeable_contract: bool):
        self.is_proxy = is_proxy
        self.access_control = access_control
        self.upgradeable_contract = upgradeable_contract
        self.weights = {"access_control": 0.5, "is_proxy": 0.4, "upgradeable_contract": 0.3}

    def score(self) -> float:
        score = (
            self.weights["access_control"] * int(self.access_control) +
            self.weights["is_proxy"] * int(self.is_proxy) +
            self.weights["upgradeable_contract"] * int(self.upgradeable_contract)
        )
        return round(score / sum(self.weights.values()) * 100, 2)

    def explain(self) -> str:
        reasons = []
        if self.access_control:
            reasons.append("Contract ownership is controlled (centralized access).")
        if self.is_proxy:
            reasons.append("Contract uses a proxy pattern, meaning it can be upgraded.")
        if self.upgradeable_contract:
            reasons.append("Contract is upgradeable.")
        if not reasons:
            reasons.append("No major governance centralization risks found.")
        return " ".join(reasons)


class LiquidityRisk(BaseRisk):
    def __init__(self, unlocked_liquidity: bool, lockedLiquidityPercent: float, creator_percent: float):
        self.unlocked_liquidity = unlocked_liquidity
        self.lockedLiquidityPercent = lockedLiquidityPercent
        self.creator_percent = creator_percent
        self.weights = {"unlocked_liquidity": 0.5, "lockedLiquidityPercent": 0.3, "creator_percent": 0.2}

    def score(self) -> float:
        unlocked_score = 100 if self.unlocked_liquidity else 0
        locked_score = max(0.0, min(100.0, 100 - (self.lockedLiquidityPercent or 0)))
        creator_score = max(0.0, min(100.0, self.creator_percent or 0))
        score = (
            self.weights["unlocked_liquidity"] * unlocked_score +
            self.weights["lockedLiquidityPercent"] * locked_score +
            self.weights["creator_percent"] * creator_score
        )
        return round(score / sum(self.weights.values()), 2)

    def explain(self) -> str:
        reasons = []
        if self.unlocked_liquidity:
            reasons.append("Liquidity is unlocked, posing exit risk.")
        reasons.append(f"{self.lockedLiquidityPercent}% of liquidity is locked.")
        reasons.append(f"Creator controls {self.creator_percent}% of liquidity pool.")
        return " ".join(reasons)


class HolderRisk(BaseRisk):
    def __init__(self, percentageHeldByTop10: float):
        self.percentageHeldByTop10 = percentageHeldByTop10 or 0.0

    def score(self) -> float:
        return round(max(0.0, min(100.0, self.percentageHeldByTop10)), 2)

    def explain(self) -> str:
        return f"Top 10 wallets hold {self.percentageHeldByTop10}% of total supply."


class TokenSecurityRisk(BaseRisk):
    def __init__(self, buy_tax: float, transfer_pausable: bool, is_blacklisted: bool, is_trusted: bool):
        self.buy_tax = buy_tax or 0.0
        self.transfer_pausable = transfer_pausable
        self.is_blacklisted = is_blacklisted
        self.is_trusted = is_trusted
        self.weights = {"buy_tax": 0.4, "transfer_pausable": 0.2, "is_blacklisted": 0.3, "is_trusted": 0.1}

    def score(self) -> float:
        tax_score = max(0.0, min(100.0, self.buy_tax))
        pausable_score = 100 if self.transfer_pausable else 0
        blacklist_score = 100 if self.is_blacklisted else 0
        trusted_score = 0 if self.is_trusted else 100
        score = (
            self.weights["buy_tax"] * tax_score +
            self.weights["transfer_pausable"] * pausable_score +
            self.weights["is_blacklisted"] * blacklist_score +
            self.weights["is_trusted"] * trusted_score
        )
        return round(score / sum(self.weights.values()), 2)

    def explain(self) -> str:
        reasons = [f"Buy tax: {self.buy_tax}%."]
        if self.transfer_pausable:
            reasons.append("Token transfers can be paused.")
        if self.is_blacklisted:
            reasons.append("Token has blacklisting enabled.")
        if self.is_trusted:
            reasons.append("Token is flagged as trusted.")
        return " ".join(reasons)


class MarketRisk(BaseRisk):
    def __init__(self, volatility: float, ath_change: float, atl_change: float, marketCapRank: int):
        self.volatility = volatility or 0.0
        self.ath_change = ath_change or 0.0
        self.atl_change = atl_change or 0.0
        self.marketCapRank = marketCapRank or 1000
        self.weights = {"volatility": 0.4, "ath_change": 0.2, "atl_change": 0.2, "marketCapRank": 0.2}

    def score(self) -> float:
        vol_score = max(0.0, min(100.0, abs(self.volatility)))
        ath_score = max(0.0, min(100.0, abs(self.ath_change)))
        atl_score = max(0.0, min(100.0, abs(self.atl_change)))
        rank_score = max(0.0, min(100.0, (self.marketCapRank - 1) / max(1, (1000 - 1)) * 100))
        score = (
            self.weights["volatility"] * vol_score +
            self.weights["ath_change"] * ath_score +
            self.weights["atl_change"] * atl_score +
            self.weights["marketCapRank"] * rank_score
        )
        return round(score / sum(self.weights.values()), 2)

    def explain(self) -> str:
        return (f"7d volatility: {self.volatility}%. "
                f"ATH change: {self.ath_change}%. "
                f"ATL change: {self.atl_change}%. "
                f"Market cap rank: {self.marketCapRank}.")


class FraudRisk(BaseRisk):
    def __init__(self, hacker: bool, drainer: bool, rugged: bool, fund_flows: bool):
        self.hacker = hacker
        self.drainer = drainer
        self.rugged = rugged
        self.fund_flows = fund_flows

    def score(self) -> float:
        score = (
            100 * int(bool(self.hacker)) +
            100 * int(bool(self.drainer)) +
            100 * int(bool(self.rugged)) +
            100 * int(bool(self.fund_flows))
        )
        return round(score / 4, 2)

    def explain(self) -> str:
        reasons = []
        if self.hacker:
            reasons.append("Address is flagged as hacker.")
        if self.drainer:
            reasons.append("Address is linked to drainer activity.")
        if self.rugged:
            reasons.append("Project has been rugged.")
        if self.fund_flows:
            reasons.append("Suspicious fund flows detected.")
        if not reasons:
            reasons.append("No fraud indicators found.")
        return " ".join(reasons)


class SanctionsRisk(BaseRisk):
    def __init__(self, ofac: bool, sanctioned: bool, dprk: bool):
        self.ofac = ofac
        self.sanctioned = sanctioned
        self.dprk = dprk

    def score(self) -> float:
        score = (
            100 * int(bool(self.ofac)) +
            100 * int(bool(self.sanctioned)) +
            100 * int(bool(self.dprk))
        )
        return round(score / 3, 2)

    def explain(self) -> str:
        if self.ofac or self.sanctioned or self.dprk:
            return "Address is linked to sanctions lists (OFAC/DPRK/etc)."
        return "No sanction flags detected."


class MixerRisk(BaseRisk):
    def __init__(self, tornado: bool, mixer: bool, fundflow: bool):
        self.tornado = tornado
        self.mixer = mixer
        self.fundflow = fundflow

    def score(self) -> float:
        score = (
            100 * int(bool(self.tornado)) +
            100 * int(bool(self.mixer)) +
            100 * int(bool(self.fundflow))
        )
        return round(score / 3, 2)

    def explain(self) -> str:
        reasons = []
        if self.tornado:
            reasons.append("Involvement with Tornado Cash.")
        if self.mixer:
            reasons.append("Mixer activity detected.")
        if self.fundflow:
            reasons.append("Obfuscated fund flows detected.")
        if not reasons:
            reasons.append("No mixer involvement found.")
        return " ".join(reasons)


class RiskEngine(BaseRisk):
    def __init__(self, modules: List[BaseRisk]):
        self.modules = modules

    def overall_score(self) -> float:
        scores = [m.score() for m in self.modules]
        return round(sum(scores) / len(scores), 2) if scores else 0.0

    def overall_risk(self) -> Tuple[float, str]:
        score = self.overall_score()
        return score, self.label(score)


# -----------------------------
# Build Engine from Webacy-like Response
# -----------------------------
def build_engine_from_webacy(response: Dict[str, Any]) -> Tuple[RiskEngine, Dict[str, BaseRisk]]:
    issues_keys = set()
    for issue in response.get("issues", []):
        for tag in issue.get("tags", []):
            if "key" in tag:
                issues_keys.add(tag["key"])

    details = response.get("details", {})
    token_risk = details.get("token_risk", {}) or {}
    market = details.get("marketData", {}) or {}
    ownership = market.get("ownershipDistribution", {}) or {}

    modules = {
        "governance": GovernanceRisk("is_proxy" in issues_keys,
                                     bool(token_risk.get("access_control")),
                                     "upgradeable_contract" in issues_keys),
        "liquidity": LiquidityRisk("unlocked-liquidity" in issues_keys,
                                   float(details.get("lockedLiquidityPercent") or 10),
                                   float(details.get("creator_percent") or 30)),
        "holder": HolderRisk(float(ownership.get("percentageHeldByTop10") or 45)),
        "token_security": TokenSecurityRisk(float(token_risk.get("buy_tax_percentage") or 12),
                                            "transfer_pausable" in issues_keys,
                                            "is_blacklisted" in issues_keys,
                                            bool(token_risk.get("is_trusted"))),
        "market": MarketRisk(float(market.get("price_change_percentage_7d") or 70),
                             float(market.get("ath_change_percentage") or -60),
                             float(market.get("atl_change_percentage") or 40),
                             int(market.get("market_cap_rank") or 500)),
        "fraud": FraudRisk(False, False, "rugged" in issues_keys, True),
        "sanctions": SanctionsRisk(False, False, False),
        "mixers": MixerRisk("tornado" in issues_keys, "mixer" in issues_keys, False),
    }

    engine = RiskEngine(list(modules.values()))
    return engine, modules


# -----------------------------
# Main / Reporter
# -----------------------------
def main(address: str, webacy_response: Dict[str, Any]) -> None:
    engine, modules = build_engine_from_webacy(webacy_response)

    details = webacy_response.get("details", {})
    token_meta = details.get("token_metadata_risk", {}) or {}
    market = details.get("marketData", {}) or {}

    symbol = token_meta.get("symbol") or market.get("symbol") or "N/A"
    name = token_meta.get("name") or market.get("name") or "Unknown Token"
    market_cap = market.get("market_cap") or "?"
    price = market.get("current_price") or market.get("price") or "?"

    # Header
    print("=" * 70)
    print(f"Risk Assessment Report for {address}")
    print("-" * 70)
    print(f"Token: {name} ({symbol})")
    print(f"Market Cap: {market_cap}")
    print(f"Price: {price}")
    print("=" * 70)

    # Module Breakdown with explanations
    scores = {}
    for module_name, mod in modules.items():
        s = mod.score()
        lbl = mod.label(s)
        scores[module_name] = s
        print(f"- {module_name:15s} | score: {s:6} | label: {lbl}")
        print(f"    ↳ {mod.explain()}")

    # Overall
    overall_score, overall_label = engine.overall_risk()
    print("=" * 70)
    print(f"Overall Risk: {overall_score} → {overall_label}")

    # Top contributors
    top_modules = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:3]
    print("-" * 70)
    print("Top Risk Contributors:")
    for mod_name, score in top_modules:
        print(f"• {mod_name:15s} → {score}")
    print("=" * 70)


# -----------------------------
# Example Run
# -----------------------------
if __name__ == "__main__":
    address = "0xdAC17F958D2ee523a2206206994597C13D831ec7"
    mock_response = {
        "issues": [
            {"tags": [{"key": "is_proxy"}, {"key": "unlocked-liquidity"}]},
            {"tags": [{"key": "top-10-holders-own-40-percent"}]},
        ],
        "details": {
            "token_risk": {"is_trusted": False, "buy_tax_percentage": 12, "access_control": {"interfaceType": "Ownable"}},
            "marketData": {
                "symbol": "USDT",
                "name": "Tether USD",
                "price_change_percentage_7d": 70,
                "ath_change_percentage": -60,
                "atl_change_percentage": 40,
                "market_cap_rank": 500,
                "market_cap": 83456789000,
                "current_price": 1.00,
                "ownershipDistribution": {"percentageHeldByTop10": 45},
            },
            "liquidityData": [{"lockedLiquidityPercent": 10}],
        },
    }
    main(address, mock_response)


Risk Assessment Report for 0xdAC17F958D2ee523a2206206994597C13D831ec7
----------------------------------------------------------------------
Token: Tether USD (USDT)
Market Cap: 83456789000
Price: 1.0
- governance      | score:   75.0 | label: High Risk
    ↳ Contract ownership is controlled (centralized access). Contract uses a proxy pattern, meaning it can be upgraded.
- liquidity       | score:   83.0 | label: High Risk
    ↳ Liquidity is unlocked, posing exit risk. 10.0% of liquidity is locked. Creator controls 30.0% of liquidity pool.
- holder          | score:   45.0 | label: Medium Risk
    ↳ Top 10 wallets hold 45.0% of total supply.
- token_security  | score:   14.8 | label: Low Risk
    ↳ Buy tax: 12.0%.
- market          | score:  57.99 | label: High Risk
    ↳ 7d volatility: 70.0%. ATH change: -60.0%. ATL change: 40.0%. Market cap rank: 500.
- fraud           | score:   25.0 | label: Medium Risk
    ↳ Suspicious fund flows detected.
- sanctions       | score:    0.0 | label

In [4]:
address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" # ERC 20 USDC address

url = f"https://api.webacy.com/addresses/{address}"

headers = {
    "accept": "application/json",
    "x-api-key": API_KEY
}

response = requests.get(url, headers=headers)

try:
    data = response.json()
except ValueError:
    data = response.text

data

{'count': 6,
 'medium': 0,
 'high': 0,
 'overallRisk': 10.488088481701514,
 'issues': [{'score': 0.21999999999999997,
   'tags': [{'name': 'Verified Listing',
     'description': 'This asset represents an established project with proven authenticity or reputation, and has been verified as authentic. This does not necessarily mean the asset is free of risks or issues.',
     'type': 'tokenRisk',
     'severity': 0,
     'key': 'verified_listing'},
    {'name': 'Has Been Sniped',
     'description': 'Token shows signs of sniper activity, where automated tools or bots were used to purchase significant amounts of supply within seconds/minutes of minting. This often indicates coordinated buying activity and potential for future price manipulation.',
     'type': 'tokenRisk',
     'severity': 0,
     'key': 'has_been_sniped'},
    {'name': 'Proxy',
     'description': 'Proxy contracts enable upgradability, meaning the logic or implementation contract that a proxy points to can be changed. Th

In [None]:
# Flatten nested JSON into a single row DataFrame
df = pd.json_normalize(data, sep="_")
df

Unnamed: 0,count,medium,high,overallRisk,issues,isContract,details_fund_flows_risk_ofac,details_fund_flows_risk_hacker,details_fund_flows_risk_mixers,details_fund_flows_risk_drainer,...,details_marketData_holderAnalysis_top_10_holders_analysis_topHolders,details_marketData_holderAnalysis_top_10_holders_analysis_totalSupply,details_marketData_holderAnalysis_top_10_holders_analysis_percentageHeldByTop5,details_marketData_holderAnalysis_top_10_holders_analysis_percentageHeldByTop10,details_marketData_holderAnalysis_top_10_holders_analysis_percentageHeldByTop20,details_marketData_holderAnalysis_dev_launched_tokens_in_24_hours,details_access_control_interfaceType,details_access_control_activeRoleHolders,details_buy_sell_taxes_has_buy_tax,details_buy_sell_taxes_has_sell_tax
0,6,0,0,10.488088,"[{'score': 0.21999999999999997, 'tags': [{'nam...",True,False,False,False,False,...,"[{'amount': '2184516156.3047490120', 'percenta...",49250149468.18405,17.608605,27.363271,36.012211,0,Ownable,"[{'name': 'OWNER_ROLE', 'role': 'owner', 'acco...",False,False


In [None]:
# Expand issues + tags in one go
df_issues = (
    pd.json_normalize(df["issues"][0], sep="_")        # expand issues
    .explode("tags")                                  # one row per tag
    .reset_index(drop=True)
    .pipe(lambda d: d.drop(columns=["tags"])          # drop old tags
          .join(pd.json_normalize(d["tags"]), rsuffix="_tag"))  # expand tags
)

df_issues.head()

Unnamed: 0,score,riskScore,categories_contract_issues_key,categories_contract_issues_name,categories_contract_issues_gradedDescription_high,categories_contract_issues_gradedDescription_medium,categories_contract_issues_gradedDescription_low,categories_contract_issues_tags_is_proxy,categories_governance_issues_key,categories_governance_issues_name,categories_governance_issues_description,categories_governance_issues_tags_access_control,name,description,type,severity,key
0,0.22,Low Risk,contract_issues,Contract Exploits,"Our detectors have found, with high certainty,...",Our detectors have found some logic in this sm...,Our detectors have found issues with this smar...,True,governance_issues,Contract Governance,Suspect logic in the smart contract for this a...,True,Verified Listing,This asset represents an established project w...,tokenRisk,0.0,verified_listing
1,0.22,Low Risk,contract_issues,Contract Exploits,"Our detectors have found, with high certainty,...",Our detectors have found some logic in this sm...,Our detectors have found issues with this smar...,True,governance_issues,Contract Governance,Suspect logic in the smart contract for this a...,True,Has Been Sniped,"Token shows signs of sniper activity, where au...",tokenRisk,0.0,has_been_sniped
2,0.22,Low Risk,contract_issues,Contract Exploits,"Our detectors have found, with high certainty,...",Our detectors have found some logic in this sm...,Our detectors have found issues with this smar...,True,governance_issues,Contract Governance,Suspect logic in the smart contract for this a...,True,Proxy,"Proxy contracts enable upgradability, meaning ...",tokenRisk,0.2,is_proxy
3,0.22,Low Risk,contract_issues,Contract Exploits,"Our detectors have found, with high certainty,...",Our detectors have found some logic in this sm...,Our detectors have found issues with this smar...,True,governance_issues,Contract Governance,Suspect logic in the smart contract for this a...,True,Access Control,"The token has users with special privileges, s...",tokenRisk,0.1,access_control
4,0.22,Low Risk,contract_issues,Contract Exploits,"Our detectors have found, with high certainty,...",Our detectors have found some logic in this sm...,Our detectors have found issues with this smar...,True,governance_issues,Contract Governance,Suspect logic in the smart contract for this a...,True,Unlocked Liquidity,Liquidity is unlocked and can be withdrawn by ...,tokenRisk,0.04,unlocked-liquidity


In [54]:
data["details"].keys()

dict_keys(['fund_flows', 'address_info', 'token_risk', 'token_metadata_risk', 'token_info', 'marketData', 'access_control', 'buy_sell_taxes'])

In [61]:
df_buy_sell_taxes = pd.json_normalize(data["details"]["buy_sell_taxes"], sep="_")
df_buy_sell_taxes

Unnamed: 0,has_buy_tax,has_sell_tax
0,False,False


In [55]:
df_fund_flows = pd.json_normalize(data["details"]["fund_flows"], sep="_")
df_fund_flows

Unnamed: 0,flows,label,risk_ofac,risk_hacker,risk_mixers,risk_drainer,risk_fbi_ic3,risk_tornado,accounts_0x0018aeCE08f42798Ad9F7c6919785B16929bca2f_type,accounts_0x0018aeCE08f42798Ad9F7c6919785B16929bca2f_label,...,accounts_0xfD9785b1148C6550e065A189d702560C67950D54_additional_labels_mixers,accounts_0xfD9785b1148C6550e065A189d702560C67950D54_additional_labels_drainer,accounts_0xfD9785b1148C6550e065A189d702560C67950D54_additional_labels_fbi_ic3,accounts_0xfD9785b1148C6550e065A189d702560C67950D54_additional_labels_tornado,fund_flow_risk_ofac,fund_flow_risk_hacker,fund_flow_risk_mixers,fund_flow_risk_drainer,fund_flow_risk_fbi_ic3,fund_flow_risk_tornado
0,[],Circle,False,False,False,False,False,False,eoa,0x0018aeCE08f42798Ad9F7c6919785B16929bca2f,...,False,False,False,False,False,False,False,False,False,False


In [47]:
df_address_info = pd.json_normalize(data["details"]["address_info"], sep="_")
df_address_info

Unnamed: 0,dprk,balance,time_1st_tx,time_verified,has_no_balance,ofac_sanctioned,transaction_count,has_no_transactions
0,False,0,2018-08-03T19:28:24.000Z,1758408922140,True,False,101,False


In [48]:
df_token_risk = pd.json_normalize(data["details"]["token_risk"], sep="_")
df_token_risk

Unnamed: 0,expiresAt,is_trusted,token_logo,token_name,token_symbol,illegal_unicode,contract_address,buy_tax_percentage,sell_tax_percentage,token_standard,decimals,access_control_interfaceType,access_control_activeRoleHolders
0,1761000921725,True,https://coin-images.coingecko.com/coins/images...,USDC,USDC,False,0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,0,0,ERC20,6,Ownable,"[{'name': 'OWNER_ROLE', 'role': 'owner', 'acco..."


In [49]:
df_token_metadata_risk = pd.json_normalize(data["details"]["token_metadata_risk"], sep="_")
df_token_metadata_risk

Unnamed: 0,expiresAt,is_trusted,token_logo,token_name,token_symbol,illegal_unicode,contract_address,buy_tax_percentage,sell_tax_percentage,token_standard,decimals,access_control_interfaceType,access_control_activeRoleHolders
0,1761000921725,True,https://coin-images.coingecko.com/coins/images...,USDC,USDC,False,0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,0,0,ERC20,6,Ownable,"[{'name': 'OWNER_ROLE', 'role': 'owner', 'acco..."


In [50]:
df_token_info = pd.json_normalize(data["details"]["token_info"], sep="_")
df_token_info

Unnamed: 0,name,description,symbol,logo,decimals,links_website,links_github
0,USDC,USDC is a US dollar-backed stablecoin issued b...,USDC,https://coin-images.coingecko.com/coins/images...,6,https://www.circle.com/en/usdc,[https://github.com/centrehq/centre-tokens]


In [68]:
df_marketData = pd.json_normalize(data["details"]["marketData"], sep="_")
df_marketData

Unnamed: 0,ath,atl,roi,low_24h,ath_date,atl_date,high_24h,market_cap,max_supply,last_updated,...,holderAnalysis_first_buyers_analysis_bundled_buyers_still_holding_count,holderAnalysis_first_buyers_analysis_snipers_current_holding_percentage,holderAnalysis_first_buyers_analysis_distributed_to_distinct_addresses_count,holderAnalysis_first_buyers_analysis_transferred_out_from_initially_acquired_percentage,holderAnalysis_top_10_holders_analysis_topHolders,holderAnalysis_top_10_holders_analysis_totalSupply,holderAnalysis_top_10_holders_analysis_percentageHeldByTop5,holderAnalysis_top_10_holders_analysis_percentageHeldByTop10,holderAnalysis_top_10_holders_analysis_percentageHeldByTop20,holderAnalysis_dev_launched_tokens_in_24_hours
0,1.17,0.877647,,0.999527,2019-05-08T00:40:28.300Z,2023-03-11T08:02:13.981Z,0.999731,73974654076,,2025-09-21T11:00:57.056Z,...,0,34.735488,0,0,"[{'amount': '2184516156.3047490120', 'percenta...",49250149468.18405,17.608605,27.363271,36.012211,0


In [66]:
data["details"]["marketData"].keys()

dict_keys(['ath', 'atl', 'roi', 'low_24h', 'ath_date', 'atl_date', 'high_24h', 'market_cap', 'max_supply', 'last_updated', 'total_supply', 'total_volume', 'current_price', 'market_cap_rank', 'sparkline_in_7d', 'fdv_to_tvl_ratio', 'price_change_24h', 'mcap_to_tvl_ratio', 'circulating_supply', 'total_value_locked', 'max_supply_infinite', 'market_cap_fdv_ratio', 'ath_change_percentage', 'atl_change_percentage', 'market_cap_change_24h', 'fully_diluted_valuation', 'price_change_percentage_1y', 'price_change_percentage_7d', 'price_change_percentage_14d', 'price_change_percentage_24h', 'price_change_percentage_30d', 'price_change_percentage_60d', 'price_change_24h_in_currency', 'price_change_percentage_200d', 'market_cap_change_percentage_24h', 'market_cap_change_24h_in_currency', 'price_change_percentage_1h_in_currency', 'price_change_percentage_1y_in_currency', 'price_change_percentage_7d_in_currency', 'price_change_percentage_14d_in_currency', 'price_change_percentage_24h_in_currency', 'pr

In [64]:
[x for x in df.columns if "liquid" in x.lower()]


['details_marketData_liquidityData']

In [67]:
df_holder_analysis = pd.json_normalize(
    data["details"]["marketData"]["holderAnalysis"],
    sep="_"
)

df_holder_analysis.head()

Unnamed: 0,minter,token_address,token_mint_tx,token_mint_time,total_holders_count,dev_launched_tokens_in_24_hours,sniper_analysis_sniper_count,sniper_analysis_sniper_addresses,sniper_analysis_block_range_analysis,sniper_analysis_median_time_since_mint,...,first_buyers_analysis_snipers_still_holding_percentage,first_buyers_analysis_bundled_buyers_still_holding_count,first_buyers_analysis_snipers_current_holding_percentage,first_buyers_analysis_distributed_to_distinct_addresses_count,first_buyers_analysis_transferred_out_from_initially_acquired_percentage,top_10_holders_analysis_topHolders,top_10_holders_analysis_totalSupply,top_10_holders_analysis_percentageHeldByTop5,top_10_holders_analysis_percentageHeldByTop10,top_10_holders_analysis_percentageHeldByTop20
0,0x5b6122c109b78c6755486966148c1d70a50a47d7,0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,0xdc6bb2a1aff2dbb2613113984b5fbd560e582c0a4369...,2018-09-10T18:26:41.000Z,3877694,0,17,"[0x37305b1cd40574e4c5ce33f8e8306be057fd7341, 0...",[],0,...,34.735488,0,34.735488,0,0,"[{'amount': '2184516156.3047490120', 'percenta...",49250149468.18405,17.608605,27.363271,36.012211


In [None]:
df_liquidity_Data = pd.json_normalize(
    data["details"]["marketData"]["liquidityData"],
    sep="_"
)

df_liquidity_Data.head()

Unnamed: 0,dexName,pairAddress,totalLiquidity,liquidityType,lpHolders,lockedLiquidityPercent,address,lpMint
0,UniswapV3,0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8,35206200.0,UniV3,[{'address': '0x3eeb5e43bcb9ed151a13a2cfb584a1...,0.0,,
1,curve,0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7,184332700.0,,,,0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7,0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7
2,uniswap_v3,0x99ac8ca7087fa4a2a1fb6357269965a2014abc35,114482800.0,,,,0x99ac8ca7087fa4a2a1fb6357269965a2014abc35,0x99ac8ca7087fa4a2a1fb6357269965a2014abc35
3,fluid-ethereum,0x667701e51b4d1ca244f17c78f7ab8744b4c99f9b,104081000.0,,,,0x667701e51b4d1ca244f17c78f7ab8744b4c99f9b,0x667701e51b4d1ca244f17c78f7ab8744b4c99f9b
4,uniswap_v3,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,82925190.0,,,,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640


In [60]:
# Expand the activeRoleHolders list into a DataFrame
df_access_control = pd.json_normalize(
    data["details"]["access_control"]["activeRoleHolders"], 
    sep="_"
)

df_access_control.head()


Unnamed: 0,name,role,account
0,OWNER_ROLE,owner,0xfcb19e6a322b27c06842a71e8c725399f049ae3a


In [None]:
# Flatten nested JSON into a single row DataFrame
df = pd.json_normalize(data, sep="_")
df

Unnamed: 0,count,medium,high,overallRisk,issues,isContract,details_fund_flows_risk_ofac,details_fund_flows_risk_hacker,details_fund_flows_risk_mixers,details_fund_flows_risk_drainer,...,details_marketData_holderAnalysis_top_10_holders_analysis_topHolders,details_marketData_holderAnalysis_top_10_holders_analysis_totalSupply,details_marketData_holderAnalysis_top_10_holders_analysis_percentageHeldByTop5,details_marketData_holderAnalysis_top_10_holders_analysis_percentageHeldByTop10,details_marketData_holderAnalysis_top_10_holders_analysis_percentageHeldByTop20,details_marketData_holderAnalysis_dev_launched_tokens_in_24_hours,details_access_control_interfaceType,details_access_control_activeRoleHolders,details_buy_sell_taxes_has_buy_tax,details_buy_sell_taxes_has_sell_tax
0,6,0,0,10.488088,"[{'score': 0.21999999999999997, 'tags': [{'nam...",True,False,False,False,False,...,"[{'amount': '2184516156.3047490120', 'percenta...",49250149468.18405,17.608605,27.363271,36.012211,0,Ownable,"[{'name': 'OWNER_ROLE', 'role': 'owner', 'acco...",False,False


In [35]:
import requests
import json
import time
from datetime import datetime
from typing import Dict, List, Optional, Union
import pandas as pd
import logging

class BlockchainAnomalyTracker:
    """
    Blockchain anomaly tracker using only fields available in Webacy API response
    """
    
    def __init__(self, api_key: str):
        self.api_key = API_KEY
        self.base_url = "https://api.webacy.com"
        self.headers = {
            "accept": "application/json",
            "x-api-key": self.api_key
        }
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def fetch_address_data(self, address: str) -> Optional[Dict]:
        """
        Fetch data for any address using Webacy API
        """
        url = f"{self.base_url}/addresses/{address}"
        
        try:
            response = requests.get(url, headers=self.headers, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            self.logger.info(f"Successfully fetched data for {address}")
            return data
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error fetching data for {address}: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON decode error for {address}: {e}")
            return None
    
    def analyze_data(self, address: str) -> Dict:
        """
        Analyze address data using only available response fields
        """
        data = self.fetch_address_data(address)
        if not data:
            return {"error": "Failed to fetch data", "address": address}
        
        analysis = {
            'address': address,
            'analysis_timestamp': datetime.now().isoformat(),
            'isContract': data.get('isContract', False),
            'overall_risk': data.get('overallRisk', 0),
            'issue_count': data.get('count', 0),
            'medium_count': data.get('medium', 0),
            'high_count': data.get('high', 0)
        }
        
        # Extract issues data
        issues_list = data.get('issues', [])
        if issues_list:
            issue_data = issues_list[0]  # First issue group
            analysis['risk_score'] = issue_data.get('score', 0)
            analysis['risk_level'] = issue_data.get('riskScore', 'Unknown')
            
            # Extract tags
            tags = issue_data.get('tags', [])
            analysis['risk_tags'] = []
            for tag in tags:
                analysis['risk_tags'].append({
                    'name': tag.get('name', ''),
                    'severity': tag.get('severity', 0),
                    'key': tag.get('key', '')
                })
            
            # Extract categories
            categories = issue_data.get('categories', {})
            analysis['categories'] = {}
            for cat_key, cat_data in categories.items():
                analysis['categories'][cat_key] = {
                    'name': cat_data.get('name', ''),
                    'key': cat_data.get('key', '')
                }
        
        # Extract token information if available
        details = data.get('details', {})
        if 'token_info' in details:
            token_info = details['token_info']
            analysis['token_info'] = {
                'name': token_info.get('name', ''),
                'symbol': token_info.get('symbol', ''),
                'decimals': token_info.get('decimals', 0),
                'description': token_info.get('description', '')
            }
        
        # Extract address info
        if 'address_info' in details:
            addr_info = details['address_info']
            analysis['address_info'] = {
                'balance': addr_info.get('balance', 0),
                'transaction_count': addr_info.get('transaction_count', 0),
                'time_1st_tx': addr_info.get('time_1st_tx', ''),
                'ofac_sanctioned': addr_info.get('ofac_sanctioned', False),
                'dprk': addr_info.get('dprk', False),
                'has_no_balance': addr_info.get('has_no_balance', True),
                'has_no_transactions': addr_info.get('has_no_transactions', False)
            }
        
        # Extract fund flow risks
        if 'fund_flows' in details:
            fund_flows = details['fund_flows']
            analysis['fund_flows'] = {
                'label': fund_flows.get('label', ''),
                'risk_flags': fund_flows.get('risk', {})
            }
        
        # Extract market data if available
        if 'marketData' in details:
            market_data = details['marketData']
            analysis['market_data'] = {
                'current_price': market_data.get('current_price', 0),
                'market_cap': market_data.get('market_cap', 0),
                'total_supply': market_data.get('total_supply', 0),
                'circulating_supply': market_data.get('circulating_supply', 0),
                'price_change_24h': market_data.get('price_change_24h', 0),
                'price_change_percentage_24h': market_data.get('price_change_percentage_24h', 0)
            }
            
            # Ownership distribution
            ownership = market_data.get('ownershipDistribution', {})
            if ownership:
                analysis['ownership'] = {
                    'totalSupply': ownership.get('totalSupply', 0),
                    'percentageHeldByTop10': ownership.get('percentageHeldByTop10', 0),
                    'top_holders_count': len(ownership.get('topHolders', []))
                }
            
            # Holder analysis
            holder_analysis = market_data.get('holderAnalysis', {})
            if holder_analysis:
                sniper_analysis = holder_analysis.get('sniper_analysis', {})
                analysis['sniper_data'] = {
                    'sniper_count': sniper_analysis.get('sniper_count', 0),
                    'sniper_total_percentage': sniper_analysis.get('sniper_total_percentage', 0),
                    'sniper_confidence_score': sniper_analysis.get('sniper_confidence_score', 0)
                }
                
                analysis['holder_stats'] = {
                    'total_holders_count': holder_analysis.get('total_holders_count', 0),
                    'token_mint_time': holder_analysis.get('token_mint_time', ''),
                    'minter': holder_analysis.get('minter', '')
                }
        
        return analysis
    
    def monitor_addresses(self, addresses: List[str], interval_minutes: int = 60) -> None:
        """
        Monitor multiple addresses continuously
        """
        self.logger.info(f"Starting monitoring of {len(addresses)} addresses")
        
        while True:
            for address in addresses:
                try:
                    analysis = self.analyze_data(address)
                    
                    # Simple alerting based on available data
                    risk_score = analysis.get('overall_risk', 0)
                    if risk_score > 50:
                        self.logger.warning(f"HIGH RISK: {address} - Risk Score: {risk_score}")
                    
                    # Check for sanctions
                    addr_info = analysis.get('address_info', {})
                    if addr_info.get('ofac_sanctioned', False):
                        self.logger.critical(f"SANCTIONED ADDRESS: {address}")
                    
                    time.sleep(2)
                    
                except Exception as e:
                    self.logger.error(f"Error monitoring {address}: {e}")
            
            time.sleep(interval_minutes * 60)
    
    def generate_report(self, addresses: List[str]) -> pd.DataFrame:
        """
        Generate report for multiple addresses using available data fields
        """
        results = []
        
        for address in addresses:
            try:
                analysis = self.analyze_data(address)
                
                row = {
                    'Address': address,
                    'IsContract': analysis.get('isContract', False),
                    'OverallRisk': analysis.get('overall_risk', 0),
                    'IssueCount': analysis.get('issue_count', 0),
                    'HighCount': analysis.get('high_count', 0),
                    'MediumCount': analysis.get('medium_count', 0),
                    'RiskLevel': analysis.get('risk_level', 'Unknown')
                }
                
                # Add token info if available
                token_info = analysis.get('token_info', {})
                if token_info:
                    row['TokenName'] = token_info.get('name', '')
                    row['TokenSymbol'] = token_info.get('symbol', '')
                
                # Add address info
                addr_info = analysis.get('address_info', {})
                if addr_info:
                    row['OFAC_Sanctioned'] = addr_info.get('ofac_sanctioned', False)
                    row['TransactionCount'] = addr_info.get('transaction_count', 0)
                    row['HasBalance'] = not addr_info.get('has_no_balance', True)
                
                # Add ownership data
                ownership = analysis.get('ownership', {})
                if ownership:
                    row['Top10Percentage'] = ownership.get('percentageHeldByTop10', 0)
                
                # Add sniper data
                sniper_data = analysis.get('sniper_data', {})
                if sniper_data:
                    row['SniperCount'] = sniper_data.get('sniper_count', 0)
                    row['SniperPercentage'] = sniper_data.get('sniper_total_percentage', 0)
                
                results.append(row)
                
            except Exception as e:
                self.logger.error(f"Error analyzing {address}: {e}")
                results.append({
                    'Address': address,
                    'Error': str(e)
                })
        
        return pd.DataFrame(results)
    
    def get_real_time_data(self, address: str) -> Dict:
        """
        Get real-time data for a single address
        """
        return self.analyze_data(address)

# Usage example
def main():
    API_KEY = "your_webacy_api_key_here"
    tracker = BlockchainAnomalyTracker(API_KEY)
    
    # Single address real-time analysis
    address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
    result = tracker.get_real_time_data(address)
    
    print("Real-time Analysis:")
    print(f"Address: {result.get('address')}")
    print(f"Overall Risk: {result.get('overall_risk')}")
    print(f"Risk Level: {result.get('risk_level')}")
    print(f"Issue Count: {result.get('issue_count')}")
    
    # Batch analysis
    test_addresses = [
        "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        "0xdAC17F958D2ee523a2206206994597C13D831ec7"
    ]
    
    report = tracker.generate_report(test_addresses)
    print("\nBatch Report:")
    print(report.to_string(index=False))

if __name__ == "__main__":
    main()

2025-09-21 21:20:14,501 - ERROR - Error fetching data for 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48: 504 Server Error: Gateway Timeout for url: https://api.webacy.com/addresses/0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48


Real-time Analysis:
Address: 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48
Overall Risk: None
Risk Level: None
Issue Count: None


2025-09-21 21:20:44,483 - ERROR - Error fetching data for 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48: 504 Server Error: Gateway Timeout for url: https://api.webacy.com/addresses/0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48
2025-09-21 21:21:14,741 - INFO - Successfully fetched data for 0xdAC17F958D2ee523a2206206994597C13D831ec7



Batch Report:
                                   Address  IsContract  OverallRisk  IssueCount  HighCount  MediumCount   RiskLevel  TokenName TokenSymbol OFAC_Sanctioned  TransactionCount HasBalance  Top10Percentage  SniperCount  SniperPercentage
0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48       False     0.000000           0          0            0     Unknown        NaN         NaN             NaN               NaN        NaN              NaN          NaN               NaN
0xdAC17F958D2ee523a2206206994597C13D831ec7        True    32.557641           9          0            1 Medium Risk Tether USD        USDT           False             101.0       True            53.98          0.0               0.0


In [20]:
import requests
import time
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
from dotenv import load_dotenv

class DetailedContractAnalyzer:
    def __init__(self, api_key: str):
        """
        Initialize the detailed contract analyzer
        
        Args:
            api_key: Webacy API key
        """
        self.api_key = api_key
        self.base_url = "https://api.webacy.com/contracts"
        
        # Setup logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    def fetch_contract_data(self, contract_address: str) -> Optional[Dict]:
        """Fetch contract data from Webacy API"""
        url = f"{self.base_url}/{contract_address}"
        
        headers = {
            "accept": "application/json",
            "x-api-key": self.api_key
        }
        
        try:
            response = requests.get(url, headers=headers)
            
            try:
                data = response.json()
            except ValueError:
                data = response.text
                self.logger.error(f"Invalid JSON response for {contract_address}: {data}")
                return None
                
            if response.status_code != 200:
                self.logger.error(f"API error {response.status_code} for {contract_address}: {data}")
                return None
                
            return data
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error fetching data for {contract_address}: {e}")
            return None

    def analyze_risk_assessment(self, data: Dict) -> Dict:
        """Extract and analyze risk assessment data"""
        risk_analysis = {
            'overall_risk_score': data.get('riskScore', 'Unknown'),
            'analysis_status': data.get('analysis_status', 'Unknown'),
            'analysis_type': data.get('analysis_type', 'Unknown'),
            'score': data.get('score', 0),
            'tags': data.get('tags', []),
            'categories': data.get('categories', {}),
        }
        
        # Source code analysis details
        source_analysis = data.get('source_code_analysis', {})
        if source_analysis:
            analysis_details = source_analysis.get('analysis', {})
            risk_analysis.update({
                'contract_address': analysis_details.get('contract_address', 'N/A'),
                'blockchain': analysis_details.get('chain', 'Unknown'),
                'analysis_date': analysis_details.get('analysis_date', 'Unknown'),
                'analysis_status_detailed': analysis_details.get('status', 'Unknown'),
                'findings': analysis_details.get('findings', []),
                'reference_urls': analysis_details.get('urls', [])
            })
        
        return risk_analysis

    def analyze_market_data(self, data: Dict) -> Dict:
        """Extract and analyze comprehensive market data"""
        metadata = data.get('metadata', {})
        market_data = metadata.get('market_data', {})
        
        if not market_data:
            return {'error': 'No market data available'}
        
        # Price analysis
        current_price = market_data.get('current_price', 0)
        price_change_24h = market_data.get('price_change_24h', 0)
        price_change_pct_24h = market_data.get('price_change_percentage_24h', 0)
        
        # Market cap analysis
        market_cap = market_data.get('market_cap', 0)
        market_cap_rank = market_data.get('market_cap_rank', 'N/A')
        market_cap_change_24h = market_data.get('market_cap_change_24h', 0)
        market_cap_change_pct_24h = market_data.get('market_cap_change_percentage_24h', 0)
        
        # Supply analysis
        circulating_supply = market_data.get('circulating_supply', 0)
        total_supply = market_data.get('total_supply', 0)
        max_supply = market_data.get('max_supply', 'Unlimited')
        
        # Historical performance
        ath = market_data.get('ath', 0)
        ath_date = market_data.get('ath_date', 'Unknown')
        ath_change_pct = market_data.get('ath_change_percentage', 0)
        atl = market_data.get('atl', 0)
        atl_date = market_data.get('atl_date', 'Unknown')
        atl_change_pct = market_data.get('atl_change_percentage', 0)
        
        # Volume and liquidity
        total_volume = market_data.get('total_volume', 0)
        fdv = market_data.get('fully_diluted_valuation', 0)
        
        return {
            'token_info': {
                'name': market_data.get('name', 'Unknown'),
                'symbol': market_data.get('symbol', 'Unknown').upper(),
                'image': market_data.get('image', 'N/A'),
                'last_updated': market_data.get('last_updated', 'Unknown')
            },
            'current_metrics': {
                'price': f"${current_price:,.2f}",
                'price_change_24h': f"${price_change_24h:,.2f}",
                'price_change_percentage_24h': f"{price_change_pct_24h:.2f}%",
                'market_cap': f"${market_cap:,.0f}",
                'market_cap_rank': market_cap_rank,
                'volume_24h': f"${total_volume:,.0f}"
            },
            'supply_metrics': {
                'circulating_supply': f"{circulating_supply:,.2f}",
                'total_supply': f"{total_supply:,.2f}",
                'max_supply': max_supply if max_supply else "Unlimited",
                'fully_diluted_valuation': f"${fdv:,.0f}"
            },
            'historical_performance': {
                'all_time_high': f"${ath:,.2f}",
                'ath_date': ath_date,
                'ath_change_percentage': f"{ath_change_pct:.2f}%",
                'all_time_low': f"${atl:.2f}",
                'atl_date': atl_date,
                'atl_change_percentage': f"{atl_change_pct:.2f}%"
            },
            'market_changes_24h': {
                'market_cap_change': f"${market_cap_change_24h:,.0f}",
                'market_cap_change_percentage': f"{market_cap_change_pct_24h:.2f}%"
            }
        }

    def analyze_price_trends(self, data: Dict) -> Dict:
        """Analyze price trends from sparkline data"""
        metadata = data.get('metadata', {})
        market_data = metadata.get('market_data', {})
        sparkline = market_data.get('sparkline_in_7d', {})
        prices = sparkline.get('price', [])
        
        if not prices:
            return {'error': 'No price trend data available'}
        
        # Calculate trend metrics
        first_price = prices[0]
        last_price = prices[-1]
        max_price = max(prices)
        min_price = min(prices)
        avg_price = sum(prices) / len(prices)
        
        # Calculate volatility (standard deviation)
        variance = sum((p - avg_price) ** 2 for p in prices) / len(prices)
        volatility = variance ** 0.5
        volatility_pct = (volatility / avg_price) * 100
        
        # Trend direction
        week_change = ((last_price - first_price) / first_price) * 100
        trend_direction = "Bullish" if week_change > 0 else "Bearish" if week_change < 0 else "Sideways"
        
        return {
            '7_day_summary': {
                'starting_price': f"${first_price:.2f}",
                'ending_price': f"${last_price:.2f}",
                'week_change': f"{week_change:.2f}%",
                'trend_direction': trend_direction
            },
            'price_extremes': {
                'highest_price_7d': f"${max_price:.2f}",
                'lowest_price_7d': f"${min_price:.2f}",
                'price_range': f"${max_price - min_price:.2f}",
                'range_percentage': f"{((max_price - min_price) / min_price) * 100:.2f}%"
            },
            'volatility_analysis': {
                'average_price_7d': f"${avg_price:.2f}",
                'volatility': f"${volatility:.2f}",
                'volatility_percentage': f"{volatility_pct:.2f}%",
                'volatility_rating': self.get_volatility_rating(volatility_pct)
            },
            'data_points': len(prices)
        }

    def get_volatility_rating(self, volatility_pct: float) -> str:
        """Rate volatility based on percentage"""
        if volatility_pct < 2:
            return "Very Low"
        elif volatility_pct < 5:
            return "Low"
        elif volatility_pct < 10:
            return "Moderate"
        elif volatility_pct < 20:
            return "High"
        else:
            return "Very High"

    def analyze_deployer_info(self, data: Dict) -> Dict:
        """Analyze deployer information"""
        deployer = data.get('deployer', {})
        deployed_contracts = deployer.get('deployed_contracts', [])
        
        return {
            'total_deployed_contracts': len(deployed_contracts),
            'deployed_contracts': deployed_contracts[:10],  # Show first 10
            'has_multiple_deployments': len(deployed_contracts) > 1
        }

    def generate_comprehensive_report(self, contract_address: str) -> Dict:
        """Generate a comprehensive contract analysis report"""
        self.logger.info(f"Generating comprehensive report for {contract_address}")
        
        data = self.fetch_contract_data(contract_address)
        if not data:
            return {'error': 'Failed to fetch contract data'}
        
        report = {
            'contract_address': contract_address,
            'analysis_timestamp': datetime.now().isoformat(),
            'risk_assessment': self.analyze_risk_assessment(data),
            'market_analysis': self.analyze_market_data(data),
            'price_trends': self.analyze_price_trends(data),
            'deployer_analysis': self.analyze_deployer_info(data),
            'raw_data_summary': {
                'total_fields': len(data),
                'has_market_data': 'metadata' in data and 'market_data' in data.get('metadata', {}),
                'has_risk_data': 'riskScore' in data,
                'has_analysis_data': 'source_code_analysis' in data,
                'data_completeness': self.calculate_data_completeness(data)
            }
        }
        
        return report

    def calculate_data_completeness(self, data: Dict) -> str:
        """Calculate how complete the data response is"""
        expected_fields = [
            'riskScore', 'metadata', 'source_code_analysis', 
            'deployer', 'analysis_status', 'score'
        ]
        
        present_fields = sum(1 for field in expected_fields if field in data)
        completeness_pct = (present_fields / len(expected_fields)) * 100
        
        if completeness_pct >= 90:
            return f"Excellent ({completeness_pct:.0f}%)"
        elif completeness_pct >= 70:
            return f"Good ({completeness_pct:.0f}%)"
        elif completeness_pct >= 50:
            return f"Fair ({completeness_pct:.0f}%)"
        else:
            return f"Poor ({completeness_pct:.0f}%)"

    def print_formatted_report(self, report: Dict):
        """Print a beautifully formatted report"""
        if 'error' in report:
            print(f"❌ Error: {report['error']}")
            return
        
        print("=" * 80)
        print(f"🔍 COMPREHENSIVE CONTRACT ANALYSIS REPORT")
        print("=" * 80)
        print(f"📋 Contract: {report['contract_address']}")
        print(f"🕒 Analysis Time: {report['analysis_timestamp']}")
        print()
        
        # Risk Assessment
        risk = report['risk_assessment']
        print("🛡️  RISK ASSESSMENT")
        print("-" * 40)
        print(f"Overall Risk Score: {risk['overall_risk_score']}")
        print(f"Security Score: {risk['score']}")
        print(f"Analysis Status: {risk['analysis_status_detailed']}")
        print(f"Blockchain: {risk['blockchain']}")
        if risk['tags']:
            print(f"Tags: {', '.join(risk['tags'])}")
        print()
        
        # Market Analysis
        market = report['market_analysis']
        if 'error' not in market:
            token_info = market['token_info']
            current = market['current_metrics']
            supply = market['supply_metrics']
            historical = market['historical_performance']
            
            print("💰 MARKET ANALYSIS")
            print("-" * 40)
            print(f"Token: {token_info['name']} ({token_info['symbol']})")
            print(f"Current Price: {current['price']}")
            print(f"24h Change: {current['price_change_24h']} ({current['price_change_percentage_24h']})")
            print(f"Market Cap: {current['market_cap']} (Rank #{current['market_cap_rank']})")
            print(f"24h Volume: {current['volume_24h']}")
            print()
            
            print("📊 SUPPLY METRICS")
            print("-" * 40)
            print(f"Circulating Supply: {supply['circulating_supply']}")
            print(f"Total Supply: {supply['total_supply']}")
            print(f"Max Supply: {supply['max_supply']}")
            print(f"Fully Diluted Valuation: {supply['fully_diluted_valuation']}")
            print()
            
            print("📈 HISTORICAL PERFORMANCE")
            print("-" * 40)
            print(f"All-Time High: {historical['all_time_high']} ({historical['ath_change_percentage']} from ATH)")
            print(f"All-Time Low: {historical['all_time_low']} ({historical['atl_change_percentage']} from ATL)")
            print()
        
        # Price Trends
        trends = report['price_trends']
        if 'error' not in trends:
            print("📉 7-DAY PRICE TRENDS")
            print("-" * 40)
            summary = trends['7_day_summary']
            extremes = trends['price_extremes']
            volatility = trends['volatility_analysis']
            
            print(f"Week Performance: {summary['week_change']} ({summary['trend_direction']})")
            print(f"Price Range: {extremes['lowest_price_7d']} - {extremes['highest_price_7d']}")
            print(f"Volatility: {volatility['volatility_percentage']} ({volatility['volatility_rating']})")
            print()
        
        # Deployer Analysis
        deployer = report['deployer_analysis']
        print("👤 DEPLOYER ANALYSIS")
        print("-" * 40)
        print(f"Total Deployed Contracts: {deployer['total_deployed_contracts']}")
        print(f"Multiple Deployments: {'Yes' if deployer['has_multiple_deployments'] else 'No'}")
        print()
        
        # Data Quality
        summary = report['raw_data_summary']
        print("📋 DATA COMPLETENESS")
        print("-" * 40)
        print(f"Data Quality: {summary['data_completeness']}")
        print(f"Market Data Available: {'✅' if summary['has_market_data'] else '❌'}")
        print(f"Risk Data Available: {'✅' if summary['has_risk_data'] else '❌'}")
        print(f"Analysis Data Available: {'✅' if summary['has_analysis_data'] else '❌'}")
        print("=" * 80)

# Usage Example
if __name__ == "__main__":
    # Load environment variables
    load_dotenv()
    
    # Fetch API key from environment variables
    API_KEY = os.getenv('WEBACY_API_KEY')
    
    if not API_KEY: 
        raise ValueError('Please set the WEBACY_API_KEY environment variable.')
    
    # Initialize the analyzer
    analyzer = DetailedContractAnalyzer(API_KEY)
    
    # Analyze the WETH contract
    contractAddress = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"  # WETH
    
    # Generate comprehensive report
    report = analyzer.generate_comprehensive_report(contractAddress)
    
    # Print formatted report
    analyzer.print_formatted_report(report)
    
    # Also save raw JSON for detailed analysis
    with open(f'contract_analysis_{contractAddress}.json', 'w') as f:
        json.dump(report, f, indent=2)
    
    print(f"\n💾 Detailed JSON report saved to: contract_analysis_{contractAddress}.json")

2025-09-21 18:24:29,880 - INFO - Generating comprehensive report for 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2


🔍 COMPREHENSIVE CONTRACT ANALYSIS REPORT
📋 Contract: 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2
🕒 Analysis Time: 2025-09-21T18:24:31.101148

🛡️  RISK ASSESSMENT
----------------------------------------
Overall Risk Score: Low Risk
Security Score: 0
Analysis Status: pending
Blockchain: eth

💰 MARKET ANALYSIS
----------------------------------------
Token: Unknown (UNKNOWN)
Current Price: $4,462.67
24h Change: $-9.83 (-0.22%)
Market Cap: $11,170,128,261 (Rank #24)
24h Volume: $153,388,182

📊 SUPPLY METRICS
----------------------------------------
Circulating Supply: 2,502,036.31
Total Supply: 2,502,036.31
Max Supply: Unlimited
Fully Diluted Valuation: $11,170,128,261

📈 HISTORICAL PERFORMANCE
----------------------------------------
All-Time High: $4,950.08 (-9.76% from ATH)
All-Time Low: $82.10 (5340.76% from ATL)

📉 7-DAY PRICE TRENDS
----------------------------------------
Week Performance: -4.07% (Bearish)
Price Range: $4448.68 - $4670.04
Volatility: 1.29% (Very Low)

👤 DEPLOYER ANA

In [21]:
import requests
import time
import re
import os
from typing import Dict, Optional, Union
from datetime import datetime, timedelta
from dotenv import load_dotenv
import json

class UniswapV3RiskTracker:
    """
    Optimized tracker for monitoring threat risk of EOA and contract interactions
    on Uniswap V3 router using Webacy API
    """
    
    def __init__(self, api_key: str, cache_duration_minutes: int = 30):
        """
        Initialize the risk tracker
        
        Args:
            api_key: Webacy API key
            cache_duration_minutes: How long to cache results (default 30 minutes)
        """
        if not api_key or api_key == "your_webacy_api_key_here":
            raise ValueError("Valid API key is required. Please set your actual Webacy API key.")
        
        self.api_key = api_key
        self.cache_duration = timedelta(minutes=cache_duration_minutes)
        self.cache = {}
        self.session = requests.Session()
        self.base_url = "https://api.webacy.com/addresses"
        
        # Set up session headers - using exact format from your working code
        self.session.headers.update({
            "accept": "application/json",
            "x-api-key": self.api_key
        })
        
        # Rate limiting (adjust based on your API plan)
        self.last_request_time = 0
        self.min_request_interval = 0.1  # 100ms between requests
    
    def validate_ethereum_address(self, address: str) -> bool:
        """
        Validate if the provided string is a valid Ethereum address
        
        Args:
            address: The address to validate
            
        Returns:
            bool: True if valid Ethereum address
        """
        if not isinstance(address, str):
            return False
        
        # Remove '0x' prefix if present and check format
        clean_address = address.lower()
        if clean_address.startswith('0x'):
            clean_address = clean_address[2:]
        
        # Check if it's 40 hex characters
        return len(clean_address) == 40 and re.match('^[0-9a-f]+$', clean_address)
    
    def _rate_limit(self):
        """Implement simple rate limiting"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        
        if time_since_last < self.min_request_interval:
            time.sleep(self.min_request_interval - time_since_last)
        
        self.last_request_time = time.time()
    
    def _is_cache_valid(self, address: str) -> bool:
        """Check if cached data is still valid"""
        if address not in self.cache:
            return False
        
        cache_time = self.cache[address]['timestamp']
        return datetime.now() - cache_time < self.cache_duration
    
    def test_api_connection(self) -> Dict:
        """
        Test API connection and authentication
        
        Returns:
            Dict: Connection test results
        """
        test_address = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"  # WETH
        url = f"{self.base_url}/{test_address}"
        
        try:
            # Use the same method as your working code
            headers = {
                "accept": "application/json",
                "x-api-key": self.api_key
            }
            response = requests.get(url, headers=headers, timeout=10)
            
            return {
                "success": response.status_code == 200,
                "status_code": response.status_code,
                "headers_sent": headers,
                "response_preview": response.text[:200] if response.text else "No response body",
                "api_key_preview": self.api_key[:10] + "..." if len(self.api_key) > 10 else "short_key"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "headers_sent": {
                    "accept": "application/json",
                    "x-api-key": self.api_key[:10] + "..."
                }
            }
    
    def get_address_risk_data(self, address: str, force_refresh: bool = False) -> Dict:
        """
        Get comprehensive risk data for any EOA or contract address
        
        Args:
            address: Ethereum address to analyze
            force_refresh: Skip cache and fetch fresh data
            
        Returns:
            Dict: Risk analysis data or error information
        """
        
        # Validate address format
        if not self.validate_ethereum_address(address):
            return {
                "error": "Invalid Ethereum address format",
                "address": address,
                "timestamp": datetime.now().isoformat()
            }
        
        # Normalize address (ensure lowercase with 0x prefix)
        normalized_address = address.lower()
        if not normalized_address.startswith('0x'):
            normalized_address = '0x' + normalized_address
        
        # Check cache first (unless force refresh)
        if not force_refresh and self._is_cache_valid(normalized_address):
            cached_data = self.cache[normalized_address]['data'].copy()
            cached_data['data_source'] = 'cache'
            return cached_data
        
        # Rate limiting
        self._rate_limit()
        
        # Make API request using the exact same method as your working code
        url = f"{self.base_url}/{normalized_address}"
        headers = {
            "accept": "application/json",
            "x-api-key": self.api_key
        }
        
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            
            # Parse JSON response - same as your working code
            try:
                raw_data = response.json()
            except ValueError:
                raw_data = response.text
                return {
                    "error": "API returned non-JSON response",
                    "address": normalized_address,
                    "timestamp": datetime.now().isoformat(),
                    "raw_response": raw_data[:500]  # First 500 chars
                }
            
            # Process and enrich the data
            processed_data = self._process_risk_data(raw_data, normalized_address)
            
            # Cache the result
            self.cache[normalized_address] = {
                'data': processed_data,
                'timestamp': datetime.now()
            }
            
            processed_data['data_source'] = 'api'
            return processed_data
            
        except requests.exceptions.Timeout:
            return {
                "error": "API request timeout",
                "address": normalized_address,
                "timestamp": datetime.now().isoformat()
            }
        except requests.exceptions.HTTPError as e:
            error_details = ""
            status_code = "Unknown"
            if hasattr(e, 'response') and e.response:
                error_details = e.response.text
                status_code = e.response.status_code
                
                # Special handling for 401 errors
                if e.response.status_code == 401:
                    return {
                        "error": "Authentication failed - check your API key",
                        "address": normalized_address,
                        "timestamp": datetime.now().isoformat(),
                        "status_code": 401,
                        "suggestion": "Verify your API key is correct and active",
                        "api_key_preview": self.api_key[:10] + "..." if len(self.api_key) > 10 else "short_key"
                    }
            
            return {
                "error": f"HTTP error: {status_code}",
                "address": normalized_address,
                "timestamp": datetime.now().isoformat(),
                "details": error_details[:300] if error_details else "No details available"
            }
        except requests.exceptions.RequestException as e:
            return {
                "error": f"Request failed: {str(e)}",
                "address": normalized_address,
                "timestamp": datetime.now().isoformat()
            }
    
    def _process_risk_data(self, raw_data: Dict, address: str) -> Dict:
        """
        Process and enrich the raw API response
        
        Args:
            raw_data: Raw response from Webacy API
            address: The analyzed address
            
        Returns:
            Dict: Processed risk data with additional insights
        """
        processed = {
            "address": address,
            "timestamp": datetime.now().isoformat(),
            "raw_data": raw_data
        }
        
        # Extract key risk metrics
        if isinstance(raw_data, dict):
            # Overall risk assessment
            processed["risk_assessment"] = {
                "overall_score": raw_data.get("overallRisk", 0),
                "risk_level": self._categorize_risk(raw_data.get("overallRisk", 0)),
                "total_issues": raw_data.get("count", 0),
                "medium_risk_issues": raw_data.get("medium", 0),
                "high_risk_issues": raw_data.get("high", 0),
                "is_contract": raw_data.get("isContract", False)
            }
            
            # Extract specific risk factors
            issues = raw_data.get("issues", [])
            if issues:
                processed["risk_factors"] = self._extract_risk_factors(issues)
            
            # Token-specific data if available
            token_data = raw_data.get("tokenData", {})
            if token_data:
                processed["token_analysis"] = self._extract_token_insights(token_data)
        
        return processed
    
    def _categorize_risk(self, score: float) -> str:
        """Categorize risk score into levels"""
        if score < 2:
            return "Low"
        elif score < 5:
            return "Medium"
        elif score < 8:
            return "High"
        else:
            return "Critical"
    
    def _extract_risk_factors(self, issues: list) -> Dict:
        """Extract and categorize risk factors"""
        risk_factors = {
            "critical_risks": [],
            "moderate_risks": [],
            "low_risks": [],
            "positive_indicators": []
        }
        
        for issue in issues:
            if isinstance(issue, dict):
                severity = issue.get("score", 0)
                tags = issue.get("tags", [])
                if tags and isinstance(tags[0], dict):
                    risk_item = {
                        "name": tags[0].get("name", "Unknown"),
                        "description": tags[0].get("description", ""),
                        "severity_score": severity,
                        "risk_type": tags[0].get("type", "unknown"),
                        "key": tags[0].get("key", "unknown")
                    }
                    
                    if severity >= 1.0:
                        risk_factors["critical_risks"].append(risk_item)
                    elif severity >= 0.5:
                        risk_factors["moderate_risks"].append(risk_item)
                    elif severity > 0:
                        risk_factors["low_risks"].append(risk_item)
                    else:
                        risk_factors["positive_indicators"].append(risk_item)
        
        return risk_factors
    
    def _extract_token_insights(self, token_data: Dict) -> Dict:
        """Extract token-specific insights"""
        insights = {}
        
        # Buy/sell taxes
        buy_sell_taxes = token_data.get("buy_sell_taxes", {})
        insights["has_buy_tax"] = buy_sell_taxes.get("has_buy_tax", False)
        insights["has_sell_tax"] = buy_sell_taxes.get("has_sell_tax", False)
        
        # Holder information
        holders = token_data.get("holders", {})
        insights["holder_concentration"] = holders.get("percentageHeldByTop10", 0)
        
        # Developer activity
        insights["recent_dev_activity"] = token_data.get("dev_launched_tokens_in_24_hours", 0)
        
        # Access control
        access_control = token_data.get("access_control", {})
        insights["active_role_holders"] = len(access_control.get("activeRoleHolders", []))
        
        return insights
    
    def batch_analyze(self, addresses: list, delay_between_requests: float = 0.2) -> Dict[str, Dict]:
        """
        Analyze multiple addresses in batch
        
        Args:
            addresses: List of addresses to analyze
            delay_between_requests: Delay in seconds between API calls
            
        Returns:
            Dict: Results keyed by address
        """
        results = {}
        
        print(f"Analyzing {len(addresses)} addresses...")
        for i, address in enumerate(addresses):
            if isinstance(address, str):
                print(f"Processing {i+1}/{len(addresses)}: {address}")
                results[address] = self.get_address_risk_data(address)
                
                # Delay between requests to be API-friendly
                if i < len(addresses) - 1:  # Don't delay after the last request
                    time.sleep(delay_between_requests)
        
        return results
    
    def get_cache_stats(self) -> Dict:
        """Get cache statistics"""
        now = datetime.now()
        valid_entries = sum(1 for addr, data in self.cache.items() 
                          if now - data['timestamp'] < self.cache_duration)
        
        return {
            "total_cached_addresses": len(self.cache),
            "valid_cached_entries": valid_entries,
            "cache_hit_ratio": f"{valid_entries}/{len(self.cache)}" if self.cache else "0/0",
            "cache_duration_minutes": self.cache_duration.total_seconds() / 60
        }
    
    def clear_cache(self):
        """Clear the address cache"""
        self.cache.clear()
    
    def update_api_key(self, new_api_key: str):
        """
        Update API key and refresh session headers
        
        Args:
            new_api_key: New API key to use
        """
        self.api_key = new_api_key
        self.session.headers.update({
            "x-api-key": new_api_key
        })
        # Clear cache when API key changes
        self.clear_cache()
    
    def export_results(self, results: Dict, filename: str = None) -> str:
        """
        Export analysis results to JSON file
        
        Args:
            results: Results dictionary from batch_analyze or single analysis
            filename: Optional filename, auto-generated if not provided
            
        Returns:
            str: Filename of exported file
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"webacy_risk_analysis_{timestamp}.json"
        
        with open(filename, 'w') as f:
            json.dump(results, f, indent=2, default=str)
        
        return filename


# Usage example and helper functions
def create_tracker(api_key: str, cache_minutes: int = 30) -> UniswapV3RiskTracker:
    """Factory function to create a tracker instance"""
    return UniswapV3RiskTracker(api_key, cache_minutes)

def analyze_address(tracker: UniswapV3RiskTracker, address: str, show_details: bool = True) -> Dict:
    """Helper function to analyze and display results for an address"""
    result = tracker.get_address_risk_data(address)
    
    if "error" in result:
        print(f"❌ Error analyzing {address}: {result['error']}")
        if "suggestion" in result:
            print(f"💡 Suggestion: {result['suggestion']}")
        return result
    
    if show_details:
        print(f"\n🔍 === Risk Analysis for {address} ===")
        risk_assessment = result.get("risk_assessment", {})
        overall_score = risk_assessment.get('overall_score', 'N/A')
        risk_level = risk_assessment.get('risk_level', 'N/A')
        
        # Risk level emoji
        risk_emoji = {
            'Low': '🟢',
            'Medium': '🟡', 
            'High': '🟠',
            'Critical': '🔴'
        }.get(risk_level, '⚪')
        
        print(f"{risk_emoji} Overall Risk Score: {overall_score}")
        print(f"📊 Risk Level: {risk_level}")
        print(f"🏷️ Address Type: {'Contract' if risk_assessment.get('is_contract') else 'EOA'}")
        print(f"⚠️ Total Issues Found: {risk_assessment.get('total_issues', 0)}")
        
        # Show risk factors if available
        risk_factors = result.get("risk_factors", {})
        
        if risk_factors.get("critical_risks"):
            print(f"\n🚨 Critical Risks ({len(risk_factors['critical_risks'])}):")
            for risk in risk_factors["critical_risks"][:3]:  # Show top 3
                print(f"  • {risk['name']}: {risk['description'][:80]}...")
        
        if risk_factors.get("positive_indicators"):
            print(f"\n✅ Positive Indicators ({len(risk_factors['positive_indicators'])}):")
            for indicator in risk_factors["positive_indicators"][:2]:  # Show top 2
                print(f"  • {indicator['name']}")
        
        # Token analysis if available
        token_analysis = result.get("token_analysis", {})
        if token_analysis:
            print(f"\n💰 Token Analysis:")
            if token_analysis.get("has_buy_tax") or token_analysis.get("has_sell_tax"):
                print(f"  • Buy Tax: {'Yes' if token_analysis.get('has_buy_tax') else 'No'}")
                print(f"  • Sell Tax: {'Yes' if token_analysis.get('has_sell_tax') else 'No'}")
            
            concentration = token_analysis.get("holder_concentration", 0)
            if concentration > 0:
                print(f"  • Top 10 Holders Own: {concentration:.1f}% of supply")
    
    return result

def quick_risk_check(api_key: str, address: str) -> str:
    """Quick risk check that returns a simple risk level"""
    tracker = create_tracker(api_key)
    result = tracker.get_address_risk_data(address)
    
    if "error" in result:
        return f"Error: {result['error']}"
    
    risk_assessment = result.get("risk_assessment", {})
    return risk_assessment.get('risk_level', 'Unknown')

# Example usage and testing
if __name__ == "__main__":
    # Load environment variables
    load_dotenv()
    API_KEY = os.getenv('WEBACY_API_KEY')
    
    if not API_KEY: 
        raise ValueError('Please set the WEBACY_API_KEY environment variable.')
    
    print(f"🔑 Using API Key: {API_KEY[:10]}..." if len(API_KEY) > 10 else "short_key")
    
    # Create tracker instance
    tracker = create_tracker(API_KEY)
    
    # Test API connection first
    print("\n🔗 Testing API connection...")
    connection_test = tracker.test_api_connection()
    print(f"Connection Status: {'✅ Success' if connection_test.get('success') else '❌ Failed'}")
    
    if not connection_test.get('success'):
        print(f"Error Details: {connection_test}")
        print("❌ API connection failed. Check your API key and network connection.")
        exit(1)
    
    print("✅ API connection successful!")
    
    # Test addresses
    test_addresses = [
        "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",  # WETH contract
        "0xA0b86a33E6411E3b9F15D3D5dCdb9C8A02b4B2B9",  # Example EOA
    ]
    
    # Analyze single addresses
    print("\n" + "="*60)
    print("🔍 INDIVIDUAL ADDRESS ANALYSIS")
    print("="*60)
    
    for address in test_addresses:
        result = analyze_address(tracker, address)
        time.sleep(1)  # Be nice to the API
    
    # Demonstrate batch analysis
    print("\n" + "="*60)
    print("📊 BATCH ANALYSIS")
    print("="*60)
    
    batch_results = tracker.batch_analyze(test_addresses)
    
    print(f"\n📈 Batch Analysis Summary:")
    for addr, result in batch_results.items():
        if "error" not in result:
            risk_level = result.get("risk_assessment", {}).get("risk_level", "Unknown")
            risk_emoji = {'Low': '🟢', 'Medium': '🟡', 'High': '🟠', 'Critical': '🔴'}.get(risk_level, '⚪')
            print(f"  {risk_emoji} {addr}: {risk_level}")
        else:
            print(f"  ❌ {addr}: Error")
    
    # Cache statistics
    cache_stats = tracker.get_cache_stats()
    print(f"\n💾 Cache Statistics:")
    print(f"  • Total cached addresses: {cache_stats['total_cached_addresses']}")
    print(f"  • Valid cached entries: {cache_stats['valid_cached_entries']}")
    print(f"  • Cache duration: {cache_stats['cache_duration_minutes']} minutes")
    
    # Export results
    filename = tracker.export_results(batch_results)
    print(f"\n💾 Results exported to: {filename}")
    
    print(f"\n🎉 Analysis complete! Processed {len(test_addresses)} addresses.")

🔑 Using API Key: DdhFEmvHL8...

🔗 Testing API connection...
Connection Status: ❌ Failed
Error Details: {'success': False, 'error': "HTTPSConnectionPool(host='api.webacy.com', port=443): Read timed out. (read timeout=10)", 'headers_sent': {'accept': 'application/json', 'x-api-key': 'DdhFEmvHL8...'}}
❌ API connection failed. Check your API key and network connection.
✅ API connection successful!

🔍 INDIVIDUAL ADDRESS ANALYSIS

🔍 === Risk Analysis for 0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2 ===
🟠 Overall Risk Score: 5.47722557505166
📊 Risk Level: High
🏷️ Address Type: Contract
⚠️ Total Issues Found: 5

🔍 === Risk Analysis for 0xA0b86a33E6411E3b9F15D3D5dCdb9C8A02b4B2B9 ===
🟢 Overall Risk Score: 0
📊 Risk Level: Low
🏷️ Address Type: EOA
⚠️ Total Issues Found: 2

✅ Positive Indicators (1):
  • Insufficient Balance

📊 BATCH ANALYSIS
Analyzing 2 addresses...
Processing 1/2: 0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2
Processing 2/2: 0xA0b86a33E6411E3b9F15D3D5dCdb9C8A02b4B2B9

📈 Batch Analysis

In [None]:
import requests
import json
import os
from typing import Dict, Any, List
from dotenv import load_dotenv

class ThreatRiskAnalysis:
    def __init__(self, api_key: str, base_url: str = "https://api.webacy.com"):
        """
        Initialize Threat Risk Analysis
        """
        self.api_key = api_key
        self.base_url = base_url
        self.data = None

    # ---------------- API Fetch ----------------
    def fetch_analysis(self, address: str) -> None:
        """Fetch risk analysis data from Webacy API for a given address."""
        url = f"{self.base_url}/addresses/{address}"
        headers = {
            "accept": "application/json",
            "x-api-key": self.api_key
        }

        response = requests.get(url, headers=headers)

        try:
            data = response.json()
        except ValueError:
            raise Exception(f"Invalid JSON response: {response.text}")

        if response.status_code != 200:
            raise Exception(f"API request failed: {response.status_code} {data}")

        self.data = data

    # ---------------- Overall Scorecard ----------------
    def get_overall_scorecard(self) -> Dict[str, Any]:
        risk_score = self.data.get("overallRisk", 0)
        if risk_score < 30:
            label = "Low"
        elif risk_score < 70:
            label = "Medium"
        else:
            label = "High"
        return {"overallRisk": risk_score, "label": label}

    # ---------------- Risk Categories ----------------
    def get_risk_categories(self) -> Dict[str, List[str]]:
        categories = {}
        for issue in self.data.get("issues", []):
            for tag in issue.get("tags", []):
                cat_name = tag.get("type", "Uncategorized")
                categories.setdefault(cat_name, []).append(tag.get("name"))
        return categories

    # ---------------- Badges / Flags ----------------
    def get_badges(self) -> List[str]:
        badges = []
        for issue in self.data.get("issues", []):
            for tag in issue.get("tags", []):
                badges.append(tag.get("name"))
        return list(set(badges))

    # ---------------- Behavioral Analysis ----------------
    def get_behavioral_analysis(self) -> Dict[str, Any]:
        fund_flows = self.data.get("details", {}).get("fund_flows", {})
        risk_data = fund_flows.get("risk", {})
        accounts = fund_flows.get("accounts", {})
        
        # Extract exchange names from accounts
        exchanges = []
        for addr, account_info in accounts.items():
            if account_info.get("type") == "eoa" and account_info.get("label") != addr:
                exchanges.append(account_info.get("label"))
        
        return {
            "ofac": risk_data.get("ofac", False),
            "hackers": risk_data.get("hacker", False),
            "mixers": risk_data.get("mixers", False),
            "tornado": risk_data.get("tornado", False),
            "drainer": risk_data.get("drainer", False),
            "exchanges": exchanges
        }

    # ---------------- Market & Holder Distribution ----------------
    def get_market_distribution(self) -> Dict[str, Any]:
        market_data = self.data.get("details", {}).get("marketData", {})
        ownership_dist = market_data.get("ownershipDistribution", {})
        
        return {
            "market_cap": market_data.get("market_cap"),
            "circulating_supply": market_data.get("circulating_supply"),
            "total_supply": market_data.get("total_supply"),
            "current_price": market_data.get("current_price"),
            "market_cap_rank": market_data.get("market_cap_rank"),
            "top_5_holder_ratio": ownership_dist.get("percentageHeldByTop5"),
            "top_10_holder_ratio": ownership_dist.get("percentageHeldByTop10"),
            "top_20_holder_ratio": ownership_dist.get("percentageHeldByTop20")
        }

    # ---------------- Sniper Detection ----------------
    def get_sniper_analysis(self) -> Dict[str, Any]:
        holder_analysis = self.data.get("details", {}).get("holderAnalysis", {})
        sniper = holder_analysis.get("sniper_analysis", {})
        
        return {
            "sniper_count": sniper.get("sniper_count", 0),
            "sniper_percentage": sniper.get("sniper_total_percentage", 0),
            "confidence_score": sniper.get("sniper_confidence_score", 0),
            "frontrunning_detected": sniper.get("potential_frontrunning_detected", False),
            "token_mint_time": holder_analysis.get("token_mint_time", "Unknown")
        }

    # ---------------- Combined Summary ----------------
    def summary(self) -> Dict[str, Any]:
        if not self.data:
            raise Exception("No data loaded. Call fetch_analysis(address) first.")
        return {
            "scorecard": self.get_overall_scorecard(),
            "categories": self.get_risk_categories(),
            "badges": self.get_badges(),
            "behavioral": self.get_behavioral_analysis(),
            "market": self.get_market_distribution(),
            "sniper": self.get_sniper_analysis()
        }


# ---------------- Tester ----------------
if __name__ == "__main__":
    load_dotenv()
    API_KEY = os.getenv('WEBACY_API_KEY')
    
    if not API_KEY:
        raise ValueError('Please set the WEBACY_API_KEY environment variable.')
    
    ADDRESS = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # USDC contract

    analyzer = ThreatRiskAnalysis(api_key=API_KEY)

    try:
        analyzer.fetch_analysis(ADDRESS)
        print(json.dumps(analyzer.summary(), indent=2))
    except Exception as e:
        print("Error:", e)

In [22]:
## @akinthomasbishop
## https://github.com/AKIN-THOMAS/analytic-sages-fastapi-nextjs/tree/main