## End to End Workflow

## Step 1:  Imports and setup

The following is just boilerplate code that sets up the Spark session and sets some other non-essential configuration options

In [1]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType
import pyspark.sql.functions as f
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import os

In [2]:
#conf=SparkConf()

# Load in a jar that provides extended string comparison functions such as Jaro Winkler.
# Splink
#     conf.set('spark.driver.extraClassPath', 'jars/scala-udf-similarity-0.0.6.jar,jars/graphframes-0.6.0-spark2.3-s_2.11.jar')
#     conf.set('spark.jars', 'jars/scala-udf-similarity-0.0.6.jar,jars/graphframes-0.6.0-spark2.3-s_2.11.jar')
#conf.set('spark.driver.extraClassPath', 'jars/scala-udf-similarity-0.0.6.jar')
#conf.set('spark.jars', 'jars/scala-udf-similarity-0.0.6.jar')
#conf.set('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')

#sc = SparkContext.getOrCreate(conf=conf)
#sc.setCheckpointDir("temp_graphframes/")


spark = SparkSession\
    .builder\
    .appName("Entity Resolution with Lineage")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-1")\
    .config("spark.yarn.access.hadoopFileSystems", os.environ['STORAGE'])\
    .config("spark.driver.extraClassPath", "jars/scala-udf-similarity-0.0.6.jar")\
    .config("spark.jars", "jars/scala-udf-similarity-0.0.6.jar")\
    .getOrCreate()

# Register UDFs
from pyspark.sql import types
spark.udf.registerJavaFunction('jaro_winkler_sim', 'uk.gov.moj.dash.linkage.JaroWinklerSimilarity', types.DoubleType())
spark.udf.registerJavaFunction('Dmetaphone', 'uk.gov.moj.dash.linkage.DoubleMetaphone', types.StringType())

In [3]:
spark

In [4]:
import pandas as pd 
pd.options.display.max_columns = 500

In [5]:
import logging 
logging.basicConfig()  # Means logs will print in Jupyter Lab

# Set to DEBUG if you want splink to log the SQL statements it's executing under the hood
logging.getLogger("splink").setLevel(logging.INFO)

## Step 2: Read in the data

The `l` and `r` stand for 'left' and 'right.  It doesn't matter which of the two datasets you choose as the left, performance and results will be the same.

⚠️ Note that `splink` makes the following assumptions about your data:

-  There is a field containing a unique record identifier in each dataset
-  The two datasets being linked have common column names - e.g. date of birth is represented in both datasets in a field of the same name.   In many cases, this means that the user needs to rename columns prior to using `splink`


##### READING FROM PHOENIX INTO A SPARK DF

In [6]:
class Db:
    def __init__(self):
        opts = {}
        opts['authentication'] = 'BASIC'
        opts['avatica_user'] = os.environ["WORKLOAD_USER"]
        opts['avatica_password'] = os.environ["WORKLOAD_PASSWORD"]
        database_url = os.environ["OPDB_ENDPOINT_AWS2"]
        self.TABLENAME = "test_table_paul"
        self.conn = phoenixdb.connect(database_url, autocommit=True,**opts)
        self.curs = self.conn.cursor()
        
    def get_data(self):

        query = f"SELECT * FROM CML_WORKSHOP_TABLE_RIGHT"

        model.curs.execute(query)
        rows = model.curs.fetchall()

        return rows

In [7]:
import logging
logging.basicConfig( level=logging.DEBUG)

import os
import phoenixdb
model = Db()

In [8]:
phoenix_df = model.get_data()

In [9]:
schema = StructType([StructField('unique_id', StringType(), True),
                     StructField('first_name', StringType(), True), 
                     StructField('surname', StringType(), True), 
                     StructField('dob', StringType(), True), 
                     StructField('city', StringType(), True), 
                     StructField('email', StringType(), True), 
                     StructField('group', StringType(), True)])

In [10]:
right_df = spark.createDataFrame(phoenix_df, schema=schema)

In [11]:
right_df = right_df.withColumn("source_dataset", f.lit("df_r"))

##### READING FROM HIVE INTO A SPARK DF

In [12]:
left_df = spark.sql("SELECT * FROM default.mytable")

In [13]:
left_df = right_df.withColumn("source_dataset", f.lit("df_l"))

In [14]:
left_df.show(5)
right_df.show(5)

+---------+----------+-------+----------+--------------+--------------------+-----+--------------+
|unique_id|first_name|surname|       dob|          city|               email|group|source_dataset|
+---------+----------+-------+----------+--------------+--------------------+-----+--------------+
|        0|    Julia |   None|2015-10-29|        London| hannah88@powers.com|    0|          df_l|
|      105|    Harry |  Tomas|2011-07-30|        Belast|sandra26@anderson...|   21|          df_l|
|      115|      None|   Ells|2013-01-20|Stoke-on-Trent|wmcdaniel@nelson.net|   22|          df_l|
|      122| Isabella | Wallca|2000-03-24|        London|hilltheresa@pears...|   23|          df_l|
|      128|   Edward |   wLis|2005-04-09|          None|whitakernichole@b...|   24|          df_l|
+---------+----------+-------+----------+--------------+--------------------+-----+--------------+
only showing top 5 rows

+---------+----------+-------+----------+--------------+--------------------+-----+-

## Step 3:  Configure splink using the `settings` object

Most of `splink` configuration options are stored in a settings dictionary.  This dictionary allows significant customisation, and can therefore get quite complex.  

💥 We provide an tool for helping to author valid settings dictionaries, which includes tooltips and autocomplete, which you can find [here](http://robinlinacre.com/splink_settings_editor/).

Customisation overrides default values built into splink.  For the purposes of this demo, we will specify a simple settings dictionary, which means we will be relying on these sensible defaults.

To help with authoring and validation of the settings dictionary, we have written a [json schema](https://json-schema.org/), which can be found [here](https://github.com/moj-analytical-services/splink/blob/master/splink/files/settings_jsonschema.json).  




In [15]:
# The comparison expression allows for the case where a first name and surname have been inverted 
sql_case_expression = """
CASE 
WHEN first_name_l = first_name_r AND surname_l = surname_r THEN 4 
WHEN first_name_l = surname_r AND surname_l = first_name_r THEN 3
WHEN first_name_l = first_name_r THEN 2
WHEN surname_l = surname_r THEN 1
ELSE 0 
END
"""

settings = {
    "link_type": "link_only", 
    "max_iterations": 20,
    "blocking_rules": [
    ],
    "comparison_columns": [
       {
            "custom_name": "name_inversion",
            "custom_columns_used": ["first_name", "surname"],
            "case_expression": sql_case_expression,
            "num_levels": 5
        },
        {
            "col_name": "city",
            "num_levels": 3
        },
        {
            "col_name": "email",
            "num_levels": 3
        },
        {
            "col_name": "dob"
        }
    ],
    "additional_columns_to_retain": ["group"]
    
}

In words, this setting dictionary says:

- We are performing a data linking task (the other options are `dedupe_only`, or `link_and_dedupe`)
- Rather than generate all possible comparisons (the cartesian product of the input datasets), we are going restrict record comparisons to those generated by at least one of the rules in the specified array
- When comparing records, we will use information from the `first_name`, `surname`, `dob`, `city` and `email` columns to compute a match score.
- For `first_name` and `surname`, string comparisons will have three levels:
    - Level 2: Strings are (almost) exactly the same
    - Level 1: Strings are similar 
    - Level 0: No match
- We will make adjustments for term frequencies on the `first_name` and `surname` columns
- We will retain the `group` column in the results even though this is not used as part of comparisons.  This is a labelled dataset and `group` contains the true match - i.e. where group matches, the records pertain to the same person

## Step 4:  Estimate match scores using the Expectation Maximisation algorithm

In [16]:
from splink import Splink

linker = Splink(settings, [left_df, right_df], spark)
df_linked = linker.get_scored_comparisons()

  "You have not specified any blocking rules, meaning all comparisons between the "
INFO:splink.iterate:Iteration 0 complete
INFO:splink.model:The maximum change in parameters was 0.3704400897026062 for key name_inversion, level 0
INFO:splink.iterate:Iteration 1 complete
INFO:splink.model:The maximum change in parameters was 0.0720943808555603 for key name_inversion, level 4
INFO:splink.iterate:Iteration 2 complete
INFO:splink.model:The maximum change in parameters was 0.013827092945575714 for key dob, level 0
INFO:splink.iterate:Iteration 3 complete
INFO:splink.model:The maximum change in parameters was 0.0013318657875061035 for key dob, level 1
INFO:splink.iterate:Iteration 4 complete
INFO:splink.model:The maximum change in parameters was 9.26434401127274e-06 for key dob, level 0
INFO:splink.iterate:EM algorithm has converged


In [17]:
# Inspect main dataframe that contains the match scores
df_linked.toPandas().sample(5)

Unnamed: 0,match_probability,source_dataset_l,unique_id_l,source_dataset_r,unique_id_r,first_name_l,first_name_r,surname_l,surname_r,gamma_name_inversion,city_l,city_r,gamma_city,email_l,email_r,gamma_email,dob_l,dob_r,gamma_dob,group_l,group_r
28699,7.89876e-61,df_l,792,df_r,738,Aisha,Lottie,King,errFst,0,Telford,Bristol,0,,jesus40@allen-graves.com,0,1980-04-24,1986-09-22,0,140,128
8401,7.89876e-61,df_l,122,df_r,719,Isabella,iilWam,Wallca,,0,London,,0,hilltheresa@pearson.org,taylor70@fisher.nfo,0,2000-03-24,2000-06-27,0,23,123
27306,7.89876e-61,df_l,731,df_r,612,Matthew,Caleb,Badyel,Kan,0,London,Wirangton,0,amandamartinez@melton.com,vmoreno@ussell.zib,0,2017-12-23,2000-10-09,0,126,102
3976,7.89876e-61,df_l,338,df_r,194,Spencer,acob,Lucas,,0,Swansea,Southend-on-Sea,0,,barrygary@flores.com,0,2007-10-26,1987-02-06,0,58,34
27580,7.89876e-61,df_l,744,df_r,614,Mcha le,Wright,Taylor,Jude,0,Brighton,Belfast,0,rhondawilliams@gonzalez-scott.com,lynnchapman@crawfard-lozon.com,0,1993-06-23,2017-03-28,0,129,103


In [18]:
df_linked.write.format("parquet").mode("overwrite").saveAsTable('default.matches_spark_table')

##### UPLOADING DATA TO IMPALA

Preparing data before insertion

In [19]:
df_impala = df_linked.toPandas()

In [20]:
df_impala.head()

Unnamed: 0,match_probability,source_dataset_l,unique_id_l,source_dataset_r,unique_id_r,first_name_l,first_name_r,surname_l,surname_r,gamma_name_inversion,city_l,city_r,gamma_city,email_l,email_r,gamma_email,dob_l,dob_r,gamma_dob,group_l,group_r
0,1.0,df_l,0,df_r,0,Julia,Julia,,,4,London,London,2,hannah88@powers.com,hannah88@powers.com,2,2015-10-29,2015-10-29,1,0,0
1,7.89876e-61,df_l,0,df_r,105,Julia,Harry,,Tomas,0,London,Belast,0,hannah88@powers.com,sandra26@anderson-davis.com,0,2015-10-29,2011-07-30,0,0,21
2,7.89876e-61,df_l,0,df_r,115,Julia,,,Ells,0,London,Stoke-on-Trent,0,hannah88@powers.com,wmcdaniel@nelson.net,0,2015-10-29,2013-01-20,0,0,22
3,8.285076e-35,df_l,0,df_r,122,Julia,Isabella,,Wallca,0,London,London,2,hannah88@powers.com,hilltheresa@pearson.org,0,2015-10-29,2000-03-24,0,0,23
4,7.89876e-61,df_l,0,df_r,128,Julia,Edward,,wLis,0,London,,0,hannah88@powers.com,whitakernichole@booth.com,0,2015-10-29,2005-04-09,0,0,24


##### We round match probabilities and pick entities that are a match

In [21]:
df_impala = df_impala.round(1)

In [22]:
df_impala = df_impala[df_impala["match_probability"] == 1]

In [23]:
#Filtering duplicate columns
df_impala = df_impala.filter(regex="_l")

In [24]:
df_impala.columns = df_impala.columns.str.replace("_l", "")

###### We are ready to upsert our matches into Impala

In [25]:
import os
import jaydebeapi
conn = jaydebeapi.connect("com.cloudera.impala.jdbc.DataSource",
                          "jdbc:impala://"+os.environ["IMPALA_HOST"]+":443/;ssl=1;transportMode=http;httpPath=cliservice;AuthMech=3;",
                          {'UID': os.environ["WORKLOAD_USER"], 'PWD': os.environ["WORKLOAD_PASSWORD"]},
                          '/home/cdsw/impala_drivers/ImpalaJDBC41.jar')

In [26]:
cursor = conn.cursor()

In [27]:
query = """
        CREATE TABLE IF NOT EXISTS ER_MATCHES (
        unique_id VARCHAR,
        first_name VARCHAR,
        surname VARCHAR,
        dob VARCHAR,
        city  VARCHAR,
        email VARCHAR,
        groupP VARCHAR)
        """

In [28]:
cursor.execute(query)

In [29]:
def upsert_linkage(data):

    sql = """insert into ER_MATCHES \
         (unique_id ,first_name,surname,dob,city,email,groupP) \
         values (?,?,?,?,?,?,?)"""
    #print(data)
    cursor.executemany(sql,data)

In [30]:
def upsert_data_jaydebeapi(data, records=100):
    total_records=0
    header = True
    rows = []
    i=1

    for index, row in data.iterrows():

        rows.append ([f"{row['unique_id']}",\
                  f"{row['first_name']}",f"{row['surname']}",\
                  f"{row['dob']}",f"{row['city']}", \
                  f"{row['email']}", f"{row['group']}"])
        total_records=total_records+1

        if i < records + 1 :   
            i=i+1
        else :
            upsert_linkage(rows)
            rows = []
            i=1
            print (f"Ingested {total_records} records")

    if len(rows) > 0 :
        upsert_linkage(rows)

    print (f"Ingested {total_records} records")

In [31]:
upsert_data_jaydebeapi(df_impala)

#curs.fetchall()

#cursor.close()
#conn.close()

Ingested 101 records
Ingested 181 records


## Step 5: Create a Custom Atlas Type (Process) reflecting the EM algorithm

First we need to instantiate the connection to Atlas in CDP

In [32]:
import atlasclient

Endpoint, Username and Passoword are stored as CML project variables and passed dynamically

In [33]:
from atlasclient.client import Atlas
client = Atlas(os.environ["ATLAS_ENDPOINT"], port='', username=os.environ["ATLAS_USER"], password=os.environ["ATLAS_PASSWORD"])

Looks like we have successfully established the connection. Next we can create a custom Atlas type (process) reflecting the EM algorithm

In [34]:
typedef_dict = {
    "enumTypes": [],
    "structTypes": [],
    "classificationDefs":[],
    "entityDefs": [{
        "superTypes": ["Process"],
        "name": "ER_algorithm",
        "description":"custom_type_for_Entity_Resolution",
        "attributeDefs": [{
            "name": "startTime",
            "isOptional": True,
            "isUnique": False,
            "isIndexable": False,
            "typeName":"string",
            "valuesMaxCount":1,
            "cardinality":"SINGLE",
            "valuesMinCount":0
        }]
    }]
}

And we can now register the new type with Atlas. For more on the Atlas type model, please visit this page: https://docs.cloudera.com/runtime/7.2.7/cdp-governance-overview/topics/atlas-metadata-model-overview.html

In [35]:
#Has already run once so will not run again
client.typedefs.create(data=typedef_dict)

<atlasclient.models.TypeDefs at 0x7fe148bb3780>

## Step 6: Instantiate the EM algorithm in Atlas along with lineage reflecting our Linkage Job above

Notice: we need to pass the Atlas guid for the two datasets we compared above as they were registered in Atlas when they were stored as a Spark table

In [36]:
#Retrieving GUID's for the three tables via Atlas Client - search by name

In [37]:
params = {'typeName': 'hbase_table', 'attrName': 'data', 'attrValue': 'provider','offset': '1', 'limit':'100'}
search_results = client.search_basic(**params)
for s in search_results:
    for e in s.entities:
        print(e.guid)
        print(e.attributes)
        #print(e.attributes.values)
        print(e.typeName)
        print(e.attributes)

420997b4-e1d2-4b88-ab46-6c7b794cf67d
{'owner': 'hbase', 'createTime': 1617763073453, 'qualifiedName': 'hbase:acl@cm', 'name': 'hbase:acl', 'description': 'hbase:acl'}
hbase_table
{'owner': 'hbase', 'createTime': 1617763073453, 'qualifiedName': 'hbase:acl@cm', 'name': 'hbase:acl', 'description': 'hbase:acl'}
f234866f-4396-4557-b23a-66ed3a7e82b1
{'owner': 'pauldefusco', 'createTime': 1617927034944, 'qualifiedName': 'SYSTEM:SEQUENCE@cm', 'name': 'SYSTEM:SEQUENCE', 'description': 'SYSTEM:SEQUENCE'}
hbase_table
{'owner': 'pauldefusco', 'createTime': 1617927034944, 'qualifiedName': 'SYSTEM:SEQUENCE@cm', 'name': 'SYSTEM:SEQUENCE', 'description': 'SYSTEM:SEQUENCE'}
94de73a3-137d-4532-8634-9bf4060b61c7
{'owner': 'pauldefusco', 'createTime': 1617927025330, 'qualifiedName': 'SYSTEM:CATALOG@cm', 'name': 'SYSTEM:CATALOG', 'description': 'SYSTEM:CATALOG'}
hbase_table
{'owner': 'pauldefusco', 'createTime': 1617927025330, 'qualifiedName': 'SYSTEM:CATALOG@cm', 'name': 'SYSTEM:CATALOG', 'description': '

In [38]:
params = {'typeName': 'hive_table', 'attrName': 'name', 'attrValue': 'cc_data', 'offset': '1', 'limit':'10'}
search_results = client.search_attribute(**params)
for s in search_results:
    for e in s.entities:
        print(e.guid)
        print(e.attributes)

In [39]:
for s in search_results:
    print(s.entities.to_dict())

[]


In [40]:
data = {'typeName': 'hive_table', 'attrName': 'name', 'attrValue': 'cc_data', 'offset': '1', 'limit': '100'}
search_results = client.search_basic.create(data=data)
for e in search_results.entities:
    print(e.guid)
    print(e.attributes)

a1ad7307-47d9-4972-a64c-d415d15f5231
{'owner': 'pauldefusco', 'createTime': 1617927521000, 'qualifiedName': 'default.cml_workshop_table_right@cm', 'name': 'cml_workshop_table_right'}
1a1d46fd-5bb2-42bb-b96f-146f1b740a97
{'owner': 'hive', 'createTime': 1617927563000, 'qualifiedName': 'default.tmp_compactor_cml_workshop_table_right_1617927561339_temp-89ec323a-9b23-4477-a721-e7247f5959c7@cm', 'name': 'tmp_compactor_cml_workshop_table_right_1617927561339'}
fcfbb0e5-2a8f-49e6-bf10-46b6dee8398a
{'owner': 'hive', 'createTime': 1617927875000, 'qualifiedName': 'default.tmp_compactor_cml_workshop_table_right_1617927874984_temp-89ec323a-9b23-4477-a721-e7247f5959c7@cm', 'name': 'tmp_compactor_cml_workshop_table_right_1617927874984'}
18c03667-05a0-4485-9553-097ac8ad2be9
{'owner': 'pauldefusco', 'createTime': 1617927954000, 'qualifiedName': 'default.customer_interactions_cicd@cm', 'name': 'customer_interactions_cicd'}
53c8b9b6-8bea-4fcd-9b37-6961a1ed11e8
{'owner': 'pauldefusco', 'createTime': 161792

##### We can now create a new process type in Atlas reflecting the ER Algorithm 

##### Before doing so, open Atlas and search for the tables you created. Then copy and paste the GUID for each entity down in the dictionary below

INSERT SCREENSHOT HERE

In [41]:
process_entity_dict = {
  "entity" : {
    "guid" : "-2089428075574333",
    "status" : "ACTIVE",
    "createdBy" : "pdefusco",
    "updatedBy" : "pdefusco",
    "createTime" : "12342",
    "updateTime" : "12342",
    "version" : "12342",
    "relationshipAttributes" : {},
    "classifications" : [],
    "typeName" : "ER_algorithm",
    "attributes" : {
      "startTime" : "123",
      "qualifiedName": "EM Record Linkage",
      "name":"EM Record Linkage",
      "description":"Record Linkage Algorithm",
      "owner": "pdefusco",
        #, 
      "inputs":[{"guid": "2bcde123-67f2-4ed7-a781-4824085a2d90", "typeName":"hive_table"},
               {"guid": "e26109c4-b70a-4583-a361-3de4539f532f", "typeName":"hbase_table"}], 
      "outputs":[{"guid":"ae778e6b-57f4-486e-9643-7ae574a9e16b", "type_name":"hive_table"}]
    }, 
  },
  
}

In [42]:
client.entity_post.create(data=process_entity_dict)

NotFound: HTTP request failed for POST https://mlworkshop-04212021-master0.mlworksh.z30z-14kp.cloudera.site:31443/api/atlas/v2/entity: Not found 404: {"errorCode":"ATLAS-404-00-00A","errorMessage":"Referenced entity ae778e6b-57f4-486e-9643-7ae574a9e16b is not found"}

##### Go back to the CDP Homepage, then Data Catalog -> Atlas and browse for "ER_algorithm". Open the instance and navigate to the "Lineage" tab 

![title](images/EM_Record_Linkage.png)

## Step 8: Create a new Atlas Process Type related to writing a Spark Dataset to Impala and Instantiate it with source and target

In [None]:
typedef_dict = {
    "enumTypes": [],
    "structTypes": [],
    "classificationDefs":[],
    "entityDefs": [{
        "superTypes": ["Process"],
        "name": "Write_to_Impala",
        "description":"write_to_impala",
        "attributeDefs": [{
            "name": "startTime",
            "isOptional": True,
            "isUnique": False,
            "isIndexable": False,
            "typeName":"string",
            "valuesMaxCount":1,
            "cardinality":"SINGLE",
            "valuesMinCount":0
        }]
    }]
}

In [None]:
process_entity_dict = {
  "entity" : {
    "guid" : "-2089428075574888",
    "status" : "ACTIVE",
    "createdBy" : "pdefusco",
    "updatedBy" : "pdefusco",
    "createTime" : "12342",
    "updateTime" : "12342",
    "version" : "12342",
    "relationshipAttributes" : {},
    "classifications" : [],
    "typeName" : "Write_to_Impala",
    "attributes" : {
      "startTime" : "123",
      "qualifiedName": "Write_to_Impala",
      "name":"Write_to_Impala",
      "description":"Record Linkage Algorithm",
      "owner": "pdefusco",
        #, 
      "inputs":[{"guid": "ae778e6b-57f4-486e-9643-7ae574a9e16b", "typeName":"hive_table"}], 
      "outputs":[{"guid":"INSERT IMPALA TABLE GUID HERE", "type_name":"IMPALA_TABLE???"}]
    }, 
  },
  
}

![title](images/ER_atlas_lineage.png)

Next we can optionally remove the EM Algorithm instance from Atlas via the client

In [None]:
entity = client.entity_guid("44848fe5-6950-4a73-a89c-9775b736b4c9")

In [None]:
entity.entity['attributes']["owner"]

In [None]:
entity.delete()

## We have completed our introduction to Splink and the Atlas Client. 
## Next we will simulate a real world Application with CML Jobs and COD (Cloudera Operational Database)