In [None]:
import argparse
import csv
import getpass
import os
import tempfile

import dedupe
import psycopg2
from psycopg2.extras import DictCursor
from sklearn.linear_model import LogisticRegression

In [None]:
# Identifier column of SOURCE_TABLE; the blocking tables and entity_map are
# keyed on it and joined back to the source rows with it.
KEY_FIELD = 'visitor_id'
# Table holding the raw visitor records to deduplicate.
SOURCE_TABLE = 'visitors'

# Every compared column is a string that may be missing, and each dedupe
# variable is simply named after its column.
_COMPARED_COLUMNS = ('firstname', 'lastname', 'uin', 'meeting_loc')
FIELDS = [
    {'field': column, 'variable name': column,
     'type': 'String', 'has missing': True}
    for column in _COMPARED_COLUMNS
]

In [None]:
def candidates_gen(result_set):
    """Group rows into the candidate blocks passed to ``deduper.matchBlocks``.

    ``result_set`` must yield rows ordered by ``block_id``: consecutive rows
    sharing a ``block_id`` form one block, and a new block starts whenever
    that value changes.

    Each yielded block is a fresh list of ``(key, row, smaller_ids)`` tuples,
    where ``key`` is ``row[KEY_FIELD]`` and ``smaller_ids`` is the set of
    strings obtained by splitting ``row['smaller_ids']`` on commas (an empty
    set when that column is NULL or empty).

    A progress line is printed every 10,000 blocks.
    """
    current_block_id = None
    block = []
    blocks_started = 0

    for record in result_set:
        if record['block_id'] != current_block_id:
            # Flush the finished block, then start a *new* list so blocks
            # already handed to the consumer are never mutated.
            if block:
                yield block
            current_block_id = record['block_id']
            block = []
            blocks_started += 1
            if not blocks_started % 10000:
                print('{} blocks'.format(blocks_started))

        raw_smaller = record['smaller_ids']
        smaller = set(raw_smaller.split(',')) if raw_smaller else set()
        block.append((record[KEY_FIELD], record, smaller))

    # The final block has no following row to trigger its flush.
    if block:
        yield block

In [None]:
# Build the deduper from the field definitions, then replace its classifier
# with scikit-learn's logistic regression (imported in the top import cell).
deduper = dedupe.Dedupe(FIELDS)
deduper.classifier = LogisticRegression()

In [None]:
# Credentials come from the environment (PGPASSWORD) or an interactive prompt
# so they are never stored in the notebook itself.
db_password = os.environ.get('PGPASSWORD') or getpass.getpass('Postgres password: ')
try:
    con = psycopg2.connect(database='postgres',
                           host='lva1-gendevdb01',
                           password=db_password,
                           cursor_factory=DictCursor)
except psycopg2.Error as e:
    # Every later cell needs `con`; stop here instead of failing further down
    # with a confusing NameError.
    print(e)
    raise
print("I've connected")
c = con.cursor()


In [None]:
# Fraction of complete records used as the active-learning sample size.
SAMPLE_FRACTION = 0.05

# Only rows with non-empty firstname, lastname and uin are counted (the
# sampling cell applies the same filter). Skipping incomplete rows loses
# little data and still allows robust clusters.
c.execute('SELECT COUNT(*) AS count FROM %s where firstname is not null and firstname <> \'\' and lastname is not null and lastname <> \'\' and uin is not null and uin <> \'\' ' % SOURCE_TABLE)
row = c.fetchone()
count = row['count']
if count == 0:
    raise ValueError('No complete records in {} to sample from'.format(SOURCE_TABLE))
# int() rounds down, so small tables could otherwise yield a sample size of 0.
sample_size = max(1, int(count * SAMPLE_FRACTION))
print(count)

In [None]:
# Sample only complete records (same non-empty firstname/lastname/uin filter
# as the count query).
print('Generating sample of {} records'.format(sample_size))
with con.cursor('deduper') as c_deduper:
    c_deduper.execute('SELECT visitor_id,lastname,firstname,uin,meeting_loc FROM %s where firstname is not null and firstname <> \'\' and lastname is not null and lastname <> \'\' and uin is not null and uin <> \'\' ' % SOURCE_TABLE)
    # Records are keyed by their position in the result set.
    temp_d = {position: record for position, record in enumerate(c_deduper)}
    deduper.sample(temp_d, sample_size)

In [None]:
# Interactive step: dedupe's console labeler collects training labels from
# the user in this cell.
print('Starting active learning')
dedupe.convenience.consoleLabel(deduper)

In [None]:
# NOTE(review): deduper.sample() was already called on temp_d before
# labeling; prepare_training() samples temp_d again here. Confirm this
# re-sampling is intended for the installed dedupe version.
print('preparing training')
deduper.prepare_training(temp_d, sample_size=sample_size)

print('Starting training')
deduper.train()

In [None]:
# Persist the labeled pairs so later runs can reuse them.
training_path = 'train.json'
print('Saving new training file to {}'.format(training_path))
with open(training_path, 'w') as training_file:
    deduper.writeTraining(training_file)

deduper.cleanupTraining()

In [None]:
print('Creating blocking_map table')
# blocking_map holds one (block_key, record key) row per blocker output;
# it is dropped first so the cell can be re-run.
drop_blocking_map = """
    DROP TABLE IF EXISTS blocking_map
    """
create_blocking_map = """
    CREATE TABLE blocking_map
    (block_key VARCHAR(200), %s INTEGER)
    """ % KEY_FIELD
for statement in (drop_blocking_map, create_blocking_map):
    c.execute(statement)

In [None]:
# Feed every distinct value of each index field to the blocker's index.
for field in deduper.blocker.index_fields:
    print('Selecting distinct values for "{}"'.format(field))
    # The context manager closes the named (server-side) cursor even if
    # indexing raises, instead of leaving it open on the server.
    with con.cursor('index') as c_index:
        c_index.execute("""
            SELECT DISTINCT %s FROM %s
            """ % (field, SOURCE_TABLE))
        # Lazy generator: values are fetched as index() consumes them,
        # which must happen before the cursor is closed.
        field_data = (row[field] for row in c_index)
        deduper.blocker.index(field_data, field)

In [None]:
print('Generating blocking map')
# Named (server-side) cursor over the whole source table. full_data is a lazy
# generator, so rows are fetched only as the blocker pulls them.
select_all_sql = """
    SELECT * FROM %s
    """ % SOURCE_TABLE
c_block = con.cursor('block')
c_block.execute(select_all_sql)
full_data = ((record[KEY_FIELD], record) for record in c_block)
b_data = deduper.blocker(full_data)

In [None]:
print('Inserting blocks into blocking_map')
# delete=False: the file is reopened by name for COPY and removed there.
# newline='' is what the csv module requires for files it writes to.
with tempfile.NamedTemporaryFile(prefix='blocks_', delete=False, mode='w',
                                 newline='') as csv_file:
    csv.writer(csv_file).writerows(b_data)
# b_data (and with it the server-side cursor) is fully consumed; release it.
c_block.close()

In [None]:
# Bulk-load the block CSV into blocking_map. The file handle is always
# closed and the temp file is always removed, even if COPY fails.
try:
    with open(csv_file.name, 'r') as f:
        c.copy_expert("COPY blocking_map FROM STDIN CSV", f)
finally:
    os.remove(csv_file.name)

con.commit()

In [None]:
print('Indexing blocks')
c.execute("""
    CREATE INDEX blocking_map_key_idx ON blocking_map (block_key)
    """)
# Clear the tables derived from blocking_map so the following cells can
# rebuild them from scratch.
for derived_table in ('plural_key', 'plural_block', 'covered_blocks', 'smaller_coverage'):
    c.execute("DROP TABLE IF EXISTS %s" % derived_table)

In [None]:
print('Calculating plural_key')
# plural_key gives a serial block_id to every block_key that appears on more
# than one blocking_map row (single-row blocks have nothing to compare).
plural_key_ddl = """
    CREATE TABLE plural_key
    (block_key VARCHAR(200),
    block_id SERIAL PRIMARY KEY)
    """
plural_key_fill = """
    INSERT INTO plural_key (block_key)
    SELECT block_key FROM blocking_map
    GROUP BY block_key HAVING COUNT(*) > 1
    """
c.execute(plural_key_ddl)
c.execute(plural_key_fill)

In [None]:
print('Indexing block_key')
# Unique: plural_key was filled from a GROUP BY block_key, one row per key.
block_key_index_sql = """
    CREATE UNIQUE INDEX block_key_idx ON plural_key (block_key)
    """
c.execute(block_key_index_sql)

In [None]:
print('Calculating plural_block')
# plural_block maps each multi-record block_id to the record keys in it.
plural_block_sql = """
    CREATE TABLE plural_block
    AS (SELECT block_id, %s
    FROM blocking_map INNER JOIN plural_key
    USING (block_key))
    """ % KEY_FIELD
c.execute(plural_block_sql)

In [None]:
print('Adding {} index'.format(KEY_FIELD))
# A lookup index on the record key plus a uniqueness guard on
# (block_id, record key).
key_index_template = """
    CREATE INDEX plural_block_%s_idx
    ON plural_block (%s)
    """
block_record_unique_template = """
    CREATE UNIQUE INDEX plural_block_block_id_%s_uniq
    ON plural_block (block_id, %s)
    """
for template in (key_index_template, block_record_unique_template):
    c.execute(template % (KEY_FIELD, KEY_FIELD))

In [None]:
print('Creating covered_blocks')
# For each record: a comma-separated list of all its block_ids, in
# ascending block_id order.
covered_blocks_sql = """
    CREATE TABLE covered_blocks AS
    (SELECT %s,
    string_agg(CAST(block_id AS TEXT), ','
    ORDER BY block_id) AS sorted_ids
    FROM plural_block
    GROUP BY %s)
    """ % (KEY_FIELD, KEY_FIELD)
c.execute(covered_blocks_sql)

In [None]:
print('Indexing covered_blocks')
c.execute("""
    CREATE UNIQUE INDEX covered_blocks_%s_idx
    ON covered_blocks (%s)
    """ % (KEY_FIELD, KEY_FIELD))
print('Committing')
# Actually persist the blocking tables built so far, as the message says.
con.commit()

In [None]:
print('Creating smaller_coverage')
# For every (record, block) pair, smaller_ids is the part of the record's
# ascending sorted_ids list that precedes this block_id, with the trailing
# comma trimmed. candidates_gen turns it into a set of strings.
smaller_coverage_sql = """
    CREATE TABLE smaller_coverage AS
    (SELECT %s, block_id,
    TRIM(',' FROM split_part(sorted_ids,
    CAST(block_id AS TEXT), 1))
    AS smaller_ids
    FROM plural_block
    INNER JOIN covered_blocks
    USING (%s))
    """ % (KEY_FIELD, KEY_FIELD)
c.execute(smaller_coverage_sql)
con.commit()

In [None]:
print('Clustering...')
# Optional cap on joined rows fed to clustering, for quick trial runs.
# None clusters everything. Any row limit may cut the final block short.
CLUSTER_ROW_LIMIT = None
limit_clause = '' if CLUSTER_ROW_LIMIT is None else 'LIMIT %d' % int(CLUSTER_ROW_LIMIT)

c_cluster = con.cursor('cluster')
# Rows must arrive ordered by block_id: candidates_gen groups consecutive
# rows with the same block_id into one block.
c_cluster.execute("""
    SELECT *
    FROM smaller_coverage
    INNER JOIN %s
    USING (%s)
    ORDER BY (block_id)
    %s
    """ % (SOURCE_TABLE, KEY_FIELD, limit_clause))

In [None]:
# Threshold passed to dedupe's matchBlocks when clustering the blocks.
MATCH_THRESHOLD = 0.5
candidate_blocks = candidates_gen(c_cluster)
clustered_dupes = deduper.matchBlocks(candidate_blocks, threshold=MATCH_THRESHOLD)

In [None]:
print('Creating entity_map table')
# One row per clustered record: its key, its cluster's canonical id and score.
c.execute("DROP TABLE IF EXISTS entity_map")
entity_map_ddl = """
      CREATE TABLE entity_map (
      %s INTEGER,
      canon_id INTEGER,
      cluster_score FLOAT,
      PRIMARY KEY(%s)
      )""" % ((KEY_FIELD,) * 2)
c.execute(entity_map_ddl)

In [None]:
print('Inserting entities into entity_map')
# Only the column name is formatted into the SQL; the values are bound as
# query parameters. %%s survives the %-formatting as psycopg2's placeholder.
insert_entity_sql = """
      INSERT INTO entity_map
      (%s, canon_id, cluster_score)
      VALUES (%%s, %%s, %%s)
      """ % KEY_FIELD
# The first record of each cluster serves as its canonical id. Values are
# converted to plain int/float (the entity_map column types) because psycopg2
# cannot adapt numpy scalars. NOTE(review): presumably matchBlocks returns
# numpy ids/scores here — confirm for the installed dedupe version.
entity_rows = (
    (int(record_id), int(cluster[0]), float(score))
    for cluster, scores in clustered_dupes
    for record_id, score in zip(cluster, scores)
)
c.executemany(insert_entity_sql, entity_rows)
# A single commit: the load is atomic and avoids one transaction per row.
con.commit()