# Data Engineering for AWS RDS PostgreSQL DB

In [1]:
import pickle
from datetime import date

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

import configparser

In [17]:
# Load the pickled inputs: `votes` from all_votes.p and `senators` from
# temp_senators.p.
# NOTE(review): unpickling can execute arbitrary code; only load pickle files
# that come from a trusted source.
with open('all_votes.p', 'rb') as votes_file:
    votes = pickle.load(votes_file)

with open('temp_senators.p', 'rb') as senators_file:
    senators = pickle.load(senators_file)

## Preparing data for database insertion

### Table for senate members
- SEN_ID TEXT PRIMARY KEY NOT NULL
- F_NAME TEXT NOT NULL
- L_NAME TEXT NOT NULL
- PARTY TEXT NOT NULL
- GENDER TEXT NOT NULL
- STATE TEXT NOT NULL

In [18]:
# Already in format for insertion into SQL database: each senator record is a
# dict whose fields correspond to the columns of the senators table.
senators[0]

{'id': 'A000031',
 'first_name': 'Brockman',
 'last_name': 'Adams',
 'party': 'D',
 'gender': None,
 'state': 'WA'}

#### Check gender self-identification
Determine senator gender self-identification to replace missing values: F - Female, M - Male, N - Non-Binary.

In [19]:
# List every senator whose gender is missing (None) so it can be looked up
# manually. Prints the senator id followed by "first_name last_name".
for senator in senators:
    # Identity comparison is the correct test for None.
    if senator['gender'] is None:
        print(senator['id'], f'{senator["first_name"]} {senator["last_name"]}')

A000031 Brockman Adams
B000401 Lloyd Bentsen
B000647 Rudolph Boschwitz
B001077 Quentin Burdick
C000877 Alan Cranston
D000366 Alan Dixon
F000329 Wyche Fowler
H000951 Gordon Humphrey
M000250 Spark Matsunaga
P000513 Larry Pressler
S001138 Steven Symms


In [20]:
# Manually researched gender for the senators whose records had no gender
# value, keyed by senator id. Every one of them maps to 'M'.
gender = dict.fromkeys(
    (
        'A000031',
        'B000401',
        'B000647',
        'B001077',
        'C000877',
        'D000366',
        'F000329',
        'H000951',
        'M000250',
        'P000513',
        'S001138',
    ),
    'M',
)

In [21]:
# Correct data: fill in the manually researched gender for every senator whose
# id appears in `gender`. A direct dict lookup replaces scanning all
# (id, gender) pairs for each senator; senators not listed are left untouched.
for senator in senators:
    if senator['id'] in gender:
        senator['gender'] = gender[senator['id']]

In [22]:
# Re-inspect the first record to confirm its gender value has been filled in
senators[0]

{'id': 'A000031',
 'first_name': 'Brockman',
 'last_name': 'Adams',
 'party': 'D',
 'gender': 'M',
 'state': 'WA'}

### Table for bills and table for votes
#### Bills
- CSR_ID TEXT PRIMARY KEY NOT NULL (unique primary key will be constructed as 'congress.session.roll_call')
- CONGRESS INT NOT NULL
- SESSION INT NOT NULL
- ROLL_CALL INT NOT NULL
- DATE DATE NOT NULL

#### Votes
- ID INT PRIMARY KEY NOT NULL AUTO INCREMENT
- SEN_ID TEXT FOREIGN KEY NOT NULL
- CSR_ID TEXT FOREIGN KEY NOT NULL
- POSITION TEXT NOT NULL

In [23]:
# Some 'bills' are not bills but confirmations, treaty votes, etc. and will be dropped.
# Example: the first vote's bill_id is a string that parses as an integer.
votes[0]['bill_id'] # '-101'

'-101'

In [24]:
# Integer bill_ids are codes for special votes (confirmations, treaty votes,
# etc.). Keep only the votes whose bill_id does NOT parse as an integer.
new_votes = []
for vote in votes:
    try:
        int(vote['bill_id'])
    except (TypeError, ValueError):
        # Not an integer code (ValueError) or not a number-like value at all
        # (TypeError, e.g. None): treat it as a real bill and keep the vote.
        # Only these conversion failures are caught, so unrelated errors
        # and interrupts are no longer silently swallowed.
        new_votes.append(vote)

In [25]:
# Compare record counts before and after dropping the special votes
print(len(votes)) # 8949
print(len(new_votes)) # 8401

8949
8401


In [26]:
# Records destined for the bills table (one per retained roll-call vote)
dict_bills = []

# Records destined for the votes table (one per senator position per roll call)
dict_votes = []

for vote in new_votes:
    # Composite key 'congress.session.roll_call'; it is also stored back on
    # the vote record itself.
    csr_id = f'{vote["congress"]}.{vote["session"]}.{vote["roll_call"]}'
    vote['csr_id'] = csr_id

    dict_bills.append({
        'csr_id': csr_id,
        'congress': vote['congress'],
        'session': vote['session'],
        'roll_call': vote['roll_call'],
        'bill_id': vote['bill_id'],
        'date': vote['date'],
    })

    # One vote record per senator position recorded on this roll call
    dict_votes.extend(
        {
            'sen_id': position['member_id'],
            'csr_id': csr_id,
            'position': position['vote_position'],
        }
        for position in vote['positions']
    )

In [27]:
# Spot-check the first prepared vote record (sen_id, csr_id, position)
dict_votes[0]

{'sen_id': 'A000031', 'csr_id': '101.1.11', 'position': 'Yes'}

In [28]:
# Spot-check the first prepared bill record
dict_bills[0]

{'csr_id': '101.1.11',
 'congress': 101,
 'session': 1,
 'roll_call': 11,
 'bill_id': 's.j.res.7-101',
 'date': '1989-02-02'}

## Data to Database
Each table was inserted one at a time (it would be far more efficient to build a list of the tables to insert and process them together).

In [None]:
# Read the connection settings from the [aws] section of config.ini
config = configparser.ConfigParser()
config.read('config.ini')
ENDPOINT, PORT, USR, PWD = (
    config.get('aws', option)
    for option in ('ENDPOINT', 'PORT', 'USER', 'PASSWORD')
)

In [None]:
# Open a connection to the AWS instance using the configured credentials
# (no database name is given here).
conn = psycopg2.connect(host=ENDPOINT, user=USR, password=PWD, port=PORT)

# Put the connection in autocommit mode
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [None]:
# Instantiate cursor on the current connection; statements below run through it
cursor = conn.cursor()

In [None]:
# Create sen_db
# DO NOT RUN
# cursor.execute("CREATE DATABASE sen_db;")

In [None]:
# Reconnect, this time selecting the database named in config.ini
DB = config.get('aws', 'DATABASE')

conn = psycopg2.connect(host=ENDPOINT, user=USR, password=PWD, port=PORT, database=DB)

# Autocommit mode and a fresh cursor bound to the new connection
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()

### Senators Table

In [None]:
# Create senators table
# DO NOT RUN
# cursor.execute(
#     """
#     CREATE TABLE senators (
#     sen_id TEXT PRIMARY KEY NOT NULL,
#     f_name TEXT NOT NULL,
#     l_name TEXT NOT NULL,
#     party TEXT NOT NULL,
#     gender TEXT NOT NULL,
#     state TEXT NOT NULL
#     );
#     """
# )

In [None]:
# Parameterized INSERT for a single senators row; the six %s placeholders are
# filled from the tuple supplied at execution time.
add_senator = """
    INSERT INTO senators (sen_id, f_name, l_name, party, gender, state)
    VALUES (%s, %s, %s, %s, %s, %s);
    """

In [None]:
# Build one tuple per senator, ordered to match the column list in add_senator
sen_data = [
    (
        member['id'],
        member['first_name'],
        member['last_name'],
        member['party'],
        member['gender'],
        member['state'],
    )
    for member in senators
]

In [None]:
# Insert rows
# DO NOT RUN
# cursor.executemany(add_senator, sen_data)

In [None]:
# Sanity check: compare the cursor's row count with the number of senator
# records prepared.
# NOTE(review): cursor.rowcount describes the most recent execute*() call on
# this cursor, so it only reflects the senator insert when run right after it.
print('Number of rows in database:', cursor.rowcount) # 292
print('Number of senators:', len(senators)) # 292

### Bills Table

In [None]:
# Create bills table
# DO NOT RUN
# cursor.execute(
#     """
#     CREATE TABLE bills (
#     csr_id TEXT PRIMARY KEY NOT NULL,
#     congress INT NOT NULL,
#     session INT NOT NULL,
#     roll_call INT NOT NULL,
#     bill_id TEXT NOT NULL,
#     date DATE NOT NULL
#     );
#     """
# )

In [None]:
# Parameterized INSERT for a single bills row; the six %s placeholders are
# filled from the tuple supplied at execution time.
add_bill = """
    INSERT INTO bills (csr_id, congress, session, roll_call, bill_id, date)
    VALUES (%s, %s, %s, %s, %s, %s);
    """

In [None]:
# Build one tuple per bill, ordered to match the column list in add_bill.
# The 'YYYY-MM-DD' date string is converted into a datetime.date.
bill_data = []
for record in dict_bills:
    parts = [int(piece) for piece in record['date'].split('-')]
    bill_data.append((
        record['csr_id'],
        record['congress'],
        record['session'],
        record['roll_call'],
        record['bill_id'],
        date(parts[0], parts[1], parts[2]),
    ))

In [None]:
# Insert rows
# DO NOT RUN
# cursor.executemany(add_bill, bill_data)

In [None]:
# Sanity check: compare the cursor's row count with the number of bill
# records prepared.
# NOTE(review): cursor.rowcount describes the most recent execute*() call on
# this cursor, so it only reflects the bill insert when run right after it.
print('Number of rows in database:', cursor.rowcount) # 8401
print('Number of bills:', len(dict_bills)) # 8401

### Votes Table

In [None]:
# Create votes table, id is autoincrement
# DO NOT RUN
# cursor.execute(
#     """
#     CREATE TABLE votes (
#     id SERIAL PRIMARY KEY,
#     sen_id TEXT references senators(sen_id),
#     csr_id TEXT references bills(csr_id),
#     position TEXT NOT NULL
#     );
#     """
# )

In [None]:
# Parameterized INSERT for a single votes row; the three %s placeholders are
# filled from the tuple supplied at execution time.
add_vote = """
    INSERT INTO votes (sen_id, csr_id, position)
    VALUES (%s, %s, %s);
    """

In [None]:
# Build one tuple per vote, ordered to match the column list in add_vote
vote_data = [
    (record['sen_id'], record['csr_id'], record['position'])
    for record in dict_votes
]

In [None]:
# Insert rows
# DO NOT RUN
# cursor.executemany(add_vote, vote_data)

In [None]:
# Sanity check: compare the cursor's row count with the number of vote
# records prepared. The second label previously read "bills" although the
# value printed is the number of vote records.
print('Number of rows in database:', cursor.rowcount) # 839173
print('Number of votes:', len(dict_votes)) # 839173

## Cleaning out the Database
Prior to loading into the database, the data should have been pre-filtered to include only current senators and only the votes cast by them. Here we find the current Senate members and remove the extraneous senators and their votes.

(Note: several senators have been serving longer than the time frame covered by the votes; all bills will be kept.)

In [68]:
# Read the connection settings, including the database name, from the [aws]
# section of config.ini
config = configparser.ConfigParser()
config.read('config.ini')
ENDPOINT, PORT, USR, PWD, DB = (
    config.get('aws', option)
    for option in ('ENDPOINT', 'PORT', 'USER', 'PASSWORD', 'DATABASE')
)

In [69]:
# Open a connection to the configured database on AWS
conn = psycopg2.connect(host=ENDPOINT, user=USR, password=PWD, port=PORT, database=DB)

# Put the connection in autocommit mode
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

In [70]:
# Instantiate cursor on the current connection; the queries and deletes below
# run through it
cursor = conn.cursor()

In [9]:
# Find current senators: every sen_id with a vote row for roll call
# '116.2.140' is treated as a current senator.
current_senators_query = """
    SELECT sen_id FROM votes
    WHERE csr_id = '116.2.140'
    ;
    """
cursor.execute(current_senators_query)

# Each fetched row is a 1-tuple holding the sen_id
current_senators = cursor.fetchall()

In [11]:
# Check number of senators returned for the reference roll call
len(current_senators) # 100

100

In [13]:
# Check response: rows come back as 1-tuples, not bare id strings
current_senators[0]

('A000360',)

In [14]:
# Unwrap the single-column rows into a plain list of senator ids
current_senators = [row[0] for row in current_senators]

In [16]:
# Confirm the entries are now plain id strings
current_senators[0] # 'A000360'

'A000360'

In [29]:
# Fetch the sen_id of every row in the senators table
all_senators_query = """
    SELECT sen_id FROM senators
    ;
    """
cursor.execute(all_senators_query)

# Each fetched row is a 1-tuple holding the sen_id
all_senators = cursor.fetchall()

In [31]:
# Unwrap the single-column rows into a plain list of senator ids
all_senators = [row[0] for row in all_senators]

In [36]:
# Senators to remove (along with their votes): everyone in the senators table
# who is not in the current-senator list.
# NOTE(review): if current_senators were empty, this would select every
# senator for deletion.
stale_ids = set(all_senators) - set(current_senators)
old_senators = list(stale_ids)

In [75]:
# Parameterized DELETE removing every vote cast by one senator id
remove_vote = """
    DELETE FROM votes
    WHERE sen_id = %s
    ;
    """

# Parameterized DELETE removing one senator by id
remove_senator = """
    DELETE FROM senators
    WHERE sen_id = %s
    ;
    """

In [76]:
# executemany expects one parameter sequence per statement, so wrap each id in
# a 1-tuple. This rebinds old_senators: re-running the cell would wrap the
# tuples a second time.
old_senators = [(sen_id,) for sen_id in old_senators]

In [79]:
# Delete votes: runs the remove_vote DELETE once per old senator id.
# Votes are removed before the senators themselves; per the votes table
# definition earlier in this notebook, votes.sen_id references
# senators(sen_id).
# NOTE(review): the connection was put in autocommit mode above, so these
# deletes presumably take effect as they run and cannot be rolled back.
cursor.executemany(remove_vote, old_senators)

In [80]:
# Row count reported by the cursor after the vote deletion
cursor.rowcount # 607010

607010

In [81]:
# Delete old senators: runs the remove_senator DELETE once per old senator id.
# Intended to run after the vote deletion above, since votes reference
# senators by sen_id.
cursor.executemany(remove_senator, old_senators)

In [82]:
# Row count reported by the cursor after the senator deletion
cursor.rowcount # 192

192