In [1]:
import pywren
import numpy as np
from boto.s3.connection import S3Connection
from boto.s3.key import Key
import pywren.storage as storage
import boto3
import pickle 
import sys
import time
import os
import redis
from rediscluster import StrictRedisCluster

In [3]:
# partition stage: partition input data into n groups 
def mapper(data):
    """Partition stage: range-partition this worker's input records into Redis.

    Args:
        data: list of [worker id, number of workers, S3 bucket name,
            sample keys, S3 key prefix]. Only the worker id, bucket name and
            key prefix are used here.

    Returns:
        [read input, partition, write shuffle, prepare] -- seconds spent in
        each phase, in that order.
    """
    worker_id = data[0]
    bucket_name = data[2]
    path = data[4]

    num_partitions = 2500   # hardcode: number of shuffle partitions
    files_per_worker = 4    # each worker reads this many consecutive input files
    key_span = 95           # printable ASCII key bytes: 126 - 32 + 1

    t0 = time.time()
    #[s3] read from input files: input<id*4> ... input<id*4+3>
    s3 = boto3.resource('s3')
    file_local = '/tmp/input_tmp'
    lines = []
    first_file = worker_id * files_per_worker
    for i in range(first_file, first_file + files_per_worker):
        key = path + 'input' + str(i)
        s3.Bucket(bucket_name).download_file(key, file_local)
        with open(file_local, "r") as f:
            lines.extend(f.readlines())  # each line contains a 100B record
        os.remove(file_local)
    t1 = time.time()

    # partition on the first two key bytes
    p_list = [[] for _ in range(num_partitions)]
    for line in lines:
        # Clamp each byte into the expected 32..126 range. Without this, a byte
        # below 32 yields a negative index that silently wraps to a partition
        # at the wrong end of the list, and a one-character line raises
        # IndexError on line[1].
        key1 = min(max(ord(line[0]) - 32, 0), key_span - 1)
        if len(line) > 1:
            key2 = min(max(ord(line[1]) - 32, 0), key_span - 1)
        else:
            key2 = 0
        # 2500/95 ~ 26.3, so the largest possible index is int(26.3*94.99) = 2498
        index = int(26.3 * (key1 + key2 / 95.0))
        p_list[index].append(line)

    t1_2 = time.time()

    #test1: turn each partition into one string by round-tripping it through
    # a temp file; this time is reported as the "prepare" phase.
    file_tmp = '/tmp/tmp'
    for i in range(num_partitions):
        with open(file_tmp, "w+") as f:
            f.writelines(p_list[i])
            f.seek(0)
            p_list[i] = f.read()
        os.remove(file_tmp)

    t2 = time.time()

    # write one Redis value per partition
    startup_nodes = [{"host": "rediscluster1.a9ith3.clustercfg.usw2.cache.amazonaws.com", "port": "6379"}]
    redis_client = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=True, skip_full_coverage_check=True)

    for i in range(num_partitions):
        # NOTE(review): the two numbers are concatenated without a separator,
        # so (1, 23) and (12, 3) both produce 'shuffle123'. The key format is
        # left unchanged because the reducer builds keys the same way; change
        # both sides together.
        key = 'shuffle' + str(worker_id) + str(i)
        redis_client.set(key, p_list[i])
    t3 = time.time()

    #return time spent (in sec): read input, compute, write shuffle, prepare
    return [t1-t0, t1_2-t1, t3-t2, t2-t1_2]

In [5]:
# sort stage: merge n sets of data & sort 
def reducer(data):
    """Sort stage: gather this worker's shuffle partitions, sort them, upload to S3.

    Args:
        data: list of [worker id, number of workers, S3 bucket name]. Only the
            worker id and bucket name are used here.

    Returns:
        [read shuffle, sort, write output, prepare] -- seconds spent in each
        phase, in that order -- or -1 if any expected shuffle key is missing
        from Redis.
    """
    worker_id = data[0]
    bucket_name = data[2]

    num_shuffle_inputs = 2500   # hardcode: shuffle values read per reducer
    files_per_worker = 4        # output files written per reducer
    records_per_file = 1000000

    #read shuffle values: shuffle<0 id> shuffle<1 id> ... shuffle<2499 id>
    t0 = time.time()
    startup_nodes = [{"host": "rediscluster1.a9ith3.clustercfg.usw2.cache.amazonaws.com", "port": "6379"}]
    redis_client = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=True, skip_full_coverage_check=True)

    bodies = []
    for i in range(num_shuffle_inputs):
        key = 'shuffle' + str(i) + str(worker_id)
        body = redis_client.get(key)
        if body is None:
            return -1
        bodies.append(body)
    t1 = time.time()

    #test1: split each value back into lines by round-tripping it through a
    # temp file; this time is reported as the "prepare" phase.
    file_tmp = '/tmp/tmp'
    all_lines = []
    for body in bodies:
        with open(file_tmp, "w+") as f:
            f.write(body)
            f.seek(0)
            # Flatten into one list of records. Storing each readlines() result
            # as a nested list made the sort below operate on lists of lines
            # rather than on individual records.
            all_lines.extend(f.readlines())
        os.remove(file_tmp)

    t1_2 = time.time()

    #merge & sort on the first 10 characters of each record
    records = [(line[:10], line[12:]) for line in all_lines]
    records.sort(key=lambda rec: rec[0])

    t2 = time.time()
    sorted_lines = [rec[0] + "  " + rec[1] for rec in records]

    #[s3] write to output files: sorted_output<id*4> ... sorted_output<id*4+3>
    s3_client = boto3.client('s3')
    file_name = 'output/sorted_output'
    for i in range(files_per_worker):
        start = records_per_file * i
        if i < files_per_worker - 1:
            end = start + records_per_file
        else:
            # The last file takes every remaining record, so a reducer holding
            # more than files_per_worker * records_per_file records does not
            # silently drop the tail.
            end = None
        # Close the file before uploading so the data is flushed to disk.
        with open(file_tmp, "w") as f:
            f.writelines(sorted_lines[start:end])
        with open(file_tmp, 'rb') as upload:
            s3_client.put_object(
                Bucket = bucket_name,
                Body = upload,
                Key = file_name + str(worker_id*4+i)
            )
        os.remove(file_tmp)
    t3 = time.time()

    #return time (in sec): read shuffle, compute, write output, prepare
    return [t1-t0, t2-t1_2, t3-t2, t1_2-t1]

In [7]:
def final_reducer(data):
    """Final stage: download every sorted output file and concatenate them locally.

    Args:
        data: list of [n, S3 bucket name]; n*4 objects named
            'output/sorted_output<i>' are downloaded from the bucket.

    Returns:
        Seconds spent downloading and concatenating.
    """
    t0 = time.time()
    n = data[0]
    bucket_name = data[1]

    file_name = 'output/sorted_output'

    # download_file writes to a local path equal to the S3 key, so the local
    # 'output' directory has to exist first.
    out_dir = os.path.dirname(file_name)
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    #[s3] download sorted_output0 ... sorted_output<n*4-1>
    s3 = boto3.resource('s3')
    part_paths = []
    for i in range(n*4):
        key = file_name + str(i)
        s3.Bucket(bucket_name).download_file(key, key)
        part_paths.append(key)

    # Concatenate in numeric order. A shell glob ("cat sorted_output*") expands
    # lexicographically (0, 1, 10, 100, ...), which would interleave the parts
    # out of order.
    with open(file_name, "wb") as merged:
        for part in part_paths:
            with open(part, "rb") as src:
                while True:
                    chunk = src.read(1 << 20)  # copy in 1 MiB pieces
                    if not chunk:
                        break
                    merged.write(chunk)

    t1 = time.time()
    return (t1-t0)

# final stage: concatenate outputs from the reduce/sort stage to form a single sorted output file
#final_reducer([num_workers,bucket_name])

In [41]:
# Create the PyWren executor that the map and reduce stages are submitted through.
wrenexec = pywren.default_executor()

In [46]:
# Job configuration.
# NOTE(review): mapper/reducer hard-code 2500 partitions internally, while
# num_workers here is 100 -- confirm the two are meant to differ.
num_workers = 100
bucket_name = 'terasort-yawen'
path = '1TB/'
sample_keys = []

# One argument list per worker; every mapper entry shares the same sample_keys list.
map_data_list = [
    [i, num_workers, bucket_name, sample_keys, path]
    for i in range(num_workers)
]
reduce_data_list = [
    [i, num_workers, bucket_name]
    for i in range(num_workers)
]

In [None]:
# Map stage: submit one mapper call per entry of map_data_list, wait for all
# of them to finish, then print the wall-clock time in seconds.
t1_map = time.time()
futures = wrenexec.map(mapper, map_data_list)
results_map = pywren.get_all_results(futures)
t2_map = time.time()
print(t2_map - t1_map)

In [None]:
# Reduce stage: submit one reducer call per entry of reduce_data_list, wait for
# all of them to finish, then print the wall-clock time in seconds.
t1_reduce = time.time()
futures = wrenexec.map(reducer, reduce_data_list)
results_reduce = pywren.get_all_results(futures)
t2_reduce = time.time()
print(t2_reduce - t1_reduce)

In [39]:
# Split the per-mapper timing lists into one series per phase.
# Each result is [read input, compute, write inter, prepare].
t_io = [r[0] for r in results_map]
t_comp = [r[1] for r in results_map]
t_inter = [r[2] for r in results_map]
t_prepare = [r[3] for r in results_map]
t_total = [r[0]+r[1]+r[2]+r[3] for r in results_map]

# Print the mean and the maximum of every series.
print("map:")
for label, series in [
    ("read input: ", t_io),
    ("compute: ", t_comp),
    ("prepare: ", t_prepare),
    ("write inter: ", t_inter),
    ("map_total: ", t_total),
]:
    print(label + str(sum(series) / len(series)) + "  max: " + str(max(series)))

# returns time spent (in sec) writing intermediate data in each mapper 
#results_map

map:
read input: 9.87234793854  max: 15.3644170761
compute: 4.00935843515  max: 8.18745207787
prepare: 1.81250811386  max: 3.52183794975
write inter: 7.65488364458  max: 11.2715859413
map_total: 23.3490981321  max: 34.2560219765


In [None]:
# returns time spent (in sec) reading intermediate data in each reducer 

# Each successful reducer result is [read inter, compute, write output, prepare].
# A reducer that could not find one of its shuffle keys returns -1 instead of a
# list; leave those out so indexing below does not fail on an int.
reduce_timings = [r for r in results_reduce if r != -1]

t_io = [r[2] for r in reduce_timings]
t_comp = [r[1] for r in reduce_timings]
t_inter = [r[0] for r in reduce_timings]
t_prepare = [r[3] for r in reduce_timings]
# Build t_total from scratch: appending to the list left over from the map
# statistics would mix map totals into the reduce average. The prepare phase is
# included so the total covers every phase, as in the map statistics.
t_total = [r[0]+r[1]+r[2]+r[3] for r in reduce_timings]

print("reduce:")
print("read inter: " + str(sum(t_inter) / len(t_inter)) + "  max: " + str(max(t_inter)))
print("prepare: " + str(sum(t_prepare) / len(t_prepare)) + "  max: " + str(max(t_prepare)))
print("compute: " + str(sum(t_comp) / len(t_comp)) + "  max: " + str(max(t_comp)))
print("write output: " + str(sum(t_io) / len(t_io)) + "  max: " + str(max(t_io)))
print("reduce_total: " + str(sum(t_total) / len(t_total))  + "  max: " + str(max(t_total)))

#results_reduce