In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
import pyspark

number_cores = 8
memory_gb = 24
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
sc = pyspark.SparkContext(conf=conf)

In [3]:
rdd = sc.parallelize([1, 4, 9])
sum_squares = (
    rdd.map(lambda elem: float(elem)**2)
        .reduce(lambda elem1, elem2: elem1 + elem2)
)

In [4]:
number_cores = int(os.environ["NUM_CPUS"])
memory_gb = int(os.environ["AVAILABLE_MEMORY_MB"]) // 1024

In [5]:
#import findspark
#findspark.init()
#sc = pyspark.SparkContext()

In [6]:
sc

In [7]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()

385

In [8]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print (type(wordsRDD))

<class 'pyspark.rdd.RDD'>


In [9]:
wordsRDD.collect()

['cat', 'elephant', 'rat', 'rat', 'cat']

In [10]:
def makePlural(word):
    return word + 's'

print (makePlural('cat'))

cats


In [11]:
pluralRDD = wordsRDD.map(makePlural)
print (pluralRDD.first())
print (pluralRDD.take(2))

cats
['cats', 'elephants']


In [12]:
pluralRDD.take(1)

['cats']

In [13]:
pluralRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']

In [14]:
wordPairs = wordsRDD.map(lambda w: (w, 1))
print (wordPairs.collect())

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


In [15]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y)
                       .collect())
print (wordCountsCollected)

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [16]:
print ((wordsRDD.map(lambda w: (w, 1)).reduceByKey(lambda x,y: x+y)).toDebugString())

b'(4) PythonRDD[21] at RDD at PythonRDD.scala:48 []\n |  MapPartitionsRDD[20] at mapPartitions at PythonRDD.scala:436 []\n |  ShuffledRDD[19] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(4) PairwiseRDD[18] at reduceByKey at <ipython-input-16-747e2b82d253>:1 []\n    |  PythonRDD[17] at reduceByKey at <ipython-input-16-747e2b82d253>:1 []\n    |  ParallelCollectionRDD[11] at parallelize at PythonRDD.scala:489 []'


In [17]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print (wordsRDD)
wordsRDD.count()

ParallelCollectionRDD[22] at parallelize at PythonRDD.scala:489


5

In [18]:
#this is rerun from the start
wordsRDD.count()

5

In [19]:
#default storage level (MEMORY_ONLY)
wordsRDD.cache()#nothing done this is still lazy

ParallelCollectionRDD[22] at parallelize at PythonRDD.scala:489

In [20]:
#parallelize is rerun and cached because we told it to cache
wordsRDD.count()

5

In [21]:
#this `sc.parallelize` is not rerun in this case
wordsRDD.count()

5

In [22]:
birdsList=['heron','owl']
animList=wordsList+birdsList
animaldict={}
for e in wordsList:
    animaldict[e]='mammal'
for e in birdsList:
    animaldict[e]='bird'
animaldict

{'cat': 'mammal',
 'elephant': 'mammal',
 'heron': 'bird',
 'owl': 'bird',
 'rat': 'mammal'}

In [23]:
animsrdd = sc.parallelize(animList, 4)
animsrdd.cache()
#below runs the whole chain but causes cache to be populated
mammalcount=animsrdd.filter(lambda w: animaldict[w]=='mammal').count()
#now only the filter is carried out
birdcount=animsrdd.filter(lambda w: animaldict[w]=='bird').count()
print (mammalcount, birdcount)


5 2


Exercises: Fun with MapReduce
Read http://spark.apache.org/docs/latest/programming-guide.html for some useful background and then try out the following exercises

The file ./sparklect/english.stop.txt contains a list of English stopwords, while the file ./sparklect/shakes/juliuscaesar.txt contains the entire text of Shakespeare's 'Julius Caesar'.

Load all of the stopwords into a Python list
Load the text of Julius Caesar into an RDD using the sparkcontext.textfile() method. Call it juliusrdd.

In [24]:
stopwords = sc.textFile('english.stop.txt').collect()
juliusrdd = sc.textFile('juliuscaesar.txt')

How many words does Julius Caesar have? Hint: use flatMap().

In [25]:
# your turn
julius = juliusrdd.flatMap(lambda x: x.split())
print(julius.count())

21245


Now print the first 20 words of Julius Caesar as a Python list.

In [26]:
print(julius.take(20))

['1599', 'THE', 'TRAGEDY', 'OF', 'JULIUS', 'CAESAR', 'by', 'William', 'Shakespeare', 'Dramatis', 'Personae', 'JULIUS', 'CAESAR,', 'Roman', 'statesman', 'and', 'general', 'OCTAVIUS,', 'Triumvir', 'after']


In [27]:
def Func(lines):
      lines = lines.lower()
      return lines
rdd1 = julius.map(Func)

print(rdd1.take(20))

['1599', 'the', 'tragedy', 'of', 'julius', 'caesar', 'by', 'william', 'shakespeare', 'dramatis', 'personae', 'julius', 'caesar,', 'roman', 'statesman', 'and', 'general', 'octavius,', 'triumvir', 'after']


Now print the first 20 words of Julius Caesar, after removing all the stopwords. Hint: use filter().

In [28]:
juliusrdd_filtered = rdd1.filter(lambda x: x not in stopwords)
print(juliusrdd_filtered.take(20))

['1599', 'tragedy', 'julius', 'caesar', 'william', 'shakespeare', 'dramatis', 'personae', 'julius', 'caesar,', 'roman', 'statesman', 'general', 'octavius,', 'triumvir', "caesar's", 'death,', 'augustus', 'caesar,', 'emperor']


Now, use the word counting MapReduce code you've seen before. Count the number of times each word occurs and print the top 20 results as a list of tuples of the form (word, count). Hint: use takeOrdered() instead of take()

In [29]:
wordCountsCollected = (juliusrdd_filtered
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y))
print (wordCountsCollected.takeOrdered(20, key= lambda x: -x[1]))

[('brutus.', 211), ('cassius.', 152), ('thou', 107), ('caesar', 96), ('brutus', 75), ('antony.', 73), ('citizen.', 68), ('good', 66), ('caesar.', 62), ('thy', 54), ('brutus,', 54), ('caesar,', 46), ('"', 44), ('casca.', 44), ('you,', 41), ('men', 41), ("caesar's", 40), ('enter', 40), ('lucius.', 38), ('cassius,', 38)]


Plot a bar graph. For each of the top 20 words on the X axis, represent the count on the Y axis.

In [30]:
import pandas as pd

top20_df = pd.DataFrame(wordCountsCollected.takeOrdered(20, key= lambda x: -x[1]))
print(top20_df)

           0    1
0    brutus.  211
1   cassius.  152
2       thou  107
3     caesar   96
4     brutus   75
5    antony.   73
6   citizen.   68
7       good   66
8    caesar.   62
9        thy   54
10   brutus,   54
11   caesar,   46
12         "   44
13    casca.   44
14      you,   41
15       men   41
16  caesar's   40
17     enter   40
18   lucius.   38
19  cassius,   38


In [31]:
import matplotlib.pyplot as plt

_ = plt.bar(top20_df[0], top20_df[1])
_ = plt.xticks(rotation=90)
_ = plt.xlabel('Word')
_ = plt.ylabel('Count')
_ = plt.title('Top 20 Words by Count')
plt.show()

<matplotlib.figure.Figure at 0x7ff4b74cd550>

Using partitions for parallelization
In order to make your code more efficient, you want to use all of the available processing power, even on a single laptop. If your machine has multiple cores, you can tune the number of partitions to use all of them! From http://www.stat.berkeley.edu/scf/paciorek-spark-2014.html:

You want each partition to be able to fit in the memory availalbe on a node, and if you have multi-core nodes, you want that as many partitions as there are cores be able to fit in memory.

For load-balancing you'll want at least as many partitions as total computational cores in your cluster and probably rather more partitions. The Spark documentation suggests 2-4 partitions (which they also seem to call slices) per CPU. Often there are 100-10,000 partitions. Another rule of thumb is that tasks should take at least 100 ms. If less than that, you may want to repartition to have fewer partitions.


In [32]:
shakesrdd=sc.textFile("./shakes/*.txt", minPartitions=4)

In [33]:
shakesrdd.take(10)

['1606',
 'THE TRAGEDY OF MACBETH',
 '',
 '',
 'by William Shakespeare',
 '',
 '',
 '',
 'Dramatis Personae',
 '']

Now calculate the top 20 words in all of the files that you just read.

In [34]:
shakes = shakesrdd.flatMap(lambda x: x.split())

def Func(lines):
      lines = lines.lower()
      return lines
rdd2 = shakes.map(Func)

wordCountsCollected2 = (rdd2
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y))
print (wordCountsCollected2.takeOrdered(20, key= lambda x: -x[1]))

[('the', 11364), ('and', 10490), ('i', 8131), ('to', 7720), ('of', 6523), ('a', 5865), ('my', 5019), ('you', 4845), ('in', 4371), ('that', 4269), ('is', 3657), ('with', 3253), ('not', 3196), ('for', 3085), ('your', 2898), ('be', 2689), ('it', 2599), ('his', 2552), ('he', 2484), ('this', 2439)]


Optional topic 1: DataFrames
Pandas and Spark dataframes can be easily converted to each other, making it easier to work with different data formats. This section shows some examples of each.

In [35]:
df=pd.read_csv("01_heights_weights_genders.csv")
df.head()

Unnamed: 0,Gender,Height,Weight
0,Male,73.847017,241.893563
1,Male,68.781904,162.310473
2,Male,74.110105,212.740856
3,Male,71.730978,220.04247
4,Male,69.881796,206.349801


In [36]:
#Convert this pandas dataframe to a Spark dataframe
from pyspark.sql import SQLContext
sqlsc=SQLContext(sc)
sparkdf = sqlsc.createDataFrame(df)
sparkdf
sparkdf.show(5)


+------+-----------------+----------------+
|Gender|           Height|          Weight|
+------+-----------------+----------------+
|  Male|  73.847017017515|241.893563180437|
|  Male|68.78190404589029|  162.3104725213|
|  Male|74.11010539178491|  212.7408555565|
|  Male| 71.7309784033377|220.042470303077|
|  Male| 69.8817958611153|206.349800623871|
+------+-----------------+----------------+
only showing top 5 rows



In [37]:
type(sparkdf.Gender)

pyspark.sql.column.Column

In [38]:
 temp = sparkdf.rdd.map(lambda r: r.Gender)
print (type(temp))
temp.take(10)

<class 'pyspark.rdd.PipelinedRDD'>


['Male',
 'Male',
 'Male',
 'Male',
 'Male',
 'Male',
 'Male',
 'Male',
 'Male',
 'Male']

Optional topic 2: Machine Learning using Spark
While we don't go in-depth into machine learning using spark here, this sample code will help you get started.

In [39]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

In [40]:
#Now create a data set from the Spark dataframe

In [41]:
data=sparkdf.rdd.map(lambda row: LabeledPoint(row.Gender=='Male',[row.Height, row.Weight]))
data.take(5)

[LabeledPoint(1.0, [73.847017017515,241.893563180437]),
 LabeledPoint(1.0, [68.78190404589029,162.3104725213]),
 LabeledPoint(1.0, [74.11010539178491,212.7408555565]),
 LabeledPoint(1.0, [71.7309784033377,220.042470303077]),
 LabeledPoint(1.0, [69.8817958611153,206.349800623871])]

In [42]:
data2=sparkdf.rdd.map(lambda row: LabeledPoint(row[0]=='Male',row[1:]))
data2.take(1)[0].label, data2.take(1)[0].features

(1.0, DenseVector([73.847, 241.8936]))

In [43]:
#Split the data set into training and test sets
train, test = data.randomSplit([0.7,0.3])
train.cache()
test.cache()

PythonRDD[67] at RDD at PythonRDD.scala:48

In [44]:
type(train)

pyspark.rdd.PipelinedRDD

In [45]:
#Train the logistic regression model using MLIB

In [46]:
model = LogisticRegressionWithLBFGS.train(train)

In [47]:
model.weights

DenseVector([-0.4758, 0.1958])

In [48]:
#Run it on the test data
results = test.map(lambda lp: (lp.label, float(model.predict(lp.features))))
print (results.take(10))
type(results)      

[(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 0.0), (1.0, 1.0)]


pyspark.rdd.PipelinedRDD

In [49]:
#Measure accuracy and other metrics
test_accuracy=results.filter(lambda (a,p): a==p).count()/float(results.count())
test_accuracy

SyntaxError: invalid syntax (<ipython-input-49-7cd8d6e68298>, line 2)

In [50]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(results)

In [51]:
print (type(metrics))
metrics.areaUnderROC

<class 'pyspark.mllib.evaluation.BinaryClassificationMetrics'>


0.920761525660372

In [52]:
type(model)

pyspark.mllib.classification.LogisticRegressionModel

In [53]:
!rm -rf mylogistic.model

In [54]:
model.save(sc, "mylogistic.model")

In [55]:
sc.stop()