In [8]:
import pywren
import numpy as np
from boto.s3.connection import S3Connection
from boto.s3.key import Key
import pywren.storage as storage
import boto3
import pickle 
import sys

In [4]:
# --- configuration ---
# number of parallel map tasks (and of reduce tasks)
num_workers = 3
# S3 bucket that holds the inputs, the intermediate shuffle objects and the outputs
bucket_name = 'terasort-yawen'

# The data to sort must already be split into `num_workers` local files named
# <file_path_local><file_name>0 ... <file_path_local><file_name><num_workers-1>
# (input_files/input0, input_files/input1, ...); they become the map-stage inputs.
file_path_local, file_name = 'input_files/', 'input'

In [66]:
# Assumes boto3 credentials/region are already configured for this environment.
s3_client = boto3.client('s3')

# Upload the num_workers pre-split input files (input0 .. input<n-1>) to S3;
# they are the inputs of the map stage.
for i in range(num_workers):
    key = file_name + str(i)
    # `with` guarantees each local file handle is closed after its upload
    with open(file_path_local + key, 'rb') as body:
        s3_client.put_object(Bucket=bucket_name, Body=body, Key=key)

In [30]:
## Split keys for range partitioning: the mapper sends a record with key k to
## partition i, the first i with k <= sample_keys[i]
## (i.e. sample_keys[i-1] < k <= sample_keys[i]).
def get_sample_keys(file_path, num_workers):
    """Pick ``num_workers`` split keys from a local file of records.

    Each line of ``file_path`` is one record whose key is the text before the
    first double-space separator ("  ").  All keys are sorted, and the keys at
    sorted positions key_range, 2*key_range, ..., (num_workers-1)*key_range are
    returned together with the largest key, where
    key_range = num_records // num_workers.

    Prints the number of records and each chosen position (progress output).

    Args:
        file_path: path of the local file to sample keys from.
        num_workers: number of partitions / reducers.

    Returns:
        A list of ``num_workers`` keys in ascending order; the last one is the
        maximum key in the file.

    Raises:
        ValueError: if ``num_workers`` < 1 or the file contains no records.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be >= 1")

    # `with` closes the file even if parsing fails
    with open(file_path, "r") as f:
        key_list = sorted(line.split("  ")[0] for line in f)

    length = len(key_list)
    print("num records: " + str(length))
    if length == 0:
        raise ValueError("no records found in " + str(file_path))

    n = num_workers
    # `//` keeps integer indices on both Python 2 and Python 3
    key_range = length // n
    sample_key_list = []
    for i in range(1, n + 1):
        # the last split key is always the maximum key, so every record of the
        # sampled file falls into some partition
        index = length - 1 if i == n else i * key_range
        sample_key_list.append(key_list[index])
        print(index)

    return sample_key_list

#sample_keys = get_sample_keys('input_files/input', num_workers)
#sample_keys

num records: 30
10
20
29


['GLSnlm0*P*', "o7~drsiz'L", '~sHd0jDv6X']

In [24]:
import time 

# partition stage: split one input file into n key ranges
def mapper(data):
    """Range-partition the S3 object 'input<id>' into n shuffle objects.

    Args:
        data: [id, num_workers, bucket_name, sample_keys]. ``id`` selects the
            input object 'input<id>'; ``sample_keys`` are the split keys from
            get_sample_keys (assumed sorted ascending, as that function returns).

    Returns:
        Seconds spent writing the intermediate (shuffle) objects to S3.
    """
    import bisect
    import os

    worker_id = data[0]
    n = data[1]
    bucket_name = data[2]
    sample_keys = data[3]

    # [s3] fetch input<id> into a scratch file, read it, then delete it
    s3 = boto3.resource('s3')
    key = 'input' + str(worker_id)
    file_local = '/tmp/input_tmp'
    s3.Bucket(bucket_name).download_file(key, file_local)
    with open(file_local, "r") as f:
        lines = f.readlines()  # one record per line
    os.remove(file_local)

    # A record goes to the first partition whose split key is >= its key
    # (bisect_left on the sorted split keys). Keys greater than every split
    # key are clamped into the last partition instead of indexing past the
    # end of sample_keys.
    p_list = [[] for _ in range(n)]  # list of n partitions
    for line in lines:
        record_key = line.split("  ")[0]
        index = min(bisect.bisect_left(sample_keys, record_key), n - 1)
        p_list[index].append(line)

    # [s3] write one pickled partition per reducer: shuffle<id><i>
    # NOTE(review): names are built without a separator, so they collide for
    # larger worker counts (id=1,i=11 and id=11,i=1 both give 'shuffle111').
    # The reducer builds the same names, so any change must be made in both.
    t1 = time.time()
    s3_client = boto3.client('s3')
    for i in range(n):
        s3_client.put_object(
            Bucket=bucket_name,
            Body=pickle.dumps(p_list[i]),
            Key='shuffle' + str(worker_id) + str(i),
        )
    t2 = time.time()
    # time spent (in sec) writing intermediate files
    return t2 - t1

#mapper([0, num_workers, bucket_name, sample_keys]) 

In [25]:
import time 

# sort stage: merge the n partitions destined for this reducer & sort them
def reducer(data):
    """Gather partition <id> from every mapper, sort it by key and upload it.

    Args:
        data: [id, num_workers, bucket_name].

    Returns:
        Seconds spent reading the intermediate (shuffle) objects from S3.

    Side effects:
        Uploads the sorted records to the S3 object 'sorted_output<id>'.
    """
    import os

    worker_id = data[0]
    n = data[1]
    bucket_name = data[2]

    # [s3] read shuffle<i><id>: the partition mapper i wrote for this reducer
    t1 = time.time()
    s3_client = boto3.client('s3')
    merged_lines = []
    for i in range(n):
        key = 'shuffle' + str(i) + str(worker_id)
        body = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read()
        # NOTE: unpickling is only safe because these objects were written by
        # our own mappers into our own bucket.
        # extend() instead of sum(list_of_lists, []) avoids quadratic copying
        merged_lines.extend(pickle.loads(body))
    t2 = time.time()

    # Sort whole records by key (text before the first "  "); records are
    # kept verbatim, so no fields are lost when a value contains "  ".
    merged_lines.sort(key=lambda line: line.split('  ', 1)[0])

    # [s3] write this reducer's sorted run to its own object, sorted_output<id>,
    # so that reducers running in parallel do not overwrite each other.
    file_local = '/tmp/sorted_output'
    with open(file_local, 'w') as f:
        f.writelines(merged_lines)
    with open(file_local, 'rb') as body_file:
        s3_client.put_object(
            Bucket=bucket_name,
            Body=body_file,
            Key='sorted_output' + str(worker_id),
        )
    os.remove(file_local)

    # time (in sec) spent reading intermediate files
    return t2 - t1

#reducer([0, num_workers, bucket_name])

To start using `pywren`, we first create an executor.

In [26]:
# Executor used to run functions remotely; reused below for both the map
# stage (mapper) and the reduce stage (reducer).
wrenexec = pywren.default_executor()

`wrenexec.map` applies a function (here `mapper`, later `reducer`) to each element of a list of arguments, and every call is executed remotely at the same time.

Each returned future is a placeholder for the value of one remote call. Calling `result()` on a future returns that value; note that this blocks until the remote job has completed.

In [32]:
# Choose split keys from the full local input file (path taken from the config
# cell rather than hard-coded), then build one argument list per worker.
sample_keys = get_sample_keys(file_path_local + file_name, num_workers)

map_data_list = [[i, num_workers, bucket_name, sample_keys] for i in range(num_workers)]
reduce_data_list = [[i, num_workers, bucket_name] for i in range(num_workers)]

# launch one remote mapper per input file; returns one future per call
futures = wrenexec.map(mapper, map_data_list)

num records: 30
10
20
29


The pywren `get_all_results` function will wait until all of the futures are done and return their results

In [33]:
# Wait for every mapper to finish. Each one returns the time spent (in sec)
# writing its intermediate (shuffle) objects to S3.
pywren.get_all_results(futures)

[0.08512401580810547, 0.18042778968811035, 0.07289409637451172]

In [34]:
# Launch one remote reducer per partition. Run this only after the map stage
# has finished (the cell above waits for it), because reducers read the shuffle
# objects the mappers wrote. Note this rebinds `futures` to the reducer futures.
futures = wrenexec.map(reducer, reduce_data_list)

In [35]:
# Wait for every reducer to finish. Each one returns the time spent (in sec)
# reading its intermediate (shuffle) objects from S3.
pywren.get_all_results(futures)

[0.15877389907836914, 0.1647038459777832, 0.22743606567382812]

In [None]:
# final stage: concatenate outputs from the reduce/sort stage to form a single sorted output file

