Develop a PySpark script to clean and preprocess data before performing entity resolution. 
Include steps like tokenization and normalization. 

In [112]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lab3').getOrCreate()
sc = spark.sparkContext
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer

In [113]:
df1 = spark.read.csv('lab3_1_dataset.csv', header=True, inferSchema=True)
df2 = spark.read.csv('lab3_2_dataset.csv', header=True, inferSchema=True)

In [114]:
df1.show(10)
df2.show(10)

+----+------------------+--------------------+
|  id|              name|             address|
+----+------------------+--------------------+
|1859|        Nkev Koquo|1087 Uxkgk St, Zw...|
|1402|     Awdeltx Flswj|1313 Bysqtepcyb S...|
|3503|     Zvgk Qmnjjfub|4451 Uymmthnzp St...|
|1009|    Suaxud Ybukuxq|4793 Fodikswks St...|
|2737|              NULL|                NULL|
| 276|   Bdiolpnl Iduwpc|3640 Hrfulh St, C...|
|4600|      Mrada Gsxapz|7676 Sngqs St, Ut...|
| 580|      Hiwz Zvsxqmr|1884 Guaxszlv St,...|
|1901|Hogjboj Qnsbertdar|                NULL|
|4861|      Wpk Gfduhflq|7724 Etqegoijrf S...|
+----+------------------+--------------------+
only showing top 10 rows
+----+---------------+--------------------+
|  id|           name|             address|
+----+---------------+--------------------+
|3935|Yurap Kyxusxecr| 7993 Xbai St, Nzfyl|
|2135|           NULL|1410 Awcvaeroiq S...|
|2814|     Uood Btuwf|3255 Qfofnxsik St...|
|2090|  Pdullf Wioqxn|832 Tpjuaikwr St,...|
|3557|   

In [115]:
df1 = df1.withColumn('name',regexp_replace(lower(col('name')), r'[\.\'\"#\@]', ''))
df2 = df2.withColumn('name',regexp_replace(lower(col('name')), r'[\.\'\"#\@]', ''))

In [116]:
df1= df1.dropna()
df2 = df2.dropna()

In [117]:
tokenizer = Tokenizer(outputCol='name_tokens')
tokenizer.setInputCol('name')
df1 = tokenizer.transform(df1)
df2 = tokenizer.transform(df2)

In [118]:
df1.schema

StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('address', StringType(), True), StructField('name_tokens', ArrayType(StringType(), True), True)])

In [119]:
df1.count(), df2.count()

(2796, 2782)

In [120]:
df1 = df1.withColumnRenamed('name', 'name_df1')\
         .withColumnRenamed('address', 'address_df1')\
         .withColumnRenamed('id', 'id_df1')

df2 = df2.withColumnRenamed('name', 'name_df2')\
         .withColumnRenamed('address', 'address_df2')\
         .withColumnRenamed('id' , 'id_df2')

In [121]:
merged = df1.join(df2, on='name_tokens', how='outer')
merged = merged.withColumn('address_merged', coalesce('address_df1', 'address_df2'))
merged = merged.withColumn('id_merged', coalesce('id_df1', 'id_df2'))
merged = merged.drop('name_df1', 'name_df2')
merged = merged.drop('address_df1', 'address_df2')
merged = merged.drop('id_df1', 'id_df2')

In [122]:
add_tokenizer = Tokenizer(inputCol='address_merged', outputCol='address_tokens')
merged = add_tokenizer.transform(merged).drop('address_merged')
merged.show()

+--------------------+---------+--------------------+
|         name_tokens|id_merged|      address_tokens|
+--------------------+---------+--------------------+
|     [aad, ndxqfspc]|     3818|[4396, esapexm, s...|
|       [aair, tnndz]|      941|[3281, dcai, st,,...|
|   [aaj, paphxmfosv]|     3515|[8939, jwzhevs, s...|
| [aajybwt, shqzlimt]|     1860|[4500, xkcygyshzf...|
|    [aar, erjcdifjp]|     2687|[3931, uthytgt, s...|
|   [aaugc, swbgkrdl]|     3599|[7657, xpxnwrhjg,...|
|[aauxxxbv, jalxfc...|     4903|[4008, ozxg, st,,...|
|  [aawv, tdjqumpezi]|     3184|[7643, mhpitlmfdg...|
|[aazngrbn, ilfnaacl]|     3657|[374, hockagql, s...|
|     [abe, kcrzwfho]|     1878|[357, uqjqmhzpu, ...|
|   [abkm, rdsaohawk]|     4478|[4691, ypigqc, st...|
|    [abl, wlktxuwlu]|     3804|[548, nthczioq, s...|
|[abohjww, cmbyeezrm]|     1705|[7521, wqljpcr, s...|
|  [abwuhj, pwludakn]|      652|[7678, gpdz, st,,...|
| [abwwo, rooyarqofu]|     4935|[1111, aaxcqjf, s...|
|[abzklrfz, lyqpwr...|     2

In [123]:
merged_pairs = merged.alias('a').crossJoin(merged.alias('b'))

In [124]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def jaccard_similarity(list1, list2):
    set1 = set(list1)
    set2 = set(list2)
    intersection = set1.intersection(set2)
    union = set1.union(set2)
    if not union:
        return 0.0
    return float(len(intersection)) / len(union)

jaccard_udf = udf(jaccard_similarity, DoubleType())


In [125]:
merged_pairs_with_jaccard = merged_pairs.withColumn("jaccard_score", jaccard_udf("a.address_tokens", "b.address_tokens"))

In [126]:
merged_pairs_with_jaccard = merged_pairs_with_jaccard.filter((col('jaccard_score') >= 0.5)).sort('jaccard_score')

In [127]:
ground_truth = merged_pairs.filter('a.id_merged == b.id_merged')

In [128]:
tp = merged_pairs_with_jaccard.drop('jaccard_score').intersect(ground_truth)
tp_count = tp.count()


25/08/05 10:03:23 WARN ExtractPythonUDFFromJoinCondition: The join condition:(jaccard_similarity(address_tokens#1665, address_tokens#1704)#1716 >= 0.5) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
                                                                                

In [129]:
fp = merged_pairs_with_jaccard.drop('jaccard_score').subtract(ground_truth)
fp_count = fp.count()

25/08/05 10:04:04 WARN ExtractPythonUDFFromJoinCondition: The join condition:(jaccard_similarity(address_tokens#1665, address_tokens#1704)#1716 >= 0.5) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
                                                                                

In [130]:
fn = ground_truth.subtract(merged_pairs_with_jaccard.drop('jaccard_score'))
fn_count = fn.count()

25/08/05 10:04:44 WARN ExtractPythonUDFFromJoinCondition: The join condition:(jaccard_similarity(address_tokens#1895, address_tokens#1915)#1716 >= 0.5) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
                                                                                

In [132]:
precision = tp_count / (tp_count + fp_count)
recall = tp_count / (tp_count + fn_count)
f1 = 2 * (precision * recall) / (precision + recall)
precision, recall, f1

(0.9865422183633601, 1.0, 0.9932255244755245)