In [1]:
import pyspark
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
import re
import os
import os.path as osp
import sys

In [2]:
!pwd

/home/matt/lsnm/etl


In [3]:
!whoami

matt


In [4]:
sc = pyspark.sql.SparkSession.Builder()\
        .appName('lsnm-etl')\
        .master('yarn')\
        .config('spark.executor.instances', '1')\
        .config('spark.yarn.executor.memoryOverhead', '3g')\
        .config('spark.executor.memory', '9g')\
        .config('spark.executor.cores', '2')\
        .enableHiveSupport()\
        .getOrCreate()

In [5]:
def get_ids(table_name, column_name, entity_type_str, verbose=True):
    query = f"""
SELECT DISTINCT({column_name}) AS entity_id,
  '{entity_type_str}' AS entity_type,
  '{table_name}/{column_name}' AS src_table
FROM {table_name}
"""
    
    if verbose:
        print(query)
    
    return sc.sql(query)

# Build list of candidates by aggregating sources

#### Candidates from `pq_crp_cands`

In [6]:
distinctCandsCands = get_ids('pq_crp_cands', 'cid', 'cand')


SELECT DISTINCT(cid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_cands/cid' AS src_table
FROM pq_crp_cands



In [7]:
distinctCandsCands.select(F.countDistinct('entity_id')).show()

+-------------------------+
|count(DISTINCT entity_id)|
+-------------------------+
|                    30677|
+-------------------------+



In [8]:
distinctCandsCands.limit(10).show()

+---------+-----------+----------------+
|entity_id|entity_type|       src_table|
+---------+-----------+----------------+
|N00002819|       cand|pq_crp_cands/cid|
|N00011981|       cand|pq_crp_cands/cid|
|N00012017|       cand|pq_crp_cands/cid|
|N00003276|       cand|pq_crp_cands/cid|
|N00000837|       cand|pq_crp_cands/cid|
|N00002092|       cand|pq_crp_cands/cid|
|N00001192|       cand|pq_crp_cands/cid|
|N00011875|       cand|pq_crp_cands/cid|
|N00000462|       cand|pq_crp_cands/cid|
|N00005010|       cand|pq_crp_cands/cid|
+---------+-----------+----------------+



#### Distinct candidates from `pq_crp_pacs`

In [25]:
distinctCandsPacs = get_ids('pq_crp_pacs', 'cid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(cid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pacs/cid' AS src_table
FROM pq_crp_pacs



#### Distinct candidates from `pq_crp_pac_other`

In [12]:
distinctCandsPacOtherFiler = get_ids('pq_crp_pac_other', 'filerid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(filerid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pac_other/filerid' AS src_table
FROM pq_crp_pac_other



In [14]:
distinctCandsPacRecip = get_ids('pq_crp_pac_other', 'recipid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(recipid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pac_other/recipid' AS src_table
FROM pq_crp_pac_other



In [15]:
distinctCandsPacOtherOther = get_ids('pq_crp_pac_other', 'otherid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(otherid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pac_other/otherid' AS src_table
FROM pq_crp_pac_other



#### Distinct candidates from `pq_crp_expends`

In [33]:
distinctCandsExpendsFiler = get_ids('pq_crp_expends', 'crpfilerid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(crpfilerid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_expends/crpfilerid' AS src_table
FROM pq_crp_expends



In [34]:
distinctCandsExpendsCandid = get_ids('pq_crp_expends', 'candid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(candid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_expends/candid' AS src_table
FROM pq_crp_expends



#### Distinct candidates from `pq_crp_indivs`

In [17]:
distinctCandsIndivsRecip = get_ids('pq_crp_indivs', 'recipid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(recipid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_indivs/recipid' AS src_table
FROM pq_crp_indivs



In [18]:
distinctCandsIndivsCmte = get_ids('pq_crp_indivs', 'cmteid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'N')


SELECT DISTINCT(cmteid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_indivs/cmteid' AS src_table
FROM pq_crp_indivs



## Unify distinct sources of candidate id

In [35]:
distinctCands = distinctCandsCands.unionAll(distinctCandsPacs)\
                                  .unionAll(distinctCandsPacOtherFiler)\
                                  .unionAll(distinctCandsPacRecip)\
                                  .unionAll(distinctCandsPacOtherOther)\
                                  .unionAll(distinctCandsExpendsFiler)\
                                  .unionAll(distinctCandsExpendsCandid)\
                                  .unionAll(distinctCandsIndivsRecip)\
                                  .unionAll(distinctCandsIndivsCmte)\
                                  .dropDuplicates(subset=['entity_id'])

In [36]:
distinctCands.groupBy('src_table').count().show()

+--------------------+-----+
|           src_table|count|
+--------------------+-----+
|     pq_crp_pacs/cid|    6|
|pq_crp_pac_other/...|    5|
|pq_crp_pac_other/...|    1|
|    pq_crp_cands/cid|30677|
|pq_crp_expends/cr...|    8|
|pq_crp_indivs/rec...|    1|
+--------------------+-----+



It's a good thing that appearances of unique `cid`s not contained in `pq_crp_cands` are quite rare. Only 21/30698 `cid`s appear outside this table.

# Repeat process for committees

#### PACs from `pq_crp_cmtes`

In [28]:
distinctPacCmtes = get_ids('pq_crp_cmtes', 'cmteid', 'cmte')


SELECT DISTINCT(cmteid) AS entity_id,
  'cmte' AS entity_type,
  'pq_crp_cmtes/cmteid' AS src_table
FROM pq_crp_cmtes



In [30]:
distinctPacCmtes.select(F.countDistinct('entity_id')).show()

+-------------------------+
|count(DISTINCT entity_id)|
+-------------------------+
|                    53312|
+-------------------------+



In [31]:
distinctPacCmtes.limit(10).show()

+---------+-----------+-------------------+
|entity_id|entity_type|          src_table|
+---------+-----------+-------------------+
|C00000729|       cmte|pq_crp_cmtes/cmteid|
|C00040220|       cmte|pq_crp_cmtes/cmteid|
|C00068452|       cmte|pq_crp_cmtes/cmteid|
|C00076992|       cmte|pq_crp_cmtes/cmteid|
|C00118414|       cmte|pq_crp_cmtes/cmteid|
|C00120469|       cmte|pq_crp_cmtes/cmteid|
|C00126284|       cmte|pq_crp_cmtes/cmteid|
|C00126581|       cmte|pq_crp_cmtes/cmteid|
|C00136853|       cmte|pq_crp_cmtes/cmteid|
|C00150995|       cmte|pq_crp_cmtes/cmteid|
+---------+-----------+-------------------+



In [37]:
distinctPacPacs = get_ids('pq_crp_pacs', 'pacid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')
distinctPacPacOtherFiler = get_ids('pq_crp_pac_other', 'filerid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')
distinctPacPacOtherRecip = get_ids('pq_crp_pac_other', 'recipid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')
distinctPacPacOtherOther = get_ids('pq_crp_pac_other', 'otherid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')
distinctPacExpends = get_ids('pq_crp_expends', 'crpfilerid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')
distinctPacIndivsRecip = get_ids('pq_crp_indivs', 'recipid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')
distinctPacIndivsCmte = get_ids('pq_crp_indivs', 'cmteid', 'cand').filter(F.substring(F.col('entity_id'), 0, 1) == 'C')

distinctPacs = distinctPacCmtes.unionAll(distinctPacPacs)\
                                .unionAll(distinctPacPacOtherFiler)\
                                .unionAll(distinctPacPacOtherRecip)\
                                .unionAll(distinctPacPacOtherOther)\
                                .unionAll(distinctPacExpends)\
                                .unionAll(distinctPacIndivsRecip)\
                                .unionAll(distinctPacIndivsCmte)\
                                .dropDuplicates(subset=['entity_id'])


SELECT DISTINCT(pacid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pacs/pacid' AS src_table
FROM pq_crp_pacs


SELECT DISTINCT(filerid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pac_other/filerid' AS src_table
FROM pq_crp_pac_other


SELECT DISTINCT(recipid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pac_other/recipid' AS src_table
FROM pq_crp_pac_other


SELECT DISTINCT(otherid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_pac_other/otherid' AS src_table
FROM pq_crp_pac_other


SELECT DISTINCT(crpfilerid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_expends/crpfilerid' AS src_table
FROM pq_crp_expends


SELECT DISTINCT(recipid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_indivs/recipid' AS src_table
FROM pq_crp_indivs


SELECT DISTINCT(cmteid) AS entity_id,
  'cand' AS entity_type,
  'pq_crp_indivs/cmteid' AS src_table
FROM pq_crp_indivs



In [39]:
distinctPacs.groupBy('src_table').count().show()

+--------------------+-----+
|           src_table|count|
+--------------------+-----+
|pq_crp_pac_other/...|   34|
|pq_crp_pac_other/...|  127|
|pq_crp_indivs/cmteid|    8|
|pq_crp_pac_other/...|    3|
|   pq_crp_pacs/pacid|    2|
| pq_crp_cmtes/cmteid|53312|
+--------------------+-----+



In [41]:
pacOtherOrphans = distinctPacs.filter(F.substring(F.col('src_table'), 0, 16) == 'pq_crp_pac_other').toPandas()

In [45]:
pacOtherOrphans.query('src_table == "pq_crp_pac_other/otherid"').head(5)

Unnamed: 0,entity_id,entity_type,src_table
0,C00450500,cand,pq_crp_pac_other/otherid
1,C0034,cand,pq_crp_pac_other/otherid
2,C00630673,cand,pq_crp_pac_other/otherid
3,C00584131,cand,pq_crp_pac_other/otherid
4,C00093005,cand,pq_crp_pac_other/otherid


So there are a lot of ways for the distinct PACs to be broken - even of those with valid 8-digit PAC codes starting with 'C' (i.e. `C12345678`) many are not contained in the `pq_crp_cmtes` table.

# Now do corporations and individuals (without a standardized ID)

## Start with `pq_crp_expends`

In [50]:
sc.sql('describe pq_crp_expends').toPandas().to_records()

rec.array([( 0, 'Cycle', 'string', 'Last year (even) of a federal 2-yr election cycle, orig-dtype=Text (4), source=CRP'),
           ( 1, 'ID', 'int', 'An auto ID added when dbo_Expenditures is made- acts as a unique identifier. This field cannot be used to match records from one download to the next., orig-dtype=Integer, source=CRP'),
           ( 2, 'TransID', 'string', 'A unique record identifier within a given cycle., orig-dtype=Text (20), source=FEC'),
           ( 3, 'CRPFilerid', 'string', 'ID of the filing committee, same as Filerid unless it is a candidate committee, in which case it will be the candidates unique id. (Note that a candidate can have more than one committee - this field indicates the exact committee receiving the contribution), orig-dtype=Text (9), source=CRP'),
           ( 4, 'Recipcode', 'string', 'A two character code defining the type of recipient. For candidates, the first character is party (D for Democratic, R for Republican, 3 for Independent, Libertari

In [57]:
pd.options.display.max_columns = 500

In [58]:
sc.sql(f"""
select * from pq_crp_expends where `type`='15'
""").limit(10).toPandas()

Unnamed: 0,Cycle,ID,TransID,CRPFilerid,Recipcode,Pacshort,CRPRecipname,Expcode,Amount,Date,City,State,Zip,CmteID_EF,Candid,Type,Descrip,PG,ElecOther,EntType,Source
0,2000,42412,990310-005-38,N00009898,RL,Quayle 2000,Broadbent George P. Mr.,,-1000.0,1999-05-04,Indianapolis,IN,46240,,,15,,P,,COM,
1,2000,42413,990428-002-23,N00009898,RL,Quayle 2000,Dutton Harold H. Mr.,,-120.0,1999-06-23,Bethesda,MD,20817,,,15,,P,,COM,
2,2000,42414,990525-034-24,N00009898,RL,Quayle 2000,Dutton Harold H. Mr.,,-112.0,1999-06-23,Bethesda,MD,20817,,,15,,P,,COM,
3,2000,42415,990525-007-37,N00009898,RL,Quayle 2000,Frye Robert S. Mr.,,-150.0,1999-06-21,Tecumseh,MI,49286,,,15,,P,,COM,
4,2000,42416,990528-004-38,N00009898,RL,Quayle 2000,Frye Robert S. Mr.,,-100.0,1999-06-21,Tecumseh,MI,49286,,,15,,P,,COM,
5,2000,42417,990608-011-37,N00009898,RL,Quayle 2000,Frye Robert S. Mr.,,-125.0,1999-06-21,Tecumseh,MI,49286,,,15,,P,,COM,
6,2000,42418,990603-004-27,N00009898,RL,Quayle 2000,Genuario Louis V. Mr.,,-1000.0,1999-06-15,Alexandria,VA,22308,,,15,,P,,COM,
7,2000,42419,990310-004-37,N00009898,RL,Quayle 2000,Hofmeister Gary A. Mr.,,-100.0,1999-05-04,Indianapolis,IN,46240,,,15,,P,,COM,
8,2000,42420,990615-014-35,N00009898,RL,Quayle 2000,McElroy Roberta P. Ms.,,-100.0,1999-06-28,Pittsburgh,PA,15214,,,15,,P,,COM,
9,2000,42421,990617-004-38,N00009898,RL,Quayle 2000,Pardini Peter Mr.,,-50.0,1999-06-23,Daly City,CA,94014,,,15,,P,,COM,


In [74]:
crprecips = get_ids('pq_crp_expends', 'crprecipname', 'other').withColumn('pacshort', F.col('entity_id'))\
                                                              .join(sc.sql('select cmteid, pacshort from pq_crp_cmtes'),
                                                                    'pacshort',
                                                                    'left_anti')\
                                                              .drop('pacshort')


SELECT DISTINCT(crprecipname) AS entity_id,
  'other' AS entity_type,
  'pq_crp_expends/crprecipname' AS src_table
FROM pq_crp_expends



In [75]:
crprecips.limit(10).show()

+--------------------+-----------+--------------------+
|           entity_id|entity_type|           src_table|
+--------------------+-----------+--------------------+
|      Amoco of Fargo|      other|pq_crp_expends/cr...|
|Road King Inn of ...|      other|pq_crp_expends/cr...|
|      Koromilas Alec|      other|pq_crp_expends/cr...|
|            West End|      other|pq_crp_expends/cr...|
|       Sandy Proteau|      other|pq_crp_expends/cr...|
|             Walmart|      other|pq_crp_expends/cr...|
|   Sharpe Allison L.|      other|pq_crp_expends/cr...|
|          Dan Alpren|      other|pq_crp_expends/cr...|
|Gadi Nevo Ben-Yehuda|      other|pq_crp_expends/cr...|
|Hilton-Newark Gat...|      other|pq_crp_expends/cr...|
+--------------------+-----------+--------------------+



## `pq_crp_contribid`

Null vs. not null values in `indivs.contribid`:

```
select sum(if(or(isnull(contribid),
                 length(trim(contribid)) = 0),
              1, 0)
           ) as total_null_rows,
  count(*) as total_rows
from pq_crp_indivs;
```

> ```
+------------------+-------------+--+
| total_null_rows  | total_rows  |
+------------------+-------------+--+
| 415461           | 56531200    |
+------------------+-------------+--+
```

```
select sum(amount) from pq_crp_indivs where isnull(contribid) or length(trim(contribid)) = 0;
```

> ```
+--------------+--+
| sum(amount)  |
+--------------+--+
| 3039162144   |
+--------------+--+
```

```
select sum(amount) from pq_crp_indivs where contribid is not null and length(trim(contribid)) != 0;
```

> ```
+--------------+--+
| sum(amount)  |
+--------------+--+
| 29651883975  |
+--------------+--+
```

In [48]:
415461 / (415461 + 56531200)

0.00729561650682206

In [49]:
3039162144 / (3039162144 + 29651883975)

0.09296619425811652

So 0.7% of all rows in `pq_crp_indivs` table have no unique ID associated to them, representing 9.3% of all spending from this table.

In [83]:
sc.sql(f"""
select 
  if(or(or(isnull(contribid), length(trim(contribid)) = 0), trim(contribid) = '1'), NULL, contribid) AS contribid2,
  count(*) AS num_tx,
  sum(amount) as total
from pq_crp_indivs
group by contribid2
order by total desc
""").limit(10).show()

+------------+------+----------+
|  contribid2|num_tx|     total|
+------------+------+----------+
|        null|452301|3430051966|
|U00000036521|   578| 200465427|
|U0000003216A|   590| 107372808|
|U00000003101|   561| 102643106|
|U0000000310A|   439|  82516918|
|U0000003235 |   810|  78153728|
|a0000174632 |   219|  70728398|
|U0000004604 |  1491|  69409891|
|U00000037041|   400|  67303317|
|U00000036901|  1176|  55063388|
+------------+------+----------+



In [80]:
sc.sql("select * from pq_crp_indivs where contribid LIKE '% 1' OR contribid = ' 1' OR contribid = '1' limit 10").show()

+-----+-------------------+------------+--------------------+---------+--------------------+------+--------+----------+------+------+---------+-----+-----+---------+----+---------+---------+------+------------------+----------+--------+------+
|Cycle|         FECTransID|   ContribID|             Contrib|  RecipID|             Orgname|UltOrg|RealCode|      Date|Amount|Street|     City|State|  Zip|RecipCode|Type|   CmteID|  OtherID|Gender|         Microfilm|Occupation|Employer|Source|
+-----+-------------------+------------+--------------------+---------+--------------------+------+--------+----------+------+------+---------+-----+-----+---------+----+---------+---------+------+------------------+----------+--------+------+
| 2000|3061920110007388953|           1|        HARRY BROWNE|N00000011|[Candidate Contri...|      |   Z9000|2000-06-27| 10000|      | FRANKLIN|   TN|37069|       3L| 15C|C00357582|P60003043|      |20990168495       |          |        | Rept |
| 2000|30619201100073889

In [84]:
sc.sql("""
SELECT * FROM pq_crp_industry_names WHERE catcode = 'Z9000'
""").show()

+-------+--------------------+--------+--------------------+---------+----------+
|Catcode|             Catname|Catorder|            Industry|   Sector|SectorLong|
+-------+--------------------+--------+--------------------+---------+----------+
|  Z9000|Candidate contrib...|     Z07|Candidate Self-fi...|Candidate| Candidate|
+-------+--------------------+--------+--------------------+---------+----------+



In [96]:
crpindivs = get_ids('pq_crp_indivs', 'contribid', 'other').na.drop()\
                                                          .filter(F.length(F.trim(F.col('entity_id'))) > 0)\
                                                          .filter(F.trim(F.col('entity_id')) != '1')


SELECT DISTINCT(contribid) AS entity_id,
  'other' AS entity_type,
  'pq_crp_indivs/contribid' AS src_table
FROM pq_crp_indivs



In [97]:
crpindivs.count()

10047685

In [87]:
crpindivs.show(10)

+------------+-----------+--------------------+
|   entity_id|entity_type|           src_table|
+------------+-----------+--------------------+
|a0000807085 |      other|pq_crp_indivs/con...|
|a0000979770 |      other|pq_crp_indivs/con...|
|a0000363265 |      other|pq_crp_indivs/con...|
|a00002936451|      other|pq_crp_indivs/con...|
|a0000093633A|      other|pq_crp_indivs/con...|
|a0000852208 |      other|pq_crp_indivs/con...|
|a0001716974 |      other|pq_crp_indivs/con...|
|a00000967811|      other|pq_crp_indivs/con...|
|b0608437548 |      other|pq_crp_indivs/con...|
|a0001160556 |      other|pq_crp_indivs/con...|
+------------+-----------+--------------------+
only showing top 10 rows



In [98]:
distinctOthers = crprecips.unionAll(crpindivs)

In [99]:
distinctOthers.count()

11718180

## todo

- put distinct candidates, distinct PACs, and distinct others into a central entities table
- create linkages table using data that can be resolved to a transaction between two of the entities in the entities table
- convert linkages that amount to an independent expenditure against a candidate by a PAC into a contribution to a competitor using the G/P (general/primary) flag