* https://ucilnica.fri.uni-lj.si/mod/resource/view.php?id=28089

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Spark-&amp;-Jupyter-notebook" data-toc-modified-id="Spark-&amp;-Jupyter-notebook-0.1"><span class="toc-item-num">0.1&nbsp;&nbsp;</span>Spark &amp; Jupyter notebook</a></span></li></ul></li><li><span><a href="#PySpark-Python-API" data-toc-modified-id="PySpark-Python-API-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>PySpark Python API</a></span></li><li><span><a href="#Parallelism-demo" data-toc-modified-id="Parallelism-demo-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Parallelism demo</a></span></li><li><span><a href="#RDD---Resilient-Distributed-Datasets" data-toc-modified-id="RDD---Resilient-Distributed-Datasets-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>RDD - Resilient Distributed Datasets</a></span></li><li><span><a href="#RDD-Actions" data-toc-modified-id="RDD-Actions-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>RDD Actions</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Demo-files" data-toc-modified-id="Demo-files-4.0.1"><span class="toc-item-num">4.0.1&nbsp;&nbsp;</span>Demo files</a></span></li></ul></li></ul></li><li><span><a href="#RDD-Operations" data-toc-modified-id="RDD-Operations-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>RDD Operations</a></span><ul class="toc-item"><li><span><a href="#map()" data-toc-modified-id="map()-5.1"><span class="toc-item-num">5.1&nbsp;&nbsp;</span>map()</a></span></li><li><span><a href="#flatMap()" data-toc-modified-id="flatMap()-5.2"><span class="toc-item-num">5.2&nbsp;&nbsp;</span>flatMap()</a></span></li><li><span><a href="#mapValues()" data-toc-modified-id="mapValues()-5.3"><span class="toc-item-num">5.3&nbsp;&nbsp;</span>mapValues()</a></span></li><li><span><a href="#flatMapValues()" data-toc-modified-id="flatMapValues()-5.4"><span class="toc-item-num">5.4&nbsp;&nbsp;</span>flatMapValues()</a></span></li><li><span><a href="#filter()" data-toc-modified-id="filter()-5.5"><span 
class="toc-item-num">5.5&nbsp;&nbsp;</span>filter()</a></span></li><li><span><a href="#groupByKey()" data-toc-modified-id="groupByKey()-5.6"><span class="toc-item-num">5.6&nbsp;&nbsp;</span>groupByKey()</a></span></li><li><span><a href="#reduceByKey()" data-toc-modified-id="reduceByKey()-5.7"><span class="toc-item-num">5.7&nbsp;&nbsp;</span>reduceByKey()</a></span></li><li><span><a href="#sortBy()" data-toc-modified-id="sortBy()-5.8"><span class="toc-item-num">5.8&nbsp;&nbsp;</span>sortBy()</a></span></li><li><span><a href="#sortByKey()" data-toc-modified-id="sortByKey()-5.9"><span class="toc-item-num">5.9&nbsp;&nbsp;</span>sortByKey()</a></span></li><li><span><a href="#subtract()" data-toc-modified-id="subtract()-5.10"><span class="toc-item-num">5.10&nbsp;&nbsp;</span>subtract()</a></span></li><li><span><a href="#join()" data-toc-modified-id="join()-5.11"><span class="toc-item-num">5.11&nbsp;&nbsp;</span>join()</a></span></li></ul></li><li><span><a href="#MapReduce-demo" data-toc-modified-id="MapReduce-demo-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>MapReduce demo</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Simplify-chained-transformations" data-toc-modified-id="Simplify-chained-transformations-6.0.1"><span class="toc-item-num">6.0.1&nbsp;&nbsp;</span>Simplify chained transformations</a></span></li></ul></li></ul></li></ul></div>

# Spark tutorial

### Spark & Jupyter notebook

To set up Spark in [Jupyter notebook](https://jupyter.org/), do the following:

1. Add the following lines to `~/.bashrc`:
   - local access
```
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
```
   - remote access
```
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=<port> --ip='*'"
```

2. Run from the terminal:
```
pyspark
```

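Note that remote access to the Jupyter notebook requires an SSH tunnel. On Windows machines, you can use [PuTTY](https://www.putty.org/) to set it up. In Linux environments, the following command can be used, where `<port>` is the port the notebook server listens on and `<local_port>` is a free port on your own machine:

    ssh -N -L localhost:<local_port>:localhost:<port> <user>@<host>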

Finally, you can run the notebook in your browser:

    http://localhost:<local_port>

In [1]:
import random
import re

## PySpark Python API 

PySpark can be used from standalone Python scripts by creating a `SparkContext`. You can set configuration properties by passing a `SparkConf` object to `SparkContext`.

Documentation: [pyspark package](https://spark.apache.org/docs/latest/api/python/pyspark.html)

In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
# cannot run multiple SparkContexts at once (so stop one just in case)
sc = SparkContext.getOrCreate()
sc.stop()

In [4]:
# spark conf
conf = SparkConf()

In [5]:
# create a Spark context
sc = SparkContext(conf=conf)

In [16]:
sc

## Parallelism demo

In [6]:
num_samples = 100000000

In [7]:
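def inside(p):
    # p is the sample index passed in by filter(); it is ignored,
    # and a random point in the unit square is drawn instead
    x, y = random.random(), random.random()
    return x*x + y*y < 1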

In [8]:
# watch jupyter notebook server output
count = sc.parallelize(range(0, num_samples)).filter(inside).count()

In [9]:
pi = 4 * count / num_samples

In [10]:
print(pi)

3.1416886
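
The estimate works because a point drawn uniformly from the unit square lands inside the quarter circle of radius 1 with probability π/4, so four times the fraction of hits approximates π. A minimal single-process sketch of the same computation, without Spark (the `estimate_pi` helper, the fixed seed, and the smaller sample size are assumptions chosen for a quick, repeatable run):

```python
import random


def estimate_pi(num_samples, seed=0):
    """Estimate pi by sampling points uniformly from the unit square."""
    rng = random.Random(seed)  # private generator so runs are repeatable
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # the point lies inside the quarter circle
            hits += 1
    return 4 * hits / num_samples


pi_estimate = estimate_pi(200_000)
print(pi_estimate)
```

The Spark version distributes exactly this loop: `parallelize()` splits the range of sample indices into partitions, `filter(inside)` runs on each partition, and `count()` adds up the hits.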


In [11]:
# stop Spark context
sc.stop()

## RDD - Resilient Distributed Datasets

resilient:
- (of a person or animal) able to withstand or recover quickly from difficult conditions
- (of a substance or object) able to recoil or spring back into shape after bending, stretching, or being compressed

Spark is RDD-centric!
- RDDs are immutable
- RDDs are computed lazily
- RDDs can be cached
- RDDs know who their parents are
- RDDs that contain only tuples of two elements are “pair RDDs”
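
Lazy evaluation can be loosely imitated in plain Python with iterators. The sketch below is only an analogy (it does not use Spark, and `traced_square` is a hypothetical helper): building the pipeline does no work, and values are computed only when something consumes them, much as RDD transformations run only when an action is called.

```python
from itertools import islice

calls = []  # records which inputs were actually processed


def traced_square(x):
    calls.append(x)
    return x * x


numbers = range(1_000_000)
squares = map(traced_square, numbers)                 # "transformation": nothing runs yet
even_squares = filter(lambda v: v % 2 == 0, squares)  # still nothing runs

calls_before = len(calls)                             # 0
first_three = list(islice(even_squares, 3))           # "action": pulls just enough data

print(calls_before)  # 0
print(first_three)   # [0, 4, 16]
print(len(calls))    # 5 (inputs 0..4 were enough to find three even squares)
```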

## RDD Actions

**RDD** - Resilient Distributed Datasets

Some useful actions:
- `take(n)` – return the first n elements of the RDD as a list.
- `collect()` – return all elements of the RDD as a list. Use with caution: the whole dataset is brought into the driver's memory.
- `count()` – return the number of elements in the RDD as an int.
- `saveAsTextFile('path/to/dir')` – save the RDD as text files in a directory. Creates the directory if it doesn’t exist and fails if it does.
- `foreach(func)` – execute the function against every element in the RDD, but don’t keep any results.

#### Demo files

```
file1.txt:
    Apple,Amy
    Butter,Bob
    Cheese,Chucky
    Dinkel,Dieter
    Egg,Edward
    Oxtail,Oscar
    Anchovie,Alex
    Avocado,Adam
    Apple,Alex
    Apple,Adam
    Dinkel,Dieter
    Doughboy,Pilsbury
    McDonald,Ronald

file2.txt:
    Wendy,
    Doughboy,Pillsbury
    McDonald,Ronald
    Cheese,Chucky
```
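
The demo files have to exist before `sc.textFile()` can read them. One possible way to create them from Python is sketched below; `write_demo_files` is a hypothetical helper, not part of PySpark, and the contents are copied from the listing above.

```python
from pathlib import Path

FILE1_LINES = [
    "Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Dinkel,Dieter",
    "Egg,Edward", "Oxtail,Oscar", "Anchovie,Alex", "Avocado,Adam",
    "Apple,Alex", "Apple,Adam", "Dinkel,Dieter", "Doughboy,Pilsbury",
    "McDonald,Ronald",
]
FILE2_LINES = ["Wendy,", "Doughboy,Pillsbury", "McDonald,Ronald", "Cheese,Chucky"]


def write_demo_files(directory="."):
    """Write file1.txt and file2.txt into `directory` and return their paths."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    paths = []
    for name, lines in (("file1.txt", FILE1_LINES), ("file2.txt", FILE2_LINES)):
        path = directory / name
        path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        paths.append(path)
    return paths
```

Calling `write_demo_files()` from the notebook's working directory places both files where the relative paths `file1.txt` and `file2.txt` are looked up when Spark runs locally.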

In [12]:
sc = SparkContext(conf=conf)

In [13]:
# input files
file1 = 'file1.txt'
file2 = 'file2.txt'

In [14]:
# load data
data1 = sc.textFile(file1)
data2 = sc.textFile(file2)

In [15]:
data1.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/C:/Users/tbresee/Documents/GIT2/DATA-MINING/ENTER/worked_notebooks/file1.txt


In [None]:
print("file1: %d lines" % data1.count())

In [None]:
data1.take(3)

In [None]:
data2.collect()

In [None]:
print("file2: %d lines" % data2.count())

In [None]:
data2.take(3)

Note: the following cell prints to the terminal in which the Jupyter notebook server is running, not to the notebook itself!

In [None]:
# prints each element to the Jupyter notebook server output
data2.foreach(print)

## RDD Operations

### map()
Return a new RDD by applying a function to each element of this RDD.
- apply an operation to every element of an RDD
- return a new RDD that contains the results

In [None]:
data = sc.textFile(file1)

In [None]:
data

In [None]:
data.take(3)

In [None]:
data.map(lambda line: line.split(',')).take(3)

### flatMap()
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
- apply an operation to every element of an RDD
- return a new RDD that contains the results after dropping the outermost container
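
The difference between `map()` and `flatMap()` can be illustrated in plain Python, without Spark. This is a sketch of the semantics only, using three sample lines from `file1.txt`; `split_line` is a hypothetical helper.

```python
from itertools import chain

lines = ["Apple,Amy", "Butter,Bob", "Cheese,Chucky"]


def split_line(line):
    return line.split(",")


# map(): one output element per input element (here, one list per line)
mapped = [split_line(line) for line in lines]

# flatMap(): the per-line lists are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(split_line(line) for line in lines))

print(mapped)       # [['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
print(flat_mapped)  # ['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky']
```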

In [None]:
data = sc.textFile(file1)

In [None]:
data.take(4)

In [None]:
data.flatMap(lambda line: line.split(',')).take(7)

### mapValues()
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results

Only works with pair RDDs.

In [None]:
data = sc.textFile(file1)

In [None]:
data = data.map(lambda line: line.split(','))

In [None]:
data.take(3)

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.take(3)

In [None]:
data.mapValues(lambda name: name.lower()).take(3)

### flatMapValues()
Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results after removing the outermost container

Only works with pair RDDs.
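
A plain-Python sketch of how `mapValues()` and `flatMapValues()` treat the values of a pair RDD (the helpers `map_values` and `flat_map_values` are hypothetical stand-ins, not Spark functions). Because a string is itself iterable, a function that returns a string is flattened into single characters.

```python
pairs = [("Apple", "Amy"), ("Butter", "Bob")]


def map_values(func, pairs):
    """Apply func to each value and keep the key unchanged."""
    return [(key, func(value)) for key, value in pairs]


def flat_map_values(func, pairs):
    """Apply func to each value and emit one (key, item) pair per item of the result."""
    return [(key, item) for key, value in pairs for item in func(value)]


print(map_values(str.lower, pairs))
# [('Apple', 'amy'), ('Butter', 'bob')]

print(flat_map_values(str.lower, pairs)[:4])
# [('Apple', 'a'), ('Apple', 'm'), ('Apple', 'y'), ('Butter', 'b')]

print(flat_map_values(lambda name: [name.lower(), name.upper()], pairs)[:2])
# [('Apple', 'amy'), ('Apple', 'AMY')]
```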

In [None]:
data = sc.textFile(file1)

In [None]:
data = data.map(lambda line: line.split(','))

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.take(3)

In [None]:
# name.lower() returns a string, which flatMapValues() flattens into single characters
data.flatMapValues(lambda name: name.lower()).take(9)

### filter()
Return a new RDD containing only the elements that satisfy a predicate.
- return a new RDD that contains only the elements that pass a **filter operation**

In [None]:
data = sc.textFile(file1)

In [None]:
data.take(3)

In [None]:
data.filter(lambda line: re.match(r'^[AEIOU]', line)).take(3)

In [None]:
data.filter(lambda line: re.match(r'.+[y]$', line)).take(3)

In [None]:
data.filter(lambda line: re.search(r'[x]$', line)).take(3)
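
The three predicates above rely on the fact that `re.match()` and `re.search()` return a match object (truthy) or `None` (falsy). A small plain-Python sketch on a handful of lines from `file1.txt` shows which lines each pattern keeps; the sample subset is chosen for illustration.

```python
import re

lines = ["Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Oxtail,Oscar", "Egg,Edward"]

# lines whose first character is an uppercase vowel
starts_with_vowel = [l for l in lines if re.match(r'^[AEIOU]', l)]

# lines that end in "y"; re.match() anchors at the start, hence the leading ".+"
ends_with_y = [l for l in lines if re.match(r'.+[y]$', l)]

# lines that end in "x"; re.search() scans the whole string, so no ".+" is needed
ends_with_x = [l for l in lines if re.search(r'[x]$', l)]

print(starts_with_vowel)  # ['Apple,Amy', 'Oxtail,Oscar', 'Egg,Edward']
print(ends_with_y)        # ['Apple,Amy', 'Cheese,Chucky']
print(ends_with_x)        # []
```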

### groupByKey()
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
- collect all values that share the same key
- return a new RDD with one (key, iterable of values) pair per distinct key

Only works with pair RDDs.
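
Grouping by key can be sketched in plain Python with a dictionary of lists. This only imitates the result; `group_by_key` is a hypothetical helper, and in Spark the order of keys in the output is not guaranteed.

```python
from collections import defaultdict

pairs = [("Apple", "Amy"), ("Butter", "Bob"), ("Apple", "Alex"), ("Apple", "Adam")]


def group_by_key(pairs):
    """Collect the values of each key into a list, one entry per distinct key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())


grouped = group_by_key(pairs)
for key, values in grouped:
    print("%s: %s" % (key, ",".join(values)))
# Apple: Amy,Alex,Adam
# Butter: Bob
```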

In [None]:
data = sc.textFile(file1)

In [None]:
data = data.map(lambda line: line.split(','))

In [None]:
data.take(3)

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.take(3)

In [None]:
data.groupByKey().take(1)

In [None]:
for pair in data.groupByKey().take(1):
    print("%s: %s" % (pair[0], ",".join([n for n in pair[1]])))

### reduceByKey()
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
- group the elements of an RDD by key and then
- apply a reduce operation to pairs of values that share a key
- until only a single value remains for each key
- return the result in a new RDD

Only works with pair RDDs.
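
A plain-Python sketch of the per-key reduction (the `reduce_by_key` helper is hypothetical and runs sequentially). Note that string concatenation is associative but not commutative, so in Spark, where partial results are merged across partitions, the order of the joined names may differ from the sequential result shown here.

```python
def reduce_by_key(func, pairs):
    """Fold the values of each key together with func, one result per key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


pairs = [("Apple", "Amy"), ("Dinkel", "Dieter"), ("Apple", "Alex"),
         ("Apple", "Adam"), ("Dinkel", "Dieter")]

joined = reduce_by_key(lambda v1, v2: v1 + ":" + v2, pairs)
print(joined)  # [('Apple', 'Amy:Alex:Adam'), ('Dinkel', 'Dieter:Dieter')]

# addition is associative and commutative, so counting is order-independent
counts = reduce_by_key(lambda n1, n2: n1 + n2, [(key, 1) for key, _ in pairs])
print(counts)  # [('Apple', 3), ('Dinkel', 2)]
```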

In [None]:
data = sc.textFile(file1)

In [None]:
data = data.map(lambda line: line.split(","))

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.take(3)

In [None]:
data.reduceByKey(lambda v1, v2: v1 + ":" + v2).take(6)

### sortBy()
Sorts this RDD by the given keyfunc.
- sort an RDD according to a sorting function
- return the results in a new RDD

In [None]:
data = sc.textFile(file1)

In [None]:
data = data.map(lambda line: line.split(","))

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.collect()

In [None]:
# sort by the second character of the value (the name)
data.sortBy(lambda pair: pair[1][1]).take(10)

### sortByKey()
Sorts this RDD, which is assumed to consist of (key, value) pairs.
- sort an RDD according to the natural ordering of the keys
- return the results in a new RDD
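
Both sorting operations behave like Python's built-in `sorted()` with a suitable key function. The sketch below imitates `sortBy()` with the key function from the `sortBy()` example (second character of the value) and `sortByKey()` on a few sample pairs; it runs without Spark.

```python
pairs = [("Butter", "Bob"), ("Apple", "Amy"), ("Egg", "Edward"), ("Cheese", "Chucky")]

# sortBy(keyfunc): order by whatever the key function returns
by_second_letter = sorted(pairs, key=lambda pair: pair[1][1])

# sortByKey(): order by the natural ordering of the first tuple element
by_key = sorted(pairs, key=lambda pair: pair[0])

print(by_second_letter)
# [('Egg', 'Edward'), ('Cheese', 'Chucky'), ('Apple', 'Amy'), ('Butter', 'Bob')]
print(by_key)
# [('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky'), ('Egg', 'Edward')]
```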

In [None]:
data = sc.textFile(file1)

In [None]:
data = data.map(lambda line: line.split(","))

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.collect()

In [None]:
data.sortByKey().take(6)

### subtract()
Return each value in self that is not contained in other.
- return a new RDD that contains all the elements from the original RDD 
- that do not appear in a target RDD
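
What to expect from subtracting the two demo files can be worked out in plain Python. The `subtract` helper below is a hypothetical stand-in that keeps duplicates from the left side and makes no promise about matching Spark's output order. Elements are compared as whole lines, so `Doughboy,Pilsbury` survives because `file2.txt` spells it `Doughboy,Pillsbury`.

```python
file1_lines = [
    "Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Dinkel,Dieter",
    "Egg,Edward", "Oxtail,Oscar", "Anchovie,Alex", "Avocado,Adam",
    "Apple,Alex", "Apple,Adam", "Dinkel,Dieter", "Doughboy,Pilsbury",
    "McDonald,Ronald",
]
file2_lines = ["Wendy,", "Doughboy,Pillsbury", "McDonald,Ronald", "Cheese,Chucky"]


def subtract(left, right):
    """Return the elements of left that do not occur in right."""
    excluded = set(right)
    return [item for item in left if item not in excluded]


remaining = subtract(file1_lines, file2_lines)
print(len(remaining))                    # 11
print("Cheese,Chucky" in remaining)      # False
print("Doughboy,Pilsbury" in remaining)  # True
```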

In [None]:
data1 = sc.textFile(file1)

In [None]:
data1.collect()

In [None]:
data1.count()

In [None]:
data2 = sc.textFile(file2)

In [None]:
data2.collect()

In [None]:
data2.count()

In [None]:
data1.subtract(data2).collect()

In [None]:
data1.subtract(data2).count()

### join()
Return an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.
- return a new RDD that contains all the elements from the original RDD
- joined (inner join) with elements from the target RDD
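
An inner join on keys can be sketched in plain Python as follows. The `inner_join` helper is a hypothetical illustration of the `(k, (v1, v2))` result shape, not Spark's implementation, and the sample pairs are a subset of the demo files.

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair every (k, v1) in left with every (k, v2) in right that has the same key."""
    right_values = defaultdict(list)
    for key, value in right:
        right_values[key].append(value)
    return [(key, (v1, v2))
            for key, v1 in left
            for v2 in right_values.get(key, [])]


left = [("Cheese", "Chucky"), ("Doughboy", "Pilsbury"), ("Egg", "Edward")]
right = [("Wendy", ""), ("Doughboy", "Pillsbury"), ("Cheese", "Chucky")]

joined = inner_join(left, right)
print(joined)
# [('Cheese', ('Chucky', 'Chucky')), ('Doughboy', ('Pilsbury', 'Pillsbury'))]
```

Keys that appear on only one side (`Egg`, `Wendy`) are dropped by the inner join; `fullOuterJoin()` keeps them and fills the missing side with `None`.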

In [None]:
data1 = sc.textFile(file1).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [None]:
data1.collect()

In [None]:
data1.count()

In [None]:
data2 = sc.textFile(file2).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [None]:
data2.collect()

In [None]:
data2.count()

In [None]:
data1.join(data2).collect()

In [None]:
data1.join(data2).count()

In [None]:
data1.fullOuterJoin(data2).take(2)

In [None]:
# stop Spark context
sc.stop()

## MapReduce demo

We will now count the occurrences of each word. Word count is the typical "Hello, world!" example for Spark applications, and the map/reduce model is particularly well suited to tasks like counting words in a document.

In [None]:
# create a Spark context
sc = SparkContext(conf=conf)

In [None]:
# read the target file into an RDD
lines = sc.textFile(file1)
lines.take(3)

The `flatMap()` operation first converts each line into an array of words, and then makes
each of the words an element in the new RDD.

In [None]:
# split the lines into individual words
words = lines.flatMap(lambda l: re.split(r'[^\w]+', l))
words.take(3)

The `map()` operation replaces each word with a tuple of that word and the number 1. The
resulting `pairs` RDD is a pair RDD in which the word is the key and every value is the number 1.

In [None]:
# replace each word with a tuple of that word and the number 1
pairs = words.map(lambda w: (w, 1))
pairs.take(3)

The `reduceByKey()` operation keeps adding elements' values together until there are no
more to add for each key (word).

In [None]:
# group the elements of the RDD by key (word) and add up their values
counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)
counts.take(3)

In [None]:
# sort the elements by values in descending order
counts.sortBy(lambda pair: pair[1], ascending=False).take(10)
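
For comparison, the same four steps can be traced in plain Python on a few sample lines. This is a sequential sketch without Spark; the comments name the RDD operation that each step imitates.

```python
import re
from collections import Counter

lines = ["Apple,Amy", "Apple,Alex", "Dinkel,Dieter", "Apple,Adam"]

# flatMap: split every line into words and flatten the result
words = [w for line in lines for w in re.split(r'[^\w]+', line)]

# map: pair every word with the number 1
pairs = [(w, 1) for w in words]

# reduceByKey: add up the values of each word
counts = Counter()
for word, n in pairs:
    counts[word] += n

# sortBy: order by count, largest first (ties keep their original order)
top = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
print(top[:2])  # [('Apple', 3), ('Amy', 1)]
```

One detail to watch: a line that ends with a separator, such as `Wendy,` in `file2.txt`, yields an empty string as its last word (`re.split(r'[^\w]+', 'Wendy,')` returns `['Wendy', '']`), so the empty string would be counted unless it is filtered out first.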

#### Simplify chained transformations

It is good to know that the code above can also be written in the following way:

In [None]:
sorted_counts = (lines.flatMap(lambda l: re.split(r'[^\w]+', l))       # words
                      .map(lambda w: (w, 1))                           # pairs
                      .reduceByKey(lambda n1, n2: n1 + n2)             # counts
                      .sortBy(lambda pair: pair[1], ascending=False))  # sorted counts

In [None]:
sorted_counts.take(10)

In [None]:
# stop Spark context
sc.stop()