In [1]:
from dask.distributed import Client
dclient = Client(memory_limit='60GB', n_workers=8)

import dask
from dask import delayed
from dask.delayed import delayed

import dask.dataframe as dd
import dask.bag as db

import os
import pandas as pd
import geopandas as gpd
import numpy as np
import scipy.sparse
import scipy.sparse.linalg
import matplotlib.pyplot as plt
import boto3
from datetime import date, timedelta
from pathlib import Path

import subprocess
from os.path import exists

# Stolen from WH example for convenience. Not all used.

os.chdir("/home/ec2-user/Contacts-sensitive/")
session = boto3.session.Session(profile_name='yse-bioecon')
client = boto3.client('s3')



os.chdir("/home/ec2-user/Contacts-sensitive/functions")
dclient.upload_file('aws_um_funcs.py')
import aws_um_funcs as fcombo

os.chdir("/home/ec2-user/Contacts-sensitive/clump_filtering")
dclient.upload_file('clump_finding.py')
import clump_finding

#os.system("export PYTHONPATH=~/Contacts-sensitive/:~/Contacts-sensitive/functions/")
os.chdir("/home/ec2-user/Contacts-sensitive/")

sql_exp = ("SELECT * " +
           "FROM s3Object")

In [2]:
os.chdir("/home/ec2-user/Contacts-sensitive/functions")
dclient.upload_file('aws_um_funcs.py')
import aws_um_funcs as fcombo

os.chdir("/home/ec2-user/Contacts-sensitive/clump_filtering")
dclient.upload_file('clump_finding.py')
import clump_finding

import time
import networkx as nx
from copy import deepcopy

os.chdir("/home/ec2-user/Contacts-sensitive/graph_analysis")
import timestep_analysis

def contacts_to_matrix(df, weighting='per_contact', return_only_largest_component=True, trim_weights_to=None, matrix='laplacian'):
    """
    Generates the Laplacian or Adjacency matrix of the contact graph over the day. Can be modified for edge weighting.
    
    If return_weights is passed, instead just returns the edge weights.
    """
    if 'weight' in df.columns:
        weights = df
    else:
        weights = timestep_analysis.sum_contacts_by_pairs([df], weighting=weighting)
    
    G = nx.Graph()
    G.add_weighted_edges_from(weights[['device_id_1', 'device_id_2', 'weight']].values)
    #print(max(dict(G.edges).items(), key=lambda x: x[1]['weight']))
    print(sorted([len(i) for i in nx.connected_components(G)])[-10:])
    if return_only_largest_component:
        G = G.subgraph(max(nx.connected_components(G), key=len))
    A = nx.to_scipy_sparse_matrix(G, dtype=np.float64, format='csc')
    if matrix == 'adjacency':
        return A
    elif matrix == 'laplacian':
        if trim_weights_to is not None:
            A.data = np.minimum(A.data, trim_weights_to)

        degs = np.squeeze(np.asarray(A.sum(axis=0)))
        D = scipy.sparse.diags(degs, format='csc')
        L = D - A
        return L
    else:
        raise NotImplementedError

In [3]:
from data_loading import date_iterator, load_date_interval

In [4]:
#from dask.distributed import Client
import os
import pandas as pd
import numpy as np
import scipy.sparse
os.chdir("/home/ec2-user/Contacts-sensitive/graph_analysis")
import data_loading, timestep_analysis
from sksparse.cholmod import cholesky

def per_device_exposure_index(contact_df, case_frequencies):
    """
    Calculates, for each device, an exposure index based on how much contact they had and where it was (i.e. case exposure)
    """
    if not isinstance(case_frequencies, dict):
        case_frequencies = case_frequencies.to_dict()
    exposures = data_loading.get_cty_row(contact_df).map(case_frequencies)
    id_1_sum = pd.DataFrame()
    id_2_sum = pd.DataFrame()
    id_1_sum['did'] = contact_df['device_id_1'].values
    id_2_sum['did'] = contact_df['device_id_2'].values
    id_1_sum['exposures'] = exposures.values
    id_2_sum['exposures'] = exposures.values
    id_1_sum = id_1_sum.groupby('did').sum()
    id_2_sum = id_2_sum.groupby('did').sum()
    
    return id_1_sum.add(id_2_sum, fill_value=0)

case_frequencies = data_loading.rename(data_loading.generator_function_arg_setter(data_loading.cty_case_rate_generator, {'time_average_period': 7}), 'case_frequencies')

def random_bits_from_distribution(distribution):
    # Given a vector of probabilities, returns a vector of bits of the same shape, where the bit at index I is 1 with probability distribution[I]
    return (np.random.uniform(size=distribution.shape) < distribution).astype(np.int)

def weights_df_to_laplacian(df, index=None):  # Given a df with at most one entry per contact, makes the laplacian with indexing by index (the ordered list of ids)
    if index is None:
        index = list(set(df['device_id_1'].tolist() + df['device_id_2'].tolist()))
    index_map = {ind: i for i, ind in enumerate(index)}
    li = len(index_map)
    L = scipy.sparse.dok_matrix((li, li))
    L[df['device_id_1'].map(index_map).to_numpy(), df['device_id_2'].map(index_map).to_numpy()] = -df['weight'].to_numpy()
    L[df['device_id_2'].map(index_map).to_numpy(), df['device_id_1'].map(index_map).to_numpy()] = -df['weight'].to_numpy()
    L[np.arange(li), np.arange(li)] = -L.sum(axis=0)
    return L.tocsr()

def weights_df_to_adjacency(df, index=None):  # Given a df with at most one entry per contact, makes the adjacency matrix with indexing by index (the ordered list of ids)
    if index is None:
        index = list(set(df['device_id_1'].tolist() + df['device_id_2'].tolist()))
    index_map = {ind: i for i, ind in enumerate(index)}
    li = len(index_map)
    A = scipy.sparse.dok_matrix((li, li))
    A[df['device_id_1'].map(index_map).to_numpy(), df['device_id_2'].map(index_map).to_numpy()] = df['weight'].to_numpy()
    A[df['device_id_2'].map(index_map).to_numpy(), df['device_id_1'].map(index_map).to_numpy()] = df['weight'].to_numpy()
    return A.tocsr()
    

class ConvergenceError(Exception):
    pass

def k_low_eigs_from_laplacian_cg(L, k, iters=100, eps=1e-5):
    n = L.shape[0]
    L = L + scipy.sparse.eye(n, format='csr') * eps
    current_eigvecs = [np.ones(shape=(n,)) / np.sqrt(n)]
    current_eigs = [eps]
    for _ in range(k):
        evec = np.random.normal(size=(n,))
        titers = 0
        while titers < 10000:
            for ce in current_eigvecs:
                evec -= ce * np.inner(ce, evec)
            evec /= np.linalg.norm(evec)
            grad = (L@evec[:, np.newaxis])
            for ce in current_eigvecs:
                grad -= ce[:, np.newaxis] * (ce[np.newaxis] @ grad)
            
            Lg = L @ grad
            c = (evec[np.newaxis] @ Lg)[0][0] / (grad.T @ Lg)[0][0]
            
            # (e-cg)A(e-cg) (e-cg)Ag=0
            # eAg=cgAg c = eAg/gAg
            nevec = evec - c * grad[:, 0]
            nevec /= np.linalg.norm(nevec)
            error = np.linalg.norm(nevec - evec)
            old_eig_est = np.sqrt((evec[np.newaxis] @ (L@evec[:, np.newaxis]))[0][0])
            new_eig_est = np.sqrt((nevec[np.newaxis] @ L @ nevec[:, np.newaxis])[0][0])
            eig_est_diff = abs(new_eig_est - old_eig_est)
            evec = nevec
            if error < .0001:
                break
            else:
                if titers % 100 == 0:
                    print(np.linalg.norm(evec), old_eig_est, np.linalg.norm(nevec), new_eig_est)
                    print("Finished iteration", titers, "with error:", error, "eig error:", eig_est_diff, "eig estimate:", new_eig_est)
                titers += 1
        current_eigvecs += [evec]
        current_eigs += [new_eig_est]
    return [x - eps for x in current_eigs[1:]]

def k_low_eigs_from_laplacian(L, k, iters=100, eps=1e-5):
    n = L.shape[0]
    L = L + scipy.sparse.eye(n, format='csr') * eps
    current_eigvecs = [np.ones(shape=(n,)) / np.sqrt(n)]
    current_eigs = [eps]
    for _ in range(k):
        evec = np.random.normal(size=(n,))
        for __ in range(iters):
            for ce in current_eigvecs:
                evec -= ce * np.inner(ce, evec)
            evec /= np.linalg.norm(evec)
            evec, conv = scipy.sparse.linalg.cg(L, evec)
            print("Iter!")
            if conv != 0:
                raise CustomError("Convergence failed with return value " + str(conv))
        current_eigvecs += [evec / np.linalg.norm(evec)]
        current_eigs += [1 / np.linalg.norm(evec)]
    return [x - eps for x in current_eigs[1:]]

def k_low_eigs_from_laplacian_cholesky(L, k, iters=100, eps=1e-4):  # Not delayable
    n = L.shape[0]
    chol = cholesky((L + scipy.sparse.eye(n, format='csr') * eps).tocsc())
    current_eigvecs = [np.ones(shape=(n,)) / np.sqrt(n)]
    current_eigs = [eps]
    for _ in range(k):
        evec = np.random.normal(size=(n,))
        for __ in range(iters):
            for ce in current_eigvecs:
                evec -= ce * np.inner(ce, evec)
            evec /= np.linalg.norm(evec)
            evec = chol.solve_A(evec)
        current_eigvecs += [evec / np.linalg.norm(evec)]
        current_eigs += [1 / np.linalg.norm(evec)]
    return [x - eps for x in current_eigs[1:]]

In [5]:
def eigs_from_sum_contacts_by_pairs(indata, k=6, iters=100):  # NOTE: NOT USING CHOLESKY ANYMORE
    if indata[0][-1] is None:
        return None
    else:
        L = contacts_to_matrix(pd.concat(indata[0]))
        return k_low_eigs_from_laplacian(L, k, iters=iters)   # ============NOTE WHICH OF ABOVE WE ARE USING
    
def adjacency_eigs_from_sum_contacts_by_pairs_uniform_weight(indata, k=6):
    if indata[0][-1] is None:
        return None
    else:
        A = contacts_to_matrix(pd.concat(indata[0]), matrix='adjacency', trim_weights_to=1)
        return np.sort(scipy.sparse.linalg.eigsh(A, which='LA', k=k)[0])[::-1]

def adjacency_eigs_from_sum_contacts_by_pairs_nonuniform_weight(indata, k=6):
    if indata[0][-1] is None:
        return None
    else:
        A = contacts_to_matrix(pd.concat(indata[0]), matrix='adjacency')
        return np.sort(scipy.sparse.linalg.eigsh(A, which='LA', k=k)[0])[::-1]
    

"""
The below uses Dan Spielman's laplacians.jl repo to speed up computation dramatically. We use PyCall (on the Julia end) and PyJulia (on the Python end) to interface
"""


from julia.api import Julia
import os
import random

def eigs_from_sum_contacts_by_pairs_laplaciansjl(indata, k=6, weighting='per_contact'):  # Laplacians.fiedler is kind enough to just pull lowest nonzero eigs, so we don't have to worry about splitting connected components here.
    if indata[0][-1] is None:
        return None
    else:
        all_data = pd.concat(indata[0])
        weights = timestep_analysis.sum_contacts_by_pairs(all_data, weighting=weighting)
        index = set(weights['device_id_1'].tolist() + weights['device_id_2'].tolist())
        index_map = {ind: i+1 for i, ind in enumerate(index)}  # laplacians.jl seems to want indices to begin with 1.
        weights['device_id_1'] = [index_map[x] for x in weights['device_id_1']]
        weights['device_id_2'] = [index_map[x] for x in weights['device_id_2']]
        array_string = weights.to_csv(header=False, index=False)
        
        tempfilename = str(random.randrange(10000000000000000))+'.tmp'
        with open(tempfilename, "a") as f:
            f.write(array_string)
        
        import julia
        jl = Julia(runtime='/home/ec2-user/julia-1.7.2/bin/julia', compiled_modules=False)
        from julia import Main
        Main.include("/home/ec2-user/Laplacians.jl")
        Main.in_string = array_string
        data = jl.eval("import Laplacians; out_data = Laplacians.fiedler(Laplacians.read_graph(" + '"' + tempfilename + '"' + "), nev=" + str(k) + "); return(out_data)")
        os.remove(tempfilename)
        return np.array(data[0])
        
        

In [17]:
df = load_date_interval('WY', (2020, 1, 3), (2020, 1, 4))
print(len(df.index))

456691


In [18]:
m = contacts_to_matrix(timestep_analysis.sum_contacts_by_pairs(df))

[14, 15, 16, 22, 23, 24, 26, 44, 79, 38055]


In [None]:
sum_contacts = timestep_analysis.sum_contacts_by_pairs(df)

In [51]:
test_df = df.head(1000000)
test_sum = timestep_analysis.sum_contacts_by_pairs(test_df)

In [52]:
eigs = eigs_from_sum_contacts_by_pairs_laplaciansjl([[test_sum]], k=2)
print(eigs)

In [None]:
timestep_analysis.rolling_day_data('Connecticut',
                                   (2020, 1, 1),
                                   (2022, 1, 1),
                                   external_data_generator_functions=[], 
                                   per_day_processing_functions=[timestep_analysis.sum_contacts_by_pairs],
                                   final_processing_function=eigs_from_sum_contacts_by_pairs_laplaciansjl,
                                   fpref='eig_data/eigs_from_sum_contacts_by_pairs_laplaciansjl',
                                   num_parallel=4,
                                   history_length=1)

No preexisting computed data found!
Starting processing loop...
Pre-compute
Post-compute
Finished computes up to 20200105
Pre-compute
Post-compute
Finished computes up to 20200109
Pre-compute
Post-compute
Finished computes up to 20200113
Pre-compute
Post-compute
Finished computes up to 20200117
Pre-compute
Post-compute
Finished computes up to 20200121
Pre-compute
Post-compute
Finished computes up to 20200125
Pre-compute
Post-compute
Finished computes up to 20200129
Pre-compute
Post-compute
Finished computes up to 20200202
Pre-compute
Post-compute
Finished computes up to 20200206
Pre-compute
Post-compute
Finished computes up to 20200210
Pre-compute
Post-compute
Finished computes up to 20200214
Pre-compute
Post-compute
Finished computes up to 20200218
Pre-compute
Post-compute
Finished computes up to 20200222
Pre-compute
Post-compute
Finished computes up to 20200226
Pre-compute
Post-compute
Finished computes up to 20200301
Pre-compute
Post-compute
Finished computes up to 20200305
Pre-comp

In [None]:
from data_loading import state_dict
for state in state_dict.keys():
    print("Beginning state:", state)
    timestep_analysis.rolling_day_data(state,
                                   (2020, 1, 1),
                                   (2021, 1, 1),
                                   external_data_generator_functions=[], 
                                   per_day_processing_functions=[timestep_analysis.sum_contacts_by_pairs],
                                   final_processing_function=eigs_from_sum_contacts_by_pairs_laplaciansjl,
                                   fpref='eig_data/eigs_from_sum_contacts_by_pairs_laplaciansjl',
                                   num_parallel=4,
                                   history_length=1)

Beginning state: Alabama
Found 365 days of already computed data!
Starting processing loop...
Pre-compute
Post-compute
Finished computes up to 20210101
Beginning state: Alaska
Found 365 days of already computed data!
Starting processing loop...
Pre-compute
Post-compute
Finished computes up to 20210101
Beginning state: American Samoa
Found 365 days of already computed data!
Starting processing loop...
KEYGRAB FAILED FOR 20201231
Pre-compute
Post-compute
Finished computes up to 20210101
Beginning state: Arizona
Found 365 days of already computed data!
Starting processing loop...
Pre-compute
Post-compute
Finished computes up to 20210101
Beginning state: Arkansas
Found 365 days of already computed data!
Starting processing loop...
Pre-compute
Post-compute
Finished computes up to 20210101
Beginning state: California
Found 365 days of already computed data!
Starting processing loop...
Pre-compute


# Collecting summary statistics about the graph

In [8]:
import networkx as nx
def summary_statistics_from_sum_contacts_by_pairs(indata):
    if indata[0][-1] is None:
        return None
    all_data = pd.concat(indata[0])
    G = nx.Graph()
    G.add_weighted_edges_from(all_data[['device_id_1', 'device_id_2', 'weight']].values)
    G = G.subgraph(max(nx.connected_components(G), key=len))
    left_degrees = all_data.groupby('device_id_1').sum().reset_index()
    right_degrees = all_data.groupby('device_id_2').sum().reset_index().rename(columns={'device_id_2':'device_id_1'})
    degrees = np.array(pd.concat([left_degrees, right_degrees]).groupby('device_id_1').sum()['weight'])
    capped_degrees = np.minimum(degrees, 100)
    stats = [
        G.number_of_nodes(),
        G.number_of_edges(),
        #nx.average_shortest_path_length(G),
        np.average(degrees),  # First moment (see "Predicting the SARS-CoV-2 effective reproduction number using bulk contact data from mobile phones")
        np.average(degrees * degrees),  # Second moment (see above)
        np.average(capped_degrees),
        np.average(capped_degrees * capped_degrees),
    ]
    return np.array(stats)

In [None]:
df = load_date_interval('CT', (2020, 1, 1), (2020, 1, 5))
print("DFL:", len(df.index))
sum_contacts = timestep_analysis.sum_contacts_by_pairs(df)

In [19]:
sstats = summary_statistics_from_sum_contacts_by_pairs([[sum_contacts]])
print(sstats)

[5.59793000e+05 3.20305900e+06 3.57453030e+01 1.12582716e+04
 2.22524161e+01 1.43927733e+03]


In [None]:
from data_loading import state_dict
for state in state_dict.keys():
    print("Beginning state:", state)
    timestep_analysis.rolling_day_data(state,
                                   (2020, 1, 1),
                                   (2021, 1, 1),
                                   external_data_generator_functions=[], 
                                   per_day_processing_functions=[timestep_analysis.sum_contacts_by_pairs],
                                   final_processing_function=summary_statistics_from_sum_contacts_by_pairs,
                                   fpref='summary_data/summary_data',
                                   num_parallel=4,
                                   history_length=1)
timestep_analysis.rolling_day_data('Connecticut',
                                   (2020, 1, 1),
                                   (2022, 1, 1),
                                   external_data_generator_functions=[], 
                                   per_day_processing_functions=[timestep_analysis.sum_contacts_by_pairs],
                                   final_processing_function=eigs_from_sum_contacts_by_pairs_laplaciansjl,
                                   fpref='eig_data/eigs_from_sum_contacts_by_pairs_laplaciansjl',
                                   num_parallel=4,
                                   history_length=1)

Beginning state: Alabama
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Alaska
Found 366 days of already computed data!
Starting processing loop...
Beginning state: American Samoa
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Arizona
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Arkansas
Found 366 days of already computed data!
Starting processing loop...
Beginning state: California
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Colorado
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Connecticut
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Delaware
Found 366 days of already computed data!
Starting processing loop...
Beginning state: District of Columbia
Found 366 days of already computed data!
Starting processing loop...
Beginning state: Florida
Found 