## Create Data Cache:

In [5]:
%%time
from data_cache.utils import ProduceEntityResolution
from data_cache.schema import schema

from unidecode import unidecode
import pandas as pd

# For more info on sources, see data_cache/DATA_SCHEMA_README:
def _read_csv_nulls_as_none(path):
    """Read a CSV file and swap every missing value for a plain ``None``."""
    frame = pd.read_csv(path)
    return frame.where(frame.notna(), None)


# Companies House extracts (parquet).
company_df = pd.read_parquet(schema['ukch_companies'])
officer_df = pd.read_parquet(schema['ukch_officers'])
psc_company_df = pd.read_parquet(schema['psc_company'])
psc_person_df = pd.read_parquet(schema['psc_person'])

# Politician / sanctions reference lists (CSV), with nulls normalised to None.
all_politicians = _read_csv_nulls_as_none(schema['politicians_parsed'])
ru_bl_peps = _read_csv_nulls_as_none(schema['ru_bl_peps_parsed'])
un_sanctions = _read_csv_nulls_as_none(schema['un_parsed'])

# Build the entity-resolution maps over the four Companies House frames.
per = ProduceEntityResolution(company_df)
per.resolve_entities(company_df, officer_df, psc_company_df, psc_person_df)

UKCH Company er map: 100%|██████████| 5107631/5107631 [00:05<00:00, 953235.19it/s] 
Officer (person) er map: 100%|██████████| 10035057/10035057 [00:23<00:00, 422416.04it/s]
Officer (company) er map: 100%|██████████| 313158/313158 [00:00<00:00, 756575.34it/s]
PSC (company) er map: 100%|██████████| 702472/702472 [00:00<00:00, 859221.68it/s]
PSC (person) er map: 100%|██████████| 9012596/9012596 [00:25<00:00, 355636.44it/s]

CPU times: user 2min 33s, sys: 48.9 s, total: 3min 21s
Wall time: 4min 29s





## Graph Building and Breaking down into subnetworks:

In [6]:
%%time
from data_cache.utils import GraphBuilder
# Build the graph from the resolved entities plus the PSC and officer frames.
gb = GraphBuilder()
gb.build(per, psc_company_df, psc_person_df, officer_df)
# NOTE(review): 1000 looks like a component-size threshold -- the captured output
# shows components of size 4373053, 1630 and 1313 being broken down while the
# next largest (760) is not. Confirm against GraphBuilder.break_into_subgraphs.
gb.break_into_subgraphs(1000)

PSC company graph: 702472it [00:03, 180621.51it/s]
PSC person graph: 9012596it [01:01, 145691.31it/s]
Officer graph: 10348215it [00:42, 241512.75it/s]


Top 10 Connected component sizes: [4373053, 1630, 1313, 760, 753, 512, 430, 409, 355, 346]


Breaking down Giant CC (size 4373053): 100%|██████████| 4373053/4373053 [05:18<00:00, 13723.90it/s]


Giant Component of size 4373053 was broken down.
    Added 3139321 neighbourhoods, 
    Sum of all nodes = 217725726
    Overhead ratio=49.78803732769761



Breaking down Giant CC (size 1630): 100%|██████████| 1630/1630 [00:00<00:00, 8143.89it/s]


Giant Component of size 1630 was broken down.
    Added 1443 neighbourhoods, 
    Sum of all nodes = 330985
    Overhead ratio=203.05828220858896



Breaking down Giant CC (size 1313): 100%|██████████| 1313/1313 [00:00<00:00, 2181.59it/s]


Giant Component of size 1313 was broken down.
    Added 84 neighbourhoods, 
    Sum of all nodes = 16293
    Overhead ratio=12.408987052551408

CPU times: user 7min 51s, sys: 14.1 s, total: 8min 5s
Wall time: 8min 3s


### Add node descriptors for risk calculation later:

In [7]:
%%time
%load_ext autoreload
%autoreload 2
from data_cache.utils import NodeDescriber

# Attach per-node metadata from the four source frames. Later cells read
# nd.node_to_metadata, nd.node_to_jurs and nd.node_to_names.
nd = NodeDescriber(per)
nd.add_metadata(company_df, officer_df, psc_company_df, psc_person_df)

company: 100%|██████████| 5107631/5107631 [00:46<00:00, 110139.75it/s]
officer_person: 100%|██████████| 10035057/10035057 [00:29<00:00, 342760.19it/s]
officer_company: 100%|██████████| 313158/313158 [00:00<00:00, 626877.37it/s]
psc_person: 100%|██████████| 9012596/9012596 [00:22<00:00, 397809.45it/s]
psc_company: 100%|██████████| 702472/702472 [00:01<00:00, 491691.74it/s]


CPU times: user 1min 51s, sys: 5.37 s, total: 1min 57s
Wall time: 1min 56s


### Find if entities have possible matches against Politicians datasets

In [None]:
# Name -> metadata lookups used for potential politician matches.
# Keys are ASCII-folded (unidecode) and lower-cased names; if the same name
# occurs more than once in a source, the last row wins.
PEP, RUS = {}, {}

# every_politician source: DOB is sliced as year-first ("YYYY" or "YYYY-MM-DD").
for name, dob, country in zip(all_politicians.NAME, all_politicians.DOB, all_politicians.COUNTRY):
    if not isinstance(name, str):
        # Missing names were normalised to None when the CSV was loaded;
        # unidecode() cannot handle them and they cannot be matched anyway.
        continue
    name = unidecode(name).lower()
    PEP[name] = {"country": country, "source": "every_politician"}
    if isinstance(dob, str):
        PEP[name]["yob"] = int(dob[:4])
        if len(dob) == 10:
            PEP[name]["mob"] = int(dob[5:7])

# rupep.org source: DOB is sliced as day-first with the year last (10 chars).
for name, dob, cat, tx in zip(ru_bl_peps.NAME_EN, ru_bl_peps.DOB, ru_bl_peps.CATEGORY, ru_bl_peps.TAXPAYER_NUM):
    if not isinstance(name, str):
        # Same guard as above: skip rows without an English name.
        continue
    name = unidecode(name).lower()
    RUS[name] = {"country": "RU/BY", "category": cat, "taxpayer_num": tx, "source": "rupep.org"}
    if isinstance(dob, str) and len(dob) == 10:
        RUS[name]["yob"] = int(dob[-4:])
        RUS[name]["mob"] = int(dob[3:5])

## Build subnetwork stats:

In [12]:
from tqdm import tqdm 
import numpy as np
from utils import TAX_HEAVENS

# How many networks to cache. UKCH Total in 2022 is about 7M.
N = 1_000_000
# A node is flagged as a proxy when its degree in gb.G_undir exceeds this value.
PROXY_TH = 50
# Modulus applied to the subnetwork id to derive the `subgraph_partition` column.
PARTITION_SIZE = 1000

# Ids of the first N subnetworks, in the iteration order of gb.hash_to_subn_map.
subnetwork_ids = list(gb.hash_to_subn_map.keys())[:N]

def count_company_ratio(ns):
    """Return the share of node ids in ``ns`` that do not carry the "p|" prefix."""
    non_person_flags = []
    for node_id in ns:
        non_person_flags.append(not node_id.startswith("p|"))
    return np.mean(non_person_flags)

def calculate_cyclicity(H):
    """Return the cyclicity score ``(E + 1 - N) / (N * ln N)`` of graph ``H``.

    ``H`` must expose ``number_of_nodes()`` and ``number_of_edges()``.

    Returns 0 for graphs with fewer than two nodes: an empty graph has nothing
    to score, and for a single node ``ln(1) == 0`` would make the denominator
    zero (yielding nan/inf instead of a usable score).
    """
    node_count = H.number_of_nodes()
    if node_count < 1:
        # Report the problem without dumping unrelated global state.
        print("Non existent network: subgraph has no nodes")
        return 0
    if node_count == 1:
        # Guard against division by N * log(N) == 0.
        return 0
    edge_count = H.number_of_edges()
    return (edge_count + 1 - node_count) / (node_count * np.log(node_count))

def netw_names(names):
    """Join the distinct, non-None entries of ``names`` in sorted order with ", "."""
    unique_names = set(names)
    unique_names.discard(None)
    ordered = sorted(unique_names)
    return ", ".join(ordered)

def metadata_converter(md):
    """Flatten a metadata dict into a "key: value; key: value" string."""
    parts = []
    for key, value in md.items():
        parts.append(f"{key}: {value}")
    return "; ".join(parts)

# Per-subnetwork accumulators (one entry appended per subnetwork id).
clc, node_num, dfs, company_ratio, entity_names, multi_jurisdiction, jur_names, netw_tax_haven = [], [], [], [], [], [], [], []
# Per-node accumulators (one entry appended per node of every subnetwork).
nodes, proxy, is_person, tax_haven, jur, node_metadata, netws = [], [], [], [], [], [], []
# pep/pepm/r/rm are per-node match flags and metadata; netw_pep/netw_r are
# per-subnetwork match counts.
pep, pepm, r, rm, netw_pep, netw_r = [], [], [], [], [], []

# subnetwork_ids is already truncated to N where it is defined, so iterating it
# directly keeps these lists aligned with the ids used to build the frames later.
for _id in tqdm(subnetwork_ids, desc= "Precomputing risk signals"):

    # Get networkx subgraph:
    nw = gb.hash_to_subn_map[_id]
    H = gb.G_undir.subgraph(nw)

    # Network:
    clc.append(calculate_cyclicity(H))
    node_num.append(len(nw))
    company_ratio.append(count_company_ratio(nw))

    # Edges:
    df = pd.DataFrame(H.edges.data("edge_type"), columns =['source', 'target', 'type'])
    df['subgraph_hash'] =_id
    df['subgraph_partition'] =_id % PARTITION_SIZE
    dfs.append(df)

    # Nodes:
    netw_jurs, netw_entity_names = set(), []
    netw_pep_value, netw_r_value = 0, 0
    for n in nw:
        nodes.append(n)
        netws.append(_id)
        # Degree is taken on the full undirected graph, not on the subgraph.
        proxy.append(int(gb.G_undir.degree[n] > PROXY_TH))
        is_person.append(int(n.startswith("p|")))
        node_metadata.append(nd.node_to_metadata.get(n, None))

        # Single jurisdiction lookup, reused for the node row and the
        # subnetwork-level union.
        j = nd.node_to_jurs.get(n, set())
        jur.append(", ".join(sorted(j)))
        tax_haven.append(int(len(j.intersection(TAX_HEAVENS)) > 0))
        netw_jurs.update(j)

        name = nd.node_to_names.get(n, None)
        netw_entity_names.append(name)

        if name is not None and name in RUS:
            r.append(1)
            rm.append(metadata_converter(RUS[name]))
            netw_r_value += 1
        else:
            r.append(0)
            rm.append("")

        if name is not None and name in PEP:
            pep.append(1)
            pepm.append(metadata_converter(PEP[name]))
            netw_pep_value += 1
        else:
            pep.append(0)
            pepm.append("")

    netw_pep.append(netw_pep_value)
    netw_r.append(netw_r_value)
    # Sorted so that jur_names is deterministic across runs (set iteration order
    # is not), matching how the per-node `jur` string is built.
    jurs_in_subnetwork = sorted(netw_jurs)
    jur_names.append("; ".join(jurs_in_subnetwork))
    netw_tax_haven.append(int(len(netw_jurs.intersection(TAX_HEAVENS)) > 0))
    multi_jurisdiction.append(int(len(jurs_in_subnetwork) > 1))
    entity_names.append(netw_names(netw_entity_names))


Precomputing risk signals: 100%|██████████| 1000000/1000000 [50:37<00:00, 329.22it/s] 


In [17]:
%%time
# One row per subnetwork, built from the per-subnetwork accumulators.
subnetwork_df = pd.DataFrame(data = {
    "network_id": subnetwork_ids, 
    "cyclicity": clc, 
    "node_num": node_num, 
    "company_ratio": company_ratio,
    "multi_jurisdiction": multi_jurisdiction, 
    "tax_haven": netw_tax_haven, 
    "potential_pep_match": netw_pep, 
    "potential_rus_pep_match": netw_r, 
    "entity_names": entity_names, 
    "jur_names": jur_names, 
})
# One row per (node, subnetwork) pair, built from the per-node accumulators.
nodes_df = pd.DataFrame(data = {
    'node_id': nodes, 
    'subgraph_hash': netws, 
    "is_person": is_person,
    "proxy_dir": proxy, 
    "node_metadata": node_metadata,
    "tax_haven": tax_haven, 
    "jur": jur,
    "politician": pep, 
    "politician_metadata": pepm, 
    "rus_politician": r,
    "rus_politician_metadata": rm, 
})
# All per-subnetwork edge frames stacked into one.
edges_df = pd.concat(dfs)

# A subnetwork is flagged as "proxy" when at least one of its nodes is a proxy.
# This must run after both frames exist: it reads nodes_df and adds a column to
# subnetwork_df.
PROXY_NETW_ID = set(nodes_df[nodes_df.proxy_dir==1].subgraph_hash)
subnetwork_df["proxy"] = [int(s in PROXY_NETW_ID) for s in subnetwork_df.network_id.tolist()]

CPU times: user 4min, sys: 12.2 s, total: 4min 12s
Wall time: 4min 11s


In [38]:
# Sanity check: (rows, columns) of the edge, node and subnetwork frames.
edges_df.shape, nodes_df.shape, subnetwork_df.shape

((81482446, 5), (62008292, 12), (1000000, 11))

In [35]:
# FIXME(review): all three frames are written to schema['output_nodes'], so the
# writes collide with each other. The subnetwork and edge frames presumably have
# their own schema entries -- confirm the key names in data_cache.schema and
# update the first two calls accordingly.
subnetwork_df.to_parquet(schema['output_nodes'])
edges_df.to_parquet(schema['output_nodes'], partition_cols = ["subgraph_partition"])
# Use the same modulus as the edge frames so nodes and edges of a subnetwork
# always land in the same partition, even if PARTITION_SIZE is changed.
nodes_df['subgraph_partition'] = nodes_df.subgraph_hash % PARTITION_SIZE
nodes_df.to_parquet(schema['output_nodes'], partition_cols = ["subgraph_partition"])