<a name="top"></a>
# MIDS - w261 Machine Learning At Scale
__Course Lead:__ Dr James G. Shanahan (__email__ Jimi via  James.Shanahan _AT_ gmail.com)   
__TA:__ Kyle Hamilton (__email__ kylehamilton _AT_ ischool.berkeley.edu)

## Pairwise Document Similarity

1. [Generate data](#1)
2. [Phase 1 - Make inverted index](#3)
3. [Phase 2 - Calculate pairwise document similarity](#4)

## Resources:
[https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf](https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)    
[http://stanford.edu/~rezab/papers/disco.pdf](http://stanford.edu/~rezab/papers/disco.pdf)

<a name="1"></a>
### Generate some data
[top of page](#top)

In [60]:
%%writefile systems_test.txt
docA	bright blue butterfly forget
docB	best forget bright sky
docC	blue sky bright sun
docD	under buttefly sky hangs
docE	forget blue butterfly

Overwriting systems_test.txt


<a name="3"></a>
### Phase 1 - Make Inverted Index
[top of page](#top)

In [66]:
%%writefile invertedIndexOnly.py
#!~/anaconda2/bin/python
# -*- coding: utf-8 -*-


from __future__ import division
import collections
import re
import json
import math
import numpy as np
import itertools
import mrjob
from mrjob.protocol import RawProtocol
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRinvertedIndexOnly(MRJob):
    
    # Ask mrjob to deliver the values for each key to the reducer in sorted order
    SORT_VALUES = True
    
    def steps(self):

        return [MRStep(
                    mapper=self.mapper,
                    reducer=self.reducer)
                ]
    
        
   
    def mapper(self,_,line):
        '''
        Reference:
        "For each term in the document, emits the term as key, and a tuple 
        consisting of the doc id and term weight as the value.
        The MR runtime automatically handles the grouping of these tuples..."
        (https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)
        '''
        #####################################################################
        # docs as input, ie:
        # doc id \t doc value
        # docA	bright blue butterfly forget
        #
        #####################################################################
        
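        line = line.strip()
        # split on the first tab only: doc id on the left, text on the right
        key, value = line.split("\t", 1)
        
        words = value.split()
        doc_length = len(words)
        for w in words:
            # Emit the document length with every term; the Jaccard
            # denominator (|A| + |B| - |A intersect B|) needs it in Phase 2
            yield w, (key, doc_length)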
        
        
        
    def reducer(self,key,value):
        '''
        Reference, cont'd:
        "...which the reducer then writes out to disk, thus generating the postings."
        (https://terpconnect.umd.edu/~oard/pdf/acl08elsayed2.pdf)
        '''
        #####################################################################
        # Inverted Index as output, ie:
        # "term" [["doc",doc_length]]
        # "butterfly"	[["docA", 4],["docD", 4],["docE", 3]]
        #####################################################################
        # The runtime has already grouped the postings by term,
        # so the reducer only has to collect them into a list
        yield key, list(value)
        
if __name__ == '__main__':
    MRinvertedIndexOnly.run() 

Overwriting invertedIndexOnly.py
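
To see what the mapper and reducer above produce without running mrjob, the same logic can be sketched in plain Python. This is only an illustration and not part of the job: the `build_inverted_index` helper and the in-memory `docs` list are names introduced here, and the sample uses just three of the documents from systems_test.txt.

```python
from collections import defaultdict

def build_inverted_index(lines):
    """Map each term to a sorted list of (doc_id, doc_length) postings."""
    index = defaultdict(list)
    for line in lines:
        doc_id, text = line.strip().split("\t", 1)
        words = text.split()
        for w in words:
            # same record the mapper emits: term -> (doc id, doc length)
            index[w].append((doc_id, len(words)))
    # sorting mimics the grouped, value-sorted input the reducer receives
    return dict((term, sorted(postings)) for term, postings in index.items())

# hypothetical in-memory sample: three lines in the systems_test.txt format
docs = [
    "docA\tbright blue butterfly forget",
    "docB\tbest forget bright sky",
    "docE\tforget blue butterfly",
]
index = build_inverted_index(docs)
for term in sorted(index):
    print("%s %s" % (term, index[term]))
```

Each printed line corresponds to one postings list, the same shape as a row of systems_test_index.txt.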


In [67]:
!python invertedIndexOnly.py -r local systems_test.txt > systems_test_index.txt

Using configs in /etc/mrjob.conf
ignoring partitioner keyword arg (requires real Hadoop): 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
Creating temp directory /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/invertedIndexOnly.koza.20170207.204252.766154
Running step 1 of 1...
Streaming final output from /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/invertedIndexOnly.koza.20170207.204252.766154/output...
Removing temp directory /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/invertedIndexOnly.koza.20170207.204252.766154...


In [65]:
##########################################################
# Pretty print systems tests for generating Inverted Index
##########################################################

import json
        
print "\nSystems test - Inverted Index"
print "—"*100  
with open("systems_test_index.txt","r") as f:
    lines = f.readlines()
    for line in lines:
        line = line.strip()
        word,stripe = line.split("\t")
        stripe = json.loads(stripe)
    
        stripe.extend([["",""] for _ in xrange(3 - len(stripe))])
        
        print "{0:>15} |{1:>15} |{2:>15} |{3:>15}".format(
            (word), stripe[0][0]+" "+str(stripe[0][1]), stripe[1][0]+" "+str(stripe[1][1]), stripe[2][0]+" "+str(stripe[2][1]))



Systems test - Inverted Index
————————————————————————————————————————————————————————————————————————————————————————————————————
         "best" |         docB 4 |                |               
         "blue" |         docA 4 |         docC 4 |         docE 3
       "bright" |         docA 4 |         docB 4 |         docC 4
     "buttefly" |         docD 4 |                |               
    "butterfly" |         docA 4 |         docE 3 |               
       "forget" |         docA 4 |         docB 4 |         docE 3
        "hangs" |         docD 4 |                |               
          "sky" |         docB 4 |         docC 4 |         docD 4
          "sun" |         docC 4 |                |               
        "under" |         docD 4 |                |               


<a name="4"></a>
### Phase 2 - Calculate similarity
[top of page](#top)   


### Jaccard similarity
$$
Jaccard(A,B) = \frac{|A \cap B|}{|A|+|B|-|A \cap B|}
$$

Example - using the information from the inverted index above, docA and docB each contain 4 terms and share 2 of them (bright and forget):
$$
Jaccard(docA,docB) = \frac{2}{4+4-2} = \frac{1}{3}
$$
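
The worked example can be checked directly in plain Python by treating each document as a set of terms. This is a minimal sketch: the `jaccard` helper is a name introduced here, and it assumes at least one of the two documents is non-empty (otherwise the denominator is zero).

```python
def jaccard(a, b):
    """Jaccard similarity of two collections of terms, treated as sets."""
    a, b = set(a), set(b)
    shared = len(a & b)
    # |A & B| / (|A| + |B| - |A & B|)
    return shared / float(len(a) + len(b) - shared)

docA = "bright blue butterfly forget".split()
docB = "best forget bright sky".split()
docE = "forget blue butterfly".split()

print(jaccard(docA, docB))  # 2 / (4 + 4 - 2)
print(jaccard(docA, docE))  # 3 / (4 + 3 - 3)
```

The MapReduce job below arrives at the same numbers without ever holding two full documents side by side: it only needs the shared-term count and the two document lengths.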

In [55]:
%%writefile similarityOnly.py
#!~/anaconda2/bin/python
# -*- coding: utf-8 -*-

from __future__ import division
import collections
import re
import json
import math
import numpy as np
import itertools
import mrjob
from mrjob.protocol import RawProtocol
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRsimilarityOnly(MRJob):
    
    # Ask mrjob to deliver the values for each key to the reducer in sorted order
    SORT_VALUES = True

    def steps(self):

        JOBCONF_STEP1 = {}
        JOBCONF_STEP2 = {
            # IMPORTANT: these settings have no effect in -r local mode;
            # run with -r hadoop to get sorted output.
            'mapreduce.job.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            # -k1,1nr: compare on the first key field, numerically, in reverse (descending) order
            'mapreduce.partition.keycomparator.options': '-k1,1nr',
        }
        return [MRStep(jobconf=JOBCONF_STEP1,
                    mapper=self.mapper_pair_sim,
                    reducer=self.reducer_pair_sim)
                ,
                MRStep(jobconf=JOBCONF_STEP2,
                    mapper=None,   
                    reducer=self.reducer_sort)
                ]
            
    def mapper_pair_sim(self,_,line):
        '''
        @input: lines (postings) from the inverted index, e.g.
         "blue" [["docA", 4], ["docC", 4], ["docE", 3]]
        
        @output: one record per pair of docs that share the term.
        The key is a composite of both doc ids and doc lengths,
        the value is a count of 1:
         docA.4.docC.4, 1
         docA.4.docE.3, 1
         docC.4.docE.3, 1
        '''
        line = line.strip()
        key,inv_indx = line.split("\t")
        inv_indx = json.loads(inv_indx)
        
        # "doc_id.doc_length" for every posting of this term
        X = [x[0]+"."+str(x[1]) for x in inv_indx]
        
        # taking advantage of symmetry, output only (a,b), but not (b,a)
        for subset in itertools.combinations(sorted(set(X)), 2):
            yield subset[0]+"."+subset[1], 1



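    def reducer_pair_sim(self,key,value):
        # key looks like "docA.4.docB.4", so doc ids must not contain "."
        w1,w1_len,w2,w2_len = key.split(".")
        # number of terms the two docs share, i.e. |A intersect B|
        t = sum(value)
        
        # |A intersect B| / (|A| + |B| - |A intersect B|)
        jaccard = t / ( int(w1_len) + int(w2_len) - t )
        
        yield jaccard, (w1+" - "+w2)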
    
    
    def reducer_sort(self,key,value):
        for v in value:
            yield key,v
    
    
if __name__ == '__main__':
    MRsimilarityOnly.run()

Overwriting similarityOnly.py
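
The two steps of the job can be traced locally with a short plain-Python sketch. It is an illustration under stated assumptions, not the job itself: `pairwise_jaccard` and the hand-written `index` dict (postings for docA, docB and docE only) are introduced here, and the final `sorted` call stands in for the descending sort that the Hadoop comparator performs.

```python
import itertools
from collections import Counter

def pairwise_jaccard(index):
    """index: term -> list of [doc_id, doc_length] postings."""
    shared = Counter()
    for postings in index.values():
        docs = sorted(set((d, n) for d, n in postings))
        # every pair of docs in one postings list shares that term
        for a, b in itertools.combinations(docs, 2):
            shared[(a, b)] += 1
    results = []
    for ((d1, n1), (d2, n2)), t in shared.items():
        # t is the shared-term count; n1 and n2 are the doc lengths
        results.append((t / float(n1 + n2 - t), d1 + " - " + d2))
    # highest similarity first, ties broken by pair name
    return sorted(results, key=lambda r: (-r[0], r[1]))

# hypothetical sample postings for docA, docB and docE only
index = {
    "bright": [["docA", 4], ["docB", 4]],
    "forget": [["docA", 4], ["docB", 4], ["docE", 3]],
    "blue": [["docA", 4], ["docE", 3]],
    "butterfly": [["docA", 4], ["docE", 3]],
    "best": [["docB", 4]],
    "sky": [["docB", 4]],
}
for score, pair in pairwise_jaccard(index):
    print("%-12s %.6f" % (pair, score))
```

Terms that occur in a single document ("best", "sky") produce no pairs at all, which is why working from the inverted index avoids comparing documents that have nothing in common.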


In [58]:
!python similarityOnly.py -r hadoop systems_test_index.txt > similarities_test.txt

Using configs in /etc/mrjob.conf
Creating temp directory /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/similarityOnly.koza.20170207.092105.389095
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/local/bin/hadoop
Using Hadoop version 2.7.2
Copying local files to hdfs:///user/koza/tmp/mrjob/similarityOnly.koza.20170207.092105.389095/files/...
Detected hadoop configuration property names that do not match hadoop version 2.7.2:
The have been translated as follows
 mapred.output.key.comparator.class: mapreduce.job.output.key.comparator.class
mapred.text.key.comparator.options: mapreduce.partition.keycomparator.options
mapred.text.key.partitioner.options: mapreduce.partition.keypartitioner.options
Running step 1 of 2...
  packageJobJar: [/var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/hadoop-unjar1243219462569908254/] [] /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/streamjob4424899413839839701.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connectin

In [59]:
############################################
# Pretty print systems tests
############################################

import json
print "Systems test - Similarity measures"
print '—'*110
print "{0:>15} |{1:>16} |{2:>15} |{3:>15} |{4:>15} |{5:>15}".format(
        "pair", "jaccard", "cosine", "dice", "overlap", "avg")
print '-'*110

with open("similarities_test.txt","r") as f:
    lines = f.readlines()
    for line in lines:
        line = line.strip()
        jaccard,stripe = line.split("\t")
        stripe = json.loads(stripe)

        print "{0:>15} | {1:>15f} |{2:>15} |{3:>15} |{4:>15} |{5:>15}".format(
            stripe, float(jaccard),  "TODO", "TODO", "TODO", "TODO")
        
print '-'*110


Systems test - Similarity measures
——————————————————————————————————————————————————————————————————————————————————————————————————————————————
           pair |         jaccard |         cosine |           dice |        overlap |            avg
--------------------------------------------------------------------------------------------------------------
    docA - docE |        0.750000 |           TODO |           TODO |           TODO |           TODO
    docB - docC |        0.333333 |           TODO |           TODO |           TODO |           TODO
    docA - docC |        0.333333 |           TODO |           TODO |           TODO |           TODO
    docA - docB |        0.333333 |           TODO |           TODO |           TODO |           TODO
    docD - docE |        0.166667 |           TODO |           TODO |           TODO |           TODO
    docC - docE |        0.166667 |           TODO |           TODO |           TODO |           TODO
    docB - docE |        0.16
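
The cosine, dice and overlap columns are still TODO in the table above. One possible way to fill them is sketched below, assuming the usual set-based (binary term weight) definitions; the notebook does not say which definitions are intended, so treat the formulas and the `set_similarities` helper as assumptions. All of them need only the shared-term count and the two document lengths, which `reducer_pair_sim` already has.

```python
import math

def set_similarities(shared, len_a, len_b):
    """Set-based similarity measures from |A & B|, |A| and |B|."""
    return {
        "jaccard": shared / float(len_a + len_b - shared),
        # cosine of two binary term vectors
        "cosine": shared / math.sqrt(len_a * len_b),
        "dice": 2.0 * shared / (len_a + len_b),
        # overlap coefficient: shared terms relative to the shorter doc
        "overlap": shared / float(min(len_a, len_b)),
    }

# docA and docE share 3 terms and have 4 and 3 terms respectively
sims = set_similarities(3, 4, 3)
for name in sorted(sims):
    print("%-8s %.6f" % (name, sims[name]))
```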