# ICIJ analysis: load KùzuDB

## Set up

Load the Python dependencies.

In [1]:
import pathlib
import shutil
import typing

from icecream import ic
import kuzu
import pandas as pd
import watermark

%load_ext watermark

In [2]:
# Record the runtime environment, then the versions of the imported packages,
# so the outputs captured below can be tied to a specific setup.
%watermark
%watermark --iversions

Last updated: 2024-07-15T09:36:23.069198-07:00

Python implementation: CPython
Python version       : 3.11.9
IPython version      : 8.26.0

Compiler    : Clang 13.0.0 (clang-1300.0.29.30)
OS          : Darwin
Release     : 23.5.0
Machine     : arm64
Processor   : arm
CPU cores   : 14
Architecture: 64bit

kuzu     : 0.4.3.dev55
watermark: 2.4.3
pandas   : 2.2.2



Create a KùzuDB database and establish a connection.

In [3]:
# Working directories, relative to where the notebook is run:
#   TEMP_DIR holds the intermediate CSV files,
#   DEMO_DIR holds the KùzuDB database files.
TEMP_DIR: pathlib.Path = pathlib.Path("temp")
DEMO_DIR: pathlib.Path = pathlib.Path("demo")

# Start from an empty database on every run. `ignore_errors` already covers
# the "directory does not exist yet" case, so the deprecated `onerror`
# argument is not passed.
shutil.rmtree(DEMO_DIR, ignore_errors = True)

In [4]:
# Open (or create) the database under DEMO_DIR, then open the connection
# which every query in this notebook runs through.
db: kuzu.Database = kuzu.Database(DEMO_DIR)
conn: kuzu.Connection = kuzu.Connection(db)

## Schema definitions

### `Entity` nodes

After iterating the first time through this analysis, we return to this point and redefine an `Entity` node structure which is a superset of the fields defined among all of the Entity-ish nodes in ICIJ.
See <https://docs.google.com/spreadsheets/d/1eSelhXhix_DtTZuzR2vfl_UdQlEwbwql6NZxqrtROxk/edit?usp=sharing>

In [5]:
# Schema for `Entity` nodes, keyed by `node_id`. Every other property is kept
# as text, apart from the boolean `vague` flag.
# NOTE(review): the `Location` column is capitalised unlike its siblings and
# shares its name with the `Location` node table -- confirm this is intended.
entity_ddl: str = """
  CREATE NODE TABLE Entity (
    node_id INT64,
    role STRING,
    name STRING,
    original_name STRING,
    former_name STRING,
    jurisdiction STRING,
    jurisdiction_description STRING,
    company_type STRING,
    Location STRING,
    internal_id STRING,
    incorporation_date STRING,
    inactivation_date STRING,
    struck_off_date STRING,
    dorm_date STRING,
    status STRING,
    service_provider STRING,
    ibcRUC STRING,
    country_codes STRING,
    countries STRING,
    sourceID STRING,
    valid_until STRING,
    note STRING,
    vague BOOLEAN,
    PRIMARY KEY (node_id)
  )
"""

conn.execute(entity_ddl);

Temporary workaround prior to KùzuDB release 0.5.0:

  * concatenate the four CSV files into one

In [6]:
# Merge the four entity CSV parts into `entity.all.csv`, keeping only the
# header row of the first part. This is done in Python so that the step does
# not depend on `cp` and `tail` being available in the shell.
entity_parts: typing.List[ pathlib.Path ] = [
    TEMP_DIR / f"entity.{part_num}.csv"
    for part_num in range(1, 5)
]

with open(TEMP_DIR / "entity.all.csv", "wb") as fp_out:
    for part_idx, part_path in enumerate(entity_parts):
        with open(part_path, "rb") as fp_in:
            if part_idx > 0:
                # skip the repeated header row, as `tail -n +2` did
                fp_in.readline()

            shutil.copyfileobj(fp_in, fp_out)

In [7]:
# Line counts for each part file matching `entity.?.csv` (header rows included).
!wc -l ./temp/entity.?.csv

  814617 ./temp/entity.1.csv
  771369 ./temp/entity.2.csv
   25636 ./temp/entity.3.csv
    2990 ./temp/entity.4.csv
 1614612 total


In [8]:
# Line count for the merged file, to compare against the per-part total above.
!wc -l ./temp/entity.all.csv

 1614609 ./temp/entity.all.csv


In [9]:
# Bulk-load the merged CSV file into the `Entity` node table.
entity_copy: str = """
    COPY Entity FROM "./temp/entity.all.csv" (header=true, escape='"', parallel=False)
"""

conn.execute(entity_copy);

In [10]:
# Fetch a single `Entity` node as a quick look at what was loaded.
results = conn.execute("""
  MATCH (n:Entity)
  RETURN *
  LIMIT 1;
""")

# print each returned row; the query is limited to one
while results.has_next():
    ic(results.get_next())

ic| results.get_next(): [{'Location': None,
                          '_id': {'offset': 524288, 'table': 0},
                          '_label': 'Entity',
                          'company_type': None,
                          'countries': 'Russia',
                          'country_codes': 'RUS',
                          'dorm_date': None,
                          'former_name': None,
                          'ibcRUC': None,
                          'inactivation_date': None,
                          'incorporation_date': None,
                          'internal_id': None,
                          'jurisdiction': None,
                          'jurisdiction_description': None,
                          'name': 'VLADIMIR KOVALENKO',
                          'node_id': 12095427,
                          'note': None,
                          'original_name': None,
                          'role': 'Officer',
                          'service_provider': None,
             

#### Consistency checks

How many `Entity` nodes have been loaded?

In [11]:
# Total number of `Entity` nodes in the database, displayed as a data frame.
entity_count = conn.execute("""
  MATCH (n:Entity)
  RETURN COUNT(*)
""")

entity_count.get_as_df()

Unnamed: 0,COUNT_STAR()
0,1614277


Compare with the `node_id` values in the merged CSV file.

In [12]:
# Collect every `node_id` stored in the database. The ids are kept as strings
# (see the `str(...)` call below), hence the `Set[ str ]` annotation.
nodes_kuzu: typing.Set[ str ] = set()

results = conn.execute("""
  MATCH (n:Entity)
  RETURN n.node_id
""")

while results.has_next():
    row = results.get_next()
    nodes_kuzu.add(str(row[0]))

len(nodes_kuzu)

1614277

In [13]:
# Read the merged CSV file and collect its `node_id` values as strings.
data_file: pathlib.Path = TEMP_DIR / "entity.all.csv"

# Parse every column as text, then blank out the missing cells. Casting with
# `.astype(str)` first would turn missing cells into the literal "nan", which
# leaves `.fillna("")` with nothing to fill.
df: pd.DataFrame = pd.read_csv(
    data_file,
    header = 0,
    dtype = str,
    low_memory = False,
).fillna("")

node_ids: typing.List[ str ] = df.node_id.values.tolist()
ic(len(node_ids))

# de-duplicate; equal lengths here mean that no `node_id` repeats in the file
nodes_file: typing.Set[ str ] = set(node_ids)
ic(len(nodes_file))

ic| len(node_ids): 1614277
ic| len(nodes_file): 1614277


1614277

How many `node_id` values from the CSV file are missing from the database?

In [14]:
# Count the ids found in the CSV file but not in the database. Subtracting
# the two set sizes could report zero even when ids are missing, if the
# database also held ids that the file lacks.
len(nodes_file - nodes_kuzu)

0

In [15]:
# The `node_id` values present in the CSV file but absent from the database.
nodes_missing = nodes_file.difference(nodes_kuzu)
nodes_missing

set()

### `Location` nodes

In [16]:
# Schema for `Location` nodes, keyed by `node_id`; all other properties are text.
location_ddl: str = """
  CREATE NODE TABLE Location (
    node_id INT64,
    address STRING,
    name STRING,
    countries STRING,
    country_codes STRING,
    sourceID STRING,
    valid_until STRING,
    note STRING,
    PRIMARY KEY (node_id)
  )
"""

conn.execute(location_ddl);

In [17]:
# Bulk-load the location CSV file into the `Location` node table.
location_copy: str = """
    COPY Location FROM "./temp/location.csv" (header=true, escape='"', parallel=False)
"""

conn.execute(location_copy);

In [18]:
# Fetch a single `Location` node as a quick look at what was loaded.
results = conn.execute("""
  MATCH (n:Location)
  RETURN *
  LIMIT 1;
""")

# print each returned row; the query is limited to one
while results.has_next():
    ic(results.get_next())

ic| results.get_next(): [{'_id': {'offset': 393216, 'table': 1},
                          '_label': 'Location',
                          'address': '2, ZNUBER STREET ATTARD ATD 2802',
                          'countries': None,
                          'country_codes': None,
                          'name': '2, ZNUBER STREET ATTARD ATD 2802',
                          'node_id': 58019418,
                          'note': None,
                          'sourceID': 'Paradise Papers - Malta corporate registry',
                          'valid_until': 'Malta corporate registry data is current through 2016'}]


### `RegisteredAddress` relations

In [19]:
# `RegisteredAddress` relations run from an `Entity` to a `Location`;
# all properties are text and the multiplicity is many-to-many.
regaddr_ddl: str = """
  CREATE REL TABLE RegisteredAddress (FROM Entity TO Location,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(regaddr_ddl);

In [20]:
# Bulk-load the `RegisteredAddress` relations, reporting any load error.
try:
    conn.execute("""
        COPY RegisteredAddress FROM "./temp/rel_regaddr.csv" (header=true, escape='"')
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)

### `OfficerOf` relations

In [21]:
# `OfficerOf` relations run from one `Entity` to another;
# all properties are text and the multiplicity is many-to-many.
officer_ddl: str = """
  CREATE REL TABLE OfficerOf (FROM Entity TO Entity,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(officer_ddl);

In [22]:
# Bulk-load the `OfficerOf` relations, reporting any load error.
try:
    conn.execute("""
        COPY OfficerOf FROM "./temp/rel_officer.csv" (header=true, escape='"', parallel=False)
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)

### `IntermediaryOf` relations

In [23]:
# `IntermediaryOf` relations run from one `Entity` to another;
# all properties are text and the multiplicity is many-to-many.
intermed_ddl: str = """
  CREATE REL TABLE IntermediaryOf (FROM Entity TO Entity,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(intermed_ddl);

In [24]:
# Bulk-load the `IntermediaryOf` relations, reporting any load error.
try:
    conn.execute("""
        COPY IntermediaryOf FROM "./temp/rel_intermed.csv" (header=true, escape='"')
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)

### `ConnectedTo` relations

In [25]:
# `ConnectedTo` relations run from one `Entity` to another;
# all properties are text and the multiplicity is many-to-many.
connect_ddl: str = """
  CREATE REL TABLE ConnectedTo (FROM Entity TO Entity,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(connect_ddl);

In [26]:
# Bulk-load the `ConnectedTo` relations, reporting any load error.
try:
    conn.execute("""
        COPY ConnectedTo FROM "./temp/rel_connect.csv" (header=true, escape='"')
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)

### `Underlying` relations

In [27]:
# `Underlying` relations run from one `Entity` to another;
# all properties are text and the multiplicity is many-to-many.
underly_ddl: str = """
  CREATE REL TABLE Underlying (FROM Entity TO Entity,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(underly_ddl);

In [28]:
# Bulk-load the `Underlying` relations, reporting any load error.
try:
    conn.execute("""
        COPY Underlying FROM "./temp/rel_underly.csv" (header=true, escape='"')
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)

### `AliasOfficer` relations

In [29]:
# `AliasOfficer` relations run from one `Entity` to another;
# all properties are text and the multiplicity is many-to-many.
alias_officer_ddl: str = """
  CREATE REL TABLE AliasOfficer (FROM Entity TO Entity,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(alias_officer_ddl);

In [30]:
# Bulk-load the `AliasOfficer` relations, reporting any load error.
try:
    conn.execute("""
        COPY AliasOfficer FROM "./temp/rel_same_officer.csv" (header=true, escape='"')
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)

### `AliasLocation` relations

In [31]:
# `AliasLocation` relations run from one `Location` to another;
# all properties are text and the multiplicity is many-to-many.
alias_loc_ddl: str = """
  CREATE REL TABLE AliasLocation (FROM Location TO Location,
    link STRING,
    status STRING,
    start_date STRING,
    end_date STRING,
    sourceID STRING,
    MANY_MANY
  )
"""

conn.execute(alias_loc_ddl);

In [32]:
# Bulk-load the `AliasLocation` relations, reporting any load error.
try:
    conn.execute("""
        COPY AliasLocation FROM "./temp/rel_same_loc.csv" (header=true, escape='"')
    """)
except Exception as ex:
    ic(ex)

    # the offending key is expected to be the last token of the error text
    last_token: str = getattr(ex, "message", repr(ex)).split(" ")[-1].strip(".')")

    try:
        node_id = int(last_token)
    except ValueError:
        # no numeric id at the end of the message; the error is shown above
        pass
    else:
        # `nodes_missing` holds ids as strings, so compare in string form
        ic(node_id, str(node_id) in nodes_missing)