# Linking in Spark - Demo in Microsoft Fabric

This notebook contains the [Splink demo for Spark](https://github.com/moj-analytical-services/splink/tree/master/docs/demos/examples/spark/deduplicate_1k_synthetic.ipynb), adapted to run in Microsoft Fabric.

Changes from the GitHub version:
- Spark options are set in a [Fabric Environment](https://learn.microsoft.com/en-us/fabric/data-engineering/create-and-use-environment) rather than in the notebook, so configuration that isn't needed here is commented out.
- The path to the similarity-function UDF jar file is also set in the Fabric Environment, under Spark Properties.
- The file paths for the checkpoint directory and the saved model settings must be adjusted to point to valid locations in your lakehouse.

For this demo to work, you need to:
- Add Splink as a Public Library from PyPI in the Fabric Environment. The code below uses the Splink 3 API (`splink.spark.linker.SparkLinker`), which was removed in Splink 4, so pin a 3.x release.
- Upload the [similarity UDF](https://github.com/moj-analytical-services/splink/blob/master/splink/files/spark_jars/scala-udf-similarity-0.1.1_spark3.x.jar) jar into a lakehouse.
- Add a `spark.jars` Spark property that points to the jar file using an ABFS path (e.g. abfss://00000000-0000-0000-0000-000000000000@onelake.dfs.fabric.microsoft.com/00000000-0000-0000-0000-000000000000/Files/scala-udf-similarity-0.1.1_spark3.x.jar).
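The example ABFS path has the shape `abfss://<workspace ID>@onelake.dfs.fabric.microsoft.com/<lakehouse ID>/Files/<file>`. A minimal sketch for building the `spark.jars` value, assuming the first GUID is the workspace ID and the second is the lakehouse ID (OneLake's GUID-based URI layout). The helper `onelake_files_path` is hypothetical, and the all-zero IDs are placeholders you would replace with your own.

```python
# Hypothetical helper: builds a OneLake ABFS URI for a file under a lakehouse's Files area.
# Assumes the layout abfss://<workspace ID>@onelake.dfs.fabric.microsoft.com/<lakehouse ID>/Files/...

def onelake_files_path(workspace_id: str, lakehouse_id: str, relative_path: str) -> str:
    """Return an abfss:// URI in the same shape as the example spark.jars value."""
    relative_path = relative_path.lstrip("/")  # avoid a double slash after Files/
    return (
        f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse_id}/Files/{relative_path}"
    )


JAR_NAME = "scala-udf-similarity-0.1.1_spark3.x.jar"
placeholder_id = "00000000-0000-0000-0000-000000000000"  # placeholder, not a real ID

jar_path = onelake_files_path(placeholder_id, placeholder_id, JAR_NAME)
print(jar_path)  # paste the real-ID version into the Environment's spark.jars property
```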

In [None]:
# from splink.spark.jar_location import similarity_jar_location

from pyspark import SparkConf
from pyspark.sql import SparkSession
from notebookutils import mssparkutils

conf = SparkConf()
# These memory and parallelism settings are only suitable for a small toy example
# conf.set("spark.driver.memory", "12g")
# conf.set("spark.default.parallelism", "16")

# Add custom similarity functions, which are bundled with Splink
# documented here: https://github.com/moj-analytical-services/splink_scalaudfs
# In Fabric, spark.jars is set in the Environment instead, so these lines stay commented out
# path = similarity_jar_location()
# conf.set("spark.jars", path)

# Fabric notebooks already have a running Spark session; getOrCreate reuses it
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Relative "Files/..." paths resolve against the notebook's default lakehouse,
# so a default lakehouse must be attached for these two lines to work
mssparkutils.fs.mkdirs("Files/tmp_checkpoints")
spark.sparkContext.setCheckpointDir("Files/tmp_checkpoints")

In [None]:
from splink.datasets import splink_datasets
pandas_df = splink_datasets.fake_1000

df = spark.createDataFrame(pandas_df)

In [None]:
df.head(5)

In [None]:
import splink.spark.comparison_library as cl
import splink.spark.comparison_template_library as ctl
from splink.spark.blocking_rule_library import block_on

settings = {
    "link_type": "dedupe_only",
    "comparisons": [
        ctl.name_comparison("first_name"),
        ctl.name_comparison("surname"),
        ctl.date_comparison("dob", cast_strings_to_date=True),
        cl.exact_match("city", term_frequency_adjustments=True),
        ctl.email_comparison("email", include_username_fuzzy_level=False),
    ],
    "blocking_rules_to_generate_predictions": [
        block_on("first_name"),
        "l.surname = r.surname",  # alternatively, you can write blocking rules in their SQL form
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "em_convergence": 0.01
}
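`block_on("first_name")` and the SQL string `"l.surname = r.surname"` both restrict which record pairs get scored. Conceptually, a pair is compared if any rule matches, and each pair is scored only once even when several rules match it. The following is a pure-Python sketch of that idea on made-up records; it is not Splink's implementation, which generates the pairs with Spark SQL joins. The helper names are hypothetical.

```python
from itertools import combinations

# Made-up toy records standing in for the fake_1000 data.
records = [
    {"unique_id": 0, "first_name": "Amelia", "surname": "Smith"},
    {"unique_id": 1, "first_name": "Amelia", "surname": "Smyth"},
    {"unique_id": 2, "first_name": "Oliver", "surname": "Smith"},
    {"unique_id": 3, "first_name": "Noah", "surname": "Jones"},
]


def block_on_column(column):
    """Hypothetical predicate equivalent to the SQL rule 'l.<column> = r.<column>'.

    SQL equality with NULL is never true, so None values never block together.
    """
    return lambda l, r: l[column] is not None and l[column] == r[column]


rules = [block_on_column("first_name"), block_on_column("surname")]


def candidate_pairs(records, rules):
    """dedupe_only-style blocking: keep each unordered pair once if ANY rule matches."""
    pairs = []
    for l, r in combinations(records, 2):
        if any(rule(l, r) for rule in rules):
            pairs.append((l["unique_id"], r["unique_id"]))
    return pairs


print(candidate_pairs(records, rules))  # 2 of the 6 possible pairs survive blocking
```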

In [None]:
from splink.spark.linker import SparkLinker
linker = SparkLinker(df, settings, spark=spark)
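# Rules that are very likely to identify true matches; used here only to estimate
# the prior probability that two random records match
deterministic_rules = [
    "l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1",
    "l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1",
    "l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2",
    "l.email = r.email"
]

# recall=0.7: assume these rules find about 70% of all true matches
linker.estimate_probability_two_random_records_match(deterministic_rules, recall=0.7)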
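The idea behind `estimate_probability_two_random_records_match` can be sketched as arithmetic: count the pairs the deterministic rules flag, scale up by `1 / recall` to account for the matches they miss, and divide by the number of possible pairs (n(n-1)/2 for `dedupe_only`). This is a sketch under that assumption, not Splink's code, and the count of 700 flagged pairs is hypothetical.

```python
import math


def estimate_prior(num_deterministic_matches, num_records, recall):
    """Sketch of a recall-adjusted prior for a dedupe_only job.

    Assumes: prior = (pairs flagged by deterministic rules / recall) / all possible pairs.
    """
    total_pairs = num_records * (num_records - 1) // 2  # unordered pairs within one table
    return (num_deterministic_matches / recall) / total_pairs


# Hypothetical count: suppose the rules flagged 700 pairs among 1,000 records.
prior = estimate_prior(num_deterministic_matches=700, num_records=1000, recall=0.7)
print(f"{prior:.6f}")                           # probability two random records match
print(f"{math.log2(prior / (1 - prior)):.2f}")  # the same prior expressed as log2 odds
```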

In [None]:
linker.estimate_u_using_random_sampling(max_pairs=5e5)
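`estimate_u_using_random_sampling` estimates u probabilities, the chance of seeing a given comparison level among non-matching pairs, from up to `max_pairs` randomly drawn pairs. This works because almost all random pairs are non-matches. A minimal pure-Python sketch for a single exact-match level, assuming a toy column small enough to enumerate every pair (Splink samples in Spark instead). The function name and data are hypothetical.

```python
import random
from itertools import combinations


def estimate_u_exact_match(values, max_pairs, seed=42):
    """Sketch: u for an exact-match level, estimated from random record pairs.

    Random pairs are overwhelmingly non-matches, so the share of them that
    agree approximates P(agree | non-match).
    """
    # Enumerating all pairs is only feasible for toy data like this.
    all_pairs = list(combinations(range(len(values)), 2))
    rng = random.Random(seed)  # fixed seed keeps the sketch reproducible
    sample = rng.sample(all_pairs, min(max_pairs, len(all_pairs)))
    agree = sum(1 for i, j in sample if values[i] == values[j])
    return agree / len(sample)


# Illustrative toy column (not the demo data): 10 records, 45 possible pairs.
cities = ["London"] * 5 + ["Leeds"] * 3 + ["York"] * 2
u_city = estimate_u_exact_match(cities, max_pairs=int(5e5))  # max_pairs as in the cell above
print(round(u_city, 3))
```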

In [None]:
training_blocking_rule = "l.first_name = r.first_name and l.surname = r.surname"
training_session_fname_sname = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)

training_blocking_rule = "l.dob = r.dob"
training_session_dob = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
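Training with `"l.first_name = r.first_name and l.surname = r.surname"` means every pair in that session agrees on both names by construction, so Splink does not estimate the name comparisons there; the second session, blocked on `dob`, covers them. In Splink 3 the u probabilities are held fixed by default during EM, since they came from random sampling. A simplified sketch of Fellegi-Sunter EM over binary agreement vectors with fixed u follows; Splink's real comparisons have multiple levels and run in SQL. The `convergence` argument mirrors `"em_convergence": 0.01`, and the function and data are hypothetical.

```python
def em_fixed_u(gammas, m, u, prior, convergence=0.01, max_iter=25):
    """Fellegi-Sunter EM sketch over binary agreement vectors with u held fixed.

    gammas: list of tuples of 0/1, one entry per comparison column.
    m, u:   per-column P(agree | match) and P(agree | non-match).
    Stops when no parameter changes by more than `convergence`.
    """
    m = list(m)
    for _ in range(max_iter):
        # E-step: posterior match probability for every pair.
        posteriors = []
        for g in gammas:
            p_match, p_non = prior, 1 - prior
            for k, agree in enumerate(g):
                p_match *= m[k] if agree else 1 - m[k]
                p_non *= u[k] if agree else 1 - u[k]
            posteriors.append(p_match / (p_match + p_non))
        # M-step: update the prior and the m probabilities; u is not updated.
        total = sum(posteriors)
        new_prior = total / len(gammas)
        new_m = [
            sum(p * g[k] for p, g in zip(posteriors, gammas)) / total
            for k in range(len(m))
        ]
        change = max([abs(new_prior - prior)] + [abs(a - b) for a, b in zip(new_m, m)])
        prior, m = new_prior, new_m
        if change < convergence:
            break
    return prior, m


# Made-up agreement vectors for three comparison columns.
gammas = [(1, 1, 1)] * 10 + [(0, 0, 0)] * 40 + [(1, 0, 0)] * 5 + [(0, 1, 0)] * 5
prior, m = em_fixed_u(gammas, m=[0.9, 0.9, 0.9], u=[0.1, 0.1, 0.1], prior=0.2)
print(round(prior, 3), [round(x, 3) for x in m])
```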

In [None]:
results = linker.predict(threshold_match_probability=0.9)
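`threshold_match_probability=0.9` drops pairs scoring below a match probability of 0.9. Match probability and match weight are two scales for the same quantity: the match weight is the log2 of the match odds, so 0.9 corresponds to log2(0.9 / 0.1) = log2(9). A small sketch of the conversion in both directions; the function names are illustrative.

```python
import math


def probability_to_match_weight(p):
    """Match weight as the log2 of the match odds: log2(p / (1 - p))."""
    return math.log2(p / (1 - p))


def match_weight_to_probability(w):
    """Inverse transform: p = 2**w / (1 + 2**w)."""
    return 2 ** w / (1 + 2 ** w)


threshold_weight = probability_to_match_weight(0.9)
print(round(threshold_weight, 2))  # the match weight equivalent of the 0.9 threshold
```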

In [None]:
results.as_pandas_dataframe(limit=5)

In [None]:
linker.missingness_chart()

In [None]:
linker.profile_columns(top_n=10, bottom_n=5)
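`profile_columns(top_n=10, bottom_n=5)` shows each column's most and least frequent values. A skewed distribution is what `term_frequency_adjustments=True` on `city` accounts for: two records agreeing on a common city is weaker evidence of a match than agreeing on a rare one. A minimal sketch of the top/bottom computation with `collections.Counter`, on made-up values; it ignores nulls and is not Splink's implementation.

```python
from collections import Counter


def profile_values(values, top_n=10, bottom_n=5):
    """Return the top_n most common and bottom_n least common non-null values with counts."""
    ranked = Counter(v for v in values if v is not None).most_common()
    top = ranked[:top_n]
    bottom = ranked[::-1][:bottom_n]  # least common first
    return top, bottom


# Made-up city column with one missing value.
sample_cities = ["London"] * 4 + ["Leeds"] * 2 + ["York"] + [None]
print(profile_values(sample_cities, top_n=2, bottom_n=1))
```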

In [None]:
linker.match_weights_chart()
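The match weights chart plots, for each comparison level, a partial match weight log2(m / u), alongside the prior expressed as log2 odds. A pair's total match weight is the prior weight plus the partial weights of the levels it falls into. The m, u and prior values below are hypothetical, not values estimated by this notebook.

```python
import math


def partial_match_weight(m, u):
    """Partial match weight for one comparison level: log2 of the Bayes factor m / u."""
    return math.log2(m / u)


def total_match_weight(prior, levels):
    """Prior weight plus the partial weights of the levels a pair falls into."""
    prior_weight = math.log2(prior / (1 - prior))
    return prior_weight + sum(partial_match_weight(m, u) for m, u in levels)


# Hypothetical parameters for illustration only.
prior = 1 / 257             # prior odds of 1:256, i.e. a prior weight of -8
levels = [(0.9, 0.01),      # e.g. an exact match on one column
          (0.8, 0.1)]       # e.g. an exact match on another column
weight = total_match_weight(prior, levels)
print(round(weight, 2), round(2 ** weight / (1 + 2 ** weight), 3))  # weight and probability
```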

In [None]:
linker.m_u_parameters_chart()

In [None]:
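import os

# /lakehouse/default/ is the local mount of the default lakehouse;
# create the target folder in case it doesn't exist yet
os.makedirs("/lakehouse/default/Files/demo_settings", exist_ok=True)
settings = linker.save_model_to_json("/lakehouse/default/Files/demo_settings/saved_model_1.json", overwrite=True)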
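To check what was saved, the model file can be read back with the standard library. A minimal sketch, assuming the file is a JSON object with the same top-level keys as the `settings` dictionary defined earlier (such as `"link_type"` and `"comparisons"`). Both helper names are hypothetical.

```python
import json
from pathlib import Path


def load_saved_model(path):
    """Read a saved model file as a plain dictionary."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def summarise_model(model: dict) -> dict:
    """Pick out a few top-level facts, assuming settings-style keys."""
    return {
        "link_type": model.get("link_type"),
        "num_comparisons": len(model.get("comparisons", [])),
    }


# In Fabric, point this at the path passed to save_model_to_json above:
# print(summarise_model(load_saved_model("/lakehouse/default/Files/demo_settings/saved_model_1.json")))
```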