### Import necessary library

In [None]:
import time
import requests
import hashlib
import base64
import hmac
import json
from pprint import pprint
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame

# Declaring API Key and params

In [None]:
ARKHAM_API_KEY = "f87e20f4-2053-4afc-8dd2-39c32fb5c7a8"

# Setting up functions to call whenever needed

In [None]:
class ArkhamAPI:
    BASE_URL = "https://arkm.com/api"

    def __init__(self, api_key):
        self.api_key = api_key

    def get_assets(self, chain=None):
        url = f"{self.BASE_URL}/public/assets"
        headers = {
            "API-Key": self.api_key,
            "Accept": "application/json"
        }

        params = {}
        if chain:
            params["chain"] = chain

        response = requests.get(url, headers=headers, params=params, timeout=10)
        print(f"HTTP Status: {response.status_code}")
        if response.status_code != 200:
            print("Error response:", response.json())
            return None

        return response.json()

    def get_contracts(self, chain=None):
        url = f"{self.BASE_URL}/public/contracts"
        headers = {
            "API-Key": self.api_key,
            "Accept": "application/json"
        }

        params = {}
        if chain:
            params["chain"] = chain

        response = requests.get(url, headers=headers, params=params, timeout=10)
        print(f"HTTP Status: {response.status_code}")
        if response.status_code != 200:
            print("Error response:", response.json())
            return None

        return response.json()

    def get_solana_assets(self):
        assets = self.get_assets(chain="solana")
        if not assets:
            print("No assets returned for Solana or error occurred.")
            return []

        sol_assets = [
            asset for asset in assets
            if any(chain_info.get("symbol") == "SOL" or chain_info.get("name", "").lower() == "solana"
                   for chain_info in asset.get("chains", []))
        ]
        return sol_assets

    def get_all_contracts(self):
        contracts = self.get_contracts()
        if not contracts:
            print("No contracts returned or error occurred.")
            return []
        return contracts



In [None]:
# =====================
# Usage
# =====================
if __name__ == "__main__":
    api = ArkhamAPI(ARKHAM_API_KEY)
    solana_assets = api.get_solana_assets()
    print(solana_assets)

HTTP Status: 200
[{'symbol': 'SOL', 'name': 'Solana', 'geckoId': 'solana', 'imageUrl': 'https://static.arkhamintelligence.com/tokens/solana.png', 'stablecoin': False, 'featuredPair': 'SOL_USDT', 'chains': [{'symbol': 'SOL', 'assetSymbol': 'SOL', 'name': 'Solana', 'type': 4, 'confirmations': 32, 'blockTime': 400000}], 'status': 'listed', 'minDeposit': '0.05', 'minWithdrawal': '0.05', 'withdrawalFee': '0.02'}, {'symbol': 'WIF', 'name': 'Dogwifhat', 'geckoId': 'dogwifcoin', 'imageUrl': 'https://static.arkhamintelligence.com/tokens/dogwifhat-eth.png', 'stablecoin': False, 'featuredPair': 'WIF_USDT', 'chains': [{'symbol': 'SOL', 'assetSymbol': 'SOL', 'name': 'Solana', 'type': 4, 'confirmations': 32, 'blockTime': 400000}], 'status': 'listed', 'minDeposit': '4', 'minWithdrawal': '4', 'withdrawalFee': '1.5'}, {'symbol': 'BONK', 'name': 'Bonk', 'geckoId': 'bonk', 'imageUrl': 'https://static.arkhamintelligence.com/tokens/bonk.png', 'stablecoin': False, 'featuredPair': 'BONK_USDT', 'chains': [{'s

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

# =====================
# Initialize Spark
# =====================
spark = SparkSession.builder \
    .appName("SolanaAssetsAnalysis") \
    .getOrCreate()

df = spark.createDataFrame(solana_assets)

df.createOrReplaceTempView("solana_tokens")

print("These are some of the available token that is available for analysis in Solana chain")
spark.sql("SELECT DISTINCT name, symbol FROM solana_tokens").show(truncate=False)

These are some of the available token that is available for analysis in Solana chain
+--------------+--------+
|name          |symbol  |
+--------------+--------+
|Render        |RENDER  |
|Jupiter       |JUP     |
|Dogwifhat     |WIF     |
|Bonk          |BONK    |
|OFFICIAL TRUMP|TRUMP   |
|Melania Meme  |MELANIA |
|Solana        |SOL     |
|Pudgy Penguins|PENGU   |
|Grass         |GRASS   |
|Jito          |JTO     |
|Pump.fun      |PUMP    |
|Popcat        |POPCAT  |
|Fartcoin      |FARTCOIN|
|Pyth Network  |PYTH    |
+--------------+--------+



In [None]:
spark.sql("""
SELECT *
FROM solana_tokens
""").show(truncate=False)


+----------------------------------------------------------------------------------------------------------+-------------+-----------------------+------------------------------------------------------------------------+----------+-------------+--------------+----------+------+--------+-------------+
|chains                                                                                                    |featuredPair |geckoId                |imageUrl                                                                |minDeposit|minWithdrawal|name          |stablecoin|status|symbol  |withdrawalFee|
+----------------------------------------------------------------------------------------------------------+-------------+-----------------------+------------------------------------------------------------------------+----------+-------------+--------------+----------+------+--------+-------------+
|[{symbol -> SOL, assetSymbol -> SOL, name -> Solana, blockTime -> 400000, confirmations -> 32, t

Doesn't show any interesting data that could contribute for risk score analysis


In [None]:
# =====================
# Usage
# =====================
if __name__ == "__main__":
    api = ArkhamAPI(ARKHAM_API_KEY)
    contracts = api.get_contracts()
    print(contracts)

HTTP Status: 200
[{'symbol': 'BTC_USDT_PERP', 'baseSymbol': 'BTC.P', 'quoteSymbol': 'USDT', 'price': '84685.88', 'price24hAgo': '85744.04', 'high24h': '85766.02', 'low24h': '80621', 'volume24h': '37.41301', 'quoteVolume24h': '3147191.0233199', 'markPrice': '84749.61', 'indexPrice': '84762.921682027', 'fundingRate': '-0.000003', 'nextFundingRate': '-0.000003', 'nextFundingTime': 1763794800000000, 'productType': 'perpetual', 'openInterest': '2.86344', 'indexCurrency': 'USDT', 'usdVolume24h': '3147191.0233199', 'openInterestUSD': '242492.9362272'}, {'symbol': 'ETH_USDT_PERP', 'baseSymbol': 'ETH.P', 'quoteSymbol': 'USDT', 'price': '2766.34', 'price24hAgo': '2795.41', 'high24h': '2802.09', 'low24h': '2623.45', 'volume24h': '608.8565', 'quoteVolume24h': '1661478.641528', 'markPrice': '2765.66', 'indexPrice': '2766.158417438', 'fundingRate': '-0.000005', 'nextFundingRate': '-0.000005', 'nextFundingTime': 1763794800000000, 'productType': 'perpetual', 'openInterest': '75.68255', 'indexCurrency'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

# Create DataFrame for contracts
df_contracts = spark.createDataFrame(contracts)
df_contracts.createOrReplaceTempView("contracts")

# Create DataFrame for solana_assets and get distinct symbols
df_solana_assets = spark.createDataFrame(solana_assets)
df_distinct_solana_symbols = df_solana_assets.select("symbol").distinct()
df_distinct_solana_symbols.createOrReplaceTempView("solana_tokens_distinct")


# Let us pick 5 tokens to make a risk score metrics
df_solana_contracts = spark.sql("""
SELECT
    c.*,
    from_unixtime(nextFundingTime / 1000000) AS nextFundingTimestamp_transformed
FROM contracts c
JOIN solana_tokens_distinct s
  ON c.baseSymbol LIKE CONCAT(s.symbol, '%')
  order by baseSymbol
  limit 5;
""")

df_solana_contracts.createOrReplaceTempView("solana_contracts_view")

df_solana_contracts.show(truncate=False)


+----------+-----------+----------+-------------+-----------+----------+----------+---------------+----------------+------------+---------------+----------+-----------+-----------+-----------+--------------+------------------+--------------+----------+--------------------------------+
|baseSymbol|fundingRate|high24h   |indexCurrency|indexPrice |low24h    |markPrice |nextFundingRate|nextFundingTime |openInterest|openInterestUSD|price     |price24hAgo|productType|quoteSymbol|quoteVolume24h|symbol            |usdVolume24h  |volume24h |nextFundingTimestamp_transformed|
+----------+-----------+----------+-------------+-----------+----------+----------+---------------+----------------+------------+---------------+----------+-----------+-----------+-----------+--------------+------------------+--------------+----------+--------------------------------+
|BONK.P    |-0.000011  |0.00000959|USDT         |0.000008678|0.00000846|0.00000867|-0.000014      |1763794800000000|204656973   |1768.23624672

#  Risk Score



In [None]:
spark.sql("""

WITH stats AS (
    -- Compute min/max values for normalization
    SELECT
        MIN(openInterestUSD / NULLIF(usdVolume24h, 0)) AS min_ratio,
        MAX(openInterestUSD / NULLIF(usdVolume24h, 0)) AS max_ratio,
        MIN(ABS(nextFundingRate)) AS min_funding,
        MAX(ABS(nextFundingRate)) AS max_funding,
        MIN(ABS(price - markPrice)/NULLIF(markPrice,0)) AS min_price_gap,
        MAX(ABS(price - markPrice)/NULLIF(markPrice,0)) AS max_price_gap
    FROM solana_contracts_view
),
token_risks AS (
    SELECT
        c.baseSymbol,
        c.openInterestUSD,
        c.usdVolume24h,
        c.nextFundingRate AS fundingRate,
        c.price,
        c.markPrice,
        s.min_ratio,
        s.max_ratio,
        s.min_funding,
        s.max_funding,
        s.min_price_gap,
        s.max_price_gap,

        -- Liquidity risk: smaller OI → higher risk, clamped 0–1
        ROUND(
            1 - CASE
                WHEN c.usdVolume24h > 0 THEN
                    ( (c.openInterestUSD / c.usdVolume24h) - min_ratio )
                    / NULLIF((max_ratio - min_ratio), 0)
                ELSE
                    1  -- no volume = extremely illiquid = highest risk
            END,
        3
        ) AS liquidity_risk,

        -- Funding risk: scaled 0–1
        GREATEST(0, LEAST(1,
            CASE WHEN s.max_funding > s.min_funding THEN
                (ABS(c.nextFundingRate) - s.min_funding) / (s.max_funding - s.min_funding)
            ELSE 0 END
        )) AS funding_risk,

        -- Price gap risk: scaled 0–1
        GREATEST(0, LEAST(1,
            CASE WHEN s.max_price_gap > s.min_price_gap THEN
                (ABS(c.price - c.markPrice)/NULLIF(c.markPrice,0) - s.min_price_gap) / (s.max_price_gap - s.min_price_gap)
            ELSE 0 END
        )) AS price_gap_risk
    FROM solana_contracts_view c
    CROSS JOIN stats s
)

SELECT
    baseSymbol,
    openInterestUSD,
    usdVolume24h,
    fundingRate,
    price,
    markPrice,
    ROUND(liquidity_risk * 100, 2) AS liquidity_risk,
    ROUND(funding_risk * 100, 2) AS funding_risk,
    ROUND(price_gap_risk * 100, 2) AS price_gap_risk,

    -- Composite risk: average of three components, scaled 0–100
    ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2) AS composite_risk_score,

    -- Risk tier classification
    CASE
        WHEN ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2)  > 70 THEN 'HIGH RISK'
        WHEN ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2)  > 50 THEN 'MODERATE RISK'
        WHEN ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2) > 30 THEN 'LOW RISK'
        ELSE 'SAFE'
    END AS risk_tier

FROM token_risks
ORDER BY composite_risk_score DESC;


""").show(truncate=False)

+----------+---------------+--------------+-----------+----------+----------+--------------+------------+--------------+--------------------+-------------+
|baseSymbol|openInterestUSD|usdVolume24h  |fundingRate|price     |markPrice |liquidity_risk|funding_risk|price_gap_risk|composite_risk_score|risk_tier    |
+----------+---------------+--------------+-----------+----------+----------+--------------+------------+--------------+--------------------+-------------+
|BONK.P    |1768.23624672  |36407.83796539|-0.000014  |0.00000864|0.00000867|98.5          |100.0       |0.0           |66.17               |MODERATE RISK|
|FARTCOIN.P|0.08404        |347.18889     |0.000005   |0.2101    |0.21      |100.0         |25.0        |0.0           |41.67               |LOW RISK     |
|GRASS.P   |1.95518        |202.97898     |0.000003   |0.3371    |0.3417    |99.7          |8.33        |0.0           |36.01               |LOW RISK     |
|JUP.P     |20016.9736     |6889.55584    |-0.000003  |0.2392   

The weights for the risk score components were chosen to emphasize factors that are more indicative of potential rug pulls:

* low open interest (OI, signals thin depth/easy dumps),
* extreme positive funding rates (retail traps),
* low 24h volume (illiquidity),
* high OI/volume ratio (wash trading),
* price/mark gap (manipulation).




In [None]:
df_solana_contracts_test = spark.sql("""
SELECT
    c.*,
    from_unixtime(nextFundingTime / 1000000) AS nextFundingTimestamp_transformed
FROM contracts c
JOIN solana_tokens_distinct s
  ON c.baseSymbol LIKE CONCAT(s.symbol, '%')
  order by baseSymbol
""")

df_solana_contracts_test.createOrReplaceTempView("solana_contracts_view_test")

df_solana_contracts_test.show(truncate=False)

+----------+-----------+----------+-------------+-------------+----------+----------+---------------+----------------+--------------+----------------+----------+-----------+-----------+-----------+--------------+------------------+--------------+----------+--------------------------------+
|baseSymbol|fundingRate|high24h   |indexCurrency|indexPrice   |low24h    |markPrice |nextFundingRate|nextFundingTime |openInterest  |openInterestUSD |price     |price24hAgo|productType|quoteSymbol|quoteVolume24h|symbol            |usdVolume24h  |volume24h |nextFundingTimestamp_transformed|
+----------+-----------+----------+-------------+-------------+----------+----------+---------------+----------------+--------------+----------------+----------+-----------+-----------+-----------+--------------+------------------+--------------+----------+--------------------------------+
|BONK.P    |-0.000011  |0.00000959|USDT         |0.000008678  |0.00000846|0.00000867|-0.000014      |1763794800000000|204656973

# Test

In [None]:
def get_rug_pull_risk_test(spark):
    query = """
WITH stats AS (
    -- Compute min/max values for normalization
    SELECT
        MIN(openInterestUSD / NULLIF(usdVolume24h, 0)) AS min_ratio,
        MAX(openInterestUSD / NULLIF(usdVolume24h, 0)) AS max_ratio,
        MIN(ABS(nextFundingRate)) AS min_funding,
        MAX(ABS(nextFundingRate)) AS max_funding,
        MIN(ABS(price - markPrice)/NULLIF(markPrice,0)) AS min_price_gap,
        MAX(ABS(price - markPrice)/NULLIF(markPrice,0)) AS max_price_gap
    FROM solana_contracts_view_test
),
token_risks AS (
    SELECT
        c.baseSymbol,
        c.openInterestUSD,
        c.usdVolume24h,
        c.nextFundingRate AS fundingRate,
        c.price,
        c.markPrice,
        s.min_ratio,
        s.max_ratio,
        s.min_funding,
        s.max_funding,
        s.min_price_gap,
        s.max_price_gap,

        -- Liquidity risk: smaller OI → higher risk, clamped 0–1
        ROUND(
            1 - CASE
                WHEN c.usdVolume24h > 0 THEN
                    ( (c.openInterestUSD / c.usdVolume24h) - min_ratio )
                    / NULLIF((max_ratio - min_ratio), 0)
                ELSE
                    1  -- no volume = extremely illiquid = highest risk
            END,
        3
        ) AS liquidity_risk,

        -- Funding risk: scaled 0–1
        GREATEST(0, LEAST(1,
            CASE WHEN s.max_funding > s.min_funding THEN
                (ABS(c.nextFundingRate) - s.min_funding) / (s.max_funding - s.min_funding)
            ELSE 0 END
        )) AS funding_risk,

        -- Price gap risk: scaled 0–1
        GREATEST(0, LEAST(1,
            CASE WHEN s.max_price_gap > s.min_price_gap THEN
                (ABS(c.price - c.markPrice)/NULLIF(c.markPrice,0) - s.min_price_gap) / (s.max_price_gap - s.min_price_gap)
            ELSE 0 END
        )) AS price_gap_risk
    FROM solana_contracts_view_test c
    CROSS JOIN stats s
)

SELECT
    baseSymbol,
    openInterestUSD,
    usdVolume24h,
    fundingRate,
    price,
    markPrice,
    ROUND(liquidity_risk * 100, 2) AS liquidity_risk,
    ROUND(funding_risk * 100, 2) AS funding_risk,
    ROUND(price_gap_risk * 100, 2) AS price_gap_risk,

    -- Composite risk: average of three components, scaled 0–100
    ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2) AS composite_risk_score,

    -- Risk tier classification
    CASE
        WHEN ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2)  > 80 THEN 'HIGH RISK'
        WHEN ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2)  > 60 THEN 'MODERATE RISK'
        WHEN ROUND((liquidity_risk + funding_risk + price_gap_risk)/3 * 100, 2) > 40 THEN 'LOW RISK'
        ELSE 'SAFE'
    END AS risk_tier

FROM token_risks
ORDER BY composite_risk_score DESC;
    """

    return spark.sql(query)


In [None]:
risk_results = get_rug_pull_risk_test(spark)
risk_results.show(truncate=False)


+----------+----------------+--------------+-----------+----------+----------+--------------+------------+--------------+--------------------+-------------+
|baseSymbol|openInterestUSD |usdVolume24h  |fundingRate|price     |markPrice |liquidity_risk|funding_risk|price_gap_risk|composite_risk_score|risk_tier    |
+----------+----------------+--------------+-----------+----------+----------+--------------+------------+--------------+--------------------+-------------+
|PUMP.P    |5530.7813       |6582.439279   |-0.00003   |0.002692  |0.002734  |94.9          |100.0       |100.0         |98.3                |HIGH RISK    |
|PENGU.P   |6.0144086       |382.1371473   |-0.000028  |0.010267  |0.010358  |99.9          |92.86       |100.0         |97.59               |HIGH RISK    |
|PYTH.P    |193.2964352     |319.993558    |-0.000024  |0.07216   |0.07293   |96.3          |78.57       |100.0         |91.62               |HIGH RISK    |
|BONK.P    |1768.23624672   |36407.83796539|-0.000014  |0.

# Summary (1):

### Insights or Next Steps
*   The updated risk assessment provides a more robust and targeted approach to identifying potential rug pulls by giving higher importance to Open Interest and Funding Rate, and by specifically penalizing extreme positive funding rates, which often precede coordinated dumps. The shift to absolute risk tiers ensures consistent and transparent classification across all tokens, making the risk levels more interpretable.



# Token concentration

In [90]:
BASE_URL = "https://api.arkhamintelligence.com"

def get_token_holders(token_address, chain="solana", group_by_entity=False):
    """
    Get top holders for a token on a specified chain.

    Args:
        token_address: The token's contract address
        chain: Blockchain (default "solana")
        group_by_entity: If True, group results by entity
    """

    url = f"{BASE_URL}/token/holders/{chain}/{token_address}"

    headers = {
        "API-Key": ARKHAM_API_KEY
    }

    params = {
        "groupByEntity": str(group_by_entity).lower()  # API expects "true" or "false"
    }

    response = requests.get(url, headers=headers, params=params)

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} for token {token_address}")
        print(response.text)
        return None

In [102]:
# ==========================
# Initialize Spark
# ==========================
spark = SparkSession.builder.appName("ArkhamHolderAnalysis").getOrCreate()


# ==========================
# 1. PARSE API JSON INTO DATAFRAME (for ONE token)
# ==========================
def parse_holders_to_dataframe(json_data):
    holders_list = json_data.get('addressTopHolders', {}).get('solana', [])
    total_supply = json_data.get('totalSupply', {}).get('solana', 0)
    token_info = json_data.get('token', {})

    flattened = []
    for h in holders_list:
        flattened.append({
            'address': h['address']['address'],
            'balance': float(h['balance']),
            'usd_value': float(h['usd']),
            'pct_of_cap': float(h['pctOfCap']),
            'entity_name': h['address'].get('arkhamEntity', {}).get('name'),
            'entity_type': h['address'].get('arkhamEntity', {}).get('type'),
            'label_name': h['address'].get('arkhamLabel', {}).get('name'),
            'chain': h['address']['chain'],
            'token_address': token_info['identifier']['address'],
            'token_name': token_info['name'],
            'token_symbol': token_info['symbol'],
            'total_supply': total_supply
        })

    return spark.createDataFrame(flattened)


# ==========================
# 2. FETCH MULTIPLE TOKENS AND UNION ALL
# ==========================

token_list = [
    "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",      # Established stablecoin
    "So11111111111111111111111111111111111111112",      # Native SOL token
    "9BB6NFEcjBCtnNLFko2FqVQBq8HHM13kCyYcdQbgpump",      # Fartcoin
    "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU",     # Samoyedcoin
    "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN",       #Jupiter
    "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263",      #Bonk
    "2zMMhcVQEXDtdE6vsFS7S7D5oUodfJHE8vd1gnBouauv",      #Pengu
    "9jZgvgS2bWtQiYzv48GcWzY4tnkeRSANbTm8Kp1LmSyS"       #Sharpe (Real Rug pull)
]

dfs = []

for addr in token_list:
    data = get_token_holders(addr)
    if data:
        df = parse_holders_to_dataframe(data)
        dfs.append(df)

# Combine all tokens using union
all_holders_df = reduce(DataFrame.unionByName, dfs)

# Register for SQL
all_holders_df.createOrReplaceTempView("token_holders")

In [103]:
test = """ SELECT * FROM token_holders """
spark.sql(test).show(truncate=False)

+--------------------------------------------+------------------+------+-------------------------+---------------------+--------------------------------+---------------------+--------------------------------------------+----------+------------+-------------------+------------------+
|address                                     |balance           |chain |entity_name              |entity_type          |label_name                      |pct_of_cap           |token_address                               |token_name|token_symbol|total_supply       |usd_value         |
+--------------------------------------------+------------------+------+-------------------------+---------------------+--------------------------------+---------------------+--------------------------------------------+----------+------------+-------------------+------------------+
|7VHUFJHWu2CuExkJcJrzhQPJ2oygupTWkL2A2For4BmE|8.44180586825984E8|solana|Circle                   |stablecoin           |NULL                        

In [101]:
# ==========================
# CONCENTRATION RISK ANALYSIS SQL
# ==========================
risk_analysis_query = """
WITH holder_stats AS (
    SELECT
        token_address,
        token_name,
        token_symbol,
        total_supply,
        COUNT(*) AS num_holders_shown,
        MAX(pct_of_cap) AS largest_holder_pct,
        APPROX_PERCENTILE(pct_of_cap, 0.5) AS median_holder_pct,
        AVG(pct_of_cap) AS avg_holder_pct,
        SUM(CASE WHEN entity_type = 'cex' THEN pct_of_cap ELSE 0 END) AS cex_concentration,
        SUM(CASE WHEN entity_type IN ('dex', 'dex-aggregator') THEN pct_of_cap ELSE 0 END) AS dex_concentration,
        SUM(CASE WHEN entity_type IS NULL THEN pct_of_cap ELSE 0 END) AS unknown_concentration
    FROM token_holders
    GROUP BY token_address, token_name, token_symbol, total_supply
),
top_concentrations AS (
    SELECT
        token_address,
        SUM(CASE WHEN rn <= 5  THEN pct_of_cap END) AS top_5_pct,
        SUM(CASE WHEN rn <= 10 THEN pct_of_cap END) AS top_10_pct
    FROM (
        SELECT
            token_address,
            pct_of_cap,
            ROW_NUMBER() OVER (PARTITION BY token_address ORDER BY pct_of_cap DESC) AS rn
        FROM token_holders
    ) ranked
    GROUP BY token_address
),
base AS (
    SELECT
        hs.*,
        tc.top_5_pct,
        tc.top_10_pct
    FROM holder_stats hs
    JOIN top_concentrations tc ON hs.token_address = tc.token_address
),
normalized AS (
    SELECT
        *,
        MIN(top_10_pct)     OVER () AS min_top10,
        MAX(top_10_pct)     OVER () AS max_top10,
        MIN(unknown_concentration) OVER () AS min_unknown,
        MAX(unknown_concentration) OVER () AS max_unknown,
        MIN(largest_holder_pct)    OVER () AS min_whale,
        MAX(largest_holder_pct)    OVER () AS max_whale
    FROM base
),
scored AS (
    SELECT
        *,
        GREATEST(0, LEAST(1, (top_10_pct - min_top10) / NULLIF(max_top10 - min_top10,0))) AS norm_concentration,
        GREATEST(0, LEAST(1, (unknown_concentration - min_unknown) / NULLIF(max_unknown - min_unknown,0))) AS norm_unknown,
        GREATEST(0, LEAST(1, (largest_holder_pct - min_whale) / NULLIF(max_whale - min_whale,0))) AS norm_whale
    FROM normalized
)
SELECT
    token_address,
    token_name,
    token_symbol,
    num_holders_shown,
    ROUND(largest_holder_pct * 100, 2) AS largest_holder_pct,
    ROUND(top_10_pct * 100, 2) AS top_10_concentration_pct,
    ROUND(top_5_pct * 100, 2) AS top_5_concentration_pct,
    ROUND(cex_concentration * 100, 2) AS cex_concentration_pct,
    ROUND(dex_concentration * 100, 2) AS dex_concentration_pct,
    ROUND(unknown_concentration * 100, 2) AS unknown_concentration_pct,
    ROUND(
        (norm_concentration * 0.45 +
         norm_unknown       * 0.30 +
         norm_whale         * 0.25) * 100, 2
    ) AS composite_risk_score,
    CASE
        WHEN (norm_concentration * 0.45 + norm_unknown * 0.30 + norm_whale * 0.25) > 0.70 THEN 'HIGH RISK'
        WHEN (norm_concentration * 0.45 + norm_unknown * 0.30 + norm_whale * 0.25) > 0.40 THEN 'MEDIUM RISK'
        WHEN (norm_concentration * 0.45 + norm_unknown * 0.30 + norm_whale * 0.25) > 0.15 THEN 'LOW RISK'
        ELSE 'VERY LOW RISK'
    END AS risk_tier
FROM scored
ORDER BY composite_risk_score DESC
"""

risk_analysis_df = spark.sql(risk_analysis_query)


# ==========================
# REPORTS
# ==========================
print("=" * 80)
print("HOLDER CONCENTRATION RISK ANALYSIS")
print("=" * 80)
risk_analysis_df.show(truncate=False, vertical=True)


holder_breakdown_query = """
SELECT
    token_name,
    entity_type,
    COUNT(*) as num_holders,
    SUM(pct_of_cap) * 100 as total_pct_held,
    AVG(balance) as avg_balance,
    SUM(usd_value) as total_usd_value
FROM token_holders
GROUP BY token_name, entity_type
ORDER BY token_name asc, total_pct_held DESC
"""

holder_breakdown_df = spark.sql(holder_breakdown_query)

print("\n" + "=" * 80)
print("HOLDER BREAKDOWN BY ENTITY TYPE")
print("=" * 80)
holder_breakdown_df.show(holder_breakdown_df.count(), truncate=False)


top_holders_query = """
SELECT
    token_name,
    address,
    entity_name,
    entity_type,
    label_name,
    ROUND(pct_of_cap * 100, 2) AS pct_of_supply,
    ROUND(balance, 2) AS balance,
    ROUND(usd_value, 2) AS usd_value
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY token_name
            ORDER BY pct_of_cap DESC
        ) AS rn
    FROM token_holders
)
WHERE rn <= 20
ORDER BY token_name, pct_of_cap DESC;
"""

top_holders_df = spark.sql(top_holders_query)

print("\n" + "=" * 80)
print("TOP 20 HOLDERS")
print("=" * 80)
top_holders_df.show(top_holders_df.count(), truncate=False)


# Return all dataframes
result = {
    'risk_analysis': risk_analysis_df,
    'holder_breakdown': holder_breakdown_df,
    'top_holders': top_holders_df,
    'all_holders': all_holders_df
}


HOLDER CONCENTRATION RISK ANALYSIS
-RECORD 0-----------------------------------------------------------------
 token_address             | 9jZgvgS2bWtQiYzv48GcWzY4tnkeRSANbTm8Kp1LmSyS 
 token_name                | SHARPEI                                      
 token_symbol              | shar                                         
 num_holders_shown         | 100                                          
 largest_holder_pct        | 97.98                                        
 top_10_concentration_pct  | 98.82                                        
 top_5_concentration_pct   | 98.49                                        
 cex_concentration_pct     | 0.0                                          
 dex_concentration_pct     | 98.03                                        
 unknown_concentration_pct | 1.52                                         
 composite_risk_score      | 70.12                                        
 risk_tier                 | HIGH RISK                           

# Summary (2):
* Concentration risk matters: Highly concentrated holdings increase vulnerability to manipulations, sudden dumps, or rug pulls.
* Entity type distinction: CEX-controlled holdings are less risky operationally but may signal centralization; unknown wallets can be red flags.
* Weighted scoring: The weighting reflects the relative importance of each factor in assessing systemic risk:
* Top 10 concentration (largest group control) – 45%
* Unknown entities (opaque control) – 30%
* Largest single holder (whale) – 25%
* Dynamic and comparative: Scores are normalized relative to all analyzed tokens to make risk assessment meaningful across different projects.
