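## Using the MRJob class below, calculate the KL divergence of the following two objects.

With $P$ and $Q$ the letter distributions of documents 1 and 2, the KL divergence is $D_{KL}(P \| Q) = \sum_x P(x) \log \frac{P(x)}{Q(x)}$.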

In [10]:
%%writefile kltext.txt
1.Data Science is an interdisciplinary field about processes and systems to extract knowledge or insights from large volumes of data in various forms (data in various forms, data in various forms, data in various forms), either structured or unstructured,[1][2] which is a continuation of some of the data analysis fields such as statistics, data mining and predictive analytics, as well as Knowledge Discovery in Databases. '\t'

2.Machine learning is a subfield of computer science[1] that evolved from the study of pattern recognition and computational learning theory in artificial intelligence.[1] Machine learning explores the study and construction of algorithms that can learn from and make predictions on data.[2] Such algorithms operate by building a model from example inputs in order to make data-driven predictions or decisions,[3]:2 rather than following strictly static program instructions.

Overwriting kltext.txt


## MRJob class for calculating pairwise similarity (cosine, Jaccard, overlap and Dice measures — not K-L divergence)

- Job 1: create the inverted index (assume just two objects)
- Job 2: calculate the similarity of each pair of objects
- Job 3: sort the pairs by average similarity (numeric, descending)

Job 1: create inverted index

In [2]:
# Check which interpreter is on PATH; the job scripts' shebang lines below use this path.
!which python

/w261/venv/bin/python


In [5]:
%%writefile similarity.py
#!/w261/venv/bin/python
# -*- coding: utf-8 -*-

#####################################
# Solution 2 - preferred approach
#####################################


from __future__ import division
import collections
import re
import json
import math
import numpy as np
import itertools
import mrjob
from mrjob.protocol import RawProtocol
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRsimilarity(MRJob):
    '''Pairwise document similarity from a stripes input, in three MR steps.

    Step 1 (mapper / reducer): build an inverted index
        term -> [[doc_id, doc_length], ...]
    Step 2 (mapper_pair_sim / reducer_pair_sim): count the terms each pair
        of documents shares and score the pair with cosine, Jaccard,
        overlap and Dice similarity.
    Step 3 (reducer_sort): pass the scored pairs through unchanged; the key
        comparator in JOBCONF_STEP3 asks Hadoop to order them by the
        average score, numerically and in descending order ('-k1,1nr').
    '''

    # Declared on this subclass. The previous ``MRJob.SORT_VALUES = True``
    # assigned onto the mrjob base class, silently changing the behaviour of
    # every other MRJob subclass loaded in the same process.
    SORT_VALUES = True

    def steps(self):
        '''Return the three MRStep objects described in the class docstring.'''
        # For testing in Hadoop local mode, uncomment the task counts to force
        # multiple mappers/reducers. On AWS EMR let the framework decide.
        JOBCONF_STEP1 = {
            # "mapred.map.tasks": 20,
            # "mapred.reduce.tasks": 10
        }
        JOBCONF_STEP2 = {
            # "mapred.map.tasks": 20,
            # "mapred.reduce.tasks": 10
        }
        JOBCONF_STEP3 = {
            # 'stream.num.map.output.key.field': 2,
            # 'stream.map.output.field.separator': ",",
            'mapreduce.job.output.key.comparator.class':
                'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            # Sort on key field 1, numerically (n), in reverse order (r).
            'mapreduce.partition.keycomparator.options': '-k1,1nr',
            # "mapred.reduce.tasks": 1
        }
        return [
            MRStep(jobconf=JOBCONF_STEP1,
                   mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(jobconf=JOBCONF_STEP2,
                   mapper=self.mapper_pair_sim,
                   reducer=self.reducer_pair_sim),
            MRStep(jobconf=JOBCONF_STEP3,
                   mapper=None,
                   reducer=self.reducer_sort),
        ]

    def mapper(self, _, line):
        '''Emit one posting (term, (doc_id, doc_length)) per term of a stripe.

        Reference:
        "For each term in the document, emits the term as key, and a tuple
        consisting of the doc id and term weight as the value.
        The MR runtime automatically handles the grouping of these tuples..."
        (https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)

        Input: one stripe per line, a quoted doc id and a JSON object
        separated by a tab, e.g.
            "absolutely"<TAB>{"falsehood": 113, "pyramid": 47, "apartments": 46}
        doc_length is the number of distinct terms in the stripe. It is
        carried along so step 2 can compute the similarity denominators.
        '''
        line = line.strip()
        if not line:
            # Blank lines carry no document; skip instead of failing to unpack.
            return
        # Split on the first tab only, so whitespace tabs inside the JSON text
        # do not break the key/stripe unpacking.
        key, stripe = line.split("\t", 1)
        doc_id = key.replace('"', '')
        stripe = json.loads(stripe)
        doc_length = len(stripe)
        for term in stripe:
            yield term, (doc_id, doc_length)

    def reducer(self, key, value):
        '''Collect every posting of one term into its inverted-index entry.

        Reference con't:
        "...which the reducer then writes out to disk, thus generating the
        postings." (https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)

        Output ("term" [["doc", doc_length], ...]), e.g.
            "abyss"  [["absorbed", 42], ["absurdity", 19]]
        '''
        yield key, list(value)

    def mapper_pair_sim(self, key, inv_indx):
        '''Emit (pair_key, 1) for every pair of documents sharing this term.

        @input: one inverted-index line, e.g.
            "X" [["DocA", 3], ["DocB", 2], ["DocC", 3]]
        @output: a dot-joined "docA.lenA.docB.lenB" key with a count of 1:
            DocA.3.DocB.2, 1
            DocA.3.DocC.3, 1
            DocB.2.DocC.3, 1
        '''
        postings = set(doc + "." + str(length) for doc, length in inv_indx)
        # Sorting makes the pair orientation deterministic, so by symmetry
        # only (a, b) is emitted and never (b, a).
        for first, second in itertools.combinations(sorted(postings), 2):
            yield first + "." + second, 1

    def reducer_pair_sim(self, key, value):
        '''Score one document pair from its shared-term count.

        Yields (avg, (label, cosine, jaccard, overlap, dice)), where avg is
        the mean of Jaccard, cosine and Dice (overlap is reported but not
        averaged). Formulas: http://stanford.edu/~rezab/papers/disco.pdf,
        pg5 - Table1.
        '''
        # NOTE(review): this unpacking assumes doc ids contain no '.', since
        # mapper_pair_sim joins the key with dots.
        w1, w1_len, w2, w2_len = key.split(".")
        len1, len2 = int(w1_len), int(w2_len)
        shared = sum(value)

        # True division comes from the file's ``from __future__ import division``.
        jaccard = shared / (len1 + len2 - shared)
        cosine = shared / (math.sqrt(len1) * math.sqrt(len2))
        overlap = shared / min(len1, len2)
        dice = (2 * shared) / (len1 + len2)

        avg = (jaccard + cosine + dice) / 3
        yield avg, (w1 + " - " + w2, cosine, jaccard, overlap, dice)

    def reducer_sort(self, key, value):
        '''Identity reducer. Ordering comes from the step-3 key comparator.'''
        for v in value:
            yield key, v

if __name__ == '__main__':
    MRsimilarity.run()

Writing similarity.py


In [3]:
%%writefile invertedIndexOnly.py
#!/w261/venv/bin/python
# -*- coding: utf-8 -*-

###################################
# Solution 1, Part A
###################################


from __future__ import division
import collections
import re
import json
import math
import numpy as np
import itertools
import mrjob
from mrjob.protocol import RawProtocol
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRinvertedIndexOnly(MRJob):
    '''Single-step job that builds an inverted index from a stripes input.

    Output lines look like
        term -> [[doc_id, doc_length], ...]
    '''

    # Declared on this subclass. The previous ``MRJob.SORT_VALUES = True``
    # assigned onto the mrjob base class, silently changing the behaviour of
    # every other MRJob subclass loaded in the same process.
    SORT_VALUES = True

    def steps(self):
        '''Return the single inverted-index MRStep.'''
        JOBCONF_STEP1 = {
            "mapred.map.tasks": 20,
            "mapred.reduce.tasks": 10
        }
        return [MRStep(jobconf=JOBCONF_STEP1,
                       mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper(self, _, line):
        '''Emit one posting (term, (doc_id, doc_length)) per term of a stripe.

        Reference:
        "For each term in the document, emits the term as key, and a tuple
        consisting of the doc id and term weight as the value.
        The MR runtime automatically handles the grouping of these tuples..."
        (https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)

        Input: one stripe per line, a quoted doc id and a JSON object
        separated by a tab, e.g.
            "absolutely"<TAB>{"falsehood": 113, "pyramid": 47, "apartments": 46}
        doc_length (the number of distinct terms in the stripe) is kept for
        JACCARD's |A| + |B| denominator.
        '''
        line = line.strip()
        if not line:
            # Blank lines carry no document; skip instead of failing to unpack.
            return
        # Split on the first tab only, so whitespace tabs inside the JSON text
        # do not break the key/stripe unpacking.
        key, stripe = line.split("\t", 1)
        doc_id = key.replace('"', '')
        stripe = json.loads(stripe)
        doc_length = len(stripe)
        for term in stripe:
            yield term, (doc_id, doc_length)

    def reducer(self, key, value):
        '''Collect every posting of one term into its inverted-index entry.

        Reference con't:
        "...which the reducer then writes out to disk, thus generating the
        postings." (https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)

        Output ("term" [["doc", doc_length], ...]), e.g.
            "abyss"  [["absorbed", 42], ["absurdity", 19]]
        '''
        yield key, list(value)

if __name__ == '__main__':
    MRinvertedIndexOnly.run()

Writing invertedIndexOnly.py


In [11]:
# NOTE(review): similarity.py's mapper expects stripe lines of the form
# key<TAB>{json}; kltext.txt holds plain paragraphs, which is presumably why
# step 1 fails in the output below — confirm.
!python similarity.py kltext.txt

No configs found; falling back on auto-configuration
ignoring partitioner keyword arg (requires real Hadoop): 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
Creating temp directory /tmp/similarity.root.20160629.233624.595445
Running step 1 of 3...
Traceback (most recent call last):
  File "similarity.py", line 141, in <module>
    MRsimilarity.run()
  File "/w261/venv/lib/python2.7/site-packages/mrjob/job.py", line 430, in run
    mr_job.execute()
  File "/w261/venv/lib/python2.7/site-packages/mrjob/job.py", line 448, in execute
    super(MRJob, self).execute()
  File "/w261/venv/lib/python2.7/site-packages/mrjob/launch.py", line 160, in execute
    self.run_job()
  File "/w261/venv/lib/python2.7/site-packages/mrjob/launch.py", line 230, in run_job
    runner.run()
  File "/w261/venv/lib/python2.7/site-packages/mrjob/runner.py", line 473, in run
    self._run()
  File "/w261/venv/lib/python2.7/site-packages/mrjob/sim.py", line 172, in _run
    self._invoke_st

In [None]:
# For systems test 1: build only the inverted index (step 1 of similarity.py).
# NOTE(review): the mapper expects stripe lines of the form key<TAB>{json};
# kltext.txt holds plain paragraphs, so this run presumably fails the same
# way as the similarity.py run above — confirm.
!python invertedIndexOnly.py kltext.txt
# output suppressed

In [2]:
import numpy as np

# Sanity check: np.log is the natural logarithm (base e), so ln(3) ~ 1.0986.
np.log(3.0)

1.0986122886681098

In [3]:
%%writefile kldivergence.py
from mrjob.job import MRJob
import re
import numpy as np
class kldivergence(MRJob):
    '''KL divergence D(P || Q) between the letter distributions of two docs.

    Each input line is "<index>.<text>". Document 1 gives P and document 2
    gives Q. Step 1 emits one p(x) * log(p(x) / q(x)) term per letter x, and
    step 2 sums the terms into a single (None, divergence) record.
    '''

    def mapper1(self, _, line):
        '''Emit (letter, [doc_index, relative_frequency]) for one document.'''
        # The input file has a blank line between the documents. Skip it
        # instead of crashing on int('').
        if not line.strip():
            return
        index = int(line.split('.', 1)[0])
        # Keep ASCII letters only (this also drops the "1." index prefix).
        letters = re.sub(r"[^A-Za-z]+", '', line).lower()
        if not letters:
            return
        count = {}
        for letter in letters:
            count[letter] = count.get(letter, 0) + 1
        total = len(letters)
        for letter in count:
            yield letter, [index, count[letter] * 1.0 / total]

    def reducer1(self, key, values):
        '''Emit this letter's contribution p * log(p / q) to D(P || Q).

        p is the letter's relative frequency in document 1 and q its
        relative frequency in document 2. A letter absent from a document
        has frequency 0 there.
        '''
        p = 0.0
        q = 0.0
        for index, freq in values:
            if index == 1:
                p = freq
            elif index == 2:
                q = freq
        if p == 0:
            # By convention 0 * log(0 / q) = 0.
            yield None, 0.0
        elif q == 0:
            # Q gives zero probability to a letter P uses, so D(P || Q)
            # is infinite. float('inf') serializes as JSON "Infinity".
            yield None, float('inf')
        else:
            yield None, float(p * np.log(p / q))

    def reducer2(self, key, values):
        '''Sum all per-letter terms (all keyed None) into D(P || Q).'''
        yield None, sum(values)

    def steps(self):
        '''Two steps: per-letter terms, then the global sum.'''
        # MRStep replaces the deprecated self.mr() helper.
        from mrjob.step import MRStep
        return [MRStep(mapper=self.mapper1,
                       reducer=self.reducer1),
                MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    kldivergence.run()

Overwriting kldivergence.py


In [None]:
from kldivergence import kldivergence

# Run the job on the two-document input. Its final step emits a single
# (None, divergence) record.
mr_job = kldivergence(args=['kltext.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    # stream_output() yields the raw output lines of the last step, and
    # parse_output_line decodes each one into a (key, value) pair.
    parsed = (mr_job.parse_output_line(line) for line in runner.stream_output())
    for record in parsed:
        print(record)