1. Title: Record Linkage Comparison Patterns

2. Source Information
   -- Underlying records: Epidemiologisches Krebsregister NRW
      (http://www.krebsregister.nrw.de)
   -- Creation of comparison patterns and gold standard classification:
      Institute for Medical Biostatistics, Epidemiology and Informatics (IMBEI),
      University Medical Center of Johannes Gutenberg University, Mainz, Germany
      (http://www.imbei.uni-mainz.de) 
   -- Donor: Murat Sariyar, Andreas Borg (IMBEI)    
   -- Date: September 2008
 
3. Past Usage:
    1. Irene Schmidtmann, Gael Hammer, Murat Sariyar, Aslihan Gerhold-Ay:
       Evaluation des Krebsregisters NRW Schwerpunkt Record Linkage. Technical
       Report, IMBEI 2009. 
       http://www.krebsregister.nrw.de/fileadmin/user_upload/dokumente/Evaluation/EKR_NRW_Evaluation_Abschlussbericht_2009-06-11.pdf
       -- Describes the external evaluation of the registry's record linkage
          procedures.
       -- The comparison patterns in this data set were created in the course of
          this evaluation.
           
    2. Murat Sariyar, Andreas Borg, Klaus Pommerening: 
       Controlling false match rates in record linkage using extreme value theory.
       Journal of Biomedical Informatics, 2011 (in press). 
       -- Predicted attribute: matching status (boolean).
       -- Results:
          -- A new approach for estimating the false match rate in record 
             linkage by methods of Extreme Value Theory (EVT).
          -- The model eliminates the need for labelled training data while
             achieving only slightly lower accuracy than a procedure
             that has knowledge of the matching status.

4. Relevant Information:

  The records contain personal data, including first and
  family name, sex, date of birth and postal code, collected through
  iterative insertions over the course of several years. The comparison
  patterns in this data set are based on a sample of 100,000 records dating
  from 2005 to 2008. Record pairs were classified as "match" or "non-match" during
  an extensive manual review involving several documentarists.
  The resulting classification formed the basis for assessing the quality of the
  registry’s own record linkage procedure.
  
  To limit the number of patterns, a blocking procedure was applied,
  which selects only record pairs that meet specific agreement conditions. The
  results of the following six blocking iterations were merged:
  
    1. Phonetic equality of first name and family name, equality of date of birth.
    2. Phonetic equality of first name, equality of day of birth.
    3. Phonetic equality of first name, equality of month of birth.
    4. Phonetic equality of first name, equality of year of birth.
    5. Equality of complete date of birth.
    6. Phonetic equality of family name, equality of sex.
    
  This procedure resulted in 5,749,132 record pairs, of which 20,931 are matches.
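
  The idea behind merging several blocking passes can be sketched in plain Python. This is only an illustration under stated assumptions: the toy records, the helper names `phonetic_key` and `block`, and the crude phonetic stand-in are all hypothetical and are not the registry's actual procedure. Only three of the six iterations (1, 5 and 6) are mimicked.

```python
from collections import defaultdict
from itertools import combinations

# Toy records; field names and values are made up for illustration.
records = [
    {"id": 1, "fname": "Anna", "lname": "Meyer", "sex": "f", "dob": (1950, 3, 14)},
    {"id": 2, "fname": "Anne", "lname": "Meier", "sex": "f", "dob": (1950, 3, 14)},
    {"id": 3, "fname": "Bernd", "lname": "Meyer", "sex": "m", "dob": (1962, 7, 1)},
    {"id": 4, "fname": "Clara", "lname": "Schmidt", "sex": "f", "dob": (1950, 3, 14)},
]

def phonetic_key(name):
    """Crude stand-in for a phonetic code (assumption): keep the first letter,
    drop later vowels and 'y', and collapse repeated letters."""
    name = name.lower()
    out = [name[0]]
    for ch in name[1:]:
        if ch not in "aeiouy" and ch != out[-1]:
            out.append(ch)
    return "".join(out)

def block(records, key_fn):
    """Return all id pairs whose records share the same blocking key."""
    buckets = defaultdict(list)
    for rec in records:
        buckets[key_fn(rec)].append(rec["id"])
    pairs = set()
    for ids in buckets.values():
        pairs.update(combinations(sorted(ids), 2))
    return pairs

# Three blocking passes, loosely modelled on iterations 1, 5 and 6.
passes = [
    lambda r: (phonetic_key(r["fname"]), phonetic_key(r["lname"]), r["dob"]),
    lambda r: r["dob"],
    lambda r: (phonetic_key(r["lname"]), r["sex"]),
]

# Merge the results of all passes into one set of candidate pairs.
candidate_pairs = set().union(*(block(records, fn) for fn in passes))
print(sorted(candidate_pairs))
```

  Of the six possible pairs among the four toy records, only those that agree on at least one blocking key survive, which is how blocking keeps the number of comparison patterns manageable.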
  
  The data set is split into 10 blocks of (approximately) equal size and ratio
  of matches to non-matches.

  The separate file frequencies.csv contains, for every predictive attribute,
  the average number of values in the underlying records. These values can, for
  example, be used as u-probabilities in weight-based record linkage following
  the framework of Fellegi and Sunter.
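
  As a rough sketch of how u-probabilities enter a Fellegi-Sunter weight: an agreement on a field contributes log2(m/u) and a disagreement contributes log2((1-m)/(1-u)), where m and u are the probabilities of agreement among matches and non-matches. The m and u values below are hypothetical placeholders, not values read from frequencies.csv, and skipping missing comparisons is just one possible convention.

```python
import math

def fs_weights(m, u):
    """Agreement and disagreement weights for one field."""
    agree = math.log2(m / u)
    disagree = math.log2((1 - m) / (1 - u))
    return agree, disagree

# Hypothetical (m, u) pairs per field; NOT taken from frequencies.csv.
params = {
    "cmp_sex": (0.99, 0.5),
    "cmp_bm": (0.99, 1 / 12),
    "cmp_by": (0.99, 0.01),
}

def pattern_weight(pattern, params):
    """Sum the field weights of a binary comparison pattern.
    Missing comparisons (None) are skipped, which is an assumption."""
    total = 0.0
    for field, value in pattern.items():
        if value is None:
            continue
        agree, disagree = fs_weights(*params[field])
        total += agree if value == 1 else disagree
    return total

all_agree = {"cmp_sex": 1, "cmp_bm": 1, "cmp_by": 1}
all_disagree = {"cmp_sex": 0, "cmp_bm": 0, "cmp_by": 0}
print(pattern_weight(all_agree, params), pattern_weight(all_disagree, params))
```

  A rare agreement (small u, as for year of birth here) earns a much larger weight than a common one such as sex.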
   

5. Number of Instances: 5,749,132

6. Number of Attributes: 12 (9 predictive attributes, 2 non-predictive, 
                             1 goal field)

7. Attribute Information:
   1. id_1: Internal identifier of first record.
   2. id_2: Internal identifier of second record.
   3. cmp_fname_c1: agreement of first name, first component
   4. cmp_fname_c2: agreement of first name, second component
   5. cmp_lname_c1: agreement of family name, first component
   6. cmp_lname_c2: agreement of family name, second component
   7. cmp_sex: agreement of sex
   8. cmp_bd: agreement of date of birth, day component
   9. cmp_bm: agreement of date of birth, month component
   10. cmp_by: agreement of date of birth, year component
   11. cmp_plz: agreement of postal code
   12. is_match: matching status (TRUE for matches, FALSE for non-matches)

8. Missing Attribute Values:  

  cmp_fname_c1: 1007
  cmp_fname_c2: 5645434
  cmp_lname_c1: 0
  cmp_lname_c2: 5746668
  cmp_sex:      0
  cmp_bd:       795
  cmp_bm:       795
  cmp_by:       795
  cmp_plz:      12843
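
  To put these counts in perspective, the share of missing values per attribute follows directly from the number of instances. The snippet below is a small arithmetic sketch using only the figures listed in this file; the variable names are illustrative.

```python
TOTAL_PAIRS = 5_749_132  # number of instances stated above

# Missing-value counts as listed in section 8.
missing = {
    "cmp_fname_c1": 1_007,
    "cmp_fname_c2": 5_645_434,
    "cmp_lname_c1": 0,
    "cmp_lname_c2": 5_746_668,
    "cmp_sex": 0,
    "cmp_bd": 795,
    "cmp_bm": 795,
    "cmp_by": 795,
    "cmp_plz": 12_843,
}

# Number of pairs where the comparison value is present.
present = {field: TOTAL_PAIRS - n for field, n in missing.items()}

for field, n_missing in missing.items():
    share = n_missing / TOTAL_PAIRS
    print(f"{field:<13} present={present[field]:>9,}  missing={share:7.2%}")
```

  The second name components are missing for the vast majority of pairs, so they carry information for only a small part of the data set.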


9. Class Distribution: 20,931 matches, 5,728,201 non-matches


In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("Record linkage") \
    .getOrCreate()


In [3]:
# Read the CSV files that were uploaded to the raw bucket, in the donation folder

In [4]:
prev = spark.read.csv('s3a://raw/donation/block_*.csv')

                                                                                

In [5]:
prev

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]

In [6]:
prev.show(2)

+----+----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| _c0| _c1|         _c2|         _c3|         _c4|         _c5|    _c6|   _c7|   _c8|   _c9|   _c10|    _c11|
+----+----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
|id_1|id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
|3148|8326|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
+----+----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
only showing top 2 rows
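
The raw read treats the header as a data row and keeps `?` as a literal string. What a header-aware, null-aware parse needs to do can be sketched with the standard library alone. This is not how Spark implements its reader; the helper name `parse_value` and the in-memory sample are assumptions made for illustration, with the single data row copied from the output above.

```python
import csv
import io

# One header line plus the first data row shown above.
sample = io.StringIO(
    "id_1,id_2,cmp_fname_c1,cmp_fname_c2,cmp_lname_c1,cmp_lname_c2,"
    "cmp_sex,cmp_bd,cmp_bm,cmp_by,cmp_plz,is_match\n"
    "3148,8326,1,?,1,?,1,1,1,1,1,TRUE\n"
)

def parse_value(field, raw):
    """Convert one raw CSV cell to a typed Python value."""
    if raw == "?":            # '?' marks a missing comparison
        return None
    if field == "is_match":   # goal field is TRUE/FALSE
        return raw == "TRUE"
    if field in ("id_1", "id_2"):
        return int(raw)
    return float(raw)         # comparison values lie in [0, 1]

# DictReader consumes the first line as the header.
rows = [
    {field: parse_value(field, raw) for field, raw in record.items()}
    for record in csv.DictReader(sample)
]
print(rows[0])
```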



In [7]:
parsed = spark.read.option("header", "true").option("nullValue", "?").\
          option("inferSchema", "true").csv('s3a://raw/donation/block_*.csv')

                                                                                

In [8]:
parsed.show(10)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|25739|45991|         1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|  

In [9]:
parsed.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



In [10]:
parsed.count()

                                                                                

5749132

In [11]:
parsed.cache()

DataFrame[id_1: int, id_2: int, cmp_fname_c1: double, cmp_fname_c2: double, cmp_lname_c1: double, cmp_lname_c2: double, cmp_sex: int, cmp_bd: int, cmp_bm: int, cmp_by: int, cmp_plz: int, is_match: boolean]

In [12]:
from pyspark.sql.functions import col

In [13]:
parsed.groupBy('is_match').count().orderBy(col('count').desc()).show()



+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



                                                                                

In [14]:
from pyspark.sql.functions import avg, stddev

In [15]:
parsed.agg(avg('cmp_sex'),stddev('cmp_sex')).show()

+-----------------+------------------+
|     avg(cmp_sex)|   stddev(cmp_sex)|
+-----------------+------------------+
|0.955001381078048|0.2073011111689795|
+-----------------+------------------+
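
Because `cmp_sex` only takes the values 0 and 1 and has no missing entries, its standard deviation is fully determined by its mean. The sketch below checks this relationship with plain arithmetic, assuming the sample (n - 1) form of the standard deviation, which is what Spark's `stddev` computes.

```python
import math

n = 5_749_132              # rows with a non-null cmp_sex
mean = 0.955001381078048   # avg(cmp_sex) from the output above

# Population standard deviation of a 0/1 variable with this mean.
pop_std = math.sqrt(mean * (1 - mean))

# Sample standard deviation applies the n / (n - 1) correction.
sample_std = pop_std * math.sqrt(n / (n - 1))

print(pop_std, sample_std)
```

With more than five million rows the correction is tiny, so both values agree with the reported `stddev(cmp_sex)` to several decimal places.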



In [16]:
parsed.createOrReplaceTempView('linkage')

In [17]:
spark.sql("""
    SELECT is_match, COUNT(*) cnt 
    FROM linkage 
    GROUP BY is_match 
    ORDER BY cnt DESC
""").show()

+--------+-------+
|is_match|    cnt|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



In [18]:
summary = parsed.describe()

In [19]:
summary.show()


+-------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|summary|              id_1|              id_2|      cmp_fname_c1|      cmp_fname_c2|      cmp_lname_c1|       cmp_lname_c2|           cmp_sex|             cmp_bd|             cmp_bm|             cmp_by|            cmp_plz|
+-------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|  count|           5749132|           5749132|           5748125|            103698|           5749132|               2464|           5749132|            5748337|            5748337|            5748337|            5736289|
|   mean| 33324.48559643438| 66587.43558331935|0.7129024704436274|0.9000176718903216|0.3156278193084133|

                                                                                

In [20]:
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

+-------+------------------+------------------+
|summary|      cmp_fname_c1|      cmp_fname_c2|
+-------+------------------+------------------+
|  count|           5748125|            103698|
|   mean|0.7129024704436274|0.9000176718903216|
| stddev|0.3887583596162788|0.2713176105782331|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+



                                                                                

In [21]:
matches = parsed.where('is_match = true')
# show() returns None, so its result is not assigned to a variable
matches.describe().show(5)

+-------+------------------+-----------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+
|summary|              id_1|             id_2|        cmp_fname_c1|       cmp_fname_c2|       cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|              cmp_by|            cmp_plz|
+-------+------------------+-----------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+
|  count|             20931|            20931|               20922|               1333|              20931|                475|              20931|              20925|              20925|               20925|              20902|
|   mean| 34575.72117911232|51259.95939037791|  0.9973163859635038| 0.98989003203181

                                                                                

In [22]:
misses = parsed.filter(col('is_match') == False)
miss_summary = misses.describe()

In [23]:
%pip install pyarrow

Note: you may need to restart the kernel to use updated packages.


In [24]:
%pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [25]:
summary_p = summary.toPandas()

                                                                                

In [26]:
summary_p.head()

Unnamed: 0,summary,id_1,id_2,cmp_fname_c1,cmp_fname_c2,cmp_lname_c1,cmp_lname_c2,cmp_sex,cmp_bd,cmp_bm,cmp_by,cmp_plz
0,count,5749132.0,5749132.0,5748125.0,103698.0,5749132.0,2464.0,5749132.0,5748337.0,5748337.0,5748337.0,5736289.0
1,mean,33324.48559643438,66587.43558331935,0.7129024704436274,0.9000176718903216,0.3156278193084133,0.3184128315317437,0.955001381078048,0.2244652670850717,0.488855298497635,0.2227485966810923,0.0055286614743434
2,stddev,23659.859374488213,23620.487613269885,0.3887583596162788,0.2713176105782331,0.3342336339615816,0.3685670662006653,0.2073011111689795,0.4172297223846255,0.4998758236779038,0.4160909629831734,0.0741491492542006
3,min,1.0,6.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,max,99980.0,100000.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


In [27]:
summary_p.shape

(5, 12)

In [28]:
summary_p = summary_p.set_index('summary').transpose().reset_index()

In [29]:
summary_p = summary_p.rename(columns={'index':'field'})

In [30]:
summary_p = summary_p.rename_axis(None, axis=1)

In [31]:
summary_p.shape

(11, 6)

In [32]:
summary_T = spark.createDataFrame(summary_p)

In [33]:
summary_T.show()

+------------+-------+-------------------+-------------------+---+------+
|       field|  count|               mean|             stddev|min|   max|
+------------+-------+-------------------+-------------------+---+------+
|        id_1|5749132|  33324.48559643438| 23659.859374488213|  1| 99980|
|        id_2|5749132|  66587.43558331935| 23620.487613269885|  6|100000|
|cmp_fname_c1|5748125| 0.7129024704436274| 0.3887583596162788|0.0|   1.0|
|cmp_fname_c2| 103698| 0.9000176718903216| 0.2713176105782331|0.0|   1.0|
|cmp_lname_c1|5749132| 0.3156278193084133| 0.3342336339615816|0.0|   1.0|
|cmp_lname_c2|   2464|0.31841283153174377|0.36856706620066537|0.0|   1.0|
|     cmp_sex|5749132|  0.955001381078048| 0.2073011111689795|  0|     1|
|      cmp_bd|5748337|0.22446526708507172| 0.4172297223846255|  0|     1|
|      cmp_bm|5748337|0.48885529849763504| 0.4998758236779038|  0|     1|
|      cmp_by|5748337| 0.2227485966810923|0.41609096298317344|  0|     1|
|     cmp_plz|5736289|0.00552866147434

In [34]:
summary_T.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: string (nullable = true)
 |-- mean: string (nullable = true)
 |-- stddev: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)



In [35]:
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType

In [36]:
def pivot_summary(desc):
    # convert to a pandas DataFrame
    desc_p = desc.toPandas()
    # transpose so that each field becomes a row and each metric a column
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    # convert back to a Spark DataFrame
    descT = spark.createDataFrame(desc_p)
    # cast every metric column (all but 'field') from string to double
    for c in descT.columns:
        if c != 'field':
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT
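
The reshaping done by `pivot_summary` can be illustrated without pandas or Spark. The sketch below is a plain-Python analogue, not a replacement: `pivot_rows` is a hypothetical helper, and the two input rows are a small excerpt of the `describe()` output shown earlier, with values kept as strings the way `describe()` returns them.

```python
# Excerpt of a describe()-style result: one row per metric, one key per field.
describe_rows = [
    {"summary": "count", "cmp_sex": "5749132", "cmp_bd": "5748337"},
    {"summary": "mean", "cmp_sex": "0.955001381078048", "cmp_bd": "0.22446526708507172"},
]

def pivot_rows(rows):
    """Turn metric-per-row data into field-per-row data, casting to float."""
    fields = [key for key in rows[0] if key != "summary"]
    return [
        {"field": field, **{row["summary"]: float(row[field]) for row in rows}}
        for field in fields
    ]

pivoted = pivot_rows(describe_rows)
for row in pivoted:
    print(row)
```

After the pivot each field is a row, which is what makes it possible to join the match and non-match summaries on the field name.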

In [37]:
match_summary = matches.describe()

In [38]:
match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)

                                                                                

In [39]:
match_summaryT.show()

+------------+-------+------------------+--------------------+---+-------+
|       field|  count|              mean|              stddev|min|    max|
+------------+-------+------------------+--------------------+---+-------+
|        id_1|20931.0| 34575.72117911232|  21950.312851969127|5.0|99946.0|
|        id_2|20931.0| 51259.95939037791|    24345.7334537752|6.0|99996.0|
|cmp_fname_c1|20922.0|0.9973163859635038|0.036506675848336785|0.0|    1.0|
|cmp_fname_c2| 1333.0|0.9898900320318174| 0.08251973727615237|0.0|    1.0|
|cmp_lname_c1|20931.0|0.9970152595958817| 0.04311880753394512|0.0|    1.0|
|cmp_lname_c2|  475.0| 0.969370167843852| 0.15345280740388917|0.0|    1.0|
|     cmp_sex|20931.0| 0.987291577086618| 0.11201570591216432|0.0|    1.0|
|      cmp_bd|20925.0|0.9970848267622461| 0.05391487659807977|0.0|    1.0|
|      cmp_bm|20925.0|0.9979450418160095| 0.04528612745217063|0.0|    1.0|
|      cmp_by|20925.0|0.9961290322580645|0.062098048567310576|0.0|    1.0|
|     cmp_plz|20902.0|0.9

In [40]:
miss_summaryT.show()

+------------+---------+--------------------+-------------------+----+--------+
|       field|    count|                mean|             stddev| min|     max|
+------------+---------+--------------------+-------------------+----+--------+
|        id_1|5728201.0|  33319.913548075565| 23665.760130330764| 1.0| 99980.0|
|        id_2|5728201.0|   66643.44259218557| 23599.551728241124|30.0|100000.0|
|cmp_fname_c1|5727203.0|  0.7118634802174252|0.38908060096985714| 0.0|     1.0|
|cmp_fname_c2| 102365.0|  0.8988473514090173| 0.2727209029401023| 0.0|     1.0|
|cmp_lname_c1|5728201.0| 0.31313801133682906| 0.3322812130572706| 0.0|     1.0|
|cmp_lname_c2|   1989.0| 0.16295544855122554|0.19302366635287027| 0.0|     1.0|
|     cmp_sex|5728201.0|  0.9548833918362851|0.20755988859217656| 0.0|     1.0|
|      cmp_bd|5727412.0|  0.2216425149788421| 0.4153518275558737| 0.0|     1.0|
|      cmp_bm|5727412.0|   0.486995347986141|   0.49983089404939| 0.0|     1.0|
|      cmp_by|5727412.0|  0.219923064728

In [41]:
match_summaryT.createOrReplaceTempView('match_desc')
miss_summaryT.createOrReplaceTempView('miss_desc')
spark.sql("""
    SELECT a.field, a.count + b.count total, a.mean - b.mean delta
    FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
    WHERE a.field NOT IN ("id_1", "id_2")
    ORDER BY delta DESC, total DESC
""").show()

+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926264|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482590526|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057460786|
|cmp_fname_c2| 103698.0| 0.09104268062280008|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+



In [42]:
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
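
One way to arrive at a list like `good_features` is to keep the fields whose mean differs strongly between matches and non-matches and that are present for almost every pair. The sketch below applies two cutoffs to the join result above. Both thresholds (`MIN_DELTA`, `MIN_TOTAL`) are hypothetical values chosen for illustration; the notebook itself does not state how the list was picked.

```python
# (field, total, delta) rows copied from the join output above.
delta_rows = [
    ("cmp_plz", 5736289, 0.9563812499852176),
    ("cmp_lname_c2", 2464, 0.8064147192926264),
    ("cmp_by", 5748337, 0.7762059675300512),
    ("cmp_bd", 5748337, 0.775442311783404),
    ("cmp_lname_c1", 5749132, 0.6838772482590526),
    ("cmp_bm", 5748337, 0.5109496938298685),
    ("cmp_fname_c1", 5748125, 0.2854529057460786),
    ("cmp_fname_c2", 103698, 0.09104268062280008),
    ("cmp_sex", 5749132, 0.032408185250332844),
]

MIN_DELTA = 0.5        # hypothetical cutoff on the difference in means
MIN_TOTAL = 5_000_000  # hypothetical cutoff on the number of non-null values

selected = [
    field
    for field, total, delta in delta_rows
    if delta >= MIN_DELTA and total >= MIN_TOTAL
]
print(selected)
```

Under these cutoffs `cmp_lname_c2` is dropped despite its large delta, because it is populated for only 2,464 pairs.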

In [43]:
sum_expression = " + ".join(good_features)

In [44]:
sum_expression

'cmp_lname_c1 + cmp_plz + cmp_by + cmp_bd + cmp_bm'

In [45]:
from pyspark.sql.functions import expr
scored = parsed.fillna(0, subset=good_features).\
                withColumn('score', expr(sum_expression)).\
                select('score', 'is_match')

In [46]:
scored.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows
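
The score is simply the sum of the selected comparison values, with missing values counted as 0. A plain-Python sketch of the same rule for a single row is shown below; the function name `score` and the two example rows are made up for illustration.

```python
GOOD_FEATURES = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]

def score(row):
    """Sum the selected features, treating missing values (None) as 0."""
    return sum(float(row.get(feature) or 0) for feature in GOOD_FEATURES)

# Hypothetical rows: full agreement, and one with a missing postal code.
full_agreement = {"cmp_lname_c1": 1.0, "cmp_plz": 1, "cmp_by": 1, "cmp_bd": 1, "cmp_bm": 1}
missing_plz = {"cmp_lname_c1": 1.0, "cmp_plz": None, "cmp_by": 1, "cmp_bd": 1, "cmp_bm": 1}

print(score(full_agreement), score(missing_plz))
```

Filling missing values with 0 means a missing comparison is treated like a disagreement, which lowers the score of pairs with incomplete data.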



In [47]:
def crossTabs(scored: DataFrame, t: float) -> DataFrame:
    # count matches and non-matches at or above, and below, the score threshold t
    return scored.selectExpr(f'score >= {t} as above', 'is_match').\
           groupBy('above').pivot('is_match', ['true','false']).\
           count()

In [48]:
crossTabs(scored, 4.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+



In [49]:
crossTabs(scored, 2.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| NULL|5131787|
+-----+-----+-------+
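
The two cross tabulations can be summarized as precision and recall. The sketch below uses the counts printed above and treats the NULL cell at threshold 2.0 as zero matches below the threshold, which is an interpretation rather than something the output states explicitly. The helper name `precision_recall` is illustrative.

```python
def precision_recall(tp, fp, fn):
    """Precision and recall from true positives, false positives, false negatives."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return precision, recall

# Threshold 4.0: 20871 matches and 637 non-matches at or above, 60 matches below.
p4, r4 = precision_recall(tp=20871, fp=637, fn=60)

# Threshold 2.0: all 20931 matches at or above (NULL read as 0 below),
# together with 596414 non-matches.
p2, r2 = precision_recall(tp=20931, fp=596414, fn=0)

print(f"t=4.0  precision={p4:.4f}  recall={r4:.4f}")
print(f"t=2.0  precision={p2:.4f}  recall={r2:.4f}")
```

Lowering the threshold from 4.0 to 2.0 recovers the remaining 60 matches but admits several hundred thousand non-matches, so precision falls sharply.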

