# A KYC/AML Graph Pipeline for UK Corporate Registry Data

This notebook implements a high-throughput ETL (Extract, Transform, Load) pipeline that ingests, cleanses, and structures UK corporate registry data into a Neo4j knowledge graph. It consumes two datasets from Companies House: the ["Basic Company Data"](https://download.companieshouse.gov.uk/en_output.html), which provides foundational entity details, and the ["Persons with Significant Control" (PSC) snapshot](https://download.companieshouse.gov.uk/en_pscdata.html), which describes beneficial ownership. Using [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) and the [Neo4j Connector for Apache Spark](https://neo4j.com/docs/spark/current/), the pipeline joins millions of company records with the people and organizations that control them, producing a unified view of corporate governance.

The transformation logic goes beyond simple field mapping and actively enriches the source data. Key enhancements include postcode-based geocoding of registered addresses to latitude and longitude, deterministic identifiers for individuals and organizations so that repeated mentions resolve to a single entity, and parsing of the coded "nature of control" strings into numeric voting and ownership percentage ranges. The final output is a property graph that models Companies, Persons, Organizations, Addresses, and their relationships, including historical name changes and multi-layered control hierarchies.

## Why we built this pipeline

The primary business driver for this pipeline is the need to transition from flat, tabular compliance data to a connected graph structure that reflects the reality of corporate networks. In the context of Know Your Business (KYB) and Anti-Money Laundering (AML) workflows, tabular data often obscures indirect relationships and complex ownership chains. By modeling these entities as a graph, organizations can instantly traverse ownership structures to identify [Ultimate Beneficial Owners (UBOs)](https://www.swift.com/risk-and-compliance/know-your-customer-kyc/ultimate-beneficial-owner-ubo), detecting risk patterns that would require expensive recursive joins in a traditional relational database.

The enrichment features provide immediate value for risk assessment and investigative intelligence. The integration of geospatial data (converting locations to geographic points) allows analysts to identify clusters of companies registered at high-risk locations or residential addresses, a common indicator of shell company activity.

The precise parsing of voting rights and share ownership percentages enables granular querying for control thresholds (e.g., identifying all individuals with >25% control), while the inclusion of sanctions flags and identity verification details directly supports automated compliance screening and due diligence processes.

## Environment Setup and Spark Session Initialization

We first initialize the data processing environment by loading necessary environment variables and configuring the Apache Spark session. We rely on the `python-dotenv` library to securely manage credentials such as the Neo4j URI, user, and password, ensuring that sensitive information is not hardcoded in the notebook.

The Spark session is configured with the Neo4j Connector for Apache Spark, which allows for high-throughput data transfer between Spark DataFrames and the Neo4j Graph Database. We allocate substantial driver memory (32GB) to handle the large datasets and set `spark.sql.legacy.timeParserPolicy` to `LEGACY`, which restores the pre-3.0 Spark date parser, to stay compatible with the date formats found in the source files.

> If you are planning to run this notebook, ensure you do so in an environment with sufficient resources (e.g., a cluster or a local machine with enough RAM). The Neo4j database will also require appropriate configuration to handle the incoming data volume, including setting up indexes and constraints for optimal performance. The total running time for this pipeline is likely to be in the range of **10 to 30 minutes**, depending on your environment.
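
Because every connection setting is read with `os.getenv`, a missing variable silently becomes `None` and only fails later, inside Spark or the Neo4j driver. A minimal sketch of an up-front check, assuming the variable names read by the setup cell; `missing_settings` is a hypothetical helper, not part of the notebook:

```python
import os

# Variable names read by the setup cell.
REQUIRED_VARS = [
    "NEO4J_URI",
    "NEO4J_USER",
    "NEO4J_PASSWORD",
    "NEO4J_DATABASE",
    "DATA_PATH",
    "COMPANIES_URL",
    "PSC_URL",
]


def missing_settings(env=os.environ, required=REQUIRED_VARS):
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not env.get(name)]


# Report anything missing before Spark is started.
missing = missing_settings()
if missing:
    print("Missing settings:", ", ".join(missing))
```

Running a check like this right after `dotenv.load_dotenv()` surfaces configuration gaps before the (slow) Spark session start.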

In [1]:
import os
import dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    concat,
    lit,
    md5,
    when,
    to_date,
    regexp_replace,
    array,
    explode,
    struct,
    monotonically_increasing_id,
    coalesce,
    element_at,
    size,
    lag,
    lead,
    row_number,
    initcap,
)
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.window import Window
import pandas as pd
import pgeocode

dotenv.load_dotenv()

NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
NEO4J_DATABASE = os.getenv("NEO4J_DATABASE")
DATA_PATH = os.getenv("DATA_PATH")
COMPANIES_URL = os.getenv("COMPANIES_URL")
PSC_URL = os.getenv("PSC_URL")

# Initialize Spark with Neo4j Connector
neo4j_maven_pkg = "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.10_for_spark_3"
spark = (
    SparkSession.builder.appName("PSC_Loader_Spark")
    .config("spark.driver.memory", "32g")
    .config("spark.jars.packages", neo4j_maven_pkg)
    .config("neo4j.url", NEO4J_URI)
    .config("neo4j.authentication.basic.user", NEO4J_USER)
    .config("neo4j.authentication.basic.password", NEO4J_PASSWORD)
    .config("neo4j.database", NEO4J_DATABASE)
    .getOrCreate()
)

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Check Spark and Connector versions
print(f"Spark version: {spark.version}")
# The connector artifact name ends with the Scala version it was built for
print(f"Scala version: {neo4j_maven_pkg.split(':')[1].rsplit('_', 1)[-1]}")
print(f"Neo4j Connector version: {neo4j_maven_pkg.split(':')[2]}")

Ivy Default Cache set to: /Users/pedroleitao/.ivy2/cache
The jars for the packages stored in: /Users/pedroleitao/.ivy2/jars
org.neo4j#neo4j-connector-apache-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cdc6d386-3afc-4a2b-8db8-7aee28446f96;1.0
	confs: [default]
	found org.neo4j#neo4j-connector-apache-spark_2.12;5.3.10_for_spark_3 in central


:: loading settings :: url = jar:file:/Volumes/Home/pedroleitao/miniconda3/envs/governance-kyc-psc/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.neo4j#neo4j-connector-apache-spark_2.12_common;5.3.10_for_spark_3 in central
	found org.neo4j#caniuse-core;1.3.0 in central
	found org.neo4j#caniuse-api;1.3.0 in central
	found org.jetbrains.kotlin#kotlin-stdlib;2.1.20 in central
	found org.jetbrains#annotations;13.0 in central
	found org.neo4j#caniuse-neo4j-detection;1.3.0 in central
	found org.neo4j.driver#neo4j-java-driver-slim;4.4.21 in central
	found org.reactivestreams#reactive-streams;1.0.4 in central
	found io.netty#netty-handler;4.1.127.Final in central
	found io.netty#netty-common;4.1.127.Final in central
	found io.netty#netty-resolver;4.1.127.Final in central
	found io.netty#netty-buffer;4.1.127.Final in central
	found io.netty#netty-transport;4.1.127.Final in central
	found io.netty#netty-transport-native-unix-common;4.1.127.Final in central
	found io.netty#netty-codec;4.1.127.Final in central
	found io.netty#netty-tcnative-classes;2.0.73.Final in central
	found io.projectreactor#reactor-core;3.6.11 in central
	f

Spark version: 3.5.1
Scala version: 2.12
Neo4j Connector version: 5.3.10_for_spark_3


## Retrieving the Datasets

For this pipeline, we utilize two datasets from the UK Companies House:

- **Basic Company Data**: This dataset contains fundamental information about all registered companies, including their names, registration numbers, addresses, and status. It serves as the backbone for our graph, providing the core entities (Companies) and their attributes.

- **Persons with Significant Control (PSC)**: This dataset details the individuals and entities that have significant control over companies. It includes the nature of control, the dates on which control was notified and ceased, and any associated sanctions or identity verification details. This dataset is crucial for modeling the governance relationships in our graph.

In [2]:
# Download and extract datasets if not already present
import zipfile
import requests
from io import BytesIO


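def download_and_extract(url, extract_to):
    """Download a zip archive and unpack it, unless the target directory already exists."""
    if not os.path.exists(extract_to):
        print(f"Downloading and extracting {url}...")
        # The 600 s timeout is an arbitrary choice; the archives are large
        response = requests.get(url, timeout=600)
        # Fail early on HTTP errors instead of trying to unzip an error page
        response.raise_for_status()
        with zipfile.ZipFile(BytesIO(response.content)) as zip_ref:
            zip_ref.extractall(extract_to)
    else:
        print(f"{extract_to} already exists, skipping download.")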


download_and_extract(COMPANIES_URL, os.path.join(DATA_PATH, "companies"))
download_and_extract(PSC_URL, os.path.join(DATA_PATH, "psc"))

.data/companies already exists, skipping download.
.data/psc already exists, skipping download.


## Geocoding and Identity Generation Utilities

To enrich our dataset with geospatial information, we define a [Pandas User-Defined Function (UDF)](https://pandas.pydata.org/docs/dev/user_guide/user_defined_functions.html) named `geocode_postcode`. This function utilizes the [`pgeocode`](https://pypi.org/project/pgeocode/) library to perform batch geocoding of UK postcodes, converting them into latitude and longitude coordinates. By using a scalar iterator pattern with Pandas UDFs, we significantly reduce the overhead compared to row-by-row processing, making the pipeline efficient enough to handle millions of records.
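
The gain from the scalar-iterator form comes from paying the setup cost once per stream of batches rather than once per batch or per row. A plain-Python sketch of that shape, with a stand-in resolver; `FakeResolver`, `geocode_batches`, and the coordinates are hypothetical, and neither Spark nor `pgeocode` is involved:

```python
from typing import Iterator, List, Optional, Tuple


class FakeResolver:
    """Stand-in for a lookup object that is expensive to build."""

    instances = 0  # counts how many times a resolver is constructed

    def __init__(self):
        FakeResolver.instances += 1
        self._table = {"AB1": (57.1, -2.1)}  # made-up coordinates

    def lookup(self, code: str) -> Tuple[Optional[float], Optional[float]]:
        return self._table.get(code, (None, None))


def geocode_batches(batches: Iterator[List[str]]) -> Iterator[List[tuple]]:
    resolver = FakeResolver()  # built once, before the first batch
    for batch in batches:
        # One output element per input element, in the same order
        yield [resolver.lookup(code) for code in batch]


results = list(geocode_batches(iter([["AB1", "ZZ9"], ["AB1"]])))
```

Two batches are processed here, yet the resolver is constructed only once, which is the property the UDF relies on.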

We also introduce a helper function, `generate_id_col`, which creates deterministic unique identifiers for our entities. This function concatenates specific columns and applies an MD5 hash, ensuring that we can generate consistent IDs for nodes (such as Addresses or Persons) based solely on their data content, without relying on external sequences. This forms the backbone of our [entity resolution](https://neo4j.com/developer/industry-use-cases/agnostic/entity-resolution/) strategy, allowing us to merge records accurately in Neo4j without duplication.
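
To make the idea concrete, here is a plain-Python sketch of content-based IDs using `hashlib` rather than Spark. `make_id` is a hypothetical name and the sample address is invented. The `|` separator is an assumption borrowed from the transformation cell later in the notebook; it keeps `("ab", "c")` and `("a", "bc")` from hashing to the same value, which can happen when fields are concatenated with no delimiter.

```python
import hashlib


def make_id(*fields, sep="|"):
    """Hash the fields into a stable hex ID; None is treated as an empty string."""
    joined = sep.join("" if f is None else str(f) for f in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


# The same content always produces the same ID, with no counter or sequence.
same_a = make_id("SW1A 1AA", "1 Example Street", None)
same_b = make_id("SW1A 1AA", "1 Example Street", None)
```

Because the ID depends only on the field values, two runs of the pipeline (or two source rows describing the same address) map to the same node key.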

In [3]:
from pyspark.sql.functions import pandas_udf
from typing import Iterator


# We use a Pandas UDF (Scalar Iterator) for efficient batch geocoding
# This is much faster than row-by-row Python UDFs
@pandas_udf(
    StructType(
        [StructField("latitude", DoubleType()), StructField("longitude", DoubleType())]
    )
)
def geocode_postcode(postcode_iter: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
    # Initialize resolver once per partition/batch
    nomi = pgeocode.Nominatim("gb")

    for series in postcode_iter:
        # Clean series
        clean_codes = series.fillna("").astype(str).tolist()

        # Batch query
        results = nomi.query_postal_code(clean_codes)

        # Prepare output
        out_df = pd.DataFrame(
            {"latitude": results["latitude"], "longitude": results["longitude"]}
        )

        yield out_df


# Helper function for generating IDs (Spark Native version)
def generate_id_col(col_list):
    """Concatenates columns and hashes them to MD5"""
    concat_str = concat(*[coalesce(c, lit("")) for c in col_list])
    return md5(concat_str)

## Parsing Control Levels

The raw data describes control mechanisms (such as voting rights or share ownership) using coded strings like `voting-rights-25-to-50-percent`. Next, we implement parsing logic that converts these strings into numeric ranges suitable for quantitative analysis.

We define a Pandas UDF called `parse_control_levels` that iterates through the arrays of control descriptions. It extracts the minimum and maximum percentages for both voting rights and share ownership, handling various textual patterns including specific ranges and "more than" statements. This structured data allows us to query the graph for specific levels of influence later on.

In [4]:
import re
import numpy as np
from typing import Iterator
from pyspark.sql.types import StructType, StructField, IntegerType


@pandas_udf(
    StructType(
        [
            StructField("voting_rights_min", IntegerType()),
            StructField("voting_rights_max", IntegerType()),
            StructField("ownership_of_shares_min", IntegerType()),
            StructField("ownership_of_shares_max", IntegerType()),
        ]
    )
)
def parse_control_levels(iterator: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
    for series in iterator:
        # Apply the parsing logic to the entire batch
        parsed_data = [parse_control_list(x) for x in series]

        yield pd.DataFrame(
            parsed_data,
            columns=[
                "voting_rights_min",
                "voting_rights_max",
                "ownership_of_shares_min",
                "ownership_of_shares_max",
            ],
        )


# The parsing logic helper (Pure Python)
def parse_control_list(control_list):
    if control_list is None or len(control_list) == 0:
        return [None, None, None, None]

    # If it's a numpy array, convert to list (optional, but safer for iteration)
    if isinstance(control_list, np.ndarray):
        control_list = control_list.tolist()

    v_min, v_max = None, None
    s_min, s_max = None, None

    for item in control_list:
        if not item:
            continue

        if "voting-rights" in item:
            # Check 25-to-50 pattern
            m_range = re.search(r"voting-rights-(\d+)-to-(\d+)-percent", item)
            if m_range:
                val_min, val_max = int(m_range.group(1)), int(m_range.group(2))
                v_min = min(v_min, val_min) if v_min is not None else val_min
                v_max = max(v_max, val_max) if v_max is not None else val_max

            # Check more-than-25 pattern (Assumes max 100)
            m_more = re.search(r"voting-rights-more-than-(\d+)-percent", item)
            if m_more:
                val_min = int(m_more.group(1))
                v_min = min(v_min, val_min) if v_min is not None else val_min
                v_max = max(v_max, 100) if v_max is not None else 100

        if "ownership-of-shares" in item:
            # Check 25-to-50 pattern
            m_range = re.search(r"ownership-of-shares-(\d+)-to-(\d+)-percent", item)
            if m_range:
                val_min, val_max = int(m_range.group(1)), int(m_range.group(2))
                s_min = min(s_min, val_min) if s_min is not None else val_min
                s_max = max(s_max, val_max) if s_max is not None else val_max

            # Check more-than-25 pattern (Assumes max 100)
            m_more = re.search(r"ownership-of-shares-more-than-(\d+)-percent", item)
            if m_more:
                val_min = int(m_more.group(1))
                s_min = min(s_min, val_min) if s_min is not None else val_min
                s_max = max(s_max, 100) if s_max is not None else 100

    return [v_min, v_max, s_min, s_max]
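
As a quick check of the behavior described above, the sketch below restates the range extraction in condensed, standalone form so it can be run without Spark. `control_range` is a hypothetical helper, not the notebook's function; it assumes the same two string patterns and the same rule that "more than N" implies an upper bound of 100.

```python
import re


def control_range(items, prefix):
    """Return (min, max) percent across all items matching `prefix`, or (None, None)."""
    lows, highs = [], []
    for item in items or []:
        if not item:
            continue
        m = re.search(rf"{prefix}-(\d+)-to-(\d+)-percent", item)
        if m:
            lows.append(int(m.group(1)))
            highs.append(int(m.group(2)))
        m = re.search(rf"{prefix}-more-than-(\d+)-percent", item)
        if m:
            lows.append(int(m.group(1)))
            highs.append(100)  # assumption carried over: "more than" caps at 100
    return (min(lows), max(highs)) if lows else (None, None)


natures = ["voting-rights-25-to-50-percent", "ownership-of-shares-75-to-100-percent"]
voting = control_range(natures, "voting-rights")
shares = control_range(natures, "ownership-of-shares")
```

Entries that carry no percentage, or an empty list, leave the corresponding pair as `(None, None)`, mirroring the nulls the UDF emits.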

## Data Ingestion and Cleaning

We begin the ETL process by loading two primary datasets: the Companies House Basic Company Data (CSV) and the Persons with Significant Control (PSC) snapshot (JSON Lines) which we downloaded before. For the company data, we perform an immediate cleaning pass to standardize column names, removing whitespace and replacing dots with underscores to ensure compatibility with Spark SQL.

We filter the company dataset to retain only records with a non-null company number, company name, postcode, and first and second address lines; the filter does not look at company status. Similarly, the PSC dataset is filtered so that every record carries a company number. This early filtering reduces the volume of data moving through the subsequent join and transformation stages, and ensures that we focus on entities that can be meaningfully connected in the graph.
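
The header clean-up is a one-line transformation. The sketch below applies it to a few illustrative header strings (the exact raw headers are an assumption; check them against your download) to show why it is needed: Spark SQL reads a dot in a column name as struct-field access unless the name is backtick-quoted.

```python
# Illustrative raw headers: one with stray whitespace, two with dots.
raw_headers = ["CompanyName", " CompanyNumber", "RegAddress.PostCode", "SICCode.SicText_1"]

# Same expression as the ingestion cell: trim whitespace, swap dots for underscores.
clean_headers = [c.strip().replace(".", "_") for c in raw_headers]
```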

In [5]:
# The exact file names change with each release, so we glob for them and sort
# newest-first (assuming the release date embedded in each name sorts lexicographically)
import glob

companies_files = sorted(
    glob.glob(os.path.join(DATA_PATH, "companies", "*.csv")), reverse=True
)
psc_files = sorted(glob.glob(os.path.join(DATA_PATH, "psc", "*.txt")), reverse=True)

# Load Basic Company Data (CSV)
# All columns are read here; the select below keeps only the ones we need
company_df = spark.read.option("header", "true").csv(companies_files[0])

# Clean column names (remove whitespace, replace dots with underscores)
new_cols = [c.strip().replace(".", "_") for c in company_df.columns]

company_df = company_df.toDF(*new_cols)

# Filter and Rename for join
company_clean = company_df.select(
    col("CompanyNumber").alias("company_number"),
    col("CompanyName").alias("company_name"),
    col("RegAddress_PostCode").alias("comp_postcode"),
    col("RegAddress_AddressLine1").alias("comp_addr1"),
    col("RegAddress_AddressLine2").alias("comp_addr2"),
    col("RegAddress_PostTown").alias("comp_town"),
    col("CountryOfOrigin").alias("country_of_origin"),
    col("CompanyCategory").alias("company_category"),
    col("CompanyStatus").alias("company_status"),
    col("DissolutionDate").alias("dissolution_date"),
    col("IncorporationDate").alias("incorporation_date"),
    col("Accounts_AccountCategory").alias("accounts_category"),
    # Select SIC codes
    col("SICCode_SicText_1").alias("sic_1"),
    col("SICCode_SicText_2").alias("sic_2"),
    col("SICCode_SicText_3").alias("sic_3"),
    col("SICCode_SicText_4").alias("sic_4"),
    # Previous Names (We will process these separately)
    *[
        col(f"PreviousName_{i}_CompanyName").alias(f"prev_name_{i}")
        for i in range(1, 11)
    ],
    *[col(f"PreviousName_{i}_CONDATE").alias(f"prev_date_{i}") for i in range(1, 11)],
).filter(
    col("company_number").isNotNull()
    & col("company_name").isNotNull()
    & col("comp_postcode").isNotNull()
    & col("comp_addr1").isNotNull()
    & col("comp_addr2").isNotNull()
)

# Load PSC Data (JSON Lines)
psc_df = spark.read.json(psc_files[0])

# Filter valid records
# TODO: extend the filter to match ***_id requirements
psc_clean = psc_df.filter(col("company_number").isNotNull())

26/02/15 00:26:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## Data Enrichment and Caching

We now join the cleaned PSC dataset with the company information using an inner join on the `company_number` field. This unifies the controller data with the details of the company they control, creating a single "enriched" dataset.

Because this enriched DataFrame serves as the source for multiple downstream branches, generating nodes for Companies, Persons, Addresses, and Organizations, we explicitly cache it in memory so Spark does not have to recompute it for each action, improving the overall performance of the pipeline.

In [6]:
# Join PSC data with Company Data
# This replaces the Python Dictionary lookup
enriched_df = psc_clean.join(company_clean, "company_number", "inner")

# Caching this is crucial as we will branch off multiple writes from this DF
enriched_df.cache()
print(f"Total Enriched Records: {enriched_df.count()}")



Total Enriched Records: 5435751


                                                                                

## Main Data Transformation

Next we perform the core transformations required to prepare the data for the graph model. We generate unique IDs for Persons, Organizations, and Addresses by MD5-hashing their identifying fields. For individuals, the ID is a composite of their name and their year and month of birth (the PSC data carries no day of birth); for organizations, it relies on their registration number; and for addresses, it uses the postcode and address fields joined with a `|` separator.

We also standardize all date fields into a consistent format and apply the geospatial enrichment logic. The system calculates coordinates for both the company's registered address and the controller's address. Finally, we invoke the control level parser to extract the numerical voting and ownership percentages, appending these as distinct columns to the final DataFrame.
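
The two sources write dates differently: the transformation cell parses the company CSV dates with the pattern `dd/MM/yyyy` and the PSC dates with `yyyy-MM-dd`. A plain-Python sketch of the same normalization, using `datetime.strptime` in place of Spark's `to_date`; `parse_date` and the sample values are hypothetical:

```python
from datetime import date, datetime
from typing import Optional

CSV_FORMAT = "%d/%m/%Y"  # counterpart of Spark's dd/MM/yyyy
PSC_FORMAT = "%Y-%m-%d"  # counterpart of Spark's yyyy-MM-dd


def parse_date(value: Optional[str], fmt: str) -> Optional[date]:
    """Parse a date string, returning None for missing or malformed input."""
    if not value:
        return None
    try:
        return datetime.strptime(value, fmt).date()
    except ValueError:
        return None


incorporated = parse_date("06/04/2016", CSV_FORMAT)
notified = parse_date("2016-04-06", PSC_FORMAT)
```

Once both sides are real date values, they can be compared or stored as a single date type regardless of which file they came from.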

In [7]:
# Prepare the main "Controller" dataset (Persons and Orgs)
# We generate IDs and standardize dates here

final_df = (
    enriched_df.withColumn(
        "person_id",
        when(
            col("data.kind").rlike("individual"),
            md5(
                concat(
                    coalesce(col("data.name"), lit("UNKNOWN")),
                    lit("|"),
                    col("data.date_of_birth.year").cast("string"),
                    lit("|"),
                    col("data.date_of_birth.month").cast("string"),
                )
            ),
        ).otherwise(None),
    )
    .withColumn(
        "org_id",
        when(
            col("data.kind").rlike("corporate|legal"),
            md5(col("data.identification.registration_number")),
        ).otherwise(None),
    )
    .withColumn(
        # Company Address ID
        "comp_address_id",
        md5(
            concat(
                coalesce(col("comp_postcode"), lit("")),
                lit("|"),
                coalesce(col("comp_addr1"), lit("")),
                lit("|"),
                coalesce(col("comp_addr2"), lit("")),
            )
        ),
    )
    .withColumn(
        # Controller Address ID
        "ctrl_address_id",
        md5(
            concat(
                coalesce(col("data.address.postal_code"), lit("")),
                lit("|"),
                coalesce(col("data.address.address_line_1"), lit("")),
                lit("|"),
                coalesce(col("data.address.address_line_2"), lit("")),
                lit("|"),
                coalesce(col("data.address.locality"), lit("")),
                lit("|"),
                coalesce(col("data.address.country"), lit("")),
            )
        ),
    )
    .withColumn(
        # Standardize Dates
        "formatted_dissolution_date",
        to_date(col("dissolution_date"), "dd/MM/yyyy"),
    )
    .withColumn(
        "formatted_incorporation_date", to_date(col("incorporation_date"), "dd/MM/yyyy")
    )
    .withColumn("formatted_ceased_on", to_date(col("data.ceased_on"), "yyyy-MM-dd"))
    .withColumn("formatted_notified_on", to_date(col("data.notified_on"), "yyyy-MM-dd"))
)

# Calculate Geo-coordinates
# Company Locations
comp_geo = (
    final_df.select("comp_postcode")
    .distinct()
    .withColumn("geo", geocode_postcode(col("comp_postcode")))
    .select(
        "comp_postcode",
        col("geo.latitude").alias("comp_lat"),
        col("geo.longitude").alias("comp_lon"),
    )
)

# Controller Locations
ctrl_geo = (
    final_df.select(col("data.address.postal_code").alias("ctrl_postcode"))
    .distinct()
    .withColumn("geo", geocode_postcode(col("ctrl_postcode")))
    .select(
        "ctrl_postcode",
        col("geo.latitude").alias("ctrl_lat"),
        col("geo.longitude").alias("ctrl_lon"),
    )
)

# Join Geo back
final_df = final_df.join(comp_geo, "comp_postcode", "left").join(
    ctrl_geo, col("data.address.postal_code") == col("ctrl_postcode"), "left"
)

# Calculate Control Levels (Voting Rights and Ownership of Shares)
final_df = (
    final_df.withColumn(
        "control_levels", parse_control_levels(col("data.natures_of_control"))
    )
    .select(
        "*",
        col("control_levels.voting_rights_min").alias("voting_rights_min"),
        col("control_levels.voting_rights_max").alias("voting_rights_max"),
        col("control_levels.ownership_of_shares_min").alias("ownership_of_shares_min"),
        col("control_levels.ownership_of_shares_max").alias("ownership_of_shares_max"),
    )
    .drop("control_levels")
)

## Schema Constraint Enforcement

Before writing any data, we create uniqueness constraints in Neo4j. We define uniqueness for critical properties such as company numbers, person UIDs, and address UIDs. These constraints are vital for maintaining data integrity, preventing duplicate nodes during the load process, and ensuring high performance for merge operations and lookups.
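
Every statement in the next cell follows one template, so the list could also be generated from a small table of names, labels, and properties. A sketch under that assumption; `uniqueness_constraint` is a hypothetical helper, and only three of the constraints are shown:

```python
def uniqueness_constraint(name: str, label: str, prop: str, var: str = "n") -> str:
    """Build an idempotent Cypher uniqueness-constraint statement."""
    return (
        f"CREATE CONSTRAINT {name} IF NOT EXISTS "
        f"FOR ({var}:{label}) REQUIRE {var}.{prop} IS UNIQUE"
    )


# (constraint name, node label, property) for a subset of the graph's node types.
SPEC = [
    ("company_number_unique", "Company", "number"),
    ("person_uid_unique", "Person", "uid"),
    ("address_id_unique", "Address", "uid"),
]
statements = [uniqueness_constraint(*row) for row in SPEC]
```

The `IF NOT EXISTS` clause is what makes re-running the cell safe: an existing constraint is left in place instead of raising an error.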

In [8]:
from neo4j import GraphDatabase

CONSTRAINTS = [
    "CREATE CONSTRAINT company_number_unique IF NOT EXISTS FOR (c:Company) REQUIRE c.number IS UNIQUE",
    "CREATE CONSTRAINT person_uid_unique IF NOT EXISTS FOR (p:Person) REQUIRE p.uid IS UNIQUE",
    "CREATE CONSTRAINT org_uid_unique IF NOT EXISTS FOR (o:Organization) REQUIRE o.uid IS UNIQUE",
    "CREATE CONSTRAINT prev_name_unique IF NOT EXISTS FOR (n:PreviousName) REQUIRE n.name IS UNIQUE",
    "CREATE CONSTRAINT address_id_unique IF NOT EXISTS FOR (a:Address) REQUIRE a.uid IS UNIQUE",
    "CREATE CONSTRAINT country_name_unique IF NOT EXISTS FOR (c:Country) REQUIRE c.name IS UNIQUE",
    "CREATE CONSTRAINT company_category_unique IF NOT EXISTS FOR (cc:CompanyCategory) REQUIRE cc.name IS UNIQUE",
    "CREATE CONSTRAINT company_status_unique IF NOT EXISTS FOR (cs:CompanyStatus) REQUIRE cs.name IS UNIQUE",
    "CREATE CONSTRAINT sic_code_unique IF NOT EXISTS FOR (s:SICCode) REQUIRE s.code IS UNIQUE",
]
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
with driver.session(database=NEO4J_DATABASE) as session:
    for q in CONSTRAINTS:
        try:
            session.run(q)
        except Exception as e:
            print("constraint skipped:", e)
print("Constraints ensured")
driver.close()

Constraints ensured


## Schema Verification

Before loading, we print the schema of the transformed DataFrame. This allows us to inspect the data types and column structures to ensure that all transformations, such as the struct-based nesting of address data and the casting of date fields, have been applied correctly before we commence the write operations.

In [9]:
# Show columns present in the final enriched DataFrame
final_df.printSchema()

root
 |-- comp_postcode: string (nullable = true)
 |-- company_number: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- address_line_1: string (nullable = true)
 |    |    |-- address_line_2: string (nullable = true)
 |    |    |-- care_of: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- po_box: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- premises: string (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |-- ceased: boolean (nullable = true)
 |    |-- ceased_on: string (nullable = true)
 |    |-- country_of_residence: string (nullable = true)
 |    |-- date_of_birth: struct (nullable = true)
 |    |    |-- month: long (nullable = true)
 |    |    |-- year: long (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- etag: string (nullabl

## Data Profiling

Next, we compute summary statistics for the parsed control-level columns. This quick profiling step helps validate the parsing logic by showing the count, mean, standard deviation, min, and max for voting rights and share ownership. It confirms that the extracted values fall within the expected range (25 to 100 percent) and that the parsing UDF behaved as intended across the dataset.

In [10]:
# Show the range of values in the control levels to verify parsing logic
(
    final_df.select(
        "voting_rights_min",
        "voting_rights_max",
        "ownership_of_shares_min",
        "ownership_of_shares_max",
    )
    .describe()
    .show()
)



+-------+------------------+------------------+-----------------------+-----------------------+
|summary| voting_rights_min| voting_rights_max|ownership_of_shares_min|ownership_of_shares_max|
+-------+------------------+------------------+-----------------------+-----------------------+
|  count|           3619841|           3619841|                4450188|                4450188|
|   mean| 52.17390073210398| 77.38915189921325|     52.197811643013736|      77.43575664668549|
| stddev|24.298895795572154|24.278217832542193|     24.235118036823504|     24.211658849643058|
|    min|                25|                50|                     25|                     50|
|    max|                75|               100|                     75|                    100|
+-------+------------------+------------------+-----------------------+-----------------------+



                                                                                

## Loading Nodes into Neo4j

We now handle the creation of all nodes in the graph. We systematically project specific columns from the cached DataFrame to create distinct node sets for Companies, Addresses, Persons, Organizations, Countries, and other reference data like Company Categories and [SIC Codes](https://resources.companieshouse.gov.uk/sic/).

For each node label, we select the relevant properties, remove duplicates based on the node key, and write the data to Neo4j using the `Overwrite` mode. This idempotent approach ensures that if the pipeline is re-run, we simply update the existing entities rather than creating duplicates.
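
Each node write in the next cell repeats the same writer chain, varying only the label and the key. A sketch of how that chain could be factored into a helper; `write_nodes` is a hypothetical function, while the format string and the `labels` and `node.keys` options are the ones the notebook already uses. The dry run uses a `MagicMock` in place of a real DataFrame, so it exercises the call chain without a Spark session or a database.

```python
from unittest.mock import MagicMock


def write_nodes(df, label: str, key: str, mode: str = "Overwrite") -> None:
    """Deduplicate on `key` and write the rows as nodes with the given label."""
    (
        df.dropDuplicates([key])
        .write.format("org.neo4j.spark.DataSource")
        .mode(mode)
        .option("labels", f":{label}")
        .option("node.keys", key)
        .save()
    )


# Dry run: the mock records the calls instead of touching Neo4j.
fake_df = MagicMock()
write_nodes(fake_df, "Company", "number")
```

With a real DataFrame, a call such as `write_nodes(companies_df, "Company", "number")` would stand in for one of the repeated blocks, where `companies_df` is whatever projection has already been selected for that label.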

In [11]:
# Cache the DataFrame so its transformations are not recomputed for each of the writes below
final_df.cache()

# Force the computation immediately (Trigger the cache)
print(f"Cached data... Row count: {final_df.count()}")

# (:Company)
(
    final_df.select(
        col("company_number").alias("number"),
        col("company_name").alias("name"),
        col("formatted_dissolution_date").alias("dissolution_date"),
        col("formatted_incorporation_date").alias("incorporation_date"),
        col("accounts_category"),
    )
    .dropDuplicates(["number"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Company")
    .option("node.keys", "number")
    .save()
)

# (:Address)
# Company Addresses
(
    final_df.select(
        col("comp_address_id").alias("uid"),
        col("comp_addr1").alias("line_1"),
        col("comp_addr2").alias("line_2"),
        col("comp_postcode").alias("postcode"),
        col("comp_town").alias("locality"),
        col("comp_lat").alias("latitude"),
        col("comp_lon").alias("longitude"),
    )
    .filter(col("uid").isNotNull())
    .dropDuplicates(["uid"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Address")
    .option("node.keys", "uid")
    .save()
)

# Controller Addresses
(
    final_df.select(
        col("ctrl_address_id").alias("uid"),
        col("data.address.address_line_1").alias("line_1"),
        col("data.address.address_line_2").alias("line_2"),
        col("data.address.postal_code").alias("postcode"),
        col("data.address.locality").alias("locality"),
        col("ctrl_lat").alias("latitude"),
        col("ctrl_lon").alias("longitude"),
    )
    .filter(col("uid").isNotNull())
    .dropDuplicates(["uid"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Address")
    .option("node.keys", "uid")
    .save()
)

# (:Person)
(
    final_df.filter(col("person_id").isNotNull())
    .select(
        col("person_id").alias("uid"),
        col("data.name").alias("name"),
        col("data.date_of_birth.year").alias("birth_year"),
        col("data.date_of_birth.month").alias("birth_month"),
        col("data.nationality").alias("nationality"),
        col("data.is_sanctioned").alias("sanctioned"),
    )
    .filter(col("uid").isNotNull())
    .dropDuplicates(["uid"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "uid")
    .save()
)

# (:Organization)
(
    final_df.filter(col("org_id").isNotNull())
    .select(
        col("org_id").alias("uid"),
        col("data.name").alias("name"),
        col("data.identification.registration_number").alias("registration_number"),
        col("data.is_sanctioned").alias("sanctioned"),
        col("formatted_ceased_on").alias("ceased_on"),
    )
    .filter(col("uid").isNotNull())
    .dropDuplicates(["uid"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Organization")
    .option("node.keys", "uid")
    .save()
)

# (:Country)
# From base company data
(
    final_df.filter(col("country_of_origin").isNotNull())
    .select(initcap(col("country_of_origin")).alias("name"))
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Country")
    .option("node.keys", "name")
    .save()
)

# From controller addresses
(
    final_df.filter(col("data.address.country").isNotNull())
    .select(initcap(col("data.address.country")).alias("name"))
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Country")
    .option("node.keys", "name")
    .save()
)

# (:CompanyCategory)
(
    final_df.filter(col("company_category").isNotNull())
    .select(col("company_category").alias("name"))
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":CompanyCategory")
    .option("node.keys", "name")
    .save()
)

# (:CompanyStatus)
(
    final_df.filter(col("company_status").isNotNull())
    .select(col("company_status").alias("name"))
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":CompanyStatus")
    .option("node.keys", "name")
    .save()
)

# (:SupervisoryAuthority)
(
    final_df.filter(
        col(
            "data.identity_verification_details.anti_money_laundering_supervisory_bodies"
        ).isNotNull()
    )
    .select(
        explode(
            col(
                "data.identity_verification_details.anti_money_laundering_supervisory_bodies"
            )
        ).alias("name")
    )
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":SupervisoryAuthority")
    .option("node.keys", "name")
    .save()
)

# (:AuthorisedCorporateServiceProvider)
(
    final_df.filter(
        col(
            "data.identity_verification_details.authorised_corporate_service_provider_name"
        ).isNotNull()
    )
    .select(
        col(
            "data.identity_verification_details.authorised_corporate_service_provider_name"
        ).alias("name")
    )
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":AuthorisedCorporateServiceProvider")
    .option("node.keys", "name")
    .save()
)

# (:SICCode)
# SIC Codes need unpivoting
sics = final_df.select(
    explode(array("sic_1", "sic_2", "sic_3", "sic_4")).alias("code")
).filter(col("code").isNotNull())
(
    sics.select(col("code").alias("code"))
    .dropDuplicates(["code"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":SICCode")
    .option("node.keys", "code")
    .save()
)

                                                                                

Cached data... Row count: 5435751
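
Every node write above pairs `Overwrite` mode with `node.keys`, which the Neo4j Spark connector handles as a key-based merge rather than a blind insert. As a rough illustration only, the sketch below prints the general shape of the Cypher statement this implies for one batch of rows. The helper name `merge_node_cypher` is hypothetical, and the exact statement the connector generates may differ between versions.

```python
def merge_node_cypher(label: str, key: str) -> str:
    """Approximate Cypher for an Overwrite-mode node write keyed on `key`.

    Illustrative only: this is not the connector's own code.
    """
    return (
        "UNWIND $events AS event\n"
        f"MERGE (n:{label} {{{key}: event.keys.{key}}})\n"
        "SET n += event.properties"
    )


# Shape of the statement for the :Address writes, which are keyed on `uid`
print(merge_node_cypher("Address", "uid"))
```

Because both `:Address` writes merge on `uid`, a controller address that shares a `uid` with a company address should update the existing node instead of creating a duplicate.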


                                                                                

### Loading Relationships

With the nodes in place, we define the connections between them. A helper function, `write_rel`, standardizes relationship writes: it projects the source and target key columns, drops rows where either key is null, deduplicates on the key pair, and repartitions the data before writing. By default the write runs from a single partition.

We establish structural relationships such as `HAS_CATEGORY` and `REGISTERED_AT`, as well as the core governance relationships like `CONTROLS` (connecting Persons and Organizations to Companies). We also capture detailed edges like `HAS_SIC` for industry classification and `VERIFIED_BY` to link persons to their identity verifiers, including properties on the relationships where applicable (e.g., the date a person ceased control).
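
To make the helper's role concrete, the sketch below isolates the connector options that a `keys`-strategy relationship write needs, using the same option names that appear in `write_rel`. The function `relationship_options` is a hypothetical name introduced for illustration, and the `source` and `target` column names are assumed to be the aliases the helper projects.

```python
def relationship_options(source_label, source_key, target_label, target_key, rel_type):
    """Build the option map for a 'keys' strategy relationship write.

    The DataFrame being written is assumed to expose the node key values
    in columns named "source" and "target".
    """
    return {
        "relationship": rel_type,
        "relationship.save.strategy": "keys",
        # "<dataframe column>:<node property>" maps a column to the key property
        "relationship.source.labels": f":{source_label}",
        "relationship.source.node.keys": f"source:{source_key}",
        "relationship.target.labels": f":{target_label}",
        "relationship.target.node.keys": f"target:{target_key}",
    }


opts = relationship_options(
    "Company", "number", "CompanyCategory", "name", "HAS_CATEGORY"
)
for option, value in opts.items():
    print(f"{option} = {value}")
```

With PySpark, such a map could be passed in one call through `DataFrameWriter.options(**opts)` instead of chaining individual `.option(...)` calls.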

In [12]:
from pyspark.sql.functions import col, explode, expr, to_date


def write_rel(
    df,
    source_label,
    source_key,
    source_col,
    target_label,
    target_key,
    target_col,
    rel_type,
    rel_props=None,
    partition_col=None,
    parallelism=False,
):
    """
    Helper to write relationships with optional properties and partitioning.

    Parameters:
        - df: Input DataFrame containing source and target columns
        - source_label: Label of the source node (e.g. "Company")
        - source_key: Key property of the source node (e.g. "number")
        - source_col: Column in the DataFrame that maps to the source key (e.g. "company_number")
        - target_label: Label of the target node (e.g. "CompanyCategory")
        - target_key: Key property of the target node (e.g. "name")
        - target_col: Column in the DataFrame that maps to the target key (e.g. "company_category")
        - rel_type: Type of the relationship (e.g. "HAS_CATEGORY")
        - rel_props: Optional list of additional columns to include as relationship properties (e.g. ["notified_on"])
        - partition_col: Optional column to partition by when parallelism is enabled (e.g. "company_number"); if None, partitions by the target key to keep chains local
        - parallelism: If False (default), write from a single partition so the load runs serially; if True, write the partitions in parallel
    """

    cols_to_select = [col(source_col).alias("source"), col(target_col).alias("target")]
    if rel_props:
        cols_to_select.extend(col(prop) for prop in rel_props)

    # source_col and target_col are renamed by the aliases above, so resolve the
    # partition column to the name it carries after the select.
    if partition_col == source_col:
        partition_key = "source"
    elif partition_col == target_col or not partition_col:
        partition_key = "target"
    else:
        partition_key = partition_col
        cols_to_select.append(col(partition_col))

    rel_df = (
        df.select(*cols_to_select)
        .filter(col("source").isNotNull() & col("target").isNotNull())
        .dropDuplicates(["source", "target"])
    )

    if parallelism:
        print(f"   > Strategy: Partitioning by '{partition_key}' to keep chains local.")
        rel_df = rel_df.repartition(col(partition_key))
    else:
        # Serial write: one partition means one writer at a time
        rel_df = rel_df.repartition(1)

    (
        rel_df.write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("relationship", rel_type)
        .option("relationship.save.strategy", "keys")
        .option("relationship.source.labels", f":{source_label}")
        .option("relationship.source.node.keys", f"source:{source_key}")
        .option("relationship.target.labels", f":{target_label}")
        .option("relationship.target.node.keys", f"target:{target_key}")
        .save()
    )


# (:Company)-[:HAS_CATEGORY]->(:CompanyCategory)
write_rel(
    final_df,
    "Company",
    "number",
    "company_number",
    "CompanyCategory",
    "name",
    "company_category",
    "HAS_CATEGORY",
)

# (:Company)-[:HAS_STATUS]->(:CompanyStatus)
write_rel(
    final_df,
    "Company",
    "number",
    "company_number",
    "CompanyStatus",
    "name",
    "company_status",
    "HAS_STATUS",
)

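# (:Company)-[:ORIGINATES_FROM]->(:Country)
# Country nodes are keyed on the initcap-normalised name, so apply the same
# normalisation here; otherwise differently cased values miss the key lookup.
write_rel(
    final_df.withColumn("origin_country", initcap(col("country_of_origin"))),
    "Company",
    "number",
    "company_number",
    "Country",
    "name",
    "origin_country",
    "ORIGINATES_FROM",
)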

# (:Company)-[:REGISTERED_AT]->(:Address)
write_rel(
    final_df,
    "Company",
    "number",
    "company_number",
    "Address",
    "uid",
    "comp_address_id",
    "REGISTERED_AT",
)

# (:Company)-[:HAS_SIC]->(:SICCode) (Many-to-Many)
for i in range(1, 5):
    write_rel(
        final_df,
        "Company",
        "number",
        "company_number",
        "SICCode",
        "code",
        f"sic_{i}",
        "HAS_SIC",
    )

# (:Person)-[:CONTROLS]->(:Company)
# Pre-process: Filter nulls from array and flatten columns
persons_cleaned_df = (
    final_df.filter(col("person_id").isNotNull())
    .withColumn(
        "natures_of_control",
        expr("filter(data.natures_of_control, x -> x is not null)"),
    )
    .select(
        col("person_id"),
        col("company_number"),
        col("natures_of_control"),
        col("formatted_notified_on").alias("notified_on"),
        col("formatted_ceased_on").alias("ceased_on"),
        col("voting_rights_min"),
        col("voting_rights_max"),
        col("ownership_of_shares_min"),
        col("ownership_of_shares_max"),
    )
)

write_rel(
    persons_cleaned_df,
    "Person",
    "uid",
    "person_id",
    "Company",
    "number",
    "company_number",
    "CONTROLS",
    rel_props=[
        "natures_of_control",
        "notified_on",
        "ceased_on",
        "voting_rights_min",
        "voting_rights_max",
        "ownership_of_shares_min",
        "ownership_of_shares_max",
    ],
)

# (:Person)-[:LIVES_AT]->(:Address)
write_rel(
    final_df,
    "Person",
    "uid",
    "person_id",
    "Address",
    "uid",
    "ctrl_address_id",
    "LIVES_AT",
)

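# (:Person)-[:RESIDES_IN]->(:Country)
# Normalise with initcap so the value matches the Country node key
write_rel(
    final_df.withColumn(
        "residence_country", initcap(col("data.country_of_residence"))
    ),
    "Person",
    "uid",
    "person_id",
    "Country",
    "name",
    "residence_country",
    "RESIDES_IN",
)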

# (:Person)-[:SUPERVISED_BY]->(:SupervisoryAuthority)
# Flatten the data for Persons
person_supervision_df = (
    final_df.filter(col("person_id").isNotNull())
    .select(
        col("person_id"),
        # Explode the array so each body gets its own relationship
        explode(
            col(
                "data.identity_verification_details.anti_money_laundering_supervisory_bodies"
            )
        ).alias("authority_name"),
        # Include and format the dates
        to_date(
            col("data.identity_verification_details.appointment_verification_start_on")
        ).alias("verification_start"),
        to_date(
            col("data.identity_verification_details.appointment_verification_end_on")
        ).alias("verification_end"),
    )
    .filter(col("authority_name").isNotNull())
)

write_rel(
    person_supervision_df,
    "Person",
    "uid",
    "person_id",
    "SupervisoryAuthority",
    "name",
    "authority_name",
    "SUPERVISED_BY",
    rel_props=["verification_start", "verification_end"],
)

# (:Person)-[:VERIFIED_BY]->(:AuthorisedCorporateServiceProvider)
verified_df = (
    final_df.filter(col("person_id").isNotNull())
    .select(
        col("person_id"),
        # Alias the nested ACSP name to a top-level column
        col(
            "data.identity_verification_details.authorised_corporate_service_provider_name"
        ).alias("acsp_name"),
        to_date(col("data.identity_verification_details.identity_verified_on")).alias(
            "verified_on"
        ),
    )
    .filter(col("acsp_name").isNotNull())
)

write_rel(
    verified_df,
    "Person",
    "uid",
    "person_id",
    "AuthorisedCorporateServiceProvider",
    "name",
    "acsp_name",
    "VERIFIED_BY",
    # Carry the verification date selected above onto the relationship
    rel_props=["verified_on"],
)

# (:Organization)-[:CONTROLS]->(:Company)
# Pre-process: Filter nulls from array and flatten columns
orgs_cleaned_df = (
    final_df.filter(col("org_id").isNotNull())
    .withColumn(
        "natures_of_control",
        expr("filter(data.natures_of_control, x -> x is not null)"),
    )
    .select(
        col("org_id"),
        col("company_number"),
        col("natures_of_control"),
        col("formatted_notified_on").alias("notified_on"),
        col("formatted_ceased_on").alias("ceased_on"),
        col("voting_rights_min"),
        col("voting_rights_max"),
        col("ownership_of_shares_min"),
        col("ownership_of_shares_max"),
    )
)

write_rel(
    orgs_cleaned_df,
    "Organization",
    "uid",
    "org_id",
    "Company",
    "number",
    "company_number",
    "CONTROLS",
    rel_props=[
        "natures_of_control",
        "notified_on",
        "ceased_on",
        "voting_rights_min",
        "voting_rights_max",
        "ownership_of_shares_min",
        "ownership_of_shares_max",
    ],
)

# (:Organization)-[:REGISTERED_AT]->(:Address)
write_rel(
    final_df,
    "Organization",
    "uid",
    "org_id",
    "Address",
    "uid",
    "ctrl_address_id",
    "REGISTERED_AT",
)

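# (:Organization)-[:BASED_IN]->(:Country)
# Normalise with initcap so the value matches the Country node key
write_rel(
    final_df.withColumn(
        "registered_country", initcap(col("data.identification.country_registered"))
    ),
    "Organization",
    "uid",
    "org_id",
    "Country",
    "name",
    "registered_country",
    "BASED_IN",
    rel_props=["data.identification.registration_number"],
)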

                                                                                

### Previous Name History

The final stage of the pipeline processes each company's name history. Because a company can change its name several times, we model the history as a linked list in the graph. The source data holds up to ten previous names in numbered column pairs (`prev_name_1` to `prev_name_10`, each with a matching `prev_date_*` column), so we first pack these pairs into an array of structs and explode it into one row per name, discarding empty slots.

We create `:PreviousName` nodes and link each company to the name in its first history slot via a `HAS_PREVIOUS_NAME` relationship. A window function ordered by slot index then looks ahead one row and links each name to the next one in the history with a `PREVIOUS_NAME_OF` relationship, skipping pairs where the two names are identical. Both relationships carry a `changed_on` date. This structure lets us traverse the timeline of a company's name changes directly within the graph.
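
The chaining logic can be sketched in plain Python, without Spark, to show which edges a single company's history produces. This is a minimal illustration under stated assumptions: the function `name_chain` and the sample company names are hypothetical, and the input list stands in for the values of `prev_name_1` to `prev_name_10` in slot order.

```python
def name_chain(slots):
    """Return (head, pairs) for one company's previous-name slots.

    head  -> target of HAS_PREVIOUS_NAME (slot 1, or None if it is empty)
    pairs -> (name, next_name) tuples, one per PREVIOUS_NAME_OF edge
    """
    head = slots[0] if slots else None
    # Empty slots are dropped before looking ahead, as in the Spark filter
    names = [name for name in slots if name is not None]
    pairs = [
        (current, following)
        for current, following in zip(names, names[1:])  # one-row look-ahead
        if current != following  # identical neighbours produce no edge
    ]
    return head, pairs


head, pairs = name_chain(["ACME HOLDINGS LTD", "ACME LTD", None, "A LTD"])
print(head)
print(pairs)
```

The `zip(names, names[1:])` pairing plays the role of `lead("name", 1)` over a window ordered by slot index.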

In [13]:
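from pyspark.sql import Window
from pyspark.sql.functions import array, col, explode, lead, lit, struct, to_date

# Pack each (prev_name_i, prev_date_i) column pair into a struct with a common
# field layout ("idx", "name", "date"), then explode to one row per history slot.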
history_df = (
    final_df.select(
        col("company_number"),
        explode(
            array(
                [
                    struct(
                        lit(i).alias("idx"),
                        col(f"prev_name_{i}").alias("name"),
                        col(f"prev_date_{i}").alias("date"),
                    )
                    for i in range(1, 11)
                ]
            )
        ).alias("hist"),
    )
    .select(
        col("company_number"),
        col("hist.idx").alias("idx"),
        col("hist.name").alias("name"),
        to_date(col("hist.date"), "dd/MM/yyyy").alias("date"),
    )
    .filter(col("name").isNotNull())
)

(
    history_df.select("name")
    .dropDuplicates(["name"])
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":PreviousName")
    .option("node.keys", "name")
    .save()
)

first_prev_df = history_df.filter(col("idx") == 1).select(
    col("company_number"), col("name"), col("date").alias("changed_on")
)

write_rel(
    df=first_prev_df,
    source_label="Company",
    source_key="number",
    source_col="company_number",
    target_label="PreviousName",
    target_key="name",
    target_col="name",
    rel_type="HAS_PREVIOUS_NAME",
    rel_props=["changed_on"],
)

w = Window.partitionBy("company_number").orderBy("idx")

chain_df = (
    history_df
    # Look ahead to find the next name
    .withColumn("next_name", lead("name", 1).over(w))
    .withColumn("next_date", lead("date", 1).over(w))
    # Ensure there IS a next name
    .filter(col("next_name").isNotNull())
    # If the next name is the same as the current name, do not create a relationship
    .filter(col("name") != col("next_name"))
    .select(col("name"), col("next_name"), col("next_date").alias("changed_on"))
)

write_rel(
    df=chain_df,
    source_label="PreviousName",
    source_key="name",
    source_col="name",
    target_label="PreviousName",
    target_key="name",
    target_col="next_name",
    rel_type="PREVIOUS_NAME_OF",
    rel_props=["changed_on"],
)

print("Loading Complete.")

Loading Complete.


                                                                                

The enriched corporate graph is now ready for analysis and querying, including the multi-hop traversals that surface indirect relationships for KYC/AML and other regulatory or analytical work. Before that, let us visualise the resulting graph schema to confirm that all nodes and relationships were created as expected.
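
As one example of the kind of query the loaded graph supports, the sketch below builds a parameterised Cypher statement that lists the persons currently controlling a given company above a voting-rights threshold. It relies only on labels and properties written by the cells above. The function name `active_controllers_query` and the company number are hypothetical, and it assumes that `voting_rights_min` holds a numeric percentage and that a missing `ceased_on` marks an active control relationship.

```python
def active_controllers_query(company_number, min_voting=25):
    """Return a (cypher, parameters) pair; nothing is executed here."""
    query = (
        "MATCH (p:Person)-[c:CONTROLS]->(co:Company {number: $number})\n"
        "WHERE c.ceased_on IS NULL AND c.voting_rights_min >= $min_voting\n"
        "RETURN p.name AS name, c.voting_rights_min AS voting_rights_min\n"
        "ORDER BY voting_rights_min DESC"
    )
    return query, {"number": company_number, "min_voting": min_voting}


# "00000000" is a placeholder company number, not a real record
query, params = active_controllers_query("00000000", min_voting=50)
print(query)
print(params)
```

Passing values as parameters instead of interpolating them into the string keeps the statement reusable across companies and avoids quoting problems.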

In [None]:
from neo4j_analysis import Neo4jAnalysis
from neo4j_viz.neo4j import from_neo4j, ColorSpace

analysis = Neo4jAnalysis(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD, NEO4J_DATABASE)
schema_result = analysis.run_query_viz(
    """
    CALL apoc.meta.graph()
    """
)

VG = from_neo4j(schema_result)
VG.color_nodes(
    field="caption",  # Using the internal labels property
    color_space=ColorSpace.DISCRETE,
    colors={
        "Country": "#1f77b4",  # Blue for Countries
        "Address": "#ff7f0e",  # Orange for Addresses
        "Person": "#2ca02c",  # Green for Persons
        "PreviousName": "#d62728",  # Red for Previous Names
        "SICCode": "#9467bd",  # Purple for SIC Codes
        "Company": "#8c564b",  # Brown for Companies
        "CompanyCategory": "#e377c2",  # Pink for Company Categories
        "CompanyStatus": "#7f7f7f",  # Gray for Company Statuses
        "SupervisoryAuthority": "#bcbd22",  # Olive for Supervisory Authorities
        "AuthorisedCorporateServiceProvider": "#17becf",  # Cyan for Authorised Corporate Service Providers,
        "Organization": "#aec7e8",  # Light Blue for Organizations
    },
)
generated_html = VG.render(layout="forcedirected", initial_zoom=1.6)
await analysis.capture_graph_to_png(generated_html, "renderings/schema_graph.png")


![Graph Schema](renderings/schema_graph.png)
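
The rendered schema can also be checked programmatically. The sketch below lists the relationship patterns that the loading cells are meant to create and reports any that are absent from an observed set. The helper `missing_relationships` is hypothetical, and obtaining the observed `(source label, relationship type, target label)` triples from the database is left as an assumption, since the right schema query depends on the installation.

```python
# Relationship patterns written by the loading cells in this notebook
EXPECTED_SCHEMA = {
    ("Company", "HAS_CATEGORY", "CompanyCategory"),
    ("Company", "HAS_STATUS", "CompanyStatus"),
    ("Company", "ORIGINATES_FROM", "Country"),
    ("Company", "REGISTERED_AT", "Address"),
    ("Company", "HAS_SIC", "SICCode"),
    ("Company", "HAS_PREVIOUS_NAME", "PreviousName"),
    ("PreviousName", "PREVIOUS_NAME_OF", "PreviousName"),
    ("Person", "CONTROLS", "Company"),
    ("Person", "LIVES_AT", "Address"),
    ("Person", "RESIDES_IN", "Country"),
    ("Person", "SUPERVISED_BY", "SupervisoryAuthority"),
    ("Person", "VERIFIED_BY", "AuthorisedCorporateServiceProvider"),
    ("Organization", "CONTROLS", "Company"),
    ("Organization", "REGISTERED_AT", "Address"),
    ("Organization", "BASED_IN", "Country"),
}


def missing_relationships(observed):
    """Return the expected patterns that are absent from `observed`, sorted."""
    return sorted(EXPECTED_SCHEMA - set(observed))


# Example: pretend the VERIFIED_BY write was skipped
observed = EXPECTED_SCHEMA - {
    ("Person", "VERIFIED_BY", "AuthorisedCorporateServiceProvider")
}
print(missing_relationships(observed))
```

An empty result means every expected pattern is present; it does not verify relationship counts or properties.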