In [3]:
%pip install ipumspy networkx


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/Library/Frameworks/Python.framework/Versions/3.10/bin/python3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
import gzip
import pandas as pd
import os
import datetime
import pickle
import networkx as nx
from collections import defaultdict
from ipumspy import readers, ddi
from itertools import combinations, groupby
import matplotlib.pyplot as plt
import random
import numpy as np
from operator import itemgetter

In [20]:
DEFAULT_VARS = ['YEAR', 'SAMPLE', 'SERIAL', 'CBSERIAL', 'HHWT', 'CLUSTER', 'STRATA', 'GQ', 'PERNUM', 'PERWT'] 
WEALTH_PROXIES = ['INCINVST', 'INCBUS00']
INCOMES = ['INCWAGE', 'INCBUS00', 'INCSS', 'INCWELFR', 'INCINVST', 'INCRETIR', 'INCSUPP', 'INCOTHER']

def create_directory_with_timestamp(descriptor='graph'):
    now = datetime.datetime.now()
    timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")
    directory_name = f"graphs/{descriptor}_{timestamp}"
    if not os.path.exists(directory_name):
        os.makedirs(directory_name)
    return directory_name


def add_to_data_graph(df, graph, industry_state_dict):
    nodes = []
    for index, row in df.iterrows():
        if row['AGE'] >= 15:    #otherwise lots of n/a values
            node_id = str(row['SERIAL']) + '-' + str(row['PERNUM'])
            industry_state = str(row['STATEICP']) + '-' + str(row['IND'])
            wealth_proxy = row[WEALTH_PROXIES].sum()
            node_attributes = row.drop(DEFAULT_VARS).to_dict()
            node_attributes['wealth_proxy'] = wealth_proxy
            node_attributes['state_x_industry'] = industry_state

            nodes.append((node_id, node_attributes))
            industry_state_dict[industry_state].append(node_id)

    graph.add_nodes_from(nodes)
    return graph, industry_state_dict


def process_data(iterator, dir, save=False):
    print("Processing IPUMS data (might take a while)\n")
    nodes_graph = nx.DiGraph()
    industry_state_dict = defaultdict(list)
    i = 0
    for df_chunk in iterator: 
        print(f"\rProcessed: {i * 1000}", end='', flush=True)
        nodes_graph, nodes_industry_state_dict = add_to_data_graph(df_chunk, nodes_graph, industry_state_dict)
        i += 1

    print()
    
    if save:
        print("Saving results...")

        filename = os.path.join(dir, "nodes_graph.pkl")
        with open(filename, 'wb') as f:
            pickle.dump(nodes_graph, f)

        dict_filename = os.path.join(dir, "industry_state_dict.pkl")
        with open(dict_filename, 'wb') as f:
            pickle.dump(nodes_industry_state_dict, f)

    return nodes_graph, nodes_industry_state_dict


def collect_edges(graph, industry_state_dict, incl_weight_if_0 = False):
    edges_dict = defaultdict(list)
    total_industries = len(industry_state_dict)
    count = 1

    for industry_x_state, nodes in industry_state_dict.items():
        print(f"\rCollecting edges: {count}/{total_industries} industry x states processed", end='', flush=True)
        if len(nodes) > 1:
            nodes_data = [(node, graph.nodes[node]['wealth_proxy']) for node in nodes]
            nodes_data.sort(key=lambda x: x[1], reverse=True)
            for i in range(len(nodes_data) - 1):
                u, wealth_u = nodes_data[i]
                for j in range(i + 1, len(nodes_data)):
                    v, wealth_v = nodes_data[j]
                    weight = wealth_u - wealth_v

                    if weight != 0 or incl_weight_if_0:
                        edges_dict[industry_x_state].append((u, v, weight))
        count += 1
    print()
    return edges_dict

def construct_graph(nodes_graph, edges_dict):
    big_graph = nodes_graph
    total_industries = len(edges_dict)
    count = 0

    for industry_x_state, edges in edges_dict.items():
        print(f"\rConstructing big graph: {count}/{total_industries} industry x states processed", end='', flush=True)
        count += 1
        if len(edges) > 1:
            big_graph.add_weighted_edges_from(edges)

    print("\nDone!")
    return big_graph


def process_industry(industry_x_state, nodes, graph, incl_weight_if_0):
    nodes_data = np.array([(node, float(graph.nodes[node]['wealth_proxy'])) for node in nodes])
    if len(nodes_data) > 1:
        nodes_data = nodes_data[nodes_data[:, 1].argsort()[::-1]]  # Ensure sorting by float values
        for (u, wealth_u), (v, wealth_v) in combinations(nodes_data, 2):
            weight = float(wealth_u) - float(wealth_v)
            if weight != 0 or incl_weight_if_0:
                yield (industry_x_state, u, v, weight)

def collect_edges_to_generator(graph, industry_state_dict, incl_weight_if_0=False):
    total_industries = len(industry_state_dict)
    for i, (industry_x_state, nodes) in enumerate(industry_state_dict.items(), 1):
        print(f"\rCollecting edges: {i}/{total_industries} industry x states processed", end='', flush=True)
        yield from process_industry(industry_x_state, nodes, graph, incl_weight_if_0)
    print()

def construct_graph_from_generator(nodes_graph, edges_generator):
    big_graph = nodes_graph.copy()  # Create a copy to avoid modifying the original graph
    
    # Group edges by industry_x_state
    sorted_edges = sorted(edges_generator, key=itemgetter(0))
    grouped_edges = groupby(sorted_edges, key=itemgetter(0))
    
    total_industries = sum(1 for _ in grouped_edges)  # Count the number of industries
    
    # Reset the groupby object
    grouped_edges = groupby(sorted_edges, key=itemgetter(0))
    
    for count, (industry_x_state, industry_edges) in enumerate(grouped_edges, 1):
        print(f"\rConstructing big graph: {count}/{total_industries} industry x states processed", end='', flush=True)
        edges = [(u, v, weight) for _, u, v, weight in industry_edges]
        if len(edges) > 1:
            big_graph.add_weighted_edges_from(edges)
    
    print("\nDone!")

def construct_graphs(industry_state_dict, edge_dict, dir, batch_size=10):
    total_industries = len(edge_dict)
    count = 0

    subgraphs_to_save = []
    for industry_x_state, edges in edge_dict.items():
        print(f"\rConstructing graphs: {count}/{total_industries} industry x states processed", end='', flush=True)
        print(len(edges))
        print(edges[0])
        count += 1
        if len(edges) > 1:
            nodes = industry_state_dict[industry_x_state]

            # Create a new DiGraph for each subgraph
            subgraph = nx.DiGraph()
            subgraph.add_nodes_from(nodes)
            subgraph.add_weighted_edges_from(edges)
            subgraphs_to_save.append((subgraph, industry_x_state))

            # Save subgraphs in batches
            if count % batch_size == 0 or count == total_industries:
                save_subgraphs(subgraphs_to_save, dir)
                subgraphs_to_save = []  # Reset batch
    print("\nDone!")
    return big_graph


def save_subgraphs(subgraphs_to_save, dir):
    for subgraph, industry_x_state in subgraphs_to_save:
        filename = os.path.join(dir, industry_x_state + '.pkl')
        with open(filename, 'wb') as f:
            pickle.dump(subgraph, f)

In [21]:
# # PARSE IPUMS DATASET FOR NODES AND INDUSTRY X STATE MAP

In [22]:
ddi_codebook = readers.read_ipums_ddi('data/ipums_codebook.xml')
iter_microdata = readers.read_microdata_chunked(ddi_codebook, 'data/ipums.dat.gz', chunksize=1000)
print("Set up IPUMS Readers")

dir = create_directory_with_timestamp('full_graph')
nodes_graph, nodes_industry_state_dict = process_data(iter_microdata, dir, save=True)
print(f"Saved nodes graph and nodes/industryXstate map to {dir}")

Set up IPUMS Readers
Processing IPUMS data (this might take a while)

Processed: 3373000
Saving results...
Saved nodes graph and nodes/industryXstate map to graphs/full_graph_2024-06-26_11-58-48


In [None]:
# # LOAD AND COLLECT ALL EDGES (usually infeasible bc of size of dataset)

In [None]:
filename = f"{dir}/nodes_graph.pkl"
with open(filename, 'rb') as f:
    nodes_graph = pickle.load(f)

filename = f"{dir}/nodes_dict.pkl"
with open(filename, 'rb') as f:
    nodes_industry_state_dict = pickle.load(f)

In [13]:
edges = collect_edges(nodes_graph, nodes_industry_state_dict)

Collecting edges: 1/13044 industry x states processed

KeyboardInterrupt: 

In [None]:
# # SUBSAMPLE 1% OF NODES

In [14]:
def subsample_nodes(nodes_graph, nodes_industry_state_dict, sample_fraction=0.01):
    print(f"Subsampling {sample_fraction * 100}% of nodes from the graph...")
    all_nodes = list(nodes_graph.nodes)
    
    # Subsample 1% of nodes
    sample_size = int(len(all_nodes) * sample_fraction)
    print(f"Total nodes: {len(all_nodes)}, Sample size: {sample_size}")
    sampled_nodes = set(random.sample(all_nodes, sample_size))
    subsampled_graph = nodes_graph.subgraph(sampled_nodes).copy()
    print("Subsampling nodes completed.")

    # Create new industry_state_dict with subsampled nodes
    print("Creating new industry_state_dict with subsampled nodes...")
    subsampled_industry_state_dict = defaultdict(list)
    for industry_state, nodes in nodes_industry_state_dict.items():
        subsampled_nodes_in_state = [node for node in nodes if node in sampled_nodes]
        if subsampled_nodes_in_state:
            subsampled_industry_state_dict[industry_state].extend(subsampled_nodes_in_state)
    print("New industry_state_dict creation completed.")

    return subsampled_graph, subsampled_industry_state_dict

In [15]:
subnodes, subdict = subsample_nodes(nodes_graph, nodes_industry_state_dict)

subsample_dir = "graphs/subsample_1percent"

filename = os.path.join(subsample_dir, "nodes_graph.pkl")
with open(filename, 'wb') as f:
        pickle.dump(subnodes, f)

filename = os.path.join(subsample_dir, "nodes_dict.pkl")
with open(filename, 'wb') as f:
        pickle.dump(subdict, f)

Subsampling 1.0% of nodes from the graph...
Total nodes: 3373378, Sample size: 33733
Subsampling nodes completed.
Creating new industry_state_dict with subsampled nodes...
New industry_state_dict creation completed.


In [12]:
subsample_dir = "graphs/subsample_1percent"

filename = "graphs/subsample_1percent/nodes_graph.pkl"
with open(filename, 'rb') as f:
    subnodes = pickle.load(f)

filename = "graphs/subsample_1percent/nodes_dict.pkl"
with open(filename, 'rb') as f:
    subdict = pickle.load(f)

In [13]:
edges_dict = collect_edges(subnodes, subdict)

print("saving edges industryXstate map")
filename = os.path.join(subsample_dir, 'edges_dict.pkl')
with open(filename, 'wb') as f:
    pickle.dump(edges_dict, f)

Collecting edges: 3000/5270 industry x states processed

IOPub message rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_msg_rate_limit`.

Current values:
ServerApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [14]:
graph = construct_graph(subnodes, edges_dict)

print("Saving graph")
filename = os.path.join(subsample_dir, 'graph.pkl')
with open(filename, 'wb') as f:
    pickle.dump(graph, f)

Constructing big graph: 1521/1522 industry x states processed
Done!
saving graph


In [2]:
filename = "graphs/subsample_1percent/graph.pkl"
with open(filename, 'rb') as f:
    nodes_graph = pickle.load(f)

In [3]:
with gzip.open('graphs/subsample_1percent/graph.pkl', 'wb') as file:
    pickle.dump(nodes_graph, file)

In [None]:
# # SUBSAMPLE FOR ILLINOIS (still too big to be able to process for me so far(

In [50]:
def subsample_nodes_by_state(nodes_graph, nodes_industry_state_dict, icp=21):
    # Identify nodes with matching STATEICP (default of 21 is the default state)
    state_nodes = [node for node, data in nodes_graph.nodes(data=True) if data.get('STATEICP') == icp]
    print(f"Found {len(state_nodes)} nodes with STATEICP matching {icp}")


    # Create a subgraph with only state nodes
    subsampled_graph = nodes_graph.subgraph(state_nodes).copy()
    total_industries = len(nodes_industry_state_dict)
    count = 0
    print("Created subgraph with state nodes")
    
    # Create new industry_state_dict with subsampled nodes
    subsampled_industry_state_dict = {
        industry_state: nodes for industry_state, nodes in nodes_industry_state_dict.items()
        if industry_state.startswith(str(icp) + "-")
    }

    print("Created industry_state_dict with subsampled nodes")
    return subsampled_graph, subsampled_industry_state_dict


In [26]:
# subsample nodes graph and nodes industry X state dict
subnodes, subdict = subsample_nodes_by_state(nodes_graph, nodes_industry_state_dict, icp=21)

subsample_dir = "graphs/subsample_illinois"

print("saving nodes graph")
filename = os.path.join(subsample_dir, "nodes_graph.pkl")
with open(filename, 'wb') as f:
        pickle.dump(subnodes, f)

print("saving nodes industry x state dictionary")
filename = os.path.join(subsample_dir, "nodes_dict.pkl")
with open(filename, 'wb') as f:
        pickle.dump(subdict, f)

NameError: name 'subsample_nodes_by_state' is not defined

In [45]:
subsample_dir = "graphs/subsample_illinois"
filename = "graphs/subsample_illinois/nodes_graph.pkl"
with open(filename, 'rb') as f:
    subnodes = pickle.load(f)

filename = "graphs/subsample_illinois/nodes_dict.pkl"
with open(filename, 'rb') as f:
    subdict = pickle.load(f)

In [None]:
edges = collect_edges_to_generator(subnodes, subdict)
graph = construct_graph_from_generator(subnodes, edges)

print("saving graph")
filename = os.path.join(subsample_dir, 'graph.pkl')
with open(filename, 'wb') as f:
    pickle.dump(graph, f)

# print("saving edges industryXstate map")
# filename = os.path.join(subsample_dir, 'edges_dict.pkl')
# with open(filename, 'wb') as f:
#     pickle.dump(edges_dict, f)

Collecting edges: 1/270 industry x states processed

In [7]:
edges_dict = collect_edges_to_generator(subnodes, subdict)
graph = construct_graph_from_generator(subnodes, edges_dict)

print("saving graph")
filename = os.path.join(subsample_dir, 'graph.pkl')
with open(filename, 'wb') as f:
    pickle.dump(graph, f)

Process SpawnProcess-2:
Process SpawnProcess-3:
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_industry' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap

BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

TypeError: object of type 'generator' has no len()

In [15]:
filename = "graphs/subsample_illinois/graph.pkl"
with open(filename, 'rb') as f:
    graph = pickle.load(f)

In [16]:
graph.number_of_edges()

0

In [76]:
# build edges dict from subsample
edges_dict = collect_edges(subnodes, subdict)

print("saving edges industryXstate map")
filename = os.path.join(subsample_dir, 'edges_dict.pkl')
with open(filename, 'wb') as f:
    pickle.dump(edges_dict, f)


saving edges industryXstate map


In [None]:
#
graph = construct_graph(subnodes, edges_dict)

print("saving graph")
filename = os.path.join(subsample_dir, 'graph.pkl')
with open(filename, 'wb') as f:
    pickle.dump(graph, f)

In [None]:
edges_dict = collect_edges(nodes_graph, nodes_industry_state_dict)

print("saving edges industryXstate map")
filename = os.path.join(dir, 'edges_dict.pkl')
with open(filename, 'wb') as f:
    pickle.dump(edges_dict, f)

Collecting edges: 460/13044 industry x states processed

In [None]:
# # RANDOM TESTING STUFF

In [None]:
len(nodes_industry_state_dict.values())

In [None]:
max([len(nodes)for nodes in nodes_industry_state_dict.values()])

In [15]:
graph.number_of_nodes()

25000

In [16]:
graph.number_of_edges()

67018723

In [17]:
def find_non_zero_weight_edges(graph):
    non_zero_edges = []
    for u, v, data in graph.edges(data=True):
        if data.get('weight', 0) != 0:
            non_zero_edges.append((u, v, data['weight']))
    return non_zero_edges

# Example usage
non_zero_edges = find_non_zero_weight_edges(graph)

In [19]:
len(non_zero_edges)/graph.number_of_edges()

0.5232078205966413

In [None]:
# for one at a time stuff
graph = nx.DiGraph()
industry_state_dict = defaultdict(list)
graph, industry_state_dict = add_to_data_graph(first_chunk, graph, industry_state_dict)
graph = add_edges(graph, industry_state_dict)

In [8]:
first_chunk = next(iter_microdata)
first_chunk.head()

Unnamed: 0,YEAR,SAMPLE,SERIAL,CBSERIAL,HHWT,CLUSTER,STATEICP,STRATA,GQ,HHINCOME,...,FTOTINC,INCWAGE,INCBUS00,INCSS,INCWELFR,INCINVST,INCRETIR,INCSUPP,INCOTHER,INCEARN
0,2022,202201,1,2022010000031,69.0,2022000000011,41,280301,3,9999999,...,9999999,0,0,18800,0,0,0,0,0,0
1,2022,202201,2,2022010000111,22.0,2022000000021,41,200001,3,9999999,...,9999999,12500,0,0,0,0,0,0,0,12500
2,2022,202201,3,2022010000200,45.0,2022000000031,41,280301,3,9999999,...,9999999,16400,0,0,0,0,0,0,0,16400
3,2022,202201,4,2022010000261,4.0,2022000000041,41,110001,4,9999999,...,9999999,0,0,8600,0,0,0,0,0,0
4,2022,202201,5,2022010000296,47.0,2022000000051,41,150201,3,9999999,...,9999999,0,5000,0,0,0,0,0,0,5000


In [9]:
first_chunk[first_chunk['SERIAL'] == 215]

Unnamed: 0,YEAR,SAMPLE,SERIAL,CBSERIAL,HHWT,CLUSTER,STATEICP,STRATA,GQ,HHINCOME,...,FTOTINC,INCWAGE,INCBUS00,INCSS,INCWELFR,INCINVST,INCRETIR,INCSUPP,INCOTHER,INCEARN
214,2022,202201,215,2022010011646,15.0,2022000002151,41,40301,3,9999999,...,9999999,999999,999999,99999,99999,999999,999999,99999,99999,0


In [9]:
first_chunk.columns

Index(['YEAR', 'SAMPLE', 'SERIAL', 'CBSERIAL', 'HHWT', 'CLUSTER', 'STATEICP',
       'STRATA', 'GQ', 'HHINCOME', 'PERNUM', 'PERWT', 'SEX', 'AGE', 'MARST',
       'RACE', 'RACED', 'EDUC', 'EDUCD', 'EMPSTAT', 'EMPSTATD', 'OCC', 'IND',
       'INCTOT', 'FTOTINC', 'INCWAGE', 'INCBUS00', 'INCSS', 'INCWELFR',
       'INCINVST', 'INCRETIR', 'INCSUPP', 'INCOTHER', 'INCEARN'],
      dtype='object')