# Spark
Spark is the future, and in many ways the present. It lets us apply many of the concepts we've covered at scale, combining some of the best aspects of Hadoop with a smarter execution engine for problems that don't fit the MapReduce model well.

In this notebook we'll examine some of the primitives Spark provides for transforming data in a distributed fashion, and use MLlib to implement machine learning in Spark.

## PySpark
PySpark is the Python binding for Spark, so that's how we'll investigate Spark in this notebook.  To get it up and running, you'll have to go through some gymnastics like the next few cells.

In [1]:
# Find the location of your local spark installation
import findspark
import os
findspark.init()

##### A Spark context is the main entry point for Spark functionality. It is the connection to the Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster.

In [2]:
# Fire up a Spark context
import pyspark
sc = pyspark.SparkContext()
sc

<pyspark.context.SparkContext at 0x104562950>

In [3]:
spark_home = os.environ.get('SPARK_HOME', None)

In [4]:
print spark_home

/usr/local/opt/apache-spark/libexec


In [5]:
import numpy as np
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from sklearn.linear_model import SGDClassifier
import csv
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

### The Word Count Example
Now that we have PySpark up and running, let's try out the canonical word count example.

In [6]:
# Read in the course README file with a simple call to textFile
text_file = sc.textFile("../README.md")

In [7]:
type(text_file)

pyspark.rdd.RDD

##### RDD = Resilient Distributed Dataset. This is an immutable, partitioned collection of elements that can be operated upon in parallel
Let's do some word counting on this RDD.

##### We can apply a filter using an anonymous function

In [8]:
lines_not_empty = text_file.filter(lambda x: len(x) > 0)

In [9]:
lines_not_empty.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


##### We can use the `take()` function to retrieve items from our RDDs

In [10]:
lines_not_empty.take(10)

Py4JJavaError: An error occurred while calling o17.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


##### flatMap() 
- applies the supplied function to each element and flattens the returned lists into a single list
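
To see the difference between `map()` and `flatMap()` without a cluster, here is a minimal plain-Python sketch. It only imitates the semantics on a local list; the variable names are ours, and no Spark call is involved.

```python
from itertools import chain

lines = ["to be or", "not to be"]

# map-like: one output element per input element, so we get a list of lists
mapped = [line.split() for line in lines]

# flatMap-like: the per-element lists are concatenated into one flat list
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```

With `map()` we would still have one entry per line; with `flatMap()` we get one entry per word, which is what the word count needs.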

In [11]:
words = text_file.flatMap(lambda x: x.split())

In [12]:
words.take(10)

Py4JJavaError: An error occurred while calling o17.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


##### The map function
- `map()` returns a new RDD containing the values created by applying the supplied function to each value in the original RDD
- Here the supplied function is an anonymous Python function (a lambda)

In [13]:
words = words.map(lambda x: x.replace('|', '').replace('.', '').\
                  replace('-', '').replace(' ', '').replace('&', '').replace('#','').upper())

In [14]:
words.take(10)

Py4JJavaError: An error occurred while calling o17.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


##### A word counting mapper function

In [15]:
word_counts = words.map(lambda x: (x, 1))

In [16]:
word_counts.take(10)

Py4JJavaError: An error occurred while calling o17.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


##### Now do a reduction
##### The reduceByKey function
- input must be tuples of the form (key, value)
- creates a new RDD containing a tuple for each unique value of the key
- the value in the output depends upon the supplied lambda function
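
The bullets above can be imitated locally. This is a rough plain-Python sketch of what `reduceByKey` computes, not how Spark executes it; `reduce_by_key` is a helper name made up for this illustration.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: group values by key, then fold each group."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # One output entry per unique key; its value is the fold of that key's values
    return {key: reduce(func, values) for key, values in groups.items()}

pairs = [("TO", 1), ("BE", 1), ("TO", 1)]
counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
```

Because Spark combines values partition by partition in no guaranteed order, the supplied function should be associative and commutative, as addition is here.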

In [17]:
word_counts = word_counts.reduceByKey(lambda a, b: a + b)

Py4JJavaError: An error occurred while calling o17.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


In [18]:
word_counts.take(10)

Py4JJavaError: An error occurred while calling o17.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/README.md


##### Now do another map and swap the key and the value in terms of their positions
##### Which will make the value the key

In [None]:
word_counts = word_counts.map(lambda x: (x[1], x[0]))

In [None]:
word_counts = word_counts.sortByKey(False)

In [None]:
word_counts.take(10)

### A Second Word Count Example

In [10]:
lines = sc.parallelize(['Its fun to have fun,','but you have to know how.']) 

In [11]:
rd1 = lines.map(lambda x: x.replace('|', '').\
                replace('.', '').replace('-', '').replace('&', '').replace('#','').upper())

In [12]:
rd1.take(10)

['ITS FUN TO HAVE FUN,', 'BUT YOU HAVE TO KNOW HOW']

In [13]:
rd2 = rd1.flatMap(lambda x: x.split())

In [14]:
rd2.take(20)

['ITS', 'FUN', 'TO', 'HAVE', 'FUN,', 'BUT', 'YOU', 'HAVE', 'TO', 'KNOW', 'HOW']

In [15]:
#create the (word, 1) tuples required for the reduce step; the reduce lambda function will sum the 1s for each word
rd3 = rd2.map(lambda x: (x, 1))

In [16]:
rd4 = rd3.reduceByKey(lambda a, b: a + b)

In [17]:
rd4.take(20)

[('FUN', 1),
 ('TO', 2),
 ('HOW', 1),
 ('KNOW', 1),
 ('ITS', 1),
 ('FUN,', 1),
 ('YOU', 1),
 ('BUT', 1),
 ('HAVE', 2)]

In [18]:
#use another map function to swap the key, value positionally
rd5 = rd4.map(lambda x: (x[1], x[0]))

In [19]:
rd5.take(20)

[(1, 'FUN'),
 (2, 'TO'),
 (1, 'HOW'),
 (1, 'KNOW'),
 (1, 'ITS'),
 (1, 'FUN,'),
 (1, 'YOU'),
 (1, 'BUT'),
 (2, 'HAVE')]

##### The function sortByKey does exactly what it says, and sorts the tuples by their key

In [20]:
rd6 = rd5.sortByKey(ascending=False)

In [21]:
rd6.take(20)

[(2, 'TO'),
 (2, 'HAVE'),
 (1, 'FUN'),
 (1, 'HOW'),
 (1, 'KNOW'),
 (1, 'ITS'),
 (1, 'FUN,'),
 (1, 'YOU'),
 (1, 'BUT')]
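
The swap-then-sort pattern used above can be sketched in plain Python as follows. This is only a local analogue on a small hand-written list, assuming we want the most frequent words first.

```python
counts = [('FUN', 1), ('TO', 2), ('HOW', 1), ('HAVE', 2)]

# Swap each (word, count) pair so that the count becomes the key
swapped = [(n, word) for word, n in counts]

# Sort by the key (the count) in descending order, like sortByKey(ascending=False)
ranked = sorted(swapped, key=lambda pair: pair[0], reverse=True)

print(ranked)
```

Python's `sorted` is stable, so words with equal counts keep their input order here; on a real cluster the order among ties should not be relied upon.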

### Working a slightly more complicated example

##### Let's use the 20 Newsgroups dataset

In [22]:
ngd = fetch_20newsgroups(shuffle = True, remove = ("headers", "footers", "quotes"), random_state = 6)

##### Create an RDD

In [23]:
mrd1 = sc.parallelize(ngd.data) 

In [24]:
print np.shape(ngd.data)

(11314,)


In [25]:
mrd1.take(1)

[u'\n\n\n\n\nTheir should be no difference in the drive itself between IBM-PC and Mac.\nThe two main differences are the formatting of the disk itself (but with\nthe correct software each can read the others) and maybe the cable\n(depends on your SCSI board on IBM-PC).\n\nIf you get some Mac softawre to allow mounting of ANY IBM-formatted disk\nand the correct cable you should br able to mount and read your IBM-PC\nsyquest.\n\ngood luck,\n\n--Paul\n\n-- \n  +-------------------------------------------------------------------------+\n  | Paul Hardwick  |  Technical Consulting  |  InterNet: hardwick@panix.com |\n  | P.O. Box 1482  |  for MVS (SP/XA/ESA)   |  Voice:    (212) 535-0998     |\n  | NY, NY 10274   |  and 3rd party addons  |  Fax:      (212) Pending      |\n  +-------------------------------------------------------------------------+']

##### `glom()` allows you to treat a partition as an array rather than as a single row at a time

In [26]:
test = mrd1.glom()

In [27]:
#test.take(1)

#### Chaining commands
##### The aim here is to get a list of sentences
1. Use `glom()` to convert the partitions to an array of documents
2. Use `map()` to join the array of documents into 1 massive string with documents separated by a space
3. Use `flatMap()` to split the massive string by sentence into an array of sentences
4. Use `map()` to replace all newlines with '' and make everything lowercase
5. Use `map()` to remove all occurrences of "the"
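
Before chaining these steps on an RDD, it can help to trace them on a tiny local list. The sketch below treats `docs` (two made-up documents) as the contents of a single partition and applies steps 2 to 5 with ordinary list operations.

```python
# Stand-in for the documents of one partition (what glom() hands to the next step)
docs = ["The cat sat on\nthe mat. It saw the dog.", "A bird sang."]

joined = " ".join(docs)                                      # step 2: one big string
sentences = joined.split('.')                                # step 3: split on periods
cleaned = [s.replace('\n', '').lower() for s in sentences]   # step 4: drop newlines, lowercase
no_the = [s.replace(' the ', ' ') for s in cleaned]          # step 5: remove " the "

print(no_the)
```

Two quirks are worth noticing in the result: deleting a newline glues the neighbouring words together ("onthe"), and a "the" at the very start of a sentence survives because it is not surrounded by spaces. Splitting on '.' also leaves a trailing empty string.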

In [28]:
mrd2 = mrd1.glom().map(lambda x: " ".join(x)).flatMap(lambda x: x.split('.')).map(lambda x: x.replace('\n', '').\
    lower()).map(lambda x: x.replace(' the ', ' '))

In [29]:
mrd2.take(2)

[u'their should be no difference in drive itself between ibm-pc and mac',
 u'the two main differences are formatting of disk itself (but withthe correct software each can read others) and maybe cable(depends on your scsi board on ibm-pc)']

#### Exercise: Using the sentences write a mapping function to find all the bigrams
- Use a `map()` that splits the sentences (x) into a list of tokens via the `split()` function
- Use a `flatMap()` that loops through each list and returns something like `((x[i], x[i+1]), 1)` for all the tokens in `x`

In [30]:
#Check out the first 10 bigrams with take()


##### Now let's count up the number of occurrences for each bigram
- Use a `reduceByKey()` to sum up the occurrences
- Use a `map()` to exchange the resulting keys with values
- Use a `sortByKey()` to sort the results in descending order

##### Use a `take()` to print out the top 10 bigrams!!
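
If the bigram idea itself is unclear, here is a small plain-Python sketch of it on two made-up sentences. The helper name `bigrams` is ours; translating this into `map()`, `flatMap()` and `reduceByKey()` calls is the exercise.

```python
from collections import Counter

def bigrams(tokens):
    """Pair each token with its successor: [a, b, c] -> [(a, b), (b, c)]."""
    return list(zip(tokens, tokens[1:]))

sentences = ["to be or not to be", "to be is to do"]

counts = Counter()
for sentence in sentences:
    counts.update(bigrams(sentence.split()))

# The most frequent bigram and its number of occurrences
top = counts.most_common(1)
print(top)
```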

#### Spark supports the efficient parallel application of map and reduce operations by dividing data up into multiple partitions.
- Partitions are distributed across workers running on different nodes in a cluster. If a worker fails, Spark recomputes the lost partitions from the RDD's lineage, so the failure of a single worker should not cause the RDD to become unavailable.
- Many operations, including map and flatMap, can be applied independently to each partition, running as concurrent tasks based on the number of available cores.
- When processing reduceByKey, Spark will create a number of output partitions based on the *default* parallelism, which depends on the number of nodes and cores available to Spark.
- Data is effectively reshuffled so that input data from different input partitions with the same key value is passed to the same output partition and combined there using the specified reduce function. 
- sortByKey is another operation which transforms N input partitions to M output partitions.
- The number of partitions generated by the reduce stage can be controlled by supplying the desired number of partitions as an extra parameter to reduceByKey
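
The reshuffling described above can be pictured with a toy model. The sketch below assumes a simple hash-modulo rule for routing keys to output partitions; Spark's actual partitioner differs in detail, and `partition_for` and `shuffle` are names invented for this illustration.

```python
def partition_for(key, num_partitions):
    # Assumption: route a key to an output partition by hash modulo partition count
    return hash(key) % num_partitions

def shuffle(pairs, num_partitions):
    """Send every (key, value) pair to the output partition chosen for its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions

# Small integer keys, so the routing is easy to follow by hand
pairs = [(1, 'a'), (5, 'b'), (1, 'c'), (2, 'd'), (5, 'e')]
parts = shuffle(pairs, 4)

for i, part in enumerate(parts):
    print(i, part)
```

All pairs that share a key end up in the same partition, which is what lets the reduce function combine them there. The sketch also shows that partitions can come out uneven, or even empty, depending on how the keys are distributed.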

In [31]:
sc.defaultParallelism

4

In [32]:
new1 = sc.parallelize(ngd.data).glom().map(lambda x: " ".join(x)).flatMap(lambda x: x.split('.')).\
    map(lambda x: x.replace('\n', '').lower()).map(lambda x: x.replace('the', '')).map(lambda x: x.split()).\
    flatMap(lambda x: [((x[i], x[i+1]), 1) for i in range(0, len(x)-1)]).\
    reduceByKey(lambda a, b: a + b, numPartitions = 12).\
    map(lambda x: (x[1], x[0])).sortByKey(False)

In [33]:
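def countPartitions(idx, iterator):
    # Count the elements of one partition and emit a single (index, count) pair
    c = 0
    for _ in iterator:
        c += 1
    # Skip empty partitions, so only non-empty ones appear in the result
    if c:
        yield (idx, c)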

In [34]:
new1.mapPartitionsWithIndex(countPartitions).collectAsMap()

{0: 54600, 1: 29046, 2: 90094, 11: 706646}

In [35]:
new1.take(10)

[(2954, (u'to', u'be')),
 (2895, (u'it', u'is')),
 (2508, (u'is', u'a')),
 (2178, (u'i', u'have')),
 (2170, (u'if', u'you')),
 (1854, (u'this', u'is')),
 (1591, (u'of', u'a')),
 (1531, (u'in', u'a')),
 (1529, (u'i', u'am')),
 (1496, (u'is', u'not'))]

### A Numerical Example
Let's use Spark to find all the primes in any range we specify.  Here's a function that determines if a number is prime:

In [36]:
def isprime(n):
    """
    check if integer n is a prime
    """
    
    # make sure n is a positive integer
    n = abs(int(n))
    
    # 0 and 1 are not primes
    if n < 2:
        return False
    
    # 2 is the only even prime number
    if n == 2:
        return True
    
    # all other even numbers are not primes
    if not n & 1:
        return False
    
    # range starts with 3 and only needs to go up to the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

In [37]:
# Create an RDD of the numbers from 0 to 999,999
nums = sc.parallelize(xrange(1000000))

nums.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

The `filter()` function allows us to supply a function that returns a boolean and filter an `RDD` by those entries that return True for that function.  Here's how we would use it to return prime numbers less than 1 million.

In [38]:
primes = nums.filter(isprime)

In [39]:
primes.take(10)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29]

In [40]:
# Compute the number of primes in the RDD
print primes.count()

78498
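
To picture how a filter followed by a count can be spread over partitions, here is a plain-Python sketch on the numbers below 100. It assumes four equally sized chunks standing in for partitions and uses a compact `is_prime` helper written for this example (not the `isprime` function above).

```python
def is_prime(n):
    """Compact trial-division primality check for this illustration."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    return all(n % d for d in range(3, int(n ** 0.5) + 1, 2))

numbers = list(range(100))

# Split the data into 4 chunks, standing in for 4 partitions
size = 25
partitions = [numbers[i:i + size] for i in range(0, len(numbers), size)]

# Each partition is filtered and counted independently...
per_partition = [sum(1 for n in part if is_prime(n)) for part in partitions]

# ...and the partial counts are then added together
total = sum(per_partition)

print(per_partition, total)
```

Each chunk needs no information from the others, which is why this kind of work parallelizes so easily.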


### Airline Delay Example

[Modified from "Getting Started with Spark (in Python)" by Benjamin Bengfort](https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python)

##### Set up some definitions and declarations

In [41]:
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

##### Ref: the use of named tuples
[Named tuples](http://pymotw.com/2/collections/namedtuple.html)
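
As a quick refresher, here is a minimal named tuple sketch. The `Leg` type and its values are hypothetical and exist only for this illustration; the pattern mirrors how the flight rows are handled in this example.

```python
from collections import namedtuple

# Hypothetical record type, used only for this illustration
Leg = namedtuple('Leg', ('origin', 'dest', 'distance'))

row = ['BOS', 'SFO', '2704']
row[2] = float(row[2])      # convert a field in place before building the tuple
leg = Leg(*row)             # unpack the list into the tuple's fields

print(leg.origin)           # access by field name
print(leg[1])               # or by position, like a plain tuple

try:
    leg.distance = 0.0      # named tuples are immutable
except AttributeError:
    print("cannot assign to a field")
```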

##### Function to parse a row of the database and fill a row of a named tuple

In [42]:
def parse(row):
    '''Parses a row and returns a named tuple'''

    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    
    #function returns a completed named tuple constructed from a row
    return Flight(*row[:11])

##### StringIO lets csv.reader treat the line as if it were a file
##### csv.reader then splits the line into a list of fields

In [43]:
def split(line):
    '''Operator function for splitting a line with csv module'''
    
    reader = csv.reader(StringIO(line))
    return reader.next()
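
The `StringIO` module and the `reader.next()` method exist only in Python 2. For readers on Python 3, a minimal sketch of the same helper might look like this; `split_line` is a name chosen here, and the sample line is made up.

```python
import csv
import io

def split_line(line):
    """Split one CSV-formatted line into a list of fields (Python 3)."""
    reader = csv.reader(io.StringIO(line))
    return next(reader)

# Quoted fields may contain commas, which a plain line.split(',') would break apart
fields_out = split_line('2014-04-01,19805,"Boston, MA",BOS')
print(fields_out)
```

Using the `csv` module rather than `str.split` is what keeps quoted fields containing commas intact.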

##### The use of the broadcast function

In [44]:
#this is an airlines lookup dictionary
airlines = dict(sc.textFile("../Data/ontime/airlines.csv").map(split).collect())

print type(airlines)

for i, key in enumerate(airlines.keys()):
    print key, airlines[key]
    if i == 5:
        break

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/norahajjar/DS_BOS_07_Students/NoraHajjar/Data/ontime/airlines.csv


##### The airline lookup maps an airline code to the airline's name

In [None]:
#broadcast the dictionary to the cluster
airline_lookup = sc.broadcast(airlines)

type(airline_lookup)

##### Eyeball the data in regular Python

In [None]:
df = pd.read_csv("../Data/ontime/flights.csv")
df.columns = fields
df.head(2)

In [None]:
#load the flights data into an RDD
#split each line into its fields, then parse the fields into the named tuple
flights = sc.textFile("../Data/ontime/flights.csv").map(split).map(parse)

print type(flights)

flights.take(1)

##### Here's the named tuple in use

In [None]:
tt = flights.take(1)[0]
print tt.date
print tt.airline
print tt.flightnum
print tt.origin
print tt.dest
print tt.dep_delay
print tt.arv_delay

##### and the airline lookup

In [None]:
airline_lookup.value['19805']

##### map the delays into a key value pair

In [None]:
#map the total delay to the airline, joined using the broadcast value
delays = flights.map(lambda f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay)))

In [None]:
print type(delays)
delays.take(1)

##### Sum up the delays

In [None]:
#reduce to the total delay for the month
delays = delays.reduceByKey(add).collect()

print type(delays)
print delays[0]

In [None]:
#and sort the list
delays = sorted(delays, key=itemgetter(1))

In [None]:
print delays[0]

##### This is a list of airlines by increasing delay times

In [None]:
print "{:43s} {:15s}".format("Airline", "Delay (mins)")
for d in delays:
    print "{:35s} {:15.0f}".format(d[0], d[1])

##### and a plot of same

In [None]:
fig = plt.figure(figsize = (7,7))
ax = plt.subplot(111)


airlines = [d[0] for d in delays]
minutes = [d[1] for d in delays]
index = list(xrange(len(airlines)))

bars = ax.barh(index, minutes)

for idx, air, mins in zip(index, airlines, minutes):
    if mins > 0:
        bars[idx].set_color("red")
        ax.annotate("{:0.0f} min".format(mins), xy=(mins+1, idx+0.5), va='center')
    else:
        bars[idx].set_color("blue")
        ax.annotate("{:0.0f} min".format(mins), xy=(mins+1, idx+0.5), va='center')
        
ax.set_yticks([idx + 0.5 for idx in index])
ax.set_yticklabels(airlines)
xt = ax.get_xticks()
ax.set_xticklabels([' ']*len(xt))
ax.grid(axis='x', color='white', linestyle='-')
ax.set_title("Total Minutes Delayed per Airline")

## Logistic Regression with Spark MLlib
MLlib is how Spark does Machine Learning.  It has a variety of (what should be!) familiar algorithms that are optimized to work in a distributed fashion!

### Python First

In [None]:
dat = pd.read_csv("../Data/sample_svm_data.txt", delimiter = ' ', header = None)

In [None]:
dat.head(1)

In [None]:
predictors = dat.columns.values[1:]
print predictors

In [None]:
X = dat[predictors]
y = dat[0]

In [None]:
clf_pySGD = SGDClassifier(loss='log', alpha = 0.01, n_iter = 10000)
clf_pySGD.fit(X, y)
yhat = clf_pySGD.predict(X)
print clf_pySGD.score(X, y)

In [None]:
cm = pd.crosstab(y, yhat, rownames=["Actual"], colnames=["Predicted"])
cm

### Spark

##### LabeledPoint is a built-in PySpark class that holds a (label, features) pair

In [None]:
def parse_point(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

In [None]:
data = sc.textFile("../Data/sample_svm_data.txt")

In [None]:
data.take(1)

##### Map each line into a LabeledPoint

In [None]:
parsed_data = data.map(parse_point)

In [None]:
print type(parsed_data)
parsed_data.first()

##### Use the Spark logistic regression model

In [None]:
spark_clf = LogisticRegressionWithSGD.train(parsed_data)

print type(spark_clf)

##### p is a labelled point

In [None]:
labels_and_predictions = parsed_data.map(lambda p: (p.label, spark_clf.predict(p.features)))

In [None]:
print type(labels_and_predictions)
print labels_and_predictions.take(10)

In [None]:
print parsed_data.count()

In [None]:
yyhat = labels_and_predictions.reduceByKey(lambda x, y: x + y).collect()
landp = labels_and_predictions.map(lambda x : (x[1], x[0]))
yyhat_1 = landp.reduceByKey(lambda x, y: x + y).collect()


print yyhat
print yyhat_1
print labels_and_predictions.filter(lambda (x, y): x != y).count()
print labels_and_predictions.filter(lambda (x, y): x == y).count()

In [None]:
results = list(labels_and_predictions.take(1000))
y = np.array([x[0] for x in results])
yhat = np.array([x[1] for x in results])

In [None]:
cm = pd.crosstab(y, yhat, rownames=["Actual"], colnames=["Predicted"])
cm

In [None]:
training_error = labels_and_predictions.filter(lambda (v, p): v != p).count()/float(parsed_data.count())

In [None]:
print training_error
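
The training error is simply the fraction of (label, prediction) pairs that disagree. Here is a plain-Python sketch of that arithmetic on four made-up pairs. Note that lambdas with tuple parameters, such as `lambda (v, p): ...`, are Python 2 only; the indexing form below works in both Python 2 and 3.

```python
# Made-up (label, prediction) pairs, standing in for labels_and_predictions
pairs = [(1.0, 1), (0.0, 1), (1.0, 1), (0.0, 0)]

# Index into the pair instead of unpacking it in the lambda's parameter list
is_error = lambda vp: vp[0] != vp[1]

errors = sum(1 for vp in pairs if is_error(vp))
error_rate = errors / float(len(pairs))

print(error_rate)
```

One of the four pairs disagrees, so the error rate here is 0.25.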

### On Your Own
Go back to any of your favorite classification datasets that we've dealt with and see if you can implement the classifier with Spark as we just did above.