## Word Count
Counting the number of occurrences of words in a text is one of the most popular first exercises when learning Map-Reduce programming. It is the equivalent of `Hello World!` in regular programming.

We will do it in two ways: a simpler way, where sorting is done after the RDD is collected, and a more sparky way, where the sorting is also done using an RDD.

In [6]:
# First, check that the text file is where we expect it to be
%ls ../Data/Moby-Dick.txt

../Data/Moby-Dick.txt


### Read the text file into an RDD
Note that, because execution is lazy, this does not necessarily mean that the file content has actually been read.

In [13]:
%%time
text_file = sc.textFile(u'../Data/Moby-Dick.txt')
type(text_file)

CPU times: user 1.4 ms, sys: 1.22 ms, total: 2.61 ms
Wall time: 66.3 ms
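The lazy behavior described above can be illustrated with a plain Python generator. This is only an analogy, not Spark code: `read_lines` is a hypothetical stand-in for a data source, and it records each line it hands out so that we can see when reading really happens.

```python
# Pure-Python analogy for lazy evaluation (not Spark code).
# `read_lines` is a hypothetical stand-in for a data source; it records
# every line it actually hands out so we can see when work happens.
lines_read = []

def read_lines(lines):
    for line in lines:
        lines_read.append(line)  # side effect marks an actual read
        yield line

source = ["Call me Ishmael.", "Some years ago"]

# Building the pipeline only describes the computation.
plan = (word for line in read_lines(source) for word in line.split(" "))
reads_before = len(lines_read)   # nothing has been read yet

# Consuming the generator plays the role of a Spark action.
words = list(plan)
reads_after = len(lines_read)

print(reads_before, reads_after, len(words))
```

No line is read while the pipeline is being defined; the reads happen only once the result is requested.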


### Count the words
Next, we count the number of times each word occurred in the text. Again, this only sets up the execution plan; no counting happens yet.

In [24]:
%%time
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
type(counts)

CPU times: user 6.7 ms, sys: 2 ms, total: 8.69 ms
Wall time: 17.9 ms
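To make the three steps concrete, here is a minimal pure-Python sketch of what `flatMap`, `map` and `reduceByKey` compute. It runs on a made-up two-line sample (`sample_lines` is an assumption, not data from the book) and does not use Spark.

```python
from collections import defaultdict

# Pure-Python sketch of the three steps (not Spark code).
sample_lines = ["the whale  the sea", "the ship"]  # note the double space

# flatMap: split every line and flatten the results into one list
words = [word for line in sample_lines for word in line.split(" ")]

# map: pair every word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey: combine the values of equal keys with a + b
totals = defaultdict(int)
for word, one in pairs:
    totals[word] = totals[word] + one

word_counts = dict(totals)
print(word_counts)
```

Because `split(" ")` splits on every single space, two adjacent spaces produce an empty string, which is then counted like any other word.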


### Have a look at the execution plan
Note that the earliest node in the dependency graph is the file `../Data/Moby-Dick.txt`. It is possible that even the first line of that file has not yet been read!

In [25]:
print counts.toDebugString()

(2) PythonRDD[50] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[49] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[48] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[47] at reduceByKey at <timed exec>:1 []
    |  PythonRDD[46] at reduceByKey at <timed exec>:1 []
    |  ../Data/Moby-Dick.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:-2 []
    |  ../Data/Moby-Dick.txt HadoopRDD[10] at textFile at NativeMethodAccessorImpl.java:-2 []


### Count!
Finally, we count the number of distinct words and the total number of word occurrences.
Note that in this cell the lazy execution model finally performs some actual work.

In [27]:
%%time
Count=counts.count() #Count stands for the number of distinct words
Sum=counts.map(lambda (w,i): i).reduce(lambda x,y:x+y) # Count the number of total words
print 'count=%f, sum=%f, mean=%f'%(Count,Sum,float(Sum)/Count)

count=33782.000000, sum=219480.000000, mean=6.496951
CPU times: user 15.8 ms, sys: 4.68 ms, total: 20.5 ms
Wall time: 205 ms
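The lambda `lambda (w,i): i` relies on tuple parameter unpacking, which is available in Python 2 only. For readers on Python 3, a possible equivalent is sketched below on a small hypothetical list of pairs (`sample_counts` is an assumption standing in for the contents of `counts`); the same lambda form could be passed to an RDD's `map`.

```python
from functools import reduce

# Hypothetical sample standing in for the (word, count) pairs in `counts`.
sample_counts = [("the", 3), ("whale", 1), ("sea", 2)]

# Python 3 form of `lambda (w, i): i`: take the pair as one argument and index it.
values = list(map(lambda pair: pair[1], sample_counts))

distinct = len(sample_counts)                # plays the role of counts.count()
total = reduce(lambda x, y: x + y, values)   # plays the role of .reduce(...)
mean = float(total) / distinct

print('count=%d, sum=%d, mean=%f' % (distinct, total, mean))
```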


### Collect the `counts` RDD into the driver node
This also takes significant work, because every (word, count) pair is transferred to the driver.

In [28]:
%%time
C=counts.collect()
type(C)

CPU times: user 39.4 ms, sys: 7.76 ms, total: 47.2 ms
Wall time: 142 ms


### Sort
Now that we have collected the `counts` RDD into the driver node, we no longer rely on Spark. The following two cells are plain Python commands.

In [33]:
C.sort(key=lambda x:x[1])
print 'most common words',C[-10:]
print 'Least common words',C[:10]

most common words [(u'I', 1724), (u'his', 2415), (u'that', 2693), (u'in', 3878), (u'', 4347), (u'to', 4510), (u'a', 4533), (u'and', 5951), (u'of', 6587), (u'the', 13766)]
Least common words [(u'funereal', 1), (u'unscientific', 1), (u'lime-stone,', 1), (u'shouted,', 1), (u'pitch-pot,', 1), (u'cod-liver', 1), (u'prices', 1), (u'prefix', 1), (u'boots."', 1), (u'slew.', 1)]


### Compute the mean number of occurrences per word

In [34]:
Count2=len(C)
Sum2=sum([i for w,i in C])
print 'count2=%f, sum2=%f, mean2=%f'%(Count2,Sum2,float(Sum2)/Count2)


count2=33782.000000, sum2=219480.000000, mean2=6.496951


## Word Count in Pure Spark
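We now show how to perform the word count, including the sorting, entirely with RDDs, so that only the top 10 words are returned to the driver node. Unlike the first version, this pipeline also filters out the empty strings produced by `split(' ')`.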

In [35]:
%%time
RDD=text_file.flatMap(lambda x: x.split(' '))\
    .filter(lambda x: x!='')\
    .map(lambda word: (word,1))

CPU times: user 50 µs, sys: 7 µs, total: 57 µs
Wall time: 57.9 µs


In [36]:
%%time
RDD1=RDD.reduceByKey(lambda x,y:x+y)

CPU times: user 7.39 ms, sys: 2.19 ms, total: 9.58 ms
Wall time: 15 ms


In [37]:
%%time
RDD2=RDD1.map(lambda (c,v):(v,c))

CPU times: user 17 µs, sys: 2 µs, total: 19 µs
Wall time: 21 µs


In [38]:
%%time
RDD3=RDD2.sortByKey(False)

CPU times: user 16.4 ms, sys: 4.31 ms, total: 20.7 ms
Wall time: 420 ms


In [39]:
print RDD3.toDebugString()

(2) PythonRDD[65] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[64] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[63] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[62] at sortByKey at <timed exec>:1 []
    |  PythonRDD[61] at sortByKey at <timed exec>:1 []
    |  MapPartitionsRDD[58] at mapPartitions at PythonRDD.scala:374 []
    |  ShuffledRDD[57] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(2) PairwiseRDD[56] at reduceByKey at <timed exec>:1 []
       |  PythonRDD[55] at reduceByKey at <timed exec>:1 []
       |  ../Data/Moby-Dick.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  ../Data/Moby-Dick.txt HadoopRDD[10] at textFile at NativeMethodAccessorImpl.java:-2 []


In [40]:
%%time
RDD3.take(10)

CPU times: user 5.25 ms, sys: 2.9 ms, total: 8.16 ms
Wall time: 188 ms


[(13766, u'the'),
 (6587, u'of'),
 (5951, u'and'),
 (4533, u'a'),
 (4510, u'to'),
 (3878, u'in'),
 (2693, u'that'),
 (2415, u'his'),
 (1724, u'I'),
 (1692, u'with')]
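The swap, sort and take steps used above can be mirrored in plain Python on a small hypothetical sample (`sample` is an assumption, not data from the book). The sketch also shows an alternative that selects the largest entries without sorting the whole list.

```python
import heapq

# Hypothetical (word, count) pairs standing in for RDD1.
sample = [("of", 5), ("the", 9), ("whale", 2), ("and", 4), ("sea", 1)]

# RDD2: swap each pair so the count becomes the key.
swapped = [(count, word) for word, count in sample]

# RDD3 + take(3): sort by key in descending order, keep the first three.
top_by_sort = sorted(swapped, key=lambda pair: pair[0], reverse=True)[:3]

# Alternative: select the three largest keys without sorting everything.
top_by_heap = heapq.nlargest(3, swapped, key=lambda pair: pair[0])

print(top_by_sort)
```

PySpark RDDs provide `top` and `takeOrdered` methods that play a similar role to `heapq.nlargest`. Whether they would be faster than `sortByKey` followed by `take` on this data set has not been measured here.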