In [17]:
# import needed packages
import multiprocessing as mp
import numpy as np
import pandas as pd
import class_3_model as cm
import time


# read in data for random n
df_random_n = pd.read_csv("class_3_random_n.csv")
# set the number of values to iterate over 
n_iter = 20
n_iter = min(n_iter, len(df_random_n))




In [18]:
# demonstration run using a large n
t0 = time.time()
n = 5000
rv = cm.log_sum_binomial(n, None)
time.time() - t0

5.7711920738220215

In [6]:
###########################
#    SERIAL 'FOR' LOOP    #
###########################

# initialize output values
vec_logsums = [0 for x in range(len(df_random_n))]

# set timer baseline
t0_serial = time.time()

# simple loop over
for i in range(n_iter):
    # important for this model to conver to int based on numerical issues
    vec_logsums[i] = cm.log_sum_binomial(int(df_random_n["random_n"].iloc[i]), int(df_random_n["random_n_id"].iloc[i]))
    
    if i%round(n_iter/5) == 0:
        print("\t%s iterations complete"%(i + 1))

t1_serial = time.time()
t_elapse_serial = t1_serial - t0_serial

print("Serial run complete in %s seconds."%t_elapse_serial)



	1 iterations complete
	5 iterations complete
	9 iterations complete
	13 iterations complete
	17 iterations complete
Serial run complete in 84.57079315185547 seconds.


In [19]:
###############################
#    ASYNCHRONOUS PARALLEL    #
###############################

t0_par_async = time.time()

#
# SOLUTION TO GET APPLY_ASYNC TO WORK WITH JUPYTER LAB: FUNCTION HAS TO BE PLACED IN MODULE AND IMPORTED: https://stackoverflow.com/questions/47313732/jupyter-notebook-never-finishes-processing-using-multiprocessing-python-3
# https://towardsdatascience.com/asynchronous-parallel-programming-in-python-with-multiprocessing-a3fc882b4023
#

# initialize output vector/array (pre-allocate memory)
vec_logsums_par_async = []

# set up dummy functions to get results
def get_result(result):
    
    global vec_logsums_par_async
    
    rand_id = result[0]
    val = result[1]
    
    # update
    vec_logsums_par_async.append(result)
    #vec_randids_par_async.append(rand_id)

# check to ensure current module is "__main__"; this is necessary in scripts that use multiprocessing. Without it, the processing framework will run the entirety of the original script in parallel
if __name__ == "__main__":
    
    # start the MP pool for asynchronous parallelization
    pool = mp.Pool(mp.cpu_count())

    # apply the function; note: if the function only takes one argument (e.g., f(x)), make sure the args is args = (x, ) - that extra comma is important
    for i in range(n_iter):
        pool.apply_async(
            # target function
            cm.log_sum_binomial,
            # function arguments 
            args = (int(df_random_n["random_n"].iloc[i]), int(df_random_n["random_n_id"].iloc[i])),
            callback = get_result
        )
        
        # 
        # pseudocode attempt at describing this function:
        # for i in 0:(n_iter - 1):
        #  assign task i to Pool
        #  use cm.log_sum_binomial with arguments (int(df_random_n["random_n"].iloc[i]), int(df_random_n["random_n_id"].iloc[i]))
        #  when task i finishes, apply get_results to the outpue
        #

    pool.close()
    pool.join()
    t1_par_async = time.time()

    # 
    t_elapse_par_async = t1_par_async - t0_par_async

# print the reduction in time
print("Asynchronous parallelization across %s cores reduced computational time by %s%s."%(mp.cpu_count(), round(100*(1 - t_elapse_par_async/t_elapse_serial), 2), "%"))



Asynchronous parallelization across 12 cores reduced computational time by 75.81%.


In [23]:
# examine results from the pool, which gives us tuples with the random_n_id + the output value associated with it
vec_logsums_par_async


[(4, 2796.848873559379),
 (11, 2842.5965874763356),
 (9, 2874.481357782093),
 (2, 2959.0453138104062),
 (12, 3044.9955641998395),
 (5, 3055.3927719082385),
 (6, 3058.858507811038),
 (8, 3180.852411589589),
 (7, 3223.8275367843053),
 (3, 3259.1780429928626),
 (1, 3332.651644132217),
 (10, 3366.615855979654),
 (13, 2909.83186399065),
 (15, 2974.987698963285),
 (19, 2891.1168901155315),
 (20, 2775.361310962021),
 (17, 3015.1902354357617),
 (14, 3284.8244886735806),
 (18, 3176.693528506229),
 (16, 3415.1361586188505)]

In [24]:
##  verify the values shown above (interactive)
# set the random id to check
rand_id_check = 4
# get the applicable data row
row = df_random_n[df_random_n["random_n_id"] == rand_id_check]
#
cm.log_sum_binomial(int(row["random_n"]), int(row["random_n_id"]))


(4, 2796.848873559379)

In [17]:
##############################
#    SYNCHRONOUS PARALLEL    #
##############################

#
# check to ensure current module is "__main__"; this is necessary in scripts that use multiprocessing. Without it, the processing framework will run the entirety of the original script in parallel
# more on this is available at: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
#
# this approach is similar to running it in R
#

if __name__ == "__main__":
    
    # start the MP pool for asynchronous parallelization
    n_cores = mp.cpu_count()

    # copy the range
    list_task = list(range(n_iter))
    
    # set the outer return dictionary
    return_values = {}
    
    # set kill timer
    t0_par_sync = time.time()
    # upper threshold
    t_max = len(list_task)*20
    
    # set
    while ((len(list_task) > 0) & (time.time() - t0_par_sync < t_max)):
        
        # initialize the manager
        man = mp.Manager()
        return_dict = man.dict()
        
        # initialize the list of processes
        processes = []
        list_task_drop = []
        
        # start processes on available cores
        for i in range(min(n_cores, len(list_task))):
            
            # get the row index to work with
            ind = list_task[i]
            list_task_drop.append(ind)
            
            p = mp.Process(
                target = cm.log_sum_binomial_sync,
                args = (int(df_random_n["random_n"].iloc[ind]), int(df_random_n["random_n_id"].iloc[ind]), return_dict)
            )
            
            processes.append(p)
            p.start()
            
        # loop to close and join after starting
        for p in processes:
            p.join()

        # update the return values
        return_values.update(return_dict)
        
        # reduce the task list
        list_task = [x for x in list_task if x not in list_task_drop]

    t1_par_sync = time.time()

    # 
    t_elapse_par_sync = t1_par_sync - t0_par_sync


# print the reduction in time
print("Synchronous parallelization across %s cores reduced computational time by %s%s."%(n_cores, round(100*(1 - t_elapse_par_sync/t_elapse_serial), 2), "%"))



Synchronous parallelization across 12 cores reduced computational time by 78.79%.


In [20]:
t_elapse_serial

95.40652322769165

In [18]:
t_elapse_par_async

19.08370804786682

In [21]:
t_elapse_par_sync

20.233006238937378