In [1]:
import os
from uuid import uuid4

import altair as alt
import findspark
import phonenumbers
from splink import Splink
from splink.profile import column_value_frequencies_chart

alt.renderers.enable('mimetype')

# Keep findspark.init() ahead of the pyspark imports below.
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, coalesce, lit, udf, when, format_number
from pyspark.sql.types import StringType, DoubleType, StructType, StructField

In [2]:
# Obtain the SparkSession used by every later cell (an existing session is reused if there is one).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# Define functions

In [3]:
def translate_state():
    """
    Build a Spark UDF that expands U.S. state/territory codes into lowercase names.

    The lookup ignores letter case and surrounding whitespace, so "ca", "CA"
    and " Ca " all become "california". Any value that is not a known code
    (for example a full state name) is returned unchanged, and None stays None.

    Returns:
        A udf with a StringType return type that takes a single string column.
    """
    # https://gist.github.com/rogerallen/1583593
    us_state_to_abbrev = {
        "Alabama": "AL",
        "Alaska": "AK",
        "Arizona": "AZ",
        "Arkansas": "AR",
        "California": "CA",
        "Colorado": "CO",
        "Connecticut": "CT",
        "Delaware": "DE",
        "Florida": "FL",
        "Georgia": "GA",
        "Hawaii": "HI",
        "Idaho": "ID",
        "Illinois": "IL",
        "Indiana": "IN",
        "Iowa": "IA",
        "Kansas": "KS",
        "Kentucky": "KY",
        "Louisiana": "LA",
        "Maine": "ME",
        "Maryland": "MD",
        "Massachusetts": "MA",
        "Michigan": "MI",
        "Minnesota": "MN",
        "Mississippi": "MS",
        "Missouri": "MO",
        "Montana": "MT",
        "Nebraska": "NE",
        "Nevada": "NV",
        "New Hampshire": "NH",
        "New Jersey": "NJ",
        "New Mexico": "NM",
        "New York": "NY",
        "North Carolina": "NC",
        "North Dakota": "ND",
        "Ohio": "OH",
        "Oklahoma": "OK",
        "Oregon": "OR",
        "Pennsylvania": "PA",
        "Rhode Island": "RI",
        "South Carolina": "SC",
        "South Dakota": "SD",
        "Tennessee": "TN",
        "Texas": "TX",
        "Utah": "UT",
        "Vermont": "VT",
        "Virginia": "VA",
        "Washington": "WA",
        "West Virginia": "WV",
        "Wisconsin": "WI",
        "Wyoming": "WY",
        "District of Columbia": "DC",
        "American Samoa": "AS",
        "Guam": "GU",
        "Northern Mariana Islands": "MP",
        "Puerto Rico": "PR",
        "United States Minor Outlying Islands": "UM",
        "U.S. Virgin Islands": "VI",
    }

    # Invert the mapping and lowercase both sides: code -> name.
    us_abbrev_to_state = {
        abbrev.lower(): state.lower()
        for state, abbrev in us_state_to_abbrev.items()
    }

    def _translate(value):
        # Null states stay null.
        if value is None:
            return None
        # Normalise only the lookup key; unknown values are returned exactly as received.
        return us_abbrev_to_state.get(value.strip().lower(), value)

    return udf(_translate, StringType())

In [4]:
def translate_phone(region='US'):
    """
    Build a Spark UDF that reduces a phone number string to its national number.

    Args:
        region: Region code passed to phonenumbers.parse as the default region.

    Returns:
        A udf with a StringType return type. It yields the national number as
        a string, or None when the value cannot be parsed.
    """
    def _translate(value):
        try:
            parsed = phonenumbers.parse(value, region)
        except Exception:
            # Best effort: anything that fails to parse becomes null instead of failing the job.
            return None
        national = parsed.national_number
        if national is None:
            return None
        # Convert explicitly so the returned value matches the declared StringType.
        return str(national)

    return udf(_translate, StringType())

In [5]:
# UDF producing a fresh random UUID string per row.
# Spark treats UDFs as deterministic by default and may then drop or repeat
# invocations; a random id generator has to be flagged as non-deterministic.
generate_uuid = udf(lambda: str(uuid4()), StringType()).asNondeterministic()

# Load the raw data

In [6]:
# Read the "candidates" table over JDBC.
# Connection settings come from the environment so that no credentials are kept
# in the notebook: CRM_DB_PASSWORD must be set (a KeyError is raised otherwise);
# CRM_JDBC_URL and CRM_DB_USER fall back to the local development values.
df = (
    spark.read.format("jdbc")
    .option("url", os.environ.get("CRM_JDBC_URL", "jdbc:postgresql://localhost:5432/crm"))
    .option("user", os.environ.get("CRM_DB_USER", "postgres"))
    .option("password", os.environ["CRM_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "candidates")
    .load()
)
df.show()

+--------------------+------------+-----------+-----------+----+--------------+--------------------+-------+-------------+--------------+-----+-------------------+------------+
|                  id|  first_name|middle_name|  last_name| dob|         phone|               email|address|         city|         state|  zip|       time_created|time_deleted|
+--------------------+------------+-----------+-----------+----+--------------+--------------------+-------+-------------+--------------+-----+-------------------+------------+
|00f46a02-1e18-403...|    richelle|         i.|     reczko|null|   213-9258287|richelle_reczko@y...|   null|  los angeles|            ca|90004|2019-12-09 19:50:24|        null|
|0067c2ca-1abd-4e0...|   stephanie|     garcia|  gutierrez|null|          null|stephgarcia5598@g...|   null|         null|          null|95670|2019-12-18 06:44:44|        null|
|007c5479-49f0-4a3...|     eduardo|       null|      plaza|null|  831-225-3298|eduardo_plaza94@y...|   null|       

# Clean the data

In [7]:
# Expand state codes, reduce phone numbers to their national number, rename
# "id" to "unique_id", and keep only rows that have a first name and have not
# been deleted.
df = (
    df
    .withColumn("state", translate_state()("state"))
    .withColumn("phone", translate_phone()("phone"))
    .withColumnRenamed("id", "unique_id")
    .where(col("first_name").isNotNull() & col("time_deleted").isNull())
)
df.show()

+--------------------+------------+-----------+-----------+----+----------+--------------------+-------+-------------+--------------+-----+-------------------+------------+
|           unique_id|  first_name|middle_name|  last_name| dob|     phone|               email|address|         city|         state|  zip|       time_created|time_deleted|
+--------------------+------------+-----------+-----------+----+----------+--------------------+-------+-------------+--------------+-----+-------------------+------------+
|00f46a02-1e18-403...|    richelle|         i.|     reczko|null|2139258287|richelle_reczko@y...|   null|  los angeles|    california|90004|2019-12-09 19:50:24|        null|
|0067c2ca-1abd-4e0...|   stephanie|     garcia|  gutierrez|null|      null|stephgarcia5598@g...|   null|         null|          null|95670|2019-12-18 06:44:44|        null|
|007c5479-49f0-4a3...|     eduardo|       null|      plaza|null|8312253298|eduardo_plaza94@y...|   null|         null|          null| n

# Define comparison rules

In [8]:
# Splink settings: de-duplicate a single table, pair up only records that share
# a state, and compare every listed column with num_levels=2.
settings = dict(
    link_type="dedupe_only",
    blocking_rules=["l.state = r.state"],
    comparison_columns=[
        dict(col_name=column, num_levels=2)
        for column in (
            "first_name",
            "middle_name",
            "last_name",
            "dob",
            # "phone" is not compared at present; list it here to enable it.
            "city",
            "email",
            "address",
        )
    ],
    # Carried through to the scored output alongside the compared columns.
    additional_columns_to_retain=["time_created"],
)

# Run pair-wise comparisons

In [9]:
# Build the linker on at most 15,000 cleaned rows, then score the candidate pairs.
linker = Splink(
    settings=settings,
    df_or_dfs=df.limit(15000),
    spark=spark,
)
df_e = linker.get_scored_comparisons()


Custom string comparison functions such as jaro_winkler_sim are not available in Spark
Or you did not pass 'spark' (the SparkSession) into 'Model' 
You can import these functions using the scala-udf-similarity-0.0.9.jar provided with Splink.
You will need to add it by correctly configuring your spark config
For example in Spark 2.4.5

from pyspark.sql import SparkSession, types
from pyspark.context import SparkConf, SparkContext
conf.set('spark.driver.extraClassPath', '/Users/fasih/opt/anaconda3/lib/python3.8/site-packages/splink/jars/scala-udf-similarity-0.0.9.jar') # Not needed in spark 3
conf.set('spark.jars', '/Users/fasih/opt/anaconda3/lib/python3.8/site-packages/splink/jars/scala-udf-similarity-0.0.9.jar')
spark.udf.registerJavaFunction('jaro_winkler_sim','uk.gov.moj.dash.linkage.JaroWinklerSimilarity',types.DoubleType())
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

Alternatively, for Jaro Winkler, you can register a less efficient Python implementation usi

In [10]:
# Print the column names and types of the scored pairwise comparisons.
df_e.printSchema()

root
 |-- match_weight: double (nullable = true)
 |-- match_probability: double (nullable = true)
 |-- unique_id_l: string (nullable = true)
 |-- unique_id_r: string (nullable = true)
 |-- first_name_l: string (nullable = true)
 |-- first_name_r: string (nullable = true)
 |-- gamma_first_name: integer (nullable = false)
 |-- middle_name_l: string (nullable = true)
 |-- middle_name_r: string (nullable = true)
 |-- gamma_middle_name: integer (nullable = false)
 |-- last_name_l: string (nullable = true)
 |-- last_name_r: string (nullable = true)
 |-- gamma_last_name: integer (nullable = false)
 |-- dob_l: date (nullable = true)
 |-- dob_r: date (nullable = true)
 |-- gamma_dob: integer (nullable = false)
 |-- city_l: string (nullable = true)
 |-- city_r: string (nullable = true)
 |-- gamma_city: integer (nullable = false)
 |-- email_l: string (nullable = true)
 |-- email_r: string (nullable = true)
 |-- gamma_email: integer (nullable = false)
 |-- address_l: string (nullable = true)
 |-- 

# View results

In [11]:
# Show up to 150 pairs with a match probability of at least 0.5, best matches
# first: both candidates' names, cities and e-mails, plus the probability
# formatted to three decimals.
df_e.where(col("match_probability") >= 0.5).sort(
    col("match_probability").desc()
).select(
    # "<first> <last>" for each side; a missing last name becomes an empty string.
    *[
        concat(
            col(f"first_name_{side}"),
            lit(" "),
            coalesce(col(f"last_name_{side}"), lit("")),
        ).alias(f"candidate_name_{side}")
        for side in ("l", "r")
    ],
    # city_l, city_r, email_l, email_r (in that order).
    *[col(f"{field}_{side}") for field in ("city", "email") for side in ("l", "r")],
    format_number(col("match_probability"), 3).alias("p_match"),
).show(150, truncate=False)

+-------------------------+------------------------+----------------+----------------+----------------------------------+-----------------------------------+-------+
|candidate_name_l         |candidate_name_r        |city_l          |city_r          |email_l                           |email_r                            |p_match|
+-------------------------+------------------------+----------------+----------------+----------------------------------+-----------------------------------+-------+
|maria arreola sanchez    |samatha nunez arreola   |tipton          |tipton          |mariarreolaaz@gmail.com           |tikaanzgg@gmail.com                |1.000  |
|jakyra johnson           |jakyra johnson          |warner robins   |warner robins   |kayscojo@gmail.com                |kayscojo@gmail.com                 |1.000  |
|prem ananda              |prem ananda             |san diego       |san diego       |premonition1982@gmail.com         |premonition1982@gmail.com          |1.000  |
|chr

# Write the results to the database

In [12]:
# Persist every pair with a match probability of at least 0.60 to "match_results".
#
# Connection settings come from the environment so that no credentials are kept
# in the notebook: CRM_DB_PASSWORD must be set (a KeyError is raised otherwise);
# CRM_JDBC_URL and CRM_DB_USER fall back to the local development values.
(
    df_e.filter(
        col("match_probability") >= 0.60
    ).select(
        col("unique_id_l").alias("candidate_id_l"),
        col("unique_id_r").alias("candidate_id_r"),
        # Only for pairs scoring >= 0.99 (null otherwise): the left id when its
        # time_created is earlier than or equal to the right one's, else the right id.
        when(
            col("match_probability") >= 0.99,
            when(col("time_created_l") <= col("time_created_r"), col("unique_id_l")).otherwise(col("unique_id_r")),
        ).alias("duplicate_id"),
        # Three-decimal probability, cast back from the formatted string to a double.
        format_number(col("match_probability"), 3).cast(DoubleType()).alias("match_probability"),
        # "duplicate" for pairs scoring >= 0.99, null for the rest.
        when(col("match_probability") >= 0.99, "duplicate").alias("action"),
    )
    .withColumn("id", generate_uuid())
    .write.format("jdbc")
    .option("url", os.environ.get("CRM_JDBC_URL", "jdbc:postgresql://localhost:5432/crm"))
    .option("user", os.environ.get("CRM_DB_USER", "postgres"))
    .option("password", os.environ["CRM_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "match_results")
    # Replaces whatever is currently stored in match_results.
    .mode("overwrite")
    .save()
)