In [None]:
# Market Volume and Volatility Screener
# This notebook analyzes cryptocurrency markets to identify high-volume and high-volatility trading opportunities
# It uses CLOB (Central Limit Order Book) data to fetch and analyze candle data from various trading pairs
import warnings

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from scipy import stats

from core.data_sources.clob import CLOBDataSource
from core.features.candles.volatility import Volatility, VolatilityConfig
from core.features.candles.volume import Volume, VolumeConfig


# Shared CLOB data source for the whole notebook: later cells use it to read
# trading rules, fetch candles in batches, and load/dump the candle cache.
clob = CLOBDataSource()

In [None]:
# Configuration Parameters
# Every tunable value of the notebook lives here; later cells only read these names.

# Market configuration
CONNECTOR_NAME = "binance_perpetual"  # Connector name; used to request trading rules/candles and to filter the cache
QUOTE_ASSET = "USDT"                  # Quote currency compared against the second part of each BASE-QUOTE pair
INTERVAL = "1m"                       # Candle interval requested from the exchange and used to filter the cache

# Technical analysis parameters
VOLATILITY_WINDOW = 20        # Passed as `window` to VolatilityConfig
VOLUME_SHORT_WINDOW = 60      # Passed as `short_window` to VolumeConfig (presumably counted in candles - TODO confirm)
VOLUME_LONG_WINDOW = 600       # Passed as `long_window` to VolumeConfig (presumably counted in candles - TODO confirm)

# Data fetching configuration
FETCH_CANDLES = False         # True: download candles and rewrite the cache; False: only load the existing cache
DAYS = 15                    # Number of days of history requested per trading pair
BATCH_CANDLES_REQUEST = 2    # Batch size handed to get_candles_batch_last_days
SLEEP_REQUEST = 2.0          # Pause handed to get_candles_batch_last_days (seconds between batches)

# Filtering parameters (used by the market filtering cell)
VOLUME_QUANTILE = 0.5          # Keep markets whose hourly volume is above this quantile
NATR_QUANTILE = 0.5            # Keep markets whose mean NATR is above this quantile
TOP_X_MARKETS = 20             # Maximum number of markets kept after filtering

# Data Fetching and Caching
Depending on the `FETCH_CANDLES` flag, this section either fetches fresh data from the exchange (and saves it to the cache) or loads previously cached data. Caching avoids unnecessary API calls and speeds up repeated analysis.

In [None]:
# Fetch or load candle data
# If FETCH_CANDLES is True: Downloads fresh data from the exchange
# If FETCH_CANDLES is False: Loads previously cached data

if FETCH_CANDLES:
    # Get all available trading rules from the exchange
    trading_rules = await clob.get_trading_rules(CONNECTOR_NAME)
    
    # Filter trading pairs to exclude those with matching quote asset
    # This ensures we only get USDT-quoted pairs (e.g., BTC-USDT, ETH-USDT)
    trading_pairs = [
        trading_pair for trading_pair in trading_rules.get_all_trading_pairs() 
        if trading_pair.split("-")[1] != QUOTE_ASSET
    ]
    
    # Fetch candle data for all trading pairs
    # Uses batch processing to avoid rate limiting
    candles = await clob.get_candles_batch_last_days(
        CONNECTOR_NAME, 
        trading_pairs, 
        INTERVAL, 
        DAYS, 
        BATCH_CANDLES_REQUEST, 
        SLEEP_REQUEST
    )
    
    # Save fetched data to cache for future use
    clob.dump_candles_cache()
    print(f"Fetched data for {len(candles)} trading pairs")
else:
    # Load previously cached data
    clob.load_candles_cache()
    print("Loaded data from cache")

In [None]:
# Narrow the cache down to the configured connector and candle interval.
# Cache keys are indexed positionally: position 0 is matched against the
# connector name and position 2 against the interval.
candles = []
for cache_key, cached_candles in clob.candles_cache.items():
    if cache_key[2] != INTERVAL:
        continue
    if cache_key[0] != CONNECTOR_NAME:
        continue
    candles.append(cached_candles)

print(f"Found {len(candles)} candle datasets for {CONNECTOR_NAME} with {INTERVAL} interval")

In [None]:
# Market Analysis: Calculate Volume and Volatility Metrics with Improved Momentum Analysis
# Each candle set gets volatility and volume features added, is summarised into one
# report row, and the rows are ranked by a momentum-weighted composite score.


def _summarize_market(df):
    """Summarise one market's feature-enriched candle frame into a dict of metrics.

    Args:
        df: Candle DataFrame after the Volatility and Volume features were added.
            Reads the columns 'volatility', 'natr', 'bb_width',
            'buy_pressure_divergence', 'buy_momentum', 'volume_surge',
            'volume_buy_signal', 'accumulation_score', 'distribution_score',
            'volume_usd' and 'close'. The difference between the last and first
            index entries must expose `total_seconds()`.

    Returns:
        dict with the averaged volatility metrics, the latest momentum signals,
        the hourly USD volume, the price-range metrics and the raw composite 'score'.

    Raises:
        ValueError: if the frame is empty or covers no time span, because an
            hourly volume cannot be derived from it.
    """
    if df.empty:
        raise ValueError("no candle rows available")

    # Volatility metrics averaged over the whole period
    mean_volatility = df['volatility'].mean()
    mean_natr = df['natr'].mean()
    mean_bb_width = df['bb_width'].mean()

    # Latest values of the momentum signals
    latest_divergence = df['buy_pressure_divergence'].iloc[-1]
    latest_momentum = df['buy_momentum'].iloc[-1]
    latest_volume_surge = df['volume_surge'].iloc[-1]
    latest_buy_signal = df['volume_buy_signal'].iloc[-1]
    latest_accumulation = df['accumulation_score'].iloc[-1]
    latest_distribution = df['distribution_score'].iloc[-1]

    # Volume metrics. A zero time span would turn the division into inf/NaN and
    # distort the score normalisation of every other market, so reject it here.
    total_volume_usd = df['volume_usd'].sum()
    total_hours = (df.index[-1] - df.index[0]).total_seconds() / 3600
    if total_hours <= 0:
        raise ValueError("candle data covers no time span; cannot compute hourly volume")
    average_volume_per_hour = total_volume_usd / total_hours

    # Price range analysis
    last_close = df['close'].iloc[-1]
    max_price = df['close'].max()
    min_price = df['close'].min()
    range_price = max_price - min_price

    # How far the current price is below the period high (relative to the current price)
    range_price_pct = (max_price - last_close) / last_close

    # Current position in the range (0 = at min, 1 = at max); 0.5 when the range is flat
    current_position = (last_close - min_price) / range_price if range_price > 0 else 0.5

    # Normalize divergence to a 0-1 scale (0.5 is neutral, also used when it is missing)
    normalized_divergence = (latest_divergence + 1) / 2 if not pd.isna(latest_divergence) else 0.5

    # Momentum weight, floored at 0.1; a missing volume surge falls back to the floor
    raw_momentum = normalized_divergence * latest_volume_surge
    momentum_factor = 0.1 if pd.isna(raw_momentum) else max(0.1, raw_momentum)

    score = (
        mean_natr *
        average_volume_per_hour *
        momentum_factor *
        (1 - abs(current_position - 0.5))  # Prefer mid-range positions
    )

    return {
        'mean_volatility': mean_volatility,
        'mean_natr': mean_natr,
        'mean_bb_width': mean_bb_width,
        'buy_pressure_divergence': latest_divergence,
        'buy_momentum': latest_momentum,
        'volume_surge': latest_volume_surge,
        'volume_buy_signal': latest_buy_signal,
        'accumulation_score': latest_accumulation,
        'distribution_score': latest_distribution,
        'average_volume_per_hour': average_volume_per_hour,
        'current_position': current_position,
        'range_price_pct': range_price_pct,
        'score': score
    }


# Initialize configurations for technical indicators
volatility_config = VolatilityConfig(window=VOLATILITY_WINDOW)
volume_config = VolumeConfig(short_window=VOLUME_SHORT_WINDOW, long_window=VOLUME_LONG_WINDOW)

# Process each trading pair; a failing market is reported and skipped so that
# one bad dataset does not abort the whole screen.
report = []
for candle in candles:
    try:
        # Add volatility and volume features to the candle data
        candle.add_features([
            Volatility(volatility_config),
            Volume(volume_config)
        ])
        report.append({'trading_pair': candle.trading_pair, **_summarize_market(candle.data)})
    except Exception as e:
        print(f"Error processing {candle.trading_pair}: {e}")
        continue

# Fail with an actionable message instead of a KeyError on the missing 'score' column
if not report:
    raise ValueError(
        "No markets could be analyzed: `candles` is empty or every market failed. "
        "Check the candle cache and the CONNECTOR_NAME / INTERVAL settings."
    )

# Convert to DataFrame and scale scores so that the best market gets 1.0
report_df = pd.DataFrame(report)
max_score = report_df['score'].max()
normalized_score = report_df['score'] / max_score if max_score > 0 else 0
report_df = report_df.assign(normalized_score=normalized_score).drop(columns=['score'])

# Display the report sorted by normalized score
print("\n📊 MARKET ANALYSIS COMPLETE")
print("=" * 60)
print(f"Analyzed {len(report_df)} trading pairs")
print("\n🎯 TOP 10 MARKETS BY MOMENTUM-WEIGHTED SCORE:")
print("-" * 60)

# Show top markets with key metrics
top_10 = report_df.sort_values('normalized_score', ascending=False).head(10)
display_cols = ['trading_pair', 'normalized_score', 'buy_pressure_divergence',
                'volume_surge', 'accumulation_score', 'mean_natr', 'average_volume_per_hour']
top_10[display_cols]

In [None]:
# Momentum Analysis: Understanding Buy/Sell Pressure Dynamics
# Prints the strongest bullish/bearish divergences and the markets with the
# highest accumulation/distribution scores found in `report_df`.

DIVERGENCE_THRESHOLD = 0.05   # |buy_pressure_divergence| beyond this counts as a strong signal
ZONE_QUANTILE = 0.8           # Zone members must score above this quantile of all markets
MAX_ROWS_PER_SECTION = 5      # Number of markets printed per section


def _print_section(title, markets, format_row, empty_message):
    """Print a titled list with the first MAX_ROWS_PER_SECTION rows of `markets`.

    Args:
        title: Heading printed above the list.
        markets: DataFrame already sorted in the order the rows should appear.
        format_row: Callable turning one row into the line to print.
        empty_message: Line printed instead of the list when `markets` is empty.
    """
    print(title)
    print("-" * 40)
    if markets.empty:
        print(empty_message)
        return
    for _, row in markets.head(MAX_ROWS_PER_SECTION).iterrows():
        print(format_row(row))


def _format_divergence_row(row):
    """Format one market line for the bullish/bearish divergence sections."""
    return (
        f"  {row['trading_pair']:10s} | Divergence: {row['buy_pressure_divergence']:+.4f} "
        f"| Volume Surge: {row['volume_surge']:.2f}x"
    )


def _zone_row_formatter(score_column):
    """Return a row formatter that shows `score_column` next to the hourly volume."""
    def _format(row):
        return (
            f"  {row['trading_pair']:10s} | Score: {row[score_column]:.4f} "
            f"| Volume: ${row['average_volume_per_hour']:,.0f}/hr"
        )
    return _format


print("\n🔍 MOMENTUM INSIGHTS")
print("=" * 60)

# Identify markets with strong momentum signals, strongest first
divergence = report_df['buy_pressure_divergence']
bullish_divergence = report_df[divergence > DIVERGENCE_THRESHOLD].sort_values(
    'buy_pressure_divergence', ascending=False
)
bearish_divergence = report_df[divergence < -DIVERGENCE_THRESHOLD].sort_values('buy_pressure_divergence')

_print_section(
    "\n📈 BULLISH MOMENTUM (Short-term buying > Long-term):",
    bullish_divergence,
    _format_divergence_row,
    "  No markets showing strong bullish divergence",
)
_print_section(
    "\n📉 BEARISH MOMENTUM (Short-term selling > Long-term):",
    bearish_divergence,
    _format_divergence_row,
    "  No markets showing strong bearish divergence",
)

# Identify accumulation and distribution zones. Sorted by score so that the
# printed rows are the strongest ones rather than whichever come first in the report.
accumulation_markets = report_df[
    report_df['accumulation_score'] > report_df['accumulation_score'].quantile(ZONE_QUANTILE)
].sort_values('accumulation_score', ascending=False)
distribution_markets = report_df[
    report_df['distribution_score'] > report_df['distribution_score'].quantile(ZONE_QUANTILE)
].sort_values('distribution_score', ascending=False)

_print_section(
    "\n💰 ACCUMULATION ZONES (Smart money buying):",
    accumulation_markets,
    _zone_row_formatter('accumulation_score'),
    "  No clear accumulation patterns detected",
)
_print_section(
    "\n🔴 DISTRIBUTION ZONES (Smart money selling):",
    distribution_markets,
    _zone_row_formatter('distribution_score'),
    "  No clear distribution patterns detected",
)

In [None]:
## Market Filtering Strategy
# Filter markets based on quantile thresholds to identify the best trading opportunities.
# Markets must exceed the volume and volatility quantiles; the survivors are ranked by
# how close the current price is to the top of its range.
#
# VOLUME_QUANTILE, NATR_QUANTILE and TOP_X_MARKETS are defined in the configuration cell.
# They are intentionally not re-assigned here, so tuning them there takes effect.

# Apply filters to identify high-opportunity markets (good liquidity and high volatility)
volume_threshold = report_df['average_volume_per_hour'].quantile(VOLUME_QUANTILE)
natr_threshold = report_df['mean_natr'].quantile(NATR_QUANTILE)
top_markets = report_df[
    (report_df['average_volume_per_hour'] > volume_threshold) &
    (report_df['mean_natr'] > natr_threshold)
]

# Sort by current position (markets closest to their highs) and take top X
top_markets = top_markets.sort_values(by='current_position', ascending=False).head(TOP_X_MARKETS)

# Create a dictionary of candles for selected markets for detailed analysis
selected_pairs = set(top_markets["trading_pair"])
top_markets_candles = {
    candle.trading_pair: candle
    for candle in candles
    if candle.trading_pair in selected_pairs
}

# Plain "\n" (not "\\n") so that a blank line is printed rather than a literal backslash-n
print("\n🎯 FILTERED MARKETS")
print("=" * 50)
print("Applied filters:")
print(f"  - Volume > {VOLUME_QUANTILE*100}th percentile")
print(f"  - Volatility (NATR) > {NATR_QUANTILE*100}th percentile")
print(f"\nMarkets meeting criteria: {len(top_markets_candles)}")
print(f"Selected markets: {list(top_markets_candles.keys())}")

In [None]:
# Enhanced Correlation Analysis: Volume, Volatility, and Momentum Metrics
# Builds a correlation matrix over the per-market report metrics, renders it as a
# heatmap, prints an interpretation of selected pairs, and finally regresses
# buy pressure divergence on volume surge.

CORRELATION_THRESHOLD = 0.3   # |correlation| above this is worth an interpretation line
MIN_REGRESSION_SAMPLES = 10   # The regression runs only with more markets than this

# Select key metrics for correlation analysis
correlation_data = report_df[[
    'average_volume_per_hour', 'mean_volatility', 'mean_natr',
    'buy_pressure_divergence', 'volume_surge', 'volume_buy_signal',
    'accumulation_score', 'distribution_score', 'current_position'
]].copy()

# Log transform volume for better correlation analysis
correlation_data['log_volume'] = np.log10(correlation_data['average_volume_per_hour'] + 1)

# Calculate correlation matrix (every pairwise value used below is read from it)
corr_matrix = correlation_data.corr()

# Create enhanced correlation heatmap with Plotly
fig_corr = go.Figure(data=go.Heatmap(
    z=corr_matrix.values,
    x=corr_matrix.columns,
    y=corr_matrix.columns,
    colorscale='RdBu',
    zmid=0,
    text=np.round(corr_matrix.values, 2),
    texttemplate='%{text}',
    textfont={"size": 10},
    colorbar=dict(title="Correlation")
))

fig_corr.update_layout(
    title="Enhanced Correlation Matrix: Volume, Volatility & Momentum Metrics",
    height=700,
    width=900,
    xaxis_title="",
    yaxis_title="",
    xaxis={'tickangle': 45}
)

fig_corr.show()

# Key correlation insights
print("\n🔍 CORRELATION INSIGHTS")
print("=" * 60)

# Volume-Momentum correlations
vol_divergence_corr = corr_matrix.loc['log_volume', 'buy_pressure_divergence']
vol_surge_corr = corr_matrix.loc['log_volume', 'volume_surge']

print("\n📊 Volume-Momentum Relationships:")
print(f"  - Volume vs Buy Pressure Divergence: {vol_divergence_corr:.3f}")
if abs(vol_divergence_corr) > CORRELATION_THRESHOLD:
    print(f"    → {'High volume markets show stronger momentum shifts' if vol_divergence_corr > 0 else 'High volume markets show momentum stability'}")

print(f"  - Volume vs Volume Surge: {vol_surge_corr:.3f}")
# Interpret only a meaningful correlation, like the other relationships in this cell
if abs(vol_surge_corr) > CORRELATION_THRESHOLD:
    print(f"    → {'Large markets tend to have volume spikes' if vol_surge_corr > 0 else 'Smaller markets show more volume volatility'}")

# Accumulation vs Distribution correlation
acc_dist_corr = corr_matrix.loc['accumulation_score', 'distribution_score']
print("\n💰 Market Regime Analysis:")
print(f"  - Accumulation vs Distribution: {acc_dist_corr:.3f}")
if acc_dist_corr < -0.5:
    print("    → Strong inverse relationship: Markets are clearly in either accumulation OR distribution")
else:
    print("    → Mixed regimes: Some markets show both accumulation and distribution characteristics")

# Momentum-Volatility relationship
momentum_vol_corr = corr_matrix.loc['buy_pressure_divergence', 'mean_natr']
print("\n🎢 Momentum-Volatility Relationship:")
print(f"  - Buy Pressure Divergence vs NATR: {momentum_vol_corr:.3f}")
if abs(momentum_vol_corr) > CORRELATION_THRESHOLD:
    print(f"    → {'Volatile markets show stronger momentum shifts' if momentum_vol_corr > 0 else 'Stable markets show clearer momentum patterns'}")

# Test if volume surge predicts buy pressure divergence
if len(correlation_data) > MIN_REGRESSION_SAMPLES:
    # Missing values are replaced by the neutral value of each metric
    # (surge 1 = average volume, divergence 0 = no divergence)
    surge = correlation_data['volume_surge'].fillna(1)
    divergence = correlation_data['buy_pressure_divergence'].fillna(0)

    print("\n📈 PREDICTIVE ANALYSIS (Volume Surge → Buy Pressure Divergence)")
    # linregress raises ValueError when every x value is identical, so check first
    if surge.nunique() > 1:
        regression = stats.linregress(surge, divergence)
        print(f"  - R-squared: {regression.rvalue**2:.3f}")
        print(f"  - P-value: {regression.pvalue:.4f}")
        if regression.pvalue < 0.05:
            print(f"  → Volume surges {'predict bullish momentum' if regression.slope > 0 else 'predict bearish momentum'} (statistically significant!)")
        else:
            print("  → No significant predictive relationship found")
    else:
        print("  → Skipped: volume surge is identical across all markets")