In [1]:
pip install dj_database_url


Collecting dj_database_url
  Downloading dj_database_url-0.5.0-py2.py3-none-any.whl (5.5 kB)
Installing collected packages: dj-database-url
Successfully installed dj-database-url-0.5.0
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.8.6-cp38-cp38-manylinux1_x86_64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 1.8 MB/s eta 0:00:01
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.8.6
Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install dedupe

Collecting dedupe
  Downloading dedupe-2.0.6-cp38-cp38-manylinux1_x86_64.whl (90 kB)
[K     |████████████████████████████████| 90 kB 1.4 MB/s eta 0:00:011
[?25hCollecting doublemetaphone
  Downloading DoubleMetaphone-0.1-cp38-cp38-manylinux1_x86_64.whl (78 kB)
[K     |████████████████████████████████| 78 kB 3.2 MB/s eta 0:00:011
[?25hCollecting highered>=0.2.0
  Downloading highered-0.2.1-py2.py3-none-any.whl (3.3 kB)
Collecting dedupe-hcluster
  Downloading dedupe_hcluster-0.3.8-cp38-cp38-manylinux1_x86_64.whl (539 kB)
[K     |████████████████████████████████| 539 kB 7.9 MB/s eta 0:00:01
[?25hCollecting categorical-distance>=1.9
  Downloading categorical_distance-1.9-py3-none-any.whl (3.3 kB)
Collecting affinegap>=1.3
  Downloading affinegap-1.11-cp38-cp38-manylinux1_x86_64.whl (46 kB)
[K     |████████████████████████████████| 46 kB 4.6 MB/s eta 0:00:011
[?25hCollecting simplecosine>=1.2
  Downloading simplecosine-1.2-py2.py3-none-any.whl (3.2 kB)
Collecting fastcluster
  Down

In [4]:
import os
import time
import logging
import optparse
import locale
import itertools
import io
import csv

import dj_database_url
import psycopg2
import psycopg2.extras

import dedupe
import numpy


In [5]:
from psycopg2.extensions import register_adapter, AsIs
register_adapter(numpy.int32, AsIs)
register_adapter(numpy.int64, AsIs)
register_adapter(numpy.float32, AsIs)
register_adapter(numpy.float64, AsIs)

In [6]:
class Readable(object):

    def __init__(self, iterator):

        self.output = io.StringIO()
        self.writer = csv.writer(self.output)
        self.iterator = iterator

    def read(self, size):

        self.writer.writerows(itertools.islice(self.iterator, size))

        chunk = self.output.getvalue()
        self.output.seek(0)
        self.output.truncate(0)

        return chunk


def record_pairs(result_set):

    for i, row in enumerate(result_set):
        a_record_id, a_record, b_record_id, b_record = row
        record_a = (a_record_id, a_record)
        record_b = (b_record_id, b_record)

        yield record_a, record_b

        if i % 10000 == 0:
            print(i)


def cluster_ids(clustered_dupes):

    for cluster, scores in clustered_dupes:
        cluster_id = cluster[0]
        for donor_id, score in zip(cluster, scores):
            yield donor_id, cluster_id, score


if __name__ == '__main__':
    # ## Logging

    # Dedupe uses Python logging to show or suppress verbose output. Added
    # for convenience.  To enable verbose output, run `python
    # pgsql_big_dedupe_example.py -v`
    optp = optparse.OptionParser()
    optp.add_option('-v', '--verbose', dest='verbose', action='count',
                    help='Increase verbosity (specify multiple times for more)'
                    )
    (opts, args) = optp.parse_args()
    log_level = logging.WARNING
    if opts.verbose:
        if opts.verbose == 1:
            log_level = logging.INFO
        elif opts.verbose >= 2:
            log_level = logging.DEBUG
    logging.getLogger().setLevel(log_level)

    # ## Setup
    settings_file = 'pgsql_big_dedupe_example_settings'
    training_file = 'pgsql_big_dedupe_example_training.json'

In [7]:
    # ## Setup
    settings_file = 'pgsql_big_dedupe_example_settings'
    training_file = 'pgsql_big_dedupe_example_training.json'

In [8]:
    start_time = time.time()
    
    read_con = psycopg2.connect(database="campaign-finance",
                        user="postgres",
                        password="",
                        host="172.16.238.13",
                        port="5432",
                        cursor_factory=psycopg2.extras.RealDictCursor)

    write_con = psycopg2.connect(database="campaign-finance",
                        user="postgres",
                        password="",
                        host="172.16.238.13",
                        port="5432")

In [None]:
    DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \
                   "from processed_donors"

    # ## Training

    if os.path.exists(settings_file):
        print('reading from ', settings_file)
        with open(settings_file, 'rb') as sf:
            deduper = dedupe.StaticDedupe(sf, num_cores=4)
    else:

        # Define the fields dedupe will pay attention to
        #
        # The address, city, and zip fields are often missing, so we'll
        # tell dedupe that, and we'll learn a model that take that into
        # account
        fields = [{'field': 'name', 'type': 'String'},
                  {'field': 'address', 'type': 'String', 'has missing': True},
                  {'field': 'city', 'type': 'ShortString', 'has missing': True},
                  {'field': 'state', 'type': 'ShortString', 'has missing': True},
                  {'field': 'zip', 'type': 'ShortString', 'has missing': True},
                  ]

        # Create a new deduper object and pass our data model to it.
        deduper = dedupe.Dedupe(fields, num_cores=4)

        # Named cursor runs server side with psycopg2
        with read_con.cursor('donor_select') as cur:
            cur.execute(DONOR_SELECT)
            temp_d = {i: row for i, row in enumerate(cur)}

        # If we have training data saved from a previous run of dedupe,
        # look for it an load it in.
        #
        # __Note:__ if you want to train from
        # scratch, delete the training_file
        if os.path.exists(training_file):
            print('reading labeled examples from ', training_file)
            with open(training_file) as tf:
                deduper.prepare_training(temp_d, tf)
        else:
            deduper.prepare_training(temp_d)

        del temp_d

In [None]:
        # ## Active learning

        print('starting active labeling...')
        # Starts the training loop. Dedupe will find the next pair of records
        # it is least certain about and ask you to label them as duplicates
        # or not.

        # use 'y', 'n' and 'u' keys to flag duplicates
        # press 'f' when you are finished
        dedupe.console_label(deduper)
        # When finished, save our labeled, training pairs to disk
        with open(training_file, 'w') as tf:
            deduper.write_training(tf)

        # Notice our argument here
        #
        # `recall` is the proportion of true dupes pairs that the learned
        # rules must cover. You may want to reduce this if your are making
        # too many blocks and too many comparisons.
        deduper.train(recall=0.90)

        with open(settings_file, 'wb') as sf:
            deduper.write_settings(sf)

        # We can now remove some of the memory hogging objects we used
        # for training
        deduper.cleanup_training()

In [None]:
    # ## Blocking
    print('blocking...')

    # To run blocking on such a large set of data, we create a separate table
    # that contains blocking keys and record ids
    print('creating blocking_map database')
    with write_con:
        with write_con.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS blocking_map")
            cur.execute("CREATE TABLE blocking_map "
                        "(block_key text, donor_id INTEGER)")

    # If dedupe learned a Index Predicate, we have to take a pass
    # through the data and create indices.
    print('creating inverted index')

    for field in deduper.fingerprinter.index_fields:
        with read_con.cursor('field_values') as cur:
            cur.execute("SELECT DISTINCT %s FROM processed_donors" % field)
            field_data = (row[field] for row in cur)
            deduper.fingerprinter.index(field_data, field)

    # Now we are ready to write our blocking map table by creating a
    # generator that yields unique `(block_key, donor_id)` tuples.
    print('writing blocking map')

    with read_con.cursor('donor_select') as read_cur:
        read_cur.execute(DONOR_SELECT)

        full_data = ((row['donor_id'], row) for row in read_cur)
        b_data = deduper.fingerprinter(full_data)

        with write_con:
            with write_con.cursor() as write_cur:
                write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV',
                                      Readable(b_data),
                                      size=10000)

In [None]:
    # free up memory by removing indices
    deduper.fingerprinter.reset_indices()

    logging.info("indexing block_key")
    with write_con:
        with write_con.cursor() as cur:
            cur.execute("CREATE UNIQUE INDEX ON blocking_map "
                        "(block_key text_pattern_ops, donor_id)")


In [None]:
    # ## Clustering

    with write_con:
        with write_con.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS entity_map")

            print('creating entity_map database')
            cur.execute("CREATE TABLE entity_map "
                        "(donor_id INTEGER, canon_id INTEGER, "
                        " cluster_score FLOAT, PRIMARY KEY(donor_id))")

    with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur:
        read_cur.execute("""
               select a.donor_id,
                      row_to_json((select d from (select a.city,
                                                         a.name,
                                                         a.zip,
                                                         a.state,
                                                         a.address) d)),
                      b.donor_id,
                      row_to_json((select d from (select b.city,
                                                         b.name,
                                                         b.zip,
                                                         b.state,
                                                         b.address) d))
               from (select DISTINCT l.donor_id as east, r.donor_id as west
                     from blocking_map as l
                     INNER JOIN blocking_map as r
                     using (block_key)
                     where l.donor_id < r.donor_id) ids
               INNER JOIN processed_donors a on ids.east=a.donor_id
               INNER JOIN processed_donors b on ids.west=b.donor_id""")

        print('clustering...')
        clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)),
                                          threshold=0.5)

        # ## Writing out results

        # We now have a sequence of tuples of donor ids that dedupe believes
        # all refer to the same entity. We write this out onto an entity map
        # table

        print('writing results')
        with write_con:
            with write_con.cursor() as write_cur:
                write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV',
                                      Readable(cluster_ids(clustered_dupes)),
                                      size=10000)

    with write_con:
        with write_con.cursor() as cur:
            cur.execute("CREATE INDEX head_index ON entity_map (canon_id)")

    # Print out the number of duplicates found

In [9]:
 locale.setlocale(locale.LC_ALL, '')  # for pretty printing numbers

'C.UTF-8'

In [16]:
read_con.commit()

In [None]:
    with read_con.cursor() as cur:
        cur.execute("DROP TABLE e_map")

In [11]:
 with read_con.cursor() as cur:
   
        cur.execute(
            "SELECT CONCAT_WS(' ', donors.name) as name, "
            "SUM(CAST(contributions.amount AS FLOAT)) AS totals "
            "FROM donors INNER JOIN contributions "
            "USING (donor_id) "
            "GROUP BY (donor_id) "
            "ORDER BY totals DESC "
            "LIMIT 10"
        )

        print("Top Donors (raw)")
        for row in cur:
            row['totals'] = row['totals']
            print('%(totals)20s: %(name)s' % row)

   

Top Donors (raw)
         24556813.25: gmmb, inc.
          14350000.0: good government coalition
  12433621.810005147: aggregated individual contribution
  11729787.940000001: onmessage, inc.
  11379875.429999998: nc democratic leadership committee
          11305000.0: citizens for a better north carolina
          10222018.0: misty smithey
           9755400.0: cooper for north carolina
          8850440.35: nc democratic leadership committee
           8850000.0: democratic action


In [17]:
    with read_con.cursor() as cur:
               
        cur.execute("CREATE TEMPORARY TABLE e_map "
                    "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id "
                    "FROM entity_map "
                    "RIGHT JOIN donors USING(donor_id)")
        
        cur.execute(
            "SELECT donors.name AS name, "
            "donation_totals.totals AS totals "
            "FROM donors INNER JOIN "
            "(SELECT contributions.canon_id, SUM(CAST(amount AS FLOAT)) AS totals "
            " FROM contributions INNER JOIN e_map "
            " USING (donor_id) "
            " GROUP BY (contributions.canon_id) "
            " ORDER BY totals "
            " DESC LIMIT 10) "
            "AS donation_totals ON donors.donor_id=donation_totals.canon_id "
            "WHERE donors.donor_id = donation_totals.canon_id"
        )

        print("Top Donors (deduped)")
        for row in cur:
            row['totals'] = row['totals']
            print('%(totals)20s: %(name)s' % row)
            
        cur.execute("SELECT * FROM e_map")
   

Top Donors (deduped)
         27648935.78: nc democratic leadership building fund
          25527066.7: gmmb inc
  17567691.199999984: north carolina dec
          14350000.0: good government coalition
  12433621.810034692: aggregated individual contribution
  11926236.380000003: onmessage
          11305000.0: citizens for a better north carolina
  11035671.030000003: buying time
  10643647.379999965: nc democcratic party
         10222098.25: misty smithey


In [None]:
    read_con.close()
    write_con.close()

    print('ran in', time.time() - start_time, 'seconds')