---
layout: post
title:  ICIJ Fraud Analysis
date:   2025-12-06
categories: [Spark, Scala]
mermaid: true
maths: true
typora-root-url: /Users/ojitha/GitHub/ojitha.github.io
typora-copy-images-to: ../../blog/assets/images/${filename}
excerpt: '<div class="image-text-container"><div class="image-column"><img src="https://raw.githubusercontent.com/ojitha/blog/master/assets/images/2025-11-07-SparkDataset/DatasetAPI.jpg" alt="Scala Functors" width="150" height="150" /></div><div class="text-column">To be filled</div></div>'
---

<!--more-->

------

* TOC
{:toc}
------

## Introduction

ICIJ stands for the International Consortium of Investigative Journalists. It is a non-profit news organisation based in Washington, D.C., that operates as a global network of reporters and media organisations. They are best known for coordinating massive, cross-border investigations into corruption, money laundering, and tax abuse.

### What is in the database?

It is not just one leak; it is a "master list" combined from five different massive investigations. It contains data on more than **810,000 offshore companies**, trusts, and foundations from:

-   **Pandora Papers (2021):** The most recent major addition, exposing 35 world leaders.
-   **Paradise Papers (2017):** Focused heavily on multinational corporations (like Apple and Nike) and the ultra-wealthy.
-   **Bahamas Leaks (2016):** A leak from the corporate registry of the Bahamas.
-   **Panama Papers (2016):** The famous leak from the law firm Mossack Fonseca.
-   **Offshore Leaks (2013):** The original investigation that started the project.

### What does it actually show?

The database does **not** show bank balances, emails, or money transfers. It shows **relationships**.

-   **Beneficial Owners:** The actual human beings who own the companies (often hidden behind nominees).
-   **Intermediaries:** The lawyers, banks, and accountants who helped set up the structures.
-   **Addresses:** Physical locations linked to the owners. (You can literally search your own city to see who in your neighbourhood has an offshore account.)

The ICIJ Offshore Leaks Database is a free, publicly accessible search engine that tracks the ownership of anonymous shell companies. The ICIJ Offshore Leaks Database is not just a spreadsheet; it is a Graph Database exported into CSV format. To understand the hidden wealth of nations, you must understand how to reconstruct these fragments. The data is split into two primary concepts: Nodes (actors) and Relationships (actions).

-   **Nodes:** Every row represents a distinct object. These are not all companies; they are categorised into four different roles. Understanding this distribution is critical for filtering your analysis.
    -   `EntityNode`: The offshore companies, trusts, or foundations created in tax havens.
    -   `OfficerNode`: The people or companies playing a role (Director, Shareholder, Beneficiary).
    -   `Intermediary`: The lawyers, banks, or accountants (middle-men) who facilitate the setup.
    -   `AddressNode`: Physical locations linked to the other nodes.
    -   `OtherNode`
-   **Edges:** `Relationship`: The relationships.csv file is the bridge. It contains no names, only IDs. It connects a Start Node to an End Node via a specific Relationship Type. Without this file, the nodes are isolated islands of data.

![Nodes Relationships](../blog/assets/images/2025-12-06-ICIJ-Fraud-Analysis/nodes-relationships.png)




In [1]:
// Enable compiler to use Java classpath (REMOVED the invalid doc.value line)
interp.configureCompiler(c => {
c.settings.usejavacp.value = true
})

// Configure Coursier to fetch doc JARs
// Import Spark
import $ivy.`org.apache.spark:spark-sql_2.13:3.5.7`
// import Almond Spark plugin
// import $ivy.`sh.almond::almond-spark:0.14.0-RC8`

import org.apache.logging.log4j.{LogManager, Level}
import org.apache.logging.log4j.core.config.Configurator
// Set log levels BEFORE creating SparkSession
Configurator.setRootLevel(Level.WARN)
Configurator.setLevel("org.apache.spark", Level.WARN)
Configurator.setLevel("org.apache.spark.executor.Executor", Level.WARN)

[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36morg.apache.logging.log4j.{LogManager, Level}[39m
[32mimport [39m[36morg.apache.logging.log4j.core.config.Configurator[39m

In [2]:
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}

// Silence logs
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

val spark = {
  NotebookSparkSession.builder()
    .appName("2025-11-07-SparkDataset")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.memory", "6g")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .getOrCreate()
}

// ‚≠ê CRITICAL: This line forces the UI widget to persist after the cell is done
spark

spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

println(s"‚úÖ Spark ${spark.version} ready")
println(s"üåê Spark UI: http://localhost:4040")

val filePath="" // path for jupyter

22:18:02.498 [scala-kernel-interpreter-1] WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


‚úÖ Spark 3.5.7 ready
üåê Spark UI: http://localhost:4040


[32mimport [39m[36morg.apache.spark.sql._[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@38181f78
[36mres2_5[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@38181f78
[32mimport [39m[36mspark.implicits._[39m
[36mfilePath[39m: [32mString[39m = [32m""[39m

In [3]:
// Run this once to disable line-wrapping in output
val style = """
<style>
.jp-OutputArea-output pre {
    white-space: pre !important;
}
</style>
"""
kernel.publish.html(style)

[36mstyle[39m: [32mString[39m = [32m"""
<style>
.jp-OutputArea-output pre {
    white-space: pre !important;
}
</style>
"""[39m

In [4]:
// create a case class for the `nodes-entities.csv`
/**
  * Represents an entity from the nodes-entities.csv file.
  *
  * @param node_id The unique identifier for the node.
  * @param name The name of the entity.
  * @param original_name The original name of the entity, if different.
  * @param former_name A previous name for the entity.
  * @param jurisdiction The legal jurisdiction of the entity.
  * @param jurisdiction_description A description of the jurisdiction.
  * @param company_type The type of company (e.g., Limited, Corp).
  * @param address The registered address of the entity.
  * @param incorporation_date The date the entity was incorporated.
  * @param inactivation_date The date the entity was inactivated.
  * @param struck_off_date The date the entity was struck off the register.
  * @param status The current status of the entity.
  * @param service_provider The service provider associated with the entity.
  * @param ibc_ruc The IBC or RUC number.
  * @param country_codes The country codes associated with the entity.
  * @param countries The countries associated with the entity.
  * @param sourceID The ID of the data source.
  * @param valid_until The date until which the data is considered valid.
  * @param note Any additional notes.
  */
case class EntityNode(
    node_id: String,
    name: Option[String],
    original_name: Option[String],
    former_name: Option[String],
    jurisdiction: Option[String],
    jurisdiction_description: Option[String],
    company_type: Option[String],
    address: Option[String],
    incorporation_date: Option[String],
    inactivation_date: Option[String],
    struck_off_date: Option[String],
    status: Option[String],
    service_provider: Option[String],
    ibcRuc: Option[String],
    country_codes: Option[String],
    countries: Option[String],
    sourceID: Option[String],
    valid_until: Option[String],
    note: Option[String]
)


defined [32mclass[39m [36mEntityNode[39m

In [7]:
// Read CSV and convert to Dataset
val entityNodeDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"${filePath}full-oldb.LATEST/nodes-entities.csv")
  .as[EntityNode]

// Example queries
entityNodeDS.show(false)

+--------+--------------------------------------------+----------------------------------------------------------+------------------------+------------+------------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------------+-----------------+---------------+---------+-------------+----------------+------+-------------+-----------+-------------+----------------------------------------------+----+
|node_id |name                                        |original_name                                             |former_name             |jurisdiction|jurisdiction_description|company_type|address                                                                                                                                      |internal_id|incorporation_date|inactivation_date|struck_off_date|dorm_date|status       |service_provider|ibcRUC|country_codes|cou

[36mentityNodeDS[39m: [32mDataset[39m[[32mEntityNode[39m] = [node_id: string, name: string ... 19 more fields]

In [5]:
// create a case class for the `nodes-intermediaries.csv`
/**
  * Represents an intermediary from the nodes-intermediaries.csv file.
  *
  * @param node_id The unique identifier for the node.
  * @param name The name of the intermediary.
  * @param status The current status of the intermediary.
  * @param internal_id An internal identifier used by the source.
  * @param address The address of the intermediary.
  * @param countries The countries associated with the intermediary.
  * @param country_codes The country codes associated with the intermediary.
  * @param sourceID The ID of the data source.
  * @param valid_until The date until which the data is considered valid.
  * @param note Any additional notes.
  */
case class Intermediary(
    node_id: String,
    name: String,
    status: Option[String],
    internal_id: Option[String],
    address: Option[String],
    countries: Option[String],
    country_codes: Option[String],
    sourceID: String,
    valid_until: String,
    note: Option[String]
)

defined [32mclass[39m [36mIntermediary[39m

In [6]:
// Read CSV and convert to Dataset
val intermediaryDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"${filePath}full-oldb.LATEST/nodes-intermediaries.csv")
  .as[Intermediary]

// Example queries
intermediaryDS.show(false)

+--------+-----------------------------------+----------------------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------+-------------+-----------------------------------------------+----+
|node_id |name                               |status                |internal_id|address                                                                                                                                                    |countries            |country_codes|sourceID     |valid_until                                    |note|
+--------+-----------------------------------+----------------------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------+-------------+-------------------------------

[36mintermediaryDS[39m: [32mDataset[39m[[32mIntermediary[39m] = [node_id: string, name: string ... 8 more fields]

In [8]:
// create a case class for the `nodes-intermediaries.csv`
/**
  * Represents a node from the nodes-others.csv file.
  *
  * @param node_id The unique identifier for the node.
  * @param name The name associated with the node.
  * @param `type` The type of the node (e.g., 'company', 'person').
  * @param country_codes The country codes associated with the node.
  * @param countries The countries associated with the node.
  * @param sourceID The ID of the data source.
  * @param valid_until The date until which the data is considered valid.
  * @param note Any additional notes.
  */
case class OtherNode(
  node_id: String,
  name: Option[String],
  `type`: Option[String],
  country_codes: Option[String],
  countries: Option[String],
  sourceID: Option[String],
  valid_until: Option[String],
  note: Option[String]
)


defined [32mclass[39m [36mOtherNode[39m

In [9]:
// Read CSV and convert to Dataset
val otherNodeDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"${filePath}full-oldb.LATEST/nodes-others.csv")
  .as[OtherNode]

// Example queries
otherNodeDS.show(false)

+--------+-------------------------------------+-------------------------+------------------+---------------+-----------+------------+------------------------+---------+-------------+------------------------------------------+-----------------------------------------------------+----------------------------------------+
|node_id |name                                 |type                     |incorporation_date|struck_off_date|closed_date|jurisdiction|jurisdiction_description|countries|country_codes|sourceID                                  |valid_until                                          |note                                    |
+--------+-------------------------------------+-------------------------+------------------+---------------+-----------+------------+------------------------+---------+-------------+------------------------------------------+-----------------------------------------------------+----------------------------------------+
|85004929|ANTAM ENTERPRISES N.V.  

[36motherNodeDS[39m: [32mDataset[39m[[32mOtherNode[39m] = [node_id: int, name: string ... 11 more fields]

In [13]:
/**
  * Represents an officer from the nodes-officers.csv file.
  *
  * @param node_id The ID of the node.
  * @param name The name of the officer.
  * @param country_codes The country codes associated with the officer.
  * @param countries The countries associated with the officer.
  * @param sourceID The ID of the source.
  * @param valid_until The date until which the data is valid.
  * @param note Any notes associated with the record.
  */
case class OfficerNode(
  node_id: String,
  name: Option[String],
  country_codes: Option[String],
  countries: Option[String],
  sourceID: Option[String],
  valid_until: Option[String],
  note: Option[String]
)


defined [32mclass[39m [36mOfficerNode[39m

In [14]:
// Read CSV and convert to Dataset
val officerNodeDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"${filePath}full-oldb.LATEST/nodes-officers.csv")
  .as[OfficerNode]

// Example queries
officerNodeDS.show(false)

+--------+---------------------------------+-----------+-------------+-------------+----------------------------------------------+----+
|node_id |name                             |countries  |country_codes|sourceID     |valid_until                                   |note|
+--------+---------------------------------+-----------+-------------+-------------+----------------------------------------------+----+
|12000001|KIM SOO IN                       |South Korea|KOR          |Panama Papers|The Panama Papers data is current through 2015|NULL|
|12000002|Tian Yuan                        |China      |CHN          |Panama Papers|The Panama Papers data is current through 2015|NULL|
|12000003|GREGORY JOHN SOLOMON             |Australia  |AUS          |Panama Papers|The Panama Papers data is current through 2015|NULL|
|12000004|MATSUDA MASUMI                   |Japan      |JPN          |Panama Papers|The Panama Papers data is current through 2015|NULL|
|12000005|HO THUY NGA                    

[36mofficerNodeDS[39m: [32mDataset[39m[[32mOfficerNode[39m] = [node_id: string, name: string ... 5 more fields]

In [12]:
// create AddressNode case class
case class AddressNode(
  node_id: String,
  address: Option[String],
  name: Option[String],
  countries: Option[String],
  country_codes: Option[String],
  sourceID: Option[String],
  valid_until: Option[String],
  note: Option[String]
)


defined [32mclass[39m [36mAddressNode[39m

In [13]:
// Read CSV and convert to Dataset
val addressNodeDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"${filePath}full-oldb.LATEST/nodes-addresses.csv")
  .as[AddressNode]

// Example queries
addressNodeDS.show(false)

+--------+--------------------------------------------------------------------------------------------------+----+---------+-------------+-------------+-----------------------------------------------------+----+
|node_id |address                                                                                           |name|countries|country_codes|sourceID     |valid_until                                          |note|
+--------+--------------------------------------------------------------------------------------------------+----+---------+-------------+-------------+-----------------------------------------------------+----+
|24000001|ANNEX FREDERICK & SHIRLEY STS, P.O. BOX N-4805, NASSAU, BAHAMAS                                   |NULL|Bahamas  |BHS          |Bahamas Leaks|The Bahamas Leaks data is current through early 2016.|NULL|
|24000002|SUITE E-2,UNION COURT BUILDING, P.O. BOX N-8188, NASSAU, BAHAMAS                                  |NULL|Bahamas  |BHS          |Bahamas Leaks|

[36maddressNodeDS[39m: [32mDataset[39m[[32mAddressNode[39m] = [node_id: string, address: string ... 6 more fields]

In [15]:
// create relationships case class
case class Relationship(node_id_start: String, 
    node_id_end: String, 
    rel_type: String, 
    link: String, 
    status: String, 
    start_date: String, 
    end_date: String, 
    sourceID: String)

defined [32mclass[39m [36mRelationship[39m

In [16]:
// Read CSV and convert to Dataset
val relationshipNodeDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"${filePath}full-oldb.LATEST/relationships.csv")
  .as[Relationship]

// Example queries
relationshipNodeDS.show(false)

+-------------+-----------+------------------+------------------+------+-----------+--------+-------------+
|node_id_start|node_id_end|rel_type          |link              |status|start_date |end_date|sourceID     |
+-------------+-----------+------------------+------------------+------+-----------+--------+-------------+
|10002580     |14106952   |registered_address|registered address|NULL  |NULL       |NULL    |Panama Papers|
|10004460     |14101133   |registered_address|registered address|NULL  |NULL       |NULL    |Panama Papers|
|10023813     |14105100   |registered_address|registered address|NULL  |NULL       |NULL    |Panama Papers|
|10023840     |14100712   |registered_address|registered address|NULL  |NULL       |NULL    |Panama Papers|
|10010428     |14093957   |registered_address|registered address|NULL  |NULL       |NULL    |Panama Papers|
|10012916     |14093957   |registered_address|registered address|NULL  |NULL       |NULL    |Panama Papers|
|10016348     |14091822   |r

[36mrelationshipNodeDS[39m: [32mDataset[39m[[32mRelationship[39m] = [node_id_start: string, node_id_end: string ... 6 more fields]

Find the available `rel_type`:

In [17]:
relationshipNodeDS.groupBy("rel_type").count().show(false)

+------------------------+-------+
|rel_type                |count  |
+------------------------+-------+
|registered_address      |832721 |
|NULL                    |2      |
|same_name_as            |104170 |
|intermediary_of         |598546 |
|officer_of              |1720357|
|underlying              |1308   |
|similar                 |46761  |
|same_as                 |4272   |
|connected_to            |12145  |
|01-AUG-2011             |1      |
|same_id_as              |3120   |
|same_intermediary_as    |4      |
|same_company_as         |15523  |
|probably_same_officer_as|132    |
|similar_company_as      |203    |
|same_address_as         |5      |
+------------------------+-------+



## 1. BASIC RELATIONSHIP JOINS - UNDERSTANDING THE GRAPH

1.1. Find all entities with their officers (SHAREHOLDER_OF, DIRECTOR_OF, OFFICER_OF)

In [16]:
val entitiesWithOfficers = entityNodeDS
  .join(
    relationshipNodeDS.filter($"rel_type".isin("officer_of", "shareholder_of", "director_of")),
    entityNodeDS("node_id") === relationshipNodeDS("node_id_end"),
    "inner"
  )
  .join(
    officerNodeDS,
    relationshipNodeDS("node_id_start") === officerNodeDS("node_id"),
    "inner"
  )
  .select(
    entityNodeDS("node_id").as("entity_id"),
    entityNodeDS("name").as("entity_name"),
    entityNodeDS("jurisdiction"),
    officerNodeDS("node_id").as("officer_id"),
    officerNodeDS("name").as("officer_name"),
    officerNodeDS("countries").as("officer_countries"),
    relationshipNodeDS("rel_type").as("relationship_type"),
    relationshipNodeDS("link"),
    relationshipNodeDS("start_date"),
    relationshipNodeDS("end_date")
  )


[36mentitiesWithOfficers[39m: [32mDataFrame[39m = [entity_id: string, entity_name: string ... 8 more fields]

In [17]:
entityNodeDS.count()
relationshipNodeDS.count()
officerNodeDS.count()
entitiesWithOfficers.count()

[36mres17_0[39m: [32mLong[39m = [32m814606L[39m
[36mres17_1[39m: [32mLong[39m = [32m3339270L[39m
[36mres17_2[39m: [32mLong[39m = [32m771366L[39m
[36mres17_3[39m: [32mLong[39m = [32m1711446L[39m

In [18]:
entitiesWithOfficers.show()

+---------+--------------------+------------+----------+--------------------+-----------------+-----------------+--------------+----------+----------+
|entity_id|         entity_name|jurisdiction|officer_id|        officer_name|officer_countries|relationship_type|          link|start_date|  end_date|
+---------+--------------------+------------+----------+--------------------+-----------------+-----------------+--------------+----------+----------+
|   140102|      Lanka Aces Ltd|         BVI|    100010| Balan Vijayarahavan|   Not identified|       officer_of|   director of|2004-12-09|      NULL|
|   123348|  JADE TIGER LIMITED|         BVI|    100014|   Mary May-Lit Shih|   Not identified|       officer_of|shareholder of|      NULL|      NULL|
|   133981| SIAM ORCHID LIMITED|         BVI|    100021|Asia Pacific Prop...|   Not identified|       officer_of|shareholder of|2003-12-17|      NULL|
|   144930|      Sanwa Asan Ltd|         BVI|    100062|        Almo Santoso|   Not identified

1.2 Find all entities with their intermediaries

In [19]:
val entitiesWithIntermediaries = entityNodeDS
  .join(
    relationshipNodeDS.filter($"rel_type" === "intermediary_of"),
    entityNodeDS("node_id") === relationshipNodeDS("node_id_end"),
    "inner"
  )
  .join(
    intermediaryDS,
    relationshipNodeDS("node_id_start") === intermediaryDS("node_id"),
    "inner"
  )
  .select(
    entityNodeDS("node_id").as("entity_id"),
    entityNodeDS("name").as("entity_name"),
    entityNodeDS("jurisdiction"),
    intermediaryDS("node_id").as("intermediary_id"),
    intermediaryDS("name").as("intermediary_name"),
    intermediaryDS("countries").as("intermediary_countries"),
    intermediaryDS("status").as("intermediary_status")
  )

[36mentitiesWithIntermediaries[39m: [32mDataFrame[39m = [entity_id: string, entity_name: string ... 5 more fields]

In [20]:
entitiesWithIntermediaries.show()

+---------+--------------------+------------+---------------+--------------------+----------------------+-------------------+
|entity_id|         entity_name|jurisdiction|intermediary_id|   intermediary_name|intermediary_countries|intermediary_status|
+---------+--------------------+------------+---------------+--------------------+----------------------+-------------------+
| 10000007|KENT DEVELOPMENT ...|         SAM|       11001746|ORION HOUSE SERVI...|             Hong Kong|             ACTIVE|
| 10000016|NINGBO RAPID INTE...|         SAM|       11001746|ORION HOUSE SERVI...|             Hong Kong|             ACTIVE|
| 10000018|      CHEM D-T Corp.|         SAM|       11002484|GO SHINE MANAGEME...|                Taiwan|             ACTIVE|
| 10000021|FORTUNE PALACE LI...|         SAM|       11005766|AFOR LAW FIRM, SI...|                 China|             ACTIVE|
| 10000036|RIVIERA HOLDINGS ...|         SAM|       11000040|         GESTAR S.A.|           Switzerland|             

1.3 Find entities with their registered addresses

In [21]:
val entitiesWithAddresses = entityNodeDS
  .join(
    relationshipNodeDS.filter($"rel_type" === "registered_address"),
    entityNodeDS("node_id") === relationshipNodeDS("node_id_start"),
    "inner"
  )
  .join(
    addressNodeDS,
    relationshipNodeDS("node_id_end") === addressNodeDS("node_id"),
    "inner"
  )
  .select(
    entityNodeDS("node_id").as("entity_id"),
    entityNodeDS("name").as("entity_name"),
    entityNodeDS("jurisdiction"),
    addressNodeDS("node_id").as("address_id"),
    addressNodeDS("address"),
    addressNodeDS("countries").as("address_countries")
  )


[36mentitiesWithAddresses[39m: [32mDataFrame[39m = [entity_id: string, entity_name: string ... 4 more fields]

In [22]:
entitiesWithAddresses.show()

+---------+--------------------+------------+----------+--------------------+--------------------+
|entity_id|         entity_name|jurisdiction|address_id|             address|   address_countries|
+---------+--------------------+------------+----------+--------------------+--------------------+
|   108047|        Robert Burns|         XXX|    105020|Robert Burns 10 M...|       United States|
|   220320|Kennington Financ...|         XXX|    105020|Robert Burns 10 M...|       United States|
|   220319|Blackstone Financ...|         XXX|    105020|Robert Burns 10 M...|       United States|
|   147752|SAKURA ENTERPRISE...|         BVI|    105140|P.O. Box 1109 Geo...|      Cayman Islands|
|101740505|HOMELAND SERVICES...|         BRB| 120000031|SUITE 203- BUILDI...|            Barbados|
|101739974|CACIQUES DEL ESTE...|         BRB| 120000031|SUITE 203- BUILDI...|            Barbados|
|101300802|LIQUID NUTRITION SRL|         BRB| 120000063|P.O.BOX 963, 2ND ...|            Barbados|
|100328901

## 2. MULTI-HOP JOINS - NETWORK ANALYSIS

2.1 Complete entity profile: `Entity -> Officers + Intermediaries + Addresses`. Complete entity profiles combining all relationship types

In [23]:
import org.apache.spark.sql.functions.col

val completeEntityProfile = entityNodeDS
  // Join with officers
  .join(
    relationshipNodeDS.filter(col("rel_type").contains("officer"))
      .withColumnRenamed("node_id_start", "officer_node_id")
      .withColumnRenamed("node_id_end", "entity_node_id")
      .alias("officer_rel"),
    entityNodeDS("node_id") === col("officer_rel.entity_node_id"),
    "left"
  )
  .join(
    officerNodeDS.alias("officer"),
    col("officer_rel.officer_node_id") === col("officer.node_id"),
    "left"
  )
  // Join with intermediaries
  .join(
    relationshipNodeDS.filter(col("rel_type") === "intermediary_of")
      .withColumnRenamed("node_id_start", "intermediary_node_id")
      .withColumnRenamed("node_id_end", "entity_node_id_int")
      .alias("int_rel"),
    entityNodeDS("node_id") === col("int_rel.entity_node_id_int"),
    "left"
  )
  .join(
    intermediaryDS.alias("intermediary"),
    col("int_rel.intermediary_node_id") === col("intermediary.node_id"),
    "left"
  )
  // Join with addresses
  .join(
    relationshipNodeDS.filter(col("rel_type") === "registered_address")
      .withColumnRenamed("node_id_start", "entity_addr_start")
      .withColumnRenamed("node_id_end", "address_node_id")
      .alias("addr_rel"),
    entityNodeDS("node_id") === col("addr_rel.entity_addr_start"),
    "left"
  )
  .join(
    addressNodeDS.alias("address"),
    col("addr_rel.address_node_id") === col("address.node_id"),
    "left"
  )


[32mimport [39m[36morg.apache.spark.sql.functions.col[39m
[36mcompleteEntityProfile[39m: [32mDataFrame[39m = [node_id: string, name: string ... 68 more fields]

In [24]:
completeEntityProfile.show(false)


08:52:38.821 [scala-kernel-interpreter-1] WARN  org.apache.spark.sql.catalyst.util.SparkStringUtils - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+--------------------------------------------+--------------------------------------------+-----------+------------+------------------------+----------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------------+-----------------+---------------+---------+-------------------------------+-------------------+-------+-------------+--------------------------------+--------------+-----------------------------------------------+----+---------------+--------------+----------+--------------+------+-----------+-----------+--------------+--------+------------------------------+-----------+-------------+--------------+-----------------------------------------------+----+--------------------+------------------+---------------+---------------+------+----------+--------+--------------+--------+-----------------------------------+------+-----------+--------

2.2 Officer network: Find all entities connected to the same officer: Officer network analysis (finding entities sharing officers)

In [26]:
val officerEntityNetwork = relationshipNodeDS
  .filter(col("rel_type").contains("officer"))
  .alias("rel1")
  .join(
    relationshipNodeDS.filter(col("rel_type").contains("officer")).alias("rel2"),
    col("rel1.node_id_start") === col("rel2.node_id_start") && 
    col("rel1.node_id_end") =!= col("rel2.node_id_end"),
    "inner"
  )
  .join(
    officerNodeDS,
    col("rel1.node_id_start") === officerNodeDS("node_id"),
    "inner"
  )
  .join(
    entityNodeDS.alias("entity1"),
    col("rel1.node_id_end") === col("entity1.node_id"),
    "inner"
  )
  .join(
    entityNodeDS.alias("entity2"),
    col("rel2.node_id_end") === col("entity2.node_id"),
    "inner"
  )
  .select(
    officerNodeDS("node_id").as("officer_id"),
    officerNodeDS("name").as("officer_name"),
    col("entity1.node_id").as("entity1_id"),
    col("entity1.name").as("entity1_name"),
    col("entity2.node_id").as("entity2_id"),
    col("entity2.name").as("entity2_name")
  )

[36mofficerEntityNetwork[39m: [32mDataFrame[39m = [officer_id: string, officer_name: string ... 4 more fields]

In [27]:
officerEntityNetwork.show()

+----------+--------------------+----------+--------------------+----------+-------------+
|officer_id|        officer_name|entity1_id|        entity1_name|entity2_id| entity2_name|
+----------+--------------------+----------+--------------------+----------+-------------+
|  12160432|MOSSFON SUBSCRIBE...|  10000172|     FAR WAY LIMITED|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000304|CHAMPLE INVESTMEN...|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000454|NEW SILK ROAD SHI...|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000472|ECOPACK INDUSTRIA...|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000528|        OPAQ LIMITED|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000720|SILVER INTERNATIO...|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000835|AMASINO CEMENTOS LTD|  10000108|ENCHANTE S.A.|
|  12160432|MOSSFON SUBSCRIBE...|  10000989|     Matchpoint Inc.|  10000108|ENCHANTE S.A.|

## 3. ANALYTICAL JOINS - FRAUD DETECTION PATTERNS

Shared address analysis: Identifies potential shell companies at the same address.

3.1 Shared address analysis: Entities at the same address (potential shell companies)

In [28]:
val entitiesSharedAddress = relationshipNodeDS
  .filter(col("rel_type") === "registered_address")
  .alias("rel1")
  .join(
    relationshipNodeDS.filter(col("rel_type") === "registered_address").alias("rel2"),
    col("rel1.node_id_end") === col("rel2.node_id_end") && 
    col("rel1.node_id_start") =!= col("rel2.node_id_start"),
    "inner"
  )
  .join(
    addressNodeDS,
    col("rel1.node_id_end") === addressNodeDS("node_id"),
    "inner"
  )
  .join(
    entityNodeDS.alias("entity1"),
    col("rel1.node_id_start") === col("entity1.node_id"),
    "inner"
  )
  .join(
    entityNodeDS.alias("entity2"),
    col("rel2.node_id_start") === col("entity2.node_id"),
    "inner"
  )
  .select(
    addressNodeDS("node_id").as("address_id"),
    addressNodeDS("address"),
    col("entity1.node_id").as("entity1_id"),
    col("entity1.name").as("entity1_name"),
    col("entity1.jurisdiction").as("entity1_jurisdiction"),
    col("entity2.node_id").as("entity2_id"),
    col("entity2.name").as("entity2_name"),
    col("entity2.jurisdiction").as("entity2_jurisdiction")
  )


[36mentitiesSharedAddress[39m: [32mDataFrame[39m = [address_id: string, address: string ... 6 more fields]

In [None]:
entitiesSharedAddress.show(false)

To get meaningful insights, we cannot simply join tables on a common foreign key like a standard relational database. Instead, we must follow the graph path: **Node A ‚Üí Relationship (Edge) ‚Üí Node B**

### Proposed Solution: The "Beneficial Owner & Location" Join

I recommend creating a **Master Entity View**. This involves joining the `EntityNode` to its **Officers** (Directors/Shareholders) and its **Registered Address**.

Here is the logic for the join directionality based on ICIJ standards:

1.  **Officers ‚Üí Entities:** Officers (Start) usually have a relationship _to_ the Entity (End).
2.  **Entities ‚Üí Addresses:** Entities (Start) usually have a relationship _to_ an Address (End).

### Scala Spark Implementation

Here is the complete code block to perform this complex join. This code uses the DataSets you have already defined (`entityNodeDS`, `officerNodeDS`, `addressNodeDS`, `relationshipNodeDS`).

In [None]:
// Import necessary functions for easier column handling
import org.apache.spark.sql.functions._

// 1. Alias the Datasets to avoid column name ambiguity (all have "node_id")
val entities = entityNodeDS.as("e")
val relationships = relationshipNodeDS.as("rel")
val officers = officerNodeDS.as("o")
val addresses = addressNodeDS.as("addr")

In [None]:
// 2. Perform the Join: Entity <- [officer_of] - Officer
// We use a LEFT OUTER JOIN so we don't lose Entities that have no listed officers
val entityOfficerJoin = entities
  .join(relationships, col("e.node_id") === col("rel.node_id_end"), "left_outer")
  .join(officers, col("rel.node_id_start") === col("o.node_id"), "left_outer")
  .select(
    col("e.node_id").as("entity_id"),
    col("e.name").as("entity_name"),
    col("e.jurisdiction").as("entity_jurisdiction"),
    col("rel.rel_type").as("officer_role"),
    col("o.name").as("officer_name"),
    col("o.countries").as("officer_country")
  )

In [None]:
entityOfficerJoin.count()

In [None]:
// 3. Perform the Join: Entity - [registered_address] -> Address
// Note: We rejoin with relationships again (aliased as 'rel2') for the address direction
val relAddress = relationshipNodeDS.as("rel2")

val fullEntityProfile = entityOfficerJoin.as("eo")
  .join(relAddress, col("eo.entity_id") === col("rel2.node_id_start"), "left_outer")
  .join(addresses, col("rel2.node_id_end") === col("addr.node_id"), "left_outer")
  .filter(col("rel2.rel_type").isNull || col("rel2.rel_type") === "registered_address") // Optional: specific link type
  .select(
    col("eo.entity_id"),
    col("eo.entity_name"),
    col("eo.entity_jurisdiction"),
    col("eo.officer_name"),
    col("eo.officer_role"),
    col("eo.officer_country"),
    col("addr.address").as("registered_address"),
    col("addr.countries").as("address_country")
  )

In [None]:
fullEntityProfile.show(false)

In [None]:
import scala.sys.process._

val script = """
ls -al /home/jovyan/work/blogs/full-oldb.LATEST
"""

// Run using bash -c
Seq("bash", "-c", script).!

In [None]:
scala.util.Properties.versionNumberString

In [None]:
spark.stop()