# Discussion Week 3: PySpark Review (Richard's Section)

## List Comprehensions, Lambdas, Generators, and Yield

In [None]:
# List comprehensions:
x = range(10)

y = [n**2 for n in x if n < 5]  # filter: keep only the n < 5

print(y)

y2 = [n**2 if n < 5 else 0 for n in x]  # conditional expression: transform every n
print(y2)

# print([a * b for a in y for b in y2])

In [None]:
# Lambda Expressions

def convert_me(n):
    return 1. / n ** 2

convert_you = lambda x: 1./x ** 2

convert_me(10) == convert_you(10)

In [None]:
gen1 = lambda n: (i for i in range(n))

def gen2(n):
    for i in range(n):
        yield i

x = gen1(5)
print(x)  # prints a generator object; no values have been produced yet

In [None]:
print(next(x))
print(next(x))

In [None]:
g1 = gen1(5)
g2 = gen2(5)
for i in range(5):
    print(next(g1) == next(g2))
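Generators are single-use: once exhausted, iterating again yields nothing. This matters later, because the function passed to `mapPartitions` receives an iterator over its partition, so it can only walk that partition once unless it first copies it into a list. A minimal plain-Python sketch (`count_twice` is a hypothetical helper for illustration):

```python
def gen2(n):
    for i in range(n):
        yield i

def count_twice(items):
    # Hypothetical helper: walks `items` twice and reports both counts.
    # A generator is exhausted after the first pass; a list is not.
    first = sum(1 for _ in items)
    second = sum(1 for _ in items)
    return first, second

g = gen2(3)
print(list(g))                     # [0, 1, 2]
print(list(g))                     # []: the generator is already exhausted
print(count_twice(gen2(5)))        # (5, 0)
print(count_twice(list(gen2(5))))  # (5, 5): a list can be re-iterated
```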

## RDDs

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [None]:
import pyspark
sc = pyspark.SparkContext()

Let's create a new RDD from a small Python list, split into 3 partitions.

In [None]:
rdd1 = sc.parallelize(["one", "two", "three"], 3)

`rdd.collect()` returns a list containing all of the elements in the RDD. It brings every element back to the driver, so only use it on small RDDs.

In [None]:
rdd1.collect()
# check_partitions(rdd1)  # defined in the External Hashing section below; run that cell first
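To see what `parallelize(..., 3)` and `collect()` do with the data, here is a rough pure-Python model that treats an RDD as a plain list of partitions. It assumes the collection is cut into contiguous, nearly equal slices (roughly how Spark slices a parallelized collection); the `*_local` functions are hypothetical helpers, not PySpark APIs.

```python
# Hypothetical pure-Python model: an "RDD" is just a list of partitions (lists).

def parallelize_local(data, num_slices):
    """Split `data` into `num_slices` contiguous, nearly equal partitions."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

def collect_local(partitions):
    """Like rdd.collect(): concatenate all partitions, in partition order."""
    return [x for part in partitions for x in part]

def check_partitions_local(partitions):
    """Like check_partitions in this notebook: tag each element with its partition index."""
    return [(idx, x) for idx, part in enumerate(partitions) for x in part]

parts = parallelize_local(["one", "two", "three"], 3)
print(parts)                          # [['one'], ['two'], ['three']]
print(collect_local(parts))           # ['one', 'two', 'three']
print(check_partitions_local(parts))  # [(0, 'one'), (1, 'two'), (2, 'three')]
```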

Instead of making our own collection, let's load in a file:

In [None]:
rdd2 = sc.textFile('link_text.txt')

In [None]:
rdd2.take(5)

Let's compare that with the first few lines of the raw file:

In [None]:
print("Here is the raw document")

!head -n 5 link_text.txt

In [None]:
rdd2.getNumPartitions()

Let's do something more interesting with this data: extract the domain of each website.

In [None]:
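def get_site(iterator):
    # Called once per partition with an iterator over that partition's lines
    for link in iterator:
        index = link.find("www.")
        end = link.find(".com", index)  # look for ".com" only after "www."
        # str.find returns -1 when there is no match; 0 is a valid position
        if index != -1 and end != -1:
            yield link[index + 4:end]  # the text between "www." and ".com"

site_rdd = rdd2.mapPartitions(get_site)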
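A pure-Python sketch of what `mapPartitions` does with `get_site`: the function runs once per partition, receives an iterator over that partition, and the outputs are chained together. The result is lazy, much like a Spark transformation, which records the work but computes nothing until an action such as `collect()` or `take()` runs. `map_partitions_local` and the sample links are made up for illustration, and this `get_site` uses `!= -1` checks so that links beginning with `www.` are kept.

```python
import itertools

def get_site(iterator):
    # Text between "www." and the next ".com"; str.find returns -1 on no match.
    for link in iterator:
        index = link.find("www.")
        end = link.find(".com", index)
        if index != -1 and end != -1:
            yield link[index + 4:end]

def map_partitions_local(partitions, func):
    # Hypothetical model of rdd.mapPartitions: call func once per partition
    # with an iterator, and chain the results. Nothing runs until consumed.
    return itertools.chain.from_iterable(func(iter(part)) for part in partitions)

# Made-up sample links, split into two partitions
links = [
    ["http://www.google.com/search", "https://en.wikipedia.org/wiki/Spark"],
    ["www.example.com", "https://www.github.com/apache/spark"],
]
sites = map_partitions_local(links, get_site)  # lazy: get_site has not run yet
print(list(sites))  # ['google', 'example', 'github']
```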

In [None]:
siterddcollect = site_rdd.collect()

In [None]:
siterddcollect[:20]

In [None]:
site_rdd.take(5)

In [None]:
print(rdd2.getNumPartitions(), rdd2.count())

### Notice how the object itself is not very eventful...

In [None]:
site_rdd.distinct().take(10)

Here is the implementation of `rdd.distinct()` from PySpark's source (`pyspark/rdd.py`):

In [None]:
def distinct(self, numPartitions=None):
    """
    Return a new RDD containing the distinct elements in this RDD.

    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
    """
    # Pair each element with a dummy value, keep one pair per key, then drop the dummy
    return self.map(lambda x: (x, None)) \
               .reduceByKey(lambda x, _: x, numPartitions) \
               .map(lambda x: x[0])
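The same map, reduceByKey, map pipeline can be modeled locally. This sketch assumes the shuffle sends each key to partition `hash(key) % num_partitions` (PySpark's default partitioner follows the same rule using `portable_hash`), and it simplifies `reduceByKey` by skipping the map-side combine Spark performs before shuffling. `reduce_by_key_local` and `distinct_local` are hypothetical helpers.

```python
def reduce_by_key_local(pairs, func, num_partitions=2):
    # Shuffle: route each (key, value) pair to a partition by hashing its key
    partitions = [dict() for _ in range(num_partitions)]
    for key, value in pairs:
        bucket = partitions[hash(key) % num_partitions]
        # Reduce: equal keys always share a bucket, so they can be combined there
        bucket[key] = func(bucket[key], value) if key in bucket else value
    return [(k, v) for bucket in partitions for k, v in bucket.items()]

def distinct_local(elements, num_partitions=2):
    pairs = ((x, None) for x in elements)                                 # map: x -> (x, None)
    reduced = reduce_by_key_local(pairs, lambda x, _: x, num_partitions)  # one pair per key
    return [k for k, _ in reduced]                                        # map: (x, None) -> x

print(sorted(distinct_local([1, 1, 2, 3])))  # [1, 2, 3]
print(reduce_by_key_local([(1, "one"), (1, "two"), (1, "three")], lambda x, y: x + y))
# [(1, 'onetwothree')]
```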

Let's get the distinct domains, first with the `distinct` function defined above and then with the built-in `rdd.distinct()`:

In [None]:
print(distinct(site_rdd).take(20))
print(site_rdd.distinct().take(20))

In [None]:
# Every element gets the same key (1), so reduceByKey concatenates all the strings into one value
rdd1.map(lambda x: (1, x)).reduceByKey(lambda x, y: x + y).collect()

# Review
## Back to EXTERNAL HASHING. 

Recall the second lecture where we talked about partitioning your data:

<img src=https://docs.oracle.com/cd/B28359_01/server.111/b32024/img/cncpt158.gif />

### Ok well let's do this in PySpark (very important for Part 3)
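As a warm-up, here is the core of external hashing in a few lines of plain Python: route each record to bucket `hash(key) % n`, so every copy of a key lands in the same bucket, and each bucket can then be grouped on its own (in external hashing, one bucket at a time in memory). `hash_partition` and `group_bucket` are hypothetical helpers; integer keys are used because CPython's `hash(k)` equals `k` for small non-negative integers, which makes the bucket assignment easy to check by hand.

```python
from collections import defaultdict

def hash_partition(records, num_partitions):
    # Phase 1: route each (key, value) record to bucket hash(key) % n
    buckets = defaultdict(list)
    for key, value in records:
        buckets[hash(key) % num_partitions].append((key, value))
    return dict(buckets)

def group_bucket(bucket):
    # Phase 2: all copies of a key sit in one bucket, so each bucket
    # can be grouped independently of the others
    groups = defaultdict(list)
    for key, value in bucket:
        groups[key].append(value)
    return dict(groups)

records = [(0, "a"), (1, "b"), (2, "c"), (5, "d"), (1, "e"), (6, "f")]
buckets = hash_partition(records, 4)
print(buckets)
# {0: [(0, 'a')], 1: [(1, 'b'), (5, 'd'), (1, 'e')], 2: [(2, 'c'), (6, 'f')]}
print({b: group_bucket(rows) for b, rows in buckets.items()})
# {0: {0: ['a']}, 1: {1: ['b', 'e'], 5: ['d']}, 2: {2: ['c'], 6: ['f']}}
```

Bucket 3 stays empty here, a first hint of the skew problem explored below.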

In [None]:
def check_partitions(rdd):
    """Return (partition index, element) pairs for every element of `rdd`."""
    def p(n, iterate):
        # n is the partition index; iterate yields that partition's elements
        for i in iterate:
            yield (n, i)
    return rdd.mapPartitionsWithIndex(p).collect()

## Let's first motivate the problem of having a hash function with non-uniform data in a discrete setting:

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter

# 1000 draws from a Poisson(5) distribution: small integers bunched around 5
samples = np.random.poisson(5, 1000)
# samples = np.random.rayleigh(3, 100)

plt.hist(samples)
plt.xlabel("Elements")
plt.ylabel("Count")
plt.show()
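Hash partitioning discrete, bunched-up keys like these gives uneven partitions: a few distinct keys with very different frequencies map onto a few buckets. A small illustration with a made-up sample of integer keys clustered around 5 (standing in for the Poisson draws above) and 4 partitions:

```python
from collections import Counter

# Made-up integer keys clustered around 5, standing in for Poisson(5) draws
keys = [2, 3, 4, 4, 5, 5, 5, 5, 6, 6, 6, 7, 8, 9, 4, 5]
num_partitions = 4

# For small non-negative ints, CPython's hash(k) == k, so the bucket is k % 4
load = Counter(hash(k) % num_partitions for k in keys)
print(sorted(load.items()))  # [(0, 4), (1, 6), (2, 4), (3, 2)]

# Largest partition relative to a perfectly even split
print(max(load.values()) / (len(keys) / num_partitions))  # 1.5
```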


## Let's make a new RDD...

In [None]:
rdd = sc.parallelize(range(4), 4)
print(check_partitions(rdd))

## Generate a bunch of random values for each partition

In [None]:
def generate_random(iterator):
    # Ignores the partition's contents: each partition emits 20 new
    # (Rayleigh-distributed key, Poisson-distributed value) pairs
    return ((np.random.rayleigh(200), i) for i in np.random.poisson(4, 20))

random_rdd = rdd.mapPartitions(generate_random)

In [None]:
for i in check_partitions(random_rdd)[:20]:
    print(i)

In [None]:
partitioned = random_rdd.partitionBy(4)

In [None]:
for i in sorted(check_partitions(partitioned)):
    print(i)

In [None]:
%matplotlib inline

# One entry per record: the index of the partition that holds it
partition_distribution = partitioned\
            .mapPartitionsWithIndex(lambda k, y: (k for _ in y))\
            .collect()

# print(Counter(partition_distribution))
plt.hist(partition_distribution, 4)

## As shown above, partitioning is done (albeit skewed)! But what's the point of partitioning again...?

## Can you think of a better partition function to fix our skew?

In [None]:
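from pyspark.rdd import portable_hash

def partition_better(key):
    # Exercise: return a partition index that spreads the keys evenly.
    # Until then, fall back to PySpark's default hash so the cell runs.
    return portable_hash(key)

goodrdd = random_rdd.partitionBy(10, partitionFunc=partition_better)
partition_distribution = goodrdd\
            .mapPartitionsWithIndex(lambda idx, it: (idx for _ in it))\
            .collect()

print(Counter(partition_distribution))
plt.hist(partition_distribution, 10)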
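One possible answer, sketched under assumptions: a range partitioner. It sorts a sample of keys, picks `num_partitions - 1` cut points at evenly spaced ranks, and sends each key to the range it falls in, so partitions receive roughly equal numbers of records whatever the key distribution. This is similar in spirit to how PySpark's `sortByKey` builds its partitioner from sampled bounds, but `make_range_partitioner` is a hypothetical helper. The example compares it with plain `k % 4` (what default hashing amounts to for integer keys) on the squares 0, 1, 4, ..., 9801, which are always 0 or 1 mod 4.

```python
import bisect
from collections import Counter

def make_range_partitioner(sample_keys, num_partitions):
    """Hypothetical helper: build a key -> partition index function from sampled keys."""
    ordered = sorted(sample_keys)
    # num_partitions - 1 cut points taken at evenly spaced ranks of the sample
    bounds = [ordered[len(ordered) * i // num_partitions]
              for i in range(1, num_partitions)]

    def partition(key):
        # Number of cut points <= key: 0 for the lowest range, num_partitions - 1 for the highest
        return bisect.bisect_right(bounds, key)

    return partition

# Squares are always 0 or 1 mod 4, so k % 4 leaves two of the four partitions empty
keys = [x * x for x in range(100)]
by_range = make_range_partitioner(keys, 4)  # here the "sample" is every key

hash_load = Counter(k % 4 for k in keys)
range_load = Counter(by_range(k) for k in keys)
print(sorted(hash_load.items()))   # [(0, 50), (1, 50)]
print(sorted(range_load.items()))  # [(0, 25), (1, 25), (2, 25), (3, 25)]
```

In the notebook, the sample might come from something like `random_rdd.keys().sample(False, 0.1).collect()`, with the resulting function passed as `partitionFunc` to `partitionBy(10, ...)`; treat that wiring as a sketch rather than a tested recipe.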
