# Data Acquisition and ETL

In [1]:
# Config reader
import configparser

# Database connection and authentication
import firebase_admin
from firebase_admin import credentials, firestore

# Webscraping and API calls
import requests
from googleapiclient.discovery import build
from bs4 import BeautifulSoup
from mediawiki import MediaWiki

# Utils
import functools
import re
import googleapiclient
from datetime import datetime

## ProPublica API and Google Firestore Access
- Utilizing a ```config.ini``` file to store private API keys and location/name of certificates

In [2]:
# Read config.ini file (API keys and credential paths are kept out of the notebook)
# NOTE: ConfigParser.read() silently skips a file it cannot open, so a wrong
# path only surfaces at the first config.get() call below.
config = configparser.ConfigParser()
config.read('./auth/config.ini')

# Get ProPublica Auth
PROPUBLICA_API_KEY = config.get('propublica', 'PROPUBLICA_API_KEY')
API_ROOT = 'https://api.propublica.org/congress/v1/'
# Sent as the 'X-API-Key' request header on every ProPublica call in this notebook
header = {'X-API-Key': f'{PROPUBLICA_API_KEY}'}

# Get Google Firebase Auth
# GCP_AUTH_PATH is handed to credentials.Certificate
# (presumably the path of a service-account key file -- confirm)
GCP_AUTH_PATH = config.get('firebase', 'GCP_AUTH_PATH')
cred = credentials.Certificate(GCP_AUTH_PATH)
app = firebase_admin.initialize_app(cred)

# Instantiate connection to database
db = firestore.client()

## Acquire US Representatives Data
- Retrieving the 2 most recent congresses (116th and 117th congress) from ProPublica
- Transforming JSON files to smaller documents (fewer fields/keys)
- Loading documents onto the Google Firestore database

### Acquire ProPublica Data

In [3]:
def get_house_ids(congress):
    '''
    Function to get house members' ProPublica ID by congress number

    Parameters
    ----------
    congress : int or str
        Congress number (e.g. 116); interpolated into the request URL.

    Returns
    -------
    list
        The 'id' value of every entry in the 'members' list of the first
        element of the response's 'results'.

    Raises
    ------
    requests.HTTPError
        If the API answers with a 4xx/5xx status.
    requests.Timeout
        If the API does not answer within the timeout.
    '''

    call_string = API_ROOT + f'{congress}/house/members.json'
    # Bound the wait, and fail on HTTP errors here rather than with a
    # confusing KeyError when the error payload has no 'results' key.
    r = requests.get(call_string, headers=header, timeout=30)
    r.raise_for_status()
    result = r.json()['results'][0]['members']
    member_ids = [ member['id'] for member in result ]

    return member_ids

In [4]:
# House member IDs of the 116th and 117th Congresses (2019-2023)
ids_116 = get_house_ids(116)
ids_117 = get_house_ids(117)

In [5]:
# Create list of unique IDs to remove redundant API calls
# (an ID present in both congresses is kept once; set() does not preserve
# order, so the order of all_ids can differ between kernel sessions)
all_ids = list(set(ids_116 + ids_117))

In [6]:
# Sample: one member ID (which one comes first depends on the set ordering above)
print(all_ids[0])

H001087


In [7]:
def get_member_data(member):
    '''
    Function to get house member's data

    Parameters
    ----------
    member : str
        ProPublica member ID (e.g. 'H001087'); interpolated into the
        request URL.

    Returns
    -------
    dict
        The first element of the response's 'results' list.

    Raises
    ------
    requests.HTTPError
        If the API answers with a 4xx/5xx status.
    requests.Timeout
        If the API does not answer within the timeout.
    '''

    call_string = API_ROOT + f'members/{member}.json'
    # Bound the wait, and fail on HTTP errors here rather than with a
    # confusing KeyError when the error payload has no 'results' key.
    r = requests.get(call_string, headers=header, timeout=30)
    r.raise_for_status()
    result = r.json()['results'][0]

    return result

In [8]:
# Sample: fetch one member and show the first 10 fields of the raw record
sample = get_member_data(all_ids[0])
for k, v in list(sample.items())[:10]:
    print(f'{k}: {v}')

id: H001087
member_id: H001087
first_name: Katie
middle_name: None
last_name: Hill
suffix: None
date_of_birth: 1987-08-25
gender: F
url: None
times_topics_url: 


In [9]:
def member_cleaner(member):
    '''
    Function to keep relevant information on congress member

    Parameters
    ----------
    member : dict
        Raw member record. Must contain a non-empty 'roles' list whose
        entries carry 'congress', 'state' and 'fec_candidate_id', plus the
        top-level keys read below.

    Returns
    -------
    dict
        Reduced document keyed by '_id'. 'state' and 'fec_id' come from the
        role with the highest congress number, 'congresses' lists every
        role's congress (most recent first), and 'dob' is a datetime, or
        None when the record has no date of birth.

    Raises
    ------
    IndexError
        If 'roles' is empty.
    KeyError
        If an expected key is missing.
    '''

    # Congress numbers arrive as strings ('99', '117'); compare them as
    # integers so '117' ranks above '99'. sorted() also leaves the caller's
    # member['roles'] list untouched.
    roles = sorted(member['roles'], key=lambda role: int(role['congress']), reverse=True)
    latest_role = roles[0]
    fec_id = latest_role['fec_candidate_id'] # Most recent FEC candidate ID
    state = latest_role['state'] # Most recent state served
    congresses = [ role['congress'] for role in roles ]

    # A missing date of birth is kept as None instead of crashing strptime
    dob_raw = member['date_of_birth']
    date = datetime.strptime(dob_raw, '%Y-%m-%d') if dob_raw else None

    mem_dict = {
        '_id': member['id'],
        'first_name': member['first_name'],
        'middle_name': member['middle_name'],
        'last_name': member['last_name'],
        'dob': date,
        'gender': member['gender'],
        'current_party': member['current_party'],
        'state': state,
        'google_id': member['google_entity_id'],
        'votesmart_id': member['votesmart_id'],
        'govtrack_id': member['govtrack_id'],
        'cspan_id': member['cspan_id'],
        'crp_id': member['crp_id'],
        'fec_id': fec_id,
        'in_office': member['in_office'],
        'congresses': congresses,
    }

    return mem_dict

In [10]:
# Sample: clean the raw record and show the first 10 fields
# NOTE: this rebinds `sample` to the cleaned dict (which has no 'roles' key),
# so re-running this cell alone fails; re-run the cell that fetches the raw
# record first.
sample = member_cleaner(sample)
for k, v in list(sample.items())[:10]:
    print(f'{k}: {v}')

_id: H001087
first_name: Katie
middle_name: None
last_name: Hill
dob: 1987-08-25 00:00:00
gender: F
current_party: D
state: CA
google_id: /g/11gtgfywx0
votesmart_id: 179354


### Batch Insert

In [11]:
def batch_insert(docs, db, col_name, _id='id', with_ids=True):
    '''
    Function to batch write to Firestore

    Parameters
    ----------
    docs : iterable of dict
        Documents to write; may be a generator (it is consumed once).
    db : Firestore client
        Object providing ``batch()`` and ``collection(name)``.
    col_name : str
        Name of the target collection.
    _id : str
        Key in each document whose value becomes the Firestore document ID
        (only used when ``with_ids`` is True).
    with_ids : bool
        When False, ``document(None)`` is requested for every document
        instead of an explicit ID.

    Writes are committed in chunks of 500 documents; progress is printed
    after each commit.
    '''

    # Chunk size taken from the original `batch_len > 499` check
    # (Firestore has historically capped a batch at 500 writes).
    max_batch_size = 500

    print(f'Writing data to {col_name} collection')
    collection = db.collection(col_name)
    batch = db.batch()
    batch_len = 0
    batch_num = 0
    total = 0
    for doc in docs:
        # Keep the key *name* (`_id`) separate from each document's ID
        # *value*; reusing one variable made the second lookup use the
        # first document's ID as the key.
        doc_id = f'{doc[_id]}' if with_ids else None

        ref = collection.document(doc_id)
        batch.set(ref, doc)
        batch_len += 1
        total += 1

        if batch_len >= max_batch_size:
            batch.commit()
            print(f'Batch {batch_num} Inserted')
            print(f'Inserts: {len(batch.write_results)}')
            # Start a fresh batch for the next chunk
            batch = db.batch()
            batch_len = 0
            batch_num += 1

    # Commit the remainder only if there is one; avoids an empty commit
    # when the document count is a multiple of the chunk size (or zero).
    if batch_len:
        batch.commit()
        print(f'Batch {batch_num} Inserted')
        print(f'Inserts: {len(batch.write_results)}')
    print(f'Total Inserts: {total}')

In [12]:
# Generator for representative data
# (lazy: no API call is made until the generator is consumed; only the
# first 10 IDs of all_ids are requested here)
reps = ( get_member_data(mem) for mem in all_ids[:10] )

# Generator for cleaned representative data
# (chained onto `reps`, so both are exhausted after a single pass)
cleaned_reps = ( member_cleaner(rep) for rep in reps )

In [13]:
# Batch insert (left disabled; shown with the arguments batch_insert requires --
# the cleaned documents store their ID under '_id', not the default 'id')
# batch_insert(cleaned_reps, db, 'reps', _id='_id')

## Acquire Wikipedia Page URL
- Utilizing Google Knowledge Graph to find US Representative Wikipedia page
- Logging errors

In [14]:
# Collection reference for the 'reps' collection (queried below)
ref = db.collection('reps')

In [15]:
# Wrapper for error logging
def error_logging(func):
    '''
    Decorator that turns exceptions into return values.

    The decorated function returns a ``(data, error)`` tuple:
    ``(result, None)`` when ``func`` succeeds, ``(None, exception)`` when it
    raises. Positional and keyword arguments are both forwarded to ``func``.
    '''
    @functools.wraps(func)
    def wrapper_error(*args, **kwargs):
        try:
            return func(*args, **kwargs), None
        except Exception as e:
            # Deliberately broad: callers bucket failures by type(error)
            return None, e
    return wrapper_error

In [16]:
# Retrieve Google GCP API Key from 'config.ini' ('gcpkeys' section)
GCP_API_KEY = config.get('gcpkeys', 'GCP_API_KEY')

### Retrieve Wikipedia Page with Google Knowledge Graph by ID

In [17]:
# Get all representatives
# The "'_id' != ''" filter is used to stream the stored documents
# (assumes every document carries a non-empty '_id' field -- confirm)
query = ref.where('_id', '!=', '').stream()
# Materialize each snapshot as a plain dict; later cells mutate these dicts in place
members = [ doc.to_dict() for doc in query ]

In [18]:
# Use Google API client
# Build a 'kgsearch' v1 client; `entities` is the resource the lookup
# functions below call .search() on
service = build('kgsearch', 'v1', developerKey=GCP_API_KEY)
entities = service.entities()

In [19]:
def kg_id(mem):
    '''
    Look up a member in Google Knowledge Graph by Google Entity ID

    Reads mem['google_id'], searches by that ID, and returns the 'result'
    of the first entry in the response's 'itemListElement' list.
    '''

    response = entities.search(ids=mem['google_id']).execute()
    first_item = response['itemListElement'][0]
    return first_item['result']

In [20]:
# Sample: Knowledge Graph result for the first stored member
kg_id(members[0])

{'@type': ['Thing', 'Person'],
 'name': 'Robert Aderholt',
 '@id': 'kg:/m/024p03',
 'detailedDescription': {'url': 'https://en.wikipedia.org/wiki/Robert_Aderholt',
  'license': 'https://en.wikipedia.org/wiki/Wikipedia:Text_of_Creative_Commons_Attribution-ShareAlike_3.0_Unported_License',
  'articleBody': "Robert Brown Aderholt is an American politician and attorney serving as the U.S. Representative for Alabama's 4th congressional district, serving since 1997. He is a member of the Republican Party. "},
 'description': 'U.S. Representative',
 'url': 'https://aderholt.house.gov/'}

In [21]:
@error_logging
def get_wikipedia(mem):
    '''
    Wikipedia page URL of a US Representative, read from Knowledge Graph

    Takes the 'url' found under 'detailedDescription' in the kg_id result.
    Because of @error_logging, callers receive a (wiki_url, error) pair.
    '''

    return kg_id(mem)['detailedDescription']['url']

In [22]:
# Sample: returns a (wiki_url, error) pair
get_wikipedia(members[0])

('https://en.wikipedia.org/wiki/Robert_Aderholt', None)

In [23]:
# Try the Knowledge Graph lookup for every member and bucket the failures
# by exception type, so each kind can be inspected and repaired separately.
errors = {}
for mem in members:
    data, error = get_wikipedia(mem)
    if error is not None:
        errors.setdefault(type(error), []).append(mem)
    # data is None whenever the lookup failed
    mem['wiki_url'] = data

In [24]:
# Find different types of errors
# (one line per exception class with the number of affected members)
for e, v in errors.items():
    print(f'{e}: {len(v)}')

<class 'googleapiclient.errors.HttpError'>: 5
<class 'IndexError'>: 16
<class 'KeyError'>: 12


### Check Errors
- ```KeyError```: Missing Wikipedia URL in Google Knowledge Graph
- ```HttpError```: Missing Google Entity ID from ProPublica data
- ```IndexError```: Wrong Google Entity ID in ProPublica data

In [25]:
# Members whose lookup raised HttpError; show the first one
http_errors = errors[googleapiclient.errors.HttpError]
http_errors[0] # Missing Google Entity IDs

{'in_office': True,
 'last_name': 'Case',
 'congresses': ['117', '116', '109', '108'],
 'first_name': 'Ed',
 'dob': DatetimeWithNanoseconds(1952, 9, 27, 0, 0, tzinfo=<UTC>),
 'cspan_id': None,
 'govtrack_id': '400069',
 '_id': 'C001055',
 'middle_name': None,
 'crp_id': 'N00025882',
 'votesmart_id': None,
 'google_id': None,
 'state': 'HI',
 'fec_id': 'H2HI02128',
 'current_party': 'D',
 'gender': 'M',
 'wiki_url': None}

In [26]:
# Members whose lookup raised KeyError; re-run the raw lookup for the first one
key_errors = errors[KeyError]
kg_id(key_errors[0]) # Missing wikipedia 'url' key

{'@type': ['Person', 'Thing'],
 'url': 'http://debbiedingellforcongress.com/',
 'name': 'Debbie Dingell',
 'description': 'U.S. Representative',
 '@id': 'kg:/m/03m4188'}

In [27]:
# Members whose lookup raised IndexError; show the raw response for the first one
index_errors = errors[IndexError]
entities.search(ids=index_errors[0]['google_id']).execute() # Google ID incorrect

{'@context': {'kg': 'http://g.co/kg',
  'resultScore': 'goog:resultScore',
  '@vocab': 'http://schema.org/',
  'EntitySearchResult': 'goog:EntitySearchResult',
  'detailedDescription': 'goog:detailedDescription',
  'goog': 'http://schema.googleapis.com/'},
 '@type': 'ItemList',
 'itemListElement': []}

In [28]:
# Merge lists
# (both groups need a new Google Entity ID; the list holds the same dict
# objects as `members`, so later fixes show up there too)
_id_errors = index_errors + http_errors
print('Google ID Errors:', len(_id_errors))

Google ID Errors: 21


### Retrieve Google IDs and Wikipedia URLs with Google Knowledge Graph by Search Terms

In [29]:
def kg_search(mem):
    '''
    Free-text Google Knowledge Graph search for a member

    The query is the member's first name, last name and the word
    'politician' joined by spaces; the 'result' of the top hit is returned.
    '''

    terms = (mem['first_name'], mem['last_name'], 'politician')
    response = entities.search(query=' '.join(terms)).execute()
    top_hit = response['itemListElement'][0] # Return top result
    return top_hit['result']

In [30]:
@error_logging
def get_google_id(mem):
    '''
    Google Entity ID of a member, found through a Knowledge Graph search

    Because of @error_logging, callers receive a (google_id, error) pair.
    '''

    entity_ref = kg_search(mem)['@id']
    # Keep what follows the first ':' (e.g. 'kg:/m/024p03' -> '/m/024p03')
    match = re.search('(?<=:).*', entity_ref)
    return match[0]

In [31]:
# Second pass over the members with a missing or wrong Google Entity ID:
# find the ID by name search, then retry the Wikipedia lookup with it.
# Failures from either step are bucketed by exception type.
new_errors = {}
for mem in _id_errors:
    # Get correct Google Entity ID
    _id, error = get_google_id(mem)
    if error is not None:
        new_errors.setdefault(type(error), []).append(mem)
    mem['google_id'] = _id

    # Get Wikipedia page with new Google Entity ID
    wiki_url, error = get_wikipedia(mem)
    if error is not None:
        new_errors.setdefault(type(error), []).append(mem)
    mem['wiki_url'] = wiki_url

In [32]:
# An empty list means both the ID search and the retry succeeded for every member
print('Types of ID Errors Remaining:', list(new_errors.keys()))

Types of ID Errors Remaining: []


### Retrieve Wikipedia URL from MediaWiki

In [33]:
def url_from_wikimedia(mem):
    '''
    Wikipedia URL of a Representative via a MediaWiki page lookup

    Requests the page for "<first_name> <last_name> politician" from a
    newly created MediaWiki client and returns that page's url attribute.
    '''

    client = MediaWiki()
    page = client.page(f"{mem['first_name']} {mem['last_name']} politician")
    return page.url

In [34]:
# Sample: look up the first member whose Knowledge Graph entry had no Wikipedia URL
sample = key_errors[0]
url_from_wikimedia(sample)

'https://en.wikipedia.org/wiki/Debbie_Dingell'

In [35]:
# Correct 'wiki_url' values
# (url_from_wikimedia is not wrapped by error_logging, so a failed lookup
# raises and stops this loop)
for mem in key_errors:
    wiki_url = url_from_wikimedia(mem)
    mem['wiki_url'] = wiki_url

In [36]:
# Check if all members have 'google_id' and 'wiki_url'
# (the message names the offending member so a failure is actionable)
for mem in members:
    assert mem['google_id'] is not None, f"missing google_id for {mem.get('_id')}"
    assert mem['wiki_url'] is not None, f"missing wiki_url for {mem.get('_id')}"

### Batch Insert
- The members of congress will be rewritten instead of updated
- Future updates will utilize batch updating instead

In [37]:
# Batch insert (left disabled; shown with the arguments batch_insert requires --
# the member documents store their ID under '_id', not the default 'id')
# batch_insert(members, db, 'reps', _id='_id')