# Discussion Week 3: PySpark Review

## List Comprehensions, Lambdas, Generators, and Yield

In [None]:
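# List comprehensions:
x = range(100)

# Squares of the elements of x that are less than 5
y = [n**2 for n in x if n < 5]

print(y)

# Square the odd elements of y and replace the even ones with 0
y2 = [n**2 if n % 2 else 0 for n in y]
print(y2)

# Every pairwise product a * b (the outer loop runs over y)
print([a * b for a in y for b in y2])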
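A comprehension with two `for` clauses nests them from left to right, so the last expression in the cell above matches the explicit loops below. This sketch recomputes `y` and `y2` so it runs on its own. The name `products_loop` is only for illustration.

```python
# Recompute the inputs from the cell above so this block is self-contained.
y = [n**2 for n in range(100) if n < 5]
y2 = [n**2 if n % 2 else 0 for n in y]

# Nested comprehension: the first `for` is the outer loop.
products = [a * b for a in y for b in y2]

# The same result written as explicit nested loops.
products_loop = []
for a in y:
    for b in y2:
        products_loop.append(a * b)

print(products == products_loop)  # True
print(len(products))  # 25, one entry per (a, b) pair
```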

In [None]:
# Lambda Expressions

def convert_me(n):
    return 1. / n ** 2

convert_you = lambda x: 1. / x ** 2

convert_me(10) == convert_you(10)
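Two details are worth noting here. First, `**` binds more tightly than `/`, so `1. / x ** 2` means `1 / (x ** 2)`. Second, lambdas are most useful when passed inline to a higher-order function. That is how they are used with Spark's `map` and `filter`. The sketch uses Python's built-in `map` and `filter` on a made-up list `values`.

```python
# `**` binds more tightly than `/`, so this computes 1 / (x ** 2).
convert_you = lambda x: 1. / x ** 2
print(convert_you(10))  # 0.01

# Lambdas are typically passed inline to higher-order functions.
values = [1, 2, 4, 5]
inverse_squares = list(map(lambda x: 1. / x ** 2, values))
evens = list(filter(lambda x: x % 2 == 0, values))
print(inverse_squares)  # [1.0, 0.25, 0.0625, 0.04]
print(evens)  # [2, 4]
```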

In [None]:
gen1 = lambda n: (i for i in range(n))

def gen2(n):
    for i in range(n):
        yield i

In [None]:
g1 = gen1(5)
g2 = gen2(5)
for i in range(5):
    print(next(g1) == next(g2))
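Generators differ from lists in two ways. They can be consumed only once, and they compute each value only when it is requested. This laziness is also why PySpark partition functions are often written with `yield`. The helper `square` and the `calls` list below are illustrative only.

```python
def gen2(n):
    for i in range(n):
        yield i

g = gen2(3)
print(list(g))  # [0, 1, 2]
# A generator can be consumed only once; afterwards it is exhausted.
print(list(g))  # []

# next() raises StopIteration on an exhausted generator unless given a default.
print(next(g, "done"))  # done

# Generator expressions are lazy: nothing is computed until a value is requested.
calls = []
def square(x):
    calls.append(x)  # record every call so we can see when work happens
    return x * x

lazy = (square(i) for i in range(5))
print(calls)  # [] because nothing has run yet
print(next(lazy))  # 0
print(calls)  # [0]
```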

## RDDs

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [None]:
import pyspark
# Create a SparkContext, the entry point for creating RDDs; this notebook names it `n`
n = pyspark.SparkContext()

Let's create a new RDD from the collection 0 through 9 (`range(10)`), split into 10 partitions.

In [None]:
rdd1 = n.parallelize(range(10), 10)

`rdd.collect()` returns a list that contains all of the elements in this RDD. Because it brings every element back to the driver, use it only on small RDDs.

In [None]:
rdd1.collect()
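The following pure-Python model shows what happens in the two cells above. `parallelize` cuts a collection into contiguous slices, one per partition, and `collect()` concatenates the partitions back in partition order. The sketch assumes slice boundaries at `i * len // num_slices`. This is close to how Spark slices collections, but it is an approximation and not Spark's code. The helpers `slice_into_partitions` and `local_collect` are hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Hypothetical helper: split data into num_slices contiguous chunks."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

def local_collect(partitions):
    """Hypothetical helper: concatenate partitions in partition order."""
    return [x for part in partitions for x in part]

parts = slice_into_partitions(range(10), 10)
print(parts)  # one single-element list per partition
print(local_collect(parts))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

parts4 = slice_into_partitions(range(10), 4)
print(parts4)  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```

In the notebook itself, `rdd1.glom().collect()` returns the actual contents of each partition as a list of lists.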

Instead of building our own collection, let's load a text file. `textFile` returns an RDD with one element per line:

In [None]:
rdd2 = n.textFile('link_text.txt')

Let's take a peek at the first few lines of this file:

In [None]:
print("Here is the raw document:")

!head -n 5 link_text.txt

In [None]:
rdd2.take(5)
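Unlike `collect()`, `take(n)` can stop as soon as it has `n` elements, so it usually reads only the first partition or two. Spark's real `take` scans one partition first and then uses that result to estimate how many more partitions to try. The sketch below is simpler and scans one partition at a time. The helper `local_take` and the sample lines are made up for illustration.

```python
def local_take(partitions, num):
    """Hypothetical model of take(): scan partitions in order, stop early."""
    taken, scanned = [], 0
    for part in partitions:
        if len(taken) >= num:
            break  # already have enough elements; skip remaining partitions
        scanned += 1
        for item in part:
            if len(taken) >= num:
                break
            taken.append(item)
    return taken, scanned

# Made-up partitions standing in for the lines of a text file.
partitions = [["line 1", "line 2", "line 3"], ["line 4", "line 5", "line 6"], ["line 7"]]
result, scanned = local_take(partitions, 5)
print(result)  # ['line 1', 'line 2', 'line 3', 'line 4', 'line 5']
print(scanned)  # 2, so the third partition was never read
```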

Let's do something more interesting with this data: extract the domain name of each website (the part between `www.` and `.com`).

In [None]:
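def get_site(iterator):
    # Yield the name between "www." and ".com" for every link that contains both
    for link in iterator:
        index = link.find("www.")  # -1 if missing, 0 if at the very start
        end = link.find(".com", index)  # look for ".com" only after "www."
        if index != -1 and end != -1:
            yield link[index + 4: end]

site_rdd = rdd2.mapPartitionsWithIndex(lambda index, iterator: get_site(iterator))
# notice how I toss out the index
# also notice that nothing happens yet: transformations are lazy until an action runs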
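`get_site` only needs an iterator, so you can check it on plain Python lists without Spark. The sketch below uses a version of `get_site` that accepts a match at position 0 and searches for `.com` only after `www.`. It applies the function the way `mapPartitionsWithIndex` does, calling `f(index, iterator)` once per partition. The helper `local_map_partitions_with_index` and the sample links are hypothetical.

```python
def get_site(iterator):
    # str.find returns -1 when the substring is missing and 0 at the very start.
    for link in iterator:
        start = link.find("www.")
        end = link.find(".com", start)
        if start != -1 and end != -1:
            yield link[start + 4:end]

def local_map_partitions_with_index(partitions, f):
    """Hypothetical model: call f(index, iterator) once per partition."""
    return [list(f(i, iter(part))) for i, part in enumerate(partitions)]

# Made-up sample links split across two partitions.
partitions = [
    ["http://www.foo.com/a", "www.bar.com", "no domain here"],
    ["https://www.spark.com/docs"],
]
print(local_map_partitions_with_index(
    partitions, lambda index, iterator: get_site(iterator)))
# [['foo', 'bar'], ['spark']]
```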

In [None]:
site_rdd.take(5)

In [None]:
print(rdd2.getNumPartitions(), rdd2.count())
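`count()` is an action, so unlike the transformations above, it actually runs a job. PySpark computes it roughly by counting the elements of each partition and then summing the per-partition counts. The pure-Python model below mirrors that idea on made-up partitions. The helper `local_count` is hypothetical, and the model includes an empty partition, which real RDDs can also have.

```python
def local_count(partitions):
    # Count each partition independently (in Spark this step runs in parallel)...
    per_partition = [sum(1 for _ in iter(part)) for part in partitions]
    # ...then add the small per-partition results together.
    return per_partition, sum(per_partition)

partitions = [["a", "b"], [], ["c", "d", "e"]]
per_partition, total = local_count(partitions)
print(len(partitions), total)  # 3 5
print(per_partition)  # [2, 0, 3]
```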

### Notice that the RDD object itself does not show any data

In [None]:
rdd2

Here is the implementation of `rdd.distinct` from the PySpark source:

In [None]:
def distinct(self, numPartitions=None):
    """
    Return a new RDD containing the distinct elements in this RDD.

    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
    """
    return self.map(lambda x: (x, None)) \
               .reduceByKey(lambda x, _: x, numPartitions) \
               .map(lambda x: x[0])
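The implementation above works in three steps. It pairs every element with a dummy value, uses `reduceByKey` to keep one value per key, and then drops the dummy value. The pure-Python model below walks through the same three steps. It assumes that the shuffle sends each key to bucket `hash(key) % num_partitions`, which is modeled on Spark's default hash partitioning. For small integers, Python's `hash` returns the integer itself, so the output is deterministic. The helper `local_distinct` is hypothetical.

```python
def local_distinct(partitions, num_partitions):
    """Hypothetical model of distinct() as map -> reduceByKey -> map."""
    # Step 1 (map): turn every element x into a key-value pair (x, None).
    pairs = [(x, None) for part in partitions for x in part]
    # Step 2 (reduceByKey): send equal keys to the same output bucket,
    # keeping the first value seen (like `lambda x, _: x`).
    shuffled = [dict() for _ in range(num_partitions)]
    for key, value in pairs:
        bucket = shuffled[hash(key) % num_partitions]
        if key not in bucket:
            bucket[key] = value
    # Step 3 (map): drop the dummy value and keep only the key.
    return [list(bucket) for bucket in shuffled]

result = local_distinct([[1, 1], [2, 3, 1]], num_partitions=2)
print(result)  # [[2], [1, 3]]
print(sorted(x for part in result for x in part))  # [1, 2, 3]
```

In the real RDD version, step 2 is a shuffle that moves data between partitions. This makes `distinct` considerably more expensive than a plain `map`.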

Let's get the distinct domains:

In [None]:
site_rdd.distinct().take(10)

### The rest is left as an optional exercise: can you guess what's going on below?

In [None]:
redrdd = n.parallelize(range(24), 4)
testrdd = redrdd.map(lambda x: ((x ** 2 * 7) % 13, x))

In [None]:
def checkit(rdd):
    def p(n, itr):
        for i in itr:
            yield (n, i)
    return rdd.mapPartitionsWithIndex(p).collect()

testrdd = testrdd.reduceByKey(lambda x, y: x + y, 3)

In [None]:
checkit(testrdd)
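To check your guess without Spark, the sketch below recomputes the same keys locally. It sums the values per key as `reduceByKey(lambda x, y: x + y, 3)` does, then assigns each key to a partition. The assignment assumes PySpark's default partitioner, which for non-negative integer keys puts key `k` in partition `k % 3`. Spark does not guarantee the order of pairs within a partition, so compare the results as sets. The function `local_checkit` is hypothetical.

```python
from collections import defaultdict

def local_checkit(values, num_partitions):
    """Hypothetical local model of the reduceByKey exercise above."""
    # Same key function as the exercise: (7 * x^2) mod 13.
    pairs = [((x ** 2 * 7) % 13, x) for x in values]
    # reduceByKey(lambda x, y: x + y): add up all values that share a key.
    sums = defaultdict(int)
    for key, value in pairs:
        sums[key] += value
    # Assumed default hash partitioning: integer key k goes to partition k % n.
    return sorted((key % num_partitions, (key, total))
                  for key, total in sums.items())

for partition, pair in local_checkit(range(24), 3):
    print(partition, pair)
```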