📈 Financial Coding Round: Volatility Spike Detector and Trade Scheduler

You’re designing part of a quantitative trading system that detects volatility spikes in stock price data and schedules trades accordingly.

⸻

🔧 Functions to Implement

⸻

1. detect_volatility_spikes

Signature:

def detect_volatility_spikes(prices: list[float], window: int, threshold: float) -> list[dict]

Description:

Detect rolling-window volatility and flag “spikes” when standard deviation exceeds the given threshold.

Rules:
	•	Use a rolling window of window days to calculate standard deviation.
	•	For each day:
	•	'day': int (0-indexed)
	•	'price': float
	•	'volatility': float or None if not enough data
	•	'spike': bool — True if volatility exceeds threshold, else False

Example Input:

prices = [100, 102, 99, 101, 98, 150, 100, 102, 97]
window = 3
threshold = 5.0

Example Output (abridged):

[
  {'day': 0, 'price': 100, 'volatility': None, 'spike': False},
  {'day': 1, 'price': 102, 'volatility': None, 'spike': False},
  {'day': 2, 'price': 99, 'volatility': 1.25, 'spike': False},
  ...
  {'day': 5, 'price': 150, 'volatility': 27.73, 'spike': True},
  ...
]


⸻

2. generate_scheduled_trades

Signature:

def generate_scheduled_trades(spike_data: list[dict], cooldown: int) -> list[str]

Description:

Generate a simple trade schedule in response to spikes.

Rules:
	•	If 'spike' is True on a day, schedule a "TRADE" on that day.
	•	After a "TRADE", wait cooldown days before scheduling another.
	•	Otherwise, mark "WAIT".

Example Output:

["WAIT", "WAIT", "WAIT", "TRADE", "WAIT", "WAIT", "TRADE", "WAIT", "WAIT"]


⸻

3. evaluate_strategy_cost

Signature:

def evaluate_strategy_cost(prices: list[float], schedule: list[str], trade_fee: float) -> dict

Description:

Evaluate the total cost of executing the strategy.

Rules:
	•	Each "TRADE" day incurs a fixed trade_fee and a notional “market impact” cost of 1% of the price that day.
	•	"WAIT" days incur no cost.
	•	Return:
	•	'num_trades': int
	•	'total_fee_cost': float
	•	'total_market_impact_cost': float
	•	'total_cost': float

Example Input:
prices = [100, 102, 99, 101, 98, 150, 100, 102, 97]
schedule = ["WAIT", "WAIT", "WAIT", "TRADE", "WAIT", "WAIT", "TRADE", "WAIT", "WAIT"]
trade_fee = 5.0

Example Output:

{
  'num_trades': 2,
  'total_fee_cost': 10.0,
  'total_market_impact_cost': 2.01,
  'total_cost': 12.01
}


⸻

In [4]:
import math

def standard_deviation(values):
    if len(values) < 2:
        raise ValueError("At least two values are required to compute standard deviation.")
    
    mean = sum(values) / len(values)
    variance = sum((x - mean) ** 2 for x in values) / (len(values) - 1)  # Sample variance
    return math.sqrt(variance)

In [5]:
def detect_volatility_spikes(prices, window, threshold):
    # iterate through the prices
        # check if a valid window backwards can be created
            # if yes, calculate standard deviation

            # if no, None
        # add spike according to value of volatility
    result = []
    for i in range(len(prices)):
        if i >= window - 1:
            sd_list = []
            for j in range (i, i-window, -1): # can also just slice
                sd_list.append(prices[j])
            std_dvn = standard_deviation(sd_list)
            if std_dvn > threshold:
                spike = True
            else:
                spike = False
        else:
            std_dvn = None
            spike = False
        record = {
            'day': i, 'price': prices[i], 'volatility': std_dvn, 'spike': spike
        }
        result.append(record)
        
    return result

detect_volatility_spikes([100, 102, 99, 101, 98, 150, 100, 102, 97], 3, 5.0)

[{'day': 0, 'price': 100, 'volatility': None, 'spike': False},
 {'day': 1, 'price': 102, 'volatility': None, 'spike': False},
 {'day': 2, 'price': 99, 'volatility': 1.5275252316519468, 'spike': False},
 {'day': 3, 'price': 101, 'volatility': 1.5275252316519468, 'spike': False},
 {'day': 4, 'price': 98, 'volatility': 1.5275252316519468, 'spike': False},
 {'day': 5, 'price': 150, 'volatility': 29.19474838619667, 'spike': True},
 {'day': 6, 'price': 100, 'volatility': 29.46183972531247, 'spike': True},
 {'day': 7, 'price': 102, 'volatility': 28.307831660749525, 'spike': True},
 {'day': 8, 'price': 97, 'volatility': 2.516611478423583, 'spike': False}]

In [22]:
def detect_volatility_spikes(prices, window, threshold):
    # iterate through the prices
        # check if a valid window backwards can be created
            # if yes, calculate standard deviation

            # if no, None
        # add spike according to value of volatility
    result = []
    for i in range(len(prices)):
        if i >= window - 1:
            sd_list = prices[i - window + 1: i +1] # slicing instead
            std_dvn = standard_deviation(sd_list)
            if std_dvn > threshold:
                spike = True
            else:
                spike = False
        else:
            std_dvn = None
            spike = False
        record = {
            'day': i, 'price': prices[i], 'volatility': std_dvn, 'spike': spike
        }
        result.append(record)
        
    return result

output = detect_volatility_spikes([100, 102, 99, 101, 98, 150, 100, 102, 97], 3, 5.0)

In [23]:
def generate_scheduled_trades(spike_data, cooldown):
    # create result array with "WAIT" as default
    # iterate through spike_data
        # obtain "spike" form each dict

        # if there is a spike, change the current value to "TRADE"

        # move forward cooldown days

    result = ["WAIT"] * len(spike_data)

    i = 0

    while i < (len(spike_data)):
        spike = spike_data[i]["spike"]
        if spike:
            result[i] = "TRADE"
            i += cooldown + 1
        else:
            i += 1
            continue

    return result


generate_scheduled_trades(output, 2)

['WAIT', 'WAIT', 'WAIT', 'WAIT', 'WAIT', 'TRADE', 'WAIT', 'WAIT', 'WAIT']

In [24]:
def evaluate_strategy_cost(prices, schedule, trade_fee):
    num_trades = 0 
    total_market_cost = 0
    # iterate through schedule
        # when you see trade
            # add to num_trades
            # calculate market cost and add
    for i in range(len(schedule)):
        if schedule[i] == "TRADE":
            num_trades += 1
            total_market_cost += prices[i] * 0.01 
        else:
            continue

    total_fee_cost = num_trades * trade_fee
    # calculate total cost
    total_cost = total_market_cost + total_fee_cost

    return {
        'num_trades': num_trades,
        'total_fee_cost': total_fee_cost,
        'total_market_impact_cost': total_market_cost,
        'total_cost': total_cost
    }

prices = [100, 102, 99, 101, 98, 150, 100, 102, 97]
schedule = ["WAIT", "WAIT", "WAIT", "TRADE", "WAIT", "WAIT", "TRADE", "WAIT", "WAIT"]
trade_fee = 5.0
evaluate_strategy_cost(prices, schedule, trade_fee)

TypeError: can't multiply sequence by non-int of type 'float'