# Python and Spark (start here)
### This notebook contains basic examples that demonstrate how to use Spark from Python.

### The `pyspark` module is the main entry point and contains routines for connecting to a cluster. 

### Call the `SparkContext` constructor to obtain a connection, called a _Spark context_, to the Spark cluster.

### In this case we run the Spark process locally, so we pass the string `'local[*]'` to `SparkContext`. The `*` asks Spark to use as many worker threads as there are logical cores on the machine.

In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')
print('Spark context:',sc)

Spark context: <pyspark.context.SparkContext object at 0x7f6225c28860>


### Resilient Distributed Datasets (RDD)

An RDD can be thought of as a _Spark_ dataset. For example, consider the familiar `iris` dataset from __R__. Stored as an RDD:

1. The dataset is broken into pieces (_partitions_).
1. The pieces are distributed to multiple computers, and Spark records how each piece was computed so that a lost piece can be rebuilt.

This means that:

1. The pieces of the dataset can be analyzed by multiple computers simultaneously.
1. If one computer goes down, then the _entire_ dataset can still be recovered.
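As a rough illustration of the "pieces" idea, the following plain-Python sketch (no Spark involved) splits a list into pieces, processes each piece independently, and combines the partial results. The helper name `split_into_pieces` and the choice of 3 pieces are made up for this example; Spark chooses and manages partitions itself.

```python
# Plain-Python illustration of partitioning; this is not Spark code.
def split_into_pieces(data, num_pieces):
    """Split `data` into `num_pieces` roughly equal contiguous slices."""
    size, extra = divmod(len(data), num_pieces)
    pieces, start = [], 0
    for i in range(num_pieces):
        # The first `extra` pieces get one additional element.
        end = start + size + (1 if i < extra else 0)
        pieces.append(data[start:end])
        start = end
    return pieces

data = list(range(10))
pieces = split_into_pieces(data, 3)

# Each piece could be summed on a different computer ...
partial_sums = [sum(piece) for piece in pieces]
# ... and the partial results combined at the end.
total = sum(partial_sums)
print(pieces, partial_sums, total)
```

The point of the sketch is only that the per-piece work does not depend on the other pieces, which is what makes parallel processing possible.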


### There are three basic types of operations on data in Spark: 

1. Load data into an RDD on the Spark cluster.
1. Transform data from one RDD (on the Spark cluster) into another RDD.
1. Pull data from an RDD into the _driver_ (often your laptop).

### The following code loads the data `[1, 2, 3]` into a Spark RDD.

In [3]:
sc.parallelize([1,2,3])

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

### This code pulls the data from an RDD back to the driver.

In [4]:
sc.parallelize([1,2,3]).collect()

[1, 2, 3]

### The functions that _transform data_ are called "transformations". We will see some of them in a moment.

### The functions that pull data into the driver are called "actions". The `collect` function is an action.

### We can keep track of RDDs with variables, and then run functions on those RDDs.

In [5]:
an_rdd = sc.parallelize([1,2,3])
an_rdd.collect()

[1, 2, 3]

### This example loads data from the file `Dance_5lines.txt` with the `textFile` function.

In [6]:
rdd_line = sc.textFile("Dance_5lines.txt")

### The data from `Dance_5lines.txt` isn't actually read or sent to the Spark cluster when `textFile` (or any other _load_ command) is called. Spark is _lazy_: it doesn't do any work until you call an action function.
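Python generators behave in a similar lazy way, which makes them a convenient analogy. The sketch below is plain Python, not Spark; the `load_lines` function and the `log` list are invented for the illustration. Building the pipeline does no work, and the work happens only when the result is consumed.

```python
# Plain-Python analogy for lazy evaluation; this is not Spark code.
log = []

def load_lines():
    """Pretend data source that records when each line is actually read."""
    for line in ["a b", "c", "d e f"]:
        log.append("read " + line)
        yield line

# Building the pipeline does no work yet (like textFile followed by map).
lengths = (len(line) for line in load_lines())
work_before = list(log)   # nothing has been read so far

# Consuming the pipeline triggers the work (like calling an action).
result = list(lengths)
work_after = list(log)
print(work_before, result, work_after)
```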

### Function `take` is an action that sends to you (the driver) the first `n` elements of the RDD (2 in this case).

In [7]:
rdd_line.take(2)

['', 'The Best Dance of 2015']

### Function `collect` is an action that sends __all__ elements of the RDD to the driver. In general, this isn't a good idea, because a large RDD may not fit in the driver's memory.

In [8]:
rdd_line.collect()

['',
 'The Best Dance of 2015',
 'By ALASTAIR MACAULAY, GIA KOURLAS, BRIAN SEIBERT and SIOBHAN BURKEDEC. 9, 2015',
 'Photo',
 '']

### Function `count` is an action that returns the number of elements in an RDD.

In [9]:
rdd_line.count()

5

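### Function `takeSample` is an action that returns a random sample of elements from the RDD (here 2 elements, sampled without replacement).

In [10]: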
rdd_line.takeSample(withReplacement=False, num=2)

['', 'Photo']

### See [Actions in the Spark Programming Guide](http://spark.apache.org/docs/latest/programming-guide.html#actions) for a list of common actions.

### Now we'll look at _transformations_. The first example is a very simple __map-reduce__ statement.

### The `map` function is a transformation that applies a function to each element of the input RDD in order to produce the output RDD.

In [11]:
rdd_line.map(lambda line:  len(line)).take(5)

[0, 22, 78, 5, 0]

### The `reduce` function is an action. It repeatedly replaces two elements of the input RDD with the single element obtained by applying the anonymous function (the argument to `reduce`) to them. It stops when only one element remains and returns that element.

In [12]:
rdd_line.map(lambda line:  len(line)).reduce(lambda x, y: x+y)

105
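The pairwise combining done by `reduce` can be mimicked locally with Python's `functools.reduce`. This is only a sketch of the idea: the local version always works left to right, whereas Spark combines elements within and across partitions in no guaranteed order, which is why the function passed to `reduce` should be commutative and associative. The `steps` list is added here just to record each pairwise call.

```python
from functools import reduce

lengths = [0, 22, 78, 5, 0]   # the line lengths produced by the map example
steps = []                    # records each pair of arguments, for illustration

def add(x, y):
    steps.append((x, y))
    return x + y

total = reduce(add, lengths)
print(steps)
print(total)
```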

### Function `filter` is a transformation which returns an RDD with only those elements from the input RDD for which the anonymous function returns `True`.

In [13]:
rdd_line.filter(lambda line: "2015" in line).collect()

['The Best Dance of 2015',
 'By ALASTAIR MACAULAY, GIA KOURLAS, BRIAN SEIBERT and SIOBHAN BURKEDEC. 9, 2015']

### We will use the `split` method of Python strings. Called with no arguments, it splits a string on runs of whitespace and returns a list of strings (words).

In [14]:
"February 26,         2016".split()

['February', '26,', '2016']

### We can apply this transformation to each element of the RDD of lines from `Dance_5lines.txt`.

In [15]:
rdd_line.map(lambda line: line.split()).take(3)

[[],
 ['The', 'Best', 'Dance', 'of', '2015'],
 ['By',
  'ALASTAIR',
  'MACAULAY,',
  'GIA',
  'KOURLAS,',
  'BRIAN',
  'SEIBERT',
  'and',
  'SIOBHAN',
  'BURKEDEC.',
  '9,',
  '2015']]

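### This is nice, but not what we want, which is an RDD whose elements are single words from the file. The `flatMap` transformation applies the function to each element and then flattens the resulting lists into a single RDD.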

In [16]:
rdd_line.flatMap(lambda line: line.split()).take(7)

['The', 'Best', 'Dance', 'of', '2015', 'By', 'ALASTAIR']
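The difference between `map` and `flatMap` can be reproduced with ordinary Python lists. The sketch below is not Spark code; it uses `itertools.chain.from_iterable` as a local stand-in for the flattening step, applied to three of the lines from the file.

```python
from itertools import chain

lines = ['', 'The Best Dance of 2015', 'Photo']

# Like map: one output element per input element (here, one list per line).
mapped = [line.split() for line in lines]

# Like flatMap: the per-line lists are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```

Note that the empty line contributes an empty list under `map` but nothing at all under `flatMap`.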

### Store this new RDD of words in `rdd_word`.

In [17]:
rdd_word = rdd_line.flatMap(lambda line: line.split())

In [18]:
rdd_word.take(7)

['The', 'Best', 'Dance', 'of', '2015', 'By', 'ALASTAIR']

### The next example counts occurrences of words in the file.

### The first step is to create an RDD whose elements are tuples (a Python term) which consist of a word (the key) and the number `1` (the corresponding value). This RDD is stored in `rdd_word_1`.

In [19]:
rdd_word_1 = rdd_word.map(lambda word: (word,1))
rdd_word_1.take(5)

[('The', 1), ('Best', 1), ('Dance', 1), ('of', 1), ('2015', 1)]

Think of this RDD as containing word counts, but with duplicates.

### Function `reduceByKey` is a transformation that applies its argument function to the values of any two tuples with the same key (word) and returns a tuple with that key and the combined value (here, the sum of the word counts).

In [20]:
rdd_word_count = rdd_word_1.reduceByKey(lambda count_1, count_2: count_1 + count_2)

In [21]:
rdd_word_count.collect()

[('Best', 1),
 ('ALASTAIR', 1),
 ('Dance', 1),
 ('GIA', 1),
 ('BURKEDEC.', 1),
 ('Photo', 1),
 ('9,', 1),
 ('and', 1),
 ('BRIAN', 1),
 ('By', 1),
 ('KOURLAS,', 1),
 ('of', 1),
 ('2015', 2),
 ('MACAULAY,', 1),
 ('SEIBERT', 1),
 ('The', 1),
 ('SIOBHAN', 1)]
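What `reduceByKey` computes can be sketched with a plain dictionary. The helper `reduce_by_key` below is hypothetical and runs on a single machine over a shortened sample of the `(word, 1)` pairs; it is meant to show the result, not how Spark computes it across partitions.

```python
# Shortened sample of (word, 1) pairs; '2015' appears twice.
pairs = [('The', 1), ('Best', 1), ('Dance', 1), ('of', 1), ('2015', 1),
         ('By', 1), ('2015', 1)]

def reduce_by_key(pairs, func):
    """Combine the values of pairs that share a key using `func`."""
    combined = {}
    for key, value in pairs:
        if key in combined:
            combined[key] = func(combined[key], value)
        else:
            combined[key] = value
    return combined

word_count = reduce_by_key(pairs, lambda count_1, count_2: count_1 + count_2)
print(word_count)
```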

In [22]:
rdd_word_count.filter(lambda key_val: key_val[1]>1).collect()

[('2015', 2)]

### In this next example, we inspect the `iris` dataset. To do so we will create a dataframe and use SQL-like functions.

### First import the classes `SQLContext` and `Row` from the `pyspark.sql` module and create an SQL context.

In [23]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x7f620671b668>

### Load the `iris_noheader.csv` file as a text file. 

### The `iris_text` RDD contains one element for each line of the file. 

In [31]:
iris_text = sc.textFile("iris_noheader.csv")
iris_text.take(3)

['5.1,3.5,1.4,0.2,"setosa"',
 '4.9,3,1.4,0.2,"setosa"',
 '4.7,3.2,1.3,0.2,"setosa"']

### Now split each line by commas to create an RDD of lists of strings.

In [32]:
iris_line = iris_text.map(lambda l: l.split(","))
iris_line.take(3)

[['5.1', '3.5', '1.4', '0.2', '"setosa"'],
 ['4.9', '3', '1.4', '0.2', '"setosa"'],
 ['4.7', '3.2', '1.3', '0.2', '"setosa"']]

### Now create an RDD of `Row` objects with float and string values.

In [33]:
iris_row  = iris_line.map(lambda p: Row(SepalLength=float(p[0]),
                                        SepalWidth=float(p[1]),
                                        PetalLength=float(p[2]),
                                        PetalWidth=float(p[3]),
                                        Species=p[4]))
print('iris_row:',iris_row)
iris_row.take(3)

iris_row: PythonRDD[38] at RDD at PythonRDD.scala:43


[Row(PetalLength=1.4, PetalWidth=0.2, SepalLength=5.1, SepalWidth=3.5, Species='"setosa"'),
 Row(PetalLength=1.4, PetalWidth=0.2, SepalLength=4.9, SepalWidth=3.0, Species='"setosa"'),
 Row(PetalLength=1.3, PetalWidth=0.2, SepalLength=4.7, SepalWidth=3.2, Species='"setosa"')]
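Note that the `Species` values above keep the double quotes from the CSV file. If that is unwanted, one option is to strip them while parsing. The sketch below is plain Python: `Iris` is a local `namedtuple` used as a stand-in for `Row`, and `parse_iris_line` is a hypothetical helper, not part of the notebook.

```python
from collections import namedtuple

# Local stand-in for pyspark.sql.Row, for illustration only.
Iris = namedtuple("Iris", ["SepalLength", "SepalWidth",
                           "PetalLength", "PetalWidth", "Species"])

def parse_iris_line(line):
    """Parse one CSV line into typed fields, dropping quotes around Species."""
    p = line.split(",")
    return Iris(float(p[0]), float(p[1]), float(p[2]), float(p[3]),
                p[4].strip('"'))

record = parse_iris_line('4.9,3,1.4,0.2,"setosa"')
print(record)
```

In the notebook, the same `strip('"')` call could be applied to `p[4]` inside the `Row(...)` expression.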

### RowMatrix (distributed)

A RowMatrix is a row-oriented distributed matrix without meaningful row indices, backed by an RDD of its rows, where each row is a local vector.

### Convert `iris_line` to type `RowMatrix`

In [62]:
from pyspark.mllib.linalg.distributed import RowMatrix
# Keep the four numeric columns and convert them from strings to floats.
rows = iris_line.map(lambda line: [float(x) for x in line[:4]])
mat = RowMatrix(rows)
print('mat:', mat)
mat.numRows(), mat.numCols()

mat: <pyspark.mllib.linalg.distributed.RowMatrix object at 0x7f620518b390>


(150, 4)

In [61]:
from pyspark.mllib.stat import Statistics

# Compute column summary statistics.
# colStats expects an RDD of vectors, so pass mat.rows rather than mat
# (passing mat raises AttributeError: 'RowMatrix' object has no attribute 'map').
summary = Statistics.colStats(mat.rows)
print(summary.mean())
print(summary.variance())
print(summary.numNonzeros())

### Now create a dataframe from the `iris_row` RDD of `Row` objects.

In [34]:
iris_df = sqlContext.createDataFrame(iris_row)
iris_df.printSchema()
iris_df.take(3)

root
 |-- PetalLength: double (nullable = true)
 |-- PetalWidth: double (nullable = true)
 |-- SepalLength: double (nullable = true)
 |-- SepalWidth: double (nullable = true)
 |-- Species: string (nullable = true)



[Row(PetalLength=1.4, PetalWidth=0.2, SepalLength=5.1, SepalWidth=3.5, Species='"setosa"'),
 Row(PetalLength=1.4, PetalWidth=0.2, SepalLength=4.9, SepalWidth=3.0, Species='"setosa"'),
 Row(PetalLength=1.3, PetalWidth=0.2, SepalLength=4.7, SepalWidth=3.2, Species='"setosa"')]

### Now we can use the functions that operate on dataframes. 

In [35]:
iris_df.select("PetalLength","PetalWidth","Species").filter(iris_df['PetalLength'] > 1.5).show()

+-----------+----------+------------+
|PetalLength|PetalWidth|     Species|
+-----------+----------+------------+
|        1.7|       0.4|    "setosa"|
|        1.6|       0.2|    "setosa"|
|        1.7|       0.3|    "setosa"|
|        1.7|       0.2|    "setosa"|
|        1.7|       0.5|    "setosa"|
|        1.9|       0.2|    "setosa"|
|        1.6|       0.2|    "setosa"|
|        1.6|       0.4|    "setosa"|
|        1.6|       0.2|    "setosa"|
|        1.6|       0.2|    "setosa"|
|        1.6|       0.6|    "setosa"|
|        1.9|       0.4|    "setosa"|
|        1.6|       0.2|    "setosa"|
|        4.7|       1.4|"versicolor"|
|        4.5|       1.5|"versicolor"|
|        4.9|       1.5|"versicolor"|
|        4.0|       1.3|"versicolor"|
|        4.6|       1.5|"versicolor"|
|        4.5|       1.3|"versicolor"|
|        4.7|       1.6|"versicolor"|
+-----------+----------+------------+
only showing top 20 rows



In [36]:
iris_df.describe("PetalLength","PetalWidth").show()

+-------+------------------+------------------+
|summary|       PetalLength|        PetalWidth|
+-------+------------------+------------------+
|  count|               150|               150|
|   mean|3.7580000000000027| 1.199333333333334|
| stddev|1.7652982332594662|0.7622376689603467|
|    min|               1.0|               0.1|
|    max|               6.9|               2.5|
+-------+------------------+------------------+



In [37]:
iris_df.groupBy("Species").count().show()

+------------+-----+
|     Species|count|
+------------+-----+
|    "setosa"|   50|
|"versicolor"|   50|
| "virginica"|   50|
+------------+-----+



### See [pyspark.sql documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html).

In [38]:
from pyspark.sql import functions as pf
(iris_df.select("PetalLength", "Species")
        .groupBy("Species")
        .agg(pf.mean(iris_df.PetalLength))
        .collect())

[Row(Species='"setosa"', avg(PetalLength)=1.4620000000000002),
 Row(Species='"versicolor"', avg(PetalLength)=4.26),
 Row(Species='"virginica"', avg(PetalLength)=5.552)]
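The group-then-aggregate pattern used by `groupBy(...).agg(...)` can be sketched locally with a dictionary of lists. The rows below are a small hand-picked sample of `(Species, PetalLength)` values taken from the outputs shown earlier, so the means differ from the full-dataset results above.

```python
from collections import defaultdict

# Small sample of (Species, PetalLength) pairs, not the full dataset.
rows = [("setosa", 1.4), ("setosa", 1.4), ("setosa", 1.3),
        ("versicolor", 4.7), ("versicolor", 4.5), ("versicolor", 4.9)]

# Step 1: group the values by key (what groupBy does conceptually).
groups = defaultdict(list)
for species, petal_length in rows:
    groups[species].append(petal_length)

# Step 2: aggregate each group (what agg with a mean does conceptually).
mean_by_species = {s: sum(v) / len(v) for s, v in groups.items()}
print(mean_by_species)
```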

In [39]:
iris_df.registerTempTable("iris")

In [40]:
sqlContext.sql("SELECT * from iris")

DataFrame[PetalLength: double, PetalWidth: double, SepalLength: double, SepalWidth: double, Species: string]

In [41]:
sqlContext.sql("SELECT * from iris").show()

+-----------+----------+-----------+----------+--------+
|PetalLength|PetalWidth|SepalLength|SepalWidth| Species|
+-----------+----------+-----------+----------+--------+
|        1.4|       0.2|        5.1|       3.5|"setosa"|
|        1.4|       0.2|        4.9|       3.0|"setosa"|
|        1.3|       0.2|        4.7|       3.2|"setosa"|
|        1.5|       0.2|        4.6|       3.1|"setosa"|
|        1.4|       0.2|        5.0|       3.6|"setosa"|
|        1.7|       0.4|        5.4|       3.9|"setosa"|
|        1.4|       0.3|        4.6|       3.4|"setosa"|
|        1.5|       0.2|        5.0|       3.4|"setosa"|
|        1.4|       0.2|        4.4|       2.9|"setosa"|
|        1.5|       0.1|        4.9|       3.1|"setosa"|
|        1.5|       0.2|        5.4|       3.7|"setosa"|
|        1.6|       0.2|        4.8|       3.4|"setosa"|
|        1.4|       0.1|        4.8|       3.0|"setosa"|
|        1.1|       0.1|        4.3|       3.0|"setosa"|
|        1.2|       0.2|       

In [42]:
sqlContext.sql("SELECT * from iris where SepalWidth > 4.0").show()

+-----------+----------+-----------+----------+--------+
|PetalLength|PetalWidth|SepalLength|SepalWidth| Species|
+-----------+----------+-----------+----------+--------+
|        1.5|       0.4|        5.7|       4.4|"setosa"|
|        1.5|       0.1|        5.2|       4.1|"setosa"|
|        1.4|       0.2|        5.5|       4.2|"setosa"|
+-----------+----------+-----------+----------+--------+
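To experiment with the same kind of query without a Spark installation, Python's built-in `sqlite3` module can run it on a tiny in-memory table. This is only an analogy: SQLite is a different engine from Spark SQL and the dialects differ in places. The five sample rows are copied from the outputs above.

```python
import sqlite3

# In-memory SQLite database as a local stand-in for the temporary Spark table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE iris (PetalLength REAL, PetalWidth REAL, "
             "SepalLength REAL, SepalWidth REAL, Species TEXT)")
sample = [
    (1.4, 0.2, 5.1, 3.5, "setosa"),
    (1.5, 0.4, 5.7, 4.4, "setosa"),
    (1.5, 0.1, 5.2, 4.1, "setosa"),
    (1.4, 0.2, 5.5, 4.2, "setosa"),
    (1.4, 0.2, 4.9, 3.0, "setosa"),
]
conn.executemany("INSERT INTO iris VALUES (?, ?, ?, ?, ?)", sample)

# Same filter as the Spark SQL query above.
wide = conn.execute("SELECT * FROM iris WHERE SepalWidth > 4.0").fetchall()
conn.close()
print(wide)
```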



### Descriptive statistics

In [43]:
from pyspark.mllib.stat import Statistics

# colStats expects an RDD of numeric vectors, so convert each Row to a list of
# floats first. Passing iris_df directly fails with
# "TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector".
iris_features = iris_df.rdd.map(lambda row: [row.SepalLength, row.SepalWidth,
                                             row.PetalLength, row.PetalWidth])
summary = Statistics.colStats(iris_features)
print(summary.mean())
print(summary.variance())
print(summary.numNonzeros())
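The quantities that column summary statistics report (per-column mean, variance, and count of nonzero entries) can be computed locally for a handful of rows with the standard `statistics` module. This is a plain-Python sketch on the first three iris rows, not a replacement for `colStats`. It uses the sample variance (dividing by n - 1); check the MLlib documentation for the exact definition `colStats` uses.

```python
from statistics import mean, variance

# First three rows of the iris data: SepalLength, SepalWidth, PetalLength, PetalWidth.
rows = [[5.1, 3.5, 1.4, 0.2],
        [4.9, 3.0, 1.4, 0.2],
        [4.7, 3.2, 1.3, 0.2]]

# Transpose the rows so that each tuple holds one column.
columns = list(zip(*rows))

col_means = [mean(c) for c in columns]
col_variances = [variance(c) for c in columns]   # sample variance (n - 1)
col_nonzeros = [sum(1 for x in c if x != 0) for c in columns]

print(col_means)
print(col_variances)
print(col_nonzeros)
```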
