# Extraction and Loading of Georgia Election Source Data

The notebooks in this repo require a processed, cleaned, and normalized version of the source election data from the State of Georgia's Office of Secretary of State. This notebook, and an associated Python script, will create a sqlite3-based database that subsequent notebooks will use as a data source.

### Why sqlite3?
sqlite3 is used so that the Jupyter notebooks can run on a laptop or in a Google Colab-type environment, without the need to purchase or set up a Google Cloud or AWS resource. Future versions of this notebook may include the option to use paid database resources on one or both of these services.

### Prerequisites and Considerations

1. Anaconda Python is used to create the environment needed to operate this notebook, with an environment defined in `environment.yml`. Anaconda Python may be [downloaded without cost](https://www.anaconda.com/distribution/#download-section) for Linux, Windows, or macOS.


2. __This repo should be cloned from GitHub__; the notebook may not operate as expected if individual notebooks are downloaded without supporting files.


3. This notebook was tested on a Linux-based file system. There may be unexpected behavior if operated on a Windows-based file system.


4. If you are running this notebook on a temporary instance (e.g., Google Colab), you may want to consider saving the results of this notebook in a persistent storage space.


5. At least 4 GB of free memory is required.

In [1]:
# Required libraries
import os
import sqlite3
from tqdm import tqdm
from zipfile import ZipFile
from glob import glob

## Configuration variables

In [19]:
# The location of source data contained within the repo
source_data_loc = 'source_data'

# The current vintage of the data located in the repo
data_vintage_loc = '20190709'

# If not set here, the notebook will assume that you are running the notebook in the
# root directory of the repo.
working_directory = None
#working_directory = '/home/michael/git_repos/georgia_election_data'

# The first and last years of data in the voter database
first_years_data = 1996
last_years_data = 2019

# The sub-folder that will contain a sqlite3 database with the processed data
database_location = 'processed_data'

# The name of the file containing the election data
dest_db_name = 'election_data.db'

# The name of the table in the database containing the election results.
dest_db_table_name = 'gaelect'

# The batch size for processing.
# IMPORTANT NOTE: If you are receiving out-of-memory errors, reduce the batch size.
# Note that smaller batch size will result in slower performance.

batch_size = int(3e6)

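# From the variables above, the full path to the sqlite3 database.
# working_directory may still be None here, so fall back to the current directory.
full_path_to_db = os.path.join(working_directory or os.getcwd(), database_location, dest_db_name)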

### About the Source Data

Included in this repo are the source ZIP files obtained from the State of Georgia Secretary of State in August 2019. _These files are provided without assertions to accuracy_. To re-download the source files, follow the instructions below.

#### 2013-2019
1. Go to https://elections.sos.ga.gov/Elections/voterhistory.do
2. Select the election year, and then download the `Full Year File`
3. Download each of the ZIP files into a folder accessible to your Python instance

#### 1996-2012
1. Go to https://elections.sos.ga.gov/Elections/voterhistoryprevious.do
2. Download each year's ZIP file into a folder accessible to your Python instance

### Confirming the existence and integrity of the source data

The following cell uses the configuration parameters above to check that one ZIP file is present for every year between `first_years_data` and `last_years_data`.

In [3]:
print(f'Current Working Directory is {os.getcwd()}')

if working_directory is None:
    working_directory = os.getcwd()

if os.path.basename(os.path.normpath(working_directory)) != 'georgia_election_data':
    print("Change the working_directory variable to indicate the root folder of this repo")

# One ZIP file is expected for each year, e.g. 1996.zip through 2019.zip
list_of_known_years_election_data = [f'{year}.zip' for year in range(first_years_data, last_years_data + 1)]

found_files = sorted(os.listdir(os.path.join(working_directory, source_data_loc, data_vintage_loc)))
matches = found_files == list_of_known_years_election_data
if matches:
    print("Found all expected data.")
else:
    missing_files = sorted(set(list_of_known_years_election_data) - set(found_files))
    print(f"Expected data not found. Missing files are {missing_files}")

Current Working Directory is /home/michael/git_repos/georgia_election_data/etl
Found all expected data.
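
The check above confirms that the expected files exist, but it does not open them. As a minimal sketch of an integrity check, the standard library's `ZipFile.testzip()` can be used to verify the CRC of every member in an archive. The helper name `find_corrupt_archives` and the throwaway demonstration files are assumptions made for illustration; they are not part of the repo.

```python
import os
import tempfile
from zipfile import ZipFile, BadZipFile

def find_corrupt_archives(zip_paths):
    """Return the paths of archives that cannot be opened or fail a CRC check."""
    corrupt = []
    for path in zip_paths:
        try:
            with ZipFile(path) as archive:
                # testzip() returns the name of the first bad member, or None
                if archive.testzip() is not None:
                    corrupt.append(path)
        except BadZipFile:
            corrupt.append(path)
    return corrupt

# Demonstration on throwaway files (not the real source data)
demo_dir = tempfile.mkdtemp()
good_zip = os.path.join(demo_dir, 'good.zip')
with ZipFile(good_zip, 'w') as archive:
    archive.writestr('demo.txt', 'hypothetical contents\n')
bad_zip = os.path.join(demo_dir, 'bad.zip')
with open(bad_zip, 'wb') as f:
    f.write(b'this is not a zip archive')

corrupt_archives = find_corrupt_archives([good_zip, bad_zip])
print(corrupt_archives)
```

In this notebook, the same helper could be given the list of ZIP paths under the source data folder before any parsing starts.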


## Voter Data ETL Loading Functions

These functions load the two available vintages of Georgia Voter Data: data for 2012 and earlier, and data for 2013 onwards.
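
Both vintages are fixed-width text: each field occupies a fixed range of character positions in the line. The sketch below shows that idea with a layout table for the older vintage, using the same positions as `parse_function_2012_prior`. The sample record is invented for illustration, and the names `PRE_2013_LAYOUT` and `split_fixed_width` are hypothetical.

```python
# Character positions of each field in the 2012-and-earlier vintage
PRE_2013_LAYOUT = {
    'county_no': slice(0, 3),
    'reg_no': slice(3, 11),
    'election_date': slice(11, 19),
    'election_type': slice(19, 22),
    'party': slice(22, 23),
    'absentee': slice(23, 24),
}

def split_fixed_width(raw_line, layout):
    """Decode one raw line and cut it into named fields by position."""
    text = raw_line.decode('utf-8')
    return {name: text[span].strip() for name, span in layout.items()}

# Invented record: county 001, registration 00012345, date 11051996,
# election type 003, party D, absentee Y
sample_line = b'0010001234511051996003DY\r\n'
fields = split_fixed_width(sample_line, PRE_2013_LAYOUT)
print(fields)
```

Keeping the positions in one table makes it easier to compare the two vintages side by side, since the newer layout differs only in the width of the party field and the two extra flags.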

In [4]:
def parse_function_2012_prior(in_line):

    # Decode the raw bytes read from the ZIP archive into text
    in_line = in_line.decode('utf-8')
    
    county_no = in_line[0:3]
    reg_no = in_line[3:11]
    election_date = in_line[11:19]
    election_type = in_line[19:22].strip()
    party = in_line[22:23].strip()
    absentee = in_line[23:24]
    
    # dates for this time series are not year-first; the slicing below treats
    # them as MMDDYYYY and rewrites them as YYYY-MM-DD to reduce confusion
    # and match the date format of the 2013 and onwards data
    election_date = election_date[4:] + '-' + election_date[0:2] + '-' + election_date[2:4]
    
    # standardize party indicator from primary election records: single char
    # D -> Democrat
    # R -> Republican
    # N -> Non-Partisan
    # (the party field is one character wide in this vintage)
    if party not in ('D', 'R', 'N'):
        party = ""

    # Convert absentee flag to True/False
    absentee = absentee == 'Y'
        
    return (0, county_no, reg_no, election_date, election_type, party, absentee, None, None)
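
The string slicing above will rearrange any eight characters, so a malformed date passes through silently. As a sketch of a stricter alternative, `datetime.strptime` can both convert and validate the value. This assumes the older vintage stores dates as MMDDYYYY, which is what the slicing in `parse_function_2012_prior` implies; the helper name and the sample values are hypothetical.

```python
from datetime import datetime

def normalize_pre_2013_date(raw_date):
    """Convert an MMDDYYYY string to YYYY-MM-DD; raise ValueError if it is not a real date."""
    return datetime.strptime(raw_date, '%m%d%Y').strftime('%Y-%m-%d')

normalized = normalize_pre_2013_date('11051996')  # invented value
print(normalized)

# A value that is not a valid month-day-year date is rejected
try:
    normalize_pre_2013_date('13011996')
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```

Parsing with `strptime` is slower than slicing, so for tens of millions of records it may be more practical to run it on a sample of rows as a spot check.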

In [5]:
def parse_function_2013_post(in_line):

    # Decode the raw bytes read from the ZIP archive into text
    in_line = in_line.decode('utf-8')
    
    county_no = in_line[0:3].strip()
    reg_no = in_line[3:11].strip()
    election_date = in_line[11:19].strip()
    election_type = in_line[19:22].strip()
    party = in_line[22:24].strip()
    absentee = in_line[24:25].strip()
    provisional = in_line[25:26].strip()
    supplemental = in_line[26:27].strip()
    
    # the election date arrives as YYYYMMDD; insert dashes to get YYYY-MM-DD
    election_date = election_date[0:4] + '-' + election_date[4:6] + '-' + election_date[6:]
    
    # standardize party indicator from primary election records: single char
    # D -> Democrat
    # R -> Republican
    # N -> Non-Partisan
    
    if party == 'NP':
        party = 'N'
    elif party not in ('D', 'R', 'N'):
        party = None

    # Convert absentee, provisional, and supplemental flags to True/False
    absentee = absentee == 'Y'
    provisional = provisional == 'Y'
    supplemental = supplemental == 'Y'

    # The column order must match the table: ..., absentee, supplemental, provisional
    return (1, county_no, reg_no, election_date, election_type, party, absentee, supplemental, provisional)
  

In [6]:
def check_if_elections_db_exists(full_path_to_db):
    # first, check to see if the database file itself exists
    if not os.path.exists(full_path_to_db):
        return False
    # then check that the election results table exists in that file
    db = sqlite3.connect(full_path_to_db)
    c = db.cursor()
    c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (dest_db_table_name,))
    found = c.fetchone() is not None
    db.close()
    return found
    
    

In [7]:
def get_n_lines_iterator(source_file):
    # Yield the raw (bytes) lines of the first file inside the ZIP archive
    with ZipFile(source_file) as archive:
        with archive.open(archive.namelist()[0]) as file:
            for i in file:
                yield i
            
#list_of_files = glob(working_directory + f'/{source_data_loc}' + f'/{data_vintage_loc}' + '/*')
#source_file = list_of_files[1]
#lines_required = batch_size

# get number of lines in text file



In [8]:
def create_etl_batches(source_file):
    # Count the records in the first file of the archive, then split the
    # record indices into inclusive (first, last) ranges of at most batch_size
    with ZipFile(source_file) as archive:
        member_name = archive.namelist()[0]
        with archive.open(member_name) as f:
            line_count = sum(1 for _ in f)
    print(f'Datafile {member_name} contains {line_count} records')
    batches = [(start, min(start + batch_size, line_count) - 1)
               for start in range(0, line_count, batch_size)]
    return {'source_file': source_file,
            'line_count': line_count,
            'batches': batches}

In [20]:
list_of_files = glob(working_directory + f'/{source_data_loc}' + f'/{data_vintage_loc}' + '/*')
list_of_files.sort()
print(f"Found {len(list_of_files)} to process.")
ret_batches = list(map(create_etl_batches, list_of_files))

Found 24 to process.
Datafile Voter History 1996.txt contains 4644200 records
Datafile Voter History 1997.txt contains 959435 records
Datafile Voter History 1998.txt contains 3553259 records
Datafile Voter History 1999.txt contains 539933 records
Datafile Voter History 2000.txt contains 5046847 records
Datafile Voter History 2001.txt contains 708042 records
Datafile Voter History 2002.txt contains 3731165 records
Datafile Voter History 2003.txt contains 602137 records
Datafile Voter History 2004.txt contains 6399634 records
Datafile Voter History 2005.txt contains 651522 records
Datafile Voter History 2006.txt contains 3796362 records
Datafile Voter History 2007.txt contains 752749 records
Datafile Voter History 2008.txt contains 9628482 records
Datafile Voter History 2009.txt contains 560700 records
Datafile Voter History 2010.txt contains 4841793 records
Datafile Voter History 2011.txt contains 877001 records
Datafile Voter History 2012.txt contains 7036094 records
Datafile 2013.TXT 

In [59]:
number_of_records = sum(r['line_count'] for r in ret_batches)
number_of_batches = sum([len(r['batches']) for r in ret_batches])
print(f'There are {number_of_records} total voting records and {number_of_batches} batches in the data sets')

There are 76453445 total voting records and 42 batches in the data sets
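
To illustrate how a file's records are divided, the sketch below computes the inclusive `(first, last)` index pairs for one file directly from its record count. The function name `batch_ranges` is hypothetical; the record count for 1996 and the batch size of three million are taken from the cells above.

```python
def batch_ranges(line_count, batch_size):
    """Return inclusive (first, last) record indices for each batch."""
    return [(start, min(start + batch_size, line_count) - 1)
            for start in range(0, line_count, batch_size)]

# The 1996 file contains 4,644,200 records, so it needs two batches
ranges_1996 = batch_ranges(4644200, 3000000)
print(ranges_1996)
# [(0, 2999999), (3000000, 4644199)]

# The last index of the final batch, plus one, recovers the record count
print(ranges_1996[-1][1] + 1)
# 4644200
```

A file smaller than the batch size yields a single batch, which is why most odd-numbered years are processed in one step.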


In [52]:
def convert_fwf_to_sqlitedb(file_dict):
    
    """
    This function takes the source data files and builds a sqlite database.
    Records are read and inserted in batches, in lieu of loading the source
    data into pandas dataframes, because holding roughly 76 million records
    in memory at once would need more memory and CPU than this task warrants.
    """
    
    db = sqlite3.connect(full_path_to_db)

    # now load the files

    cur_file_name = file_dict['source_file']
    # the ZIP files are named by year, e.g. 1996.zip
    file_year = int(os.path.basename(cur_file_name).split(".")[0])

    # 2012 and earlier use the older layout; 2013 onwards use the newer one
    if file_year <= 2012:
        parse_function = parse_function_2012_prior
    else:
        parse_function = parse_function_2013_post
    
    # collect the records
    gen = get_n_lines_iterator(cur_file_name)
        
    for cur_range in tqdm(file_dict['batches'], position=0, desc=f'Processing batch in {file_year}'):
        start_i, end_i = cur_range
        end_i = end_i + 1
        cur_record_list = [next(gen) for r in range(start_i, end_i)]
        parsed_data = list(map(parse_function, cur_record_list))


        #print(f"Loading {os.path.basename(f)} into the sqlite database")
        c = db.cursor()
        c.executemany(f"INSERT INTO {dest_db_table_name} VALUES (?,?, ?, ?, ?, ?, ?, ?, ?)", parsed_data)
        db.commit()
        #print(f"Loaded {os.path.basename(f)} into the sqlite database")

    db.close()
    

This function drops the election results table, if it exists, and creates a new, empty one.

In [53]:
def drop_and_create_database():

    db = sqlite3.connect(full_path_to_db)
    c = db.cursor()
    c.execute(f'DROP TABLE IF EXISTS {dest_db_table_name}')
    db.commit()
              
    c = db.cursor()
    c.execute(f'''
               CREATE TABLE {dest_db_table_name}(
               vintage INT,
               county_no TEXT,
               reg_no TEXT, 
               election_date TEXT,
               election_type TEXT, 
               party TEXT,
               absentee BOOLEAN,
               supplemental BOOLEAN,
               provisional BOOLEAN) 
            ''')
    db.commit()
    db.close()


In [56]:
drop_and_create_database()

# If you receive a database locking error, 
# it is likely that the database has been corrupted from an incomplete transaction.
# You will likely need to delete the source database file to resolve and then run the function again.
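
A connection that is left open after an error can also contribute to locking problems. As a minimal sketch, not the approach used in this notebook, `contextlib.closing` can guarantee that a connection is closed even if a statement fails. The database file and table below are throwaway examples.

```python
import os
import sqlite3
import tempfile
from contextlib import closing

demo_db_path = os.path.join(tempfile.mkdtemp(), 'demo.db')  # throwaway file

# closing() guarantees db.close() runs when the block exits.
# The inner "with db" commits on success and rolls back if the block raises.
with closing(sqlite3.connect(demo_db_path)) as db:
    with db:
        db.execute('CREATE TABLE demo (value INT)')
        db.execute('INSERT INTO demo VALUES (1)')

# Reopen the file to confirm that the insert was committed
with closing(sqlite3.connect(demo_db_path)) as db:
    row_count = db.execute('SELECT count(*) FROM demo').fetchone()[0]
print(row_count)
```

Note that using a sqlite3 connection directly in a `with` statement manages the transaction only; it does not close the connection, which is why `closing` is used here as well.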

In [57]:
_ = list(map(convert_fwf_to_sqlitedb,ret_batches))

Processing batch in 1996: 100%|██████████| 2/2 [00:24<00:00, 13.68s/it]
Processing batch in 1997: 100%|██████████| 1/1 [00:05<00:00,  5.56s/it]
Processing batch in 1998: 100%|██████████| 2/2 [00:20<00:00, 13.05s/it]
Processing batch in 1999: 100%|██████████| 1/1 [00:02<00:00,  2.90s/it]
Processing batch in 2000: 100%|██████████| 2/2 [00:28<00:00, 15.01s/it]
Processing batch in 2001: 100%|██████████| 1/1 [00:04<00:00,  4.04s/it]
Processing batch in 2002: 100%|██████████| 2/2 [00:22<00:00, 14.11s/it]
Processing batch in 2003: 100%|██████████| 1/1 [00:04<00:00,  4.83s/it]
Processing batch in 2004: 100%|██████████| 3/3 [00:42<00:00, 15.08s/it]
Processing batch in 2005: 100%|██████████| 1/1 [00:04<00:00,  4.76s/it]
Processing batch in 2006: 100%|██████████| 2/2 [00:25<00:00, 16.02s/it]
Processing batch in 2007: 100%|██████████| 1/1 [00:05<00:00,  5.15s/it]
Processing batch in 2008: 100%|██████████| 4/4 [01:09<00:00, 16.98s/it]
Processing batch in 2009: 100%|██████████| 1/1 [00:03<00:00,  3.

### Testing for completeness

The database should now contain as many records as there are rows in the source data. The following code compares the two counts and reports whether they match.

In [61]:
db = sqlite3.connect(full_path_to_db)
c = db.cursor()
c.execute(f'''SELECT count(*) FROM {dest_db_table_name}''')
database_rows = c.fetchall()[0][0]
db.close()

print(f'The database has {database_rows} rows, compared with {number_of_records} in the source data')
if database_rows == number_of_records:
    print('The Georgia Voter database appears to have been correctly loaded into the sqlite database.')
else:
    print('Sorry, not all Georgia voter records were loaded.')


The database has 76453445 rows, compared with 76453445 in the source data
The Georgia Voter database appears to have been correctly loaded into the sqlite database.
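
A matching total does not show where a shortfall occurred if the counts ever differ. One possible finer-grained check, sketched below, counts rows per election year so that each figure can be compared against the per-file record counts printed earlier. The example runs on an in-memory database with a reduced column set and invented rows; only the table name and the `election_date` format come from this notebook.

```python
import sqlite3

db = sqlite3.connect(':memory:')
# Reduced, illustrative version of the gaelect table
db.execute('CREATE TABLE gaelect (vintage INT, election_date TEXT)')
db.executemany('INSERT INTO gaelect VALUES (?, ?)',
               [(0, '1996-11-05'), (0, '1996-07-09'), (1, '2014-11-04')])

# election_date is stored as YYYY-MM-DD, so the first four characters are the year
rows_per_year = dict(db.execute('''
    SELECT substr(election_date, 1, 4) AS year, count(*)
    FROM gaelect
    GROUP BY year
    ORDER BY year
'''))
db.close()
print(rows_per_year)
```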


## Adding helper tables

There is one helper table that will also be added to the database. This helper table maps each `election_type` code to a category: a _primary_ election, a _primary runoff_ election, a _general_ election, or another type of vote.

This mapping is needed because either 1) the `election_type` coding has not been consistent from election to election, or 2) a new `election_type` code was created when a ballot combined two types of elections, for example, a general election combined with a recall election.

The repo includes a file, `election_type_mapping.csv`, that contains a manual mapping from each election type to one of the categories above.



In [64]:
db = sqlite3.connect(full_path_to_db)
c = db.cursor()

manual_classification_file_loc = working_directory + '/auxillary_files/election_type_mapping.csv'

if os.path.exists(manual_classification_file_loc):
    print("found helper data here:" + manual_classification_file_loc)

# load the file into a list of tuples, skipping the header row

with open(manual_classification_file_loc, 'r') as f:
    input_str_list = [tuple(l.strip().split(',')) for l in f.readlines()[1:]]

c.execute('drop table if exists ga_elect_manual_classification;')
db.commit()
c = db.cursor()
c.execute('''
           CREATE TABLE IF NOT EXISTS ga_elect_manual_classification(
           election_type_index INT,
           election_type TEXT,
           election_type_description TEXT, 
           manual_classification TEXT)
           ''')
db.commit()
print("Table created")

c = db.cursor()
c.executemany("INSERT INTO ga_elect_manual_classification VALUES (?,?, ?, ?)", input_str_list)
db.commit()
print("Data uploaded")

db.close()

found helper data here:/home/michael/git_repos/georgia_election_data/auxillary_files/election_type_mapping.csv
Table created
Data uploaded


## Confirming that the auxiliary table was loaded

The following cell confirms that the new table was created and contains the expected 37 rows.

In [69]:
# Test that the helper table contains the expected number of rows

db = sqlite3.connect(full_path_to_db)
c = db.cursor()
number_of_rows = c.execute('''SELECT count(*) FROM ga_elect_manual_classification''').fetchall()[0][0]
db.close()

if number_of_rows == 37:
    print("Auxillary data correctly loaded")
else:
    print("The auxillary data table appears to be incomplete.")

Auxillary data correctly loaded
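
With both tables in place, later analysis could join the voting records to the helper table on `election_type` to group votes by category. The sketch below shows one such query on an in-memory database. The table and column names match those created above, but the election type codes, descriptions, and voting records are invented for illustration.

```python
import sqlite3

db = sqlite3.connect(':memory:')
# Reduced, illustrative version of the gaelect table
db.execute('CREATE TABLE gaelect (reg_no TEXT, election_date TEXT, election_type TEXT)')
db.execute('''
    CREATE TABLE ga_elect_manual_classification(
    election_type_index INT,
    election_type TEXT,
    election_type_description TEXT,
    manual_classification TEXT)
''')

# Invented rows; the real codes come from election_type_mapping.csv
db.executemany('INSERT INTO gaelect VALUES (?, ?, ?)',
               [('00000001', '2014-11-04', '003'),
                ('00000002', '2014-11-04', '003'),
                ('00000001', '2014-05-20', '001')])
db.executemany('INSERT INTO ga_elect_manual_classification VALUES (?, ?, ?, ?)',
               [(1, '001', 'hypothetical primary', 'primary'),
                (2, '003', 'hypothetical general', 'general')])

# Count voting records in each manually assigned category
votes_by_class = db.execute('''
    SELECT m.manual_classification, count(*)
    FROM gaelect AS g
    JOIN ga_elect_manual_classification AS m
      ON g.election_type = m.election_type
    GROUP BY m.manual_classification
    ORDER BY m.manual_classification
''').fetchall()
db.close()
print(votes_by_class)
```

An inner join drops any voting record whose `election_type` has no entry in the helper table, so a `LEFT JOIN` may be preferable when checking for unmapped codes.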
