### Record Linkage Example
- Deduplicating product names in PySpark
- Data wrangling
- Loading a pre-trained binary classifier
- Calculating pairwise similarity scores with the binary classifier
- Clustering the similarity scores with K-means to identify entities, i.e. distinct products

In [1]:
import pyspark

from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql import Window
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql.functions import first
from pyspark.sql.functions  import date_format
from pyspark.sql.functions import lit,StringType

from pyspark.sql.functions import row_number,udf,trim, upper, to_date, substring, length, min, when, format_number, dayofmonth, hour, dayofyear,  month, year, weekofyear, date_format, unix_timestamp
from pyspark import SparkConf
from pyspark.sql.functions import coalesce
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import UserDefinedFunction
import datetime
from pyspark.sql.functions import year
from pyspark.sql.functions import datediff,coalesce,lag
from pyspark.sql.functions import when, to_date
from pyspark.sql.functions import date_add
from pyspark.sql.functions import UserDefinedFunction

import traceback
import sys
import time
import math
import datetime


In [2]:
test = 'record_linkage'  # doubles as the Spark application name

conf = pyspark.SparkConf()

# Build (or reuse) the notebook's session with whole-stage code generation off.
spark = (
    SparkSession.builder
    .appName(test)
    .config('spark.sql.codegen.wholeStage', False)
    .getOrCreate()
)

# The session already holds the active SparkContext -- the same object
# SparkContext.getOrCreate() would return -- so take it directly.
# `conf` carries no explicit settings and is kept only as a named handle.
sc = spark.sparkContext
sqlContext = SQLContext(sc)





In [3]:
# reading in data: product listings CSV with a header row
productnames_df = sqlContext.read.option('header', True).csv('./sample_70perc.csv')



In [4]:
productnames_df.show(n=20, truncate=False)

+---------+------------------------------------------------------------------+---+
|source_id|product                                                           |id |
+---------+------------------------------------------------------------------+---+
|13       | Samsung G950 Fiyatı                                              |1  |
|10       | Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah             |3  |
|5        |$$ Samsung UE-48H6270 LED Televizyon final                        |4  |
|2        |NEW HP Pavilion 11-n000nt Pentium N3540 4GB                       |5  |
|20       |display Sony cazoo Z2 Cep Telefonu                                |6  |
|15       |NEW Tefal GV8930 Pro Express Buhar Kazanlı #                      |8  |
|20       |BOH Sony cazoo Z2 Cep Telefonu final                              |9  |
|17       |$$ Samsung SM-T800 TABS 105 White Tablet                          |10 |
|12       | Sony Xperia Z2 Cep Telefonu *                                    |12 |
|7  

In [5]:
# create blocking_key: every record gets the same constant ('A'), so the
# blocking self-join pairs every record with every other record.
source_id_df3 = productnames_df.withColumn(
    'blocking_key',
    lit('A'),
)

# string indexing some columns

def add_string_index(df,index_cols):
    """Replace each column in ``index_cols`` with its StringIndexer encoding.

    Parameters
    ----------
    df : DataFrame
        Must contain every column named in ``index_cols``.
    index_cols : list of str
        Columns to encode. Each keeps its name but now holds the indexer's
        output; the encoded columns move to the end of the frame, in
        ``index_cols`` order.

    Returns
    -------
    DataFrame with the original column names, encoded columns last.

    NOTE(review): the indexers are fitted on ``df`` itself. If the result is
    fed to a model trained elsewhere, the label->index mapping only matches
    training-time indices when the category frequencies happen to agree, and
    each column (e.g. ``_L`` vs ``_R``) gets its own independent mapping.
    Consider persisting the fitted indexers with the model -- TODO confirm.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer

    # Hand unfitted indexers to the Pipeline so each one is fitted exactly once.
    indexers = [
        StringIndexer(inputCol=column, outputCol=column + "_index")
        for column in index_cols
    ]
    indexed = Pipeline(stages=indexers).fit(df).transform(df)

    # Drop the raw columns, then give the encoded columns the original names.
    result = indexed.drop(*index_cols)
    for column in index_cols:
        result = result.withColumnRenamed(column + '_index', column)

    return result



def token_create(rwdf, stop_words=None, n_tokens=2):
    """Derive number and token features from the ``product`` column.

    Adds to ``rwdf``:
      * ``product_num`` -- the first run of digits found in ``product``.
      * ``product1`` .. ``product{n_tokens}`` -- the leading tokens of the
        trimmed, lower-cased, letters-only product name after stop-word
        removal; "" when the name has fewer tokens.

    Parameters
    ----------
    rwdf : DataFrame
        Must have a string ``product`` column.
    stop_words : list of str, optional
        Tokens to discard. Defaults to the notebook's listing-noise words.
    n_tokens : int, default 2
        How many ``productN`` columns to emit.

    Returns
    -------
    DataFrame with all input columns plus ``product_num`` and the
    ``productN`` columns.
    """
    from pyspark.ml.feature import StopWordsRemover, Tokenizer
    from pyspark.sql.functions import (array_remove, coalesce, col, lit, lower,
                                       regexp_extract, regexp_replace, trim)

    if stop_words is None:
        # NOTE: '$$' can never match -- all non-letters are stripped below
        # before tokenizing. Kept so the default list is unchanged.
        stop_words = ['$$', 'new', 'sample', 'display', 'boh', 'final']

    with_num = rwdf.withColumn("product_num", regexp_extract("product", "([0-9]+)", 1))

    # Trim, lower-case and keep only ASCII letters and whitespace. Digits and
    # non-ASCII letters (e.g. Turkish dotless i) are removed as well.
    cleaned = with_num.alias('a').select(
        col('a.*'),
        regexp_replace(lower(trim(col('a.product'))), "[^a-zA-Z\\s]", "").alias('input_product'),
    )

    tokenized = Tokenizer(inputCol='input_product', outputCol='tokens').transform(cleaned)

    remover = StopWordsRemover(inputCol='tokens', outputCol='tokens_clean', stopWords=stop_words)
    # Runs of whitespace leave '' tokens behind; array_remove drops them.
    filtered = (
        remover.transform(tokenized)
        .select(col('*'), array_remove(col('tokens_clean'), '').alias('tokens_full'))
        .drop('input_product', 'tokens', 'tokens_clean')
    )

    # Spread the first n_tokens tokens into fixed columns, "" when missing.
    token_cols = [
        coalesce(col('tokens_full').getItem(i), lit("")).alias('product{}'.format(i + 1))
        for i in range(n_tokens)
    ]
    return filtered.select("*", *token_cols).drop("tokens_full")



# for testing during development, down-sample and cache the input first:
# source_id_df3=source_id_df3.sample(0.010)
# source_id_df3.cache()

### ADD NEW FEATURES HERE ####

# columns carried forward into the pairwise comparison
fldLst=[
        'product', 'blocking_key','id', 'product_num', 'product1', 'product2']

dedupe_source = token_create(source_id_df3).select(fldLst)

### ADD NEW FEATURES SYMMETRICALLY HERE ###

def create_dedupe_pairs(df):
    """Self-join records that share a blocking key and score their string similarity.

    Each record is paired with every record (itself included, in both orders)
    whose trimmed ``blocking_key`` is equal.

    Parameters
    ----------
    df : DataFrame
        Needs columns product, product_num, product1, product2, id and
        blocking_key.

    Returns
    -------
    (match_df2, match_df)
        match_df2 -- one row per pair, with columns ``product``, ``product1``
            and ``product2`` holding levenshtein(L, R) / length(L) (0.9999
            when that is null), plus product_num_L, product_num_R, id_L, id_R.
        match_df -- the raw pair frame with ``<field>_L`` / ``<field>_R``
            columns for product, product_num, product1, product2 and id.
    """
    from pyspark.sql.functions import coalesce, col, length, levenshtein, lit, trim, when

    # cartesian join issue hack: a constant blocking key turns this into an
    # effective cross join, which Spark may refuse unless explicitly enabled.
    spark.conf.set("spark.sql.crossJoin.enabled", True)

    pair_fields = ['product', 'product_num', 'product1', 'product2', 'id']
    sides = (('a', 'L'), ('b', 'R'))

    match_df = (
        df.alias('a')
        .join(
            df.alias('b'),
            trim(col('a.blocking_key')) == trim(col('b.blocking_key')),
            how='inner',
        )
        .select(*[
            col('{}.{}'.format(alias, field)).alias('{}_{}'.format(field, suffix))
            for alias, suffix in sides
            for field in pair_fields
        ])
    )

    # calculate similarity score between feature columns

    ### ADD NEW STRING FEATURES HERE TO CALCULATE EDIT DISTANCE ###
    col_list = ['product', 'product1', 'product2']

    match_df2 = match_df
    for name in col_list:
        left, right = name + '_L', name + '_R'
        left_len = length(left)
        distance = levenshtein(coalesce(left, lit('N/A')), coalesce(right, lit('N/A')))
        # Normalise by the left string's length. The guard makes an empty left
        # value yield null (-> 0.9999) instead of a division-by-zero error under
        # ANSI mode; outside ANSI mode Spark already returns null there.
        normalised = when(left_len > 0, distance / left_len)
        match_df2 = match_df2.withColumn(name, coalesce(normalised, lit(0.9999)))

    ### ADD NEW BINARY OR CATEGORICAL FEATURES HERE ###

    match_df2 = match_df2[col_list + ['product_num_L', 'product_num_R'] + ['id_L', 'id_R']]

    return match_df2, match_df

# creating pairwise match probabilities

# One call builds both frames: (similarity features, raw id pairs).
# The pair count grows quadratically per blocking key; run
# id_pairs_df.count() explicitly if you need it.
product_pairs_df, id_pairs_df = create_dedupe_pairs(df=dedupe_source)


### ADD NEW BINARY OR CATEGORICAL FEATURES HERE TO STRING INDEX ###

product_pairs_df = add_string_index(df=product_pairs_df, index_cols=['product_num_L', 'product_num_R'])


# calculate pairwise match probabilities

### ADD NEW FEATURES TO LIST ####

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

# same columns model was trained with
features = ['product', 'product1', 'product2', 'product_num_L', 'product_num_R']

assembler = VectorAssembler(inputCols=features, outputCol="features")

output2 = assembler.transform(product_pairs_df)

final_data2 = output2.select('features', 'id_L', 'id_R')

In [6]:
# Load the pre-trained GBT binary classifier that produces the pairwise
# similarity score (loading instead of retraining saves time).
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
sim_score_model = GBTClassificationModel.read().load('./pairwise_similarity_model/')

In [7]:
# Score every candidate pair with the pre-trained classifier.
predictions_df = sim_score_model.transform(final_data2)

# Pairs whose match probability is not above match_threshold get adj_prob 0.0.
# Raise the threshold to trade recall for precision -- handy when classes are
# unbalanced (e.g. true matches are very rare).
match_threshold = 0.0

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Python UDF returning element 1 of the probability vector as a float
# (presumably P(match), assuming label 1 means "match" -- confirm).
float_convert = udf(lambda prob_vector: float(prob_vector[1]), FloatType())

predictions_df2 = (
    predictions_df
    .withColumn('match_prob', float_convert('probability'))
    .alias('a')
    .select(
        col('a.*'),
        when(col('match_prob') > match_threshold, col('match_prob'))
        .otherwise(0.0)
        .alias('adj_prob'),
    )
)

In [8]:
# creating pivot table of productname similarity scores for clustering step

# Collect the distinct id_R values up front so pivot() can skip its own
# distinct-values pass. distinct().collect() returns them in no fixed order,
# so sort them: the pivot's column order (and hence the feature order fed to
# K-means) is then identical on every run. key=str keeps the sort safe for
# string ids and for a possible null id.
id_R_values = sorted(
    (row.id_R for row in predictions_df2.select('id_R').distinct().collect()),
    key=str,
)

pivot_df = predictions_df2.groupBy('id_L').pivot('id_R', id_R_values).agg({'adj_prob': 'max'})


# fill null values (pairs with no score) with 0.0

pivot_df2 = pivot_df.na.fill(0.0)

In [9]:

# estimated number of entities
cluster_number = 20

# Fixed K-means seed so the cluster labels (match_id) are reproducible; without
# an explicit seed the assignment is not guaranteed to be stable between
# kernel sessions.
kmeans_seed = 42


from pyspark.ml.clustering import KMeans

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# every pivot column except the row key is a similarity-to-product feature
feat_cols = [x for x in pivot_df2.columns if x != 'id_L']

vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')

final_data = vec_assembler.transform(pivot_df2)

kmeans3 = KMeans(featuresCol='features', k=cluster_number, seed=kmeans_seed)

model_k3 = kmeans3.fit(final_data)

cluster_label_df = model_k3.transform(final_data)


# adding cluster label to product attributes

final_matched_df = (
    dedupe_source.alias('a')
    .join(
        cluster_label_df.alias('b'),
        col('a.id') == col('b.id_L'),
        how='inner',
    )
    .select(
        col('a.*'),
        col('b.prediction').alias('match_id'),
    )
)

In [11]:
# NOTE: match_id identifies the deduplicated product -- rows that share a
# match_id were assigned to the same K-means cluster.
(
    final_matched_df
    .select('product', 'id', 'match_id')
    .orderBy('match_id')
    .show(truncate=False)
)

+------------------------------------------------------------------+---+--------+
|product                                                           |id |match_id|
+------------------------------------------------------------------+---+--------+
| SAPPHIRE 4GB R9 290X OC #                                        |37 |0       |
| SAPPHIRE 4GB R9 290X OC                                          |58 |0       |
|BOH SAPPHIRE 4GB R9 290X OC *                                     |39 |0       |
|sample Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah final |36 |1       |
|display Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah final|16 |1       |
|$$ Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah #         |38 |1       |
|sample Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah       |92 |1       |
| Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah             |3  |1       |
|BOH Dsc-w800 201mp 5x Optik 27 Lcd Dijital Kompakt Siyah final    |93 |1       |
| Dsc-w800 201mp