The purpose of this Jupyter notebook is to combine and split the data from the various source files we were provided. The current format is extremely redundant, will cause memory issues if handled directly in its entirety. By combining and splitting the data we will have a much easier time working with the dataset.

This notebook splits the Combined input into the following files:


*   dfMerged.csv - Includes all voting information (need to rename)
*   dfVoters.csv - Includes information with regard to the voters
*   dfOutreach.csv - Includes information based on which outreach effort(s) affected a particular voter.



# Establish Google Drive Connection

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## Inserting paths

In [None]:
import os
from os import listdir
from os.path import isfile, join
from pathlib import Path
import requests

if 'google.colab' in str(get_ipython()):
  #We keep changing the different paths, so I'm going to check if various configurations exist.
  if (os.path.isdir("/content/drive/MyDrive/Shared with me/content/drive/My Drive/Shared with me/ACLU/")):
    GoogleDriveBase = "/content/drive/MyDrive/Shared with me/content/drive/My Drive/Shared with me/" #Anupriya
    WorkingDirectory = GoogleDriveBase + 'ACLU/' #Anupriya
  elif (os.path.isdir("/content/drive/MyDrive/Projects/ACLU")):
    GoogleDriveBase = "/content/drive/MyDrive/" #Mackenzie
    WorkingDirectory = GoogleDriveBase + "Projects/ACLU/" #Mackenzie
  else:  
    if (os.path.isdir("/content/drive/My Drive/Projects/ACLU/")):
      GoogleDriveBase = "/content/drive/My Drive/" #Kyle
      WorkingDirectory = GoogleDriveBase + "Projects/ACLU/" #Kyle
else: # We're not running in Google Colab, which means we're probably running locally. 
  #Put code here for local copies of the files
  GoogleDriveBase = "" 
  WorkingDirectory = GoogleDriveBase + "" 


WorkingFiles = WorkingDirectory + 'WorkingFiles/'
BasePickeDrive = GoogleDriveBase + WorkingDirectory + "Pickle/"


#Make the necessary folders for the script to run.
ListOfAllRequiredDirectories = [WorkingDirectory + 'Pickle', 
                          WorkingDirectory + 'WorkingFiles',
                          WorkingDirectory + 'AdditionalData',
                          WorkingDirectory + 'ACLUData']

for folder in ListOfAllRequiredDirectories:
  RunningPath = GoogleDriveBase + folder + "/"
  Path(RunningPath).mkdir(parents=True, exist_ok=True)

# Prep Items

In [None]:
import numpy
import sys
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM
from keras.utils import np_utils
from keras.callbacks import ModelCheckpoint
from keras.models import load_model
import csv
import string
import pandas as pd
import gc
import re
import string
import shutil
import os
import unittest

Now, to ensure that there is nothing left over from previous runs, we're going to delete all the files in our "WorkingFiles" directory.

In [None]:
def CleanWorkspace():
  dir = WorkingFiles
  for files in os.listdir(dir):
      path = os.path.join(dir, files)
      try:
          shutil.rmtree(path)
      except OSError:
          os.remove(path)
 

# Install Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

tar: spark-2.4.1-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [None]:
#https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/
import os
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
if os.path.exists("spark-3.1.2-bin-hadoop3.2.tgz") == False:
  !wget -q https://apache.claz.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz #This has failed a few times, you may need to use a mirror from another site: https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xvzf spark-3.1.2-bin-hadoop3.2.tgz



spark-3.1.2-bin-hadoop3.2/
spark-3.1.2-bin-hadoop3.2/R/
spark-3.1.2-bin-hadoop3.2/R/lib/
spark-3.1.2-bin-hadoop3.2/R/lib/sparkr.zip
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/worker/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/worker/worker.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/worker/daemon.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/tests/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/tests/testthat/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/tests/testthat/test_basic.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/profile/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/profile/shell.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/profile/general.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/sparkr-vignettes.html
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/sparkr-vignettes.Rmd
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/sparkr-vignettes.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/index.html
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/R/
spark-3.1.2-

In [None]:
import findspark

!ls
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[2] pyspark-shell"

findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .config("spark.driver.memory", "12g") \
        .config("spark.driver.maxResultSize", "5g")\
        .getOrCreate()

spark

drive  sample_data  spark-3.1.2-bin-hadoop3.2  spark-3.1.2-bin-hadoop3.2.tgz


In [None]:
#If you want to see the spark UI
#!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
#!unzip ngrok-stable-linux-amd64.zip
#get_ipython().system_raw('./ngrok http 4050 &')
#!curl -s http://localhost:4040/api/tunnels

# Combine the Voter Data from the ACLU

In [None]:
AllFilesInTheSubfolder = []
for subfolder in os.walk(WorkingDirectory + "ACLUData"):
  #Skip the pickle folder
  if "Pickle" in subfolder[0] or "WorkingFiles" in subfolder[0]:
      continue
  for file in subfolder[2]:
    FileWithPath = subfolder[0] +"/"+ file
    match = re.search("(.*)(text|mail|phone|call|postcards)", file.lower())
    if match == None:
      raise ValueError("Could not parse: " + file + "\n Please take a look at the type of communication and ensure that it's on the list. \nThis should only happen when we add new files whose name is inconsistant with the older files.")
    CommunicationType = match.group(2).replace("_", " ").strip()
    #We have some inconsistant communication type names. So, I'm doing a little cleanup here.
    if CommunicationType == "call":
      CommunicationType = "phone"
    if CommunicationType == "postcards":
      CommunicationType = "mail"
    FileWithDetails = (FileWithPath, match.group(1).replace("_", " ").strip(), match.group(2).replace("_", " ").strip(), file)
    AllFilesInTheSubfolder.append(FileWithDetails)

    #print(FileWithDetails)

In [None]:
if (len(AllFilesInTheSubfolder) < 25): #Set to more than 25 in case we receive more files.
  raise Exception("We should have at least 25 data files. Please double check you have loaded all the files.") 
else:
  print("Passed Sanity Check, at least 25 data files.")

Passed Sanity Check, at least 25 data files.


In [None]:
from pyspark.sql import functions as F
columns = []

firstDataFrame = None

for file in AllFilesInTheSubfolder:
  df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(file[0])
  df = df.withColumn("ElectionType", F.lit(file[1]))
  df = df.withColumn("CommunicationType", F.lit(file[2]))
  df = df.withColumn("File", F.lit(file[3]))
  columns.extend(df.columns)
  if firstDataFrame is None:
    firstDataFrame = df
  else:
    FirstNotSelected = set(firstDataFrame.columns) - set(df.columns)
    for value in FirstNotSelected:
      df = df.withColumn(value, F.lit(""))
    SecondNotSelected = set(df.columns) - set(firstDataFrame.columns)
    for value in SecondNotSelected:
      firstDataFrame = firstDataFrame.withColumn(value, F.lit(""))
    firstDataFrame = firstDataFrame.union(df.select(firstDataFrame.columns))
  #Count takes a bit
  #print(firstDataFrame.count())
columns = set(columns)

In [None]:
print(firstDataFrame.count())
len(AllFilesInTheSubfolder)
if (firstDataFrame.count() < 3000000):
  raise Exception("We should have at least 3 Million records after the join proccess. Please double check your data.")
else:
  print("Passed Sanity Check: Number of records greater than 3 Million.")

3196831
Passed Sanity Check: Number of records greater than 3 Million.


In the following section we will be pulling out all unique voter information.

Please Note: As this data includes real world PII we will be making an effort to never show the data from this dataframe.

In [None]:
#It looks like "Voter File VANID" is a unique key for each user, so I'm going to pull out any other person information and stick that into another dataframe. This will reduce memory footprint, which will be necessary for the pivot
#that's coming up.
import pyspark.sql.functions as func
from pyspark.sql.functions import row_number
from pyspark.sql import DataFrameWriter
from pyspark.sql.window import Window

dataframesbytypejoinedwithoutidvalues = {}

id_columns = ["Voter File VANID", "FirstName", "MiddleName", "LastName", "Address", "DWID",  'Age', 'Primary19', 'MaritalStatus', 'mCity', 'Mass_Incarceration', 'StreetType', 'StreetSuffix',
  'mAddress', 'mState', 'mZip5', 'mZip4', 'Sex', 'Suffix', 'CD', 'SD', 'HD', 'PreferredEmail', 'Preferred Phone', 'Cell Phone', 'CountyName', 'DOB', 'DateReg', 'Home Phone', 
  'EthnicCatalistName', 'Party', 'PersonalEmail', 'RaceName', 'StreetNo', 'StreetNoHalf', 'StreetPrefix', 'StreetName', 'AptType', 'AptNo', 
  '2020_Biden_Support', 'Voting_Aug_Prim', 'PoliceAccountability', 'VBM_Application', 'MarijuanaConviction',  'Absentee_Voting'] #, 'VotedStatus', 'Voting_Nov_Gen','Zip5', 'City', 'State', 'Zip4', 'City.1', 'State.1', 'Zip5.1', 'Zip4.1', 
primary_key = ["Voter File VANID"]
dfVoters = None

dfVoters = firstDataFrame.select(id_columns)
dfVoters = dfVoters.na.fill(0)

WindowSpec  = Window.partitionBy("Voter File VANID").orderBy("Age")
dfVoters = dfVoters.withColumn("row_number",row_number().over(WindowSpec))
dfVoters = dfVoters.where(dfVoters['row_number'] == 1)

print(dfVoters.count())


1633349


Now we will make functions to save the dataframes we create: 

In [None]:

import glob
import os
import shutil

def SaveSingleFile(df, fileName):
  tempDir = WorkingFiles + "temp"
  if not os.path.exists(tempDir):
    os.makedirs(tempDir)
  else:
    shutil.rmtree(tempDir)
    os.makedirs(tempDir)
  print("Writing File")
  df.coalesce(1).write.csv(WorkingFiles + "temp", mode="overwrite", header=True)
  #df.write.csv(WorkingFiles + "temp", mode="overwrite", header=True)
  os.chdir(WorkingFiles + "temp")
  print("Moving file")
  for file in glob.glob('part-00000-*'):
      shutil.move(file, fileName)



In [None]:

SaveSingleFile(dfVoters, WorkingFiles + "dfVotersSingle.csv")


Writing File
Moving file


Creating our dfMerged file

In [None]:
ElectionColumns = ["Voter File VANID", "ElectionType", "CommunicationType", "File"]

RemainingColumns = list(set(firstDataFrame.columns) - (set(id_columns + ElectionColumns)))
dfMerged = firstDataFrame.select(RemainingColumns)

#print(dfMerged.show())

In [None]:
SaveSingleFile(dfMerged, WorkingFiles + "dfMerged.csv")


Writing File
Moving file


Adding our Age Group buckets

In [None]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import *
from pyspark.sql.functions import udf

Bucketized = spark.read.option("header",True).csv(WorkingFiles + "dfMerged.csv")

#dfVoters_sub = firstDataFrame.select(['Voter File VANID', 'Age', 
       #'MaritalStatus', 'Sex', 'CountyName',
       #'Party', 'RaceName', 'Primary19', '2020_Biden_Support', 
       #'PoliceAccountability','VBM_Application', 'MarijuanaConviction','Absentee_Voting', 'Mass_Incarceration'])

dfVoters_sub = firstDataFrame

bins= [18.0,29.0,44.0,64.0, 200.0]
labels = ['18-29','30-44','45-64','65+']
print(dfVoters_sub.head)
dfVoters_sub = dfVoters_sub.withColumn("AgeInt", firstDataFrame["Age"].cast(IntegerType()))
Bucketized = Bucketizer().setInputCol("AgeInt").setOutputCol( "age_bucket").setSplits( bins ).transform(dfVoters_sub)#.collect()

t = {'0.0':'18-29', '1.0': '30-44', '2.0':'45-64', '3.0': '65+'}

Bucketized = Bucketized.withColumn("AgeGroup", Bucketized["age_bucket"].cast("string"))
Bucketized = Bucketized.replace(t, "AgeGroup")



SaveSingleFile(Bucketized, WorkingFiles + "Bucketized.csv")


<bound method DataFrame.head of DataFrame[Voter File VANID: string, DWID: string, mAddress: string, mCity: string, mState: string, mZip5: string, mZip4: string, Sex: string, Address: string, City9: string, State10: string, Zip511: string, Zip412: string, LastName: string, FirstName: string, MiddleName: string, Suffix: string, CD: string, SD: string, HD: string, PreferredEmail: string, Preferred Phone: string, Age: string, Cell Phone: string, CountyName: string, DOB: string, DateReg: string, Home Phone: string, MaritalStatus: string, EthnicCatalistName: string, Party: string, PersonalEmail: string, RaceName: string, StateFileID: string, StreetNo: string, StreetNoHalf: string, StreetPrefix: string, StreetName: string, StreetType: string, StreetSuffix: string, AptType: string, AptNo: string, City42: string, State43: string, Zip544: string, Zip445: string, General20: string, General19: string, General18: string, General17: string, General16: string, General15: string, General14: string, Ge



Writing File
Moving file


In [None]:
Bucketized = spark.read.option("header",True).csv(WorkingFiles + "Bucketized.csv")

In [None]:
VoterColumns = ["Voter File VANID", "FirstName", "MiddleName", "LastName", "Address", "DWID",  'Age', 'Primary19', 'MaritalStatus', 'mCity', 'Mass_Incarceration', 'StreetType', 'StreetSuffix',
  'mAddress', 'mState', 'mZip5', 'mZip4', 'Sex', 'Suffix', 'CD', 'SD', 'HD', 'PreferredEmail', 'Preferred Phone', 'Cell Phone', 'CountyName', 'DOB', 'DateReg', 'Home Phone', 
  'EthnicCatalistName', 'Party', 'PersonalEmail', 'RaceName', 'StreetNo', 'StreetNoHalf', 'StreetPrefix', 'StreetName', 'AptType', 'AptNo', 
  '2020_Biden_Support', 'Voting_Aug_Prim', 'PoliceAccountability', 'VBM_Application', 'MarijuanaConviction',  'Absentee_Voting', 'age_bucket', 'AgeGroup',
  'Zip545', 'State43', 'Zip412', 'StateFileID', 'Zip511', 'City42', 'Ballot_Application', 'Zip446', 'Zip445', 'Zip544'] 

OutreachColumns = ["Voter File VANID", "ElectionType", "CommunicationType", "File"]

OtherColumns = ['Voting_Nov_Gen ',  '2020_Biden_Support']

VoterColumnsToRemove = list(set(VoterColumns) - set(primary_key))
OutreachColumnsToRemove = list(set(OutreachColumns) - set(primary_key))
OtherColumnsToRemove = list(set(OtherColumns) - set(primary_key))


AllColumns = set(Bucketized.columns)
RemainingColumns = list(((set(Bucketized.columns) - set(VoterColumnsToRemove)) - set(OtherColumnsToRemove)) - set(ElectionColumns))


VotingColumns = list(set(primary_key + RemainingColumns))
print(RemainingColumns)

['Special02', 'PresPrimary08Party', 'Primary12', 'Special98', 'Special03', 'Primary12Party', 'General09', 'Primary11Party', 'Special09', 'Primary09', 'Special07', 'PresPrimary16', 'Primary98', 'Primary08Party', 'Special00', 'Primary17Party', 'Primary19Party', 'Primary06', 'General12', 'Special11', 'General15', 'Primary01', 'Special16', 'Special13', 'General19', 'AgeInt', 'Primary09Party', 'Special10', 'Primary11', 'Primary15', 'General14', 'Primary20', 'Primary10', 'Special18', 'Primary18', 'Primary18Party', 'City9', 'Special05', 'Primary16Party', 'Primary15Party', 'General18', 'PresPrimary20Party', 'General06', 'Special99', 'General02', 'Primary16', 'General20', 'PresPrimary16Party', 'Special04', 'Primary00', 'PresPrimary08', 'Special19', 'General08', 'VotedStatus', 'Special17', 'Special06', 'Special01', 'PresPrimary12Party', 'Special15', 'General17', 'General16', 'General00', 'City43', 'Primary20Party', 'State44', 'Primary14Party', 'General98', 'PresPrimary12', 'General10', 'General1

Here we are going to check all the remaining columns, and ensure that they are all voting history columns. This is a necessary step as we have received additional files throughout the project.

In [None]:
dfRemaining = Bucketized.select(VotingColumns)
dfRemaining.show()

+---------+------------------+---------+---------+-------------+---------+--------------+---------+---------+---------+--------------+---------+---------+---------+---------+--------------+-------------+--------------+--------------+----------------+------------------+-------------+---------+---------+---------+---------+---------+---------+------+--------------+---------+---------+---------+---------+---------+---------+-------------+---------+--------------+---------+--------------+---------+---------+--------------+--------------+---------+---------+---------+--------------+---------+--------------+---------+---------+---------+---------+---------+------+---------+---------+---------+---------+---------+---------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-----------+---------+------------------+--------------+-------+-------------+---------+-----------+-------+---------+---------+---------+
|Special02|PresPrimary08Pa

Voter demographics dataframe

In [None]:
dfVoters = Bucketized.select(VoterColumns)
SaveSingleFile(dfVoters, WorkingFiles + "dfVoters.csv")

Writing File
Moving file


Outreach dataframe

In [None]:
#Pull out the file information, this indicates which outreach efforts they were included in.
dfOutreach = Bucketized.select(ElectionColumns)
SaveSingleFile(dfOutreach, WorkingFiles + "dfOutreach.csv")

Writing File
Moving file


Plotly FIPS Data

This is where we call the FIPS data so that we are able to plot it with plotly. Here we are mapping the plotly geojson data to County Name to obtain the FIPS code. 

In [None]:
from urllib.request import urlopen
import json
with urlopen('https://raw.githubusercontent.com/plotly/datasets/master/geojson-counties-fips.json') as response:
    counties = json.load(response)

counties["features"][0]

{'geometry': {'coordinates': [[[-86.496774, 32.344437],
    [-86.717897, 32.402814],
    [-86.814912, 32.340803],
    [-86.890581, 32.502974],
    [-86.917595, 32.664169],
    [-86.71339, 32.661732],
    [-86.714219, 32.705694],
    [-86.413116, 32.707386],
    [-86.411172, 32.409937],
    [-86.496774, 32.344437]]],
  'type': 'Polygon'},
 'id': '01001',
 'properties': {'CENSUSAREA': 594.436,
  'COUNTY': '001',
  'GEO_ID': '0500000US01001',
  'LSAD': 'County',
  'NAME': 'Autauga',
  'STATE': '01'},
 'type': 'Feature'}

In [None]:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import Row

data = {}
rows = []
nest1 = counties["features"]
for thing in nest1: 
  if thing['properties']['STATE'] == '26':
    data[thing['id']] = thing['properties']['NAME']
    rows.append(Row(FIPS= thing['id'], CountyName= thing['properties']['NAME']))

Now we will create a few functions to assist with melting the elections dataframe down from being columnar to row-wise. This means we will have multiple rows per Voter but for various elections over the years. 

In [None]:
from pyspark.sql.functions import array, col, explode, lit, struct, isnan, when, count, sum
from pyspark.sql import DataFrame
from typing import Iterable 

primary_key = ["Voter File VANID"]
columns_processed = []

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))
    #print(_vars_and_vals)
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

def renameColumns(df, ColumnList1, ColumnList2):
  for i in range(len(ColumnList1)):
    df = df.withColumnRenamed(ColumnList1[i], ColumnList2[i])
  return df

def ReshapeDataframeYears(df, DropNa = False):
  ColumnsToTransform = ["Special##", "General##", "Primary##", "PresPrimary##", "PresPrimary##Party", "Primary##Party", "Municipal##"]
  #DropNA is an optional argument for if we want to drop null values later on. for one of our analyses we need null values to determine non-voters. 
  melted_dataframes = []
  runningDataFrame = None

  for name in ColumnsToTransform:
    ColumnsForName = []
    years = []
    number_index = name.index("##")
    for column in df.columns:
      if len(column) < len(name):
        continue
      first = column[0:number_index]
      name_first = name[0:number_index]
      number = column[number_index:number_index+2]
      if len(column) - number_index+2 == 0:
        second = ""
        name_second = ""
      else:
        second = column[number_index+2:len(column)]
        name_second = name[number_index+2:len(name)]

      if first == name_first and second == name_second:
        ColumnsForName.append(column)
        columns_processed.append(column)
        year = column[len(first):len(first)+2]

        if int(year) < 50:
          year = "20" + year
        else:
          year = "19" + year
        years.append(year)

    df_reduced = df.select(primary_key + ColumnsForName)
    df_reduced = renameColumns(df_reduced, (primary_key + ColumnsForName), (primary_key + years))
    df_melted = melt(df_reduced, id_vars=primary_key, value_vars=(years))
    
    #Restructure the columns
    final_columns = primary_key.copy()
    final_columns.append("year")
    final_columns.append("participation")
    #print(df_melted.show())
    df_melted = renameColumns(df_melted, df_melted.columns, final_columns)
    df_melted = df_melted.withColumn("ElectionType", F.lit(name))
    #Drop records where we don't have participation data. Waffle df needs NA values so making an optional argument
    #if DropNa == 'Y':
    #  df_melted = df_melted.na.drop()
    #else: 
    #  df_melted
    #print(df_melted.head())
    melted_dataframes.append(df_melted)
    if runningDataFrame is None:
      runningDataFrame = df_melted
    else:
      runningDataFrame = runningDataFrame.union(df_melted)
  return runningDataFrame

def CombineMeltedDataframes(melted_dataframes):
  #Now join them all back together
  #Rather silly that this is my method, but I tried a few different versions to find the most memory efficient way to do this
  return pd.concat(melted_dataframes)



Visual check that the reformed data looks appropriate.

In [None]:
melteddfs = ReshapeDataframeYears(dfRemaining, False)
melteddfs = melteddfs.na.drop()
melteddfs.show()

+----------------+----+-------------+------------+
|Voter File VANID|year|participation|ElectionType|
+----------------+----+-------------+------------+
|           82502|2013|            Y|   Special##|
|           82502|2001|            Y|   Special##|
|           82502|2009|            P|   Special##|
|           82502|2005|            Y|   Special##|
|           82502|2017|            P|   Special##|
|           82502|2020|            P|   Special##|
|           71010|2013|            Y|   Special##|
|           71010|2001|            Y|   Special##|
|           71010|2009|            P|   Special##|
|           71010|2005|            Y|   Special##|
|           71010|2017|            P|   Special##|
|           71010|2020|            A|   Special##|
|           44565|2013|            Y|   Special##|
|           44565|2001|            Y|   Special##|
|           44565|2015|            A|   Special##|
|           44565|2003|            A|   Special##|
|           44565|2009|        

Now we will save it as dfreshapedUnique for consistency of what we created with pandas earlier. 

In [None]:
SortingColumns = ['RaceName', 'Sex', 'AgeGroup', 'CountyName']
SaveSingleFile(melteddfs, WorkingFiles + "dfreshapedUnique.csv")

Writing File
Moving file


Now we are going to put together the ACLU Voter information which is used for the Map graph we have. 

In [None]:
#Put Togther Voter Information DataFrame
import pyspark.sql.functions as F
from pyspark.sql.functions import upper, col
SortingColumns = ['RaceName', 'Sex', 'AgeGroup', 'CountyName']

AllRegisteredVoters = spark.read.option("header",True).csv(WorkingDirectory + 'AdditionalData/All Registered Voters Michigan.csv')
AllRegisteredVoters = AllRegisteredVoters.withColumnRenamed('County', 'CountyName')
ACLUVotersGrouped = dfVoters.groupby(SortingColumns).count()
dfVoters = dfVoters.withColumn('CountyName', upper(dfVoters['CountyName']))
ACLUVotersJoineWithAllVotersInfo = AllRegisteredVoters.join(ACLUVotersGrouped, ['CountyName'], 'inner')
ACLUVotersJoineWithAllVotersInfo = ACLUVotersJoineWithAllVotersInfo.withColumn("fraction", (F.col("count") / F.col("All Registered Voters")))

print("All registred Voters (Number of Counties): " + str(AllRegisteredVoters.count()))
print("ACLUVotersGrouped (Number of Counties) " + str(ACLUVotersGrouped.count()))
print("dfVoters (Number of Counties Post Join) " + str(AllRegisteredVoters.count()))

SaveSingleFile(ACLUVotersJoineWithAllVotersInfo ,WorkingFiles+ "total_voter_pop.csv")


All registred Voters (Number of Counties): 83
ACLUVotersGrouped (Number of Counties) 2689
dfVoters (Number of Counties Post Join) 83
Writing File
Moving file


In [None]:
#ACLUVotersJoineWithAllVotersInfo.show()

We have a lot of graphs which are taking some counts based off of a few grouping columns so we will write a function here to handle that with our primary key (Voter File VANID), Sorting Columns (Demographics), and then whatever additional columns we are grouping with. We will then save it to a file to be used on the dashboard later. 

In [None]:
def SummarizeForValue(df, ColumnsWeWantValuesFor, FileName, dropna = True):
  selected = df.select(SortingColumns + primary_key + ColumnsWeWantValuesFor)
  selected = selected.na.drop()
  selected = selected.groupBy(SortingColumns + ColumnsWeWantValuesFor).count()
  SaveSingleFile(selected, WorkingFiles + FileName)
  return selected


In [None]:
SummarizeForValue(Bucketized, ['Mass_Incarceration'], "mass_inc.csv");

Writing File
Moving file


In [None]:
print(Bucketized.columns)
SummarizeForValue(Bucketized, ['CommunicationType', 'ElectionType'], "AggregateByCommunication.csv");
SummarizeForValue(Bucketized, ['PoliceAccountability'], "policing.csv");
SummarizeForValue(Bucketized, ['MarijuanaConviction'], "weed.csv");
SummarizeForValue(Bucketized, ['2020_Biden_Support'], "biden_support_df.csv");

['Voter File VANID', 'DWID', 'mAddress', 'mCity', 'mState', 'mZip5', 'mZip4', 'Sex', 'Address', 'City9', 'State10', 'Zip511', 'Zip412', 'LastName', 'FirstName', 'MiddleName', 'Suffix', 'CD', 'SD', 'HD', 'PreferredEmail', 'Preferred Phone', 'Age', 'Cell Phone', 'CountyName', 'DOB', 'DateReg', 'Home Phone', 'MaritalStatus', 'EthnicCatalistName', 'Party', 'PersonalEmail', 'RaceName', 'StateFileID', 'StreetNo', 'StreetNoHalf', 'StreetPrefix', 'StreetName', 'StreetType', 'StreetSuffix', 'AptType', 'AptNo', 'City42', 'State43', 'Zip544', 'Zip445', 'General20', 'General19', 'General18', 'General17', 'General16', 'General15', 'General14', 'General12', 'General11', 'General10', 'General09', 'General08', 'General06', 'General04', 'General02', 'General00', 'General98', 'Municipal13', 'PresPrimary20', 'PresPrimary20Party', 'PresPrimary16', 'PresPrimary16Party', 'PresPrimary12', 'PresPrimary12Party', 'PresPrimary08', 'PresPrimary08Party', 'Primary20', 'Primary20Party', 'Primary19', 'Primary19Party'

Now to process the specific voting records.

In [None]:
VoterHistory = Bucketized.drop("ElectionType").join(melteddfs, ['Voter File VANID'], 'inner')
election_votes = SummarizeForValue(VoterHistory, ['ElectionType', 'year'], "election_votes.csv")
election_votes.show()

Writing File
Moving file
+---------+---+--------+----------+--------------+----+-----+
| RaceName|Sex|AgeGroup|CountyName|  ElectionType|year|count|
+---------+---+--------+----------+--------------+----+-----+
|Caucasian|  M|   45-64|     Eaton|     Special##|2015|  456|
|Caucasian|  M|   30-44|   Lenawee|     General##|2016| 3610|
| Hispanic|  M|   45-64|    Ottawa|     General##|2012| 1978|
|Caucasian|  M|     65+|     Wayne|     Special##|2001| 4069|
|Caucasian|  M|     65+|   Oakland|     General##|2011|99920|
|    Black|  F|   30-44|   Jackson|     Special##|2017|   25|
|Caucasian|  F|   30-44|   Genesee|     Special##|2020| 5198|
|Caucasian|  M|   30-44|    Ottawa|     Special##|2010|  226|
|    Asian|  M|   45-64|     Wayne|     General##|2010| 3096|
|    Black|  M|   30-44|    Macomb|     Primary##|2018| 2473|
|Caucasian|  F|   45-64| Van Buren|     Primary##|2018|  323|
|    Black|  M|   30-44|   Genesee|Primary##Party|2016| 2994|
|  Unknown|  F|   30-44|   Oakland| PresPrima

In [None]:
#VoterHistory.show()

Waffle DF Data

In [None]:
Bucketized_sub = Bucketized.select('Voter File VANID','RaceName','AgeGroup', 'CountyName', 'Sex', 'General20', 
                                   'Primary20',col("ElectionType").alias("CampaignType"), 'CommunicationType')
new_col_1 = when(col("General20").isNull() & col("Primary20").isNull(), 1).otherwise(0)
new_col_2 = when(col("General20").isNull() & col("Primary20").isNotNull(), 1).otherwise(0)
new_col_3 = when(col("General20").isNotNull() & col("Primary20").isNull(), 1).otherwise(0)
new_col_4 = when(col("General20").isNotNull() & col("Primary20").isNotNull(), 1).otherwise(0)
Bucketized_sub = (Bucketized_sub.withColumn("NoVote", new_col_1).withColumn("Primary", new_col_2).withColumn("General", new_col_3).withColumn("VoteBoth", new_col_4))
agg_t = Bucketized_sub.groupBy(SortingColumns + ['CampaignType', 'CommunicationType']).agg(sum('NoVote').alias("NoVote"), 
                                                                                           sum('Primary').alias("Primary"), 
                                                                                           sum('General').alias("General"), 
                                                                                           sum('VoteBoth').alias("VoteBoth"))
SaveSingleFile(agg_t, WorkingFiles + "waffle_df.csv")

Writing File
Moving file


General Election Participation

In [None]:
def participation(FileName):
  election_20 = melteddfs.where(melteddfs.year == 2020)
  gen_20 = election_20.where(election_20.ElectionType == 'General##')
  dfVoters_dem = dfVoters.select(['Voter File VANID','RaceName','AgeGroup', 'CountyName', 'Sex' ])
  gen_20_dem = gen_20.join(dfVoters_dem,['Voter File VANID'] , 'left')
  selected = gen_20_dem.groupBy(['RaceName','AgeGroup', 'CountyName', 'Sex','participation']).count()
  SaveSingleFile(selected, WorkingFiles + FileName)
  return selected

In [None]:
party = participation('gen20_participation.csv')

Writing File
Moving file


Outreach Data

In [None]:
dfVoters_outreach = dfOutreach.join(dfVoters, ['Voter File VANID'], 'inner')
print(dfVoters_outreach.show())
dfOutreachGrouped = SummarizeForValue(dfVoters_outreach, ['CommunicationType', 'ElectionType',], "dfoutreachSumm.csv")
dfOutreachGrouped.show()




+----------------+------------+-----------------+--------------------+---------+----------+--------+--------------------+---------+---+---------+-------------+------------+------------------+----------+------------+--------------------+------+-----+-----+---+------+---+---+---+--------------+---------------+-------------+----------+----------+----------+-------------+------------------+-----+-------------+---------+--------+------------+------------+----------+-------+-----+------------------+---------------+--------------------+---------------+-------------------+---------------+----------+--------+------+-------+------+-----------+------+------------+------------------+------+-------+--------+
|Voter File VANID|ElectionType|CommunicationType|                File|FirstName|MiddleName|LastName|             Address|     DWID|Age|Primary19|MaritalStatus|       mCity|Mass_Incarceration|StreetType|StreetSuffix|            mAddress|mState|mZip5|mZip4|Sex|Suffix| CD| SD| HD|PreferredEmail|Pr