In [1]:
import pandas as pd
from glob import glob
from tqdm import tqdm
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask.multiprocessing import get
import numpy as np
import pyarrow
import time

### Clean up movers

In [2]:
# Lazily open every Bay Area movers CSV shard as a single dask DataFrame.
# The ten county_seq_N columns are read as strings; assume_missing=True asks
# dask to treat integer columns not listed in dtype as floats so that missing
# values can be represented.
movers = dd.read_csv(
    '/home/data/infutor/CRD4/bay_area_movers*.csv',
    assume_missing=True,
    dtype={'county_seq_{}'.format(x): str for x in range(1, 11)},
)

In [3]:
# A sequence slot is inconsistent when it has an address id but no effective
# date. Flag every row that has at least one such slot (slots 1 through 10).
# The per-slot masks are OR-ed together in a loop instead of being spelled out
# ten times, so the slot count lives in one place.
not_valid = None
for seq in range(1, 11):
    slot_invalid = (
        movers['addrid_seq_' + str(seq)].notnull()
        & movers['effdate_seq_' + str(seq)].isna()
    )
    # First slot seeds the mask; later slots are OR-ed into it.
    not_valid = slot_invalid if not_valid is None else (not_valid | slot_invalid)

movers['not_valid'] = not_valid

In [4]:
# Materialize the lazy dask frame (including the not_valid flag column) into
# memory, showing a progress bar while the task graph runs.
with ProgressBar():
    validated_movers = movers.compute()

[########################################] | 100% Completed |  1min  1.8s


In [5]:
# Keep only the rows whose not_valid flag equals False.
cleaned_movers = validated_movers.loc[validated_movers['not_valid'].eq(False)]

In [7]:
# Spot-check: display the cleaned row(s) for one specific pid_a value.
cleaned_movers[cleaned_movers['pid_a'] == 'Y39394571272644']

Unnamed: 0,pid_a,county_seq_1,addrid_seq_1,effdate_seq_1,county_seq_2,addrid_seq_2,effdate_seq_2,county_seq_3,addrid_seq_3,effdate_seq_3,...,county_seq_8,addrid_seq_8,effdate_seq_8,county_seq_9,addrid_seq_9,effdate_seq_9,county_seq_10,addrid_seq_10,effdate_seq_10,not_valid
50133,Y39394571272644,13,239865960.0,200803.0,13,239865975.0,200803.0,,,,...,,,,,,,,,,False


In [6]:
# (rows, columns) of the frame that remains after dropping flagged rows.
cleaned_movers.shape

(5999924, 32)

In [7]:
# Persist the cleaned wide table as parquet (pyarrow engine); the wide-to-long
# section below reloads it from this path.
cleaned_movers.to_parquet('../data/cleaned_movers.parquet', engine='pyarrow')

### Process movers wide to long

In [2]:
# Reload the cleaned wide table as a lazy dask DataFrame from the parquet file
# written at the end of the clean-up section.
cleaned_movers = dd.read_parquet('../data/cleaned_movers.parquet', engine='pyarrow')

In [3]:
# Split the frame into 10000 partitions before mapping process_df over it.
# NOTE(review): the shape printed earlier is ~6M rows, i.e. roughly 600 rows
# per partition; confirm that this many small tasks is intentional, since
# per-task scheduling overhead may dominate the run time.
cleaned_movers = cleaned_movers.repartition(npartitions=10000)

In [4]:
def process_df(df):
    """Reshape one chunk of wide mover histories into long from/to pairs.

    For each consecutive pair of sequence slots (1->2, 2->3, ..., 9->10) the
    ``addrid``, ``effdate`` and ``county`` columns of both slots are taken
    together with ``pid_a`` and stacked, so every input row produces exactly
    nine output rows. No filtering happens here: pairs with missing values
    are kept.

    Parameters
    ----------
    df : pandas.DataFrame
        Wide frame containing ``pid_a`` plus ``county_seq_N``,
        ``addrid_seq_N`` and ``effdate_seq_N`` for N in 1..10. It is not
        modified.

    Returns
    -------
    pandas.DataFrame
        Columns ``pid, from_addrid, to_addrid, from_effdate, to_effdate,
        from_county, to_county, seq`` where ``seq`` is the "from" slot number
        (1..9). The input index is preserved, so each index label appears
        nine times.
    """
    out_cols = ['pid', 'from_addrid', 'to_addrid', 'from_effdate', 'to_effdate', 'from_county', 'to_county', 'seq']

    # Collect one frame per slot pair and concatenate once at the end.
    # Concatenating inside the loop re-copies all accumulated rows on every
    # iteration, and seeding with an empty all-object frame can influence the
    # resulting dtypes depending on the pandas version.
    pieces = []
    for x in range(1, 10):
        src_cols = [
            'pid_a',
            'addrid_seq_' + str(x), 'addrid_seq_' + str(x + 1),
            'effdate_seq_' + str(x), 'effdate_seq_' + str(x + 1),
            'county_seq_' + str(x), 'county_seq_' + str(x + 1),
        ]
        # .copy() so that renaming and adding `seq` never touches the input.
        piece = df[src_cols].copy()
        piece.columns = out_cols[:-1]
        piece['seq'] = x
        pieces.append(piece)

    return pd.concat(pieces)

In [5]:
# Output column names for the long table, used below to build the dask `meta`.
# This repeats the list defined inside process_df; the two must stay in sync.
out_cols = ['pid', 'from_addrid', 'to_addrid', 'from_effdate', 'to_effdate', 'from_county', 'to_county', 'seq']

In [6]:
# Lazily apply process_df to every partition; `meta` tells dask the expected
# output schema without running the function.
# NOTE(review): meta declares every column with dtype=str, while process_df
# passes the source columns through and sets `seq` to an integer; confirm the
# declared dtypes match the frame that is actually computed.
long_movers = cleaned_movers.map_partitions(process_df, meta=pd.DataFrame(columns=out_cols, dtype=str))

In [7]:
# Run the wide-to-long reshaping on all partitions and gather the result into
# memory, showing a progress bar while the task graph runs.
with ProgressBar():
    all_moves = long_movers.compute()

[########################################] | 100% Completed | 15min 46.7s


In [8]:
# Sanity check: process_df emits one row per slot pair (9 pairs), so the long
# table should have exactly nine times as many rows as the wide one.
len(all_moves) == len(cleaned_movers) * 9

True

### Drop rows without full to/from data

In [12]:
# Wrap the computed long table back into a dask DataFrame with 10000 partitions.
# NOTE(review): this rebinds `all_moves` from the computed result to a dask
# frame, so re-running this cell alone would hand a dask frame to
# dd.from_pandas; consider a distinct name for the dask version.
all_moves = dd.from_pandas(all_moves, npartitions=10000)

In [19]:
# Keep only the from/to pairs where both effective dates are present.
has_both_effdates = all_moves['from_effdate'].notnull() & all_moves['to_effdate'].notnull()
moves_not_null = all_moves[has_both_effdates]

In [20]:
# Evaluate the lazy filter and rebind moves_not_null to the computed,
# in-memory result (progress bar shown while the graph runs).
with ProgressBar():
    moves_not_null = moves_not_null.compute()

[########################################] | 100% Completed |  2min 42.7s


In [22]:
# Number of from/to pairs that have both effective dates.
len(moves_not_null)

16713668

### Drop rows where move is between the same address ID

In [23]:
# Drop pairs whose origin and destination address ids are equal.
# NOTE(review): a missing id compares as "not equal" to anything, including
# another missing id, so pairs with missing address ids are kept here; confirm
# that is intended.
address_changed = moves_not_null['from_addrid'] != moves_not_null['to_addrid']
moves_not_dupe = moves_not_null[address_changed]

In [25]:
# Number of pairs left after removing same-address pairs.
len(moves_not_dupe)

14768635

In [28]:
# Preview the first rows of the final long table.
moves_not_dupe.head()

Unnamed: 0,pid,from_addrid,to_addrid,from_effdate,to_effdate,from_county,to_county,seq
0,Y39394600237303,10019445.0,18876210.0,200205.0,200912.0,85,85,3
0,Y39394856369551,142211808.0,214903362.0,199511.0,199708.0,85,77,1
0,Y39394693863001,176952249.0,200339258.0,199908.0,200006.0,37,97,1
0,Y39394693863001,176952249.0,175588889.0,200603.0,200611.0,37,37,4
0,Y39394693863001,175588889.0,200339258.0,200611.0,201006.0,37,97,5


### Save results

In [27]:
# Write the final long table of moves to parquet using the pyarrow engine.
moves_not_dupe.to_parquet('../data/moves_long.parquet', engine='pyarrow')