In [None]:
import numpy as np
import random
import threading
import multiprocessing
import pandas as pd 
import io
import signal
import logging
from datetime import datetime, timedelta

In [1]:
def generate_path_heston(a_0, a_1, b_0, b_1, gamma, v_0, s_0, rho, T, n, seed=0):
    """Simulate one price path of a Heston-type model with uncertain parameters.

    The path is built with an Euler-Maruyama discretisation on ``n`` equal
    steps of size ``T / n``:

        S[k+1] = S[k] + sqrt(V[k]) * S[k] * dB[k]
        V[k+1] = max(V[k] + (b_0 + b_1*V[k])*dt
                          + (a_0 + a_1*max(V[k], 0))**gamma * dW1[k], 0)

    where ``dB = rho*dW1 + sqrt(1 - rho**2)*dW2`` and ``a_0, a_1, b_0, b_1,
    gamma`` are re-drawn uniformly from their intervals at every step.

    Parameters
    ----------
    a_0, a_1, b_0, b_1, gamma : sequence of two numbers
        ``[low, high]`` interval each coefficient is sampled from per step.
    v_0 : sequence of two numbers
        ``[low, high]`` interval the initial variance is sampled from.
    s_0 : number
        Initial asset price (first entry of the returned path).
    rho : float
        Correlation coefficient used to mix the two Brownian increments.
    T : float
        Time horizon; the step size is ``T / n``.
    n : int
        Number of time steps.
    seed : int, optional
        If non-zero, seeds the global NumPy RNG before the Brownian
        increments are drawn. ``0`` (default) leaves the RNG state untouched
        at that point.

    Returns
    -------
    list
        The ``n + 1`` simulated prices, starting with ``s_0``.

    Notes
    -----
    The global NumPy RNG is always re-seeded from fresh entropy after the
    Brownian increments are drawn, so the parameter samples are never
    reproducible and the caller's global RNG state is modified.
    """

    def draw(interval):
        # One uniform draw between interval[0] and interval[1].
        return interval[0] + (interval[1] - interval[0]) * np.random.uniform()

    step = T / n

    # A non-zero seed pins down the Brownian increments only.
    if seed != 0:
        np.random.seed(seed)
    root_step = np.sqrt(step)
    dW1 = root_step * np.random.randn(n)
    dW2 = root_step * np.random.randn(n)

    # Increments driving the price: correlated with dW1 through rho.
    price_noise = rho * dW1 + np.sqrt(1 - rho ** 2) * dW2

    # Switch back to entropy-based randomness for all parameter samples.
    np.random.seed()

    variance = draw(v_0)
    prices = [s_0]
    for k in range(n):
        # Fresh coefficients for this step (draw order matters for the RNG stream).
        a_0_k = draw(a_0)
        a_1_k = draw(a_1)
        b_0_k = draw(b_0)
        b_1_k = draw(b_1)
        gamma_k = draw(gamma)

        # The price update uses the variance from *before* this step's update.
        spot = prices[-1]
        prices.append(spot + np.sqrt(variance) * spot * price_noise[k])

        drift = b_0_k + b_1_k * variance
        diffusion = (a_0_k + a_1_k * np.max([variance, 0])) ** gamma_k
        # Truncate at zero so the next square root stays real.
        variance = np.maximum(variance + drift * step + diffusion * dW1[k], 0)

    return prices


def generate_path_heston_return_dict(a_0, # Interval for Parameter a_0
              a_1, # Interval for  Parameter a_1
              b_0, # Interval for  Parameter b_0
              b_1, # Interval for  Parameter b_1
              gamma, # Interval for  Gamma
              v_0, # Interval for the initial variance
              s_0, # Initial value of Assetprice
              rho, # Correlation of Brownian Motion
              T, # Maturity (in years)
              n, # Nr of time steps per path
              return_dict, # Mapping that receives the result under key `pos`
              pos, # Key (worker index) used to store this worker's result
              number_of_runs, # Number of paths to simulate in this worker
              seed = 0 # Accepted for compatibility; not forwarded (see docstring)
              ):
    """Worker entry point: simulate ``number_of_runs`` paths and store them.

    Calls ``generate_path_heston`` ``number_of_runs`` times with the given
    model parameters and writes the stacked result, an array of shape
    ``(number_of_runs, n + 1)`` with one path per row, to
    ``return_dict[pos]``. Nothing is returned.

    ``seed`` is accepted but not passed on to ``generate_path_heston``, so
    every path is simulated with that function's default (unseeded)
    behaviour.
    """
    # A forked worker starts with an exact copy of the parent's global NumPy
    # RNG state, and generate_path_heston draws its Brownian increments
    # *before* it re-seeds. Without this call the first path of every worker
    # would be driven by the same increments. Re-seed from OS entropy first.
    np.random.seed()

    runs = max(int(number_of_runs), 0)
    # Preallocate instead of growing with np.append, which recopies the whole
    # array on every iteration.
    data = np.empty((runs, n + 1))
    for row in range(runs):
        data[row] = generate_path_heston(a_0, a_1, b_0, b_1, gamma, v_0, s_0, rho, T, n)

    return_dict[pos] = data


def _split_runs(total_paths, workers):
    # Split total_paths into `workers` near-equal counts that sum to total_paths.
    base, leftover = divmod(max(int(total_paths), 0), workers)
    return [base + (1 if i < leftover else 0) for i in range(workers)]


def generate_heston_traing_data(a_0, # Interval for Parameter a_0
              a_1, # Interval for  Parameter a_1
              b_0, # Interval for  Parameter b_0
              b_1, # Interval for  Parameter b_1
              gamma, # Interval for  Gamma
              v_0, # Interval for the initial variance
              s_0, # Initial value of Assetprice
              rho, # Correlation of Brownian Motion
              T, # Maturity (in years)
              n, # Nr of time steps per path
              batch_size,
              training_steps,
              seed = 0, # Accepted for compatibility; not forwarded to the workers
              ):
    """Simulate ``batch_size * training_steps`` paths using all CPU cores.

    One process per CPU core is started; each runs
    ``generate_path_heston_return_dict`` for its share of the paths and hands
    the result back through a ``multiprocessing.Manager`` dict.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(int(batch_size * training_steps), n + 1)``, one
        simulated path per row, ordered by worker index.

    Raises
    ------
    RuntimeError
        If any worker process exits with a non-zero exit code.
    """
    no_cpu_cores = multiprocessing.cpu_count()
    # Hand the division remainder to the first workers so that exactly the
    # requested number of paths is produced (plain integer division would
    # silently drop up to no_cpu_cores - 1 paths).
    runs_per_worker = _split_runs(batch_size * training_steps, no_cpu_cores)

    # The context manager shuts the manager's server process down on exit;
    # this function is called in a loop, so leaking one per call adds up.
    with multiprocessing.Manager() as manager:
        return_dict = manager.dict()
        jobs = []
        for i, number_of_runs in enumerate(runs_per_worker):
            if number_of_runs == 0:
                # Nothing to do for this worker; do not pay for a process.
                continue
            p = multiprocessing.Process(
                target=generate_path_heston_return_dict,
                args=(a_0, a_1, b_0, b_1, gamma, v_0, s_0, rho, T, n,
                      return_dict, i, number_of_runs))
            jobs.append((i, p))
            p.start()

        for _, proc in jobs:
            proc.join()

        failed = [i for i, proc in jobs if proc.exitcode != 0]
        if failed:
            raise RuntimeError(
                "Heston worker process(es) %s exited abnormally" % failed)

        # Pull everything out in a single round-trip before the manager
        # (and with it the proxy) goes away.
        results = return_dict.copy()

    # The leading empty chunk fixes the column count even when no paths were
    # requested at all.
    chunks = [np.empty((0, n + 1))]
    chunks.extend(np.atleast_2d(results[i]) for i, _ in jobs)
    return np.concatenate(chunks, axis=0)

Generate data for a fixed duration (set via `timeout`). The generation uses all CPU cores available on your system.

In [None]:

# File that every generated batch is appended to (comma-separated rows).
dataset_path = "YourPathHere"

# Number of time steps per simulated path.
n = 250

# Wall-clock budget for data generation, formatted as HH:MM:SS.
timeout = "00:05:00"


class GracefulKiller:
    """Turn SIGINT / SIGTERM into a flag that a loop can poll.

    Creating an instance installs ``exit_gracefully`` as the handler for both
    signals. Once either signal arrives, ``kill_now`` becomes ``True`` on that
    instance.
    """

    # Class-level default; shadowed by an instance attribute once a signal
    # has been received.
    kill_now = False

    def __init__(self):
        """Install this instance's handler for SIGINT and SIGTERM."""
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        """Signal handler: only record that a shutdown was requested."""
        self.kill_now = True


# NOTE(review): the model parameters a_0, a_1, b_0, b_1, gamma, v_0, s_0, rho
# and T are not defined in the cells shown here; they must exist before this
# cell is run, otherwise the first batch fails with a NameError.

# Paths per batch and number of batches requested from each generator call.
batch_size = 256
training_steps = 500

# Remember the handlers GracefulKiller is about to replace so they can be put
# back afterwards; otherwise the kernel can no longer be interrupted once
# this cell has finished.
previous_handlers = {
    signum: signal.getsignal(signum)
    for signum in (signal.SIGINT, signal.SIGTERM)
}

killer = GracefulKiller()
start_time = datetime.now()
# `timeout` keeps its HH:MM:SS string value so this code can be re-run.
t = datetime.strptime(timeout, '%H:%M:%S')
delta = timedelta(hours=t.hour, minutes=t.minute, seconds=t.second)
deadline = start_time + delta

try:
    while not killer.kill_now:
        data = generate_heston_traing_data(a_0, a_1, b_0, b_1, gamma, v_0, s_0, rho, T, n,
                                           batch_size, training_steps)
        # Serialise the whole batch in memory first, then append it to the
        # dataset file with a single write.
        write_data = io.BytesIO()
        np.savetxt(write_data, data, fmt="%1.20f", delimiter=",", newline="\n")
        with open(dataset_path, "ba") as file:
            file.write(write_data.getbuffer())
        # The deadline is only checked between batches, so the batch in
        # progress always completes.
        if datetime.now() > deadline:
            logging.warning("Timeout reached, will exit")
            break
finally:
    for signum, handler in previous_handlers.items():
        # getsignal() returns None for handlers not installed from Python;
        # those cannot be re-installed through signal.signal().
        if handler is not None:
            signal.signal(signum, handler)