In [1]:
from pathlib import Path
import pandas as pd
import seaborn as sns
import zipfile

In [30]:
df = pd.read_table('./data/indiv10.zip', usecols=[11], names=['occupation'], encoding='latin-1', sep='|')

In [31]:
df.occupation.value_counts().nlargest(100)

RETIRED                            160066
SELF                               112005
SELF-EMPLOYED                       89117
NONE                                70016
SELF EMPLOYED                       58344
                                    ...  
INTEL CORPORATION                     538
AMERICAN INCOME LIFE                  537
CITICORP / CITIBANK                   535
JOHN DEERE SHARED SERVICES INC.       535
BKD, LLP                              534
Name: occupation, Length: 100, dtype: int64

In [3]:
files = sorted(Path('data/').glob('indiv*.zip'))

In [4]:
files

[PosixPath('data/indiv10.zip'), PosixPath('data/indiv12.zip')]

In [71]:
for year in files:
    zf = zipfile.ZipFile(year)
    df = pd.read_table(zf.open('itcont.txt'), sep='|', encoding='latin-1', usecols=[11], names=['occupation'])
    counts = df.occupation.value_counts()
    total_counts = total_counts.add(counts, fill_value=0)

In [73]:
total_counts.nlargest(100).sort_values(ascending=False)

RETIRED                      670081.0
SELF-EMPLOYED                384327.0
SELF                         337635.0
NONE                         239246.0
SELF EMPLOYED                184627.0
                               ...   
JONES DAY                      1771.0
SOUTHERN COMPANY SERVICES      1756.0
WILLIAMS & JENSEN              1745.0
CREDIT SUISSE                  1734.0
DEAN FOODS COMPANY             1702.0
Length: 100, dtype: float64

### Dask

In [2]:
# convert csv to parquet
head = pd.read_csv('data/indiv_header_file.csv').rename(columns=str.lower)

In [5]:
# save data in parquet format
for year in files:
    zf = zipfile.ZipFile(year)
    df = pd.read_table(zf.open('itcont.txt'), sep='|', encoding='latin-1', names=head.columns, low_memory=False)
    par_name = year.with_suffix('.parq')
    df.to_parquet(par_name)

In [6]:
# load 
import dask.dataframe as dd

In [8]:
df = dd.read_parquet('data/indiv*.parq')

In [11]:
df.columns

Index(['cmte_id', 'amndt_ind', 'rpt_tp', 'transaction_pgi', 'image_num',
       'transaction_tp', 'entity_tp', 'name', 'city', 'state', 'zip_code',
       'employer', 'occupation', 'transaction_dt', 'transaction_amt',
       'other_id', 'tran_id', 'file_num', 'memo_cd', 'memo_text', 'sub_id'],
      dtype='object')

In [20]:
most_common = df.occupation.value_counts().nlargest(100)
most_common.compute().sort_values(ascending=False)

RETIRED                 837651
ATTORNEY                395402
PHYSICIAN               184082
PRESIDENT               175055
HOMEMAKER               165711
                         ...  
FIRE FIGHTER / EMS        4941
BUSINESS EXECUTIVE        4908
SMALL BUSINESS OWNER      4899
DOCTOR OF OPTOMETRY       4849
INFO REQUESTED            4724
Name: occupation, Length: 100, dtype: int64

In [23]:
sorted(most_common.dask.items())

[(('getitem-5d77430cfb6f8afc31fc1d7a4f5596b2', 0),
  (subgraph_callable,
   ('read-parquet-11a1dcf32f077ae994c95c71dfdfb0d4', 0),
   'occupation')),
 (('getitem-5d77430cfb6f8afc31fc1d7a4f5596b2', 1),
  (subgraph_callable,
   ('read-parquet-11a1dcf32f077ae994c95c71dfdfb0d4', 1),
   'occupation')),
 (('read-parquet-11a1dcf32f077ae994c95c71dfdfb0d4', 0),
  (<function dask.dataframe.io.parquet.core.read_parquet_part(func, fs, meta, part, columns, index, kwargs)>,
   <bound method ArrowEngine.read_partition of <class 'dask.dataframe.io.parquet.arrow.ArrowEngine'>>,
   <fsspec.implementations.local.LocalFileSystem at 0x7fbdf1cdd910>,
   Empty DataFrame
   Columns: [cmte_id, amndt_ind, rpt_tp, transaction_pgi, image_num, transaction_tp, entity_tp, name, city, state, zip_code, employer, occupation, transaction_dt, transaction_amt, other_id, tran_id, file_num, memo_cd, memo_text, sub_id]
   Index: []
   
   [0 rows x 21 columns],
   [('/Users/Danny/Documents/Learning/Modern_Pandas/data/indiv10.

In [25]:
individual_cols = ['cmte_id', 'entity_tp', 'employer', 'occupation', 'transaction_dt', 'transaction_amt']
indiv = dd.read_parquet('data/indiv*.parq', columns=individual_cols, engine='pyarrow')

In [26]:
indiv

Unnamed: 0_level_0,cmte_id,entity_tp,employer,occupation,transaction_dt,transaction_amt
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,object,object,object,object,float64,int64
,...,...,...,...,...,...
,...,...,...,...,...,...


In [44]:
indiv.head()

Unnamed: 0,cmte_id,entity_tp,employer,occupation,transaction_dt,transaction_amt
0,C00122176,IND,RETIRED,RETIRED,6292009.0,400
1,C00122176,IND,WINSTON & STRAWN,"ATTORNEY, RETIRED",6292009.0,250
2,C00122176,IND,"FLANNER & BUCHANAN, INC.",PRESIDENT,4142009.0,1000
3,C00122176,IND,INDIANA UNIVERSITY,"RETIRED, PROFESSOR EMERITUS",5112009.0,500
4,C00122176,IND,SELF,ATTORNEY,5262009.0,250


In [27]:
avg_tran = indiv.transaction_amt.mean()

In [31]:
avg_tran.compute()

1106.404532206981

In [54]:
total_by_emp = (indiv.groupby(['employer'])
                ['transaction_amt'].sum()
                .nlargest(100)
               )

In [55]:
total_by_emp.compute()

employer
RETIRED                          376108539
SELF-EMPLOYED                    323222775
SELF                             188623332
HOMEMAKER                        149997902
NONE                             130257437
                                   ...    
BOIES, SCHILLER & FLEXNER LLP      1616598
ENCIMA GLOBAL LLC                  1613500
FIDELITY INVESTMENTS               1608796
H AND H                            1594598
THE BLACKSTONE GROUP               1576986
Name: transaction_amt, Length: 100, dtype: int64