In [1]:
import os
import pandas as pd
import numpy as np
from parsons import Table, GoogleBigQuery
import json
import requests
import base64
import zipfile
import io
from urllib.parse import urlencode, quote_plus

# Table instance used further down only to reach the Table constructors
# (table.from_columns, table.from_dataframe).
table = Table()
# BigQuery connector used for the stored-hash query and for every upsert.
# NOTE(review): presumably picks up Google credentials from the environment — confirm.
bq = GoogleBigQuery()

# BigQuery project and dataset that hold the legiscan_* tables
bq_project = 'av-states'
bq_dataset = 'lkesich'

In [2]:
##### pylegiscan functions adapted from https://github.com/poliquin/pylegiscan/blob/master/pylegiscan/legiscan.py

class LegiScanError(Exception):
    """Error raised by the LegiScan client when a request fails or the API
    answers with an error status.
    """

class LegiScan(object):
    """Small client for the LegiScan pull API.

    Every public method builds a query URL for one API operation, fetches it,
    and returns the relevant part of the decoded JSON payload.
    """

    # HTTPS, because the API key travels in the query string.
    BASE_URL = 'https://api.legiscan.com/?key={0}&op={1}&{2}'
    # Seconds passed to requests as the timeout, so a stalled connection
    # cannot hang the notebook forever.
    TIMEOUT = 60

    def __init__(self, apikey=None):
        """LegiScan API.  State parameters should always be passed as
           USPS abbreviations.  Bill numbers and abbreviations are case
           insensitive.  Register for API at http://legiscan.com/legiscan

           Args:
               apikey: API key; when omitted, the LEGISCAN_API_KEY
                   environment variable is used.

           Raises:
               ValueError: if no (non-blank) key is available.
        """
        # see if API key available as environment variable
        if apikey is None:
            apikey = os.getenv('LEGISCAN_API_KEY')
        if apikey is None or not apikey.strip():
            raise ValueError(
                'No LegiScan API key: pass apikey or set LEGISCAN_API_KEY.')
        self.key = apikey.strip()

    def _url(self, operation, params=None):
        """Build a URL for querying the API.

        `params` may be None, an already-encoded query string, or anything
        accepted by `urlencode` (e.g. a dict).
        """
        if params is None:
            query = ''
        elif isinstance(params, str):
            query = params
        else:
            query = urlencode(params)
        return self.BASE_URL.format(self.key, operation, query)

    def _redact(self, url):
        """Return `url` with the API key masked, for use in error messages."""
        key = getattr(self, 'key', '')
        return url.replace(key, '***') if key else url

    def _get(self, url):
        """Get and parse JSON from API for a url.

        Raises:
            LegiScanError: on a non-OK HTTP response, or when the payload's
                status is "ERROR".
        """
        req = requests.get(url, timeout=self.TIMEOUT)
        if not req.ok:
            # The key is masked so it does not end up in notebook output.
            raise LegiScanError('Request returned {0}: {1}'
                                .format(req.status_code, self._redact(url)))
        data = json.loads(req.content)
        if data['status'] == "ERROR":
            raise LegiScanError(data['alert']['message'])
        return data

    @staticmethod
    def _state_or_session(state, session_id):
        """Query parameters for operations that take a state or a session id.

        The state wins when both are given.
        """
        if state is not None:
            return {'state': state}
        if session_id is not None:
            return {'id': session_id}
        raise ValueError('Must specify session identifier or state.')

    def get_session_list(self, state):
        """Get list of available sessions for a state."""
        url = self._url('getSessionList', {'state': state})
        data = self._get(url)
        return data['sessions']

    def get_dataset_list(self, state=None, year=None):
        """Get list of available datasets, with optional state and year filtering.

        Both filters are sent when both are given.
        """
        params = {}
        if state is not None:
            params['state'] = state
        if year is not None:
            params['year'] = year
        url = self._url('getDatasetList', params)
        data = self._get(url)
        return data['datasetlist']

    def get_dataset(self, id, access_key):
        """Get a single dataset by identifier and access key.

        Returns the 'dataset' member of the response.
        """
        url = self._url('getDataset', {'id': id, 'access_key': access_key})
        data = self._get(url)
        return data['dataset']

    def get_session_people(self, session_id=None):
        """Get the 'sessionpeople' payload for a given session identifier.

        Raises:
            ValueError: if no session identifier is given.
        """
        if session_id is None:
            raise ValueError('Must specify session identifier.')
        url = self._url('getSessionPeople', {'id': session_id})
        data = self._get(url)
        return data['sessionpeople']

    def get_master_list(self, state=None, session_id=None):
        """Get list of bills for the current session in a state or for
           a given session identifier.

           Returns the values of the response's 'masterlist' mapping as a list.
        """
        url = self._url('getMasterList',
                        self._state_or_session(state, session_id))
        data = self._get(url)
        return list(data['masterlist'].values())

    def get_master_list_raw(self, state=None, session_id=None):
        """Get list of bills for the current session in a state or for
           a given session identifier, optimized for change hash detection.

           Returns the values of the response's 'masterlist' mapping as a list.
        """
        url = self._url('getMasterListRaw',
                        self._state_or_session(state, session_id))
        data = self._get(url)
        return list(data['masterlist'].values())

    def get_bill(self, bill_id=None, state=None, bill_number=None):
        """Get primary bill detail information including sponsors, committee
           references, full history, bill text, and roll call information.

           This function expects either a bill identifier or a state and bill
           number combination.  The bill identifier is preferred, and required
           for fetching bills from prior sessions.
        """
        if bill_id is not None:
            url = self._url('getBill', {'id': bill_id})
        elif state is not None and bill_number is not None:
            url = self._url('getBill', {'state': state, 'bill': bill_number})
        else:
            raise ValueError('Must specify bill_id or state and bill_number.')
        return self._get(url)['bill']

    def get_roll_call(self, roll_call_id):
        """Roll call detail for individual votes and summary information."""
        data = self._get(self._url('getRollcall', {'id': roll_call_id}))
        return data['roll_call']

# Shared client instance; with no argument the key is read from the
# LEGISCAN_API_KEY environment variable.
legis = LegiScan()

In [3]:
""" Running this cell will create one API call to the getDatasetList endpoint,
plus one call to the getDataset endpoint for each dataset with an updated change hash. """

# All datasets LegiScan currently offers for Maine.
datasets = legis.get_dataset_list(state = 'me')

# Hashes recorded by the previous run, keyed by session id.
stored_hashes_loc = f'{bq_project}.{bq_dataset}.legiscan_stored_hashes'
stored_hashes = bq.query(f'select * from {stored_hashes_loc}')
hash_dict = dict(zip(stored_hashes['session_id'], stored_hashes['dataset_hash']))

# Every session/hash pair seen in this run (changed or not).
session_ids = []
dataset_hashes = []

# Records collected from the datasets that had to be downloaded.
votes = []
bills = []
people = []

for dataset in datasets:
    session_id = dataset['session_id']
    access_key = dataset['access_key']
    dataset_hash = dataset['dataset_hash']

    session_ids.append(session_id)
    dataset_hashes.append(dataset_hash)

    # Unchanged since the last run: skip the getDataset call.
    if hash_dict.get(session_id) == dataset_hash:
        continue

    api_output = legis.get_dataset(session_id, access_key)
    # The archive arrives as a base64-encoded zip file.
    encoded = base64.b64decode(api_output['zip'])

    with zipfile.ZipFile(io.BytesIO(encoded)) as zipped:
        files = zipped.namelist()

        for file in files:
            # Only bill, vote and people files are collected; any other
            # archive member is skipped without being read.
            if not any(part in file for part in ('/bill/', '/vote/', '/people/')):
                continue

            try:
                content = zipped.read(file).decode("utf-8")
                data = json.loads(content)
                if '/bill/' in file:
                    bills.append(data['bill'])
                elif '/vote/' in file:
                    votes.append(data['roll_call'])
                elif '/people/' in file:
                    # Tag each person with the session the file came from.
                    data['person']['session_id'] = session_id
                    people.append(data['person'])
            except (ValueError, KeyError, TypeError) as err:
                # Still best-effort, but a malformed file is now reported
                # instead of disappearing silently.
                print(f'Skipping {file} (session {session_id}): {err!r}')

In [4]:
def simplify_df(df):
    """Return a flat copy of `df` suitable for loading as a table.

    - columns that contain at least one list value are dropped,
    - dots in column names are replaced with underscores,
    - NaN values are replaced with None.

    The input frame is not modified.
    """
    # Columns holding at least one list value, identified by their ORIGINAL names.
    nested_cols = [
        col for col in df.columns
        if any(isinstance(item, list) for item in df[col])
    ]

    # Only names that actually change need to be in the rename mapping.
    name_dict = {name: name.replace('.', '_') for name in df.columns if '.' in name}

    # Drop before renaming: nested_cols holds the original names, so dropping
    # after the rename raises KeyError for a nested column with a dot in its name.
    return (
        df.drop(columns=nested_cols)
          .rename(columns=name_dict)
          .replace({np.nan: None})
    )

def clean_table(tbl, prefix = ''):
    """Normalise the column names of `tbl` and cast id columns to int.

    Dots in column names become underscores and every occurrence of `prefix`
    is removed. Afterwards each column whose cleaned name ends in '_id' is
    converted with `int`. The table is modified through its own
    `rename_columns` / `convert_column` methods and then returned.
    """
    renames = {}
    cleaned_names = []

    # Work out the cleaned name of every column, remembering only real changes.
    for original in tbl.columns:
        cleaned = original.replace('.', '_').replace(prefix, '')
        cleaned_names.append(cleaned)
        if cleaned != original:
            renames[original] = cleaned

    tbl.rename_columns(renames)

    # convert all ids to numeric
    for column in cleaned_names:
        if column.endswith('_id'):
            tbl.convert_column(column, int)

    return tbl

In [5]:
""" Check if new data is available """
# One row per dataset seen in this run: the session id and its current dataset hash.
new_hashes = table.from_columns([session_ids,dataset_hashes], header = ['session_id','dataset_hash'])

# Build the union of the stored and the freshly fetched hash rows, then remove duplicates.
# NOTE(review): check_hashes is an alias of stored_hashes, not a copy — if stack() and
# deduplicate() modify the table in place, stored_hashes is changed as well; confirm.
check_hashes = stored_hashes
check_hashes.stack(new_hashes)
check_hashes.deduplicate()

session_id,dataset_hash
48,20c496d1e0212856d370536e382b9691
81,82da33fa25759d5eaca5c3ad267571ce
1004,7d47cf9e6077e5265ec38de8d7a9cea3
1132,9b146a7c1144ab48d291b0081f49a7a4
1258,88e0805a4e5ecdae319bb8a24f8137ae


In [6]:
# Sessions whose current dataset hash is new or differs from the stored one.
# Comparing the row count of the de-duplicated union with the row count of
# new_hashes misses a brand-new session: the stored rows are then a subset of
# the new rows, the counts match, and the downloaded data is never uploaded.
changed_sessions = [
    sid for sid, current_hash in zip(session_ids, dataset_hashes)
    if hash_dict.get(sid) != current_hash
]

if not changed_sessions:
    print("Legiscan data is already up to date")
else:
    # Unpack nested columns into separate tables + clean column names
    bill_df = pd.json_normalize(bills)
    vote_df = pd.json_normalize(votes)
    people_df = pd.json_normalize(people)

    # roll call
    roll_call_tbl = table.from_dataframe(bill_df).long_table('bill_id','votes')
    roll_call_tbl.rename_column('votes_roll_call_id','roll_call_id')
    clean_table(roll_call_tbl)

    # subjects
    subjects_tbl = table.from_dataframe(bill_df).long_table('bill_id','subjects')
    clean_table(subjects_tbl, 'subjects_')

    # sponsors
    sponsors_tbl = table.from_dataframe(bill_df).long_table('bill_id','sponsors')
    clean_table(sponsors_tbl, 'sponsors_')

    # vote
    vote_tbl = table.from_dataframe(vote_df).long_table(['roll_call_id','bill_id'],'votes')
    clean_table(vote_tbl, 'votes_')

    # bill
    simplified_bill_df = simplify_df(bill_df)
    bill_tbl = table.from_dataframe(simplified_bill_df)
    clean_table(bill_tbl)

    # person
    simplified_people_df = simplify_df(people_df)

    # Manual name corrections for specific people, keyed by people_id.
    name_corrections = {
        8855: {'name': 'Michel Lajoie', 'first_name': 'Michel'},
        21232: {'name': 'Tiffany Roberts', 'last_name': 'Roberts'},
    }
    for person_id, corrections in name_corrections.items():
        is_person = simplified_people_df['people_id'] == person_id
        for column, value in corrections.items():
            simplified_people_df.loc[is_person, column] = value

    people_tbl = table.from_dataframe(simplified_people_df)
    clean_table(people_tbl)

    # Target table name, upsert key(s) and data for every output table.
    legiscan_tables = [
        {
            'name':'legiscan_people',
            'primarykey': ['people_id','session_id'],
            'data': people_tbl
        },
        {
            'name':'legiscan_roll_calls',
            'primarykey': 'roll_call_id',
            'data': roll_call_tbl
        },
        {
            'name':'legiscan_bills',
            'primarykey': 'bill_id',
            'data': bill_tbl
        },
        {
            'name':'legiscan_votes',
            'primarykey': ['roll_call_id','people_id'],
            'data': vote_tbl
        },
        {
            'name':'legiscan_subjects',
            'primarykey': ['bill_id','subject_id'],
            'data': subjects_tbl
        },
        {
            'name':'legiscan_sponsors',
            'primarykey': ['people_id','bill_id'],
            'data': sponsors_tbl
        }
    ]

    # Update or create all data tables in BigQuery.
    for t in legiscan_tables:
        table_obj = t['data']
        target_table = f'''{bq_project}.{bq_dataset}.{t['name']}'''
        primary_key = t['primarykey']

        bq.upsert(table_obj, target_table, primary_key)

    # Update stored dataset hashes in BigQuery. Done last, so a failed data
    # upsert above leaves the old hashes in place and the next run retries.
    bq.upsert(new_hashes, stored_hashes_loc, 'session_id')

google_bigquery INFO Building staging table: av-states.lkesich.legiscan_people_stg_20240722_1113_0182
google_cloud_storage INFO 283824f3-61b9-402a-a88d-448c87761d57.csv blob in lydia-scratch bucket deleted.
google_bigquery INFO Deleting staging table: av-states.lkesich.legiscan_people_stg_20240722_1113_0182
google_bigquery INFO Building staging table: av-states.lkesich.legiscan_roll_calls_stg_20240722_1113_9774
google_cloud_storage INFO 46e26876-af9c-4d48-9824-f1b8ba248775.csv blob in lydia-scratch bucket deleted.
google_bigquery INFO Deleting staging table: av-states.lkesich.legiscan_roll_calls_stg_20240722_1113_9774
google_bigquery INFO Building staging table: av-states.lkesich.legiscan_bills_stg_20240722_1113_0412
google_cloud_storage INFO b0cde665-4ad3-40aa-9235-7743f51d80d2.csv blob in lydia-scratch bucket deleted.
google_bigquery INFO Deleting staging table: av-states.lkesich.legiscan_bills_stg_20240722_1113_0412
google_bigquery INFO Building staging table: av-states.lkesich.legi