## Design Approach:
* Instead of the driver task creating bootstrap samples, that funcitonality is delegated to 
executor tasks by using Spark broadcast() function.  This reduces run time by avoiding recreating the original data set
for every sample in an executor.
* Each parallel task consists of a `sample_id` and `sample_seed`.  The `sample_seed` value is used by the executor
task to create a bootstrap sample

In [1]:
from pyspark import RDD
import pickle
import base64
from pyspark import SparkConf, HiveContext
from pyspark.sql import  Row
import numpy as np
import pandas as pd
import datetime
import socket
import os
import gc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1533220140196_0007,pyspark3,idle,Link,Link,âœ”


SparkSession available as 'spark'.


In [2]:
NUMBER_OF_ROWS = 4000000

NUMBER_EXECUTORS = 32
NUMBER_CORES = 2
#NUMBER_PARTITIONS = NUMBER_EXECUTORS*NUMBER_CORES

NUMBER_OF_SAMPLES = 1000 #SAMPLE_SETS * SAMPLE_SET_REPLICATIONS

MAX_RANDOM_SEED = int(1e5)

In [3]:
sc.version

'2.3.1'

In [4]:
sc.applicationId

'application_1533220140196_0007'

In [5]:
spark

<pyspark.sql.session.SparkSession object at 0x7f1ca27b5f98>

In [6]:
np.random.seed(21)

col1 = np.random.exponential(1,size=NUMBER_OF_ROWS)
col2 = np.random.normal(0,1,NUMBER_OF_ROWS)
col3 = np.random.randint(1,10,NUMBER_OF_ROWS)
category = np.random.choice(['a','b','c','d','e'],NUMBER_OF_ROWS)
orig_df = pd.DataFrame(dict(category=category,col1=col1,col2=col2,col3=col3))

In [7]:
print(orig_df.shape)
print(orig_df.head(5))
print(type(orig_df))
orig_df.category.value_counts()

(4000000, 4)
  category      col1      col2  col3
0        b  0.049952  0.353572     1
1        a  0.341237  1.814845     3
2        b  1.276423  1.002329     4
3        b  0.021853  1.184573     4
4        b  0.230575  2.445508     5
<class 'pandas.core.frame.DataFrame'>
a    801266
c    800493
b    799794
d    799518
e    798929
Name: category, dtype: int64

In [8]:
# broadcast raw data to executors 
sc.broadcast(orig_df)

<pyspark.broadcast.Broadcast object at 0x7f1ca27b6a58>

## Bootstrap core computations

In [9]:
# Define Bootstrap specific Exceptions
class BootstrapError(Exception):
    """Custom excpetion for bootstrap analysis"""
    pass

In [10]:
#
# Common Calculation fucntion
#
def calculateSampleStats(df,col):
    stat_mean = df[col].mean()
    stat_min = df[col].min()
    stat_max= df[col].max()
    stat_50th = df[col].quantile(0.5)
    
    return stat_min, stat_mean, stat_max, stat_50th

In [11]:
#
# function to return summary of sample processing
#   Returns a single row of results for the sample
#
def processASampleReturnSummary(sample_run):
    # iterator: Python iterator for each record in a sample
    
    start_time = datetime.datetime.now()

    # retrieve sample id and seed for sampling
    sample_id = sample_run['sample_id']
    sample_seed = sample_run['sample_seed']

    # create bootstrap sample using the specified sample_seed value
    sample_df = orig_df.sample(n=orig_df.shape[0],replace=True,random_state=sample_seed)

    sample_df['col2'] = sample_df['col2'] + 10*sample_id

    result_stats = dict()

    for c in ['col2','col1']:
        stats = calculateSampleStats(sample_df,c)
        stats_to_return = ['min','mean','max','50th']

        result_stats.update(dict(zip([c + '_' + stat for stat in stats_to_return],
                            [float(x) for x in stats])))

    # caculate run-time performance measures
    end_time = datetime.datetime.now()

    elapsed_time_str = '{}'.format(end_time - start_time)

    start_str = '{}'.format(start_time)
    end_str = '{}'.format(end_time)

    print('>>>>>>Pid: {:d}, completed processing sample_id {:d} at {}'\
          .format(os.getpid(),sample_id,datetime.datetime.now()))

     # return results of bootstrap analysis
    return dict(sample_id=sample_id, sample_seed=sample_seed, 
                                  shape=str(sample_df.shape),
                      worker_hostname = socket.gethostname(),
                      worker_pid = os.getpid(),
                      time_start=start_str, time_end=end_str,
                      time_elapsed=elapsed_time_str ,    
                **result_stats)
        

## Create and analyze bootstrap samples 

In [12]:
print('Starting analysis for {:,d} samples'\
     .format(NUMBER_OF_SAMPLES))
bootstrap_start = datetime.datetime.now()

Starting analysis for 1,000 samples

## Create sample ids and seeds to be use in boostrap sampling

In [13]:
np.random.seed(13)  # make repeatable

sample_seeds = pd.DataFrame(dict(sample_id=np.array(range(NUMBER_OF_SAMPLES))+1,
                                sample_seed= np.random.choice(range(MAX_RANDOM_SEED),
                                                              size=NUMBER_OF_SAMPLES,
                                                             replace=False)))
sample_seeds.head(10)

   sample_id  sample_seed
0          1        72031
1          2        27978
2          3        55639
3          4        51955
4          5        52145
5          6         3011
6          7        83607
7          8        68952
8          9        90269
9         10        69234

In [14]:
sample_seeds.tail(10)

     sample_id  sample_seed
990        991        41440
991        992        92587
992        993          166
993        994        24458
994        995        74793
995        996        21813
996        997        62437
997        998        32343
998        999        81160
999       1000        53014

## Run the parallel tasks to create sample and compute metrics

In [15]:
# create RDD to contain sample_seed to create each bootstrap sample in the executors
sample_rdd = sc.parallelize(sample_seeds.to_dict('records')).repartition(100)
print("sample_rdd partitions: {:d}".format(sample_rdd.getNumPartitions()))

# use mapPartitions() to now run each bootstrap sample in parallel
results_df = sample_rdd.map(processASampleReturnSummary) 

bootstrap_results = pd.DataFrame(results_df.collect())
print('completed creating pandas dataframe creation time: {}'.format(datetime.datetime.now() - bootstrap_start))
print(bootstrap_results.shape)
        
print('shape of bootstrap_results is {}'.format(bootstrap_results.shape))

sample_rdd partitions: 100
completed creating pandas dataframe creation time: 0:04:57.137409
(1000, 16)
shape of bootstrap_results is (1000, 16)

## Show sample results

In [19]:
bootstrap_results.head()

name 'bootstrap_results' is not defined
Traceback (most recent call last):
NameError: name 'bootstrap_results' is not defined



In [20]:
bootstrap_results.tail()

name 'bootstrap_results' is not defined
Traceback (most recent call last):
NameError: name 'bootstrap_results' is not defined

