# Spark implementation of Entity Resolution System

This notebook contains the code for productionalizing our system in Spark. The code allows to distribute the preprocessing stage and the prediction stage. For training one can train the model locally and load it into this notebook. 


In [1]:
import unicodedata
import nltk
import gensim
import numpy as np
import jellyfish as jf
import scipy.spatial.distance as dist
import ast
import pickle

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit, udf, struct, concat_ws, collect_list, explode, split
from pyspark.sql.types import IntegerType, StringType, ArrayType, FloatType
import pyspark.sql.functions as F
from pyspark.sql import Window

from modules.spark.process_text import Process_text


In [4]:
# Load the data
spark = SparkSession(sc)

data1 = "/data/input.csv"
data2 = "/data/reference.csv"

path = "worddict/GoogleNews-vectors-negative300.bin"
df_1 = spark.read.csv(data1, header=True)
df_2 = spark.read.csv(data2, header=True)
test = "/data/test.csv"


In [5]:
def word2vec(path, words):
    """ creates a word to vec dictionary only on the words present in the corpus
    """
    word_dict = gensim.models.KeyedVectors.load_word2vec_format(path, binary=True)
    vectors = {}
    for w in words: 
        if w not in word_dict:
            continue
        else:
            vectors[w] = word_dict[w]
    return vectors

def clean_text(x):
    text_processor = Process_text()
    processed_sentence = nltk.word_tokenize(unicode(x))
    processed_sentence = text_processor.remove_non_ascii(processed_sentence)
    processed_sentence = text_processor.to_lowercase(processed_sentence)
    processed_sentence = text_processor.remove_punctuation(processed_sentence)
    processed_sentence = text_processor.remove_nan(processed_sentence)
    processed_sentence = text_processor.remove_stopwords(processed_sentence)
    if processed_sentence: 
        return processed_sentence
    else:
        return ['None']

In [6]:
# divide columns: define which columns belong into which category
special_columns = ['addressStreet', 'name', 'addressCity', 'addressZip']
word_columns = ['name', 'addressStreet', 'addressCity', 'addressState']
numeric_columns = ['addressZip']

# get the unique words in the dataset
df1 = df_1.withColumn('wordemb', concat_ws(' ', *word_columns))
df2 = df_2.withColumn('wordemb', concat_ws(' ', *word_columns))

In [7]:
#get unique words
words1 = df1.withColumn('word', explode(split(col('wordemb'), ' '))).select('word').distinct().rdd.map(lambda r: r[0]).collect()
words2 = df2.withColumn('word', explode(split(col('wordemb'), ' '))).select('word').distinct().rdd.map(lambda r: r[0]).collect()

#reduce the lookup dictionary to the words relevant in the text
wo = []
for i in words1 + words2: 
    wo.append(clean_text(i)[0])
word_dict = word2vec(path, wo)

In [8]:
def get_word2vec(sentence, word_dict=word_dict, word_dict_size=300):
    l = np.zeros(word_dict_size)
    i = 0
    for w in sentence:
        if w not in word_dict.keys():
            continue
        else:
            l += word_dict[w]
            i += 1

    if i == 0: 
        i = 1
    return l / i

def pptext(x):    
    processed_sentence = clean_text(x)
    return get_word2vec(processed_sentence).tolist()
    
@udf('string')
def geo_code_address(x, api_key='api_key'):
    if not(api_key):
        print("Geocoding Failed! Please provide an API key")
        return None, None, None
    
    if type(x) == list or type(x) == np.ndarray or type(x) == pd.core.series.Series:
        address = ', '.join(x)
    else:
        address = x
    api = GoogleV3(api_key=api_key)
    loc = api.geocode(address)
    if loc:
        return loc.address, loc.latitude, loc.longitude
    else:
        return str(None, None, None)

def levenshtein(a,b):
    tmp = [levenshtein_distance(x, y) for i, x in enumerate(a) for j, y in enumerate(b) if i == j]
    return np.asarray(tmp)


@udf('string')
def vector_similarity(x):
    return dist.cdist([x[0]],[x[1]], 'cosine').tolist()[0][0]
        
    

@udf('string')
def numeric_similarity(x):
    return float(np.exp(-2 * abs(float(x[0]) - float(x[1])) / (float(x[0]) + float(x[1]) + 1e-5)))


@udf('string')
def string_similarity(x):
    return jf.levenshtein_distance(unicode(x[0]),unicode(x[1]))

@udf('string')
def phone_number(x, broadcast1):
    # apply word embeddings to the line 
    return y

@udf('string')
def letter_count(x):
    processed_sentence = nltk.word_tokenize(sentence)
    return len(process_sentence)

#preprocess_text = udf(lambda x: pptext(x), StringType())

@udf('string')
def preprocess_text(x,word_dict=word_dict):
    y = pptext(x,word_dict)
    return y

prep = udf(pptext, ArrayType(FloatType()))

# Preprocessing

In [9]:
# Get the word embeddings 
for i in word_columns:
    df1 = df1.withColumn('wordemb_'+str(i), prep(col(i)))
    df2 = df2.withColumn('wordemb_'+str(i), prep(col(i)))

# Join the Columns

In [10]:
df2 = df2.selectExpr(*["{} as {}_2".format(i,i) for i in df2.columns])

In [11]:
dfx = df1.crossJoin(df2)

In [12]:
dfx.printSchema()

root
 |-- serial: string (nullable = true)
 |-- name: string (nullable = true)
 |-- addressStreet: string (nullable = true)
 |-- addressCity: string (nullable = true)
 |-- addressZip: string (nullable = true)
 |-- addressState: string (nullable = true)
 |-- wordemb: string (nullable = false)
 |-- wordemb_name: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- wordemb_addressStreet: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- wordemb_addressCity: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- wordemb_addressState: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- serial_2: string (nullable = true)
 |-- name_2: string (nullable = true)
 |-- addressStreet_2: string (nullable = true)
 |-- addressCity_2: string (nullable = true)
 |-- addressZip_2: string (nullable = true)
 |-- addressState_2: string (nullable = true)
 |-- wordemb_2: string (nullable = false)
 |-- wordemb_name_2: 

In [13]:
dfx.show(2)

+------+-----------------+--------------------+-----------+----------+------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------+-------------------+---------------+-------------+------------+--------------+--------------------+--------------------+-----------------------+---------------------+----------------------+
|serial|             name|       addressStreet|addressCity|addressZip|addressState|             wordemb|        wordemb_name|wordemb_addressStreet| wordemb_addressCity|wordemb_addressState|serial_2|             name_2|addressStreet_2|addressCity_2|addressZip_2|addressState_2|           wordemb_2|      wordemb_name_2|wordemb_addressStreet_2|wordemb_addressCity_2|wordemb_addressState_2|
+------+-----------------+--------------------+-----------+----------+------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------+-------------------+------

# Create the Similarity Matrix

In [14]:
i = 0
colnames = ['serial','serial_2']
for s in word_columns:
    col_name = 'wordemb'+str(i)
    dfx = dfx.withColumn(col_name, vector_similarity(struct(*['wordemb_{}'.format(s), 'wordemb_{}_2'.format(s)])))
    colnames.append(col_name)
    i += 1

for s in special_columns:
    col_name = 'stringsim_'+str(i)
    dfx = dfx.withColumn(col_name, string_similarity(struct(*['{}'.format(s), '{}_2'.format(s)])))
    colnames.append(col_name)
    i += 1

for s in numeric_columns:
    col_name = 'numericsim_'+str(i)
    dfx = dfx.withColumn(col_name, numeric_similarity(struct(*['{}'.format(s), '{}_2'.format(s)])))
    colnames.append(col_name)
    i += 1

In [15]:
dfx.select(colnames).show(10)

+------+--------+------------------+------------------+-----------------+--------------------+-----------+-----------+-----------+-----------+-------------------+
|serial|serial_2|          wordemb0|          wordemb1|         wordemb2|            wordemb3|stringsim_4|stringsim_5|stringsim_6|stringsim_7|       numericsim_8|
+------+--------+------------------+------------------+-----------------+--------------------+-----------+-----------+-----------+-----------+-------------------+
|   201|   15928|0.9361319069348657|0.6553759142709283|0.623078322225926|  0.5531163635847764|         15|         18|         10|          5|0.21074992522577732|
|   619|   15928|0.9354736562021811|0.6997708710444316|0.623078322225926|  0.5531163635847764|         14|         17|         10|          5|0.21074992522577732|
|   763|   15928|0.9624305454289781|0.7266836075336558|              0.0|1.110223024625156...|         14|         17|          0|          0|                1.0|
|  1054|   15928|0.839

# Parellelize the Machine Learning Code

In [28]:
import pickle
from sklearn.ensemble import RandomForestClassifier

# load the ML model from sklearn

# To load it, all packages need to be the following specifications: 
# numpy==1.13.3 sklearn==0.19.1 scipy==0.19.1


In [29]:
rf_random = pickle.load(open('rf_model.sav', 'rb'))

In [67]:
#get a dummy model 

#x_train = [[15,10,5,0.45756],[13,0,0,0.40601]]
#y_train = [0,1]

#rf_random = RandomForestClassifier(random_state=40)
#rf_fit = rf_random.fit(x_train,y_train)
#y_pred_prob_rf = rf_random.predict_proba(np.array([[4,3,2,5],[5,6,4,3]]))

In [69]:
# write predict function as udf
@udf('string')
def make_prediction(x, model=rf_random):
    proba = rf_random.predict_proba([x])
    return proba.tolist()[0][0]
    

In [57]:
# apply udf do the dataframe
dfx = dfx.select(colnames).withColumn('proba', make_prediction(struct(*colnames[2:])))

In [58]:
dfx.select(show()

+------+--------+----+----+----+-------------------+-----+
|serial|serial_2|sim0|sim1|sim2|               sim3|proba|
+------+--------+----+----+----+-------------------+-----+
|   201|   15928|  15|  10|   5| 0.4575667877711177|  0.5|
|   619|   15928|  14|  10|   5| 0.3667975146235235|  0.5|
|   763|   15928|  14|   0|   0| 0.2946390329558247|  0.5|
|  1054|   15928|  15|  10|   5| 0.4244207038629879|  0.5|
|  1360|   15928|  17|  10|   5|0.39429010305496726|  0.5|
|  1701|   15928|  13|   0|   0|0.45274087705598687|  0.5|
|  1766|   15928|  16|  10|   5|0.42584238257035933|  0.5|
|  1837|   15928|  15|  10|   5| 0.5027467177081633|  0.5|
|  1843|   15928|  13|   0|   0| 0.4060131115143911|  0.5|
|  2007|   15928|  14|  10|   4| 0.5019455298693172|  0.5|
|  2096|   15928|  15|  10|   4| 0.4940773373093259|  0.5|
|  2317|   15928|  16|  10|   5| 0.4159686918959705|  0.5|
|  2407|   15928|  11|   0|   0|0.27789604104707744|  0.5|
|  2442|   15928|  15|  10|   4| 0.5325185882400332|  0.

In [71]:
# select the highest value per predict (group by, select max)
w = Window.partitionBy('serial_2')
dfx.withColumn('max', F.max('proba').over(w))\
    .where(col('proba') == col('max'))\
    .select(['serial','serial_2']).collect()

+------+--------+----+----+----+-------------------+-----+---+
|serial|serial_2|sim0|sim1|sim2|               sim3|proba|max|
+------+--------+----+----+----+-------------------+-----+---+
|   201|  512983|  18|  10|   5| 0.3929147390570289|  0.5|0.5|
|   619|  512983|   9|  10|   5| 0.2781830373639601|  0.5|0.5|
|   763|  512983|  12|   0|   0|0.19465777771881887|  0.5|0.5|
|  1054|  512983|  15|  10|   5| 0.3216807467773285|  0.5|0.5|
|  1360|  512983|  12|  10|   5|0.28683031486913846|  0.5|0.5|
+------+--------+----+----+----+-------------------+-----+---+
only showing top 5 rows



In [None]:
#get the matching indices
dfx.select(['serial','serial_2'])