# Activity 1: Review of Session One

In [7]:
import findspark

# point findspark at your local Spark installation (adjust this path for your machine)
findspark.init("/Users/soumendra/spark2/")

import pyspark

In [8]:
sc = pyspark.SparkContext(appName="helloworld")

In [3]:
# let's test our setup by counting the number of nonempty lines in a text file
lines = sc.textFile('/Users/soumendra/helloworld')
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
lines_nonempty.count()

15

## SparkContext

**So what version of Spark are we running?**

In [7]:
sc.version

'2.0.0'

**Let's now look more closely at the SparkContext object - *sc* - we just created.**

In [4]:
type(sc)

pyspark.context.SparkContext

**To get a list of the attributes and methods accessible through the sc object, you can use the *dir()* function.**

In [5]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'clearFiles',
 'defaultMinPartitions',
 ...]

**You can use the *help()* function to get an easier-to-read list of the attributes.**

In [6]:
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(builtins.object)
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create L{RDD} and
 |  broadcast variables on that cluster.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |  
 |  __exit__(self, type, value, trace)
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |      
 |      Specifically stop the context on exit of the with block.
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |      Create a new SparkContext. At least the master and app name should be set,
 |      either through the named parameters here or through C{conf}.
 |      

# Activity 2: Exploring RDDs

**Let's first download a sizable text file to play with.**

In [8]:
%%bash
wget "https://en.wikipedia.org/wiki/Python_(programming_language)" #Python Wikipedia Page

--2016-10-25 00:38:57--  https://en.wikipedia.org/wiki/Python_(programming_language)
Resolving en.wikipedia.org... 91.198.174.192, 2620:0:862:ed1a::1
Connecting to en.wikipedia.org|91.198.174.192|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘Python_(programming_language)’

     0K .......... .......... .......... .......... .......... 11.3K
    50K .......... .......... .......... .......... .......... 24.7K
   100K .......... .......... .......... .......... .......... 24.8K
   150K .......... .......... .......... .......... .......... 13.2K
   200K .......... .......... .......... .......... ..........  512K
   250K .......... .......... .......... .......... .......... 38.3K
   300K .......... .......... ....                             30.1K=14s

2016-10-25 00:39:23 (22.4 KB/s) - ‘Python_(programming_language)’ saved [332270]



In [1]:
%%bash
ls

Python_(programming_language)
Spark_Activities_01_Basics.ipynb
Spark_Activities_02_Transformations_and_Actions.ipynb
ontime


In [3]:
%%bash
mv 'Python_(programming_language)' python_wiki

In [4]:
%%bash
ls

Spark_Activities_01_Basics.ipynb
Spark_Activities_02_Transformations_and_Actions.ipynb
ontime
python_wiki


**Let's now create an RDD out of the lines using the *textFile()* function.**

In [11]:
help(sc.textFile)

Help on method textFile in module pyspark.context:

textFile(name, minPartitions=None, use_unicode=True) method of pyspark.context.SparkContext instance
    Read a text file from HDFS, a local file system (available on all
    nodes), or any Hadoop-supported file system URI, and return it as an
    RDD of Strings.
    
    If use_unicode is False, the strings will be kept as `str` (encoding
    as `utf-8`), which is faster and smaller than unicode. (Added in
    Spark 1.2)
    
    >>> path = os.path.join(tempdir, "sample-text.txt")
    >>> with open(path, "w") as testFile:
    ...    _ = testFile.write("Hello world!")
    >>> textFile = sc.textFile(path)
    >>> textFile.collect()
    ['Hello world!']



**`textFile()` splits the contents of the file into lines (on the newline character); each line becomes one string element of the RDD.**
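As a rough mental model (Spark itself relies on Hadoop's text input format, not on this code), the RDD returned by `textFile()` holds one string per line with the line terminator removed. The hypothetical helper `split_like_textfile` below mimics that on an in-memory string, assuming LF or CRLF line endings only; counting the non-empty elements then mirrors the `filter(...).count()` check from Activity 1.

```python
def split_like_textfile(text):
    """Hypothetical helper: approximate the elements textFile() yields for `text`.

    Only LF and CRLF line endings are handled; a trailing newline ends the
    last line rather than starting an extra empty one.
    """
    parts = text.split("\n")
    if parts and parts[-1] == "":
        parts.pop()
    # Drop the carriage return left over from CRLF line endings
    return [p[:-1] if p.endswith("\r") else p for p in parts]


sample = "<!DOCTYPE html>\r\n<html>\n\n<head>\n"
sample_lines = split_like_textfile(sample)
print(sample_lines)  # ['<!DOCTYPE html>', '<html>', '', '<head>']

# Plain-Python counterpart of lines.filter(lambda x: len(x) > 0).count()
print(sum(1 for line in sample_lines if len(line) > 0))  # 3
```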

In [12]:
lines = sc.textFile("python_wiki")

**How do we inspect *lines*?**

In [13]:
print(lines)

python_wiki MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2


In [20]:
lines.first()

'<!DOCTYPE html>'

In [21]:
lines.take(5)

['<!DOCTYPE html>',
 '<html class="client-nojs" lang="en" dir="ltr">',
 '<head>',
 '<meta charset="UTF-8"/>',
 '<title>Python (programming language) - Wikipedia</title>']

# Activity 3: Transformations

## map()

**Square the numbers**

In [32]:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda num: num**2)
squared.collect()

[1, 4, 9, 16]

## flatMap()

In [97]:
lines = sc.parallelize(["hello world", "hi"])
words = lines.map(lambda line: line.split(" "))
words.collect()

[['hello', 'world'], ['hi']]

In [98]:
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.collect()

['hello', 'world', 'hi']
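The difference between the two cells above can be mimicked with plain Python lists (no Spark needed; this is only an analogy for the semantics): `map()` produces exactly one output element per input element, while `flatMap()` expects the function to return an iterable and concatenates the results.

```python
from itertools import chain


def split_words(line):
    return line.split(" ")


texts = ["hello world", "hi"]

# Like map(): exactly one output element (here a list) per input element
mapped = [split_words(t) for t in texts]

# Like flatMap(): apply the function, then flatten the returned lists into one sequence
flat_mapped = list(chain.from_iterable(split_words(t) for t in texts))

print(mapped)       # [['hello', 'world'], ['hi']]
print(flat_mapped)  # ['hello', 'world', 'hi']
```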


## Set Operations

In [36]:
set1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
set2 = sc.parallelize(["coffee", "money", "kitty"])

### distinct()

In [38]:
set1.distinct().collect()

['panda', 'coffee', 'tea', 'monkey']

### union()

In [99]:
set1.union(set2).collect()

['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'money', 'kitty']

In [101]:
set2.union(set1).collect()

['coffee', 'money', 'kitty', 'coffee', 'coffee', 'panda', 'monkey', 'tea']

### intersection()

In [42]:
set1.intersection(set2).collect()

['coffee']

### subtract()

In [44]:
set1.subtract(set2).collect()

['panda', 'tea', 'monkey']

In [45]:
set2.subtract(set1).collect()

['money', 'kitty']
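Despite the heading, these RDD operations do not all behave like mathematical sets: `union()` keeps duplicates, `distinct()` and `intersection()` remove them, and `subtract()` keeps any duplicates that survive in the first RDD. The plain-Python sketch below (the `rdd_*` function names are hypothetical, and this is an illustration of the semantics rather than Spark code) reproduces the results above up to ordering, since Spark does not guarantee element order after a shuffle.

```python
from collections import Counter

list1 = ["coffee", "coffee", "panda", "monkey", "tea"]
list2 = ["coffee", "money", "kitty"]


def rdd_union(a, b):
    # union(): plain concatenation, duplicates are kept
    return a + b


def rdd_distinct(a):
    # distinct(): each value once (first-seen order here; Spark gives no order guarantee)
    return list(dict.fromkeys(a))


def rdd_intersection(a, b):
    # intersection(): values present in both inputs, with duplicates removed
    in_b = set(b)
    return [x for x in rdd_distinct(a) if x in in_b]


def rdd_subtract(a, b):
    # subtract(): values of `a` not present in `b`; duplicates in `a` survive
    in_b = set(b)
    return [x for x in a if x not in in_b]


print(Counter(rdd_union(list1, list2))["coffee"])  # 3
print(sorted(rdd_distinct(list1)))                 # ['coffee', 'monkey', 'panda', 'tea']
print(rdd_intersection(list1, list2))              # ['coffee']
print(sorted(rdd_subtract(list1, list2)))          # ['monkey', 'panda', 'tea']
print(rdd_subtract(["x", "x", "y"], ["y"]))        # ['x', 'x']
```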

## filter()

In [47]:
inputRDD = sc.parallelize(["message:", "error #523", 
                           "cause: seg fault detected", 
                           "please check you are not using windows"])
errorsRDD = inputRDD.filter(lambda x: "error" in x)
errorsRDD.collect()

['error #523']

# Activity 4: Actions

## reduce()

In [48]:
nums = sc.parallelize([1, 2, 3, 4])
nums.reduce(lambda x, y: x + y)


10

In [50]:
sum_of_squares = nums.map(lambda num: num**2).reduce(lambda x, y: x + y)
sum_of_squares

30
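`reduce()` runs in parallel: roughly, each partition is reduced on its own and the partial results are then reduced again, which is why the PySpark documentation asks for a commutative and associative function. The plain-Python sketch below uses a hypothetical `partitioned_reduce` helper (with a simplified, fixed slicing and merge order, not Spark's scheduler) to show why: addition gives the same answer for any partitioning, while subtraction does not.

```python
import operator
from functools import reduce


def partitioned_reduce(data, func, num_partitions):
    """Hypothetical helper: reduce each slice separately, then reduce the partial results."""
    n = len(data)
    # Split into num_partitions contiguous, roughly equal slices
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    partitions = [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]
    # Reduce within each non-empty partition, then combine the partial results
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


values = [1, 2, 3, 4]
print([partitioned_reduce(values, operator.add, p) for p in (1, 2, 4)])  # [10, 10, 10]
print([partitioned_reduce(values, operator.sub, p) for p in (1, 2, 4)])  # [-8, 0, -8]
```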

## collect()

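**reduce() is an action that returns a plain Python value (here an int) to the driver, not an RDD, so there is nothing left to collect() and the call below fails**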

In [57]:
nums.map(lambda num: num**2).reduce(lambda x, y: x + y).collect()

AttributeError: 'int' object has no attribute 'collect'

## first()

In [58]:
nums.map(lambda num: num**2).first()

1

## take()

In [59]:
nums.map(lambda num: num**2).take(3)

[1, 4, 9]

# Activity 5: Numeric RDD Operations

## stats()

In [61]:
nums.stats()

(count: 4, mean: 2.5, stdev: 1.11803398875, max: 4.0, min: 1.0)

## max()

In [63]:
nums.max()

4

## min()

In [64]:
nums.min()

1

## mean()

In [65]:
nums.mean()

2.5

## stdev()

In [68]:
nums.stdev()

1.1180339887498949
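`stdev()` returns the population standard deviation (dividing by n), which is why it reports about 1.118 for `[1, 2, 3, 4]`; PySpark also offers `sampleStdev()`, which divides by n - 1. Python's standard-library `statistics` module can be used to double-check both values locally:

```python
import statistics

values = [1, 2, 3, 4]

print(statistics.mean(values))    # 2.5, compare nums.mean()
print(statistics.pstdev(values))  # population std. dev., about 1.118, compare nums.stdev()
print(statistics.stdev(values))   # sample std. dev., about 1.291, compare nums.sampleStdev()
```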

# Activity 6: Exercise

* Create a Python collection of 10,000 integers
* Create a Spark RDD from that collection
* Subtract one from each value using map
* Perform action collect to view results
* Perform action count to view counts
* Apply transformation filter (select only even numbers) and view results with collect

In [109]:
data = range(1, 10001)
len(data)

10000

In [103]:
data

range(1, 10001)

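Note: in Python 3, `range()` returns a lazy sequence object (like Python 2's `xrange()`), whereas Python 2's `range()` builds a full list in memory.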
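In Python 3, `range(1, 10001)` is a lazy sequence object rather than a list: it supports `len()` and indexing without storing 10,000 integers. A quick plain-Python check:

```python
import sys

py_range = range(1, 10001)

print(len(py_range))              # 10000
print(py_range[0], py_range[-1])  # 1 10000
print(type(py_range).__name__)    # range

# The range object stays small no matter how many values it describes;
# list() materialises every element.
print(sys.getsizeof(py_range) < sys.getsizeof(list(py_range)))  # True
```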

In [110]:
data = sc.parallelize(data, 8)
data.getNumPartitions()

8
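`parallelize(data, 8)` splits the collection into 8 partitions. The sketch below is a rough plain-Python model of that slicing, assuming a contiguous, near-equal split (the exact boundaries Spark picks are an implementation detail); `slice_into_partitions` is a hypothetical helper. On a real RDD, `data.glom().collect()` returns the actual contents of each partition as a list.

```python
def slice_into_partitions(seq, num_slices):
    """Hypothetical helper: split seq into num_slices contiguous, near-equal chunks."""
    n = len(seq)
    return [seq[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


partitions = slice_into_partitions(range(1, 10001), 8)

print(len(partitions))                       # 8
print([len(p) for p in partitions])          # [1250, 1250, 1250, 1250, 1250, 1250, 1250, 1250]
print(partitions[0][0], partitions[-1][-1])  # 1 10000
```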

In [111]:
data

PythonRDD[146] at RDD at PythonRDD.scala:48

In [105]:
def sub(value):
    return value - 1

data_1 = data.map(sub)

# Let's see the RDD transformation hierarchy (lineage);
# toDebugString() returns bytes in Python 3, so decode it for readable output
print(data_1.toDebugString().decode("utf-8"))

(8) PythonRDD[144] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[143] at parallelize at PythonRDD.scala:475 []


In [106]:
data_1.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ..., 184, ...


In [83]:
data_1.count()

10000

In [85]:
data_1.filter(lambda x: x%2 == 0).take(5)

[0, 2, 4, 6, 8]

## More Actions

In [86]:
data_1.first()

0

In [87]:
data_1.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [88]:
data_1.takeOrdered(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [91]:
data_1.top(10)

[9999, 9998, 9997, 9996, 9995, 9994, 9993, 9992, 9991, 9990]

* The key advantage of using takeOrdered() instead of first() or take() is that takeOrdered() returns a deterministic result, while the other two actions may return different results depending on the number of partitions or the execution environment.
* takeOrdered() returns the list sorted in ascending order. The top() action is similar to takeOrdered(), except that it returns the list in descending order.

In [92]:
sc.parallelize([3, 2, 1, 4, 5, 7, 3, 9, 2]).take(3)

[3, 2, 1]

In [112]:
sc.parallelize([3, 2, 1, 4, 5, 7, 3, 9, 2]).takeOrdered(3)

[1, 2, 2]
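The ordering actions above can be modelled with the standard-library `heapq` module: keep the n smallest (or largest) values of each partition, then pick the overall n smallest (or largest) from those candidates. This is a plain-Python sketch with hypothetical helper names and hand-made partitions, but it reproduces the `takeOrdered(3)` result above and shows why the answer does not depend on how the data is partitioned.

```python
import heapq


def take_ordered(partitions, n):
    """Hypothetical helper: n smallest values across all partitions, ascending."""
    candidates = [x for part in partitions for x in heapq.nsmallest(n, part)]
    return heapq.nsmallest(n, candidates)


def top(partitions, n):
    """Hypothetical helper: n largest values across all partitions, descending."""
    candidates = [x for part in partitions for x in heapq.nlargest(n, part)]
    return heapq.nlargest(n, candidates)


values = [3, 2, 1, 4, 5, 7, 3, 9, 2]
two_parts = [values[:4], values[4:]]
three_parts = [values[:3], values[3:6], values[6:]]

print(take_ordered(two_parts, 3), take_ordered(three_parts, 3))  # [1, 2, 2] [1, 2, 2]
print(top(two_parts, 3), top(three_parts, 3))                    # [9, 7, 5] [9, 7, 5]
```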

# Activity 7: groupByKey() and reduceByKey()

Let's investigate two additional transformations: [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) and [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey).

Both of these transformations operate on pair RDDs. A pair RDD is an RDD where each element is a (key, value) tuple. For example, `sc.parallelize([('a', 1), ('a', 2), ('b', 1)])` creates a pair RDD whose keys are 'a', 'a', 'b' and whose values are 1, 2, 1.

The `reduceByKey()` transformation gathers together pairs that have the same key and applies a function to two associated values at a time, so the function should be associative and commutative. `reduceByKey()` applies the function first within each partition on a per-key basis and then across the partitions.

While `groupByKey()` and `reduceByKey()` can often be used to solve the same problem and will produce the same answer, `reduceByKey()` works much better on large distributed datasets. This is because Spark knows it can combine values that share a key on each partition *before* shuffling (redistributing) the data across nodes. Only use `groupByKey()` if the operation would not benefit from reducing the data before the shuffle occurs.

Look at the diagram below to understand how `reduceByKey()` works. Notice how pairs on the same machine with the same key are combined (using the lambda function passed to `reduceByKey()`) before the data is shuffled. The lambda function is then called again to reduce the values from each partition into one final result per key.

![reduceByKey() figure](http://spark-mooc.github.io/web-assets/images/reduce_by.png)

On the other hand, `groupByKey()` shuffles all the key-value pairs around, causing a lot of unnecessary data to be transferred over the network.

To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when more data is shuffled onto a single executor machine than can fit in memory. However, it flushes the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be handled more gracefully in a later release of Spark so that the job can still proceed, but it should still be avoided. When Spark needs to spill to disk, performance is severely impacted.

![groupByKey() figure](http://spark-mooc.github.io/web-assets/images/group_by.png)

As your dataset grows, the difference between `reduceByKey()` and `groupByKey()` in the amount of data that needs to be shuffled becomes increasingly pronounced.

Here are more transformations to prefer over `groupByKey()`:

  + [combineByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.combineByKey) can be used when you are combining elements but your return type differs from your input value type.
  + [foldByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foldByKey) merges the values for each key using an associative function and a neutral "zero value".

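`combineByKey()` takes three functions: one that turns the first value seen for a key (within a partition) into an accumulator, one that folds a further value into an accumulator, and one that merges accumulators coming from different partitions. A common illustration is a per-key average, where the accumulator is a `(sum, count)` pair, so the output type differs from the input values. The plain-Python sketch below imitates that flow over hand-made partitions; the helper `combine_by_key` and the partitioning are hypothetical, and in PySpark the call would look roughly like `rdd.combineByKey(create_combiner, merge_value, merge_combiners)`.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical helper that imitates combineByKey() over a list of partitions."""
    # Step 1 (before the shuffle): build one accumulator per key inside each partition
    per_partition = []
    for part in partitions:
        accumulators = {}
        for key, value in part:
            if key in accumulators:
                accumulators[key] = merge_value(accumulators[key], value)
            else:
                accumulators[key] = create_combiner(value)
        per_partition.append(accumulators)
    # Step 2 (after the shuffle): merge the accumulators of each key across partitions
    result = {}
    for accumulators in per_partition:
        for key, acc in accumulators.items():
            result[key] = merge_combiners(result[key], acc) if key in result else acc
    return result


# Per-key average: the accumulator is a (sum, count) tuple, not a plain number
def create_combiner(value):
    return (value, 1)


def merge_value(acc, value):
    return (acc[0] + value, acc[1] + 1)


def merge_combiners(a, b):
    return (a[0] + b[0], a[1] + b[1])


partitions = [[('a', 1), ('a', 2)], [('b', 1), ('a', 6)]]
sums_counts = combine_by_key(partitions, create_combiner, merge_value, merge_combiners)
averages = {key: total / count for key, (total, count) in sums_counts.items()}
print(sums_counts)  # {'a': (9, 3), 'b': (1, 1)}
print(averages)     # {'a': 3.0, 'b': 1.0}
```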
#### Now let's go through a simple `groupByKey()` and `reduceByKey()` example.

In [133]:
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])

# mapValues only used to improve format for printing
pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

[('b', [1]), ('a', [1, 2])]

In [129]:
# Different ways to sum by key
pairRDD.groupByKey().map(lambda k: (k[0], sum(k[1]))).collect()

[('b', 1), ('a', 3)]


In [119]:
# Using mapValues, which is recommended when the key doesn't change
print(pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect())

[('b', 1), ('a', 3)]


In [121]:
def add(x, y):
    return x + y

# reduceByKey is more efficient / scalable
print(pairRDD.reduceByKey(add).collect())

[('b', 1), ('a', 3)]
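To make the shuffle argument concrete, the plain-Python sketch below counts how many records would cross the network in each approach, under simplifying assumptions: the pairs are spread by hand over three partitions, `groupByKey()` is modelled as shuffling every pair, and `reduceByKey()` as shuffling one pre-combined pair per key per partition. The helper names are hypothetical, and this illustrates the idea in the diagrams above rather than measuring Spark itself.

```python
from operator import add

# Hypothetical pair data, spread by hand over three partitions
partitions = [
    [('a', 1), ('a', 2), ('b', 1), ('a', 1)],
    [('b', 2), ('b', 3), ('a', 4)],
    [('a', 5), ('a', 1), ('a', 1)],
]


def combine_locally(pairs, func):
    """Combine the values of each key within one list of pairs using func."""
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())


def group_by_key_shuffle_size(parts):
    # groupByKey(): every (key, value) pair is sent across the network
    return sum(len(part) for part in parts)


def reduce_by_key(parts, func):
    # reduceByKey(): first combine per key inside each partition (no network needed) ...
    local = [combine_locally(part, func) for part in parts]
    shuffled = sum(len(part) for part in local)  # one record per key per partition
    # ... then merge the partial results for each key after the shuffle
    merged = dict(combine_locally([kv for part in local for kv in part], func))
    return merged, shuffled


totals, reduce_shuffled = reduce_by_key(partitions, add)
print(totals)                                 # {'a': 15, 'b': 6}
print(group_by_key_shuffle_size(partitions))  # 10
print(reduce_shuffled)                        # 5
```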
