In [2]:
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# IPython display imports
from IPython.display import display, HTML, Markdown

In [21]:
def _scan_drawdown_events(dates, values, min_dd_pct, min_recovery_pct):
    """
    Walk a 1-D price array and collect every completed drawdown/recovery cycle.

    Args:
        dates: Sequence of timestamps aligned with `values` (anything pd.Timestamp accepts)
        values: 1-D float array of prices, free of NaNs
        min_dd_pct: Minimum drawdown threshold in %
        min_recovery_pct: Percent above the prior peak required to count as recovered

    Returns:
        List of event dicts (peak, trough, recovery dates/prices and durations).
        A drawdown that has not reached the recovery threshold by the end of
        the data is not reported.
    """
    # Drawdown of every point relative to the all-time high seen so far
    running_max = np.maximum.accumulate(values)
    drawdown_pct = (values / running_max - 1.0) * 100

    # Loop-invariant multiplier applied to the peak price to get the recovery level
    recovery_factor = 1 + min_recovery_pct / 100

    events = []
    in_drawdown = False
    peak_idx = 0
    search_start = 0  # ensures next peak is after prior recovery threshold

    for i in range(1, len(values)):
        # Entering a new drawdown
        if not in_drawdown and drawdown_pct[i] < -min_dd_pct:
            in_drawdown = True
            # Find the peak since the last recovery point, not the all-time peak
            peak_idx = search_start + int(np.argmax(values[search_start:i + 1]))

        if not in_drawdown:
            continue

        # Still under water: keep scanning until the recovery level is reached
        if not values[i] >= values[peak_idx] * recovery_factor:
            continue

        in_drawdown = False
        peak_price = values[peak_idx]

        # Trough is the lowest close between the peak and the recovery point
        trough_idx = peak_idx + int(np.argmin(values[peak_idx:i + 1]))
        trough_price = values[trough_idx]
        dd_magnitude = (trough_price / peak_price - 1.0) * 100

        if dd_magnitude <= -min_dd_pct:
            # Convert numpy datetime64 / Timestamp to plain Python dates
            peak_dt = pd.Timestamp(dates[peak_idx]).to_pydatetime().date()
            trough_dt = pd.Timestamp(dates[trough_idx]).to_pydatetime().date()
            recovery_dt = pd.Timestamp(dates[i]).to_pydatetime().date()

            events.append({
                'event_num': len(events) + 1,
                'peak_date': peak_dt,
                'peak_price': round(float(peak_price), 2),
                'trough_date': trough_dt,
                'trough_price': round(float(trough_price), 2),
                'drawdown_pct': round(float(dd_magnitude), 2),
                'recovery_date': recovery_dt,
                'recovery_price': round(float(values[i]), 2),
                # Calendar days, not trading days
                'recovery_days': (recovery_dt - trough_dt).days,
                'duration_to_trough_days': (trough_dt - peak_dt).days,
                'recovery_threshold_pct': 100 + min_recovery_pct,
            })

        # Set the next peak search window start at the recovery point
        search_start = i

    return events


def find_drawdowns_and_recoveries(ticker="VTI", start="2000-01-01", end=None, min_dd_pct=5.0, min_recovery_pct=0.0):
    """
    Find all major drawdowns and recoveries for a given ticker.

    Args:
        ticker: Stock/ETF ticker (default: VTI)
        start: Start date (default: 2000-01-01)
        end: End date (default: today)
        min_dd_pct: Minimum drawdown threshold in % (default: 5%)
        min_recovery_pct: Percent above the prior peak required to count as recovered (default: 0%)

    Returns:
        Dictionary with the request parameters, the covered data period and
        the list of completed drawdown events, or None when no usable price
        data is available. Drawdowns still unrecovered at the end of the data
        are not included.
    """

    # Fetch price data
    print(f"Fetching {ticker} data from {start} to {end}...")
    df = yf.download(ticker, start=start, end=end, progress=False)

    # Guard before indexing: an empty/failed download may not carry a 'Close' column
    if df is None or df.empty or 'Close' not in df:
        print(f"No data found for {ticker}")
        return None

    prices = df['Close']
    # With MultiIndex columns (price field, ticker) the selection is a one-column
    # DataFrame; reduce it to a Series so all later indexing yields scalars.
    if isinstance(prices, pd.DataFrame):
        prices = prices.iloc[:, 0]
    # NaN closes would propagate through the running maximum and hide drawdowns
    prices = prices.dropna()

    if prices.empty:
        print(f"No data found for {ticker}")
        return None

    # Convert to numpy arrays for easier iteration
    dates = prices.index.to_numpy()
    values = prices.to_numpy(dtype=float)

    return {
        'ticker': ticker,
        'fetch_start': start,
        'fetch_end': end if end else 'today',
        'data_period': f"{prices.index[0].date()} to {prices.index[-1].date()}",
        # Thresholds are recorded so consumers can report what was actually used
        'min_dd_pct': min_dd_pct,
        'min_recovery_pct': min_recovery_pct,
        'drawdown_events': _scan_drawdown_events(dates, values, min_dd_pct, min_recovery_pct),
    }


def print_drawdown_summary(results):
    """
    Display the drawdown and recovery timeline with IPython formatting.

    Args:
        results: Dictionary produced by find_drawdowns_and_recoveries
            (needs 'ticker', 'data_period' and 'drawdown_events'; an optional
            'min_dd_pct' is used to word the "nothing found" message).
            A falsy value (e.g. None) makes the function a no-op.

    Returns:
        None. Output is rendered through IPython's display().
    """
    if not results:
        return

    df_events = pd.DataFrame(results.get('drawdown_events', []))

    if df_events.empty:
        # Report the threshold actually used when it is known; otherwise make
        # no numeric claim rather than assuming the 5% default.
        threshold = results.get('min_dd_pct')
        qualifier = f" (>{threshold:g}%)" if threshold is not None else ""
        display(Markdown(f"### No major drawdowns{qualifier} found in this period."))
        return

    # Title
    display(Markdown(f"## Drawdown & Recovery Analysis: {results['ticker']}"))
    display(Markdown(f"**Period:** {results['data_period']}"))

    # Format the main table
    display_df = df_events[[
        'event_num', 'peak_date', 'peak_price', 'trough_date', 'trough_price',
        'drawdown_pct', 'duration_to_trough_days', 'recovery_date', 'recovery_days'
    ]].copy()

    display_df.columns = [
        'Event', 'Peak Date', 'Peak Price', 'Trough Date', 'Trough Price',
        'DD %', 'Days to Trough', 'Recovery Date', 'Recovery Days'
    ]

    # Style the dataframe
    styled = (
        display_df.style
        .format({
            'Peak Price': '${:,.2f}',
            'Trough Price': '${:,.2f}',
            'DD %': '{:.2f}%',
            'Days to Trough': '{:.0f}',
            'Recovery Days': '{:.0f}'
        })
        # Fixed -50..0 scale keeps drawdown colours comparable across runs
        .background_gradient(subset=['DD %'], cmap='RdYlGn_r', vmin=-50, vmax=0)
        .background_gradient(subset=['Recovery Days'], cmap='YlOrRd')
        .set_properties(**{'text-align': 'center'})
        .set_table_styles([
            {'selector': 'th', 'props': [('background-color', '#4472C4'), ('color', 'white'), ('font-weight', 'bold')]},
            {'selector': 'td', 'props': [('padding', '10px')]}
        ])
    )

    display(styled)

    # Summary statistics
    display(Markdown("### Summary Statistics"))
    stats_data = {
        'Metric': [
            'Total Major Drawdowns',
            'Average Drawdown',
            'Maximum Drawdown',
            'Average Days to Trough',
            'Average Recovery Days'
        ],
        'Value': [
            f"{len(df_events)}",
            f"{df_events['drawdown_pct'].mean():.2f}%",
            # Drawdowns are negative, so the deepest one is the minimum
            f"{df_events['drawdown_pct'].min():.2f}%",
            f"{df_events['duration_to_trough_days'].mean():.0f} days",
            f"{df_events['recovery_days'].mean():.0f} days"
        ]
    }
    stats_df = pd.DataFrame(stats_data)

    stats_styled = (
        stats_df.style
        .set_properties(**{'text-align': 'left', 'padding': '10px'})
        .set_table_styles([
            {'selector': 'th', 'props': [('background-color', '#70AD47'), ('color', 'white'), ('font-weight', 'bold')]},
            {'selector': 'td', 'props': [('padding', '10px')]}
        ])
    )

    display(stats_styled)



def get_key_dates(results, output_path="timeline_periods.json"):
    """
    Turn drawdown events into one chronological timeline of periods and save it as JSON.

    Each drawdown becomes a window from one month before its peak to one month
    after its recovery; the stretches of the data period not covered by any
    drawdown window become bull periods. The final bull period is open-ended
    (its end is stored as None).

    Args:
        results: Dictionary with 'data_period' ("<start> to <end>") and
            'drawdown_events' (dicts with date-valued 'peak_date',
            'trough_date', 'recovery_date'). A falsy value yields an empty
            timeline.
        output_path: Where the JSON file is written.

    Returns:
        {"periods": OrderedDict} mapping "<start> to <end>" labels to
        [start, end] (bull) or [start, end, trough] (drawdown) ISO date
        lists, inserted in chronological order of their start date.
    """
    import json
    from collections import OrderedDict

    def _save_and_return(payload):
        # Persist first, then announce, then hand the same payload back
        with open(output_path, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=2)
        display(Markdown(f"Saved periods JSON to {output_path}"))
        return payload

    if not results:
        return _save_and_return({"periods": OrderedDict()})

    events = results.get('drawdown_events', [])

    # Overall data boundaries, parsed from the "<start> to <end>" string
    first_str, last_str = (part.strip() for part in results['data_period'].split('to'))
    data_start = pd.Timestamp(first_str).date()
    data_end = pd.Timestamp(last_str).date()

    one_month = pd.DateOffset(months=1)
    timeline = []  # (start_date, label, value_list)
    windows = []   # padded drawdown windows, used to locate the bull gaps

    # Drawdown periods: padded window as value, unpadded peak/recovery as label
    for event in events:
        window_start = (pd.Timestamp(event['peak_date']) - one_month).date()
        window_end = (pd.Timestamp(event['recovery_date']) + one_month).date()
        label = f"{event['peak_date'].isoformat()} to {event['recovery_date'].isoformat()}"
        timeline.append((
            window_start,
            label,
            [window_start.isoformat(), window_end.isoformat(), event['trough_date'].isoformat()],
        ))
        windows.append((window_start, window_end))

    # Collapse overlapping/touching windows so gaps between them are unambiguous
    merged = []
    for span_start, span_end in sorted(windows, key=lambda span: span[0]):
        if merged and span_start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], span_end)
        else:
            merged.append([span_start, span_end])

    # Bull periods fill whatever the merged windows leave uncovered
    cursor = data_start
    for span_start, span_end in merged:
        if cursor < span_start:
            timeline.append((
                cursor,
                f"{cursor.isoformat()} to {span_start.isoformat()}",
                [cursor.isoformat(), span_start.isoformat()],
            ))
        cursor = max(cursor, span_end)

    # Trailing bull: labelled with the data end, but stored open-ended
    if cursor < data_end:
        timeline.append((
            cursor,
            f"{cursor.isoformat()} to {data_end.isoformat()}",
            [cursor.isoformat(), None],
        ))

    # Stable sort by start date; insertion order then carries the chronology into JSON
    chronological = sorted(timeline, key=lambda item: item[0])
    ordered_periods = OrderedDict((label, value) for _, label, value in chronological)

    return _save_and_return({"periods": ordered_periods})

In [25]:
# Run the analysis
results = find_drawdowns_and_recoveries(
    ticker="VTI",
    start="2000-01-01",
    end=None,  # No explicit end date (per the function docstring: defaults to today)
    min_dd_pct=8,  # Only report drawdowns of at least 8% below the peak
    min_recovery_pct=4  # Counts as recovered only once price is >= 4% ABOVE the prior peak
)

# Display results with nice formatting
print_drawdown_summary(results)


Fetching VTI data from 2000-01-01 to None...


## Drawdown & Recovery Analysis: VTI

**Period:** 2001-06-15 to 2025-12-26

Unnamed: 0,Event,Peak Date,Peak Price,Trough Date,Trough Price,DD %,Days to Trough,Recovery Date,Recovery Days
0,1,2001-07-02,$36.76,2002-10-09,$24.02,-34.66%,464,2004-11-04,757
1,2,2006-05-08,$46.32,2006-06-13,$42.40,-8.46%,36,2006-10-26,135
2,3,2007-07-13,$55.07,2009-03-09,$24.84,-54.89%,605,2012-03-26,1113
3,4,2012-04-02,$57.37,2012-06-04,$51.60,-10.06%,63,2012-09-14,102
4,5,2015-06-23,$92.52,2016-02-11,$78.51,-15.13%,233,2016-08-15,186
5,6,2018-01-26,$129.46,2018-12-24,$107.53,-16.94%,332,2019-04-23,120
6,7,2020-02-19,$157.64,2020-03-23,$102.47,-35.00%,33,2020-08-28,158
7,8,2020-09-02,$167.53,2020-09-23,$152.19,-9.16%,21,2020-11-27,65
8,9,2022-01-03,$229.42,2022-10-12,$171.23,-25.36%,282,2024-02-02,478
9,10,2024-07-16,$274.00,2024-08-05,$250.52,-8.57%,20,2024-11-06,93


### Summary Statistics

Unnamed: 0,Metric,Value
0,Total Major Drawdowns,11
1,Average Drawdown,-21.59%
2,Maximum Drawdown,-54.89%
3,Average Days to Trough,194 days
4,Average Recovery Days,301 days


In [23]:
# Extract periods and save to JSON
# get_key_dates writes to its default output_path ("timeline_periods.json")
# and returns a dict of the form {"periods": {...}}.
periods = get_key_dates(results)

Saved periods JSON to timeline_periods.json

In [None]:
periods = {
    "DotCom Bubble Drawdown": [
      "2001-06-02",
      "2004-12-04",
      "2002-10-09"
    ],
    "2005 Bull": [
      "2004-12-04",
      "2006-04-08"
    ],
    "2006 Inflation Consolidation": [
      "2006-04-08",
      "2006-11-26",
      "2006-06-13"
    ],
    "2007 Bull": [
      "2006-11-26",
      "2007-06-13"
    ],
    "GFC Drawdown": [
      "2007-06-13",
      "2012-04-26",
      "2009-03-09"
    ],
    "Eurozone Consolidation": [
      "2012-03-02",
      "2012-10-14",
      "2012-06-04"
    ],
    "2012–2015 Bull": [
      "2012-10-14",
      "2015-05-23"
    ],
    "2015-2016 Selloff Drawdown": [
      "2015-05-23",
      "2016-09-15",
      "2016-02-11"
    ],
    "2016-2017 Bull": [
      "2016-09-15",
      "2017-12-26"
    ],
    "Volmageddon Drawdown": [
      "2017-12-26",
      "2019-05-23",
      "2018-12-24"
    ],
    "2019-2020 Bull": [
      "2019-05-23",
      "2020-01-19"
    ],
    "Covid Drawdown": [
      "2020-01-19",
      "2020-09-28",
      "2020-03-23"
    ],
    "leadership Rotation Consolidation": [
      "2020-08-02",
      "2020-12-27",
      "2020-09-23"
    ],
    "2021 Bull": [
      "2020-12-27",
      "2021-12-03"
    ],
    "2022 Rate Hike Drawdown": [
      "2021-12-03",
      "2024-03-02",
      "2022-10-12"
    ],
    "2024 Bull": [
      "2024-03-02",
      "2024-06-16"
    ],
    "2024 Consolidation": [
      "2024-06-16",
      "2024-12-06",
      "2024-08-05"
    ],
    "2025 Transition Bull": [
      "2024-12-06",
      "2025-01-19"
    ],
    "Trump Tariffs Drawdown": [
      "2025-01-19",
      "2025-08-25",
      "2025-04-08"
    ],
    "AI Bull": [
      "2025-08-25",
      None
    ]
  }