# ADX and RSI Trendline Analysis

This notebook provides simplified functions to draw trendlines based on ADX and RSI indicators,
showing convergence and divergence between high-to-high and low-to-low points.

In [20]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta

In [21]:
# Load dataset created by prepare_dataset.ipynb
df = pd.read_parquet('../data/dataset.parquet')
print(f"Loaded dataset with {len(df)} rows")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")

Loaded dataset with 5001 rows
Date range: 2005-03-11 00:00:00 to 2025-01-24 00:00:00


In [22]:
def find_local_extrema(series, window=5, mode='high'):
    """Find local maxima or minima in a time series.
    
    Args:
        series: Time series data
        window: Window size to look before and after a point
        mode: 'high' for maxima, 'low' for minima
        
    Returns:
        List of indices where extrema are found
    """
    indices = []
    values = []
    
    # Skip edge cases where we don't have enough points for the window
    for i in range(window, len(series)-window):
        window_slice = series[i-window:i+window+1]
        if mode == 'high' and series[i] == max(window_slice):
            indices.append(i)
            values.append(series[i])
        elif mode == 'low' and series[i] == min(window_slice):
            indices.append(i)
            values.append(series[i])
    
    return indices, values

In [23]:
def pair_extrema_by_time(price_indices, indicator_indices, max_time_gap=10):
    """Pair price extrema with nearest indicator extrema within a time window.
    
    Args:
        price_indices: Indices of price extrema
        indicator_indices: Indices of indicator extrema
        max_time_gap: Maximum allowed index difference between pairs
        
    Returns:
        List of paired indices (price_idx, indicator_idx)
    """
    pairs = []
    
    for p_idx in price_indices:
        # Find the closest indicator extrema within the allowed time gap
        candidates = [i_idx for i_idx in indicator_indices 
                     if abs(i_idx - p_idx) <= max_time_gap]
        
        if candidates:
            # Select the closest indicator extrema
            closest_idx = min(candidates, key=lambda x: abs(x - p_idx))
            pairs.append((p_idx, closest_idx))
    
    return pairs

In [24]:
def analyze_convergence_divergence(df, extrema_pairs, price_col, indicator_col, mode='high'):
    """Analyze if consecutive extrema pairs show convergence or divergence.
    
    Args:
        df: DataFrame with price and indicator data
        extrema_pairs: List of paired indices (price_idx, indicator_idx)
        price_col: Column name for price data
        indicator_col: Column name for indicator data
        mode: 'high' for peaks, 'low' for troughs
        
    Returns:
        List of dictionaries with trendline information and convergence/divergence type
    """
    results = []
    
    # Need at least two pairs to analyze trendlines
    if len(extrema_pairs) < 2:
        return results
    
    for i in range(len(extrema_pairs)-1):
        # Get current and next pair
        curr_price_idx, curr_ind_idx = extrema_pairs[i]
        next_price_idx, next_ind_idx = extrema_pairs[i+1]
        
        # Calculate slopes
        price_slope = (df[price_col].iloc[next_price_idx] - df[price_col].iloc[curr_price_idx]) / (next_price_idx - curr_price_idx)
        ind_slope = (df[indicator_col].iloc[next_ind_idx] - df[indicator_col].iloc[curr_ind_idx]) / (next_ind_idx - curr_ind_idx)
        
        # Determine pattern type
        pattern_type = 'neutral'
        if mode == 'high':
            # For highs: price up + indicator down = bearish divergence
            if price_slope > 0 and ind_slope < 0:
                pattern_type = 'bearish_divergence'
            # For highs: price down + indicator up = bullish divergence
            elif price_slope < 0 and ind_slope > 0:
                pattern_type = 'bullish_divergence'
            elif price_slope * ind_slope > 0:
                pattern_type = 'convergence'
        else:  # mode == 'low'
            # For lows: price down + indicator up = bullish divergence
            if price_slope < 0 and ind_slope > 0:
                pattern_type = 'bullish_divergence'
            # For lows: price up + indicator down = bearish divergence
            elif price_slope > 0 and ind_slope < 0:
                pattern_type = 'bearish_divergence'
            elif price_slope * ind_slope > 0:
                pattern_type = 'convergence'
        
        results.append({
            'price_start': (curr_price_idx, df[price_col].iloc[curr_price_idx]),
            'price_end': (next_price_idx, df[price_col].iloc[next_price_idx]),
            'indicator_start': (curr_ind_idx, df[indicator_col].iloc[curr_ind_idx]),
            'indicator_end': (next_ind_idx, df[indicator_col].iloc[next_ind_idx]),
            'price_slope': price_slope,
            'indicator_slope': ind_slope,
            'pattern_type': pattern_type,
            'start_date': df['date'].iloc[min(curr_price_idx, curr_ind_idx)],
            'end_date': df['date'].iloc[max(next_price_idx, next_ind_idx)],
        })
    
    return results

In [39]:
def plot_trendlines(df, analysis_results, price_col, indicator_col, title=None):
    """Plot price and indicator trendlines on a single chart with color-coded signals.
    
    Args:
        df: DataFrame with price and indicator data
        analysis_results: Results from analyze_convergence_divergence
        price_col: Column name for price data
        indicator_col: Column name for indicator data
        title: Optional title for the plot
    """
    fig = go.Figure()
    
    # Plot price data
    fig.add_trace(
        go.Scatter(x=df['date'], y=df[price_col], name=price_col, line=dict(color='blue', width=1))
    )
    
    # Determine indicator color based on name (RSI = blue, ADX = black)
    indicator_color = 'blue' if 'rsi' in indicator_col.lower() else 'black'
    
    # Plot indicator data (scaled to match price range for better visualization)
    price_min, price_max = df[price_col].min(), df[price_col].max()
    ind_min, ind_max = df[indicator_col].min(), df[indicator_col].max()
    
    # Scale indicator values to price range
    scaled_indicator = ((df[indicator_col] - ind_min) / (ind_max - ind_min)) * (price_max - price_min) * 0.8 + price_min
    
    fig.add_trace(
        go.Scatter(x=df['date'], y=scaled_indicator, 
                  name=f"{indicator_col} (scaled)", 
                  line=dict(color=indicator_color, width=1, dash='dot'))
    )
    
    # Plot trendlines with different colors based on price movement
    for result in analysis_results:
        # Determine price trendline color based on slope
        if result['price_slope'] > 0.001:  # Increasing price
            price_color = 'green'
        elif result['price_slope'] < -0.001:  # Decreasing price
            price_color = 'red'
        else:  # Flat price
            price_color = 'gray'
        
        # Price trendline
        fig.add_trace(
            go.Scatter(
                x=[df['date'].iloc[result['price_start'][0]], df['date'].iloc[result['price_end'][0]]],
                y=[result['price_start'][1], result['price_end'][1]],
                mode='lines+markers',
                line=dict(color=price_color, width=2),
                marker=dict(color=price_color, size=8),
                name=f'Price {result["pattern_type"]}',
                showlegend=(result == analysis_results[0])  # Show only one legend item per type
            )
        )
        
        # Scale indicator trendline points to match price chart
        indicator_start_value = ((result['indicator_start'][1] - ind_min) / (ind_max - ind_min)) * (price_max - price_min) * 0.8 + price_min
        indicator_end_value = ((result['indicator_end'][1] - ind_min) / (ind_max - ind_min)) * (price_max - price_min) * 0.8 + price_min
        
        # Indicator trendline
        fig.add_trace(
            go.Scatter(
                x=[df['date'].iloc[result['indicator_start'][0]], df['date'].iloc[result['indicator_end'][0]]],
                y=[indicator_start_value, indicator_end_value],
                mode='lines+markers',
                line=dict(color=indicator_color, width=2, dash='dash'),
                marker=dict(color=indicator_color, size=8),
                name=f'{indicator_col} {result["pattern_type"]}',
                showlegend=(result == analysis_results[0])  # Show only one legend item per type
            )
        )
    
    # Add annotations for divergences
    for result in analysis_results:
        if 'divergence' in result['pattern_type']:
            mid_x = df['date'].iloc[int((result['price_end'][0] + result['price_start'][0])/2)]
            mid_y = (result['price_end'][1] + result['price_start'][1])/2
            
            annotation_color = 'green' if 'bullish' in result['pattern_type'] else 'red'
            
            fig.add_annotation(
                x=mid_x,
                y=mid_y,
                text=result['pattern_type'].replace('_', ' ').title(),
                showarrow=True,
                arrowhead=1,
                arrowsize=1,
                arrowwidth=2,
                arrowcolor=annotation_color,
                font=dict(color=annotation_color),
                bgcolor="rgba(255,255,255,0.8)"
            )
    
    # Update layout
    fig.update_layout(
        title=title or f'Trendline Analysis: {price_col} vs {indicator_col}',
        height=800,
        width=1200,
        legend=dict(orientation='h', y=1.02, xanchor='right', x=1),
        xaxis_title="Date",
        yaxis_title="Price",
        hovermode="x unified"
    )
    
    # Add secondary y-axis for the indicator's original values
    fig.update_layout(
        yaxis2=dict(
            title=f"{indicator_col} Value",
            overlaying="y",
            side="right",
            range=[ind_min, ind_max],
            showgrid=False
        )
    )
    
    # Add a note about the scaling
    fig.add_annotation(
        xref="paper", yref="paper",
        x=0.01, y=0.99,
        text=f"Note: {indicator_col} is scaled to match price range for visualization",
        showarrow=False,
        font=dict(size=10),
        bgcolor="rgba(255,255,255,0.8)"
    )
    
    fig.show()

In [None]:
def analyze_indicator_trendlines(df, price_col, indicator_col, extrema_window=10, 
                             time_gap=60, mode='high', min_patterns=3, plot=True):
    """Complete analysis pipeline for finding and plotting trendline convergence/divergence.
    
    Args:
        df: DataFrame with price and indicator data
        price_col: Column name for price data
        indicator_col: Column name for indicator data
        extrema_window: Window size for finding local extrema
        time_gap: Maximum time gap between price and indicator extrema to be paired
        mode: 'high' for peaks, 'low' for troughs
        min_patterns: Minimum number of patterns to find before returning results
        plot: Whether to plot the results
        
    Returns:
        Analysis results from analyze_convergence_divergence
    """
    # Find local extrema
    price_indices, _ = find_local_extrema(df[price_col].values, window=extrema_window, mode=mode)
    indicator_indices, _ = find_local_extrema(df[indicator_col].values, window=extrema_window, mode=mode)
    
    # Pair extrema by time
    extrema_pairs = pair_extrema_by_time(price_indices, indicator_indices, max_time_gap=time_gap)
    
    # Analyze convergence/divergence
    results = analyze_convergence_divergence(df, extrema_pairs, price_col, indicator_col, mode=mode)
    
    # If we don't have enough patterns, try with a smaller window
    if len(results) < min_patterns and extrema_window > 3:
        return analyze_indicator_trendlines(
            df, price_col, indicator_col, 
            extrema_window=extrema_window-2, 
            time_gap=time_gap,
            mode=mode,
            min_patterns=min_patterns,
            plot=plot
        )
    
    # Plot if requested
    if plot and results:
        mode_str = "Highs" if mode == "high" else "Lows"
        plot_trendlines(df, results, price_col, indicator_col, 
                      title=f'{mode_str} Trendline Analysis: {price_col} vs {indicator_col}')
    
    return results

## Run Trendline Analysis for ADX and RSI

Let's analyze both high-to-high and low-to-low trendlines for ADX and RSI.

In [45]:
# Filter data to a reasonable time period for clearer visualization
# Last year of data from the dataset using iloc to avoid index issues
recent_df = df.iloc[-1095:].copy()  # Last 3 years of data

# Analyze ADX trendlines
adx_high_results = analyze_indicator_trendlines(
    recent_df, 'high', 'adx', extrema_window=7, time_gap=90, mode='high'
)

adx_low_results = analyze_indicator_trendlines(
    recent_df, 'low', 'adx', extrema_window=7, time_gap=90, mode='low'
)


invalid value encountered in scalar divide



In [46]:
# Analyze RSI trendlines
rsi_high_results = analyze_indicator_trendlines(
    recent_df, 'high', 'rsi_14', extrema_window=7, time_gap=90, mode='high'
)

rsi_low_results = analyze_indicator_trendlines(
    recent_df, 'low', 'rsi_14', extrema_window=7, time_gap=90, mode='low'
)


invalid value encountered in scalar divide



## Summary of Detected Patterns

Let's summarize the convergence/divergence patterns we found.

In [47]:
def summarize_patterns(results, pattern_name):
    """Create a summary of detected patterns."""
    if not results:
        print(f"No {pattern_name} patterns detected.")
        return
    
    pattern_counts = {}
    for result in results:
        pattern_type = result['pattern_type']
        pattern_counts[pattern_type] = pattern_counts.get(pattern_type, 0) + 1
    
    print(f"\n{pattern_name} - {len(results)} total patterns:")
    for pattern_type, count in pattern_counts.items():
        print(f"  {pattern_type}: {count} patterns")
        
    # Show the most recent patterns of each type
    print("\nMost recent patterns:")
    for pattern_type in pattern_counts.keys():
        type_results = [r for r in results if r['pattern_type'] == pattern_type]
        if type_results:
            latest = max(type_results, key=lambda x: x['end_date'])
            print(f"  {pattern_type}: {latest['start_date'].date()} to {latest['end_date'].date()}")

# Print summaries
summarize_patterns(adx_high_results, "ADX High-to-High")
summarize_patterns(adx_low_results, "ADX Low-to-Low")
summarize_patterns(rsi_high_results, "RSI High-to-High")
summarize_patterns(rsi_low_results, "RSI Low-to-Low")


ADX High-to-High - 44 total patterns:
  bullish_divergence: 6 patterns
  convergence: 21 patterns
  bearish_divergence: 9 patterns
  neutral: 8 patterns

Most recent patterns:
  bullish_divergence: 2024-01-05 to 2024-03-20
  convergence: 2024-10-15 to 2024-12-26
  bearish_divergence: 2024-08-02 to 2024-09-04
  neutral: 2023-12-14 to 2024-01-24

ADX Low-to-Low - 42 total patterns:
  convergence: 21 patterns
  bearish_divergence: 7 patterns
  neutral: 9 patterns
  bullish_divergence: 5 patterns

Most recent patterns:
  convergence: 2024-06-24 to 2024-08-05
  bearish_divergence: 2024-09-13 to 2024-10-11
  neutral: 2023-12-04 to 2024-01-05
  bullish_divergence: 2024-10-07 to 2024-11-04

RSI High-to-High - 44 total patterns:
  convergence: 25 patterns
  neutral: 5 patterns
  bearish_divergence: 9 patterns
  bullish_divergence: 5 patterns

Most recent patterns:
  convergence: 2024-10-15 to 2024-12-26
  neutral: 2024-07-15 to 2024-08-05
  bearish_divergence: 2024-09-20 to 2024-10-15
  bullis

In [48]:
def find_strongest_signals(results, top_n=3):
    """Find the strongest divergence signals based on slope difference."""
    if not results:
        return []
    
    # Filter for divergence patterns only
    divergences = [r for r in results if 'divergence' in r['pattern_type']]
    
    # Calculate strength based on slope difference
    for div in divergences:
        div['strength'] = abs(div['price_slope'] - div['indicator_slope'])
    
    # Sort by strength
    strongest = sorted(divergences, key=lambda x: x['strength'], reverse=True)[:top_n]
    return strongest

# Find strongest divergence signals
strongest_adx = find_strongest_signals(adx_high_results + adx_low_results)
strongest_rsi = find_strongest_signals(rsi_high_results + rsi_low_results)

print("Strongest ADX divergence signals:")
for i, signal in enumerate(strongest_adx, 1):
    print(f"{i}. {signal['pattern_type']} from {signal['start_date'].date()} to {signal['end_date'].date()}")

print("\nStrongest RSI divergence signals:")
for i, signal in enumerate(strongest_rsi, 1):
    print(f"{i}. {signal['pattern_type']} from {signal['start_date'].date()} to {signal['end_date'].date()}")

Strongest ADX divergence signals:
1. bearish_divergence from 2021-07-15 to 2021-08-20
2. bullish_divergence from 2022-04-27 to 2022-06-01
3. bearish_divergence from 2022-03-03 to 2022-03-30

Strongest RSI divergence signals:
1. bearish_divergence from 2024-09-16 to 2024-10-10
2. bullish_divergence from 2024-10-07 to 2024-11-04
3. bullish_divergence from 2022-03-30 to 2022-04-29
