### Assignment 1, MACS 30123, Coding Appendix
#### Professor Jon Clindaniel
#### Submitted by Junho Choi

### Part A. Question 1

#### A-(1) q1_actual.py

In [None]:
from mpi4py import MPI

import numpy as np
import pandas as pd
import scipy.stats as sts
import time

def q1_mpi_helper(mu, n_runs, n_steps, eps_sub, rho):
    '''
    Helper function to be run for each process

    Input:
    - mu: parameter for the AR(1) process
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - eps_sub: epsilon_t matrix of size (epr, n_steps)
    - rho: parameter for the AR(1) process

    Output:
    - result_mat: sub-simulation completed for this process;
        should be the same size as eps_sub

    '''
    # place to record the computations
    result_mat = np.zeros_like(eps_sub)

    # computation process; requires for loop to do a scan-like operation
    for i in range(int(n_steps)):
        if i == 0:
            input_col = eps_sub[:, i]
        else:
            input_col = result_mat[:, i-1] * rho + eps_sub[:, i]
        result_mat[:, i] = input_col

    # add mu to finish things
    result_mat = result_mat + np.float32(mu)   

    return result_mat

def q1_mpi(n_runs=1000, n_steps=4160, mu=3.0, sigma=1.0,
           rho=0.5, rand_seed=25):
    '''
    mpi4py implementation of the AR(1) process simulation.

    Input:
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - mu: parameter for the AR(1) process; also starting health level
    - sigma: scale for the epsilons ~ N(0, sigma)
    - rho: parameter for the AR(1) process
    - rand_seed: randomization seed for replicability

    Output:
    - t1 (printed): seconds elapsed for total simulation

    '''
    # Get rank of process and overall size of communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # start time
    t0 = time.time()

    # initializing the error matrix
    np.random.seed(rand_seed)
    eps_mat = sts.norm.rvs(loc=0, scale=sigma, size=(n_runs, n_steps))

    # eps per rank
    epr = int(n_runs/size)

    # cases of epsilons to be used in this rank  
    eps_sub = eps_mat[int(epr*rank):int(epr*(rank+1)), :]
                                          
    # calculation for the sub-case, using the helper function
    # sub_result will contain the average lengths till nonpositive health
    # for each of the rhos in rho_sub
    sub_result = q1_mpi_helper(mu, n_runs, n_steps, eps_sub, rho)

    # preparing the results into a vector
    sub_result_vec = sub_result.flatten()

    # getting a container for gathering the sub-results
    results = None
    if rank == 0:
        # should be epr*n_steps
        subcase_length = sub_result_vec.shape[0]
        results = np.empty(int(subcase_length*size), dtype='float')

    # gathering the results
    comm.Gatherv(sendbuf=sub_result_vec, recvbuf=results, root=0)

    # printing the results in rank 0
    if rank == 0:
        # reformatting as dataframe-style
        df = results.reshape((int(epr*size), n_steps))

        # elapsed time
        t1 = time.time() - t0
        print("Simulation required %10.6f seconds" % t1)

    return None

def main():
    q1_mpi()

if __name__ == '__main__':
    main()

#### A-(2) q1.sbatch

The code below shows an example for using 17 cores; numbers should be changed corresponding to the number of cores to be used.

In [None]:
#!/bin/bash

#SBATCH --job-name=q1
#SBATCH --output=q1_core17.out
#SBATCH --error=q1_output.err
#SBATCH --ntasks=17
#SBATCH --partition=broadwl
#SBATCH --constraint=fdr

module load mpi4py/3.0.1a0_py3

mpirun -n 17 python ./q1_actual.py

### Part B. Question 2

#### B-(1) q2.py

In [None]:
import numpy as np
import pyopencl as cl
import pyopencl.array as cl_array
import pyopencl.clrandom as clrand
import pyopencl.tools as cltools
import scipy.stats as sts
import time
import pandas as pd
import scipy.optimize as opt

from pyopencl.scan import GenericScanKernel

def q2(n_runs=1000, n_steps=4160, mu=3, sigma=1, rho=0.5,
       rand_seed=25):
    '''
    AR(1) process simulation of (n_runs) lives for (n_steps) periods
    
    Input:
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - mu: parameter value; also used as starting point
    - sigma: standard dev. of the error term (~N(0, sigma))
    - rho: parameter of the AR(1) process
    - rand_seed: random seed for replication reasons (for the errors)

    Output:
    - elapsed (printed): time taken for overall computation

    '''
    # Set up OpenCL context and command queue
    ctx = cl.create_some_context()
    queue = cl.CommandQueue(ctx)

    # timing starts
    t0 = time.time()

    # generating a vector of epsilons (errors); length n_runs*n_steps
    NT = int(n_runs*n_steps)
    rand_gen = clrand.PhiloxGenerator(ctx, seed=rand_seed)
    eps = rand_gen.normal(queue, NT, np.float32,
                          mu=0, sigma=sigma)
    
    # establishing boundaries for each simulated walk
    # necessary for performing scans only within lives and not between
    seg_boundaries = [1] + [0]*(n_steps-1)
    seg_boundaries = np.array(seg_boundaries, dtype=np.uint8)
    seg_boundary_flags = np.tile(seg_boundaries, int(n_runs))
    seg_boundary_flags = cl_array.to_device(queue, seg_boundary_flags)

    # Defining segmented kernel operation
    prefix_sum = GenericScanKernel(ctx, np.float32,
                 arguments="__global float *ary, __global char *segflags, "
                           "__global float rho, __global float *out",
                 input_expr="ary[i]",
                 scan_expr="across_seg_boundary ? b : (rho*a+b)", neutral="0",
                 is_segment_start_expr="segflags[i]",
                 output_statement="out[i] = item",
                 options=[])

    # Allocate space for result of kernel on device
    dev_result = cl_array.empty_like(eps)

    # Enqueue and Run Scan Kernel
    prefix_sum(eps, seg_boundary_flags, rho, dev_result)

    # need to add mu to everything, after retrieval
    simulation_done = dev_result.get()
    simulation_done = simulation_done + mu

    # Results transformed into a matrix form
    simulation_done = simulation_done.reshape((n_runs, n_steps))

    # elapsed time
    final_time = time.time()
    elapsed = final_time - t0
    print("Simulation required %10.6f seconds" % elapsed)

    return None

def main():
    q2()

if __name__ == '__main__':
    main()

#### B-(2) q2.sbatch

In [None]:
#!/bin/bash
#SBATCH --job-name=gpu      # job name
#SBATCH --output=q2_try.out # output log file
#SBATCH --error=q2_try.err  # error file
#SBATCH --time=00:20:00  # 5 minutes of wall time
#SBATCH --nodes=1        # 1 GPU node
#SBATCH --partition=gpu2 # GPU2 partition
#SBATCH --ntasks=1       # 1 CPU core to drive GPU
#SBATCH --gres=gpu:1     # Request 1 GPU

module load cuda
module load mpi4py/3.0.1a0_py3

python ./q2.py

### Part C. Question 3-(a)

#### C-(1) q3.py

In [None]:
from mpi4py import MPI

import numpy as np
import pandas as pd
import scipy.stats as sts
import time

def q3_mpi_helper(mu, n_runs, n_steps, eps_mat, rho_sub):
    '''
    Helper function to be run for each process,
    calculating the average length till reaching nonpositive
    health event

    Input:
    - mu: parameter for the AR(1) process
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - eps_mat: epsilon_t matrix of size (n_runs, n_steps)
    - rho_sub: subcase of rhos to be tested

    Output:
    - mean_per_rho: numpy array to contain the average lengths till
        reaching nonpositive health event (for given values of rho);
        same length as rho_sub

    '''

    # stacking the eps
    rho_n = int(rho_sub.shape[0])
    eps_stack = np.tile(eps_mat, (rho_n, 1))

    # rho stacked, for computation
    rho_stack = np.tile(rho_sub.reshape((rho_n, 1)), int(n_runs)).flatten()

    # place to record the computations
    result_mat = np.zeros_like(eps_stack)

    # computation process; requires for loop to do a scan-like operation
    for i in range(int(n_steps)):
        if i == 0:
            input_col = eps_stack[:, i]
        else:
            input_col = result_mat[:, i-1] * rho_stack + eps_stack[:, i]
        result_mat[:, i] = input_col

    # add mu to finish things
    result_mat = result_mat + np.float32(mu)   

    # finding the first occurrence of nonpositive health case
    res = np.argmax(result_mat <= 0, axis=1)
    res = res.reshape((rho_n, int(n_runs)))
    mean_per_rho = res.mean(axis=1)

    return mean_per_rho

def q3_mpi(n_runs=1000, n_steps=4160, mu=3.0, sigma=1.0,
           rho_ub=0.95, rho_lb=-0.95, rho_n=200, rand_seed=25):
    '''
    mpi4py implementation of the "embarrassingly parallel" simulation
    of calculating average lengths till reaching nonpositive
    health event.

    Input:
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - mu: parameter for the AR(1) process; also starting health level
    - sigma: scale for the epsilons ~ N(0, sigma)
    - rho_ub: upper bound for rho (parameter for the AR(1) process)
    - rho_lb: lower bound for rho
    - rho_n: cases of rhos to test in this grid search
    - rand_seed: randomization seed for replicability

    Output:
    - t1 (printed): elapsed computation time
    - q3_mpi.csv: collecting the dataset containing the rho values and
        corresponding average lengths
    - best_rho, best_length (printed): optimal rho value and the
        corresponding average length

    '''
    # Get rank of process and overall size of communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # start time
    t0 = time.time()

    # initializing the error matrix
    np.random.seed(rand_seed)
    eps_mat = sts.norm.rvs(loc=0, scale=sigma, size=(n_runs, n_steps))

    # rho cases setup
    rpc = int(rho_n/size)

    # total cases of rho
    rho_cases = np.linspace(rho_lb, rho_ub, rho_n, dtype=np.float32)

    # cases of rho to be tested in this rank  
    rho_sub = rho_cases[int(rank*rpc):int(rpc*(rank+1))]
                                          
    # calculation for the sub-case, using the helper function
    # sub_result will contain the average lengths till nonpositive health
    # for each of the rhos in rho_sub
    sub_result = q3_mpi_helper(mu, n_runs, n_steps, eps_mat, rho_sub)

    # preparing the results into a vector (rho and avg length alternating)
    sub_result_vec = np.vstack((rho_sub, sub_result)).T.flatten()

    # getting a container for gathering the sub-results
    results = None
    if rank == 0:
        results = np.empty(int(2*rpc*size), dtype='float')

    # gathering the results
    comm.Gatherv(sendbuf=sub_result_vec, recvbuf=results, root=0)

    # printing the results in rank 0
    if rank == 0:
        # reformatting as dataframe-style
        df = results.reshape((int(rpc*size), 2))

        # recording and printing the best cases
        loca = np.argmax(df[:, 1])
        best_rho = df[loca, 0]
        best_length = df[loca, 1]

        print("Best avg. length of periods is %10.6f" % best_length)
        print("This is achieved by rho of %6.5f" % best_rho)

        # exporting the collected dataset
        df = pd.DataFrame(df, columns=['rho', 'avg_length'])
        df.to_csv("q3_mpi.csv", index=False)

        # elapsed time
        t1 = time.time() - t0
        print("Simulation required %10.6f seconds" % t1)

    return None

def main():
    q3_mpi()

if __name__ == '__main__':
    main()

#### C-(2) q3_a.sbatch

In [None]:
#!/bin/bash
#SBATCH --job-name=q3_try   # job name
#SBATCH --output=q3_mpi.out # output log
#SBATCH --error=q3_mpi.err  # error file
#SBATCH --ntasks=20         # cores
#SBATCH --partition=broadwl # Broadwell partition
#SBATCH --constraint=fdr    # FDR constraint

module load mpi4py/3.0.1a0_py3

mpirun -n 20 python ./q3.py

### Part D. Question 3-(b)

#### D-(1) q3_gpu.py

In [None]:
import numpy as np
import pyopencl as cl
import pyopencl.array as cl_array
import pyopencl.clrandom as clrand
import pyopencl.tools as cltools
import scipy.stats as sts
import matplotlib.pyplot as plt
import time
import pandas as pd

from pyopencl.scan import GenericScanKernel
import scipy.optimize as opt

def crit_fn(par, *args):
    '''
    Criterion function to be used in PyOpenCL implementation
    of finding optimal rho (and average length of periods)
    using scipy.optimize.minimize_scalar and also the
    "embarrassing" approach.

    Inputs:
    - par: the rho value to be tested (and optimized later)
    - *args: broken down into...
        - mu: mu value used in the AR(1) process
        - shaping: length-2 tuple; =(n_runs, n_steps)
        - eps: PyOpenCL array (length n_runs*n_steps) of AR(1) errors
        - boundaries: PyOpenCL array of segmented boundaries, to be
            used in the scan kernel
        - prefix_sum: scan kernel implementation for calculating the
            weighted (by rho and its powers) sum of errors (in eps)
    
    Outputs:
    - rtn_val: negative of the average length of periods until
        reaching nonpositive health outcomes; negative one is multiplied
        as we are using a minimization function (to find the maximum
        average length)

    '''
    ## unravelling the arguments
    mu, shaping, eps, boundaries, prefix_sum = args
    rho = par

    ## generating an array of results, then calculating the AR(1) process
    results = cl_array.empty_like(eps)
    prefix_sum(eps, boundaries, rho, results)
    results = results + mu

    ## reshaping; row = number of lives, column = number of periods
    res = results.get().reshape(shaping)

    ## for each row, trying to find the first time it reaches
    ## non-positive health outcome
    res = np.argmax(res <= 0, axis=1)

    ## calculating the negative of average length of period till
    ## reaching a nonpositive health outcome
    rtn_val = -res.mean()

    return rtn_val

def q3_gpu(n_runs=1000, n_steps=4160, mu=3.0, sigma=1,
           rho_ub=0.95, rho_lb=-0.95, rho_n=200, rand_seed=25):
    '''
    "Embarrassing" application of finding the argmin rho for minimizing
    the number of negative-or-zero health occurrences (with AR(1) process)

    Input:
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - mu: parameter value; also used as starting point
    - sigma: standard dev. of the error term (~N(0, sigma))
    - rho_ub: upper bound for the rho; rho is the parameter of AR(1)
    - rho_lb: lower bound for the rho
    - rho_n: number of rho cases to be used for simulation
    - rand_seed: random seed for replication reasons (for the errors)

    Output:
    - elapsed (printed): time taken for overall computation
    - best_rate, best_rho (printed): best (i.e., lowest) rate of non-
        positive cases, and the accompanying rho
    - q3_gpu_results.csv (csv output): collects rho and rate

    '''
    # Set up OpenCL context and command queue
    ctx = cl.create_some_context()
    queue = cl.CommandQueue(ctx)

    # timing starts
    t0 = time.time()

    # generating a vector of epsilons (errors); length n_runs*n_steps
    NT = int(n_runs*n_steps)
    rand_gen = clrand.PhiloxGenerator(ctx, seed=rand_seed)
    eps = rand_gen.normal(queue, NT, np.float32,
                          mu=0, sigma=sigma)
    
    # establishing boundaries for each simulated walk
    # necessary for performing scans only within lives and not between
    seg_boundaries = [1] + [0]*(n_steps-1)
    seg_boundaries = np.array(seg_boundaries, dtype=np.uint8)
    seg_boundary_flags = np.tile(seg_boundaries, int(n_runs))
    seg_boundary_flags = cl_array.to_device(queue, seg_boundary_flags)

    # Defining segmented kernel operation
    prefix_sum = GenericScanKernel(ctx, np.float32,
                 arguments="__global float *ary, __global char *segflags, "
                           "__global float rho, __global float *out",
                 input_expr="ary[i]",
                 scan_expr="across_seg_boundary ? b : (rho*a+b)", neutral="0",
                 is_segment_start_expr="segflags[i]",
                 output_statement="out[i] = item",
                 options=[])

    # Allocate space for result of kernel on device
    dev_result = cl_array.empty_like(eps)

    # All cases of rho
    rho_cases = np.linspace(rho_lb, rho_ub, num=rho_n, dtype=np.float32)

    # shape that the "matrix" version should take
    shaping = (int(n_runs), int(n_steps))

    # Embarrassing implementation of grid search
    best_length = -np.inf # sentinel for grid search
    best_rho = None
    rho_length_lst = []
    for rho in rho_cases:
        rho_ = np.float32(rho) # just in case type mismatch occurs
        # calculating the average length
        candidate = -crit_fn(rho_, mu, shaping, eps,
                             seg_boundary_flags, prefix_sum)
        # recording the rho and average length
        rho_length_lst.append([rho_, candidate])

        # updating the best case
        if candidate > best_length:
            best_length = candidate
            best_rho = rho_

    # printing the best cases
    print("Best avg. length of periods is %10.6f" % best_length)
    print("This is achieved by rho of %6.5f" % best_rho)    

    # exporting rho_length_lst
    df = pd.DataFrame(rho_length_lst, columns=['rho', 'avg_length'])
    df.to_csv("q3_gpu_results.csv", index=False)

    # elapsed time
    final_time = time.time()
    elapsed = final_time - t0
    print("Simulation required %10.6f seconds" % elapsed)

    return None

def main():
    q3_gpu()

if __name__ == '__main__':
    main()

#### D-(2) q3_gpu.sbatch

In [None]:
#!/bin/bash
#SBATCH --job-name=q3_gpu      # job name
#SBATCH --output=q3_gpu.out # output log file
#SBATCH --error=q3_gpu.err  # error file
#SBATCH --time=00:20:00  # 5 minutes of wall time
#SBATCH --nodes=1        # 1 GPU node
#SBATCH --partition=gpu2 # GPU2 partition
#SBATCH --ntasks=1       # 1 CPU core to drive GPU
#SBATCH --gres=gpu:1     # Request 1 GPU

module load cuda
module load mpi4py/3.0.1a0_py3

python ./q3_gpu.py

### Part E. Question 4-(a)

#### E-(1) q4.py

In [None]:
from mpi4py import MPI
import pandas as pd
import numpy as np
import scipy.stats as sts
import scipy.optimize as opt
import time

def crit_fn_mpi(par, *args):
    '''
    Criterion function to be used in mpi4py implementation
    of finding optimal rho (and average length of periods)
    using scipy.optimize.minimize_scalar.

    Inputs:
    - par: the rho value to be tested (and optimized later)
    - *args: broken down into...
        - mu: mu value used in the AR(1) process
        - eps: numpy array (shape (n_runs, n_steps)) of AR(1) errors
    
    Outputs:
    - rtn_val: negative of the average length of periods until
        reaching nonpositive health outcomes; negative one is multiplied
        as we are using a minimization function (to find the maximum
        average length)

    '''
    ## unravelling the arguments
    mu, eps = args
    rho = np.float32(par)

    ## result of the below process stored here
    result = np.empty_like(eps, dtype=np.float32)
    n_steps = int(result.shape[1])

    ## implementing the scan-like procedure for weighted sum
    ## of epsilons to be used for the AR(1) process
    for i in range(n_steps):
        if i == 0:
            input_col = eps[:, i]
        else:
            input_col = result[:, i-1] * rho + eps[:, i]
        result[:, i] = input_col

    ## adding mu to finish things
    result = result + mu
    # print(result)

    ## for each row, trying to find the first time it reaches
    ## non-positive health outcome
    res = np.argmax(result <= 0, axis=1)

    ## calculating the negative of average length of period till
    ## reaching a nonpositive health outcome
    rtn_val = -res.mean()

    return rtn_val

def q4_a(n_runs=1000, n_steps=4160, mu=3.0, sigma=1.0,
         rho_ub=0.95, rho_lb=-0.95, rand_seed=25):
    '''
    Execution function for MPI4PY + optimize.minimize_scalar
    implementation of finding the best rho

    Inputs:
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - mu: parameter for the AR(1) process; also starting health level
    - sigma: scale for the epsilons ~ N(0, sigma)
    - rho_ub: upper bound for rho (parameter for the AR(1) process)
    - rho_lb: lower bound for rho
    - rand_seed: randomization seed for replicability

    Outputs: None
    - All relevant outputs are made in the function
       "q4_mpi_part"; see below for documentation

    '''
    # timing starts
    t0 = time.time()
    
    # randomizing
    np.random.seed(rand_seed)
    total_eps = sts.norm.rvs(loc=0, scale=sigma, size=(n_runs, n_steps))

    q4_mpi_part(mu, total_eps, rho_ub, rho_lb, t0)

    return None

def q4_mpi_part(mu, total_eps, rho_ub, rho_lb, t0):
    '''
    MPI4PY and optimize.minimize_scalar implementation
    for finding the best rho

    Inputs:
    - mu: parameter for the AR(1) process; also starting health level
    - total_eps: matrix of normal errors ~ N(0, sigma), size=(n_runs, n_steps)
    - rho_ub: upper bound for rho (parameter for the AR(1) process)
    - rho_lb: lower bound for rho

    Outputs: 
    - q4_mpi_result.csv (saved in directory): output containing rank,
       best rho and best average length of first nonpositive health outcome
    - elapsed (printed) 

    '''
    # Setting up the communicator
    comm = MPI.COMM_WORLD  
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    # setting up how many rows to run in the process
    n_runs = total_eps.shape[0]
    rows_per_rank = int(n_runs/size)
    
    # subsetting the epsilons
    sub_eps = total_eps[(rows_per_rank*rank):(rows_per_rank*(rank+1)), :]
     
    # optimizing in the subcase
    sub_result = opt.minimize_scalar(crit_fn_mpi,
        args=((mu, sub_eps)), bounds=(rho_lb, rho_ub))
    
    # setting up the vector to return
    sub_vector = np.array([rank, sub_result.x, -sub_result.fun],
        dtype="float")

    # setting up an empty container for collecting results
    result = None
    if rank == 0:
        result = np.empty(int(3*size), dtype="float")

    # collecting the results
    comm.Gatherv(sendbuf=sub_vector, recvbuf=result, root=0)

    # printing the results and summarizing
    if rank == 0:
        res = result.reshape((size, 3))
        res_df = pd.DataFrame(res, columns=['rank', 'rho', 'avg_length'])
        res_df.to_csv("q4_mpi_result.csv", index=False)

        elapsed = time.time() - t0
        print("Computation time is {} seconds".format(elapsed))

    return None    

def main():
    q4_a()

if __name__ == '__main__':
    main()

#### E-(2) q4_try.sbatch

In [None]:
#!/bin/bash

#SBATCH --job-name=q4_actual
#SBATCH --error=q4_try.err
#SBATCH --output=q4_try.out
#SBATCH --partition=broadwl
#SBATCH --ntasks=20
#SBATCH --constraint=fdr
#SBATCH --mem-per-cpu=2G

module load mpi4py/3.0.1a0_py3

mpirun -n 20 python ./q4.py

### Part F. Question 4-(b)

#### F-(1) q4_gpu.py

In [None]:
import numpy as np
import pyopencl as cl
import pyopencl.array as cl_array
import pyopencl.clrandom as clrand
import pyopencl.tools as cltools
import scipy.stats as sts
import time
import pandas as pd
import scipy.optimize as opt

from pyopencl.scan import GenericScanKernel

def crit_fn(par, *args):
    '''
    Criterion function to be used in PyOpenCL implementation
    of finding optimal rho (and average length of periods)
    using scipy.optimize.minimize_scalar.

    Inputs:
    - par: the rho value to be tested (and optimized later)
    - *args: broken down into...
        - mu: mu value used in the AR(1) process
        - shaping: length-2 tuple; =(n_runs, n_steps)
        - eps: PyOpenCL array (length n_runs*n_steps) of AR(1) errors
        - boundaries: PyOpenCL array of segmented boundaries, to be
            used in the scan kernel
    
    Outputs:
    - rtn_val: negative of the average length of periods until
        reaching nonpositive health outcomes; negative one is multiplied
        as we are using a minimization function (to find the maximum
        average length)

    '''
    ## unravelling the arguments
    mu, shaping, eps, boundaries, prefix_sum = args
    rho = par

    ## generating an array of results, then calculating the AR(1) process
    results = cl_array.empty_like(eps)
    prefix_sum(eps, boundaries, rho, results)
    results = results + mu

    ## reshaping; row = number of lives, column = number of periods
    res = results.get().reshape(shaping)

    ## for each row, trying to find the first time it reaches
    ## non-positive health outcome
    res = np.argmax(res <= 0, axis=1)

    ## calculating the negative of average length of period till
    ## reaching a nonpositive health outcome
    rtn_val = -res.mean()

    return rtn_val

def q4_gpu(n_runs=1000, n_steps=4160, mu=3.0, sigma=1,
           rho_ub=0.95, rho_lb=-0.95, rand_seed=25):
    '''
    PyOpenCL implementation of finding optimal rho
    (and average length of periods) using
    scipy.optimize.minimize_scalar.

    Input:
    - n_runs: number of lives to simulate
    - n_steps: number of periods to simulate
    - mu: parameter value; also used as starting point
    - sigma: standard dev. of the error term (~N(0, sigma))
    - rho_ub: upper bound for the rho; rho is the parameter of AR(1)
    - rho_lb: lower bound for the rho
    - rand_seed: random seed for replication reasons (for the errors)

    Output:
    - elapsed (printed): time taken for overall computation
    - best_rate, best_rho (printed): best (i.e., lowest) avg length
        of first nonpositive occurrences, and the accompanying rho

    '''
    # Set up OpenCL context and command queue
    ctx = cl.create_some_context()
    queue = cl.CommandQueue(ctx)

    # timing starts
    t0 = time.time()

    # generating a vector of epsilons (errors); length n_runs*n_steps
    NT = int(n_runs*n_steps)
    rand_gen = clrand.PhiloxGenerator(ctx, seed=rand_seed)
    eps = rand_gen.normal(queue, NT, np.float32,
                          mu=0, sigma=sigma)

    # establishing boundaries for each simulated walk
    # necessary for performing scans only within lives and not between
    seg_boundaries = [1] + [0]*(n_steps-1)
    seg_boundaries = np.array(seg_boundaries, dtype=np.uint8)
    seg_boundary_flags = np.tile(seg_boundaries, int(n_runs))
    seg_boundary_flags = cl_array.to_device(queue, seg_boundary_flags)

    # Defining segmented kernel operation
    prefix_sum = GenericScanKernel(ctx, np.float32,
                 arguments="__global float *ary, __global char *segflags, "
                           "__global float rho, __global float *out",
                 input_expr="ary[i]",
                 scan_expr="across_seg_boundary ? b : (rho*a+b)", neutral="0",
                 is_segment_start_expr="segflags[i]",
                 output_statement="out[i] = item",
                 options=[])

    # using scipy.optimize.minimize_scalar
    shaping = (int(n_runs), int(n_steps))
    bounded = (np.float32(rho_lb), np.float32(rho_ub))
    result = opt.minimize_scalar(
        crit_fn, args=((mu, shaping, eps, seg_boundary_flags, prefix_sum)),
        bounds=bounded)

    # best_rho and best_length
    best_rho, best_length = result.x, -result.fun
    print("Best avg. length of periods is %10.6f" % best_length)
    print("This is achieved by rho of %6.5f" % best_rho)

    # elapsed time
    final_time = time.time()
    elapsed = final_time - t0
    print("Simulation required %10.6f seconds" % elapsed)

    return None

def main():
    q4_gpu()

if __name__ == '__main__':
    main()

#### F-(2) q4_gpu.sbatch

In [None]:
#!/bin/bash
#SBATCH --job-name=q4_gpu      # job name
#SBATCH --output=q4_gpu.out # output log file
#SBATCH --error=q4_gpu.err  # error file
#SBATCH --time=00:20:00  # 5 minutes of wall time
#SBATCH --nodes=1        # 1 GPU node
#SBATCH --partition=gpu2 # GPU2 partition
#SBATCH --ntasks=1       # 1 CPU core to drive GPU
#SBATCH --gres=gpu:1     # Request 1 GPU

module load cuda
module load mpi4py/3.0.1a0_py3

python ./q4_gpu.py