# Question 3. [Index] S&P 500 Market Corrections Analysis

Calculate the median duration (in days) of significant market corrections in the S&P 500 index.

For this task, define a correction as a decline of at least 5% from the most recent all-time high, measured at the lowest close reached before the next all-time high.

Steps:

1. Download S&P 500 historical data (1950-present) using yfinance
2. Identify all-time high points (where price exceeds all previous prices)
3. For each pair of consecutive all-time highs, find the minimum price in between
4. Calculate drawdown percentages: (high - low) / high × 100
5. Filter for corrections with at least 5% drawdown
6. Calculate the duration in days for each correction period
7. Determine the 25th, 50th (median), and 75th percentiles for correction durations

Investors often ask how long market corrections typically last when deciding when to "buy the dip". See the Reddit discussion and the article linked below.

https://www.reddit.com/r/investing/comments/1jrqnte/when_are_you_buying_the_dip/?rdt=64135

https://awealthofcommonsense.com/2022/01/how-often-should-you-expect-a-stock-market-correction/

In [106]:
import pandas as pd
import yfinance as yf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [103]:
# Start a local, single-threaded Spark session
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

In [89]:
spark.sparkContext.setLogLevel("ERROR")

In [115]:
# Step 1: Download historical S&P 500 data
sp500 = yf.download("^GSPC", start="1950-01-01")[['Close']].reset_index()
# Rename to flat column names (yf.download can return multi-level column labels)
sp500.columns = ['Date', 'Close']
df = spark.createDataFrame(sp500)


[*********************100%***********************]  1 of 1 completed


In [116]:
df.show()

+-------------------+------------------+
|               Date|             Close|
+-------------------+------------------+
|1950-01-03 00:00:00| 16.65999984741211|
|1950-01-04 00:00:00|16.850000381469727|
|1950-01-05 00:00:00| 16.93000030517578|
|1950-01-06 00:00:00|16.979999542236328|
|1950-01-09 00:00:00|17.079999923706055|
|1950-01-10 00:00:00|17.030000686645508|
|1950-01-11 00:00:00| 17.09000015258789|
|1950-01-12 00:00:00|16.760000228881836|
|1950-01-13 00:00:00|16.670000076293945|
|1950-01-16 00:00:00|16.719999313354492|
|1950-01-17 00:00:00|16.860000610351562|
|1950-01-18 00:00:00|16.850000381469727|
|1950-01-19 00:00:00|  16.8700008392334|
|1950-01-20 00:00:00|16.899999618530273|
|1950-01-23 00:00:00|16.920000076293945|
|1950-01-24 00:00:00|16.860000610351562|
|1950-01-25 00:00:00|16.739999771118164|
|1950-01-26 00:00:00|16.729999542236328|
|1950-01-27 00:00:00| 16.81999969482422|
|1950-01-30 00:00:00|17.020000457763672|
+-------------------+------------------+
only showing top

In [117]:
# Step 2: Identify all-time highs
# The window covers every row before the current one and excludes the current row:
#   Window.unboundedPreceding starts at the very first row,
#   -1 ends at the row just before the current row.
window_spec = Window.partitionBy().orderBy("Date").rowsBetween(Window.unboundedPreceding, -1)
# PrevMax is the highest closing price seen before each day (NULL for the first row).
df = df.withColumn("PrevMax", F.max("Close").over(window_spec))
df.filter(F.col("Date") == '1973-01-11').show()
# If the day's close exceeds PrevMax, it is a new all-time high.
df = df.withColumn("IsAllTimeHigh", F.when(F.col("Close") > F.col("PrevMax"), 1).otherwise(0))
df.filter(F.col("Date") == '1973-01-11').show()
df = df.withColumn("Date_key", F.date_format(F.col("Date"), "yyyyMMdd"))

+-------------------+------------------+------------------+
|               Date|             Close|           PrevMax|
+-------------------+------------------+------------------+
|1973-01-11 00:00:00|120.23999786376953|119.87000274658203|
+-------------------+------------------+------------------+

+-------------------+------------------+------------------+-------------+
|               Date|             Close|           PrevMax|IsAllTimeHigh|
+-------------------+------------------+------------------+-------------+
|1973-01-11 00:00:00|120.23999786376953|119.87000274658203|            1|
+-------------------+------------------+------------------+-------------+
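
The "maximum of all earlier rows" window can be mimicked in plain Python with a running maximum shifted by one position. This is only a sketch of the idea, not how Spark evaluates the window; the closes are the first eight values printed by `df.show()` above, rounded to two decimals.

```python
from itertools import accumulate

# First eight closes from the df.show() output above, rounded to two decimals.
closes = [16.66, 16.85, 16.93, 16.98, 17.08, 17.03, 17.09, 16.76]

# Running maximum that includes the current element ...
running_max = list(accumulate(closes, max))
# ... shifted by one so each position only sees earlier values (None for the first row).
prev_max = [None] + running_max[:-1]

# A row is an all-time high when its close is strictly above the earlier maximum.
is_ath = [int(p is not None and c > p) for c, p in zip(closes, prev_max)]
print(prev_max)
print(is_ath)
```

The first row has no earlier prices, so its previous maximum is missing and it is not flagged, which is the same behavior as the `NULL` in `PrevMax` for the first Spark row.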



In [None]:
df.show()

+-------------------+------------------+------------------+-------------+--------+
|               Date|             Close|           PrevMax|IsAllTimeHigh|Date_key|
+-------------------+------------------+------------------+-------------+--------+
|1950-01-03 00:00:00| 16.65999984741211|              NULL|            0|19500103|
|1950-01-04 00:00:00|16.850000381469727| 16.65999984741211|            1|19500104|
|1950-01-05 00:00:00| 16.93000030517578|16.850000381469727|            1|19500105|
|1950-01-06 00:00:00|16.979999542236328| 16.93000030517578|            1|19500106|
|1950-01-09 00:00:00|17.079999923706055|16.979999542236328|            1|19500109|
|1950-01-10 00:00:00|17.030000686645508|17.079999923706055|            0|19500110|
|1950-01-11 00:00:00| 17.09000015258789|17.079999923706055|            1|19500111|
|1950-01-12 00:00:00|16.760000228881836| 17.09000015258789|            0|19500112|
|1950-01-13 00:00:00|16.670000076293945| 17.09000015258789|            0|19500113|
|195

25/05/25 18:53:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [122]:
# Keep only the rows where a new all-time high occurs
ath_df = df.filter(F.col("IsAllTimeHigh") == 1).select("Date", "Close").withColumnRenamed("Close", "ATH_Close")

# Step 3 (setup): pair each all-time high with the next one
ath_df = ath_df.withColumn("NextDate", F.lead("Date").over(Window.orderBy("Date")))
ath_df = ath_df.withColumn("NextClose", F.lead("ATH_Close").over(Window.orderBy("Date")))
# Despite its name, DaysSinceLastATH counts calendar days from this high to the next one
ath_df = ath_df.withColumn("DaysSinceLastATH", F.datediff(F.col("NextDate"), F.col("Date")))
ath_df = ath_df.withColumn("Date_key", F.date_format(F.col("Date"), "yyyyMMdd")).withColumn("NextDate_key", F.date_format(F.col("NextDate"), "yyyyMMdd"))
# The most recent all-time high has no successor yet, so its row is dropped
ath_pairs = ath_df.dropna()

In [123]:
ath_pairs.filter(F.col("Date") == '1973-01-11').show()

+-------------------+------------------+-------------------+------------------+----------------+--------+------------+
|               Date|         ATH_Close|           NextDate|         NextClose|DaysSinceLastATH|Date_key|NextDate_key|
+-------------------+------------------+-------------------+------------------+----------------+--------+------------+
|1973-01-11 00:00:00|120.23999786376953|1980-07-17 00:00:00|121.44000244140625|            2744|19730111|    19800717|
+-------------------+------------------+-------------------+------------------+----------------+--------+------------+
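
As a rough plain-Python analogue of `F.lead` and `F.datediff`, the pairing can be reproduced by lining a date list up against itself shifted by one. The two dates are taken from the row above; the list and variable names are illustrative only.

```python
from datetime import date

# The two consecutive all-time-high dates from the row shown above.
ath_dates = [date(1973, 1, 11), date(1980, 7, 17)]

# "lead": each entry is paired with the following one; the last entry has no successor.
next_dates = ath_dates[1:] + [None]

# Drop the unpaired last entry (like dropna) and count calendar days between the highs.
pairs = [(d, n, (n - d).days) for d, n in zip(ath_dates, next_dates) if n is not None]
print(pairs)
```

The single pair spans 2744 calendar days, matching the `DaysSinceLastATH` value in the Spark output.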



In [None]:
# Step 3: find the lowest close between each pair of consecutive all-time highs
# Alias tables
df_alias = df.alias("a")
ath_alias = ath_pairs.alias("b")

# Join using aliases
joined = ath_alias.join(
    df_alias,
    # Join condition: keep daily rows where a.Date is strictly between b.Date and b.NextDate
    (F.col("a.Date") > F.col("b.Date")) & (F.col("a.Date") < F.col("b.NextDate")),
    "left"
).groupBy(
    F.col("b.Date"),
    F.col("b.ATH_Close"),
    F.col("b.NextDate"),
    F.col("b.NextClose")
).agg(
    # F.struct packages (a.Close, a.Date) together; structs compare field by field,
    # so F.min picks the row with the lowest Close and keeps its Date alongside it
    F.min(F.struct("a.Close", "a.Date")).alias("min_struct")
).select(
    F.col("Date"),
    F.col("ATH_Close"),
    F.col("NextDate"),
    F.col("NextClose"),
    F.col("min_struct.Date").alias("LowDate"),
    F.col("min_struct.Close").alias("MinClose")
)

In [131]:
joined.filter(F.col("Date") == '1973-01-11').show()

+-------------------+------------------+-------------------+------------------+-------------------+------------------+
|               Date|         ATH_Close|           NextDate|         NextClose|            LowDate|          MinClose|
+-------------------+------------------+-------------------+------------------+-------------------+------------------+
|1973-01-11 00:00:00|120.23999786376953|1980-07-17 00:00:00|121.44000244140625|1974-10-03 00:00:00|62.279998779296875|
+-------------------+------------------+-------------------+------------------+-------------------+------------------+
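
The `F.min(F.struct(...))` trick has a close analogue in plain Python, where tuples also compare element by element. The sketch below uses hypothetical `(close, date)` rows, not real index data, to show that the minimum tuple carries the date of the lowest close with it.

```python
from datetime import date

# Hypothetical (close, date) rows between two highs; not real index data.
rows = [
    (101.0, date(2021, 3, 1)),
    (95.0, date(2021, 3, 4)),
    (95.0, date(2021, 3, 2)),
    (98.0, date(2021, 3, 5)),
]

# Tuples are ordered by their first element, then by the second on ties,
# so min() returns the lowest close together with its (earliest) date.
min_close, low_date = min(rows)
print(min_close, low_date)
```

Putting the close first is what makes this work: with the date first, the minimum would simply be the earliest row rather than the lowest one.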



In [133]:

# Steps 4 and 6: compute the drawdown (%) and the duration in calendar days from the high to the low
joined = joined.withColumn("Drawdown", ((F.col("ATH_Close") - F.col("MinClose")) / F.col("ATH_Close")) * 100)

joined = joined.withColumn("Duration", F.datediff("LowDate", "Date"))

# Step 5: keep corrections with a drawdown of at least 5%
corrections = joined.filter(F.col("Drawdown") >= 5)

# Step 7: compute percentiles (the last argument is the allowed relative error; 0 requests exact quantiles)
percentiles = corrections.approxQuantile("Duration", [0.25, 0.5, 0.75], 0.01)
print(f"Correction duration percentiles (in days):\n25th: {percentiles[0]}\n50th (Median): {percentiles[1]}\n75th: {percentiles[2]}")

Correction duration percentiles (in days):
25th: 20.0
50th (Median): 35.0
75th: 87.0
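
`approxQuantile` returns values within a relative error (0.01 here) of the requested ranks, so the figures above are approximate ranks rather than interpolated statistics. For intuition, one simple exact definition is the nearest-rank percentile sketched below. The `nearest_rank` helper and the sample durations are illustrative assumptions, and Spark's exact result may follow a slightly different rank convention.

```python
import math


def nearest_rank(values, p):
    """Smallest element such that at least a fraction p of the data is at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))  # 1-based rank into the sorted list
    return ordered[rank - 1]


# Illustrative durations in days; a small sample, not the full set of corrections.
sample = [35, 10, 57, 929, 630, 622]

q25, q50, q75 = (nearest_rank(sample, p) for p in (0.25, 0.5, 0.75))
print(q25, q50, q75)
```

Like `approxQuantile`, this always returns a value that occurs in the data, whereas an interpolating median would average the two middle values of an even-sized sample.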


In [134]:
corrections.show()

+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+--------+
|               Date|         ATH_Close|           NextDate|         NextClose|            LowDate|          MinClose|          Drawdown|Duration|
+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+--------+
|1950-06-12 00:00:00|19.399999618530273|1950-09-22 00:00:00|19.440000534057617|1950-07-17 00:00:00| 16.68000030517578|14.020615292983996|      35|
|1950-11-24 00:00:00| 20.31999969482422|1950-12-28 00:00:00|  20.3799991607666|1950-12-04 00:00:00|              19.0|6.4960615878377235|      10|
|1951-05-03 00:00:00|22.809999465942383|1951-08-02 00:00:00| 22.81999969482422|1951-06-29 00:00:00|20.959999084472656|  8.11047972285998|      57|
|1951-10-15 00:00:00|23.850000381469727|1952-01-03 00:00:00|  23.8799991607666|1951-11-23 00:00:00|22.399999618530273|

In [135]:
corrections.sort("Duration", ascending=False).limit(10).show()

+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+--------+
|               Date|         ATH_Close|           NextDate|         NextClose|            LowDate|          MinClose|          Drawdown|Duration|
+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+--------+
|2000-03-24 00:00:00|   1527.4599609375|2007-05-30 00:00:00|  1530.22998046875|2002-10-09 00:00:00|  776.760009765625| 49.14694789846553|     929|
|1973-01-11 00:00:00|120.23999786376953|1980-07-17 00:00:00|121.44000244140625|1974-10-03 00:00:00|62.279998779296875| 48.20359290935836|     630|
|1980-11-28 00:00:00|140.52000427246094|1982-11-03 00:00:00| 142.8699951171875|1982-08-12 00:00:00|102.41999816894531| 27.11358165748537|     622|
|1968-11-29 00:00:00|108.37000274658203|1972-03-06 00:00:00| 108.7699966430664|1970-05-26 00:00:00| 69.29000091552734|
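
The 1973 row in the table above can be checked by hand. The sketch below recomputes its drawdown and duration from the printed values, rounded to two decimals, so the drawdown agrees with the Spark output only to a few decimal places.

```python
from datetime import date

# Values from the 1973 row above, with closes rounded to two decimals.
ath_close, ath_date = 120.24, date(1973, 1, 11)
min_close, low_date = 62.28, date(1974, 10, 3)

# Drawdown: percentage decline from the all-time high to the lowest close.
drawdown = (ath_close - min_close) / ath_close * 100
# Duration: calendar days from the all-time high to the low.
duration = (low_date - ath_date).days

print(round(drawdown, 2), duration)
```

The result is a drawdown of about 48.2% reached 630 calendar days after the high, in line with the `Drawdown` and `Duration` columns.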

The median correction lasts 35 days, measured from the all-time high to the lowest close. Correction duration percentiles (in days):
1. 25th: 20.0
2. 50th (Median): 35.0
3. 75th: 87.0