In [3]:
import os
import sys
#iPython Notebook and Apache Spark path appending
os.environ['SPARK_HOME']="S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6"
sys.path.append("S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6/bin")
sys.path.append("S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6/python")
sys.path.append("S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6/python/pyspark")
sys.path.append("S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6/python/lib")
sys.path.append("S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip")
sys.path.append("S:/spark/spark-1.5.2-bin-hadoop2.6/spark-1.5.2-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip")
sys.path.append("C:/Program Files/Java/jre1.8.0_60/bin")

In [4]:
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF, IDF
import operator

SparkContext.setSystemProperty("hadoop.home.dir", "S:\\spark\\winutil\\") # For Windows Only


conf = SparkConf().setAppName("Entity Resolution")
sc = SparkContext(conf=conf)
sqlCt = SQLContext(sc)

In [26]:
df1 = sqlCt.read.parquet("S:/data/amazon-google-sample/Amazon_sample").cache()

In [27]:
df1.show()

+----------+--------------------+--------------------+--------------------+------+
|        id|               title|         description|        manufacturer| price|
+----------+--------------------+--------------------+--------------------+------+
|b0002mg5uk|  iview mediapro 2.5|                    |    global marketing|199.99|
|b0002jtvng|bias deck le 3.5 ...|if you want to re...|                bias|  99.0|
|b0007lw22g|apple ilife '06 (...|ilife '06 is the ...|      apple computer|  79.0|
|b00007kh02|extensis intellih...|extensis intellih...|extensis corporation|199.99|
|b000saufpw|dk amazing animal...|meet your cd host...|global software p...|  9.99|
|b000v7vz1u|onone essentials ...|                    |      onone software| 59.95|
|b00006hvvo|upg sgms 1000 inc...|today enterprises...|  sonic systems inc.|   0.0|
|b0000vyk1o|    power director 3|powerdirector 3 -...|avanquest publish...| 79.95|
|b000jx1kma|aircraft power pa...|aircraft powerpac...|            red mile| 29.99|
|b00

In [28]:
df2 = sqlCt.read.parquet("S:/data/amazon-google-sample/Google_sample").cache()

In [29]:
df2.show()

+--------------------+--------------------+--------------------+-------------------+--------+
|                  id|                name|         description|       manufacturer|   price|
+--------------------+--------------------+--------------------+-------------------+--------+
|http://www.google...|punch software 20...|total interior an...|     punch software|   35.99|
|http://www.google...|onone software ic...|dynamic image enh...|     onone software|  158.99|
|http://www.google...|adobe cs3 web sta...|system requiremen...|                   | 1035.99|
|http://www.google...|red mile entertai...|red mile entertai...|                   |   27.97|
|http://www.google...|sonicwall gms 100...|today enterprises...|                   |62920.89|
|http://www.google...|hijack2 identity ...|prevent thieves f...|                   |    44.9|
|http://www.google...|bias deck 3.5 le ...|deck 3.5 le multi...|                   |   69.95|
|http://www.google...|iview mediapro 2....|product datasheet

#Goal is to do Entity Resolution based on Jaccard Similarity 
### - Finding items in two tables representing same item
### - Similarity cue = Name of the item and its manufacturer

In [30]:
df1 = df1.withColumn('joinKey', concat_ws(" ", *["title", "manufacturer"]))

In [31]:
df2 = df2.withColumn('joinKey', concat_ws(" ", *["name", "manufacturer"]))

In [32]:
#Loading the stop word list
f = open("S:/data/amazon-google-sample/stopwords.txt", "r")

In [33]:
stopWords = set(f.read().split("\n"))

In [34]:
def tokenizer(string):
    tokens = re.split(r'\W+', string) #split by non-alphanumeric character
    cleanedTokens = [t.lower() for t in tokens]
    return [x for x in cleanedTokens if x not in stopWords] #list containing tokens without stopwords

In [37]:
tokenizerUDF = udf(tokenizer, ArrayType(StringType()))
df1 = df1.withColumn('joinKey', tokenizerUDF(df1.joinKey))

In [38]:
df2 = df2.withColumn('joinKey', tokenizerUDF(df2.joinKey))

In [39]:
df1.show() #joinKey contains list of all the important words collect from the title/name and manufacturer column

+----------+--------------------+--------------------+--------------------+------+--------------------+
|        id|               title|         description|        manufacturer| price|             joinKey|
+----------+--------------------+--------------------+--------------------+------+--------------------+
|b0002mg5uk|  iview mediapro 2.5|                    |    global marketing|199.99|[iview, mediapro,...|
|b0002jtvng|bias deck le 3.5 ...|if you want to re...|                bias|  99.0|[bias, deck, le, ...|
|b0007lw22g|apple ilife '06 (...|ilife '06 is the ...|      apple computer|  79.0|[apple, ilife, 06...|
|b00007kh02|extensis intellih...|extensis intellih...|extensis corporation|199.99|[extensis, intell...|
|b000saufpw|dk amazing animal...|meet your cd host...|global software p...|  9.99|[dk, amazing, ani...|
|b000v7vz1u|onone essentials ...|                    |      onone software| 59.95|[onone, essential...|
|b00006hvvo|upg sgms 1000 inc...|today enterprises...|  sonic sy

In [40]:
invIdx1 = df1.select(df1['id'], explode(df1['joinKey']).alias('token')) #creating id-token pair for inverted indexing

In [41]:
invIdx1.show()

+----------+---------+
|        id|    token|
+----------+---------+
|b0002mg5uk|    iview|
|b0002mg5uk| mediapro|
|b0002mg5uk|        2|
|b0002mg5uk|        5|
|b0002mg5uk|   global|
|b0002mg5uk|marketing|
|b0002jtvng|     bias|
|b0002jtvng|     deck|
|b0002jtvng|       le|
|b0002jtvng|        3|
|b0002jtvng|        5|
|b0002jtvng|macintosh|
|b0002jtvng|       cd|
|b0002jtvng|     bias|
|b0007lw22g|    apple|
|b0007lw22g|    ilife|
|b0007lw22g|       06|
|b0007lw22g|      mac|
|b0007lw22g|      dvd|
|b0007lw22g|    older|
+----------+---------+
only showing top 20 rows



In [42]:
invIdx2 = df2.select(df2['id'], explode(df2['joinKey']).alias('token'))

In [43]:
invIdx2.show()

+--------------------+------------+
|                  id|       token|
+--------------------+------------+
|http://www.google...|       punch|
|http://www.google...|    software|
|http://www.google...|       20100|
|http://www.google...|       punch|
|http://www.google...|           5|
|http://www.google...|           1|
|http://www.google...|        home|
|http://www.google...|      design|
|http://www.google...|       punch|
|http://www.google...|    software|
|http://www.google...|       onone|
|http://www.google...|    software|
|http://www.google...|         ice|
|http://www.google...|       10770|
|http://www.google...|intellihance|
|http://www.google...|         pro|
|http://www.google...|           4|
|http://www.google...|           1|
|http://www.google...|         mac|
|http://www.google...|         win|
+--------------------+------------+
only showing top 20 rows



In [58]:
joined = invIdx1.join(invIdx2, invIdx1.token == invIdx2.token).select(invIdx1.id.alias("id1"), invIdx2.id.alias("id2")).cache()

In [59]:
joined.show() #join based on only the id from both df1 and df2 where atleast one token is similar

+----------+--------------------+
|       id1|                 id2|
+----------+--------------------+
|b0002mg5uk|http://www.google...|
|b0002mg5uk|http://www.google...|
|b0002mg5uk|http://www.google...|
|b0002mg5uk|http://www.google...|
|b0002mg5uk|http://www.google...|
|b0002mg5uk|http://www.google...|
|b0002mg5uk|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0002jtvng|http://www.google...|
|b0007lw22g|http://www.google...|
|b0007lw22g|http://www.google...|
|b0007lw22g|http://www.google...|
|b0007lw22g|http://www.google...|
|b0007lw22g|http://www.google...|
+----------+--------------------+
only showing top 20 rows



In [60]:
#joining amazon table with id1 and then joining the google table with id2
joined = joined.join(df1, joined['id1'] == df1['id']).join(df2, joined['id2'] == df2['id'])\
.select(joined['id1'], df1['joinKey'].alias('joinKey1'), joined['id2'], df2['joinKey'].alias('joinKey2')).cache()
#after the join operation the dataframe contains 1.amazonID of an item 2. amazon token  3. googleID 4. google token

In [62]:
joined.registerTempTable("candidates")

In [63]:
candidateDF = sqlCt.sql("select id1, first(joinKey1) as joinKey1, id2, first(joinKey2) as joinKey2 from candidates group by id1, id2").cache()

In [64]:
candidateDF.count() #group by id1 id2 only to consider the distincts

121

##Filtering based on Jaccard Similarity

In [68]:
def jaccardsim(row1,row2):
    intersection = len(set.intersection(*[set(row1), set(row2)]))
    union = len(set.union(*[set(row1), set(row2)]))
    return intersection/float(union)

In [70]:
jaccardUDF = udf(jaccardsim, FloatType())
resultDF = candidateDF.withColumn('jaccard', jaccardUDF(candidateDF.joinKey1, candidateDF.joinKey2))

In [71]:
resultDF.show()

+----------+--------------------+--------------------+--------------------+-----------+
|       id1|            joinKey1|                 id2|            joinKey2|    jaccard|
+----------+--------------------+--------------------+--------------------+-----------+
|b0002mg5uk|[iview, mediapro,...|http://www.google...|[punch, software,...|0.083333336|
|b00007kh02|[extensis, intell...|http://www.google...|[onone, software,...|  0.2631579|
|b00002s6sc|[punch, 5, 1, hom...|http://www.google...|[global, software...| 0.11111111|
|b000gaqlxe|[aquazone, bass, ...|http://www.google...|[power, productio...|0.083333336|
|b0000vyk1o|[power, director,...|http://www.google...|[adobe, cs3, web,...|0.083333336|
|b000gaqlxe|[aquazone, bass, ...|http://www.google...|[global, software...| 0.05263158|
|b000saufpw|[dk, amazing, ani...|http://www.google...|[bias, deck, 3, 5...|0.055555556|
|b0000vyk1o|[power, director,...|http://www.google...|[adobe, indesign,...|0.071428575|
|b00006hvvo|[upg, sgms, 1000,...

In [73]:
resultDF = resultDF.where(resultDF.jaccard >= 0.4)

In [74]:
resultDF.count()

8

##There are 8 items which can be considered to be same among both the tables given Jaccard Score >= 0.4