In [None]:
from google.colab import drive

# Mount Google Drive; the generated .py modules below are written under MyDrive.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
import math
import time

import cupy
import numpy as np
import torch
from torch.utils.dlpack import from_dlpack

# Passing None disables CuPy's memory pool, so CuPy does not keep freed device
# memory cached. NOTE(review): presumably done so PyTorch, sharing the same
# GPU in this process, can use that memory — confirm.
cupy.cuda.set_allocator(None)

In [None]:
# Monte Carlo pricer for a batch of down-and-out Asian (average-price) call
# options. Each (option, path) pair is handled by one loop iteration of a
# grid-stride loop, so any grid size covers all N_BATCH * N_PATHS pairs.
#
# Kernel arguments, in order:
#   d_s        out: discounted payoff per path, index batch_id * N_PATHS + path_id
#   T          time to maturity, shared by every option
#   K, B, S0, sigma, mu, r
#              per-option strike, barrier, initial price, volatility, drift and
#              discount rate; float arrays of length N_BATCH
#   d_normals  normal draws, index path_id + batch_id*N_PATHS + n*N_PATHS*N_BATCH
#   N_STEPS, N_PATHS, N_BATCH  simulation sizes (declared `long`)
# A path is knocked out (payoff 0) as soon as the running average of its
# simulated prices falls to or below B.
cupy_batched_barrier_option = cupy.RawKernel(r'''
extern "C" __global__ void batched_barrier_option(
    float *d_s,
    const float T,
    const float * K,
    const float * B,
    const float * S0,
    const float * sigma,
    const float * mu,
    const float * r,
    const float * d_normals,
    const long N_STEPS,
    const long N_PATHS,
    const long N_BATCH)
{
  const unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;
  const unsigned stride = blockDim.x * gridDim.x;
  const float tmp3 = sqrt(T/N_STEPS);  // sqrt(dt)

  for (unsigned i = idx; i < N_PATHS * N_BATCH; i += stride)
  {
    const int batch_id = i / N_PATHS;
    const int path_id = i % N_PATHS;
    // per-option parameters, loaded once per path instead of once per step
    const float strike = K[batch_id];
    const float barrier = B[batch_id];
    const float vol = sigma[batch_id];
    const float tmp1 = mu[batch_id]*T/N_STEPS;  // drift * dt
    const float tmp2 = exp(-r[batch_id]*T);     // discount factor
    float s_curr = S0[batch_id];
    double running_average = 0.0;
    bool knocked_out = false;
    for (unsigned n = 0; n < N_STEPS; n++) {
       // Euler step: dS = mu*S*dt + sigma*S*sqrt(dt)*Z
       s_curr += tmp1 * s_curr + vol*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];
       // incremental arithmetic mean of the prices simulated so far
       running_average += (s_curr - running_average) / (n + 1.0);
       if (running_average <= barrier) {
           knocked_out = true;
           break;
       }
    }

    // A knocked-out path is worthless even when the barrier lies above the
    // strike; without the flag its payoff would be max(average - K, 0).
    const float payoff = (!knocked_out && running_average > strike) ? running_average - strike : 0.f;
    d_s[i] = tmp2 * payoff;
  }
}

''', 'batched_barrier_option')

In [None]:
# Monte Carlo sizes for the two-option demo batch.
N_BATCH = 2          # options priced together
N_PATHS = 2048000    # simulated paths per option
N_STEPS = 365        # time steps per path
T = 1.0              # maturity shared by every option

# One entry per option; the kernel reads element batch_id of each array.
K = cupy.array([110.0, 120.0], dtype=cupy.float32)    # strike
B = cupy.array([100.0, 90.0], dtype=cupy.float32)     # barrier on the running average
S0 = cupy.array([120.0, 100.0], dtype=cupy.float32)   # initial price
sigma = cupy.array([0.35, 0.2], dtype=cupy.float32)   # volatility
mu = cupy.array([0.15, 0.1], dtype=cupy.float32)      # drift of the Euler step
r = cupy.array([0.05, 0.05], dtype=cupy.float32)      # rate used in exp(-r*T) discounting


In [None]:
def batch_run():
    '''Price the demo options with the CUDA kernel and print time and prices.

    Draws N_BATCH * N_PATHS * N_STEPS standard normals, launches the kernel
    with one thread per (option, path), then averages each option's
    discounted payoffs. Only the kernel launch and the reduction are timed;
    random-number generation happens before the timer starts.
    '''
    number_of_threads = 256
    # ceil division so every (option, path) pair gets a thread
    number_of_blocks = (N_PATHS * N_BATCH - 1) // number_of_threads + 1
    randoms_gpu = cupy.random.normal(0, 1, N_BATCH*N_PATHS * N_STEPS, dtype=cupy.float32)
    output = cupy.zeros(N_BATCH*N_PATHS, dtype=cupy.float32)
    stream = cupy.cuda.stream.get_current_stream()
    # GPU work is asynchronous: synchronize on both sides of the timed region.
    stream.synchronize()
    # perf_counter is monotonic and high-resolution, unlike wall-clock time.time
    s = time.perf_counter()
    cupy_batched_barrier_option((number_of_blocks,), (number_of_threads,),
                       (output, np.float32(T), K, B, S0, sigma, mu, r,
                        randoms_gpu, N_STEPS, N_PATHS, N_BATCH))
    v = output.reshape(N_BATCH, N_PATHS).mean(axis=1)
    stream.synchronize()
    e = time.perf_counter()
    print('time', e-s, 'v',v)
batch_run()

time 0.49608469009399414 v [21.20576    0.8466831]


In [None]:
# Write the CuPy-backed training-data generator to Drive as cupy_dataset.py
# so training code (including distributed_train.py) can import OptionDataSet.
# The opening delimiter must be exactly three quotes: any extra character
# would become the first line of the generated module.
with open('/content/drive/MyDrive/asian_barrier_option/cupy_dataset.py', 'w') as f:
    f.write("""
import cupy
import numpy as np
import torch
from torch.utils.dlpack import from_dlpack

# Disable CuPy's memory pool (freed device memory is not cached by CuPy).
cupy.cuda.set_allocator(None)

# Monte Carlo pricer for a batch of down-and-out Asian (average-price) calls.
# d_s[batch_id * N_PATHS + path_id] receives the discounted payoff of a path;
# d_normals is indexed path_id + batch_id*N_PATHS + n*N_PATHS*N_BATCH.
# A path is knocked out (payoff 0) once its running average falls to or below B.
cupy_batched_barrier_option = cupy.RawKernel(r'''
extern "C" __global__ void batched_barrier_option(
    float *d_s,
    const float T,
    const float * K,
    const float * B,
    const float * S0,
    const float * sigma,
    const float * mu,
    const float * r,
    const float * d_normals,
    const long N_STEPS,
    const long N_PATHS,
    const long N_BATCH)
{
  const unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;
  const unsigned stride = blockDim.x * gridDim.x;
  const float tmp3 = sqrt(T/N_STEPS);  // sqrt(dt)

  for (unsigned i = idx; i < N_PATHS * N_BATCH; i += stride)
  {
    const int batch_id = i / N_PATHS;
    const int path_id = i % N_PATHS;
    const float strike = K[batch_id];
    const float barrier = B[batch_id];
    const float vol = sigma[batch_id];
    const float tmp1 = mu[batch_id]*T/N_STEPS;  // drift * dt
    const float tmp2 = exp(-r[batch_id]*T);     // discount factor
    float s_curr = S0[batch_id];
    double running_average = 0.0;
    bool knocked_out = false;
    for (unsigned n = 0; n < N_STEPS; n++) {
       // Euler step: dS = mu*S*dt + sigma*S*sqrt(dt)*Z
       s_curr += tmp1 * s_curr + vol*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];
       // incremental arithmetic mean of the prices simulated so far
       running_average += (s_curr - running_average) / (n + 1.0);
       if (running_average <= barrier) {
           knocked_out = true;
           break;
       }
    }

    // knocked-out paths pay nothing, whatever the barrier/strike ordering
    const float payoff = (!knocked_out && running_average > strike) ? running_average - strike : 0.f;
    d_s[i] = tmp2 * payoff;
  }
}

''', 'batched_barrier_option')


class OptionDataSet(torch.utils.data.IterableDataset):
    '''Generates (option parameters, Monte Carlo price) training pairs on the GPU.

    Each item is a tuple (X, Y) of CUDA torch tensors created from CuPy
    arrays via DLPack. X has shape (batch, 6) with columns
    [K, B, S0, sigma, mu, r]. Y has shape (batch,) and holds the mean
    discounted payoff over number_path simulated paths. Iteration yields
    exactly max_len items, matching __len__.
    '''

    def __init__(self, max_len=10, number_path=1000, batch=2, threads=256, seed=15):
        self.num = 0
        self.max_length = max_len
        self.N_PATHS = number_path
        self.N_STEPS = 365
        self.N_BATCH = batch
        self.T = np.float32(1.0)
        # output buffer reused by every launch: one payoff per (option, path)
        self.output = cupy.zeros(self.N_BATCH * self.N_PATHS, dtype=cupy.float32)
        # ceil division so every (option, path) pair gets a thread
        self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
        self.number_of_threads = threads
        # seeds CuPy's global random generator
        cupy.random.seed(seed)

    def __len__(self):
        return self.max_length

    def __iter__(self):
        self.num = 0
        return self

    def __next__(self):
        # '>=' so exactly max_length items are produced, consistent with __len__
        if self.num >= self.max_length:
            raise StopIteration
        X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
        # scale the [0, 1) random numbers to the correct range for each of the option parameters
        X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
        # make sure the Barrier is smaller than the Strike price
        X[:, 1] = X[:, 0] * X[:, 1]
        randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
        # the kernel expects one contiguous array per parameter column
        params = [cupy.ascontiguousarray(X[:, col]) for col in range(6)]
        cupy_batched_barrier_option(
            (self.number_of_blocks,), (self.number_of_threads,),
            (self.output, self.T, *params, randoms,
             self.N_STEPS, self.N_PATHS, self.N_BATCH))
        Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)
        self.num += 1
        return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
""")

In [None]:
import sys

# OptionDataSet only exists inside cupy_dataset.py on Drive; put that folder on
# sys.path and import it so this cell also works in a fresh kernel.
PROJECT_DIR = '/content/drive/MyDrive/asian_barrier_option'
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)
from cupy_dataset import OptionDataSet

# Smoke test: print the Monte Carlo prices (second tuple element) of each batch.
ds = OptionDataSet(10, number_path=100000, batch=16, seed=15)
for i in ds:
    print(i[1])

tensor([1.6558e+02, 0.0000e+00, 8.0069e+01, 1.0866e+02, 7.7740e-03, 0.0000e+00,
        2.7772e+01, 0.0000e+00, 0.0000e+00, 6.4279e+01, 0.0000e+00, 5.1346e+00,
        0.0000e+00, 1.4733e+02, 4.1851e+01, 0.0000e+00], device='cuda:0')
tensor([ 57.1285,   0.0000,   0.0000, 151.9433,   0.0000,   0.0000,   0.0000,
          9.3306,   0.0000,   0.7246, 157.0885,  10.7096,   0.0000,   0.7067,
         59.1110,  14.6442], device='cuda:0')
tensor([106.4531,   0.0000,  51.1248,  12.7823,  67.4821,   0.0000,   7.3539,
          0.0000, 143.2203,  66.0655,  66.5476, 129.6811,   0.0000,  13.5559,
         27.5546,   0.0000], device='cuda:0')
tensor([4.1777e+01, 0.0000e+00, 2.5890e+00, 1.4500e+02, 0.0000e+00, 1.5099e+00,
        1.1183e+02, 5.6967e+01, 7.5750e-05, 1.2390e+01, 0.0000e+00, 3.0183e+01,
        1.3890e+01, 5.0533e+01, 3.8499e+01, 8.2232e+01], device='cuda:0')
tensor([1.0687e+02, 3.0590e+01, 8.5428e+01, 1.9835e+01, 3.0602e+01, 1.5230e+00,
        0.0000e+00, 0.0000e+00, 4.0244e+01, 0.00

In [None]:
# Write the pricing network to Drive as model.py.
with open('/content/drive/MyDrive/asian_barrier_option/model.py', 'w') as f:
    f.write('''
import torch.nn as nn
import torch.nn.functional as F
import torch


class Net(nn.Module):
    """Fully connected regressor from 6 option parameters to one price.

    Inputs are divided column-wise by the `norm` buffer, then passed through
    five ELU-activated linear layers of width `hidden` and a final linear
    layer with a single output.
    """

    def __init__(self, hidden=1024):
        super().__init__()
        self.fc1 = nn.Linear(6, hidden)
        self.fc2 = nn.Linear(hidden, hidden)
        self.fc3 = nn.Linear(hidden, hidden)
        self.fc4 = nn.Linear(hidden, hidden)
        self.fc5 = nn.Linear(hidden, hidden)
        self.fc6 = nn.Linear(hidden, 1)
        # Upper bounds used to scale each input column to roughly [0, 1];
        # 198.0 = 200.0 * 0.99, the largest barrier the data generator draws.
        self.register_buffer('norm', torch.tensor([200.0, 198.0, 200.0, 0.4, 0.2, 0.2]))

    def forward(self, x):
        h = x / self.norm
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
            h = F.elu(layer(h))
        return self.fc6(h)
''')

In [None]:
import sys

import torch
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.engine import Engine, Events
from ignite.handlers import ModelCheckpoint, Timer
from torch.nn import MSELoss
from torch.optim import Adam

# model.py / cupy_dataset.py were written to this Drive folder above.
PROJECT_DIR = '/content/drive/MyDrive/asian_barrier_option'
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)
from model import Net
from cupy_dataset import OptionDataSet

timer = Timer(average=True)
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
# Native mixed precision replaces the deprecated apex.amp (O1): autocast picks
# per-op precision, GradScaler performs dynamic loss scaling.
scaler = torch.amp.GradScaler('cuda')
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)

def train_update(engine, batch):
    '''One optimization step on a (parameters, price) batch; returns the loss.'''
    model.train()
    optimizer.zero_grad()
    x, y = batch
    with torch.autocast(device_type='cuda', dtype=torch.float16):
        y_pred = model(x)
        loss = loss_fn(y_pred[:, 0], y)
    # scaled backward; the step is skipped if gradients overflowed
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
    return loss.item()

trainer = Engine(train_update)
log_interval = 100
timer.attach(trainer,
             start=Events.EPOCH_STARTED,
             resume=Events.ITERATION_STARTED,
             pause=Events.ITERATION_COMPLETED,
             step=Events.ITERATION_COMPLETED)
# the scheduler sets the lr at every iteration, overriding Adam's initial lr
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
    # engine.state.iteration counts across epochs; convert to within-epoch index
    epoch_iter = (engine.state.iteration - 1) % len(dataset) + 1
    if epoch_iter % log_interval == 0:
        print('loss', engine.state.output, 'average time', timer.value())

trainer.run(dataset, max_epochs=100)

Selected optimization level O1:  Insert automatic casts around Pytorch functions and Tensor methods.

Defaults for this optimization level are:
enabled                : True
opt_level              : O1
cast_model_type        : None
patch_torch_functions  : True
keep_batchnorm_fp32    : None
master_weights         : None
loss_scale             : dynamic
Processing user overrides (additional kwargs that are not None)...
After processing overrides, optimization options are:
enabled                : True
opt_level              : O1
cast_model_type        : None
patch_torch_functions  : True
keep_batchnorm_fp32    : None
master_weights         : None
loss_scale             : dynamic
Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 32768.0
Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 16384.0
Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 8192.0
Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 40

In [None]:
# Write the multi-GPU training script. When run as a script, its own directory
# is first on sys.path, so model.py and cupy_dataset.py next to it import directly.
with open('/content/drive/MyDrive/asian_barrier_option/distributed_train.py', 'w') as f:
    f.write('''
# Data-parallel training of the option pricing network.
#
# Launch with torchrun, e.g.
#     torchrun --nproc_per_node=gpu distributed_train.py
# Every worker generates its own Monte Carlo data (seeded by its rank);
# DistributedDataParallel averages gradients across workers.
import argparse
import os

import torch
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from torch.nn.parallel import DistributedDataParallel
from torch.optim import Adam

from cupy_dataset import OptionDataSet
from model import Net

parser = argparse.ArgumentParser()
# torchrun only exports LOCAL_RANK; the deprecated torch.distributed.launch
# passes --local-rank (torch >= 2.0) or --local_rank (older releases).
parser.add_argument("--local_rank", "--local-rank", dest="local_rank", type=int,
                    default=int(os.environ.get("LOCAL_RANK", 0)))
args = parser.parse_args()

args.distributed = int(os.environ.get("WORLD_SIZE", 1)) > 1
# global rank (unique across nodes); equals local_rank on a single node
rank = int(os.environ.get("RANK", args.local_rank))

if args.distributed:
    torch.cuda.set_device(args.local_rank)
    torch.distributed.init_process_group(backend='nccl', init_method='env://')

torch.backends.cudnn.benchmark = True

model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
# native mixed precision replaces apex.amp (O1)
scaler = torch.amp.GradScaler('cuda')
if args.distributed:
    model = DistributedDataParallel(model, device_ids=[args.local_rank])
dataset = OptionDataSet(max_len=10000, number_path=1024, batch=10240, seed=rank)


def train_update(engine, batch):
    model.train()
    optimizer.zero_grad()
    x, y = batch
    with torch.autocast(device_type='cuda', dtype=torch.float16):
        y_pred = model(x)
        loss = loss_fn(y_pred[:, 0], y)
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
    return loss.item()


trainer = Engine(train_update)
log_interval = 100

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)


@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
    # iteration index within the current epoch
    epoch_iter = (engine.state.iteration - 1) % len(dataset) + 1
    # only rank 0 logs, so N workers do not print N copies
    if rank == 0 and epoch_iter % log_interval == 0:
        print('loss', engine.state.output)


trainer.run(dataset, max_epochs=100)

if args.distributed:
    torch.distributed.destroy_process_group()
''')

In [None]:
import locale

# Colab workaround: report UTF-8 as the preferred encoding.
# NOTE(review): presumably added because `!` shell commands / pip failed with
# "A UTF-8 locale is required" in this runtime — confirm.
# Keep the real function's optional `do_setlocale` parameter so callers such as
# locale.getpreferredencoding(False) do not raise TypeError.
locale.getpreferredencoding = lambda do_setlocale=True: "UTF-8"

In [None]:
# Clone NVIDIA apex (only needed by code that still imports apex).
# Use an absolute path and skip the clone if it already exists, so re-running
# the cell works; no %cd, so the notebook's working directory is unchanged.
![ -d /content/apex ] || git clone https://github.com/NVIDIA/apex /content/apex

# Build the C++/CUDA extensions. --no-build-isolation lets the build import the
# already-installed torch; --config-settings replaces pip's deprecated --global-option.
# NOTE(review): CUDA_HOME and TORCH_CUDA_ARCH_LIST are runtime-specific — confirm.
!cd /content/apex && CUDA_HOME=/usr/local/cuda-12.2 TORCH_CUDA_ARCH_LIST="8.0;8.6;8.9;9.0" pip install -v --no-cache-dir --no-build-isolation --config-settings "--build-option=--cpp_ext" --config-settings "--build-option=--cuda_ext" ./

In [None]:
%reset -f
# %reset drops the Python references; also hand PyTorch's cached GPU blocks
# back to the driver so the worker processes can use that memory.
import gc
import torch
gc.collect()
torch.cuda.empty_cache()

# torchrun supersedes the deprecated torch.distributed.launch (see its warning);
# --nproc_per_node=gpu starts one worker per visible GPU instead of assuming 4.
!torchrun --nproc_per_node=gpu /content/drive/MyDrive/asian_barrier_option/distributed_train.py

and will be removed in future. Use torchrun.
Note that --use-env is set by default in torchrun.
If your script expects `--local-rank` argument to be set, please
change it to read from `os.environ['LOCAL_RANK']` instead. See 
https://pytorch.org/docs/stable/distributed.html#launch-utility for 
further instructions

  main()
W0111 02:45:41.069000 17689 torch/distributed/run.py:793] 
W0111 02:45:41.069000 17689 torch/distributed/run.py:793] *****************************************
W0111 02:45:41.069000 17689 torch/distributed/run.py:793] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0111 02:45:41.069000 17689 torch/distributed/run.py:793] *****************************************
Traceback (most recent call last):
  File "/content/drive/MyDrive/asian_barrier_option/distributed_train.py", line 12, in <module>
    from apex imp