

## Challenges of processing large amounts of data
<ul>
<li>How do we process it quickly?
<li>How do we restructure the problem so that the computation can be distributed?
<li>Distributed/parallel programming is hard
</ul>

## MapReduce addresses all the challenges
<ul>
<li>Google's computational/data manipulation model
<li>An elegant way to work with big data
</ul>

## Map Reduce and the New Software Stack
<ul>
<li>Covered in Chapter 2 of the Ullman book
<li>Python support package: mrjob, https://pypi.python.org/pypi/mrjob
<li>map() and reduce() are basic built-in functions in Python 2: https://docs.python.org/2/library/functions.html
</ul>


## Built-in Python Functional Programming Tools
https://docs.python.org/2/tutorial/datastructures.html

### Python Map Function
map applies a function to every element of a sequence and returns a new list containing one result per element.

In [1]:
def cube(x): return x*x*x

map(cube,range(1,11))

[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]
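
The cell above is Python 2, where `map` returns a list directly. As a minimal sketch, assuming Python 3 instead: `map` returns a lazy iterator, so it has to be wrapped in `list()` before the values can be displayed.

```python
def cube(x):
    """Return x raised to the third power."""
    return x * x * x

# In Python 3, map() is lazy: nothing is computed until the iterator is consumed.
cubes = list(map(cube, range(1, 11)))
print(cubes)

# A list comprehension produces the same values.
assert cubes == [x ** 3 for x in range(1, 11)]
```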

If the function takes more than one parameter, pass one sequence per parameter; the function is applied to the corresponding elements of the sequences, one position at a time.

In [2]:
seq = range(8)
def add(x,y): return x+y
map(add, seq,seq)

[0, 2, 4, 6, 8, 10, 12, 14]
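
A hedged Python 3 sketch of the same multi-sequence call. It also shows one behavior that differs between versions: when the inputs have unequal lengths, Python 3 stops at the shortest one, whereas Python 2 padded the shorter input with `None`. The short lists used for that second call are illustrative only.

```python
from operator import add

seq = range(8)

# map() with two iterables calls add(seq[i], seq[i]) for each position i.
doubled = list(map(add, seq, seq))
print(doubled)

# Python 3 stops at the shortest input instead of padding with None.
truncated = list(map(add, [1, 2, 3], [10, 20]))
print(truncated)
```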

### Python Reduce Function
reduce takes a sequence and computes a single result from it.
The function passed to reduce always takes two parameters: the result accumulated so far and the next element of the sequence.
The operation starts by applying the function to the first two values, then passes that result together with the next element to the function, and so on until the sequence is exhausted.

In [3]:
result = map(add, seq,seq)
reduce(add, result)  # adding each element of the result together

56
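
To make the fold order visible, here is a small sketch, assuming Python 3 (where `reduce` must be imported from `functools`). The helper `traced_add` and the `steps` list are hypothetical names added only to record each call.

```python
from functools import reduce  # reduce() is not a builtin in Python 3

def add(x, y):
    return x + y

steps = []

def traced_add(acc, item):
    """Same as add(), but records every call so the fold order is visible."""
    steps.append((acc, item))
    return add(acc, item)

result = list(map(add, range(8), range(8)))  # [0, 2, 4, ..., 14]
total = reduce(traced_add, result)

print(total)
print(steps[:3])  # the first calls: (first, second), then (running result, next)
```

Each recorded pair is (accumulated result, next element), which matches the description above: the first call receives the first two values, and every later call receives the running result.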

If there is only one value in a sequence then that element is returned; if the sequence is empty, an exception is raised.
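
These edge cases can be checked directly. The sketch below assumes Python 3 and also shows the optional third argument to `reduce`, an initial value, which makes the empty case well defined.

```python
from functools import reduce
from operator import add

# One element: it is returned unchanged and the function is never called.
single = reduce(add, [42])

# Empty sequence with no initial value: reduce raises TypeError.
try:
    reduce(add, [])
    empty_error = None
except TypeError as exc:
    empty_error = exc

# Supplying an initial value gives a defined result for the empty case.
empty_with_start = reduce(add, [], 0)

print(single, type(empty_error).__name__, empty_with_start)
```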

## Word Count using Python in a MapReduce manner
Using the Canterbury Corpus test file from http://compression.ca/act/files/canterbury.zip,
we will count the occurrences of each unique word.


In [4]:
import re
import pandas as pd
import numpy as np
aliceFile = open('data/canterbury/alice29.txt','r')
map1=[]

WORD_RE = re.compile(r"[\w']+")

# Map step: emit a [word, 1] pair for every word occurrence
for line in aliceFile:
    for w in WORD_RE.findall(line):
        map1.append([w,1])

# Sort the pairs so that identical words are adjacent
map2 = sorted(map1)

#Separate the map into groups by the key values
df = pd.DataFrame(map2)

uniquewords = df[0].unique()

# One sub-DataFrame per unique word, holding all of that word's [word, 1] rows
DataFrameDict = {}
for key in uniquewords:
    DataFrameDict[key] = df[df[0] == key]


In [5]:
def wordcount(x,y):
    x[1] = x[1] + y[1]
    return x

#Add up the counts using reduce

for uw in uniquewords:
    uarray = np.array(DataFrameDict[uw])
    print reduce(wordcount,uarray)


["'" 1091L]
["'TIS" 1L]
["'Tis" 2L]
["'em" 3L]
["'tis" 2L]
['2' 1L]
['9' 1L]
['A' 17L]
['ADVENTURES' 1L]
["ALICE'S" 3L]
['ALL' 4L]
['AND' 3L]
['ARE' 6L]
['AT' 1L]
['Ada' 1L]
['Adventures' 2L]
['Advice' 1L]
['After' 6L]
['Ah' 5L]
['Ahem' 1L]
['Alas' 1L]
['Alice' 386L]
["Alice's" 9L]
['All' 5L]
['Allow' 1L]
['Always' 1L]
['Ambition' 1L]
['An' 5L]
['And' 67L]
['Ann' 4L]
['Antipathies' 1L]
['Anything' 1L]
['Are' 4L]
['Arithmetic' 1L]
['As' 17L]
['At' 9L]
['Atheling' 1L]
['Australia' 1L]
['BE' 1L]
['BEE' 1L]
['BEFORE' 1L]
['BEG' 1L]
['BEST' 2L]
['BOOTS' 1L]
['BUSY' 1L]
['Back' 1L]
['Be' 2L]
['Beau' 4L]
['Beautiful' 5L]
['Because' 1L]
['Before' 1L]
['Begin' 1L]
['Behead' 1L]
['Besides' 1L]
['Between' 1L]
['Bill' 12L]
["Bill's" 4L]
['Birds' 1L]
['Boots' 1L]
['Brandy' 1L]
['Bring' 1L]
['But' 37L]
['By' 4L]
['C' 1L]
['CAN' 4L]
['CHAPTER' 12L]
['CHORUS' 2L]
['COULD' 4L]
['COURT' 1L]
['CURTSEYING' 1L]
['Call' 4L]
['Can' 1L]
["Can't" 1L]
['Canary' 1L]
['Canterbury' 1L]
['Carroll' 1L]
['Cat' 24L]
...

['dinner' 2L]
['dipped' 2L]
['directed' 2L]
['direction' 5L]
['directions' 3L]
['directly' 2L]
['disagree' 1L]
['disappeared' 2L]
['disappointment' 1L]
['disgust' 1L]
['dish' 4L]
['dishes' 2L]
['dismay' 1L]
['disobey' 1L]
['dispute' 2L]
['distance' 8L]
['distant' 2L]
['dive' 1L]
['do' 68L]
['dodged' 1L]
['does' 6L]
["doesn't" 16L]
['dog' 2L]
["dog's" 1L]
['dogs' 3L]
['doing' 5L]
["don't" 51L]
['done' 15L]
['door' 30L]
['doors' 2L]
['doorway' 1L]
['doth' 2L]
['double' 1L]
['doubled' 1L]
['doubling' 1L]
['doubt' 4L]
['doubtful' 2L]
['doubtfully' 2L]
['down' 99L]
['downward' 1L]
['downwards' 1L]
['doze' 1L]
['dozing' 1L]
['draggled' 1L]
['draw' 7L]
['drawing' 1L]
['dreadful' 2L]
['dreadfully' 6L]
['dream' 7L]
['dreamed' 1L]
['dreaming' 1L]
['dreamy' 1L]
['dressed' 1L]
['drew' 5L]
['dried' 1L]
['driest' 1L]
['drink' 4L]
['drinking' 1L]
['dripping' 1L]
['drive' 1L]
['drop' 1L]
['dropped' 5L]
['dropping' 1L]
['drowned' 1L]
['drunk' 2L]
['dry' 8L]
['duck' 1L]
['dull' 3L]
['dunce' 1L]
['e' 6L]

['pressing' 1L]
['pretend' 1L]
['pretending' 1L]
['pretexts' 1L]
['prettier' 1L]
['pretty' 1L]
['prevent' 1L]
['printed' 1L]
['prison' 1L]
['prisoner' 1L]
["prisoner's" 1L]
['prize' 1L]
['prizes' 3L]
['proceed' 2L]
['procession' 5L]
['processions' 1L]
['produced' 1L]
['producing' 1L]
['promise' 1L]
['promised' 1L]
['promising' 1L]
['pronounced' 1L]
['proper' 3L]
['proposal' 1L]
['prosecute' 1L]
['protection' 1L]
['proud' 2L]
['prove' 1L]
['proved' 2L]
['proves' 1L]
['provoking' 1L]
['puffed' 1L]
['pulled' 1L]
['pulling' 1L]
['pun' 1L]
['punching' 1L]
['punished' 1L]
['puppy' 6L]
["puppy's" 1L]
['purple' 1L]
['purpose' 1L]
['purring' 2L]
['push' 1L]
['put' 31L]
['putting' 3L]
['puzzle' 1L]
['puzzled' 9L]
['puzzling' 4L]
['quarrel' 1L]
['quarrelled' 1L]
['quarrelling' 2L]
['queer' 12L]
['queerest' 1L]
['question' 17L]
['questions' 4L]
['quick' 1L]
['quicker' 1L]
['quickly' 2L]
['quiet' 2L]
['quietly' 5L]
['quite' 53L]
['quiver' 1L]
['rabbit' 5L]
['rabbits' 1L]
['race' 5L]
['railway' 2L]
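
The same map, sort/group, reduce sequence can be sketched with only the standard library. This is an illustrative Python 3 version, not the notebook's pandas code; the three `lines` are a stand-in for the contents of alice29.txt.

```python
import re
from functools import reduce
from itertools import groupby
from operator import itemgetter

WORD_RE = re.compile(r"[\w']+")

# Stand-in text; the notebook reads data/canterbury/alice29.txt instead.
lines = [
    "Alice was beginning to get very tired",
    "of sitting by her sister on the bank",
    "and of having nothing to do",
]

# Map: emit a (word, 1) pair for every word occurrence.
pairs = [(w, 1) for line in lines for w in WORD_RE.findall(line)]

# Shuffle: sort by key so equal words are adjacent, then group them.
pairs.sort(key=itemgetter(0))
grouped = groupby(pairs, key=itemgetter(0))

# Reduce: fold each group's counts into a single total.
counts = {
    word: reduce(lambda acc, pair: acc + pair[1], group, 0)
    for word, group in grouped
}
print(counts["to"], counts["of"], counts["Alice"])
```

Sorting before grouping matters here: `itertools.groupby` only merges adjacent items with the same key, which is the role the sort plays in the cell above as well.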


## MapReduce using mrjob
Find the documentation for mrjob at https://pythonhosted.org/mrjob/
mrjob is a framework that lets you run MapReduce jobs locally without Hadoop, and run the same jobs unchanged in a Hadoop environment.
(Dumbo and Pydoop give you lower-level access to Hadoop.)

Before using it for the first time, install the package with <code>pip install mrjob</code>.

If that does not work, use the alternatives provided at https://pythonhosted.org/mrjob/guides/quickstart.html#installation

<ul>
<li><code>%%writefile myfile.py</code>: write/save the cell contents into myfile.py (use -a to append). Another alias: <code>%%file myfile.py</code>
<li><code>%run myfile.py</code>: run myfile.py and output the results in the current cell
<li><code>%load myfile.py</code>: load ("import") myfile.py into the current cell
</ul>

In [None]:
# %load code/MRWordFrequencyCount.py
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    
    def mapper(self, _ , line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
        
    def reducer(self, key, values):
        yield key, sum(values)
        
if __name__ == '__main__':
    MRWordFrequencyCount.run()

In [1]:
%run code/MRWordFrequencyCount.py  ./.mrjob.conf data/canterbury/alice29.txt

no configs found; falling back on auto-configuration
INFO:mrjob.conf:no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
INFO:mrjob.conf:no configs found; falling back on auto-configuration
creating tmp directory c:\users\ps\appdata\local\temp\MRWordFrequencyCount.PS.20150922.022135.283000
INFO:mrjob.runner:creating tmp directory c:\users\ps\appdata\local\temp\MRWordFrequencyCount.PS.20150922.022135.283000

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to c:\users\ps\appdata\local\temp\MRWordFrequencyCount.PS.20150922.022135.283000\step-0-mapper_part-00000
INFO:mrjob.sim:writing to c:\users\ps\appdata\local\temp\MRWordFrequencyCount.PS.20150922.022135.283000\step-0-mapper_part-00000
writing to c:\users\ps\appdata\local\temp\MRWor

"chars"	148685
"lines"	12
"words"	26474


removing tmp directory c:\users\ps\appdata\local\temp\MRWordFrequencyCount.PS.20150922.022135.283000
INFO:mrjob.runner:removing tmp directory c:\users\ps\appdata\local\temp\MRWordFrequencyCount.PS.20150922.022135.283000
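
To see what happens between the mapper and the reducer, the job can be imitated in plain Python 3. This is only a sketch: `run_local` is a hypothetical stand-in for the grouping step and is not how mrjob is implemented, and the two input lines are made up.

```python
from collections import defaultdict

def mapper(line):
    """Mirror of MRWordFrequencyCount.mapper: three key/value pairs per line."""
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1

def reducer(key, values):
    """Mirror of MRWordFrequencyCount.reducer: sum all values for one key."""
    yield key, sum(values)

def run_local(lines):
    """Hypothetical stand-in for the framework: group mapper output by key."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    results = {}
    for key in sorted(groups):
        for out_key, out_value in reducer(key, groups[key]):
            results[out_key] = out_value
    return results

totals = run_local(["down the rabbit hole", "the pool of tears"])
print(totals)
```

The reducer is called once per distinct key ("chars", "lines", "words") and receives every value the mappers emitted for that key.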


In [None]:
# MRJOB Word count function

In [1]:
# %load code\MRWordFreqCount.py
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

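Note: executing this cell directly in the notebook fails, because MRJob.run() parses the kernel's own command-line arguments (including -f). Run the script with %run instead, as in the next cell.

Usage: ipykernel_launcher.py [options] [input files]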

ipykernel_launcher.py: error: no such option: -f


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [2]:
%run code/MRWordFreqCount.py data/canterbury/alice29.txt

No configs found; falling back on auto-configuration
Creating temp directory c:\users\srbal\appdata\local\temp\MRWordFreqCount.Crazy.20171127.004747.184000
Running step 1 of 1...
Streaming final output from c:\users\srbal\appdata\local\temp\MRWordFreqCount.Crazy.20171127.004747.184000\output...


"'"	1091
"'em"	3
"'tis"	5
"_i_"	2
"2"	1
"9"	1
"a"	632
"abide"	1
"able"	1
"about"	94
"above"	3
"absence"	1
"absurd"	2
"acceptance"	1
"accident"	2
"accidentally"	1
"account"	1
"accounting"	1
"accounts"	1
"accusation"	1
"accustomed"	1
"ache"	1
"across"	5
"act"	1
"actually"	1
"ada"	1
"added"	23
"adding"	1
"addressed"	2
"addressing"	1
"adjourn"	1
"adoption"	1
"advance"	3
"advantage"	3
"adventures"	7
"advice"	2
"advisable"	2
"advise"	1
"affair"	1
"affectionately"	1
"afford"	1
"afore"	1
"afraid"	12
"after"	43
"afterwards"	2
"again"	82
"against"	9
"age"	4
"ago"	2
"agony"	1
"agree"	2
"ah"	5
"ahem"	1
"air"	15
"airs"	1
"alarm"	2
"alarmed"	1
"alas"	4
"alice's"	12
"alice"	386
"alive"	3
"all"	182
"allow"	3
"almost"	6
"alone"	4
"along"	6
"aloud"	5
"already"	2
"also"	2
"altered"	1
"alternately"	1
"altogether"	5
"always"	13
"am"	15
"ambition"	1
"among"	12
"an"	57
"ancient"	1
"and"	872
"anger"	2
"angrily"	9
"angry"	5
"animal's"	1
"animal"	1
"animals"	4
"ann"	4
"annoy"	1
"annoyed"	1
"another"	22
"answer"

"ran"	16
"rapidly"	2
"rapped"	1
"rat"	1
"rate"	9
"rather"	25
"rats"	1
"rattle"	1
"rattling"	2
"raven"	1
"ravens"	1
"raving"	2
"raw"	1
"reach"	4
"reaching"	1
"read"	11
"readily"	1
"reading"	3
"ready"	8
"real"	3
"reality"	1
"really"	13
"rearing"	1
"reason"	9
"reasonable"	1
"reasons"	1
"received"	1
"recognised"	1
"recovered"	2
"red"	3
"reduced"	1
"reeds"	1
"reeling"	1
"refreshments"	1
"refused"	1
"regular"	2
"relief"	2
"relieved"	1
"remain"	1
"remained"	3
"remaining"	1
"remark"	10
"remarkable"	2
"remarked"	10
"remarking"	3
"remarks"	3
"remedies"	1
"remember"	14
"remembered"	5
"remembering"	1
"reminding"	1
"removed"	2
"repeat"	7
"repeated"	10
"repeating"	3
"replied"	29
"reply"	5
"resource"	1
"respect"	1
"respectable"	1
"respectful"	1
"rest"	10
"resting"	2
"result"	1
"retire"	1
"returned"	2
"returning"	1
"rich"	1
"riddle"	1
"riddles"	2
"ridge"	1
"ridges"	1
"ridiculous"	1
"right"	32
"righthand"	1
"rightly"	1
"ring"	2
"ringlets"	2
"riper"	1
"rippling"	1
"rise"	1
"rises"	1
"rising"	1
"roared"	

Removing temp directory c:\users\srbal\appdata\local\temp\MRWordFreqCount.Crazy.20171127.004747.184000...
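
The combiner in MRWordFreqCount has the same body as the reducer. A rough Python 3 sketch of why that helps: each mapper's output is pre-summed locally, so fewer pairs reach the reducer, while the final counts stay the same. The function names and the two input splits are hypothetical, and this is not mrjob's own code.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")

def map_partition(lines):
    """Mapper output for one input split: a (word, 1) pair per word."""
    return [(w.lower(), 1) for line in lines for w in WORD_RE.findall(line)]

def combine(pairs):
    """Combiner: pre-sum the counts within a single partition."""
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return list(local.items())

def reduce_all(partitions):
    """Reducer: sum the (possibly pre-combined) counts across partitions."""
    total = Counter()
    for pairs in partitions:
        for word, count in pairs:
            total[word] += count
    return total

# Two hypothetical input splits, as if handled by two mapper tasks.
splits = [["The cat and the hat"], ["the Cat sat"]]
mapped = [map_partition(s) for s in splits]
combined = [combine(p) for p in mapped]

# Fewer pairs are passed on, but the final counts are identical.
print(sum(len(p) for p in mapped), sum(len(p) for p in combined))
assert reduce_all(mapped) == reduce_all(combined)
```

Because a framework may skip the combiner or run it more than once, it should only be used for operations such as sums, where combining partial results does not change the answer.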


In [None]:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    sc.stop()
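
The flatMap / map / reduceByKey chain above can be imitated without a Spark installation. The sketch below is plain Python 3, not PySpark, and the two input lines are invented. It also shows a side effect of `split(' ')`: consecutive spaces produce empty strings, which are then counted as a "word", unlike the regex used in the mrjob version.

```python
from itertools import chain
from operator import add

lines = ["to be  or not", "to be"]  # note the double space in the first line

# flatMap: split each line and flatten the pieces into one sequence.
words = list(chain.from_iterable(line.split(' ') for line in lines))

# map: pair each word with a count of 1.
pairs = [(w, 1) for w in words]

# reduceByKey(add): merge the values of each key with the given function.
counts = {}
for word, n in pairs:
    counts[word] = add(counts[word], n) if word in counts else n

print(counts)
```

Using `line.split()` with no argument, or the `WORD_RE` pattern from the earlier cells, would avoid the empty-string key.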