# OpenGRIS Scaler Demo With Multiple Backends (IBM Symphony + AWS ECS)

## Example: Heston model Bermudan call option pricing using Monte Carlo simulation

A Bermudan option is an exotic option exercisable on predetermined dates, usually monthly. It differs from European options which can only be exercised on the date of expiration and American options which can be exercised at any time before expiration.

Given a basket of stocks, download price data from Yahoo finance and use the Heston model to price a Bermudan call option using Monte Carlo simulation.

In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
import logging
from time import time
import warnings
from tqdm import tqdm
from scaler import Client

warnings.filterwarnings('ignore')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## Settings

In [2]:
TICKERS = [
    'NVDA', 'MSFT', 'AAPL', 'GOOGL', 'AMZN', 'META', 'AVGO', '2222.SR',
    'TSLA', 'TSM', 'BRK-B', 'ORCL', 'WMT', 'JPM', 'TCEHY', 'LLY',
    'V', 'NFLX', 'MA', 'XOM'
]

HESTON_DEFAULTS = {
    'kappa': 2.0,
    'theta': 0.04,
    'sigma': 0.3,
    'rho': -0.7,
    'v0': 0.04,
}

r_default = 0.05
q = 0.0
basis_degree = 3
barrier_frac = 1.0

scheduler_address="tcp://127.0.0.1:8080"

## Download, Generate Tasks, Calculate 

In [3]:
def download_prices(tickers, start, end):
    data = yf.download(tickers, start=start, end=end, auto_adjust=False)['Adj Close']
    last_prices = data.iloc[-1].values
    returns = np.log(data / data.shift(1)).dropna()
    corr_matrix = np.corrcoef(returns.T)
    np.fill_diagonal(corr_matrix, 1.0)
    logger.info(f"Last prices: {dict(zip(tickers, last_prices))}")
    return last_prices, corr_matrix


def generate_one_path_using_rng(S0, corr_chol, dt, num_steps, heston, rng):
    """
    Generate one path using a provided numpy.random.Generator 'rng' (so it's safe to call
    in different processes with controlled seeding).
    Returns array shape (num_steps+1, n_assets)
    """
    n = len(S0)
    S = np.zeros((num_steps + 1, n))
    V = np.zeros((num_steps + 1, n))
    S[0] = S0
    V[0] = heston['v0']
    sqrt_dt = np.sqrt(dt)
    rho = heston['rho']
    sqrt_1mrho2 = np.sqrt(max(0, 1 - rho**2))
    kappa = heston['kappa']
    theta = heston['theta']
    sigma_v = heston['sigma']

    for t in range(1, num_steps + 1):
        Z_stock = rng.standard_normal(n)
        dW_stock = corr_chol @ Z_stock * sqrt_dt
        Z_vol_ind = rng.standard_normal(n) * sqrt_dt
        dW_vol = rho * dW_stock + sqrt_1mrho2 * Z_vol_ind
        drift_s = (r_default - q - 0.5 * V[t - 1]) * dt
        S[t] = S[t - 1] * np.exp(drift_s + np.sqrt(np.maximum(V[t - 1], 0)) * dW_stock)
        drift_v = kappa * (theta - V[t - 1]) * dt
        vol_sqrtv = sigma_v * np.sqrt(np.maximum(V[t - 1], 0))
        V[t] = np.maximum(V[t - 1] + drift_v + vol_sqrtv * dW_vol, 1e-6)
    return S


def generate_paths_batch(num_paths_batch, S0, corr_chol, dt, num_steps, heston, seed=None):
    """
    Worker function for distributed execution. Generates 'num_paths_batch' paths and returns
    a numpy array with shape (num_paths_batch, num_steps+1, n_assets).
    This function is picklable and safe to submit to the scaler.Client.
    """
    rng = np.random.default_rng(seed)
    batch = []
    for i in range(num_paths_batch):
        path = generate_one_path_using_rng(S0, corr_chol, dt, num_steps, heston, rng)
        batch.append(path)
    return np.stack(batch, axis=0)



def generate_many_path_tasks(num_paths, S0, corr_chol, dt, num_steps, heston, batch_size):
    """
    Use scaler.Client to submit batches of path generation as separate tasks.
    - batch_size controls how many paths each submitted job generates.
    - show_progress toggles a tqdm bar for completed batches.
    Returns stacked paths array shape (num_paths, num_steps+1, n_assets)
    """
    # Compute batches
    num_batches = (num_paths + batch_size - 1) // batch_size
    batches = []
    # compute exact sizes for each batch (last may be smaller)
    batch_sizes = [batch_size] * num_batches
    last_mod = num_paths - batch_size * (num_batches - 1)
    batch_sizes[-1] = last_mod if last_mod > 0 else batch_size

    base_seed = int(time()) & 0x7FFFFFFF
    
    tasks = []
    for i, bs in enumerate(batch_sizes):
        seed = base_seed + i
        tasks.append((generate_paths_batch, (bs, S0, corr_chol, dt, num_steps, heston, seed)))
        
    return tasks


def price_bermudan(paths, K, barrier, r, exercise_indices, ex_times, degree):
    num_paths, num_steps, n_assets = paths.shape
    basket_paths = np.mean(paths, axis=2)
    basket_ex = basket_paths[:, exercise_indices]
    num_ex = basket_ex.shape[1]
    hit_mask = basket_ex < barrier
    has_hit = np.any(hit_mask, axis=1)
    first_hit_idx = np.argmax(hit_mask, axis=1)
    first_hit_idx[~has_hit] = num_ex
    cashflow = np.zeros(num_paths)
    alive_at_last = first_hit_idx == num_ex
    cashflow[alive_at_last] = np.maximum(basket_ex[alive_at_last, -1] - K, 0)
    for j in range(num_ex - 2, -1, -1):
        delta_t_j = ex_times[j + 1] - ex_times[j]
        disc_factor = np.exp(-r * delta_t_j)
        alive = first_hit_idx > j
        num_alive = np.sum(alive)
        if num_alive == 0:
            continue
        disc_cash_next = disc_factor * cashflow[alive]
        basket_j = basket_ex[alive, j]
        itm = basket_j > K
        num_itm = np.sum(itm)
        if num_itm < degree + 1:
            cont = np.full(num_alive, np.mean(disc_cash_next))
        else:
            X = np.vander(basket_j[itm], degree + 1, increasing=True)
            y = disc_cash_next[itm]
            beta, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
            X_all = np.vander(basket_j, degree + 1, increasing=True)
            cont = X_all @ beta
        intrinsic = np.maximum(basket_j - K, 0)
        exercise = intrinsic > cont
        exercised_idx = np.where(alive)[0][exercise]
        cashflow[exercised_idx] = intrinsic[exercise]
    price = np.mean(np.exp(-r * ex_times[-1]) * cashflow)
    print(f"The price of the Bermudan down-and-out call option on the equally-weighted basket is {price:.4f}")
    return price


def generate_tasks(start_date, end_date, num_paths, num_steps, maturity, num_exercises,
                   risk_free_rate, batch_size):
    """
    Main entry. Set use_distributed=True to use scaler.Client and submit batch jobs.
    """
    global r_default
    r_default = risk_free_rate

    S0, corr = download_prices(TICKERS, start_date, end_date)
    try:
        corr_chol = np.linalg.cholesky(corr)
    except np.linalg.LinAlgError:
        logger.warning("Correlation matrix not positive semi-definite, adding jitter.")
        corr += np.eye(corr.shape[0]) * 1e-6
        corr_chol = np.linalg.cholesky(corr)

    basket0 = np.mean(S0)
    K = basket0
    barrier = barrier_frac * K
    dt = maturity / num_steps
    step_per_period = num_steps // num_exercises
    exercise_indices = np.arange(1, num_exercises + 1) * step_per_period
    exercise_indices = exercise_indices[exercise_indices <= num_steps]
    ex_times = exercise_indices * dt
    heston = HESTON_DEFAULTS

    tasks = generate_many_path_tasks(num_paths, S0, corr_chol, dt, num_steps, heston, batch_size=batch_size)
    return K, barrier, r_default, exercise_indices, ex_times, basis_degree, tasks


def get_bermudan_price(K, barrier, r_default, exercise_indices, ex_times, basis_degree, results):
    paths = np.concatenate(results, axis=0)
    # If we over-allocated for any reason, trim to requested num_paths
    # if paths.shape[0] > num_paths:
    #     paths = paths[:num_paths]

    return price_bermudan(paths, K, barrier, r_default, exercise_indices, ex_times, basis_degree)


## Generate Task Batches

In [4]:
K, barrier, r_default, exercise_indices, ex_times, basis_degree, tasks = generate_tasks(
    start_date='2024-10-17', end_date='2025-10-17', num_paths=100_000, num_steps=252, maturity=1.0, num_exercises=12, risk_free_rate=0.05, batch_size=10
)

[*********************100%***********************]  20 of 20 completed
INFO:__main__:Last prices: {'NVDA': 24.97572135925293, 'MSFT': 247.21035766601562, 'AAPL': 214.47000122070312, 'GOOGL': 354.1499938964844, 'AMZN': 488.80999755859375, 'META': 251.4600067138672, 'AVGO': 298.5400085449219, '2222.SR': 818.1784057617188, 'TSLA': 549.8800048828125, 'TSM': 712.0700073242188, 'BRK-B': 511.6099853515625, 'ORCL': 118.35900115966797, 'WMT': 181.80999755859375, 'JPM': 313.0, 'TCEHY': 79.68000030517578, 'LLY': 428.75, 'V': 299.8399963378906, 'NFLX': 334.7369689941406, 'MA': 106.47000122070312, 'XOM': 109.68067169189453}


## Compute On IBM Symphony Only

In [5]:
def compute_symphony(scheduler_address, K, barrier, r_default, exercise_indices, ex_times, basis_degree):
    with Client(scheduler_address) as client:
        # Submit all batch jobs
        sym_futures = [client.submit_verbose(*task, kwargs={}, capabilities={"symphony": -1}) for task in tasks]
    
        symphony_result = get_bermudan_price(
            K, barrier, r_default, exercise_indices, ex_times, basis_degree,
            [sym_futures[idx].result() for idx in tqdm(range(len(sym_futures)), desc="batches", unit="batch")]
        )

compute_symphony(scheduler_address, K, barrier, r_default, exercise_indices, ex_times, basis_degree)

INFO:root:ScalerClient: connect to scheduler at tcp://127.0.0.1:8080
INFO:root:ZMQAsyncConnector: started
INFO:root:ZMQAsyncConnector: started
INFO:root:ClientHeartbeatManager: started
INFO:root:ScalerClient: connect to object storage at tcp://127.0.0.1:8081
batches: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [05:50<00:00, 28.50batch/s]
INFO:root:ScalerClient: disconnect from tcp://127.0.0.1:8080
INFO:root:canceling 0 task(s)
INFO:root:ClientAgent: client quitting
INFO:root:ZMQAsyncConnector: exited


The price of the Bermudan down-and-out call option on the equally-weighted basket is 14.5174


## Compute On AWS ECS Only

In [6]:
def compute_ecs(scheduler_address, K, barrier, r_default, exercise_indices, ex_times, basis_degree):
    with Client(scheduler_address) as client:
        # Submit all batch jobs
        futures = [client.submit_verbose(*task, kwargs={}, capabilities={"ecs": -1}) for task in tasks]
    
        # Gather results with progress
        results = [futures[idx].result() for idx in tqdm(range(len(futures)), desc="batches", unit="batch")]
        get_bermudan_price(K, barrier, r_default, exercise_indices, ex_times, basis_degree, results)

compute_ecs(scheduler_address, K, barrier, r_default, exercise_indices, ex_times, basis_degree)

INFO:root:ScalerClient: connect to scheduler at tcp://127.0.0.1:8080
INFO:root:ZMQAsyncConnector: started
INFO:root:ZMQAsyncConnector: started
INFO:root:ClientHeartbeatManager: started
INFO:root:ScalerClient: connect to object storage at tcp://127.0.0.1:8081
batches: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [01:29<00:00, 111.57batch/s]
INFO:root:ScalerClient: disconnect from tcp://127.0.0.1:8080
INFO:root:canceling 0 task(s)
INFO:root:ClientAgent: client quitting
INFO:root:ZMQAsyncConnector: exited


The price of the Bermudan down-and-out call option on the equally-weighted basket is 14.5174


## Compute on Both IBM Symphony and AWS ECS

In [7]:
def compute_both(scheduler_address, K, barrier, r_default, exercise_indices, ex_times, basis_degree):
    with Client(scheduler_address) as client:
        # Submit all batch jobs
        futures = [client.submit_verbose(*task, kwargs={}, capabilities={"python3": -1}) for task in tasks]
        get_bermudan_price(
            K, barrier, r_default, exercise_indices, ex_times, basis_degree,
            [futures[idx].result() for idx in tqdm(range(len(futures)), desc="batches", unit="batch")]
        )

compute_both(scheduler_address, K, barrier, r_default, exercise_indices, ex_times, basis_degree)

INFO:root:ScalerClient: connect to scheduler at tcp://127.0.0.1:8080
INFO:root:ZMQAsyncConnector: started
INFO:root:ZMQAsyncConnector: started
INFO:root:ClientHeartbeatManager: started
INFO:root:ScalerClient: connect to object storage at tcp://127.0.0.1:8081
batches: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [06:04<00:00, 27.42batch/s]
INFO:root:ScalerClient: disconnect from tcp://127.0.0.1:8080
INFO:root:canceling 0 task(s)
INFO:root:ClientAgent: client quitting
INFO:root:ZMQAsyncConnector: exited


The price of the Bermudan down-and-out call option on the equally-weighted basket is 14.6561
