In [1]:
import pandas as pd

In [35]:
# entity_resolution.py
import re
import operator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark import SparkContext


class EntityResolution:
    def __init__(self, dataFile1, dataFile2, stopWordsFile):
        spark = SparkSession.builder.appName("ER example").config("spark.some.config.option", "some-value").getOrCreate()
        sc = spark.sparkContext
        self.f = open(stopWordsFile, "r")
        self.stopWords = set(self.f.read().split("\n"))
        self.stopWordsBC = sc.broadcast(self.stopWords).value
        self.df1 = spark.read.parquet(dataFile1).cache()
        self.df2 = spark.read.parquet(dataFile2).cache()

    def preprocessDF(self, df, cols): 
        """
        Input: $df represents a DataFrame
               $cols represents the list of columns (in $df) that will be concatenated and be tokenized

        Output: Return a new DataFrame that adds the "joinKey" column into the input $df

        Comments: The "joinKey" column is a list of tokens, which is generated as follows:
                 (1) concatenate the $cols in $df; 
                 (2) apply the tokenizer to the concatenated string
        Here is how the tokenizer should work:
                 (1) Use "re.split(r'\W+', string)" to split a string into a set of tokens
                 (2) Convert each token to its lower-case
                 (3) Remove stop words
        """
        df = df.withColumn("joinkeys",func_udf(df.select(cols),self.stopWords))
        df.show()

    def filtering(self, df1, df2):
        """ 
        Input: $df1 and $df2 are two input DataFrames, where each of them 
               has a 'joinKey' column added by the preprocessDF function

        Output: Return a new DataFrame $candDF with four columns: 'id1', 'joinKey1', 'id2', 'joinKey2',
                where 'id1' and 'joinKey1' are from $df1, and 'id2' and 'joinKey2'are from $df2.
                Intuitively, $candDF is the joined result between $df1 and $df2 on the condition that 
                their joinKeys share at least one token. 

        Comments: Since the goal of the "filtering" function is to avoid n^2 pair comparisons, 
                  you are NOT allowed to compute a cartesian join between $df1 and $df2 in the function. 
                  Please come up with a more efficient algorithm (see hints in Lecture 2). 
        """

    def verification(self, candDF, threshold):
        """
             Input: $candDF is the output DataFrame from the 'filtering' function. 
                   $threshold is a float value between (0, 1] 

            Output: Return a new DataFrame $resultDF that represents the ER result. 
                    It has five columns: id1, joinKey1, id2, joinKey2, jaccard 

            Comments: There are two differences between $candDF and $resultDF
                      (1) $resultDF adds a new column, called jaccard, which stores the jaccard similarity 
                          between $joinKey1 and $joinKey2
                      (2) $resultDF removes the rows whose jaccard similarity is smaller than $threshold
        """

    def evaluate(self, result, groundTruth):
        """
        Input: $result is a list of matching pairs identified by the ER algorithm
               $groundTrueth is a list of matching pairs labeld by humans

        Output: Compute precision, recall, and fmeasure of $result based on $groundTruth, and
                return the evaluation result as a triple: (precision, recall, fmeasure)

        """

    def jaccardJoin(self, cols1, cols2, threshold):
        newDF1 = self.preprocessDF(self.df1, cols1)
        newDF2 = self.preprocessDF(self.df2, cols2)
        print ("Before filtering: %d pairs in total" %(self.df1.count()*self.df2.count())) 

        candDF = self.filtering(newDF1, newDF2)
        print ("After Filtering: %d pairs left" %(candDF.count()))

        resultDF = self.verification(candDF, threshold)
        print ("After Verification: %d similar pairs" %(resultDF.count()))

        return resultDF


    def __del__(self):
        self.f.close()


In [36]:
def tokenize(list_elements,stopwrds):
    new_string = " ".join(list_elements)
    tokens = re.split("\rW+",new_string)
    return list(set(tokens)-set(stopwrds))
func_udf = udf(tokenize)

In [37]:
if __name__ == "__main__":
    source_path = "data/amazon-google-sample/"
    er = EntityResolution(source_path+"Amazon_sample", source_path+"Google_sample", source_path+"stopwords.txt")
    amazonCols = ["title", "manufacturer"]
    googleCols = ["name", "manufacturer"]
    resultDF = er.jaccardJoin(amazonCols, googleCols, 0.5)

    result = resultDF.map(lambda row: (row.id1, row.id2)).collect()
    groundTruth = spark.read.parquet("Amazon_Google_perfectMapping_sample") \
                          .map(lambda row: (row.idAmazon, row.idGoogle)).collect()
    print ("(precision, recall, fmeasure) = ", er.evaluate(result, groundTruth))

TypeError: Invalid argument, not a string or column: DataFrame[title: string, manufacturer: string] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Exception ignored in: <function EntityResolution.__del__ at 0x000001F8225CE598>
Traceback (most recent call last):
  File "<ipython-input-21-f4d1c0977845>", line 54, in __del__
AttributeError: 'EntityResolution' object has no attribute 'f'
