# Solana Data Explorer - Delta Lake Tables

A simple viewer that shows the most recent records in each Delta Lake table produced by our DAG pipeline (the Gold table is shown ranked by total PnL rather than by recency).

## Tables:
- **Bronze**: token_metrics, whale_holders, transaction_history
- **Silver**: tracked_tokens_delta, tracked_whales_delta, wallet_pnl
- **Gold**: smart_traders_delta

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime

# Pandas display settings applied for the rest of the notebook:
# no limit on column count or output width, cell text capped at 50
# characters, and at most 10 rows shown per frame.
_display_options = {
    'display.max_columns': None,
    'display.width': None,
    'display.max_colwidth': 50,
    'display.max_rows': 10,
}
for _option_name, _option_value in _display_options.items():
    pd.set_option(_option_name, _option_value)

# Banner with the local wall-clock time at which the notebook was run
print("📊 Solana Data Explorer - Delta Lake Edition")
print(f"🕐 Generated: {datetime.now():%Y-%m-%d %H:%M:%S}")

In [None]:
# Create Spark session with Delta Lake support
import os

print("🚀 Initializing Spark with Delta Lake...")

# Object-store connection settings. Each value can be overridden through an
# environment variable so real credentials never have to be committed with
# the notebook; the defaults are the values this notebook has always used.
s3a_endpoint = os.environ.get("S3A_ENDPOINT", "http://localhost:9000")
s3a_access_key = os.environ.get("S3A_ACCESS_KEY", "minioadmin")
s3a_secret_key = os.environ.get("S3A_SECRET_KEY", "minioadmin123")

spark = (
    SparkSession.builder
    .appName("SolanaDataExplorer")
    # JVM dependencies: Delta Lake and the Hadoop S3A filesystem
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.3.4")
    # Enable Delta SQL extensions and make Delta the session catalog
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A connection (path-style access is set explicitly for this endpoint)
    .config("spark.hadoop.fs.s3a.endpoint", s3a_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Memory limits for driver and executors
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

print("✅ Spark session ready with Delta Lake")

## 🥉 Bronze Layer Tables

In [None]:
# Bronze Token Metrics: total row count plus the 10 rows with the
# highest _delta_timestamp.
print("📦 BRONZE: Token Metrics (Latest 10 records)")
print("=" * 70)

try:
    # Read the Delta table from object storage
    bronze_tokens_df = (
        spark.read
        .format("delta")
        .load("s3a://smart-trader/bronze/token_metrics")
    )

    # Count first, then bring only the newest 10 rows to the driver
    total_count = bronze_tokens_df.count()
    newest_first = bronze_tokens_df.orderBy("_delta_timestamp", ascending=False)
    latest_tokens = newest_first.limit(10).toPandas()

    print(f"Total records: {total_count:,}")
    print("\nLatest 10 tokens:")

    # Show the preferred columns, skipping any the table does not have
    display_cols = ['token_address', 'symbol', 'liquidity', 'price', 'processing_date', '_delta_timestamp']
    present_cols = [name for name in display_cols if name in latest_tokens.columns]
    print(latest_tokens[present_cols])

except Exception as exc:
    # Any failure (load, count, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading bronze tokens: {exc}")

In [None]:
# Bronze Whale Holders: total row count plus the 10 rows with the
# highest _delta_timestamp.
print("\n📦 BRONZE: Whale Holders (Latest 10 records)")
print("=" * 70)

try:
    # Read the Delta table from object storage
    bronze_whales_df = (
        spark.read
        .format("delta")
        .load("s3a://smart-trader/bronze/whale_holders")
    )

    # Count first, then bring only the newest 10 rows to the driver
    total_count = bronze_whales_df.count()
    newest_first = bronze_whales_df.orderBy("_delta_timestamp", ascending=False)
    latest_whales = newest_first.limit(10).toPandas()

    print(f"Total records: {total_count:,}")
    print("\nLatest 10 whale holders:")

    # Show the preferred columns, skipping any the table does not have
    display_cols = ['wallet_address', 'token_symbol', 'rank', 'holdings_amount', 'holdings_value_usd', '_delta_timestamp']
    present_cols = [name for name in display_cols if name in latest_whales.columns]
    print(latest_whales[present_cols])

except Exception as exc:
    # Any failure (load, count, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading bronze whales: {exc}")

In [None]:
# Bronze Transaction History: total row count plus the 10 rows with the
# highest value in the "timestamp" column.
print("\n📦 BRONZE: Transaction History (Latest 10 records)")
print("=" * 70)

try:
    # Read the Delta table from object storage
    bronze_txns_df = (
        spark.read
        .format("delta")
        .load("s3a://smart-trader/bronze/transaction_history")
    )

    # Count first, then bring only the newest 10 rows to the driver
    total_count = bronze_txns_df.count()
    newest_first = bronze_txns_df.orderBy("timestamp", ascending=False)
    latest_txns = newest_first.limit(10).toPandas()

    print(f"Total records: {total_count:,}")
    print("\nLatest 10 transactions:")

    # Show the preferred columns, skipping any the table does not have
    display_cols = ['wallet_address', 'timestamp', 'base_symbol', 'quote_symbol', 'base_type_swap', 'quote_type_swap']
    present_cols = [name for name in display_cols if name in latest_txns.columns]
    print(latest_txns[present_cols])

except Exception as exc:
    # Any failure (load, count, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading bronze transactions: {exc}")

## 🥈 Silver Layer Tables

In [None]:
# Silver Tracked Tokens: total row count plus the 10 rows with the
# highest silver_created_at.
print("💎 SILVER: Tracked Tokens (Latest 10 records)")
print("=" * 70)

try:
    # Read the Delta table from object storage
    silver_tokens_df = (
        spark.read
        .format("delta")
        .load("s3a://smart-trader/silver/tracked_tokens_delta")
    )

    # Count first, then bring only the newest 10 rows to the driver
    total_count = silver_tokens_df.count()
    newest_first = silver_tokens_df.orderBy("silver_created_at", ascending=False)
    latest_tokens = newest_first.limit(10).toPandas()

    print(f"Total records: {total_count:,}")
    print("\nLatest 10 tracked tokens:")

    # Show the preferred columns, skipping any the table does not have
    display_cols = ['token_address', 'symbol', 'liquidity', 'liquidity_tier', 'whale_fetch_status', 'silver_created_at']
    present_cols = [name for name in display_cols if name in latest_tokens.columns]
    print(latest_tokens[present_cols])

except Exception as exc:
    # Any failure (load, count, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading silver tokens: {exc}")

In [None]:
# Silver Tracked Whales: total row count plus the 10 rows with the
# highest silver_created_at.
print("\n💎 SILVER: Tracked Whales (Latest 10 records)")
print("=" * 70)

try:
    # Read the Delta table from object storage
    silver_whales_df = (
        spark.read
        .format("delta")
        .load("s3a://smart-trader/silver/tracked_whales_delta")
    )

    # Count first, then bring only the newest 10 rows to the driver
    total_count = silver_whales_df.count()
    newest_first = silver_whales_df.orderBy("silver_created_at", ascending=False)
    latest_whales = newest_first.limit(10).toPandas()

    print(f"Total records: {total_count:,}")
    print("\nLatest 10 tracked whales:")

    # Show the preferred columns, skipping any the table does not have
    display_cols = ['whale_id', 'wallet_address', 'token_symbol', 'whale_tier', 'processing_status', 'silver_created_at']
    present_cols = [name for name in display_cols if name in latest_whales.columns]
    print(latest_whales[present_cols])

except Exception as exc:
    # Any failure (load, count, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading silver whales: {exc}")

In [None]:
# Silver Wallet PnL: restricted to rows whose token_address is "ALL_TOKENS"
# (the portfolio-level records), then row count plus the 10 rows with the
# highest processed_at.
print("\n💎 SILVER: Wallet PnL (Latest 10 records)")
print("=" * 70)

try:
    # Read the Delta table from object storage
    silver_pnl_df = (
        spark.read
        .format("delta")
        .load("s3a://smart-trader/silver/wallet_pnl")
    )

    # Keep only the portfolio-level rows
    is_portfolio_row = silver_pnl_df.token_address == "ALL_TOKENS"
    portfolio_df = silver_pnl_df.filter(is_portfolio_row)

    # Count first, then bring only the newest 10 rows to the driver
    total_count = portfolio_df.count()
    newest_first = portfolio_df.orderBy("processed_at", ascending=False)
    latest_pnl = newest_first.limit(10).toPandas()

    print(f"Total portfolio records: {total_count:,}")
    print("\nLatest 10 wallet PnL records:")

    # Show the preferred columns, skipping any the table does not have
    display_cols = ['wallet_address', 'total_pnl', 'portfolio_roi', 'win_rate', 'trade_count', 'processed_at']
    present_cols = [name for name in display_cols if name in latest_pnl.columns]
    print(latest_pnl[present_cols])

except Exception as exc:
    # Any failure (load, filter, count, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading silver PnL: {exc}")

## 🥇 Gold Layer Tables

In [None]:
# Gold Smart Traders: total row count, a per-tier breakdown, and the 10
# traders with the highest total_pnl. Unlike the other table cells, the rows
# here are ranked by PnL rather than by recency, so the header says so.
print("🏆 GOLD: Smart Traders (Top 10 by PnL)")
print("=" * 70)

try:
    gold_traders_df = spark.read.format("delta").load("s3a://smart-trader/gold/smart_traders_delta")

    total_count = gold_traders_df.count()
    # NOTE: the name is kept from earlier revisions of this cell; the frame
    # holds the top 10 rows by total_pnl, not the most recent ones.
    latest_traders = gold_traders_df.orderBy("total_pnl", ascending=False).limit(10).toPandas()

    print(f"Total smart traders: {total_count:,}")

    # Tier breakdown, sorted by tier so repeated runs print in the same order
    # (groupBy alone gives no ordering guarantee).
    tier_counts = (
        gold_traders_df
        .groupBy("performance_tier")
        .count()
        .orderBy("performance_tier")
        .toPandas()
    )
    print("\nPerformance tiers:")
    for tier, tier_total in zip(tier_counts["performance_tier"], tier_counts["count"]):
        print(f"  {tier}: {tier_total:,}")

    print("\nTop 10 smart traders by PnL:")
    # Show the preferred columns, skipping any the table does not have
    display_cols = ['wallet_address', 'total_pnl', 'roi', 'win_rate', 'performance_tier', 'gold_processed_at']
    print(latest_traders[[col for col in display_cols if col in latest_traders.columns]])

except Exception as e:
    # Any failure (load, count, aggregate, collect) is reported instead of stopping the notebook
    print(f"❌ Error loading gold traders: {e}")

## 📊 Pipeline Summary

In [None]:
# Pipeline Summary: one row per Delta table with its record count, latest
# Delta version, and a status flag.
print("\n📊 PIPELINE SUMMARY")
print("=" * 70)

# Import the Delta Python bindings once, outside the per-table error handling.
# If the import lived inside the loop's try block, a missing Python package
# would be caught there and every table would be misreported as "Missing".
try:
    from delta.tables import DeltaTable
except ImportError as import_error:
    DeltaTable = None
    print(f"⚠️ delta Python package unavailable ({import_error}); versions will be shown as N/A")

# Collect all table stats
table_stats = []

tables = [
    ("Bronze Token Metrics", "s3a://smart-trader/bronze/token_metrics"),
    ("Bronze Whale Holders", "s3a://smart-trader/bronze/whale_holders"),
    ("Bronze Transactions", "s3a://smart-trader/bronze/transaction_history"),
    ("Silver Tracked Tokens", "s3a://smart-trader/silver/tracked_tokens_delta"),
    ("Silver Tracked Whales", "s3a://smart-trader/silver/tracked_whales_delta"),
    ("Silver Wallet PnL", "s3a://smart-trader/silver/wallet_pnl"),
    ("Gold Smart Traders", "s3a://smart-trader/gold/smart_traders_delta"),
]

for table_name, table_path in tables:
    # Step 1: the table must be readable for it to count as active.
    try:
        count = spark.read.format("delta").load(table_path).count()
    except Exception as e:
        # Surface why the table could not be read; only the first line of the
        # message is printed because Spark errors can span many lines.
        reason = (str(e).splitlines() or [""])[0]
        print(f"⚠️ {table_name}: {type(e).__name__}: {reason}")
        table_stats.append({
            "Table": table_name,
            "Records": "0",
            "Version": "N/A",
            "Status": "❌ Missing"
        })
        continue

    # Step 2: the latest Delta version is best-effort; a failure here does not
    # make a readable table look missing.
    latest_version = "N/A"
    if DeltaTable is not None:
        try:
            history = DeltaTable.forPath(spark, table_path).history(1).collect()
            latest_version = history[0]['version'] if history else 0
        except Exception as e:
            reason = (str(e).splitlines() or [""])[0]
            print(f"⚠️ {table_name}: could not read Delta history: {type(e).__name__}: {reason}")

    table_stats.append({
        "Table": table_name,
        "Records": f"{count:,}",
        "Version": latest_version,
        "Status": "✅ Active"
    })

# Display summary
summary_df = pd.DataFrame(table_stats)
print(summary_df.to_string(index=False))

print("\n✅ Data exploration complete!")
print(f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [None]:
# Shut down the Spark session created earlier in this notebook.
# Run this cell last: cells that use `spark` will not work after it.
spark.stop()
print("🛑 Spark session closed")