# Spark Basics 1

This notebook introduces two fundamental objects in Spark:

* The Spark Context

* The Resilient Distributed DataSet or RDD

## Spark Context
We start by creating a **SparkContext** object named **sc**. In this case we create a spark context that uses 4 *executors* (one per core)

In [12]:
a = SparkConf()

In [13]:
a.setMaster?

[0;31mSignature:[0m [0ma[0m[0;34m.[0m[0msetMaster[0m[0;34m([0m[0mvalue[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m Set master URL to connect to.
[0;31mFile:[0m      ~/anaconda/lib/python3.6/site-packages/pyspark/conf.py
[0;31mType:[0m      method


In [None]:
SparkConf().setMaster

In [14]:
from pyspark import SparkConf, SparkContext

#driver意思为连接spark集群的机子,所以配置host要配置当前编写代码的机子host
conf = SparkConf().setMaster('spark://10.19.140.200:31607,10.19.140.200:32733,10.19.140.200:32295').set('spark.driver.host',
                                                                                                '10.19.139.106').set(
                                                                                'spark.local.ip','10.19.139.106')

In [15]:
sc = SparkContext.getOrCreate(conf)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NullPointerException
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:559)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [3]:
#start the SparkContext
# import findspark
# findspark.init()

from pyspark import SparkContext 
sc = SparkContext(master="spark://10.19.140.200:31607,10.19.140.200:32733,10.19.140.200:32295",
                appName='RqyTest',)
print(sc)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NullPointerException
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:559)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [2]:
SparkContext?

[0;31mInit signature:[0m [0mSparkContext[0m[0;34m([0m[0mmaster[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mappName[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0msparkHome[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mpyFiles[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0menvironment[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mbatchSize[0m[0;34m=[0m[0;36m0[0m[0;34m,[0m [0mserializer[0m[0;34m=[0m[0mPickleSerializer[0m[0;34m([0m[0;34m)[0m[0;34m,[0m [0mconf[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mgateway[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mjsc[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mprofiler_cls[0m[0;34m=[0m[0;34m<[0m[0;32mclass[0m [0;34m'pyspark.profiler.BasicProfiler'[0m[0;34m>[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
Main entry point for Spark functionality. A SparkContext represents the
connection to a Spark cluster, and can be used to create L{RDD} and
broadcast variables on that cluster.
[0;31mIni

### Only one sparkContext at a time!
* Spark is designed for single user
* Only one sparkContext per program/notebook.
* Before starting a new sparkContext. Stop the one currently running

In [None]:
# sc.stop() #commented out so that you don't stop your context by mistake

## RDDs

<p>RDD (or Resilient Distributed DataSet) is the main novel data structure in Spark. You can think of it as a list whose elements are stored on several computers.</p>

<p><img alt="" src="Figures/SparkContextAndRDD.jpg" style="height:324px; width:900px" /></p>


The elements of each `RDD` are distributed across the **worker nodes** which are the nodes that perform the actual computations. This notebook, however, is running on the **Driver node**. As the RDD is not stored on the driver-node you cannot access it directly. The variable name `RDD` is really just a pointer to a python object which holds the information regardnig the actual location of the elements.

## Some basic RDD commands

### Parallelize 
* Simplest way to create an RDD.
* The method `A=sc.parallelize(L)`, creates an RDD named `A` from list `L`.
* `A` is an RDD of type `PythonRDD`.

In [None]:
A=sc.parallelize(range(3))
A

### Collect

* RDD content is distributed among all executors.
* `collect()` is the inverse of `parallelize()'
* collects the elements of the RDD
* Returns a list


In [None]:
L=A.collec t()
print(type(L))
print(L)

#### Using `.collect()` eliminates the benefits of parallelism
It is often tempting to `.collect()` and RDD, make it into a list, and then process the list using standard python. However, note that this means that you are using only the head node to perform the computation which means that you are not getting any benefit from spark.

Using RDD operations, as described below, **will** make use of all of the computers at your disposal.

### Map
* applies a given operation to each element of an RDD
* parameter is the function defining the operation.
* returns a new RDD.
* Operation performed in parallel on all executors.
* Each executor operates on the data **local** to it.

In [None]:
A.map(lambda x: x*x).collect()

**Note:** Here we are using **lambda** functions, later we will see that regular functions can also be used.

For more on lambda function see [here](http://www.secnetix.de/olli/Python/lambda_functions.hawk)

### Reduce

* Takes RDD as input, returns a single value.
* **Reduce operator** takes **two** elements as input returns **one** as output.
* Repeatedly applies a **reduce operator**
* Each executor reduces the data local to it.
* The results from all executors are combined.

The simplest example of a 2-to-1 operation is the sum:

In [None]:
A.reduce(lambda x,y:x+y)

Here is an example of a reduce operation that finds the shortest string in an RDD of strings.

In [None]:
words=['this','is','the','best','mac','ever']
wordRDD=sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)<len(v) else v)

## Properties of reduce operations

* Reduce operations **must not depend on the order**
  * Order of operands should not matter
  * Order of application of reduce operator should not matter

* Multiplication and summation are good:

```
                1 + 3 + 5 + 2                      5 + 3 + 1 + 2 
```

 * Division and subtraction are bad:

```
                    1 - 3 - 5 - 2                      1 - 3 - 5 - 2
```

### Why must reordering not change the result?

You can think about the reduce operation as a binary tree where the leaves are the elements of the list and the root is the final result. Each triplet of the form (parent, child1, child2) corresponds to a single application of the reduce function. 

The order in which the reduce operation is applied is **determined at run time** and depends on how the RDD is partitioned across the cluster.
There are many different orders to apply the reduce operation. 

If we want the input RDD to uniquely determine the reduced value **all evaluation orders must must yield the same final result**. In addition, the order of the elements in the list must not change the result. In particular, reversing the order of the operands in a reduce function must not change the outcome. 

For example the arithmetic operations multiply `*` and add `+` can be used in a reduce, but the operations subtract `-` and divide `/` should not.

Doing so will not raise an error, but the result is unpredictable.

In [None]:
B=sc.parallelize([1,3,5,2])
B.reduce(lambda x,y: x-y)

Which of these the following orders was executed?
* $$((1-3)-5)-2$$ or
* $$(1-3)-(5-2)$$

### Using regular functions instead of lambda functions

* lambda function are short and sweet.
* but sometimes it's hard to use just one line.
* We can use full-fledged functions instead.

In [None]:
A.reduce(lambda x,y: x+y)

Suppose we want to find the 
* last word in a lexicographical order 
* among 
* the longest words in the list.

We could achieve that as follows

In [None]:
def largerThan(x,y):
    if len(x)>len(y): return x
    elif len(y)>len(x): return y
    else:  #lengths are equal, compare lexicographically
        if x>y: 
            return x
        else: 
            return y
        
wordRDD.reduce(largerThan)

## Summary

We saw how to:
* Start a SparkContext
* Create an RDD
* Perform Map and Reduce operations on an RDD
* Collect the final results back to head node.