In [1]:
from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client
import numpy as np
import pandas as pd

In [2]:
### Options ###
# Zip+4 type codes that survive the z4type filter further down
z4types = tuple("HSR")
# Three-character county codes matched against characters 2-4 of each fips
county_codes = ("037", "059")

In [None]:
# Create a dask.distributed Client with no arguments (library defaults).
client = Client()
# Bare last expression: the notebook displays the client's repr as cell output.
client

In [3]:
### Extract all moves ###

# Settings
# Columns to read: the unsuffixed first z4type/effdate pair, then the
# suffixed pairs 2-10, then fips1-fips10.
usecols_all_states = [
    'z4type',
    'effdate',
    *(col for n in range(2, 11) for col in (f'z4type{n}', f'effdate{n}')),
    *(f'fips{n}' for n in range(1, 11)),
]

# z4type and fips columns are read as object (strings); effdate as float.
dtype_all_states = {
    'z4type': 'object',
    'effdate': 'float',
    'fips1': 'object',
    **{
        col: dtype
        for n in range(2, 11)
        for col, dtype in (
            (f'z4type{n}', 'object'),
            (f'effdate{n}', 'float'),
            (f'fips{n}', 'object'),
        )
    },
}


# DASK: Load all_states
# Same column selection and dtypes as the pandas load of this file.
_read_all_states_kwargs = {
    'usecols': usecols_all_states,
    'dtype': dtype_all_states,
}
dd_all_states = dd.read_csv("data/all_states.csv", **_read_all_states_kwargs)

In [12]:
# PANDAS
# Read only the z4type / effdate / fips columns, with explicit dtypes.
df_all_states = pd.read_csv("data/all_states.csv", usecols=usecols_all_states, dtype=dtype_all_states)

In [5]:
# Settings
# One rename mapping per area slot: slot 1 uses the unsuffixed z4type/effdate
# columns plus fips1; slots 2-10 use the numbered columns. Every mapping
# renames its columns to the common names z4type / effdate / fips.
list_dict_col_names = [
    {'z4type': 'z4type', 'effdate': 'effdate', 'fips1': 'fips'},
    *(
        {f'z4type{n}': 'z4type', f'effdate{n}': 'effdate', f'fips{n}': 'fips'}
        for n in range(2, 11)
    ),
]


# DASK: Split rows into individual areas
# Each mapping selects one area's three columns and renames them to the
# shared z4type / effdate / fips names.
list_dd_all_areas = [
    dd_all_states[list(col_names)].rename(columns=col_names)
    for col_names in list_dict_col_names
]

# Recombine all areas
dd_all_areas = dd.concat(list_dd_all_areas)


def reduce_effdate(group):
    """Debug placeholder: print the group it is given; implicitly returns None."""
    print(group)

# Experimental: group the dask frame by (index, effdate) and apply the probe.
# NOTE: the recorded output of this cell ends with
# "NotImplementedError: groupby-apply with a multiple Series is currently not
# supported", i.e. dask rejects this list-of-Series grouping. The recorded
# warning also asks for an explicit `meta=` argument to `.apply`.
# TODO: rework or remove this cell; as written it does not run to completion.
dd_all_areas.groupby([dd_all_areas.index, dd_all_areas['effdate']], dropna=True).apply(reduce_effdate)

  z4type  effdate fips
1    foo      1.0  foo
  z4type  effdate fips
2    foo      1.0  foo


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_all_areas_dropna.groupby([dd_all_areas.index, dd_all_areas['effdate']]).apply(reduce_effdate)


NotImplementedError: groupby-apply with a multiple Series is currently not supported

In [14]:
# PANDAS: Split rows into individual areas
# Each mapping selects one area's three columns and renames them to the
# shared z4type / effdate / fips names.
list_df_all_areas = [
    df_all_states[list(col_names)].rename(columns=col_names)
    for col_names in list_dict_col_names
]

# Recombine all areas + sort effdate
# Rows without an effdate are dropped; the stable sort keeps the concat
# (area 1 -> area 10) order among rows that share an effdate.
df_all_areas = (
    pd.concat(list_df_all_areas)
    .dropna(subset='effdate')
    .sort_values('effdate', kind='stable')
)

# All effdates that do not have a z4type in z4types
# True for every row whose (index label, effdate) group contains no row with
# a z4type in z4types: isin() flags matching rows, transform('any') spreads
# "group has a match" to each row of the group, and ~ inverts it.
bi_all_dropped = ~(
    df_all_areas['z4type'].isin(z4types)
        .groupby([df_all_areas.index, df_all_areas['effdate']])
        .transform('any')
)

# Change values so selected effdates are not removed
# Rows of such groups get the placeholder z4type 'empty' (which the mask
# below lets through) and an empty-string fips.
df_all_areas.loc[bi_all_dropped, 'z4type'] = 'empty'
df_all_areas.loc[bi_all_dropped, 'fips'] = ""

# Filter by Zip+4 type
# Accepted values are the configured z4types plus the 'empty' placeholder.
z4types_mask = (*z4types, 'empty')

df_filtered_areas = df_all_areas[
    df_all_areas['z4type'].isin(z4types_mask)
]

# Choose leftmost fips of each effdate
# Collapse each (index label, effdate) group to a single row.
# NOTE(review): GroupBy.first() is documented to take the first non-null
# value per column, so a group whose leftmost row has a missing fips would
# pick up a later row's fips -- confirm this matches the "leftmost" intent.
df_filtered_areas = (
    df_filtered_areas.groupby([df_filtered_areas.index, 'effdate']).first()
)
# Move effdate back to a column; the original row label stays as the index.
df_filtered_areas = df_filtered_areas.reset_index('effdate')

Run exactly one of the three cells below; each builds `df_all_moves` with a different `dropna` strategy &darr;

In [None]:
# Link previous & next areas as moves

In [None]:
# dropna fips before shifting
df_all_moves = (
    df_filtered_areas.loc[:, ['effdate', 'fips']]
    .dropna()
    .rename(columns={'fips': 'destfips'})
)
# The previous destination under the same index label becomes the origin;
# the first row of each label gets the "first record" placeholder.
df_all_moves['origfips'] = (
    df_all_moves.groupby(level=0)['destfips'].shift(fill_value="first record")
)

In [None]:
# dropna origfips & destfips
# Build origin/destination pairs first, then drop any row with a missing value.
df_all_moves = (
    df_filtered_areas[['effdate', 'fips']]
    .rename(columns={'fips': 'destfips'})
    .assign(
        # Previous destination under the same index label; the first row of
        # each label gets the "first record" placeholder.
        origfips=lambda moves: (
            moves.groupby(moves.index)['destfips']
            .shift(fill_value="first record")
        )
    )
    .dropna()
)

In [26]:
# don't dropna
# Missing fips values are kept on both the destination and the origin side.
df_all_moves = (
    df_filtered_areas[['effdate', 'fips']]
    .rename(columns={'fips': 'destfips'})
    .assign(
        origfips=lambda moves: (
            moves.groupby(level=0)['destfips'].shift(fill_value="first record")
        )
    )
)

Run exactly one of the three cells above &uarr;

In [27]:
# Filter by county code
# Keep a move when characters 2-4 of either its origin or its destination
# fips match one of the configured county codes.
df_filtered_moves = df_all_moves.loc[
    lambda moves: (
        moves['origfips'].str.slice(2, 5).isin(county_codes)
        | moves['destfips'].str.slice(2, 5).isin(county_codes)
    )
]

In [28]:
# Separate moves into periods

def extract_period(start, stop):
    """Return the origfips/destfips columns of moves with start <= effdate < stop.

    Reads the module-level `df_filtered_moves`; `start` is inclusive and
    `stop` is exclusive.
    """
    se_effdate = df_filtered_moves['effdate']
    in_period = (se_effdate >= start) & (se_effdate < stop)
    return df_filtered_moves.loc[in_period, ['origfips', 'destfips']]

# (start, stop) effdate bounds of the three periods, in chronological order
df_03_07_moves, df_08_12_moves, df_13_17_moves = [
    extract_period(start, stop)
    for start, stop in ((200301, 200801), (200801, 201301), (201301, 201801))
]

In [29]:
# Shared to_csv options: the row index is not written
arg_to_csv = dict(index=False)

# Write each period's moves to its own file
for df_period_moves, csv_path in (
    (df_03_07_moves, "data/direct/03_07_moves.csv"),
    (df_08_12_moves, "data/direct/08_12_moves.csv"),
    (df_13_17_moves, "data/direct/13_17_moves.csv"),
):
    df_period_moves.to_csv(csv_path, **arg_to_csv)

In [8]:
### Data analysis ###

# Settings
usecols_fips_tract = ['tractid_fips', 'gainers', 'losers']

# Tract ids are read as strings; gainers / losers as boolean flags
dtype_fips_tract = dict(tractid_fips='object', gainers='bool', losers='bool')


# Load fips_tracts_cats
df_fips_tract = pd.read_csv(
    "fips_tracts_cats.csv", usecols=usecols_fips_tract, dtype=dtype_fips_tract
)

# Get Series of gainers and losers
# Tract ids of the rows whose respective flag is True
se_gainers = df_fips_tract.loc[df_fips_tract['gainers'], 'tractid_fips']
se_losers = df_fips_tract.loc[df_fips_tract['losers'], 'tractid_fips']

In [9]:
# Compute gain & loss totals

def move_totals(df_moves):
    """Count moves into each gainer tract and out of each loser tract.

    Parameters
    ----------
    df_moves : pandas.DataFrame
        Moves with 'origfips' and 'destfips' columns.

    Returns
    -------
    tuple of pandas.Series
        (gain totals, loss totals), both named 'count'. Gain totals count rows
        per 'destfips' and are reindexed to the module-level `se_gainers`;
        loss totals count rows per 'origfips' and are reindexed to
        `se_losers`. Tracts without any move get 0.
    """
    def _tally(fips_column, se_tracts):
        # Rows per fips, restricted/expanded to exactly the requested tracts
        se_per_fips = df_moves.groupby(fips_column).size()
        return se_per_fips.reindex(se_tracts, fill_value=0).rename('count')

    return _tally('destfips', se_gainers), _tally('origfips', se_losers)

# One (gain, loss) pair of totals per period
(
    (se_03_07_gain_total, se_03_07_loss_total),
    (se_08_12_gain_total, se_08_12_loss_total),
    (se_13_17_gain_total, se_13_17_loss_total),
) = [
    move_totals(df_period_moves)
    for df_period_moves in (df_03_07_moves, df_08_12_moves, df_13_17_moves)
]

In [59]:
# Write to files
# p0 / p1 / p2 are the 03-07, 08-12 and 13-17 periods respectively
for se_total, csv_path in (
    (se_03_07_gain_total, "data/direct/gaintotal_p0.csv"),
    (se_03_07_loss_total, "data/direct/losstotal_p0.csv"),
    (se_08_12_gain_total, "data/direct/gaintotal_p1.csv"),
    (se_08_12_loss_total, "data/direct/losstotal_p1.csv"),
    (se_13_17_gain_total, "data/direct/gaintotal_p2.csv"),
    (se_13_17_loss_total, "data/direct/losstotal_p2.csv"),
):
    se_total.to_csv(csv_path)

Note: There are now almost no moves whose origfips equals their destfips, likely because duplicate effdates were removed.\
Note 2: Records whose fips failed to geocode are kept.

In [33]:
# Compute matrices

def matrix(df_moves):
    """Build a square origin-by-destination matrix of move counts.

    Parameters
    ----------
    df_moves : pandas.DataFrame
        Moves with 'origfips' and 'destfips' columns.

    Returns
    -------
    pandas.DataFrame
        Rows are origins and columns are destinations. Both axes carry the
        same labels: every fips seen in either column, in order of first
        appearance (origins first). Pairs that never occur are 0.
    """
    se_all_fips = pd.concat(
        [df_moves['origfips'], df_moves['destfips']]
    ).drop_duplicates()

    # Count each (origin, destination) pair, then pivot destinations to columns
    se_pair_counts = df_moves.groupby(['origfips', 'destfips']).size()
    df_wide = se_pair_counts.unstack(fill_value=0)

    # Expand both axes to the full label set so the matrix is square
    return df_wide.reindex(index=se_all_fips, columns=se_all_fips, fill_value=0)

# One origin-by-destination matrix per period
df_03_07_matrix, df_08_12_matrix, df_13_17_matrix = map(
    matrix, (df_03_07_moves, df_08_12_moves, df_13_17_moves)
)

In [16]:
# Write to files
# p0 / p1 / p2 are the 03-07, 08-12 and 13-17 periods respectively
for df_period_matrix, csv_path in (
    (df_03_07_matrix, "data/direct/matrix_p0.csv"),
    (df_08_12_matrix, "data/direct/matrix_p1.csv"),
    (df_13_17_matrix, "data/direct/matrix_p2.csv"),
):
    df_period_matrix.to_csv(csv_path)

In [34]:
# Compute high gains & losses

def high_moves(df_matrix, gainers=None, losers=None):
    """Total the off-diagonal moves out of and into every fips and label each fips.

    Parameters
    ----------
    df_matrix : pandas.DataFrame
        Move-count matrix; row sums are reported as moves out, column sums as
        moves in. The positional diagonal is zeroed first, so both axes are
        expected to share the same label order (as `matrix` produces).
    gainers, losers : list-like, optional
        Fips labelled 'gain' / 'loss'. Default to the module-level
        `se_gainers` / `se_losers`.

    Returns
    -------
    tuple of pandas.DataFrame
        (df_high_loss, df_high_gain), each indexed by fips with a 'count'
        column and a categorical 'type' column ('gain', 'loss' or 'other';
        'gain' wins when a fips is in both lists). `df_matrix` is not modified.
    """
    if gainers is None:
        gainers = se_gainers
    if losers is None:
        losers = se_losers

    # Zero the diagonal on an explicit, writable array copy. Writing through
    # `DataFrame.values` only works when pandas hands back a writable view:
    # for mixed-dtype frames it is a copy (diagonal silently kept), and with
    # Copy-on-Write enabled the array is read-only.
    zeroed_values = df_matrix.to_numpy(copy=True)
    np.fill_diagonal(zeroed_values, 0)
    df_zeroed_matrix = pd.DataFrame(
        zeroed_values, index=df_matrix.index, columns=df_matrix.columns
    )

    # Sum across rows and columns
    df_high_loss = df_zeroed_matrix.sum(axis=1).to_frame(name='count')
    df_high_gain = df_zeroed_matrix.sum().to_frame(name='count')

    # Label fips based on gainers & losers
    choiceList_high = ['gain', 'loss']
    for df_high in (df_high_loss, df_high_gain):
        condList_high = [df_high.index.isin(gainers), df_high.index.isin(losers)]
        df_high['type'] = pd.Categorical(
            np.select(condList_high, choiceList_high, default='other')
        )

    return df_high_loss, df_high_gain

# One (high_loss, high_gain) pair per period
(
    (df_03_07_high_loss, df_03_07_high_gain),
    (df_08_12_high_loss, df_08_12_high_gain),
    (df_13_17_high_loss, df_13_17_high_gain),
) = [
    high_moves(df_period_matrix)
    for df_period_matrix in (df_03_07_matrix, df_08_12_matrix, df_13_17_matrix)
]

In [24]:
# Write to files
# p0 / p1 / p2 are the 03-07, 08-12 and 13-17 periods respectively
for df_high, csv_path in (
    (df_03_07_high_loss, "data/direct/out_of_high_loss_p0.csv"),
    (df_03_07_high_gain, "data/direct/into_high_gain_p0.csv"),
    (df_08_12_high_loss, "data/direct/out_of_high_loss_p1.csv"),
    (df_08_12_high_gain, "data/direct/into_high_gain_p1.csv"),
    (df_13_17_high_loss, "data/direct/out_of_high_loss_p2.csv"),
    (df_13_17_high_gain, "data/direct/into_high_gain_p2.csv"),
):
    df_high.to_csv(csv_path)

In [37]:
# Compute summaries

def summary(df_high_moves, gainers=None, losers=None):
    """Split the total of a 'count' column into loser, gainer and other fips.

    Parameters
    ----------
    df_high_moves : pandas.DataFrame
        Frame indexed by fips with a 'count' column (as `high_moves` returns).
    gainers, losers : list-like, optional
        Fips counted under "from_gain" / "from_loss". Default to the
        module-level `se_gainers` / `se_losers`.

    Returns
    -------
    pandas.DataFrame
        A single 'count' column with rows "from_loss", "from_gain" and
        "from_other", where "from_other" is the overall total minus the other
        two rows.
    """
    if gainers is None:
        gainers = se_gainers
    if losers is None:
        losers = se_losers

    se_count = df_high_moves['count']

    # Sum each of gainers, losers, and other
    from_loss = se_count[df_high_moves.index.isin(losers)].sum()
    from_gain = se_count[df_high_moves.index.isin(gainers)].sum()
    from_other = se_count.sum() - from_loss - from_gain

    # Build the frame once from scalars instead of enlarging an empty frame
    # row by row, so the column gets a numeric dtype directly.
    return pd.DataFrame(
        {'count': [from_loss, from_gain, from_other]},
        index=["from_loss", "from_gain", "from_other"],
    )

# One summary per (period, direction) frame, in the same order as the inputs
(
    df_03_07_high_loss_summary,
    df_03_07_high_gain_summary,
    df_08_12_high_loss_summary,
    df_08_12_high_gain_summary,
    df_13_17_high_loss_summary,
    df_13_17_high_gain_summary,
) = map(
    summary,
    (
        df_03_07_high_loss,
        df_03_07_high_gain,
        df_08_12_high_loss,
        df_08_12_high_gain,
        df_13_17_high_loss,
        df_13_17_high_gain,
    ),
)

In [38]:
# Write to files
# p0 / p1 / p2 are the 03-07, 08-12 and 13-17 periods respectively
for df_period_summary, csv_path in (
    (df_03_07_high_loss_summary, "data/direct/out_of_high_loss_summary_p0.csv"),
    (df_03_07_high_gain_summary, "data/direct/into_high_gain_summary_p0.csv"),
    (df_08_12_high_loss_summary, "data/direct/out_of_high_loss_summary_p1.csv"),
    (df_08_12_high_gain_summary, "data/direct/into_high_gain_summary_p1.csv"),
    (df_13_17_high_loss_summary, "data/direct/out_of_high_loss_summary_p2.csv"),
    (df_13_17_high_gain_summary, "data/direct/into_high_gain_summary_p2.csv"),
):
    df_period_summary.to_csv(csv_path)