# Chapter 7 - Scaling Up On Google Cloud

In [None]:
# Optional environment setup, left disabled. Uncomment both lines to replace the
# installed Splink with the exact commit pinned by the hash in the URL below.
#%pip uninstall splink
#%pip install git+https://github.com/moj-analytical-services/splink.git@29ce8fc3e63c95d036dfee623442efd2218e023c

In [None]:
import pandas as pd
import numpy as np
import json

## Steps 1 and 2 - Data Acquisition and Standardization

In [None]:
# Read the two cleaned datasets straight from the Cloud Storage bucket.
# Replace <your bucket> with the name of your own bucket before running.
mari_clean_uri = 'gs://<your bucket>/handsonentityresolution/mari_clean.csv'
basic_clean_uri = 'gs://<your bucket>/handsonentityresolution/basic_clean.csv'

df_m = pd.read_csv(mari_clean_uri)
df_c = pd.read_csv(basic_clean_uri)

## Step 3 - Record Blocking and Attribute Comparison

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types

# Parallelism settings chosen for a cluster of 2 n2-standard-4 machines:
# both the default parallelism and the SQL shuffle partition count are 240.
conf = SparkConf().setAll(
    [
        ("spark.default.parallelism", "240"),
        ("spark.sql.shuffle.partitions", "240"),
    ]
)

# Reuse the running SparkContext if there is one, otherwise start one with `conf`,
# then wrap it in a SparkSession.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# Checkpoints are written to the bucket (replace <your bucket> with your own).
sc.setCheckpointDir("gs://<your bucket>/handsonentityresolution/")

# Register the Jaro-Winkler Java UDF under the SQL name "jaro_winkler_similarity";
# it returns a double.
spark.udf.registerJavaFunction(
    name="jaro_winkler_similarity",
    javaClassName="uk.gov.moj.dash.linkage.JaroWinklerSimilarity",
    returnType=types.DoubleType(),
)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Spark schema shared by both input tables: two string attributes followed by an
# integer record identifier.
schema = StructType(
    [
        StructField("Postcode", StringType()),
        StructField("CompanyName", StringType()),
        StructField("unique_id", IntegerType()),
    ]
)

In [None]:
# Keep only the three columns named in the Spark schema, in the same order.
schema_column_names = ['Postcode', 'CompanyName', 'unique_id']
df_c = df_c.loc[:, schema_column_names]

In [None]:
# Project each pandas frame onto the schema's columns, in schema order, before
# handing it to Spark. The schema is matched to the pandas columns by position,
# so an extra or re-ordered CSV column would otherwise either fail or be loaded
# under the wrong field name. df_c is already reduced to these columns earlier
# in the notebook; applying the same projection to df_m keeps the two inputs
# consistent (and is a no-op when the frame already has exactly these columns).
schema_columns = schema.fieldNames()
dfs_m = spark.createDataFrame(df_m[schema_columns], schema)
dfs_c = spark.createDataFrame(df_c[schema_columns], schema)

In [None]:
import splink.spark.comparison_library as cl

# Candidate pairs for prediction are generated from records that agree exactly
# on postcode, or exactly on company name.
prediction_blocking_rules = [
    "l.Postcode = r.Postcode",
    "l.CompanyName = r.CompanyName",
]

# The only comparison: Jaro-Winkler similarity of CompanyName at thresholds 0.9 and 0.8.
company_name_comparison = cl.jaro_winkler_at_thresholds("CompanyName", [0.9, 0.8])

# Splink model settings; intermediate calculation columns and the matching
# columns are retained in the output.
settings = dict(
    link_type="link_only",
    blocking_rules_to_generate_predictions=prediction_blocking_rules,
    comparisons=[company_name_comparison],
    retain_intermediate_calculation_columns=True,
    retain_matching_columns=True,
)

In [None]:
from splink.spark.linker import SparkLinker
# Build the Spark-backed linker over both input tables; the aliases label each
# table in the same order as the inputs.
input_tables = [dfs_m, dfs_c]
table_aliases = ["dfs_m", "dfs_c"]
linker = SparkLinker(input_tables, settings, input_table_aliases=table_aliases)

In [None]:
# Estimate the u probabilities from randomly sampled record pairs, with the
# sample capped via max_pairs.
max_sampled_pairs = 5e7
linker.estimate_u_using_random_sampling(max_pairs=max_sampled_pairs)

In [None]:
# Run expectation-maximisation training, blocking the training pairs on equal postcode.
em_training_blocking_rule = "l.Postcode = r.Postcode"
linker.estimate_parameters_using_expectation_maximisation(em_training_blocking_rule)

In [None]:
# Load a previously saved model from JSON (replace <your_path> with your own path).
# To create that file from the model trained in this session, run the
# commented-out save call first.
model_json_path = "<your_path>/Chapter7_Splink_Settings.json"
#linker.save_model_to_json(model_json_path, overwrite=True)
linker.load_model(model_json_path)

In [None]:
# Render the match weights chart for the linker's current model.
weights_chart = linker.match_weights_chart()
weights_chart

In [None]:
# Render the m and u parameters chart for the linker's current model.
m_u_chart = linker.m_u_parameters_chart()
m_u_chart

## Step 4 - Match Classification

In [None]:
# Score the candidate pairs using a match-probability threshold of 0.1, pull the
# result back into pandas, and show how many pairs were returned.
splink_predictions = linker.predict(threshold_match_probability=0.1)
df_pred = splink_predictions.as_pandas_dataframe()

len(df_pred)

In [None]:
# Number of distinct right-hand company names across all predicted pairs
# (a missing value, if present, is counted as one distinct value).
df_pred['CompanyName_r'].nunique(dropna=False)

In [None]:
# Predicted pairs whose company name AND postcode are identical on both sides.
same_company_name = df_pred['CompanyName_l'].eq(df_pred['CompanyName_r'])
same_postcode = df_pred['Postcode_l'].eq(df_pred['Postcode_r'])

postname = df_pred.loc[same_company_name & same_postcode]
len(postname)

In [None]:
# Number of distinct right-hand company names among the exact name-and-postcode pairs.
postname['CompanyName_r'].nunique(dropna=False)

In [None]:
# Predicted pairs whose company names differ between the left and right records.
company_name_differs = df_pred['CompanyName_l'].ne(df_pred['CompanyName_r'])

notname = df_pred.loc[company_name_differs]
len(notname)

In [None]:
# Number of distinct right-hand company names among the pairs with differing names.
notname['CompanyName_r'].nunique(dropna=False)

In [None]:
# Predicted pairs whose postcodes differ between the left and right records.
postcode_differs = df_pred['Postcode_l'].ne(df_pred['Postcode_r'])

notpost = df_pred.loc[postcode_differs]
len(notpost)

In [None]:
# Number of distinct right-hand company names among the pairs with differing postcodes.
notpost['CompanyName_r'].nunique(dropna=False)

In [None]:
# Left-join df_m to the predictions on unique_id = unique_id_r so that every
# df_m row is kept, whether or not a prediction joined to it.
results = df_m.merge(
    df_pred,
    how='left',
    left_on=['unique_id'],
    right_on=['unique_id_r'],
    suffixes=('_m', '_p'),
)

# Show the rows that carry no match_weight after the join.
has_no_match_weight = results['match_weight'].isna()
results[has_no_match_weight]