In [None]:
#Install PyWren-IBM if needed
try:
    import pywren_ibm_cloud as pywren
except ModuleNotFoundError:    
    !{sys.executable} -m pip install -U pywren-ibm-cloud==1.0.8
    import pywren_ibm_cloud as pywren

pywren.__version__

In [None]:
# Turn off Jedi-based tab completion for this IPython session.
%config Completer.use_jedi = False

In [None]:
# Load IPython's autoreload extension; mode 2 reloads all modules
# (except those excluded via %aimport) before each cell is executed.
%load_ext autoreload
%autoreload 2

In [None]:
import numpy as np
import pandas as pd
import pickle
import math
import ibm_boto3
from ibm_botocore.client import Config
from ibm_botocore.client import ClientError

In [None]:
import logging
# logging.basicConfig(level=logging.DEBUG)

In [None]:
import json

# Read the COS / PyWren settings. The context manager closes config.json as
# soon as it has been parsed instead of leaving the handle open.
with open('config.json') as config_file:
    config = json.load(config_file)

# COS client authenticated with the API key and endpoint from the config file.
cos_client = ibm_boto3.client(service_name='s3',
                              ibm_api_key_id=config['ibm_cos']['api_key'],
                              config=Config(signature_version='oauth'),
                              endpoint_url=config['ibm_cos']['endpoint'])
# Bucket that the cells below read from and write to.
bucket = config['pywren']['storage_bucket']

# Create test data & upload

In [None]:
# Size the test data at 2GB: 128M rows, each costing 16 bytes
# (8 bytes per element, for 1 value column plus 1 index).
test_size_mb = 2048
n_elements = (test_size_mb << 20) // 16

In [None]:
# Draw the values from a seeded generator so that every run (including
# Restart & Run All) builds exactly the same test data.
rng = np.random.RandomState(42)
df = pd.DataFrame(data={'val': rng.rand(n_elements)}, index=np.arange(n_elements))
print('df memory usage:',df.memory_usage(index=True).sum() / 2**20, 'MiB')
# Expected to be False: the values are random, so the input starts unsorted.
print('df is sorted:', df.val.is_monotonic_increasing)

In [None]:
# Segment by key to simulate unsorted data.
# Repartitioning cuts every segment into pieces of roughly
# `segment_size_mb / n_segments` MiB, and multi-part upload fails when a
# piece is smaller than 5MiB - the assertion below guards against that.
segment_size_mb = 128
n_segments = math.ceil(test_size_mb / segment_size_mb)
assert segment_size_mb / n_segments > 5, "Segments too small - multi-part uploads will fail"

# One object key per segment for each stage of the pipeline.
segment_ids = range(n_segments)
input_keys = [f'segmented_sort/input-{i}.pickle' for i in segment_ids]
repartition_keys = [f'segmented_sort/repartition-{i}.pickle' for i in segment_ids]
output_keys = [f'segmented_sort/output-{i}.pickle' for i in segment_ids]
# Half-open value range [lo, hi) assigned to each output partition.
segm_val_bounds = [(i/n_segments, (i+1)/n_segments) for i in segment_ids]

In [None]:
# Upload: pickle each consecutive run of rows and store it under its input key
for i, input_key in enumerate(input_keys):
    segment_start = i * n_elements // n_segments
    segment_end = (i + 1) * n_elements // n_segments  # exclusive end row
    segment = df[segment_start:segment_end].copy()
    segment_bytes = pickle.dumps(segment)
    cos_client.put_object(Bucket=bucket, Key=input_key, Body=segment_bytes)

# Repartition

In [None]:
# Start multi-part uploads: one per repartition key, in the same order, so
# multipart_ids[i] is the upload id belonging to repartition_keys[i].
multipart_ids = []
for repartition_key in repartition_keys:
    # Use the shared `bucket` variable, like every other COS call in this
    # notebook, rather than reading the config dict a second time.
    response = cos_client.create_multipart_upload(Bucket=bucket, Key=repartition_key)
    multipart_ids.append(response['UploadId'])

In [None]:
# Repartition in pywren
def repartition_by_val(key, data_stream, ibm_cos, input_i):
    """Split one pickled input segment by value range and upload the pieces.

    For every `(lo, hi)` pair in `segm_val_bounds`, the rows with
    `lo <= val < hi` are serialised with `to_msgpack()` and uploaded as part
    number `input_i + 1` of the multi-part upload for the matching
    repartition key.

    Args:
        key: not used by this function.
        data_stream: readable object whose `read()` returns the pickled segment.
        ibm_cos: client object providing `upload_part`
            (presumably supplied by PyWren - verify against the library docs).
        input_i: zero-based position of this input segment.

    Returns:
        A list with one `{"ETag", "PartNumber"}` dict per output partition,
        in the order of `segm_val_bounds`.
    """
    segment = pickle.loads(data_stream.read())
    part_number = input_i + 1
    uploaded_parts = []
    for output_i, (lo, hi) in enumerate(segm_val_bounds):
        rows_in_range = segment[(segment.val >= lo) & (segment.val < hi)]
        response = ibm_cos.upload_part(
            Bucket=bucket,
            Key=repartition_keys[output_i],
            PartNumber=part_number,
            UploadId=multipart_ids[output_i],
            Body=rows_in_range.to_msgpack(),
        )
        uploaded_parts.append({"ETag": response["ETag"], "PartNumber": part_number})
    return uploaded_parts

# Run the repartition step over all input objects.
pw = pywren.ibm_cf_executor(
    config=config,
    runtime='ibmfunctions/action-python-v3.6',
    runtime_memory=512,
)
# Each entry pairs an input object's `bucket/key` path with its position in input_keys.
iterdata = [[f'{bucket}/{key}', i] for i, key in enumerate(input_keys)]
futures = pw.map(repartition_by_val, iterdata)
# One list of uploaded-part descriptors per input segment.
all_output_parts = pw.get_result(futures)
pw.clean()

In [None]:
# Complete multi-part uploads
for i, key in enumerate(repartition_keys):
    upload_id = multipart_ids[i]
    # Entry i of every input's result is the part that input contributed to
    # output partition i; the parts are listed by ascending part number.
    parts = [part_set[i] for part_set in all_output_parts]
    parts.sort(key=lambda part: part['PartNumber'])
    cos_client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts},
    )

# Merge parts of partitions & sort

In [None]:
# Merge & sort new partitions
def sort_partition(key, data_stream, ibm_cos, output_i):
    """Merge the msgpack frames of one repartitioned object, sort and store them.

    Args:
        key: not used by this function.
        data_stream: readable object whose `read()` returns the msgpack frames
            that make up the repartitioned object.
        ibm_cos: client object providing `put_object`
            (presumably supplied by PyWren - verify against the library docs).
        output_i: zero-based position of the output partition; selects the
            destination key from `output_keys`.

    Returns:
        None. The sorted partition is pickled and written to
        `output_keys[output_i]`.
    """
    frames = pd.read_msgpack(data_stream.read())
    # read_msgpack hands back the bare object instead of a list when the
    # stream holds a single frame (the n_segments == 1 case), and pd.concat
    # rejects a lone DataFrame - so normalise to a list first.
    if isinstance(frames, pd.DataFrame):
        frames = [frames]
    partition = pd.concat(frames).sort_values(by='val')
    ibm_cos.put_object(Bucket=bucket, Key=output_keys[output_i], Body=pickle.dumps(partition))

# Run the merge-and-sort step over all repartitioned objects.
pw = pywren.ibm_cf_executor(
    config=config,
    runtime_memory=768,
)
# Each entry pairs a repartitioned object's `bucket/key` path with its position.
iterdata = [[f'{bucket}/{key}', i] for i, key in enumerate(repartition_keys)]
futures = pw.map(sort_partition, iterdata)
# Block until every call has finished; the results themselves are not used.
pw.get_result(futures)
pw.clean()

# Check outputs

In [None]:
# Download every output partition and concatenate them in key order.
output_dfs = []
for output_key in output_keys:
    body = cos_client.get_object(Bucket=bucket, Key=output_key)['Body']
    # NOTE: pickle.loads can execute code embedded in the payload; acceptable
    # here because these objects are the ones written by sort_partition.
    output_dfs.append(pickle.loads(body.read()))
sorted_output = pd.concat(output_dfs)
print('output memory usage:', sorted_output.memory_usage(index=True).sum() / 2**20, 'MiB')
print('output is sorted:', sorted_output.val.is_monotonic_increasing)

# Clean up

In [None]:
# Clean up unfinished multi-part uploads
pending = cos_client.list_multipart_uploads(Bucket=bucket, Prefix='segmented_sort')
# The 'Uploads' entry may be missing from the response, hence the default.
for upload in pending.get('Uploads', []):
    print(f'Aborting {upload["Key"]}')
    cos_client.abort_multipart_upload(
        Bucket=bucket,
        Key=upload['Key'],
        UploadId=upload['UploadId'],
    )

In [None]:
# Clean up temp objects
listing = cos_client.list_objects(Bucket=bucket, Prefix='segmented_sort')
temp_objects = listing.get('Contents', [])
temp_obj_keys = [obj['Key'] for obj in temp_objects]
# Only issue the delete request when there is something to delete.
if temp_obj_keys:
    print(f'Deleting {temp_obj_keys}')
    delete_request = {'Objects':[{'Key': key} for key in temp_obj_keys]}
    cos_client.delete_objects(Bucket=bucket, Delete=delete_request)

# Debug stuff

In [None]:
# Debug code to list contents of the bucket
result = cos_client.list_objects_v2(Bucket=bucket, Prefix='segmented_sort')
contents = result.get('Contents', [])
# One (key, size, last-modified) tuple per object under the prefix.
out = [(obj['Key'], f'{obj["Size"]/2**20}MB', str(obj['LastModified'])) for obj in contents]
total_size = sum(obj['Size'] for obj in contents)
print(out)
print(f'Total size: {total_size/2**20}MB')

In [None]:
# List multi-part uploads
# The response has no 'Uploads' entry when nothing is pending (the clean-up
# cell relies on the same default), so fall back to an empty list instead of
# raising KeyError.
cos_client.list_multipart_uploads(Bucket=bucket, Prefix='segmented_sort').get('Uploads', [])

In [None]:
# Test multi-part upload minimum size limit
# Conclusion: regardless of total size, all parts except the last must be at least 5 MiB
# The last part can be any size.
segment_sizes = [5 * 2**20] * 2 + [1]
# Use the `segmented_sort` prefix shared by every other key in this notebook,
# so the clean-up and listing cells (which filter on that prefix) also cover
# this test object and any upload it leaves unfinished.
key = 'segmented_sort/multipart-test'
test_multipart_id = cos_client.create_multipart_upload(
    Bucket=bucket,
    Key=key)['UploadId']

# Upload one zero-filled part per requested size, recording the ETag of each.
test_parts = []
for i, size in enumerate(segment_sizes):
    part = cos_client.upload_part(
        Bucket=bucket, 
        Key=key, 
        PartNumber=i + 1, 
        UploadId=test_multipart_id,
        Body=bytes(size))
    test_parts.append({
        "ETag": part["ETag"],
        "PartNumber": i + 1,
    })

# Complete multi-part uploads
cos_client.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=test_multipart_id,
    MultipartUpload={
        "Parts": test_parts
    }
)