In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This is a setup script for mysql_example.  It downloads a zip file of
Illinois campaign contributions and loads them in t aMySQL database
named 'contributions'.

__Note:__ You will need to run this script first before execuing
[mysql_example.py](http://datamade.github.com/dedupe-examples/docs/mysql_example.html).

Tables created:
* raw_table - raw import of entire CSV file
* donors - all distinct donors based on name and address
* recipients - all distinct campaign contribution recipients
* contributions - contribution amounts tied to donor and recipients tables
"""
import csv
import os
import zipfile

import dj_database_url
import psycopg2
import psycopg2.extras
import unidecode
import requests

from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

##
## ***Always be logging***
##
import traceback
import logging
from logging.handlers import RotatingFileHandler

# Logging defaults - basic config will log to stdout, then we'll add a log-to-file handler
# Allow log override from environ variable
FILE_LOG_FORMAT = "%(asctime)s %(levelname)s %(module)s:%(lineno)d %(message)s"
#CONSOLE_LOG_FORMAT = "%(levelname)s %(module)s:%(lineno)d %(message)s"
CONSOLE_LOG_FORMAT = FILE_LOG_FORMAT
LOG_FILENAME = 'testVisitor.log'

LOG_LEVEL = os.getenv('LOG_LEVEL', logging.INFO)
if LOG_LEVEL == 'debug':
    LOG_LEVEL=logging.DEBUG

# setup for console
logging.basicConfig(level=LOG_LEVEL, format=CONSOLE_LOG_FORMAT)
logger = logging.getLogger('')

# setup for file
log_file_handler = RotatingFileHandler(LOG_FILENAME, maxBytes=20971520, backupCount=5)
log_file_handler.setFormatter(logging.Formatter(FILE_LOG_FORMAT))
logger.addHandler(log_file_handler)
##
##
##

In [2]:
def getData(url,fname):
    """
    Download the dataset from the webpage.
    """
    response = requests.get(url)
    with open(fname, 'w') as f:
        f.write(response.content)

In [6]:
#DATAURL = "https://open.whitehouse.gov/api/views/p86s-ychb/rows.csv?accessType=DOWNLOAD"
DATAURL = "open.obamawhitehouse.archives.gov/sites/default/files/White_House_Visitor_Records_Requests.csv?"
#http://open.obamawhitehouse.archives.gov/sites/default/files/White_House_Visitor_Records_Requests.csv?

In [10]:
%ls

Illinois-campaign-contributions.csv
Illinois-campaign-contributions.txt
Illinois-campaign-contributions.txt.zip
[0m[01;34m__MACOSX[0m/
small_White_House_test.csv
testVisitor.log
Untitled.ipynb
visitorTest1.ipynb
White_House_Visitor_Records_Requests.csv


In [11]:
#ORIGFILE = "fixtures/whitehouse-visitors.csv"
ORIGFILE = "small_White_House_test.csv"

In [9]:
#getData(DATAURL,ORIGFILE)

takes the csv data in as input, parses the datetime fields we’re interested in (‘lastname’,’firstname’,’uin’,’apptmade’,’apptstart’,’apptend’, ‘meeting_loc’.), and outputs a database table that retains the desired columns. 

In [12]:
db_conf = dj_database_url.config()

if not db_conf:
    raise Exception(
        'set DATABASE_URL environment variable with your connection, e.g. '
        'export DATABASE_URL=postgres://user:password@host/mydatabase'
    )
else:
    logger.info('database config loaded...')

logger.info('checking if database exists...')
conn = psycopg2.connect(user=db_conf['USER'],
                        password=db_conf['PASSWORD'],
                        host=db_conf['HOST'],
                        port=db_conf['PORT'])

# must set isolation level otherwise can't create database
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

2018-08-13 17:04:29,815 INFO <ipython-input-12-924a8d4095d1>:9 database config loaded...
2018-08-13 17:04:29,823 INFO <ipython-input-12-924a8d4095d1>:11 checking if database exists...


In [13]:
c = conn.cursor()
c.execute(f"SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = '{db_conf['NAME']}'")
not_exists_row = c.fetchone()
not_exists = not_exists_row[0]
if not_exists:
    c.execute(f"CREATE DATABASE {db_conf['NAME']}")
    conn.commit()
    logger.info('DB didnt exist, now it does...')
else:
    logger.info('DB already exists, lets continue...')

logger.info('re-connecting specifically to database...')
conn = psycopg2.connect(database=db_conf['NAME'],
                        user=db_conf['USER'],
                        password=db_conf['PASSWORD'],
                        host=db_conf['HOST'],
                        port=db_conf['PORT'])

c = conn.cursor()

2018-08-13 17:06:05,641 INFO <ipython-input-13-839b165f611e>:10 DB already exists, lets continue...
2018-08-13 17:06:05,643 INFO <ipython-input-13-839b165f611e>:12 re-connecting specifically to database...


In [None]:
# Create a cleaned up CSV version of file with consistent row lengths.
# Postgres COPY doesn't handle "ragged" files very well
if not os.path.exists(ORIGFILE):
    logger.info('converting tab-delimited raw file to csv...')
    with open(contributions_txt_file, 'rU') as txt_file, \
            open(contributions_csv_file, 'w') as csv_file:
        csv_writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
        for line in txt_file:
            if not all(ord(c) < 128 for c in line):
                line = unidecode.unidecode(line)
            row = line.rstrip('\t\r\n').split('\t')
            if len(row) != 29:
                logger.info('skipping bad row (length %s, expected 29):' % len(row))
                logger.info(row)
                continue
            csv_writer.writerow(row)

In [14]:
def dateParseSQL(nfile):
    c.execute('''CREATE TABLE IF NOT EXISTS visitors_er
                  (visitor_id SERIAL PRIMARY KEY,
                  lastname    varchar,
                  firstname   varchar,
                  uin         varchar,
                  apptmade    varchar,
                  apptstart   varchar,
                  apptend     varchar,
                  meeting_loc varchar);''')
    conn.commit()
    with open(nfile, 'rU') as infile:
        reader = csv.reader(infile, delimiter=',')
        next(reader, None)
        for row in reader:
            for field in DATEFIELDS:
                if row[field] != '':
                    try:
                        dt = parser.parse(row[field])
                        row[field] = dt.toordinal()  # We also tried dt.isoformat()
                    except:
                        continue
            sql = "INSERT INTO visitors_er(lastname,firstname,uin,apptmade,apptstart,apptend,meeting_loc) \
                   VALUES (%s,%s,%s,%s,%s,%s,%s)"
            cur.execute(sql, (row[0],row[1],row[3],row[10],row[11],row[12],row[21],))
            conn.commit()
    print ("All done!")



In [15]:
dateParseSQL(ORIGFILE)

  if sys.path[0] == '':


NameError: name 'DATEFIELDS' is not defined

In [None]:
https://open.obamawhitehouse.archives.gov/sites/default/files/White_House_Visitor_Records_Requests.csv

In [None]:


# Lets get down to business

_file = 'Illinois-campaign-contributions'
contributions_zip_file = _file + '.txt.zip'
contributions_txt_file = _file + '.txt'
contributions_csv_file = _file + '.csv'

if not os.path.exists(contributions_zip_file):
    logger.info('downloading', contributions_zip_file, '(~60mb) ...')
    u = requests.get(
        'https://s3.amazonaws.com/dedupe-data/Illinois-campaign-contributions.txt.zip')
    localFile = open(contributions_zip_file, 'wb')
    localFile.write(u.content)
    localFile.close()

if not os.path.exists(contributions_txt_file):
    zip_file = zipfile.ZipFile(contributions_zip_file, 'r')
    logger.info('extracting %s' % contributions_zip_file)
    zip_file_contents = zip_file.namelist()
    for f in zip_file_contents:
        if ('.txt' in f):
            zip_file.extract(f)
    zip_file.close()



########



logger.info('loading intarray extension...')
try:
    c.execute(f"CREATE EXTENSION intarray")
    conn.commit()
except:
    logger.info('extension already loaded')
    logger.debug('Full stack trace\n{}'.traceback.format_exc())
    pass

logger.info("re-creating any tables (if they exist)...")
c.execute("DROP TABLE IF EXISTS raw_table")
c.execute("DROP TABLE IF EXISTS donors")
c.execute("DROP TABLE IF EXISTS recipients")
c.execute("DROP TABLE IF EXISTS contributions")
c.execute("DROP TABLE IF EXISTS processed_donors")

c.execute("CREATE TABLE raw_table "
          "(reciept_id INT, last_name VARCHAR(70), first_name VARCHAR(35), "
          " address_1 VARCHAR(35), address_2 VARCHAR(36), city VARCHAR(20), "
          " state VARCHAR(15), zip VARCHAR(11), report_type VARCHAR(24), "
          " date_recieved VARCHAR(10), loan_amount VARCHAR(12), "
          " amount VARCHAR(23), receipt_type VARCHAR(23), "
          " employer VARCHAR(70), occupation VARCHAR(40), "
          " vendor_last_name VARCHAR(70), vendor_first_name VARCHAR(20), "
          " vendor_address_1 VARCHAR(35), vendor_address_2 VARCHAR(31), "
          " vendor_city VARCHAR(20), vendor_state VARCHAR(10), "
          " vendor_zip VARCHAR(10), description VARCHAR(90), "
          " election_type VARCHAR(10), election_year VARCHAR(10), "
          " report_period_begin VARCHAR(10), report_period_end VARCHAR(33), "
          " committee_name VARCHAR(70), committee_id VARCHAR(37))")

conn.commit()

logger.info('importing raw data from csv...')
with open(contributions_csv_file, 'rU') as csv_file:
    c.copy_expert("COPY raw_table "
                  "(reciept_id, last_name, first_name, "
                  " address_1, address_2, city, state, "
                  " zip, report_type, date_recieved, "
                  " loan_amount, amount, receipt_type, "
                  " employer, occupation, vendor_last_name, "
                  " vendor_first_name, vendor_address_1, "
                  " vendor_address_2, vendor_city, vendor_state, "
                  " vendor_zip, description, election_type, "
                  " election_year, "
                  " report_period_begin, report_period_end, "
                  " committee_name, committee_id) "
                  "FROM STDIN CSV HEADER", csv_file)

conn.commit()

logger.info('creating donors table...')
c.execute("CREATE TABLE donors "
          "(donor_id SERIAL PRIMARY KEY, "
          " last_name VARCHAR(70), first_name VARCHAR(35), "
          " address_1 VARCHAR(35), address_2 VARCHAR(36), "
          " city VARCHAR(20), state VARCHAR(15), "
          " zip VARCHAR(11), employer VARCHAR(70), "
          " occupation VARCHAR(40))")
conn.commit()

logger.info('populating donors table...')
c.execute("INSERT INTO donors "
          "(first_name, last_name, address_1, "
          " address_2, city, state, zip, employer, occupation) "
          "SELECT DISTINCT "
          "LOWER(TRIM(first_name)), LOWER(TRIM(last_name)), "
          "LOWER(TRIM(address_1)), LOWER(TRIM(address_2)), "
          "LOWER(TRIM(city)), LOWER(TRIM(state)), LOWER(TRIM(zip)), "
          "LOWER(TRIM(employer)), LOWER(TRIM(occupation)) "
          "FROM raw_table")
conn.commit()

logger.info('creating indexes on donors table...')
c.execute("CREATE INDEX donors_donor_info ON donors "
          "(last_name, first_name, address_1, address_2, city, "
          " state, zip)")
conn.commit()

logger.info('creating recipients table...')
c.execute("CREATE TABLE recipients "
          "(recipient_id SERIAL PRIMARY KEY, name VARCHAR(70))")
conn.commit()

logger.info('populating recipients table...')
c.execute("INSERT INTO recipients "
          "SELECT DISTINCT CAST(committee_id AS INTEGER), "
          "committee_name FROM raw_table")
conn.commit()

logger.info('creating contributions table...')
c.execute("CREATE TABLE contributions "
          "(contribution_id INT, donor_id INT, recipient_id INT, "
          " report_type VARCHAR(24), date_recieved DATE, "
          " loan_amount VARCHAR(12), amount VARCHAR(23), "
          " receipt_type VARCHAR(23), "
          " vendor_last_name VARCHAR(70), "
          " vendor_first_name VARCHAR(20), "
          " vendor_address_1 VARCHAR(35), vendor_address_2 VARCHAR(31), "
          " vendor_city VARCHAR(20), vendor_state VARCHAR(10), "
          " vendor_zip VARCHAR(10), description VARCHAR(90), "
          " election_type VARCHAR(10), election_year VARCHAR(10), "
          " report_period_begin DATE, report_period_end DATE)")
conn.commit()

logger.info('populating contributions table...')
c.execute("INSERT INTO contributions "
          "SELECT reciept_id, donors.donor_id, CAST(committee_id AS INTEGER), "
          " report_type, TO_DATE(TRIM(date_recieved), 'MM/DD/YYYY'), "
          " loan_amount, amount, "
          " receipt_type, vendor_last_name , "
          " vendor_first_name, vendor_address_1,"
          " vendor_address_2, "
          " vendor_city, vendor_state, vendor_zip,"
          " description, "
          " election_type, election_year, "
          " TO_DATE(TRIM(report_period_begin), 'MM/DD/YYYY'), "
          " TO_DATE(TRIM(report_period_end), 'MM/DD/YYYY') "
          "FROM raw_table JOIN donors ON "
          "donors.first_name = LOWER(TRIM(raw_table.first_name)) AND "
          "donors.last_name = LOWER(TRIM(raw_table.last_name)) AND "
          "donors.address_1 = LOWER(TRIM(raw_table.address_1)) AND "
          "donors.address_2 = LOWER(TRIM(raw_table.address_2)) AND "
          "donors.city = LOWER(TRIM(raw_table.city)) AND "
          "donors.state = LOWER(TRIM(raw_table.state)) AND "
          "donors.employer = LOWER(TRIM(raw_table.employer)) AND "
          "donors.occupation = LOWER(TRIM(raw_table.occupation)) AND "
          "donors.zip = LOWER(TRIM(raw_table.zip))")
conn.commit()

logger.info('creating indexes on contributions...')
c.execute("ALTER TABLE contributions ADD PRIMARY KEY(contribution_id)")
c.execute("CREATE INDEX donor_idx ON contributions (donor_id)")
c.execute("CREATE INDEX recipient_idx ON contributions (recipient_id)")
conn.commit()

logger.info('nullifying empty strings in donors...')
c.execute(
    "UPDATE donors "
    "SET "
    "first_name = CASE first_name WHEN '' THEN NULL ELSE first_name END, "
    "last_name = CASE last_name WHEN '' THEN NULL ELSE last_name END, "
    "address_1 = CASE address_1 WHEN '' THEN NULL ELSE address_1 END, "
    "address_2 = CASE address_2 WHEN '' THEN NULL ELSE address_2 END, "
    "city = CASE city WHEN '' THEN NULL ELSE city END, "
    "state = CASE state WHEN '' THEN NULL ELSE state END, "
    "employer = CASE employer WHEN '' THEN NULL ELSE employer END, "
    "occupation = CASE occupation WHEN '' THEN NULL ELSE occupation END, "
    "zip = CASE zip WHEN '' THEN NULL ELSE zip END"
)
conn.commit()

logger.info('creating processed_donors...')
c.execute("CREATE TABLE processed_donors AS "
          "(SELECT donor_id, "
          " LOWER(city) AS city, "
          " CASE WHEN (first_name IS NULL AND last_name IS NULL) "
          "      THEN NULL "
          "      ELSE LOWER(CONCAT_WS(' ', first_name, last_name)) "
          " END AS name, " 
          " LOWER(zip) AS zip, "
          " LOWER(state) AS state, "
          " CASE WHEN (address_1 IS NULL AND address_2 IS NULL) "
          "      THEN NULL "
          "      ELSE LOWER(CONCAT_WS(' ', address_1, address_2)) "
          " END AS address, " 
          " LOWER(occupation) AS occupation, "
          " LOWER(employer) AS employer, "
          " CAST((first_name IS NULL) AS INTEGER) AS person "
          " FROM donors)")

logger.info('creating index on processed_donors...')
c.execute("CREATE INDEX processed_donor_idx ON processed_donors (donor_id)")
conn.commit()

c.close()
conn.close()

logger.info('ETL job completed')


In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This is a setup script for mysql_example.  It downloads a zip file of
Illinois campaign contributions and loads them in t aMySQL database
named 'contributions'.

__Note:__ You will need to run this script first before execuing
[mysql_example.py](http://datamade.github.com/dedupe-examples/docs/mysql_example.html).

Tables created:
* raw_table - raw import of entire CSV file
* donors - all distinct donors based on name and address
* recipients - all distinct campaign contribution recipients
* contributions - contribution amounts tied to donor and recipients tables
"""
import csv
import os
import zipfile

import dj_database_url
import psycopg2
import psycopg2.extras
import unidecode
import requests

from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

##
## ***Always be logging***
##
import traceback
import logging
from logging.handlers import RotatingFileHandler

# Logging defaults - basic config will log to stdout, then we'll add a log-to-file handler
# Allow log override from environ variable
FILE_LOG_FORMAT = "%(asctime)s %(levelname)s %(module)s:%(lineno)d %(message)s"
#CONSOLE_LOG_FORMAT = "%(levelname)s %(module)s:%(lineno)d %(message)s"
CONSOLE_LOG_FORMAT = FILE_LOG_FORMAT
LOG_FILENAME = 'testVistor.log'

LOG_LEVEL = os.getenv('LOG_LEVEL', logging.INFO)
if LOG_LEVEL == 'debug':
    LOG_LEVEL=logging.DEBUG

# setup for console
logging.basicConfig(level=LOG_LEVEL, format=CONSOLE_LOG_FORMAT)
logger = logging.getLogger('')

# setup for file
log_file_handler = RotatingFileHandler(LOG_FILENAME, maxBytes=20971520, backupCount=5)
log_file_handler.setFormatter(logging.Formatter(FILE_LOG_FORMAT))
logger.addHandler(log_file_handler)
##
##
##