# MapReduce using Spark

In [2]:
#%pylab inline
import pandas as pd
import seaborn as sns
#pd.set_option('display.width', 500)
#pd.set_option('display.max_columns', 100)

In [3]:
import findspark
findspark.init()
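from pyspark.sql import SparkSession
# The SparkSession (`spark`) is used by the DataFrame cells below; `sc` is its SparkContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext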

In [4]:
sc

In [5]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()

### Create an RDD

In [7]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print(type(wordsRDD))

### Call `collect` on an RDD: Lazy Spark

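Spark is lazy: transformations such as `map` only record what should be done, and nothing actually runs until an action such as `collect`, `count` or `take` asks for a result.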

>All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
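Python 3's built-in `map` is lazy in the same way, which makes the idea easy to see without a cluster. The sketch below is a plain-Python analogy, not Spark code: `square` records every call, and nothing is recorded until the iterator is consumed, much as an RDD transformation does no work until an action runs.

```python
calls = []

def square(x):
    calls.append(x)  # record every time the function really runs
    return x * x

# Describing the work, like applying an RDD transformation
lazy_squares = map(square, range(1, 6))
print(len(calls))   # 0: nothing has run yet

# Consuming the iterator plays the role of an action such as collect()
result = list(lazy_squares)
print(result)       # [1, 4, 9, 16, 25]
print(len(calls))   # 5
```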

In [10]:
wordsRDD.collect()

### Operations on RDDs

From the Spark Programming Guide:

>RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
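To make the distinction concrete, here is a toy, single-machine `MiniRDD` class (hypothetical, written only for this illustration). Its transformations return a new object that merely remembers the function; its actions replay those functions and hand back an ordinary Python value.

```python
from functools import reduce


class MiniRDD:
    """Toy, single-machine stand-in for an RDD (hypothetical, for illustration only)."""

    def __init__(self, source, ops=()):
        self._source = list(source)  # the base dataset
        self._ops = list(ops)        # remembered transformations, applied in order

    # Transformations: return a NEW MiniRDD and compute nothing
    def map(self, f):
        return MiniRDD(self._source, self._ops + [("map", f)])

    def filter(self, pred):
        return MiniRDD(self._source, self._ops + [("filter", pred)])

    # Actions: replay the remembered transformations and return a plain value
    def collect(self):
        data = self._source
        for kind, f in self._ops:
            data = [f(x) for x in data] if kind == "map" else [x for x in data if f(x)]
        return data

    def reduce(self, f):
        return reduce(f, self.collect())


nums = MiniRDD(range(1, 11))
squares = nums.map(lambda x: x ** 2)       # transformation: new object, no work done
print(squares.reduce(lambda a, b: a + b))  # action: 385
print(nums.collect())                      # the original MiniRDD is unchanged
```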

### Word Examples

In [14]:
def makePlural(word):
    return word + 's'

print(makePlural('cat'))

Transform one RDD into another.

In [16]:
pluralRDD = wordsRDD.map(makePlural)
print(pluralRDD.first())
print(pluralRDD.take(2))


In [17]:
pluralRDD.take(1)

In [18]:
pluralRDD.collect()

### Key Value Pairs

In [20]:
wordPairs = wordsRDD.map(lambda w: (w, 1))
print(wordPairs.collect())

### WORD COUNT!

This little exercise shows how to use MapReduce to count how many times each word occurs in a list.
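Stripped of the cluster, word count is three steps: map each word to a `(word, 1)` pair, shuffle the pairs so that equal keys end up together, and reduce each group with the function passed to `reduceByKey`. A single-machine sketch in plain Python (an illustration of the pattern, not how Spark executes it):

```python
from collections import defaultdict
from functools import reduce

words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# Map: emit a (word, 1) pair for every word
pairs = [(w, 1) for w in words]

# Shuffle: group all values that share a key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# Reduce: combine each key's values with the same function given to reduceByKey
counts = {key: reduce(lambda x, y: x + y, values) for key, values in grouped.items()}
print(counts)  # {'cat': 2, 'elephant': 1, 'rat': 2}
```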

In [22]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y)
                       .collect())
print(wordCountsCollected)

![reduceByKey combines values within each partition before shuffling](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/images/reduce_by.png)
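`reduceByKey` merges the values for each key inside every partition before the shuffle, so fewer records cross the network than if every `(word, 1)` pair were shipped and grouped first (which is what `groupByKey` does). The plain-Python sketch below mimics this with a hypothetical 4-way split of a made-up word list and counts how many records each strategy would shuffle; it illustrates the idea and is not Spark's implementation.

```python
from collections import Counter

words = ['cat', 'elephant', 'rat', 'rat', 'cat', 'cat', 'rat', 'cat']


def split(data, n):
    """Cut a list into n contiguous, roughly equal slices (hypothetical partitioning)."""
    return [data[i * len(data) // n:(i + 1) * len(data) // n] for i in range(n)]


partitions = split([(w, 1) for w in words], 4)

# groupByKey-style: every (word, 1) pair is shuffled
shuffled_without_combine = sum(len(p) for p in partitions)

# reduceByKey-style: combine within each partition first, then shuffle partial sums
partial = []
for p in partitions:
    local = Counter()
    for w, c in p:
        local[w] += c
    partial.extend(local.items())
shuffled_with_combine = len(partial)

# Final reduce after the shuffle
totals = Counter()
for w, c in partial:
    totals[w] += c

print(shuffled_without_combine, shuffled_with_combine)  # 8 6
print(dict(totals))  # {'cat': 4, 'elephant': 1, 'rat': 3}
```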

In [24]:
# toDebugString() returns bytes in PySpark, so decode it before printing
print((wordsRDD
    .map(lambda w: (w, 1))
    .reduceByKey(lambda x, y: x + y))
    .toDebugString().decode("utf-8"))

### Using Cache

In [26]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print(wordsRDD)
wordsRDD.count()

Normally, every action recomputes an RDD from the start of its chain of transformations. That is wasteful when the same RDD is used more than once, so in those cases we may want to cache the result the first time it is computed.
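The behaviour of the next few cells can be mimicked with a small hypothetical `Dataset` class (plain Python, not a Spark API) that counts how often its lineage is recomputed. As with `RDD.cache()`, calling `cache()` does nothing by itself; the next action fills the cache and later actions reuse it.

```python
class Dataset:
    """Hypothetical stand-in for an RDD: recomputes its lineage on every action
    unless cache() has been called."""

    def __init__(self, compute):
        self._compute = compute       # function that rebuilds the data from scratch
        self._cache_requested = False
        self._cached = None
        self.computations = 0         # how many times the lineage actually ran

    def cache(self):
        self._cache_requested = True  # lazy: nothing is computed here
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.computations += 1
        data = self._compute()
        if self._cache_requested:
            self._cached = data       # the first action after cache() fills the cache
        return data

    def count(self):
        return len(self._materialize())


ds = Dataset(lambda: ['cat', 'elephant', 'rat', 'rat', 'cat'])
ds.count()
ds.count()
print(ds.computations)  # 2: recomputed for each action
ds.cache()
ds.count()
ds.count()
print(ds.computations)  # 3: computed once more to fill the cache, then reused
```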

In [28]:
#this is rerun from the start
wordsRDD.count()

In [29]:
# default storage level (MEMORY_ONLY)
wordsRDD.cache()  # still lazy: nothing is computed or stored yet

In [30]:
# parallelize is rerun, and this time the result is cached
wordsRDD.count()

In [31]:
# this time `sc.parallelize` is not rerun: count() reads the cached partitions
wordsRDD.count()

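This is useful when an RDD is reused, for example in branching code or in loops, so that the same work is not repeated. Because Spark is lazy, every action would otherwise rerun the whole chain of transformations. `cache()` (or `persist()`, which lets you choose a storage level) keeps the computed partitions so that later actions start from them. The RDD still remembers its full *lineage*, so lost partitions can be recomputed; to actually truncate the lineage, use `checkpoint()`.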

In [33]:
birdsList=['heron','owl']
animList=wordsList+birdsList
animaldict={}
for e in wordsList:
    animaldict[e]='mammal'
for e in birdsList:
    animaldict[e]='bird'
animaldict

In [34]:
animsrdd = sc.parallelize(animList, 4)
animsrdd.cache()
#below runs the whole chain but causes cache to be populated
mammalcount=animsrdd.filter(lambda w: animaldict[w]=='mammal').count()
#now only the filter is carried out
birdcount=animsrdd.filter(lambda w: animaldict[w]=='bird').count()
print(mammalcount, birdcount)

In [35]:
# import sparklect

### Exercises: Fun with MapReduce

Read http://spark.apache.org/docs/latest/programming-guide.html for some useful background, then try the following exercises.

The file `./sparklect/english.stop.txt` contains a list of English stopwords, while the file `./sparklect/shakes/juliuscaesar.txt` contains the entire text of Shakespeare's 'Julius Caesar'.

* Load all of the stopwords into a Python list
* Load the text of Julius Caesar into an RDD using the `SparkContext.textFile()` method. Call it `juliusrdd`.

In [38]:
# Load the stopwords (one per line) into a Python list
stopWords = sc.textFile("./sparklect/english.stop.txt").map(lambda w: w.strip()).collect()
# Load Julius Caesar as an RDD of lines, as the exercise asks
juliusrdd = sc.textFile("./sparklect/shakes/juliuscaesar.txt", 4)
# Alias used by the later cells
julius_parallel = juliusrdd
type(stopWords)

How many words does Julius Caesar have? *Hint: use `flatMap()`*.
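The hint matters because `map` produces exactly one output per input line, while `flatMap` lets each line produce many words and flattens them into a single dataset. A plain-Python comparison using a sample line from the play:

```python
from itertools import chain

lines = ["Friends, Romans, countrymen,", "lend me your ears"]

# map: one output element per input line, giving a list of lists
mapped = [line.split() for line in lines]

# flatMap: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped))       # 2
print(len(flat_mapped))  # 7
```

Note that `split()` only splits on whitespace, so punctuation stays attached (`'Friends,'`); word counts built this way treat `'ears'` and `'ears.'` as different words.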

In [40]:
# your turn
print(julius_parallel.flatMap(lambda x: x.split()).count())


Now print the first 20 words of Julius Caesar as a Python list.

In [42]:
# your turn
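# take(20) on the RDD itself would return lines, so split into words first
print(julius_parallel.flatMap(lambda x: x.split()).take(20))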

Now print the first 20 words of Julius Caesar, **after removing all the stopwords**. *Hint: use `filter()`*.
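The same lower-case-then-filter step in plain Python, with a tiny made-up stopword set standing in for the real file. Keeping the stopwords in a `set` is an optional change (not in the original cells) that makes each membership test constant-time on average; in Spark you could capture `set(stopWords)` in the lambda or share it with `sc.broadcast`.

```python
# Tiny made-up stopword set and sample tokens (illustration only)
stop_words = {"me", "your", "the", "and"}
tokens = ["Lend", "me", "your", "ears", "and", "the", "Ides"]

lowered = [t.lower() for t in tokens]                # like .map(lambda x: x.lower())
clean = [t for t in lowered if t not in stop_words]  # like .filter(lambda x: x not in ...)
print(clean)  # ['lend', 'ears', 'ides']
```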

In [44]:
# your turn
juliusClean=julius_parallel.flatMap(lambda x: x.split()).map(lambda x: x.lower()).filter(lambda x: x not in stopWords)
print(juliusClean.take(20))

Now, use the word counting MapReduce code you've seen before. Count the number of times each word occurs and print the top 20 results as a list of tuples of the form `(word, count)`. *Hint: use `takeOrdered()` instead of `take()`*
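`takeOrdered(n, key=f)` returns the `n` smallest elements under `f`, so negating the count yields the most frequent words. `heapq.nsmallest` follows the same rule on a local list, as this small plain-Python check shows (the sentence being counted is only sample data):

```python
import heapq
from collections import Counter

# (word, count) pairs, like the output of reduceByKey, built from sample text
pairs = list(Counter("to be or not to be that is the question to".split()).items())

# Same rule as takeOrdered(2, key=lambda x: -x[1]): smallest -count first
top2 = heapq.nsmallest(2, pairs, key=lambda x: -x[1])
print(top2)  # [('to', 3), ('be', 2)]
```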

In [46]:
# your turn
juliusTop20=juliusClean.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).takeOrdered(20, key= lambda x: -x[1])
print(juliusTop20)

Plot a bar graph. For each of the top 20 words on the X axis, represent the count on the Y axis.

In [48]:
# your turn
import matplotlib.pyplot as plt
import seaborn as sns

# Unzip the (word, count) tuples into two parallel tuples
words, count = zip(*juliusTop20)
print(words)
print(count)

fig, ax = plt.subplots()
# Categorical bars, kept in the takeOrdered order (most frequent first)
sns.barplot(x=list(words), y=list(count), ax=ax)
ax.tick_params(axis='x', labelrotation=90)
ax.set_xlabel('Top 20 Words')
ax.set_ylabel('Count')
plt.show()

### Using partitions for parallelization

In order to make your code more efficient, you want to use all of the available processing power, even on a single laptop. If your machine has multiple cores, you can tune the number of partitions to use all of them! From http://www.stat.berkeley.edu/scf/paciorek-spark-2014.html:

>You want each partition to be able to fit in the memory available on a node, and if you have multi-core nodes, you want that as many partitions as there are cores be able to fit in memory.

>For load-balancing you'll want at least as many partitions as total computational cores in your cluster and probably rather more partitions. The Spark documentation suggests 2-4 partitions (which they also seem to call slices) per CPU. Often there are 100-10,000 partitions. Another rule of thumb is that tasks should take at least 100 ms. If less than that, you may want to repartition to have fewer partitions.
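A plain-Python sketch of these two ideas, under stated assumptions: the target of three partitions per core is just one point inside the quoted 2-4 range, and `slice_positions` (a hypothetical helper) shows one simple way of cutting a local list into contiguous partitions, similar in spirit to what `parallelize` does. PySpark's exact boundaries can differ, so inspect a real RDD with `rdd.glom().collect()`, which returns the contents of each partition as a list.

```python
import os


def slice_positions(length, num_slices):
    """Contiguous (start, end) bounds splitting `length` items into num_slices parts."""
    return [(i * length // num_slices, (i + 1) * length // num_slices)
            for i in range(num_slices)]


cores = os.cpu_count() or 1
suggested = 3 * cores  # assumption: 3 partitions per core, inside the 2-4 rule of thumb
print(cores, suggested)

words = ['cat', 'elephant', 'rat', 'rat', 'cat']
print([words[s:e] for s, e in slice_positions(len(words), 4)])
# [['cat'], ['elephant'], ['rat'], ['rat', 'cat']]
```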

In [51]:
# Import the other plays from their Spark tables (one row per line of text).
# Mapping each Row to its string keeps the data distributed, instead of
# collecting it to the driver and calling sc.parallelize() again.
def table_to_rdd(name, partitions=4):
    return spark.table(name).select("_c0").rdd.map(lambda row: row[0]).repartition(partitions)

plays = ["asyoulikeit", "coriolanus", "hamlet", "kinglear", "kingrichard3",
         "macbeth", "measureformeasure", "merchantofvenice", "midsummersnightdream",
         "othello", "romeojuliet", "tamingshrew", "titusandronicus", "twelfthnight"]
playrdds = [table_to_rdd(name) for name in plays]

In [52]:
# sc.union() combines a list of RDDs in one call instead of chaining .union()
shakesrdd = sc.union([julius_parallel] + playrdds)
shakesrdd.take(10)

Now calculate the top 20 words in all of the files that you just read.

In [54]:
# your turn
shakesTop20 = (shakesrdd
               .flatMap(lambda x: x.split())          # lines -> words
               .map(lambda x: x.lower())              # normalise case
               .filter(lambda x: x not in stopWords)  # drop stopwords
               .map(lambda x: (x, 1))
               .reduceByKey(lambda x, y: x + y)       # word counts
               .takeOrdered(20, key=lambda x: -x[1]))
shakesTop20

## Optional Topic 1: DataFrames

Pandas and Spark dataframes can be easily converted to each other, making it easier to work with different data formats. This section shows some examples of each.

Convert a Spark DataFrame to pandas:

`pandas_df = spark_df.toPandas()`

Create a Spark DataFrame from pandas:

`spark_df = spark.createDataFrame(pandas_df)`

`toPandas()` collects every row to the driver, so the result must fit in the driver's memory.

![](https://ogirardot.files.wordpress.com/2015/05/rdd-vs-dataframe.png?w=640&h=360)

VERY IMPORTANT: like RDDs, Spark DataFrames are immutable. Every transformation, such as `select`, `filter` or `withColumn`, returns a new DataFrame and leaves the original unchanged.

In [57]:
df=pd.read_csv("sparklect/01_heights_weights_genders.csv")
df.head()

Convert this pandas dataframe to a Spark dataframe

In [59]:
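from pyspark.sql import functions as F

# Build the Spark DataFrame from the pandas one and encode Gender as a 0/1 label
sparkdf = spark.createDataFrame(df).select(
    (F.col("Gender") == "Male").cast("int").alias("is_male"), "Height", "Weight")
sparkdf.printSchema()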

In [60]:
#from pyspark.sql import SQLContext
#sqlsc=SQLContext(sc)
#sparkdf = sqlsc.createDataFrame(df)
#sparkdf

In [61]:
sparkdf.show(5)

## Optional Topic 2: Machine Learning using Spark

In [63]:
#from pyspark.mllib.classification import LogisticRegressionWithLBFGS
#from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

Now assemble the feature columns into a single vector column, the input format that `pyspark.ml` estimators expect.

In [65]:
#data=sparkdf.map(lambda row: LabeledPoint(row.Gender=='Male',[row.Height, row.Weight]))
#data.take(5)

features = ["Height","Weight"]
assembler = VectorAssembler(
    inputCols=features,
    outputCol='features')
assembled_df = assembler.transform(sparkdf)
assembled_df.show(5)

In [66]:
#data2=sparkdf.map(lambda row: LabeledPoint(row[0]=='Male',row[1:]))
#data2.take(1)[0].label, data2.take(1)[0].features

Split the data set into training and test sets

In [68]:
train, test = assembled_df.randomSplit([0.7,0.3])
train.cache()
test.cache()

In [69]:
type(train)

Train the logistic regression model using Spark ML (`pyspark.ml`)

In [71]:
lr = LogisticRegression(maxIter=10).setLabelCol("is_male").setFeaturesCol("features")
model = lr.fit(train)

#model = LogisticRegression().setLabelCol("Gender").setFeaturesCol("features").train(train)

In [72]:
# Fitted parameters (pyspark.ml models expose coefficients and intercept)
model.coefficients, model.intercept

Run it on the test data

In [74]:
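# pyspark.ml models are applied with transform(), which appends
# rawPrediction, probability and prediction columns to the DataFrame
predictions = model.transform(test)
predictions.select("is_male", "probability", "prediction").show(10)

Measure accuracy and other metrics

In [76]:
# Fraction of test rows whose predicted label matches the true label
test_accuracy = (predictions.filter(predictions["is_male"] == predictions["prediction"]).count()
                 / float(predictions.count()))
test_accuracy

In [77]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# AUC is computed from the continuous rawPrediction scores, not the hard 0/1 labels
evaluator = BinaryClassificationEvaluator(labelCol="is_male",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")

In [78]:
evaluator.evaluate(predictions)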
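Area under the ROC curve has a simple interpretation: the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. The plain-Python sketch below (a quadratic-time illustration, not Spark's implementation) also shows why AUC should be computed from continuous scores rather than from hard 0/1 predictions.

```python
def auc(scores, labels):
    """Probability that a random positive outranks a random negative (ties count 1/2)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(pos) * len(neg))


labels = [1, 1, 0, 0]
print(auc([0.9, 0.8, 0.3, 0.1], labels))  # 1.0: every positive outranks every negative
print(auc([1, 0, 1, 0], labels))          # 0.5: hard 0/1 predictions lose the ranking
```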

In [79]:
type(model)

In [80]:
!rm -rf mylogistic.model

In [81]:
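# pyspark.ml models are saved through their writer; overwrite() replaces an existing copy
model.write().overwrite().save("mylogistic.model")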

The `Pipeline` API in `pyspark.ml` can chain stages such as the `VectorAssembler` and `LogisticRegression` used above into a single estimator that is fit and applied directly on DataFrames.

Also see:

- http://jordicasanellas.weebly.com/data-science-blog/machine-learning-with-spark
- http://spark.apache.org/docs/latest/mllib-guide.html
- http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/
- http://spark.apache.org/docs/latest/api/python/
- http://spark.apache.org/docs/latest/programming-guide.html

`rdd.saveAsTextFile(path)` saves an RDD as a directory of text files, writing the string representation of each element on its own line.

## Optional Topic 3: Your Turn at Machine Learning! :)

For this exercise, we're going to use one of the datasets we've already worked with: the Boston House Prices dataset. We're going to try a couple of regression algorithms, but from the SparkML library this time.

Before you proceed, skim the `pyspark.ml` documentation:
http://spark.apache.org/docs/latest/api/python/pyspark.ml.html

In [86]:
# All imports go here




First, we have to load the dataset, which resides as a CSV file in the folder for this exercise.

In [88]:
# Path: /sparklect/boston.csv


Inspect the data to make sure everything is loaded properly.

Now we'll need to create a train/test split.

In [92]:
# We'll first have to vectorize the features


As the next step, fit a Linear Regression model on the training set.

Now validate the model on the test set, and check the Root Mean Squared Error.
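RMSE is the square root of the mean squared difference between true and predicted values. A plain-Python version for reference; in Spark ML, `RegressionEvaluator` from `pyspark.ml.evaluation` computes the same metric (`metricName="rmse"`) on a predictions DataFrame.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y - y_hat) ** 2))."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and the same length")
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(y_true, y_pred)) / len(y_true))


print(rmse([3.0, 5.0, 2.0], [2.0, 5.0, 4.0]))  # errors 1, 0, -2 -> sqrt(5 / 3)
```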

Let's compare Linear Regression with a more powerful algorithm: the Random Forest. Because the Random Forest has several hyperparameters that can be tuned for accuracy, we'll use k-fold Cross Validation to choose them.

First, set up a grid for the hyperparameter search.
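A hyperparameter grid is the Cartesian product of candidate values for each parameter. In Spark ML, `ParamGridBuilder().addGrid(...).build()` produces such a list of parameter maps; the plain-Python sketch below builds the same kind of product with `itertools.product`, using `numTrees` and `maxDepth` (both `RandomForestRegressor` parameters) with made-up candidate values.

```python
from itertools import product

# Made-up candidate values for two random forest hyperparameters
grid = {
    "numTrees": [20, 50],
    "maxDepth": [5, 10],
}

# Every combination of values, one dict per candidate model
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
for params in param_maps:
    print(params)
print(len(param_maps))  # 4
```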

Now fit a Random Forest regressor using k-fold Cross Validation to find the optimal combination of hyperparameters.
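k-fold cross validation splits the training data into k folds, trains on k-1 of them and evaluates on the held-out fold, rotating through all k. Below is a plain-Python sketch of the index bookkeeping using a hypothetical helper with round-robin fold assignment after one shuffle. Spark's `CrossValidator` does this itself given `numFolds`, averages the evaluator's metric over the folds for every parameter map, and refits the best combination on the whole training set.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Yield (train_idx, val_idx) pairs: shuffle 0..n-1 once, deal the indices
    into k folds, and hold each fold out in turn."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::k] for i in range(k)]
    for i in range(k):
        val = folds[i]
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train, val


for train, val in k_fold_indices(10, 3):
    print(len(train), len(val))  # 6 4, then 7 3, then 7 3
```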

Finally, validate the model on the test set and check the Root Mean Squared Error again.

## Optional Topic 4: Model Your Capstone Dataset

If you have time, load up the cleaned dataset from one of your capstone projects. Do you remember which algorithm and the accompanying combination of hyperparameters did the best job? For practice, try to implement the same model in SparkML.

In [107]:
# Stop Spark
sc.stop()