In [60]:
# entity_resolution.py
import re
import operator
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, types, functions
from pyspark.sql.functions import concat, concat_ws, col, lit, explode

In [61]:
spark = SparkSession.builder.appName('entity_res').getOrCreate()
sc = spark.sparkContext

In [62]:
class EntityResolution:
    def __init__(self, dataFile1, dataFile2, stopWordsFile):
        self.f = open(stopWordsFile, "r")
        self.stopWords = set(self.f.read().split("\n"))
        self.stopWordsBC = sc.broadcast(self.stopWords).value
        self.df1 = spark.read.parquet(dataFile1).cache()
        self.df2 = spark.read.parquet(dataFile2).cache()

        
        
    def preprocessDF(self, df, cols): 
        stop_words = self.stopWordsBC
        def tokenized_filterized_string (string):
            string = re.sub('[^0-9a-zA-Z]+', ' ', string) # Remove any non alpha-numeric charecter with whitespace
            string = re.sub('\s+',' ',string).strip().lower() # Remove extra whitespace and finally remove trailing spaces
            tokens = re.split(r'\W+', string)
            tokens = set(tokens) - stop_words
            return list(tokens)
        
        get_tokenized_string = functions.udf(tokenized_filterized_string, types.ArrayType(types.StringType()))
        concatanated_column = 'joinKey'
        df = df.withColumn(concatanated_column, concat_ws(' ', df[cols[0]], df[cols[1]]))
        df = df.withColumn(concatanated_column, get_tokenized_string(df[concatanated_column]))                           
        return df

    def filtering(self, df1, df2):
        df1 = df1.withColumn("flattened_key", explode(df1['joinKey']))
        df2 = df2.withColumn("flattened_key", explode(df2['joinKey']))
        df1.createOrReplaceTempView("df1")
        df2.createOrReplaceTempView("df2")
        common_item = spark.sql('SELECT df1.id as id1, df1.joinKey as joinKey1, df2.id as id2, df2.joinKey as joinKey2 \
        FROM df1, df2 WHERE df1.flattened_key = df2.flattened_key')
#         common_items = df1.select('flattened_key').distinct().intersect(df2.select('flattened_key')).collect()
#         print(common_items)
        return common_item
        
        

    def verification(self, candDF, threshold):
        """
            Write your code!
        """
        def get_jaccard_similarity(set_1, set_2):
            set_1 = set(set_1)
            set_2 = set(set_2)
            return len(list(set_1 & set_2)) / len(list(set_1 | set_2))
            
        calculate_jaccard = functions.udf(get_jaccard_similarity, types.DoubleType())
        candDF = candDF.withColumn('jaccard', calculate_jaccard(candDF['joinKey1'], candDF['joinKey2']))
        candDF = candDF.filter(candDF.jaccard >= threshold)
        return candDF

    def evaluate(self, result, groundTruth):
        """
            Write your code!
        """

    def jaccardJoin(self, cols1, cols2, threshold):
        newDF1 = self.preprocessDF(self.df1, cols1)
        newDF2 = self.preprocessDF(self.df2, cols2)
        print ("Before filtering: %d pairs in total" %(self.df1.count()*self.df2.count())) 
        candDF = self.filtering(newDF1, newDF2)
        print ("After Filtering: %d pairs left" %(candDF.count()))

        resultDF = self.verification(candDF, threshold)
        resultDF.show()
#         print ("After Verification: %d similar pairs" %(resultDF.count()))

#         return resultDF


    def __del__(self):
        self.f.close()


In [63]:
if __name__ == "__main__":
    er = EntityResolution("Amazon_sample", "Google_sample", "stopwords.txt")
    amazonCols = ["title", "manufacturer"]
    googleCols = ["name", "manufacturer"]
    resultDF = er.jaccardJoin(amazonCols, googleCols, 0.5)

#     result = resultDF.map(lambda row: (row.id1, row.id2)).collect()
#     groundTruth = spark.read.parquet("Amazon_Google_perfectMapping_sample") \
#                           .map(lambda row: (row.idAmazon, row.idGoogle)).collect()
#     print ("(precision, recall, fmeasure) = ", er.evaluate(result, groundTruth))

Before filtering: 256 pairs in total
After Filtering: 145 pairs left


NameError: name 'df' is not defined