# Entity Resolution Application using PySpark 

<p style="text-align: center;"> 210962204
Kumud 
</p> 

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover

spark = SparkSession.builder \
    .config("spark.executor.memory", "3g") \
    .config("spark.driver.cores", "4") \
    .getOrCreate()
!ls
df = (
    spark.read.csv('donation/block*.csv', header=True, inferSchema=True)
)

 donation   'WEEK 1.ipynb'  'Week 2.ipynb'
 week1.csv   week2data.csv  'Week 3.ipynb'


In [5]:
from pyspark.sql.functions import max, avg, min, count, when
df = df.replace({'?': None})
df = df.drop(df.id_1).drop(df.id_2)
df.show()

+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|0.833333333333333|        null|         1.0|        null|      1|     1|     1|     1|      0|    true|
|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|                1|        null|         1.0|           1|      1|     1|     1|     1|      1|    true|
|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|                1|        null|         1.0|        nu

In [6]:
#Develop a PySpark script to clean and preprocess data before performing entity resolution.Include steps like tokenization and normalization.
#Cleaning data: drop null values, retain columns with high amount of data, normalize integer values
df = df.drop('id_1')
df = df.drop('id_2')
summary = df.describe()
#summary of columns 
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()


+-------+------------------+-------------------+
|summary|      cmp_fname_c1|       cmp_fname_c2|
+-------+------------------+-------------------+
|  count|           5748125|             103698|
|   mean|  0.71290247044295| 0.9000176718903214|
| stddev|0.3887583596162793|0.27131761057823345|
|    min|                 0|                  0|
|    max|                 1|                  1|
+-------+------------------+-------------------+



In [9]:
#match and miss dataframes based on is_match column 
matches = df.where("is_match = true")
matches_summary = matches.describe()

misses = df.where("is_match = false")
misses_summary = misses.describe()

matches_summary.show()
misses_summary.show()

+-------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+
|summary|       cmp_fname_c1|       cmp_fname_c2|       cmp_lname_c1|       cmp_lname_c2|            cmp_sex|              cmp_bd|              cmp_bm|             cmp_by|            cmp_plz|
+-------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+
|  count|              20922|               1333|              20931|                475|              20931|               20925|               20925|              20925|              20902|
|   mean| 0.9973163859635041| 0.9898900320318176| 0.9970152595958817| 0.9693701678438521|  0.987291577086618|  0.9970848267622461|  0.9979450418160095| 0.9961290322580645| 0.9584250310975027|
| stddev|0.03650667584833679|0.082519737

In [10]:
#obtain modified summaries
from pyspark.sql.types import DoubleType

def modify_sum(summary):
    #change names using pandas functions (transpose to make it visible)
    summary_p = summary.toPandas()
    summary_p = summary_p.set_index('summary').transpose().reset_index()
    summary_p = summary_p.rename(columns = {'index': 'field'})
    summary_p = summary_p.rename_axis(None, axis = 1)
    print(summary_p)
    #all columns are made into double type
    summary_ss = ss.createDataFrame(summary_p)
    for c in summary_ss.columns:
        if c == 'field': 
            continue
        summary_ss = summary_ss.withColumn(c, summary_ss[c].cast(DoubleType()))
    summary_ss.printSchema()
    return summary_ss

In [11]:
match_summary_ss = modify_sum(matches_summary)
miss_summary_ss = modify_sum(misses_summary)

          field  count                mean                stddev  min  max
0  cmp_fname_c1  20922  0.9973163859635041   0.03650667584833679    0    1
1  cmp_fname_c2   1333  0.9898900320318176   0.08251973727615237    0    1
2  cmp_lname_c1  20931  0.9970152595958817   0.04311880753394513  0.0  1.0
3  cmp_lname_c2    475  0.9693701678438521   0.15345280740388917    0    1
4       cmp_sex  20931   0.987291577086618   0.11201570591216435    0    1
5        cmp_bd  20925  0.9970848267622461  0.053914876598079815    0    1
6        cmp_bm  20925  0.9979450418160095  0.045286127452170664    0    1
7        cmp_by  20925  0.9961290322580645   0.06209804856731054    0    1
8       cmp_plz  20902  0.9584250310975027   0.19962063345931916    0    1
root
 |-- field: string (nullable = true)
 |-- count: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- min: double (nullable = true)
 |-- max: double (nullable = true)

          field    count  

In [12]:
#obtain top 5 columns with max count in both match and misses? 
match_summary_ss.sort("count", ascending = False).show()
miss_summary_ss.sort("count", ascending = False).show()

#based on the counts, choose top columns for similarity evaluation 
top_cols = ["cmp_lname_c1", "cmp_sex", "cmp_by", "cmp_bd", "cmp_bm"]

+------------+-------+------------------+--------------------+---+---+
|       field|  count|              mean|              stddev|min|max|
+------------+-------+------------------+--------------------+---+---+
|     cmp_sex|20931.0| 0.987291577086618| 0.11201570591216435|0.0|1.0|
|cmp_lname_c1|20931.0|0.9970152595958817| 0.04311880753394513|0.0|1.0|
|      cmp_bd|20925.0|0.9970848267622461|0.053914876598079815|0.0|1.0|
|      cmp_by|20925.0|0.9961290322580645| 0.06209804856731054|0.0|1.0|
|      cmp_bm|20925.0|0.9979450418160095|0.045286127452170664|0.0|1.0|
|cmp_fname_c1|20922.0|0.9973163859635041| 0.03650667584833679|0.0|1.0|
|     cmp_plz|20902.0|0.9584250310975027| 0.19962063345931916|0.0|1.0|
|cmp_fname_c2| 1333.0|0.9898900320318176| 0.08251973727615237|0.0|1.0|
|cmp_lname_c2|  475.0|0.9693701678438521| 0.15345280740388917|0.0|1.0|
+------------+-------+------------------+--------------------+---+---+

+------------+---------+--------------------+--------------------+---+---+
|

In [13]:
#Implement a PySpark program that computes similarity scores between records using a chosen similarity metric.
from pyspark.sql.functions import expr

sum_exp = "+". join(top_cols)
df = df.fillna(0, subset = top_cols)
df_scored = df.withColumn('score', expr(sum_exp)).select('score', 'is_match')
df_scored = df_scored.fillna(0)
df_scored.show()
#df_check = df_scored.groupBy('score').count()
#df_check.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows



In [14]:
def crosstabs(df_cop, num1): 
    return df_cop.selectExpr(f"score >= {num1} as above", "is_match").groupBy("above").pivot("is_match", ("true", "false")).count()


In [15]:
a = crosstabs(df_scored, 4.0)
a.show()

row_list = a.collect() 
precision = (row_list[0].__getitem__('true'))/(row_list[0].__getitem__('true') + row_list[0].__getitem__('false'))
recall = (row_list[0].__getitem__('false'))/(row_list[0].__getitem__('true') + row_list[1].__getitem__('false'))
print("Precision: ", precision)
print("False positive: ", recall)
print("F1 score", (precision * recall)/(precision + recall))



+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20892| 160712|
|false|   39|5567489|
+-----+-----+-------+

Precision:  0.11504151890927512
False positive:  0.028758239640425374
F1 score 0.023006934105859467
