# Homework 3: PySpark - II
### CS186, UC Berkeley, Spring 2016
### Due: Thursday Feb 25, 2016, 11:59 PM
### Note: **This homework is to be done individually!  Do not modify any existing method signatures.**
### **This is the second of two .ipynb files in this homework.**

In [234]:
## On some computers it may be possible to run this lab 
## locally by using this script; you will need to run
## this each time you start the notebook.
## You do not need to run this on inst machines.

# from local_install import setup_environment
# setup_environment()

In [235]:
import pyspark
from utils import SparkContext as sc

In [236]:
from utils import CleanRDD
from utils import tests

# Part 3: CacheMap

In this part, we'll construct an RDD that is backed by a `ClockMap` and behaves like `rdd.map(func)`.  
First, implement the `ClockMap` class so that it maintains a cache (of limited `cacheSize`) using the clock replacement policy.
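You can trace the clock policy on its own before wiring it into `ClockMap`. The sketch below uses a hypothetical helper, `clock_trace`, that replays a sequence of keys against a fixed number of frames and labels each access as a hit or a miss. It follows the same rules as the policy above: a hit sets the reference bit and leaves the hand where it is, and a miss sweeps the hand, clearing bits, until it reaches a frame whose bit is 0. Unlike `ClockMap`, it stores keys only, not computed values.

```python
def clock_trace(refs, num_frames):
    """Replay `refs` through a clock cache; return 'hit'/'miss' per access."""
    frames = [None] * num_frames   # key held by each frame (None = empty)
    ref_bits = [0] * num_frames    # second-chance (reference) bits
    where = {}                     # key -> frame index
    hand = 0
    trace = []
    for key in refs:
        if key in where:
            ref_bits[where[key]] = 1   # hit: mark as recently used, hand stays
            trace.append("hit")
            continue
        trace.append("miss")
        # Sweep: frames with bit 1 get a second chance (bit cleared, hand moves on).
        while ref_bits[hand] == 1:
            ref_bits[hand] = 0
            hand = (hand + 1) % num_frames
        if frames[hand] is not None:
            del where[frames[hand]]    # evict the victim's key
        frames[hand] = key
        ref_bits[hand] = 1
        where[key] = hand
        hand = (hand + 1) % num_frames
    return trace

print(clock_trace([1, 2, 3, 4, 2, 5, 2], 3))
# ['miss', 'miss', 'miss', 'miss', 'hit', 'miss', 'hit']
```

Key 2 is hit just before 5 arrives. The sweep for 5 therefore gives 2 a second chance and evicts 3 instead, which makes the final access to 2 a hit.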

### * BEGIN STUDENT CODE *

In [237]:
class ClockMap:
    
    def __init__(self, cacheSize, func):
        """
        Do not change existing variables.
        [Optional] You are free to add additional items and methods.
        """
        self.cacheSize = cacheSize
        self.fn = func
        self._p = 0 # pointer
        self._increments = 0
        self._miss_count = 0
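        self.buffers = [[None, 0] for x in range(cacheSize)] # each slot: [cached value, reference bit]
        self.items_to_index = {} # key -> slot index
        self.cleaning_map = {} # slot index -> key, used to evict the old key on replacement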
        
    def _increment(self):
        """
        Do not change this method.
        Updates the clock pointer. The modulo maintains the clock nature.
        """
        self._increments += 1
        self._p = (self._p + 1) % self.cacheSize

    def __getitem__(self, k):
        """
        Returns func(k) using the buffer to cache limited results.
        
        :param k: Value to be evaluated
        
        >>> clock = ClockMap(4, lambda x: x ** 2)
        >>> clock[4]
        16
        >>> clock[3]
        9
        >>> clock._p
        2
        """
        # Cache hit: set the reference bit and return the cached value; the hand does not move
        if k in self.items_to_index:
            self.buffers[self.items_to_index[k]][1] = 1
            return self.buffers[self.items_to_index[k]][0]
        #otherwise cache miss
        else:
            #if there is free space
            free_space_dest = len(self.items_to_index)
            if free_space_dest < self.cacheSize:
                self.items_to_index[k] = free_space_dest
                self.buffers[self.items_to_index[k]] = [self.fn(k), 1]
                self.cleaning_map[free_space_dest] = k
                self._increment()
            #no free space
            else:
                # Sweep the hand, clearing reference bits (second chance), until a slot with bit 0 is found
                while self.buffers[self._p][1] != 0:
                    self.buffers[self._p][1] = 0
                    self._increment()
                # Evict that slot's old key, store the new value with its bit set, then advance the hand
                del self.items_to_index[self.cleaning_map[self._p]]
                self.items_to_index[k] = self._p
                self.buffers[self._p] = [self.fn(k), 1]
                self.cleaning_map[self._p] = k
                self._increment()
            return self.buffers[self.items_to_index[k]][0]

Now implement `cacheMap`, which will return an RDD.
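`mapPartitionsWithIndex` calls a function once per partition with the partition index and an iterator over that partition's elements. Anything created inside that function, such as a cache, therefore lives for exactly one partition. The sketch below imitates that contract with plain lists. `run_partitions` is a hypothetical stand-in for Spark, and an unbounded dict replaces `ClockMap` so the example stays self-contained.

```python
def run_partitions(partitions, f):
    """Hypothetical stand-in for rdd.mapPartitionsWithIndex(f).collect()."""
    out = []
    for index, part in enumerate(partitions):
        out.extend(f(index, iter(part)))
    return out

calls = []  # every argument the mapped function is actually evaluated on

def square(x):
    calls.append(x)
    return x * x

def cached_square_partition(index, iterator):
    cache = {}  # created once per partition, like the ClockMap inside cacheMap
    for item in iterator:
        if item not in cache:
            cache[item] = square(item)
        yield cache[item]

result = run_partitions([[1, 2, 1, 2], [2, 3, 3]], cached_square_partition)
print(result)  # [1, 4, 1, 4, 4, 9, 9]
print(calls)   # [1, 2, 2, 3]
```

Key 2 is evaluated twice because each partition builds its own cache. Nothing is shared between partitions.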

In [238]:
def cacheMap(rdd, cacheSize, func):
    """
    Returns an RDD that behaves like rdd.map(func) but
    is implemented using the ClockMap.
    
    :param rdd: Given RDD
    :param cacheSize: Number of cache/buffer pages in the ClockMap
    :param func: Function to map with
    """
    def clock_cache_generator(index, iterator):
        cache_map = ClockMap(cacheSize, func)
        for item in iterator:
            yield cache_map[item]
    
    return rdd.mapPartitionsWithIndex(clock_cache_generator)

### * END STUDENT CODE *

In [239]:
clock = ClockMap(4, lambda x: x ** 2)
print clock[4], clock[3]
print clock._p

16 9
2


Free test for you!

The output should be:
```
16 9
2
```

# Part 4: External Algorithms

You'll need an understanding of the partitioning step of external hashing, and the divide step of external sorting (recall the lecture on external algorithms).
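As a reminder of the partitioning (divide) step of external hashing, each record is routed to one of B output buffers by hashing its key, so all records with the same key end up in the same partition. Here is a minimal in-memory sketch using a hypothetical `hash_partition` helper. A real implementation would flush each buffer to its own file on disk whenever it fills up.

```python
def hash_partition(records, num_buckets, key=lambda record: record[0]):
    """Route each record to bucket hash(key(record)) % num_buckets."""
    buckets = [[] for _ in range(num_buckets)]
    for record in records:
        # Equal keys always hash to the same bucket.
        buckets[hash(key(record)) % num_buckets].append(record)
    return buckets

records = [(k % 7, k) for k in range(50)]
buckets = hash_partition(records, 4)
print([len(b) for b in buckets])
```

In CPython, the hash of a small int is the int itself, so the keys above spread predictably. String hashes are randomized per interpreter run, which means bucket assignments for string keys can differ between runs.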

In [143]:
from math import ceil # For the bucket list
from utils import *
import itertools
import bisect
import os

The following are some tools you may want to use (example use cases included). You should Google the unfamiliar ones!

In [12]:
# itertools.islice
generator = (y for y in range(100))
test1 = itertools.islice(generator, 5)
next(test1)

0

In [16]:
# heapq.merge (no explicit import here; presumably provided by `from utils import *`)
generator1 = (odd for odd in range(100) if odd % 2)
generator2 = (even for even in range(100)[::2])
key = lambda x: x
test2 = heapq.merge([generator1, generator2], key=key, reverse=False)
next(test2)

5

In [300]:
# bisect.bisect_left
buckets = [2, 4, 4]
print "If we insert 3, it goes to %d" % bisect.bisect_left(buckets, 3)
print "If we insert 1, it goes to %d" % bisect.bisect_left(buckets, 1)
print "If we insert 4, it goes to %d" % bisect.bisect_left(buckets, 4)

If we insert 3, it goes to 1
If we insert 1, it goes to 0
If we insert 4, it goes to 1
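`bisect_left` returns a position between 0 and `len(bounds)`. A sorted list of `numPartitions - 1` boundaries therefore maps every key to one of `numPartitions` range partitions. The sketch below uses a hypothetical helper, `range_partition`, to show the ascending mapping and its mirror image for descending order. In ascending mode, a key equal to a boundary goes to the partition on that boundary's left.

```python
import bisect

def range_partition(key, bounds, ascending=True):
    """Return the partition index for `key`; `bounds` must be sorted ascending."""
    idx = bisect.bisect_left(bounds, key)  # 0 .. len(bounds)
    # Descending order mirrors the index so the largest keys land in partition 0.
    return idx if ascending else len(bounds) - idx

bounds = [10, 20, 30]  # 3 boundaries -> 4 partitions
print([range_partition(k, bounds) for k in (5, 10, 11, 25, 35)])                   # [0, 0, 1, 2, 3]
print([range_partition(k, bounds, ascending=False) for k in (5, 10, 11, 25, 35)])  # [3, 3, 2, 1, 0]
```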


In [22]:
# RDD.sample
rdd = sc.parallelize(range(100))
fraction = 0.1
rdd.sample(False, fraction).collect()

[8, 9, 12, 30, 41, 50, 65, 76, 79, 84]

In [23]:
# Serializer and os.unlink (Serializer is provided via utils.GeneralTools)
generator1 = (odd for odd in range(100) if odd % 2)
filename = "temp"
with open(filename, "w") as f:
    serializer.dump_stream(generator1, f)

with open(filename, "r") as f:
    stream = serializer.load_stream(f)
    print next(stream)

os.unlink(filename)

1


In [24]:
# get_used_memory - returns an int in MB
get_used_memory()

104

No need to modify the following function - it should come in handy!

In [65]:
def get_sort_dir(partId, n):
    """
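    Returns a path for a temporary file, creating its directory if needed.

    :param partId: Partition index; each partition gets its own subdirectory.
    :param n: Unique identifier for the file within that directory.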
    """
    d = "tmp/sort/" + str(partId) + "/"
    if not os.path.exists(d):
        os.makedirs(d)
    return os.path.join(d, str(n))

### * BEGIN STUDENT CODE *

In [343]:
def externalSortStream(iterator, partId=0, reverse=False, keyfunc=None, serial=serializer, limit=10, batch=100):
    """
    Given an iterator, returns an iterator of sorted elements (according to parameters). 
    
    :param iterator: iterator. Expects (Key, Value).
    :param partId: partition index; selects the directory for temporary run files.
    :param reverse: Reverse default ordering if true. (default is ascending; reverse is descending) 
    :param keyfunc: function applied to the key (identity if None).
    :param serial: serializer used to write and read runs. See README.
    :param limit: memory limit in MB, compared against get_used_memory().
    :param batch: Number of elements to read at a time.
    """
    if keyfunc is None:
        keyfunc = lambda k: k
    sort_key = lambda kv: keyfunc(kv[0])  # order (key, value) pairs by keyfunc(key)
    
    all_runs = [] # one generator per sorted run written to disk
    run = [] # used to hold the current run of elements
    run_count = 0 # unique identifier for each run file
    
    def load(fileobj):
        """
        Returns a generator object that outputs elements 
        from a serialized (saved) stream. Closes the file when done.
        
        :param fileobj: python object file
        """
        for _ in serial.load_stream(fileobj):
            yield _
        fileobj.close()
    
    while True:
        # Read the stream in batches
        c = list(itertools.islice(iterator, batch))
        
        # Input exhausted and nothing buffered: done streaming
        if not c and not run:
            break

        # Buffer the batch in the current run
        run.extend(c)
            
        # Spill when over the memory limit or when the input is exhausted:
        # sort the run, write it to disk, then reopen it as a lazy iterator
        if get_used_memory() > limit or not c:
            run.sort(key=sort_key, reverse=reverse)
            path = get_sort_dir(partId, run_count)
            
            with open(path, 'w') as fileobj_w:
                serial.dump_stream(run, fileobj_w)
            
            fileobj_r = open(path, 'r')
            # On POSIX the data stays readable through fileobj_r until it is closed
            os.unlink(path)
            all_runs.append(load(fileobj_r))
           
            # Clear run
            del run[:]
            run_count += 1 
    
    # Lazily k-way merge the sorted runs
    return heapq.merge(all_runs, key=sort_key, reverse=reverse)

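The same divide-and-merge structure can be written without Spark using only the Python 3 standard library. This sketch makes several assumptions. It spills after a fixed `run_size` instead of checking memory, and it uses `pickle` and `tempfile` in place of the homework's serializer and `get_sort_dir`. The name `external_sort` is hypothetical. Note also that the standard `heapq.merge` takes the runs as separate positional arguments, while the notebook's call passes a single list.

```python
import heapq
import itertools
import os
import pickle
import tempfile


def external_sort(iterable, run_size, key=None, reverse=False):
    """Sort `iterable` by spilling sorted runs of at most `run_size` items
    to temporary files and lazily merging them back."""
    paths = []
    it = iter(iterable)
    while True:
        run = list(itertools.islice(it, run_size))
        if not run:
            break
        run.sort(key=key, reverse=reverse)  # divide step: one sorted run
        fd, path = tempfile.mkstemp()
        with os.fdopen(fd, "wb") as f:
            for item in run:
                pickle.dump(item, f)
        paths.append(path)

    def read_run(path):
        # Stream one run back from disk; delete the file once it is exhausted.
        with open(path, "rb") as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:
                    break
        os.unlink(path)

    # Merge step: k-way merge (heapq.merge accepts key/reverse since Python 3.5).
    return heapq.merge(*[read_run(p) for p in paths], key=key, reverse=reverse)

print(list(external_sort([5, 3, 9, 1, 7, 2, 8], 3)))  # [1, 2, 3, 5, 7, 8, 9]
```

Each run is sorted in the same direction the merge expects, which is what allows `heapq.merge` to stream the output without loading every run at once.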
In [344]:
# Remember to run the import box above.

def partitionByKey(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """        
    Uses sampling to partition the elements by the return value of 
    keyfunc.

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    # Base cases done.

    if numPartitions is None:
        numPartitions = rdd.getNumPartitions()

    if numPartitions == 1:
        if rdd.getNumPartitions() > 1:
            rdd = rdd.coalesce(1)
        return rdd
    
    # Boundaries are keyfunc values, ordered ascending or descending as requested
    buckets = getBuckets(rdd, ascending, numPartitions, keyfunc)

    if ascending:
        # bisect_left maps a key to a partition in 0 .. len(buckets)
        balanceLoad = lambda k: bisect.bisect_left(buckets, keyfunc(k))
    else:
        # bisect needs an ascending list; mirror the index so the largest keys go to partition 0
        asc_buckets = buckets[::-1]
        balanceLoad = lambda k: len(asc_buckets) - bisect.bisect_left(asc_buckets, keyfunc(k))
    return rdd.partitionBy(numPartitions, balanceLoad)


def getBuckets(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """        
    [Optional] Returns a list of bucket boundaries of length (numPartitions - 1),
    in an order as specified by the given parameters: ascending, keyfunc. 
    Bucket boundaries are determined by sampling as specified in the README.

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    if numPartitions is None:
        numPartitions = rdd.getNumPartitions()
    total = rdd.count()
    if total == 0 or numPartitions < 2:
        return []

    # Aim for about 10 samples per partition; use float division so Python 2
    # does not truncate the fraction to 0. Sample everything for small RDDs.
    fraction = min(1.0, 10.0 * numPartitions / total)
    sample_keys = [keyfunc(k) for k, _ in rdd.sample(False, fraction).collect()]
    if not sample_keys:
        return []
    sample_keys.sort()

    # Pick numPartitions - 1 evenly spaced order statistics of the sorted sample
    pivots = [sample_keys[len(sample_keys) * (i + 1) // numPartitions]
              for i in range(numPartitions - 1)]
    return pivots if ascending else pivots[::-1]

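You can try the sampling idea behind `getBuckets` without Spark. This sketch uses a hypothetical helper, `sample_bounds`, which keeps each key independently with probability `fraction` (Bernoulli sampling, roughly what `rdd.sample(False, fraction)` does). It then takes `num_partitions - 1` evenly spaced elements of the sorted sample as boundaries. The seeded `random.Random` only makes runs repeatable, and choosing the fraction is left to the caller.

```python
import random

def sample_bounds(keys, num_partitions, fraction, seed=0):
    """Return num_partitions - 1 boundaries taken from a random sample of keys."""
    rng = random.Random(seed)
    # Keep each key independently with probability `fraction`.
    sample = sorted(k for k in keys if rng.random() < fraction)
    if not sample:
        return []
    # Evenly spaced order statistics of the sorted sample.
    return [sample[len(sample) * (i + 1) // num_partitions]
            for i in range(num_partitions - 1)]

print(sample_bounds(range(100), 4, 1.0))  # [25, 50, 75]
```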
In [345]:
def sortByKey(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """
    Returns an RDD after executing an external sort using 
    functions partitionByKey and externalSortStream. 

    :param rdd: RDD of (key, value) pairs.
    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    def generator(index, iterator):
        return externalSortStream(iterator, index, not ascending, keyfunc)
    
    partitioned_rdd = partitionByKey(rdd, ascending, numPartitions, keyfunc)
    
    return partitioned_rdd.mapPartitionsWithIndex(generator)

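Range partitioning followed by a per-partition sort yields a globally sorted result once the partitions are read in index order. That is why `sortByKey` only needs to compose `partitionByKey` and `externalSortStream`. The plain-Python sketch below checks this property on the same data as the `sortByKey` test that follows. It makes a few simplifications: `sort_by_key_sketch` is a hypothetical helper, the boundaries are hand-picked rather than sampled, and `sorted` stands in for the external sort.

```python
import bisect

def sort_by_key_sketch(pairs, bounds, ascending=True):
    """Range-partition (key, value) pairs by ascending `bounds`, sort each
    partition, then concatenate the partitions in index order."""
    parts = [[] for _ in range(len(bounds) + 1)]
    for k, v in pairs:
        idx = bisect.bisect_left(bounds, k)
        parts[idx if ascending else len(bounds) - idx].append((k, v))
    out = []
    for part in parts:
        # sorted() stands in for externalSortStream within one partition.
        out.extend(sorted(part, key=lambda kv: kv[0], reverse=not ascending))
    return out

pairs = [(x * (-1) ** x, x) for x in range(100)]
print(sort_by_key_sketch(pairs, [-50, 0, 50], ascending=False)[-3:])  # [(-95, 95), (-97, 97), (-99, 99)]
```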
### * END STUDENT CODE *

Here are tests for `externalSortStream` and `partitionByKey`:

In [346]:
test_stream = ((i, i) for i in range(100))
list(externalSortStream(test_stream, keyfunc=(lambda x: abs(50 - (x ** 2)))))[:10]

[(7, 7),
 (6, 6),
 (8, 8),
 (5, 5),
 (9, 9),
 (4, 4),
 (3, 3),
 (2, 2),
 (1, 1),
 (0, 0)]

Your output should be:
```
[(7, 7),
 (6, 6),
 (8, 8),
 (5, 5),
 (9, 9),
 (4, 4),
 (3, 3),
 (2, 2),
 (1, 1),
 (0, 0)]
```

In [347]:
rdd = CleanRDD(sc.parallelize(range(20), 4).map(lambda x: (x * 37 % 6, x ** 3 % 34)))
partitionByKey(rdd)

<utils.CleanRDD.CleanRDD at 0x7fbd02bb10d0>

The elements should be spread fairly evenly across the partitions. Try forcing a skewed key distribution and observe how effective the sampling-based partitioning is.
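One way to explore skew without a cluster is to count how many keys each range partition receives. In the hypothetical example below, the keys are squares, so they bunch up near zero. Equal-width boundaries overload the first partition, while boundaries taken at evenly spaced positions of the sorted keys (which sampling approximates) balance the load. A single very frequent key exposes a limit of the approach: range partitioning cannot split identical keys, so they always share one partition. `partition_sizes` is an illustrative helper, not part of `utils`.

```python
import bisect
from collections import Counter

def partition_sizes(keys, bounds):
    """Count keys per range partition defined by ascending `bounds` (bisect_left)."""
    counts = Counter(bisect.bisect_left(bounds, k) for k in keys)
    return [counts[i] for i in range(len(bounds) + 1)]

squares = [x * x for x in range(100)]  # skewed toward small values
print(partition_sizes(squares, [2450, 4900, 7350]))  # equal-width: [50, 21, 15, 14]
print(partition_sizes(squares, [625, 2500, 5625]))   # sorted positions 25, 50, 75: [26, 25, 25, 24]

hot = [1] * 90 + list(range(2, 12))  # one key dominates
print(partition_sizes(hot, [1, 1, 1]))  # positions 25, 50, 75 of sorted(hot) are all 1: [90, 0, 0, 10]
```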

Here's a test for `sortByKey`:

In [352]:
rdd = CleanRDD(sc.parallelize(range(100), 4).map(lambda x: (x *((-1) ** x) , x)))
sortByKey(rdd, keyfunc=lambda key: key, ascending=False).collect()[-10:]

[(-81, 81),
 (-83, 83),
 (-85, 85),
 (-87, 87),
 (-89, 89),
 (-91, 91),
 (-93, 93),
 (-95, 95),
 (-97, 97),
 (-99, 99)]

Your output should be:
```
[(-81, 81),
 (-83, 83),
 (-85, 85),
 (-87, 87),
 (-89, 89),
 (-91, 91),
 (-93, 93),
 (-95, 95),
 (-97, 97),
 (-99, 99)]
```

# Testing

In [336]:
tests.test3ClockMap(ClockMap)
tests.test3CacheMap(cacheMap)
tests.test4(sortByKey)

Task 3: PASS - task3ClockMap.txt matched reference output.
Task 3: PASS - task3CacheMap.txt matched reference output.
Task 4: PASS - task4.txt matched reference output.
