In [1]:
import findspark
findspark.init("C:/spark") 

from pyspark import SparkContext
sc=SparkContext("local","Pyspark new")

In [2]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.getOrCreate()

In [3]:
from pyspark.sql.functions import *

## Q1

In [4]:
df= spark.read.option("header",True).option("InferSchema",True).csv("customer(1).csv")

In [5]:
df.show(5)

+---+------------+-----------+
| id|        name|       city|
+---+------------+-----------+
|  1|   John Doe |   New York|
|  2|     Jon Doe|   New York|
|  3| Johnny Doe |Los Angeles|
|  4| Jane Smith |    Chicago|
|  5| Janet Smith|    Chicago|
+---+------------+-----------+
only showing top 5 rows



In [6]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



In [7]:
#df.rdd.getNumPartitions()

#### Normalising , cleaning

In [8]:
df_cl1= df.withColumn("name_cleaned", lower(trim(regexp_replace(col("name"),r"[^A-Za-z0-9\s]",""))))

In [9]:
df_cl1=df_cl1.withColumn("city_cleaned", lower(trim(regexp_replace(col("city"),r"[^A-Za-z0-9\s]",""))))

In [10]:
df_cl1=df_cl1.withColumn("name_cleaned", trim(regexp_replace(col("name_cleaned"),r"\s+"," ")))

In [11]:
df_cl1= df_cl1.withColumn("block_key",
                             concat_ws("", 
#                                 col("city_cleaned").substr(1,4),
#                                 col("name_cleaned").substr(1,4)
                              trim(regexp_replace(col("city_cleaned"),r"\s+","")).substr(1,4),
                              trim(regexp_replace(col("name_cleaned"),r"\s+","")).substr(1,4)
                              #trim(regexp_replace(col("city_cleaned"),r"\s+","")).substr(1,4))
                            )
                         )

In [12]:
df_cl1.show(5)

+---+------------+-----------+------------+------------+---------+
| id|        name|       city|name_cleaned|city_cleaned|block_key|
+---+------------+-----------+------------+------------+---------+
|  1|   John Doe |   New York|    john doe|    new york| newyjohn|
|  2|     Jon Doe|   New York|     jon doe|    new york| newyjond|
|  3| Johnny Doe |Los Angeles|  johnny doe| los angeles| losajohn|
|  4| Jane Smith |    Chicago|  jane smith|     chicago| chicjane|
|  5| Janet Smith|    Chicago| janet smith|     chicago| chicjane|
+---+------------+-----------+------------+------------+---------+
only showing top 5 rows



In [13]:

# Tokenize normalized names into arrays of words
df_tok = df_cl1.withColumn("name_tokens", split(col("name_cleaned"), " ")).withColumn("city_tokens", split(col("city_cleaned"), " "))

df_tok.show(5)

+---+------------+-----------+------------+------------+---------+--------------+--------------+
| id|        name|       city|name_cleaned|city_cleaned|block_key|   name_tokens|   city_tokens|
+---+------------+-----------+------------+------------+---------+--------------+--------------+
|  1|   John Doe |   New York|    john doe|    new york| newyjohn|   [john, doe]|   [new, york]|
|  2|     Jon Doe|   New York|     jon doe|    new york| newyjond|    [jon, doe]|   [new, york]|
|  3| Johnny Doe |Los Angeles|  johnny doe| los angeles| losajohn| [johnny, doe]|[los, angeles]|
|  4| Jane Smith |    Chicago|  jane smith|     chicago| chicjane| [jane, smith]|     [chicago]|
|  5| Janet Smith|    Chicago| janet smith|     chicago| chicjane|[janet, smith]|     [chicago]|
+---+------------+-----------+------------+------------+---------+--------------+--------------+
only showing top 5 rows



In [14]:
a= df_tok.alias("a")
b= df_tok.alias("b")

In [15]:
# a.show(5)
# b.show(5)

In [16]:
a.count()

50

In [17]:
candidatePairs= a.join(b,
                      col("a.block_key")==col("b.block_key")).filter(col("a.id")<col("b.id")).withColumn("name_levenshtein", levenshtein(col("a.name_cleaned"), col("b.name_cleaned")))

In [18]:
candidatePairs.count()

80

In [19]:
candidatePairs = candidatePairs.select(
    col("a.id").alias("id_a"),
    col("a.name_cleaned").alias("name_a"),
    col("b.id").alias("id_b"),
    col("b.name_cleaned").alias("name_b"),
    col("a.block_key"),
    col("name_levenshtein"),
    col("a.city_cleaned"),
    col("a.name_tokens").alias("name_tokens_a"),
    col("b.name_tokens").alias("name_tokens_b"),
    col("b.city_tokens").alias("city_tokens_b")
    )

In [20]:
candidatePairs.count()

80

In [21]:
candidatePairs.show(5)

+----+--------+----+-----------+---------+----------------+------------+-------------+---------------+-------------+
|id_a|  name_a|id_b|     name_b|block_key|name_levenshtein|city_cleaned|name_tokens_a|  name_tokens_b|city_tokens_b|
+----+--------+----+-----------+---------+----------------+------------+-------------+---------------+-------------+
|   1|john doe|  41|   john doe| newyjohn|               0|    new york|  [john, doe]|    [john, doe]|  [new, york]|
|   1|john doe|  40|  john a de| newyjohn|               3|    new york|  [john, doe]|  [john, a, de]|  [new, york]|
|   1|john doe|  37|     john d| newyjohn|               2|    new york|  [john, doe]|      [john, d]|  [new, york]|
|   1|john doe|  36|   john doe| newyjohn|               0|    new york|  [john, doe]|    [john, doe]|  [new, york]|
|   1|john doe|  19|john doe jr| newyjohn|               3|    new york|  [john, doe]|[john, doe, jr]|  [new, york]|
+----+--------+----+-----------+---------+----------------+-----

In [22]:
candidatePairs= candidatePairs.withColumn("similarity Name", 
                                         1-( col("name_levenshtein")/greatest(length(col("name_a")),length(col("name_b"))))
                                         )

In [23]:
candidatePairs.show(5)

+----+--------+----+-----------+---------+----------------+------------+-------------+---------------+-------------+------------------+
|id_a|  name_a|id_b|     name_b|block_key|name_levenshtein|city_cleaned|name_tokens_a|  name_tokens_b|city_tokens_b|   similarity Name|
+----+--------+----+-----------+---------+----------------+------------+-------------+---------------+-------------+------------------+
|   1|john doe|  41|   john doe| newyjohn|               0|    new york|  [john, doe]|    [john, doe]|  [new, york]|               1.0|
|   1|john doe|  40|  john a de| newyjohn|               3|    new york|  [john, doe]|  [john, a, de]|  [new, york]|0.6666666666666667|
|   1|john doe|  37|     john d| newyjohn|               2|    new york|  [john, doe]|      [john, d]|  [new, york]|              0.75|
|   1|john doe|  36|   john doe| newyjohn|               0|    new york|  [john, doe]|    [john, doe]|  [new, york]|               1.0|
|   1|john doe|  19|john doe jr| newyjohn|      

In [24]:
candidatePairs.count()

80

In [25]:
candidatePairs.show()

+----+-----------+----+------------+---------+----------------+------------+--------------+---------------+--------------+------------------+
|id_a|     name_a|id_b|      name_b|block_key|name_levenshtein|city_cleaned| name_tokens_a|  name_tokens_b| city_tokens_b|   similarity Name|
+----+-----------+----+------------+---------+----------------+------------+--------------+---------------+--------------+------------------+
|   1|   john doe|  41|    john doe| newyjohn|               0|    new york|   [john, doe]|    [john, doe]|   [new, york]|               1.0|
|   1|   john doe|  40|   john a de| newyjohn|               3|    new york|   [john, doe]|  [john, a, de]|   [new, york]|0.6666666666666667|
|   1|   john doe|  37|      john d| newyjohn|               2|    new york|   [john, doe]|      [john, d]|   [new, york]|              0.75|
|   1|   john doe|  36|    john doe| newyjohn|               0|    new york|   [john, doe]|    [john, doe]|   [new, york]|               1.0|
|   1|

In [26]:
thresh=0.7
candidatePairs= candidatePairs.withColumn("isMatch", when(col("similarity Name")>=thresh, 1).otherwise(0))

In [27]:
candidatePairs.select("name_a","name_b","name_levenshtein","similarity Name", "isMatch").show(5)

+--------+-----------+----------------+------------------+-------+
|  name_a|     name_b|name_levenshtein|   similarity Name|isMatch|
+--------+-----------+----------------+------------------+-------+
|john doe|   john doe|               0|               1.0|      1|
|john doe|  john a de|               3|0.6666666666666667|      0|
|john doe|     john d|               2|              0.75|      1|
|john doe|   john doe|               0|               1.0|      1|
|john doe|john doe jr|               3|0.7272727272727273|      1|
+--------+-----------+----------------+------------------+-------+
only showing top 5 rows



In [28]:
nonmatch= candidatePairs.filter(col("isMatch")==0)
nonmatch.count()

28

In [29]:
candidatePairs.count()-nonmatch.count()

52

In [31]:
linked= candidatePairs.filter(col("isMatch")==1).select("id_a","id_b").collect()
#linked



In [32]:
parent = {}
    
def find(x):
    if parent.get(x, x) != x:
        parent[x] = find(parent[x])
    return parent.get(x, x)

def union(x, y):
    px, py = find(x), find(y)
    if px != py:
        parent[py] = px

for row in linked:
    union(row["id_a"], row["id_b"])

clusters = {}
for node in parent:
    root = find(node)
    clusters.setdefault(root, []).append(node)

print(clusters)

{1: [41, 37, 36, 19, 18, 9, 6, 40], 2: [32, 8], 3: [33, 15, 10], 4: [35, 13, 11, 5], 17: [39], 20: [50, 42, 28], 21: [43], 22: [44], 24: [46], 25: [47], 26: [48], 27: [49]}
