<a href="https://colab.research.google.com/github/nfaggian/record_linkage/blob/master/beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install jellyfish numpy apache-beam[gcp]

In [7]:
import apache_beam as beam
from apache_beam.io import BigQuerySource

from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

In [11]:
try: 
    from google.colab import auth
    auth.authenticate_user()
except:     
    try:
        import google.auth
        credentials, project = google.auth.default()
    except:
        raise Exception('Google cloud authentication required!')

In [9]:
project_id = 'anz-pso-nfaggian'
bucket_id = 'anz-pso-nfaggian'

%env GOOGLE_CLOUD_PROJECT=anz-pso-nfaggian

env: GOOGLE_CLOUD_PROJECT=anz-pso-nfaggian


In [10]:
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = 'distance_calculation'
google_cloud_options.staging_location = 'gs://{0}/stage'.format(bucket_id)
google_cloud_options.temp_location = 'gs://{0}/temp'.format(bucket_id)
options.view_as(StandardOptions).runner = 'DirectRunner'

In [153]:
query = """
WITH
  name_index AS (
   -- name: sorted neighbourhood indexing method 
  SELECT
    donor_id,
    name,
    address,
    ARRAY_AGG(STRUCT(donor_id, name, address)) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS name_candidates
  FROM (
    SELECT
      *
    FROM
      dedup.processed_donors
    ORDER BY
      name) ),
  address_index AS (
  -- address: sorted neighbourhood indexing method
  SELECT
    donor_id,
    name,
    address,
    ARRAY_AGG(STRUCT(donor_id, name, address)) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS address_candidates
  FROM (
    SELECT
      *
    FROM
      dedup.processed_donors
    ORDER BY
      address) )
SELECT
  name_index.donor_id,
  name_index.name,
  name_index.name_candidates,
  address_index.address,
  address_index.address_candidates
FROM
  address_index
JOIN
  name_index
ON
  address_index.donor_id = name_index.donor_id
LIMIT 500
"""

In [154]:
import jellyfish as jf
import numpy as np

class indexer(beam.DoFn): 
    """
    Forms candidate pairs from a structured query
    """
    def process(self, element):
        """
        Split the candidates
        """    
        candidate_groups = ['address_candidates', 'name_candidates']
        for group in candidate_groups:
            for candidate in element[group]:
                yield {'record_a': {'donor_id': element['donor_id'], 
                                    'name': unicode(element['name']), 
                                    'address': unicode(element['address'])},
                       'record_b': {'donor_id': candidate['donor_id'], 
                                    'name': unicode(candidate['name']), 
                                    'address': unicode(candidate['address'])}}
                
def comparator(element):
    """
    Extract similarity features
    """
    return {
        'donor_id1': element['record_a']['donor_id'],
        'donor_id2': element['record_b']['donor_id'],
        'jaro_name': jf.jaro_winkler(element['record_a']['name'], element['record_b']['name']),
        'damerau_name': jf.damerau_levenshtein_distance(element['record_a']['name'], element['record_b']['name']),
        'jaro_address': jf.jaro_winkler(element['record_a']['address'], element['record_b']['address']),
        'damerau_address': jf.damerau_levenshtein_distance(element['record_a']['address'], element['record_b']['address'])  
        }
                
def baseline_classifier(element):
    """
    Simple voting classifier.
    * assumes an equal weighting for the different types of distance metrics. 
    """
    votes = [
        element['jaro_name'] > 0.67,
        element['jaro_address'] > 0.67,
        element['damerau_name'] < 9,
        element['damerau_address'] < 9]
    return {'donor_id1': element['donor_id1'], 
            'donor_id2': element['donor_id2'], 
            'classification': np.mean(votes)}                


schema = 'donor_id1:STRING, donor_id2:STRING, classfication:FLOAT'

def printfn(x): print(x)

In [None]:
with beam.Pipeline(options=options) as p:
     
    _ = (p 
        | "query" >> beam.io.Read(beam.io.BigQuerySource(query=query, 
                                                         project='anz-pso-nfaggian', 
                                                         use_standard_sql=True))
        | "record generator" >> beam.ParDo(indexer())
        | "feature extraction" >> beam.Map(lambda x: comparator(x)) 
        | "duplicate classifier" >> beam.Map(lambda x: baseline_classifier(x)) 
        # Need a cluster creation function here 
        | "store" >> beam.io.Write(beam.io.BigQuerySink('dedup.classification', 
                                                        schema=schema, 
                                                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
                                                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
        )
    
    result = p.run().wait_until_finish()

ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f99d16875f0>, due to an exception.
 Traceback (most recent call last):
  File "/home/nfaggian/development/miniconda/envs/py2/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 343, in call
    finish_state)
  File "/home/nfaggian/development/miniconda/envs/py2/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 383, in attempt_call
    result = evaluator.finish_bundle()
  File "/home/nfaggian/development/miniconda/envs/py2/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 904, in finish_bundle
    writer.Write(v.value)
  File "/home/nfaggian/development/miniconda/envs/py2/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 768, in __exit__
    self._flush_rows_buffer()
  File "/home/nfaggian/development/miniconda/envs/py2/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 752, in _flus