# Spark launcher

In [1]:
# !conda create --name old_spark  python=3.7 -y

In [2]:
# DIR="$(cd -- "$(dirname -- "${0}")" && pwd)"
# export ROOT_DIR="$(cd -- "$(dirname -- "${0}")" && cd .. && pwd)"
# export SERP_HOME="${SERP_HOME:-$ROOT_DIR/env/serp}"
# export PATH="$SERP_HOME/bin:$ROOT_DIR/env/ds/bin:$ROOT_DIR/env/ds/opt/anaconda/bin:$PATH"


In [2]:
import os
os.environ["SERP_HOME"] = "."
import sys
sys.path.append('python-datatoolbox/')

In [3]:
%env SERP_HOME
!. ./.env

import pandas as pd
import psutil

pd.options.display.max_rows = 50 
pd.options.display.max_columns = 500 
pd.options.display.width = 1000 
# pd.options.display.max_colwidth = None 
pd.options.display.float_format = lambda x: '%.3f' % x 


import os

POV_INTEGRATION_PATH = os.path.join(os.getcwd(), '..')

if 'S8_CONFIG_DIR_PATH' not in os.environ:
    MOAPOV_CONFIG_PATH = os.path.join(POV_INTEGRATION_PATH, 'conf/default')
    os.environ['S8_CONFIG_DIR_PATH'] = MOAPOV_CONFIG_PATH
    
if 'MOAPOV_WORKING_DIR_PATH' not in os.environ:
    MOAPOV_WORKING_DIR_PATH = os.path.join(POV_INTEGRATION_PATH, 'working_dir/default')
    os.environ['MOAPOV_WORKING_DIR_PATH'] = MOAPOV_WORKING_DIR_PATH
    
MOAPOV_PIPELINE_RESULTS_DIR_PATH = os.path.join(POV_INTEGRATION_PATH, 'notebooks-outputs')
MOAPOV_NOTEBOOKS_PATH = os.path.join(POV_INTEGRATION_PATH, 'moapov/notebooks')
CS_NOTEBOOKS_PATH = os.path.join(POV_INTEGRATION_PATH, 'pov_notebooks')

os.environ['MOAPOV_PIPELINE_RESULTS_DIR_PATH'] = MOAPOV_PIPELINE_RESULTS_DIR_PATH
os.environ['MOAPOV_NOTEBOOKS_PATH'] = MOAPOV_NOTEBOOKS_PATH
os.environ['CS_NOTEBOOKS_PATH'] = CS_NOTEBOOKS_PATH

DEBUG=True

!mkdir -p $MOAPOV_PIPELINE_RESULTS_DIR_PATH


## Victor config.ini - merge with config 

import sys, os, platform, pyspark, logging
from configparser import ConfigParser
from glob import glob
from collections.abc import Iterable
from typing import Union
from pprint import pprint

PLATFORM_SYSTEM = platform.system()
config = ConfigParser()

if PLATFORM_SYSTEM == 'Windows':
    config_file_path = 'config-windows.ini'
elif PLATFORM_SYSTEM == 'Darwin':
    config_file_path = 'config-mac.ini'
else:
    config_file_path = 'config.ini'
    
config.read(config_file_path)

PYTHON_DATA_TOOLBOX_PATH = config['APP']['PYTHON_DATA_TOOLBOX_PATH']
DATA_DIR = config['APP']['DATA_DIR']

RAW_DATA_DIR = config['APP']['RAW_DATA_DIR']
STANDARDIZED_DATA_DIR = config['APP']['STANDARDIZED_DATA_DIR']
CLEANSED_DATA_DIR = config['APP']['CLEANSED_DATA_DIR']
APPLICATION_DATA_DIR = config['APP']['APPLICATION_DATA_DIR']
REPORT_DATA_DIR = config['APP']['REPORT_DATA_DIR']
SANDBOX_DATA_DIR = config['APP']['SANDBOX_DATA_DIR']
APPLICATION_CONF_DIR = config['APP']['APPLICATION_CONF_DIR']


SPARK_APP_NAME = config['SPARK']['APP_NAME']
SPARK_MASTER = config['SPARK']['MASTER']
SPARK_DRIVER_MEMORY = config['SPARK']['DRIVER_MEMORY']
SPARK_USE_OPTIMAL_CONFIG = config['SPARK'].getboolean('USE_OPTIMAL_CONFIG')

SPARK_IVY2_DIR = config['SPARK']['IVY2_DIR']
SPARK_DB_JARS = config['SPARK']['DB_JARS']
SPARK_TMP_DIR = config['SPARK']['TMP_DIR']
# SPARK_COMPILE_EGG = config['SPARK'].getboolean('IS_COMPILE_EGG')

PYTHON_SKIP_PACKAGE_DEPENDENCY = config['PYTHON'].getboolean('SKIP_PACKAGE_DEPENDENCY')

In [7]:
import psutil
import utils.config_service as configservice

# TODO: Integrate the spark configs to sparkservice
# import utils.spark_service as sparkservice

def to_bytes(value):
    if value.endswith('g'):
        return int(value.replace('g','')) * 1024 * 1024 * 1024
    elif value.endswith('m'):
        return int(value.replace('m','')) * 1024 * 1024
    elif value.endswith('k'):
        return int(value.replace('k','')) * 1024
        
    return value

def validate_available_memory():
    properties = configservice.get_spark_config()
    
    available_bytes = psutil.virtual_memory().available
    
    driver_mem_bytes = to_bytes(properties['config']['spark.driver.memory'])
    executor_mem_bytes = to_bytes(properties['config']['spark.executor.memory'])
    
    required_bytes = driver_mem_bytes + executor_mem_bytes

    min_available_memory_1 = float(properties.get('min-available-memory-ratio', 0)) * required_bytes
    min_available_memory_2 = to_bytes(properties.get('min-available-memory-over-required', '0g')) + required_bytes
    
    min_available_memory = max(min_available_memory_1, min_available_memory_2)
    
    print(f'Available memory: {available_bytes}')
    print(f'Required memory: {min_available_memory}')
    
    if available_bytes < min_available_memory:
        raise Exception(
            f'Not enough available memory left: {available_bytes} bytes. Required at least: {min_available_memory} bytes')
    
    

validate_available_memory()

# Config and create Spark instance

import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf

# import helper.dbhelper as dbhelper

import pyspark 

pyspark.__version__

# Seems Spark 3 improved the join a lot, there is no need to use extra mergejoin jar
PYSPARK_SMALLEST_VERSION = '2.4.2'

if pyspark.__version__ >= '3.1':
    spark_jars_packages = 'io.delta:delta-core_2.12:1.0.0'
elif pyspark.__version__ >= '3.0':
    spark_jars_packages = 'io.delta:delta-core_2.12:0.8.0'
elif pyspark.__version__ >= PYSPARK_SMALLEST_VERSION:
    spark_jars_packages = 'io.delta:delta-core_2.11:0.6.1'
else:
    spark_jars_packages = ''
    raise ValueError(f'Delta lake is supported from pySpark {PYSPARK_SMALLEST_VERSION} onwards only.')
    
# spark_jars_packages = 'icom.hindog.spark:spark-mergejoin_2.11:2.0.1,org.postgresql:postgresql:42.2.20,com.oracle:ojdbc8:12.2.0.1'

# <!-- https://mvnrepository.com/artifact/io.delta/delta-core -->
# <dependency org="io.delta" name="delta-core_2.12" rev="1.0.0"/>

# <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
# <dependency org="org.postgresql" name="postgresql" rev="42.2.20"/>

# <!-- https://mvnrepository.com/artifact/com.oracle.jdbc/ojdbc8 -->
# <dependency org="com.oracle.jdbc" name="ojdbc8" rev="12.2.0.1"/>

# <!-- https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 -->
# <dependency org="com.oracle.database.jdbc" name="ojdbc8" rev="21.1.0.0"/>

SPARK_MASTER, SPARK_DRIVER_MEMORY, SPARK_APP_NAME, SPARK_DB_JARS, SPARK_IVY2_DIR, spark_jars_packages, SPARK_TMP_DIR

# Spark object needs to be created first before importing "delta" module, e.g. from delta.tables import *
spark_conf = pyspark.SparkConf()

# spark configuration
spark_conf.setAll([('spark.master', SPARK_MASTER),
                   ('spark.driver.memory', SPARK_DRIVER_MEMORY),
                   ('spark.app.name', SPARK_APP_NAME),
                   ('spark.jars', SPARK_DB_JARS),
                   ('spark.jars.ivy', os.path.abspath(SPARK_IVY2_DIR)),
                   ('spark.jars.packages', spark_jars_packages),
#                    ('spark.submit.pyFiles', '%s,%s' % (EGG_PATH, SPARK_DELTA_JAR)),
#                    ('spark.submit.pyFiles', SPARK_DELTA_JAR),
                   ('spark.driver.extraJavaOpions', '-Djava.io.tmpdir=' + SPARK_TMP_DIR),
                   ('spark.local.dir', SPARK_TMP_DIR)])

spark_conf.setAll([("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
                       ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")])

import psutil, math
def set_spark_cpu_memory(spark_conf):
    def _get_optimal_ram():
        available_ram_gb = psutil.virtual_memory().available / (1024 ** 3)
        if available_ram_gb >= 64:
            ram_gb = math.ceil(available_ram_gb * 0.5)
        if available_ram_gb >= 32:
            ram_gb = math.ceil(available_ram_gb * 0.6)
        elif available_ram_gb >= 24:
            ram_gb = math.ceil(available_ram_gb * 0.7)
        elif available_ram_gb >= 16:
            ram_gb = math.ceil(available_ram_gb * 0.75)
        else:
            ram_gb = math.ceil(available_ram_gb * 0.8)
        
        return str(ram_gb) + 'g'

    cpu_count = psutil.cpu_count() / 2
    master = 'local[%d]' % cpu_count if SPARK_USE_OPTIMAL_CONFIG else SPARK_MASTER
    memory = _get_optimal_ram() if SPARK_USE_OPTIMAL_CONFIG else SPARK_DRIVER_MEMORY
    
    spark_conf.setAll([('spark.master', '%s' % master),
                       ('spark.driver.memory', '%s' % memory)
                      ])
    
    print('Actual Spark Master: %s' % master)
    print('Actual Spark Driver Memory: %s' % memory)

# spark_conf.setAll([('spark.sql.parquet.enableVectorizedReader', 'false')])
set_spark_cpu_memory(spark_conf)
spark = SparkSession.builder.config(conf=spark_conf).appName(SPARK_APP_NAME).getOrCreate()
spark.sparkContext.addPyFile("/app/dependencies/ivy2/cache/io.delta/delta-core_2.12/jars/delta-core_2.12-1.0.0.jar")

# delta package can only be called after spark object created
if pyspark.__version__ >= PYSPARK_SMALLEST_VERSION:
    from delta.tables import *
import helper.dbhelper as dbhelper
oracle_db = dbhelper.DbHelper(spark, 'ORACLE')
pg_db = dbhelper.DbHelper(spark, 'POSTGRES')

Available memory: 49392152576
Required memory: 4831838208
Actual Spark Master: local[10]
Actual Spark Driver Memory: 28g


21/12/09 14:50:21 WARN SparkContext: The path /app/dependencies/ivy2/cache/io.delta/delta-core_2.12/jars/delta-core_2.12-1.0.0.jar has been added already. Overwriting of added paths is not supported in the current version.


In [8]:
# !export PIP_INDEX_URL=https://atanona:AKCp8k8Prres8xHvKmJiNbR8APWLKavCZbUox2zBqXNwYF4wozziyjrpB3rgkP6SCPwqvgxtg@repo.silenteight.com/artifactory/api/pypi/pypi/simple; pip install -r requirements.txt

In [9]:
# !export PIP_INDEX_URL=https://atanona:AKCp8k8Prres8xHvKmJiNbR8APWLKavCZbUox2zBqXNwYF4wozziyjrpB3rgkP6SCPwqvgxtg@repo.silenteight.com/artifactory/api/pypi/pypi/simple; pip install -r python-datatoolbox/pip-requirements.txt

In [6]:
# !pip install pyspark==3.1.1

# Create standardized delta/csv files - 1.0-raw-to-standardized - DeltaConverter, Spark Manager

In [4]:
from silenteight.utils.jupyter import set_jupyter_cell_width, set_pd_display
set_jupyter_cell_width()
set_pd_display()

# Convert raw data to standardized format

# Create cleansed csv files - 2.0-standardized-to-cleansed - Spark manager, XML Parser, Match/Hit Handler,Delta Converter, NRIC handler, Status Preprocessor, Notes Preprocessor

In [5]:
from silenteight.aia.alerts import *

In [6]:
# Implementation: XML Parser
alert_hit_dict_factory = AlertHitDictFactory()
alert_hit_extractor = AlertHitExtractor()

In [7]:
# Implementation: XML Parser
alert_schema = alert_hit_dict_factory.get_alert_spark_schema()
hit_schema = alert_hit_dict_factory.get_hit_spark_schema()

In [8]:
# Implementation: Spark manager
schema = StructType([
    StructField('alert_header', alert_schema),
    StructField('hits', ArrayType(hit_schema))
])

In [12]:
ls -l tests/data/2.standardized/

total 16
drwxr-xr-x 3 1000 1000 4096 Dec  9 11:25 [0m[01;34mACM_ALERT_NOTES.delta[0m/
drwxr-xr-x 3 1000 1000 4096 Dec  9 11:25 [01;34mACM_ITEM_STATUS_HISTORY.delta[0m/
drwxr-xr-x 3 1000 1000 4096 Dec  9 11:25 [01;34mACM_MD_ALERT_STATUSES.delta[0m/
drwxr-xr-x 3 1000 1000 4096 Dec  9 11:25 [01;34mALERTS.delta[0m/


In [10]:
# file_name = 'RCMDB.ALERTS_SAMPLE.delta'
# Implementation: Spark manager
file_name = 'ALERTS.delta'
std_alert_df = spark.read.format('delta').load("tests/data/2.standardized/" + (file_name))

21/12/09 14:50:25 INFO DelegatingLogStore: LogStore org.apache.spark.sql.delta.storage.HDFSLogStore is used for scheme file
21/12/09 14:50:25 INFO DeltaLog: Loading version 4.
21/12/09 14:50:25 INFO Snapshot: [tableId=1f96eaa8-59ec-4da1-8ede-37f9c6bc8141] DELTA: Compute snapshot for version: 4
21/12/09 14:50:25 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 178.7 KiB, free 16.6 GiB)
21/12/09 14:50:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.8 KiB, free 16.6 GiB)
21/12/09 14:50:25 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 813ca8f75e1e:36659 (size: 28.8 KiB, free: 16.6 GiB)
21/12/09 14:50:25 INFO SparkContext: Created broadcast 0 from toString at String.java:2951
21/12/09 14:50:25 INFO DeltaLogFileIndex: Created DeltaLogFileIndex(JSON, numFilesInSegment: 5, totalFileSize: 11140)
21/12/09 14:50:27 INFO FileSourceStrategy: Pushed Filters: 
21/12/09 14:50:27 INFO FileSourceStrategy: Post-Scan F

21/12/09 14:50:29 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 420.7 KiB, free 16.6 GiB)
21/12/09 14:50:29 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 97.3 KiB, free 16.6 GiB)
21/12/09 14:50:29 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 813ca8f75e1e:36659 (size: 97.3 KiB, free: 16.6 GiB)
21/12/09 14:50:29 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1383
21/12/09 14:50:29 INFO DAGScheduler: Submitting 50 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[15] at toString at String.java:2951) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
21/12/09 14:50:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 50 tasks resource profile 0
21/12/09 14:50:29 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5) (813ca8f75e1e, executor driver, partition 0, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/12/09 14

21/12/09 14:50:29 INFO Executor: Finished task 11.0 in stage 1.0 (TID 20). 3700 bytes result sent to driver
21/12/09 14:50:29 INFO TaskSetManager: Starting task 19.0 in stage 1.0 (TID 28) (813ca8f75e1e, executor driver, partition 19, PROCESS_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/12/09 14:50:29 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 20) in 98 ms on 813ca8f75e1e (executor driver) (14/50)
21/12/09 14:50:29 INFO Executor: Running task 19.0 in stage 1.0 (TID 28)
21/12/09 14:50:29 INFO Executor: Finished task 8.0 in stage 1.0 (TID 17). 3700 bytes result sent to driver
21/12/09 14:50:29 INFO Executor: Finished task 6.0 in stage 1.0 (TID 15). 3700 bytes result sent to driver
21/12/09 14:50:29 INFO Executor: Finished task 10.0 in stage 1.0 (TID 19). 3700 bytes result sent to driver
21/12/09 14:50:29 INFO TaskSetManager: Starting task 20.0 in stage 1.0 (TID 29) (813ca8f75e1e, executor driver, partition 20, PROCESS_LOCAL, 4442 bytes) taskResourceAssignments Map()


21/12/09 14:50:29 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/12/09 14:50:29 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/12/09 14:50:29 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/12/09 14:50:29 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/12/09 14:50:29 INFO MemoryStore: Block rdd_12_46 stored as values in memory (estimated size 46.0 B, free 16.6 GiB)
21/12/09 14:50:29 INFO BlockManagerInfo: Added rdd_12_46 in memory on 813ca8f75e1e:36659 (size: 46.0 B, free: 16.6 GiB)
21/12/09 14:50:29 INFO MemoryStore: Block rdd_12_45 stored as values in memory (estimated size 46.0 B, free 16.6 GiB)
21/12/09 14:50:29 INFO BlockManagerInfo: Added rdd_12_45 in memory on 813ca8f75e1e:36659 (size: 46.0 B, free: 16.6 GiB)
21/12/09 14:50:29 INFO

In [4]:
x = spark.read.format('delta').load('tests/data/2.standardized/ALERTS.delta/')

Py4JJavaError: An error occurred while calling o95.load.
: java.lang.ClassNotFoundException: Failed to find data source: delta. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:689)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:743)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:663)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:663)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:663)
	... 14 more


False

[Row(ALERT_INTERNAL_ID='1619405', ENTITY_TYPE_ID=None, ALERT_DATE='8-Oct-18', ALERT_TYPE_ID=None, STATUS_ID=None, STATUS_INTERNAL_ID='7027', DELETED='0', HTML_FILE_KEY='<?xml version="1.0" encoding="UTF16" standalone="no" ?><alert><alert-header><elem name="alertId">WLF101-939701-62908</elem><elem name="alertDate">08/10/18 12:31:01</elem><elem name="alertEntityKey">42823012-P560016505</elem><elem name="score">80</elem><elem name="ahData"><elem name="alertID">WLF101-939701-62908</elem></elem><elem name="ahData"><elem name="alertDateTime">08/10/18 12:31:01</elem></elem><elem name="ahData"><elem name="jobID">01-9397</elem></elem><elem name="ahData"><elem name="jobName">AML_MAGNUM_1810081211_55</elem></elem><elem name="ahData"><elem name="jobType">Self Service Batch Scan</elem></elem><elem name="ahData"><elem name="score">80</elem></elem><elem name="ahData"><elem name="numberOfHits">1</elem></elem><elem name="ahData"><elem name="partyKey">42823012-P560016505</elem></elem><elem name="ahData"

In [98]:
columns = df1.columns
df3 = df1.alias("d1").join(df2.alias("d2"), f.col("d1.id") == f.col("d2.id"), "left")


AnalysisException: cannot resolve '`d1.id`' given input columns: [d1.ALERT_CUSTOM_ATTRIBUTES_ID, d2.ALERT_CUSTOM_ATTRIBUTES_ID, d1.ALERT_DATE, d2.ALERT_DATE, d1.ALERT_ID, d2.ALERT_ID, d1.ALERT_INTERNAL_ID, d2.ALERT_INTERNAL_ID, d1.ALERT_NAME, d2.ALERT_NAME, d1.ALERT_TYPE_ID, d2.ALERT_TYPE_ID, d1.ALERT_TYPE_INTERNAL_ID, d2.ALERT_TYPE_INTERNAL_ID, d1.ALERT_TYPE_VERSION, d2.ALERT_TYPE_VERSION, d1.AUTO_ESC_STATUS_INTERNAL_ID, d2.AUTO_ESC_STATUS_INTERNAL_ID, d1.BUNIT_IDENTIFIER, d2.BUNIT_IDENTIFIER, d1.BU_INTERNAL_ID, d2.BU_INTERNAL_ID, d1.CASE_COUNT_FOR_CONFIDENTIAL, d2.CASE_COUNT_FOR_CONFIDENTIAL, d1.CLOSED_DATE, d2.CLOSED_DATE, d1.CONSOLIDATION_KEY, d2.CONSOLIDATION_KEY, d1.CREATE_DATE, d2.CREATE_DATE, d1.DEADLINE_DATE, d2.DEADLINE_DATE, d1.DELETED, d2.DELETED, d1.DETAILS, d2.DETAILS, d1.DETAILS_FOR_SEARCH, d2.DETAILS_FOR_SEARCH, d1.EMAIL_DATE, d2.EMAIL_DATE, d1.ENTITY_ID, d2.ENTITY_ID, d1.ENTITY_TYPE_ID, d2.ENTITY_TYPE_ID, d1.FL_ARCHIVE, d2.FL_ARCHIVE, d1.FL_DOUBT, d2.FL_DOUBT, d1.FL_ENCRYPTED, d2.FL_ENCRYPTED, d1.FL_GENERATED_BY_ACM, d2.FL_GENERATED_BY_ACM, d1.FL_HAS_ATTACHMENTS, d2.FL_HAS_ATTACHMENTS, d1.FL_HAS_CONFIDENTIAL_NOTES, d2.FL_HAS_CONFIDENTIAL_NOTES, d1.FL_HAS_NOTES, d2.FL_HAS_NOTES, d1.FL_MANUAL, d2.FL_MANUAL, d1.FL_READ, d2.FL_READ, d1.FL_READ_BY_OWNER, d2.FL_READ_BY_OWNER, d1.FL_UPDATED_BY_ACM, d2.FL_UPDATED_BY_ACM, d1.GLOBAL_AUTO_ESC_STATUS_ID, d2.GLOBAL_AUTO_ESC_STATUS_ID, d1.GLOBAL_DEADLINE_DATE, d2.GLOBAL_DEADLINE_DATE, d1.GLOBAL_EMAIL_DATE, d2.GLOBAL_EMAIL_DATE, d1.GLOBAL_HIGHLIGHT_DATE, d2.GLOBAL_HIGHLIGHT_DATE, d1.HIBERNATE_OBJECT_VERSION, d2.HIBERNATE_OBJECT_VERSION, d1.HIGHLIGHT_DATE, d2.HIGHLIGHT_DATE, d1.HTML_FILE_KEY, d2.HTML_FILE_KEY, d1.IS_CASE, d2.IS_CASE, d1.LAST_READ_DATE, d2.LAST_READ_DATE, d1.LAST_READ_USER_ID, d2.LAST_READ_USER_ID, d1.LAST_REFRESH_MODIFED_DATE, d2.LAST_REFRESH_MODIFED_DATE, d1.LAST_UPDATE_DATE, d2.LAST_UPDATE_DATE, d1.LAST_UPDATE_USER_ID, d2.LAST_UPDATE_USER_ID, d1.NUM_EXISTING_ENTITIES, d2.NUM_EXISTING_ENTITIES, d1.ORIGINAL_BU_INTERNAL_ID, d2.ORIGINAL_BU_INTERNAL_ID, d1.OWNER_IDENTIFIER, d2.OWNER_IDENTIFIER, d1.OWNER_INTERNAL_ID, d2.OWNER_INTERNAL_ID, d1.P11, d2.P11, d1.P12, d2.P12, d1.P13, d2.P13, d1.P14, d2.P14, d1.P15, d2.P15, d1.P16, d2.P16, d1.P17, d2.P17, d1.P18, d2.P18, d1.P19, d2.P19, d1.P20, d2.P20, d1.P21, d2.P21, d1.P22, d2.P22, d1.P23, d2.P23, d1.P24, d2.P24, d1.P25, d2.P25, d1.P26, d2.P26, d1.P27, d2.P27, d1.P28, d2.P28, d1.P29, d2.P29, d1.P30, d2.P30, d1.P31, d2.P31, d1.P32, d2.P32, d1.P33, d2.P33, d1.P34, d2.P34, d1.P35, d2.P35, d1.P36, d2.P36, d1.P37, d2.P37, d1.P38, d2.P38, d1.P39, d2.P39, d1.P40, d2.P40, d1.P41, d2.P41, d1.P42, d2.P42, d1.P43, d2.P43, d1.P44, d2.P44, d1.P45, d2.P45, d1.P46, d2.P46, d1.P47, d2.P47, d1.P48, d2.P48, d1.P49, d2.P49, d1.P50, d2.P50, d1.PREV_STATUS_INTERNAL_ID, d2.PREV_STATUS_INTERNAL_ID, d1.PRIORITY_INTERNAL_ID, d2.PRIORITY_INTERNAL_ID, d1.RESOLUTION_ID, d2.RESOLUTION_ID, d1.RFI_STATE, d2.RFI_STATE, d1.SCORE, d2.SCORE, d1.STATUS_ID, d2.STATUS_ID, d1.STATUS_INTERNAL_ID, d2.STATUS_INTERNAL_ID, d1.WORKSPACE_INTERNAL_ID, d2.WORKSPACE_INTERNAL_ID];
'Join LeftOuter, ('d1.id = 'd2.id)
:- SubqueryAlias d1
:  +- Relation[ALERT_INTERNAL_ID#46133,ENTITY_TYPE_ID#46134,ALERT_DATE#46135,ALERT_TYPE_ID#46136,STATUS_ID#46137,STATUS_INTERNAL_ID#46138,DELETED#46139,HTML_FILE_KEY#46140,P11#46141,P12#46142,P13#46143,P14#46144,P15#46145,P16#46146,P17#46147,P18#46148,P19#46149,P20#46150,P21#46151,P22#46152,P23#46153,P24#46154,P25#46155,P26#46156,... 74 more fields] parquet
+- SubqueryAlias d2
   +- Relation[ALERT_INTERNAL_ID#46329,ENTITY_TYPE_ID#46330,ALERT_DATE#46331,ALERT_TYPE_ID#46332,STATUS_ID#46333,STATUS_INTERNAL_ID#46334,DELETED#46335,HTML_FILE_KEY#46336,P11#46337,P12#46338,P13#46339,P14#46340,P15#46341,P16#46342,P17#46343,P18#46344,P19#46345,P20#46346,P21#46347,P22#46348,P23#46349,P24#46350,P25#46351,P26#46352,... 74 more fields] parquet


In [97]:
df1 = std_alert_df
df2 = x

In [91]:
from pyspark.sql import functions as f



In [16]:
# Implementation: Spark manager
show_dim(std_alert_df)

Dimension 3 98


In [17]:
# Implementation: Spark manager, XML Parser
alert_df = std_alert_df.withColumn('alert_hits',
                                   F.udf(alert_hit_extractor.extract_alert_hits_from_xml, schema)('html_file_key'))

In [18]:
%%time
# Implementation: Delta Converter

alert_df = write_read_delta(spark, alert_df, in_cleansed_data_dir(file_name))

CPU times: user 58.5 ms, sys: 0 ns, total: 58.5 ms
Wall time: 2.03 s


In [19]:
# Implementation: Spark manager
group_count(alert_df, 'alert_hits', 3)

21/12/09 11:22:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,alert_hits,count,percent,count_cum_sum,percent_cum_sum
0,"((None, 08/10/18 12:31:01, 42823012-P560016505, 80, [WLF101-939701-62908], [08/10/18 12:31:01], [01-9397], [AML_MAGNUM_1810081211_55], [Self Service Batch Scan], [80], 1, [42823012-P560016505], [P...",1,33.333,1,33.333
1,"((None, 10/10/18 11:43:55, SSBPLASC550212199, 115, [WLF101-945401-62939], [10/10/18 11:43:55], [01-9454], [AML_PLAS_W_1810061800_971], [Self Service Batch Scan], [115], 1, [SSBPLASC550212199], [KI...",1,33.333,2,66.667
2,"((None, 24/02/20 15:02:12, SSBSAPB0025000389, 80, [WLF101-1363601-89626], [24/02/20 15:02:12], [01-13636], [AML_SAP_BN01_20022415001_334], [Self Service Batch Scan], [80], 1, [SSBSAPB0025000389], ...",1,33.333,3,100.0


In [20]:
# Implementation: Spark manager
alert_df.count()

3

In [21]:
# Implementation: Spark manager, XML Parser, Match/Hit Handler
alert_hits_df = alert_df.selectExpr('*', 'alert_hits.*') \
    .selectExpr('*', 'explode(hits) as hit') \
    .selectExpr('*', 'alert_header.*') \
    .selectExpr('*', 'hit.*') \
    .drop('alert_hits', 'alert_header', 'hits', 'hit')

In [22]:
%%time
# Implementation: Delta Exporter
alert_hits_df = write_read_delta(spark, alert_hits_df,
                                 in_cleansed_data_dir(file_name),
                                 user_metadata='At hit level'
                                )

CPU times: user 38.7 ms, sys: 3.23 ms, total: 42 ms
Wall time: 1.29 s


In [23]:
# Implementation: XML Parser, Match/Hit Handler
alert_hits_df.select('alert_internal_id').distinct().count()

3

In [24]:
# Implementation: XML Parser, Match/Hit Handler
def get_wl_hit_aliases_matched_name(hit_aliases_displayName, hit_aliases_matchedName, hit_inputExplanations):
    if hit_inputExplanations is None or len(hit_inputExplanations) == 0:
        return []
    else:
        result = []
        hit_inputExplanations = list(set(hit_inputExplanations))
        for hit_inputExplanation in hit_inputExplanations:
            if hit_inputExplanation in hit_aliases_matchedName:
                index_in_matchedName = hit_aliases_matchedName.index(hit_inputExplanation)
                result.append(hit_aliases_displayName[index_in_matchedName])
            elif hit_inputExplanation in hit_aliases_displayName:
                result.append(hit_inputExplanation)
            else:
                result.append(hit_inputExplanation)
        
        return result

In [25]:
alert_hits_df.toPandas()["hit_inputExplanations_matchedName_inputExplanation"]

0    []
1    []
2    []
Name: hit_inputExplanations_matchedName_inputExplanation, dtype: object

In [26]:
# Implementation: Match/Hit Handler, Spark manager
ap_hit_names_sql = sql_merge_to_target_col_from_source_cols(alert_hits_df, 'ap_hit_names', 'hit_inputExplanations_matchedName_inputExplanation', 'hit_inputExplanations_aliases_matchedName_inputExplanation')
alert_ap_hit_names_df = alert_hits_df.select('*', ap_hit_names_sql)

alert_ap_wl_hit_names_df = alert_ap_hit_names_df.withColumn('wl_hit_matched_name',
                                                              F.when(F.expr('size(hit_explanations_matchedName_Explanation) > 0'), F.col('hit_displayName')) \
                                                               .otherwise(F.lit(None))
                                                             ) \
                                                  .withColumn('wl_hit_aliases_matched_name',
                                                              udf(get_wl_hit_aliases_matched_name, ArrayType(StringType()))('hit_aliases_displayName', 'hit_aliases_matchedName', 'hit_explanations_aliases_matchedName_Explanation')
                                                             )

wl_hit_names_sql = sql_merge_to_target_col_from_source_cols(alert_ap_wl_hit_names_df, 'wl_hit_names', 'wl_hit_matched_name', 'wl_hit_aliases_matched_name')
alert_ap_wl_hit_names_df = alert_ap_wl_hit_names_df.select('*', wl_hit_names_sql)

In [27]:
alert_ap_wl_hit_names_df
len(alert_ap_wl_hit_names_df.toPandas()["wl_hit_aliases_matched_name"])

3

In [28]:
# Implementation: Match/Hit Handler, Spark manager
merge_hit_and_aliases_displayName_sql = sql_merge_to_target_col_from_source_cols(alert_ap_wl_hit_names_df, 'wl_hit_names', 'hit_displayName', 'hit_aliases_displayName')
merge_ap_names_sql = sql_merge_to_target_col_from_source_cols(alert_ap_wl_hit_names_df, 'ap_hit_names', 'alert_ahData_partyName', return_array=True)

alert_ap_wl_hit_names_df = alert_ap_wl_hit_names_df.withColumn('wl_hit_names',
                                                                 F.when(F.expr('size(wl_hit_names) > 0'), F.col('wl_hit_names')) \
                                                                  .otherwise(merge_hit_and_aliases_displayName_sql)
                                                                ) \
                                                   .withColumn('ap_hit_names',
                                                                 F.when(F.expr('size(ap_hit_names) > 0'), F.col('ap_hit_names')) \
                                                                  .otherwise(merge_ap_names_sql)
                                                                )

In [29]:
alert_ap_wl_hit_names_df.toPandas()['ap_hit_names']

0    [CPF BOARD]
1        [P ONE]
2          [KIM]
Name: ap_hit_names, dtype: object

In [30]:
# Implementation: Match/Hit Handler, Spark manager
alert_statuses_df = spark.read.format('delta').load(in_standardized_data_dir('ACM_MD_ALERT_STATUSES.delta')).select('STATUS_INTERNAL_ID', 'STATUS_NAME')
alert_ap_wl_hit_names_df = alert_ap_wl_hit_names_df.join(alert_statuses_df, 'STATUS_INTERNAL_ID')
alert_ap_wl_hit_names_df = reorder_cols(alert_ap_wl_hit_names_df, 'STATUS_INTERNAL_ID', 'STATUS_NAME')

In [31]:
%%time
# Implementation: Delta Exporter
alert_ap_wl_hit_names_df = write_read_delta(spark, alert_ap_wl_hit_names_df, in_cleansed_data_dir(file_name),
                                            user_metadata = 'More processing on AP and WL names, pinpoint to the exact names from AP and WL which caused the hits'
                                           )

CPU times: user 87.3 ms, sys: 2.3 ms, total: 89.6 ms
Wall time: 1.38 s


#### Below is not crucial?

In [32]:
# Implementation: NRIC handler (Customer specifics)

def extract_wl_nric_dob(custom_field):
    def _extract_yob_from_st_nric(nric):
        nric_type = nric[0]
        two_digit_year = nric[1:3]

        if nric_type.lower() == 's':
            if int(two_digit_year) >= 68:
                yob = '19' + two_digit_year
            else:
                yob = None
        else:
            yob = '20' + two_digit_year
            
        return yob
    
    nric_match = re.match('^NRIC:.*?([STGF]\d{7}[A-Z])', custom_field)
    dob_match = re.match('.*DOB: (.+?\d{4})[,.]', custom_field)
    possible_nric_match = re.findall('([STGF]\d{7}[A-Z])', custom_field)
    
    if nric_match:
        nric = nric_match.groups()
    else:
        nric = None
        
    if dob_match:
        dob = dob_match.groups()
    else:
        if nric and nric[0].lower() in ['s', 't']:
            dob = _extract_yob_from_st_nric(nric)
        else:
            dob = None
            
    if possible_nric_match:
        possible_nric = possible_nric_match
    else:
        possible_nric = None 
    
    return {'nric': nric, 'dob': dob, 'possible_nric': possible_nric}

extract_wl_nric_dob('NRIC: S6959726J, DOB: 1955, Freque')

alert_nric_df = alert_ap_wl_hit_names_df.withColumn('hit_cs_1_data_points',
                                                    udf(extract_wl_nric_dob, MapType(StringType(), ArrayType(StringType())))('hit_cs_1')
                                                   )

def extract_ap_nric(ap_id_numbers):
    ap_nrics = []
    for id_number in set(ap_id_numbers):
        if id_number and re.match('^[STGF]\d{7}[A-Z]$', id_number.upper()):
            ap_nrics.append(id_number)
    
    return ap_nrics

extract_ap_nric(['S7364776B', 'S7335736B'])

alert_nric_df = alert_nric_df.withColumn('ap_nric',
                                         udf(extract_ap_nric, ArrayType(StringType()))('alert_partyIds_idNumber')
                                        )

group_count(alert_nric_df.selectExpr('size(ap_nric) as s', 'ap_nric'), 's')

alert_nric_df.selectExpr('size(ap_nric) as s', 'ap_nric').where('s = 2').limit(2).toPandas()



21/12/09 11:22:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,s,ap_nric


In [33]:
%%time
alert_nric_df = write_read_delta(spark,
                                 alert_nric_df,
                                 in_cleansed_data_dir(file_name),
                                 user_metadata = 'Extracted AP and WL NRIC'
                                 )

CPU times: user 42.9 ms, sys: 2.55 ms, total: 45.4 ms
Wall time: 1 s


### Process alert notes and statuses


In [34]:
# Implementation: Spark manager
alert_notes_file_name = 'ACM_ALERT_NOTES.delta'
item_status_file_name = 'ACM_ITEM_STATUS_HISTORY.delta'

alert_notes_df = spark.read.format('delta').load(in_standardized_data_dir(alert_notes_file_name))
item_status_history_df = spark.read.format('delta').load(in_standardized_data_dir(item_status_file_name))
alert_statuses_df = spark.read.format('delta').load(in_standardized_data_dir('ACM_MD_ALERT_STATUSES.delta'))

Merge the "STATUS_NAME" to make the data more descriptive.

In [35]:
item_status_history_df.toPandas()

Unnamed: 0,STATUS_JOIN_ID,ITEM_JOIN_ID,ITEM_ID,FROM_STATUS_IDENTIFIER,FROM_STATE,FROM_FINDING,TO_STATUS_IDENTIFIER,TO_STATE,TO_FINDING,CREATE_DATE,USER_JOIN_ID
0,6059387,3658498,WLF101-1363601-89626,2.0,Open,No_Determination,3,Open,No_Determination,2021-08-16 18:02:47,34001
1,6059825,3658498,WLF101-1363601-89626,3.0,Open,No_Determination,IMPL_AML_FALSE_POSITIVE,Closed,Non_Issue,2021-08-17 09:03:03,30203
2,5780701,3541863,WLF101-939701-62908,,,,2,Open,No_Determination,2021-03-17 14:52:06,22601
3,5135431,3292184,WLF101-945401-62939,2.0,Open,No_Determination,24,Open,No_Determination,2020-03-17 15:22:11,30202
4,5782825,3541863,WLF101-939701-62908,3.0,Open,No_Determination,IMPL_AML_FALSE_POSITIVE,Closed,Non_Issue,2021-03-18 12:04:25,30204
5,6059345,3658498,WLF101-1363601-89626,,,,2,Open,No_Determination,2021-08-16 17:54:06,22601
6,5781004,3541863,WLF101-939701-62908,2.0,Open,No_Determination,3,Open,No_Determination,2021-03-17 15:00:55,34001
7,5800865,3292184,WLF101-945401-62939,24.0,Open,No_Determination,IMPL_AML_FALSE_POSITIVE,Closed,Non_Issue,2021-03-25 16:35:50,18506
8,5074477,3292184,WLF101-945401-62939,,,,2,Open,No_Determination,2020-02-11 16:00:01,22601


In [36]:
# Implementation: Spark manager, Delta Converter

item_status_history_df.createOrReplaceTempView('status_df')

system_id = "22601"
item_status_history_stage_df = spark.sql(f'''
    with status_row_num as (
        select *,
            row_number() over (partition by item_id order by create_date asc) as row_num
        from status_df),
    first_last_analyst_row_num as (
        select ITEM_ID,
            min(row_num) as first_analyst_row_num,
            max(row_num) as last_analyst_row_num
        from status_row_num
        where user_join_id != "{system_id}"
        group by ITEM_ID
        )
    select a.*,
        b.first_analyst_row_num,
        b.last_analyst_row_num,
        case 
            when row_num = first_analyst_row_num and row_num = last_analyst_row_num then "first_last_analyst_status"
            when row_num = first_analyst_row_num then "first_analyst_status"
            when row_num = last_analyst_row_num then "last_analyst_status"
            when row_num > first_analyst_row_num then "middle_analyst_status"
            else "system_activity"
        end as analyst_status_stage
    from status_row_num a
    join first_last_analyst_row_num b
    on a.ITEM_ID = b.ITEM_ID
''')

ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8

In [37]:
%%time
a = write_read_delta(spark,
                                                item_status_history_stage_df,
                                                delta_path=in_cleansed_data_dir(item_status_file_name),
                                                user_metadata='Tagged the status stage'
                                               )

alert_notes_df.createOrReplaceTempView('notes_df')

alert_notes_stage_df = spark.sql('''
    with notes_row_num as (
        select *,
            row_number() over (partition by alert_id order by create_date asc) as row_num
        from notes_df),
    first_last_analyst_row_num as (
        select *,
            min(row_num) over (partition by alert_id) as first_analyst_row_num,
            max(row_num) over (partition by alert_id) as last_analyst_row_num
        from notes_row_num)
    select *,
        case 
            when row_num = first_analyst_row_num and row_num = last_analyst_row_num then "first_last_analyst_note"
            when row_num = first_analyst_row_num then "first_analyst_note"
            when row_num = last_analyst_row_num then "last_analyst_note"
            else "middle_analyst_note"
        end as analyst_note_stage
    from first_last_analyst_row_num    
''')

CPU times: user 2.43 ms, sys: 2.27 ms, total: 4.7 ms
Wall time: 1.7 s


In [38]:
%%time
alert_notes_stage_df = write_read_delta(spark,
                                        alert_notes_stage_df,
                                        delta_path=in_cleansed_data_dir(alert_notes_file_name),
                                        user_metadata='Tagged the note stage'
                                       )

extract_wl_nric_dob('NRIC: S6959726J, DOB: 1955, Freque')

alert_nric_df = alert_ap_wl_hit_names_df.withColumn('hit_cs_1_data_points',
                                                    udf(extract_wl_nric_dob, MapType(StringType(), ArrayType(StringType())))('hit_cs_1')
                                                   )

def extract_ap_nric(ap_id_numbers):
    ap_nrics = []
    for id_number in set(ap_id_numbers):
        if id_number and re.match('^[STGF]\d{7}[A-Z]$', id_number.upper()):
            ap_nrics.append(id_number)
    
    return ap_nrics

extract_ap_nric(['S7364776B', 'S7335736B'])

alert_nric_df = alert_nric_df.withColumn('ap_nric',
                                         udf(extract_ap_nric, ArrayType(StringType()))('alert_partyIds_idNumber')
                                        )

group_count(alert_nric_df.selectExpr('size(ap_nric) as s', 'ap_nric'), 's')

alert_nric_df.selectExpr('size(ap_nric) as s', 'ap_nric').where('s = 2').limit(2).toPandas()

21/12/09 11:22:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


CPU times: user 21.1 ms, sys: 9.02 ms, total: 30.1 ms
Wall time: 1.74 s


Unnamed: 0,s,ap_nric


In [39]:
%%time
alert_nric_df = write_read_delta(spark,
                                 alert_nric_df,
                                 in_cleansed_data_dir(file_name),
                                 user_metadata = 'Extracted AP and WL NRIC'
                                 )

CPU times: user 45.3 ms, sys: 354 µs, total: 45.6 ms
Wall time: 992 ms


### Process alert notes and statuses

In [40]:
# Spark manager
alert_notes_file_name = 'ACM_ALERT_NOTES.delta'
item_status_file_name = 'ACM_ITEM_STATUS_HISTORY.delta'

alert_notes_df = spark.read.format('delta').load(in_standardized_data_dir(alert_notes_file_name))
item_status_history_df = spark.read.format('delta').load(in_standardized_data_dir(item_status_file_name))
alert_statuses_df = spark.read.format('delta').load(in_standardized_data_dir('ACM_MD_ALERT_STATUSES.delta'))

Merge the "STATUS_NAME" to make the data more descriptive.

In [41]:
# Status preprocessor

item_status_history_df.createOrReplaceTempView('status_df')

system_id = "22601"
item_status_history_stage_df = spark.sql(f'''
    with status_row_num as (
        select *,
            row_number() over (partition by item_id order by create_date asc) as row_num
        from status_df),
    first_last_analyst_row_num as (
        select ITEM_ID,
            min(row_num) as first_analyst_row_num,
            max(row_num) as last_analyst_row_num
        from status_row_num
        where user_join_id != "{system_id}"
        group by ITEM_ID
        )
    select a.*,
        b.first_analyst_row_num,
        b.last_analyst_row_num,
        case 
            when row_num = first_analyst_row_num and row_num = last_analyst_row_num then "first_last_analyst_status"
            when row_num = first_analyst_row_num then "first_analyst_status"
            when row_num = last_analyst_row_num then "last_analyst_status"
            when row_num > first_analyst_row_num then "middle_analyst_status"
            else "system_activity"
        end as analyst_status_stage
    from status_row_num a
    join first_last_analyst_row_num b
    on a.ITEM_ID = b.ITEM_ID
''')

In [42]:
%%time
# Spark manager
item_status_history_stage_df = write_read_delta(spark,
                                                item_status_history_stage_df,
                                                delta_path=in_cleansed_data_dir(item_status_file_name),
                                                user_metadata='Tagged the status stage'
                                               )

CPU times: user 1.13 ms, sys: 1.19 ms, total: 2.32 ms
Wall time: 1.04 s


In [43]:
# Notes preprocessor
alert_notes_df.createOrReplaceTempView('notes_df')

alert_notes_stage_df = spark.sql('''
    with notes_row_num as (
        select *,
            row_number() over (partition by alert_id order by create_date asc) as row_num
        from notes_df),
    first_last_analyst_row_num as (
        select *,
            min(row_num) over (partition by alert_id) as first_analyst_row_num,
            max(row_num) over (partition by alert_id) as last_analyst_row_num
        from notes_row_num)
    select *,
        case 
            when row_num = first_analyst_row_num and row_num = last_analyst_row_num then "first_last_analyst_note"
            when row_num = first_analyst_row_num then "first_analyst_note"
            when row_num = last_analyst_row_num then "last_analyst_note"
            else "middle_analyst_note"
        end as analyst_note_stage
    from first_last_analyst_row_num    
''')

In [44]:
%%time
# Spark manager
alert_notes_stage_df = write_read_delta(spark,
                                        alert_notes_stage_df,
                                        delta_path=in_cleansed_data_dir(alert_notes_file_name),
                                        user_metadata='Tagged the note stage'
                                       )

CPU times: user 2.05 ms, sys: 0 ns, total: 2.05 ms
Wall time: 877 ms


# Create agent inputs - 3.0-cleansed-to-application-agent-input Spark manager, Note Preprocessor, Status Preprocessor, Agent input creator


# Load data

In [45]:
# file_name = 'RCMDB.ALERTS_SAMPLE.delta'
# Spark manager

alert_file_name = 'ALERTS.delta'
cleansed_alert_df = spark.read.format('delta').load(in_cleansed_data_dir(alert_file_name))

note_file_name = 'ACM_ALERT_NOTES.delta'
cleansed_note_df = spark.read.format('delta').load(in_cleansed_data_dir(note_file_name))

In [46]:
# Note Preprocessor, Status Preprocessor

cleansed_note_df = cleansed_note_df.where('analyst_note_stage like "%last%"').selectExpr('ALERT_ID', 'note as last_note')
cleansed_alert_df = cleansed_alert_df.join(cleansed_note_df, 'ALERT_ID', how='left')

In [47]:
group_count(cleansed_alert_df, 'last_note', 5)

21/12/09 11:22:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:22:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,last_note,count,percent,count_cum_sum,percent_cum_sum
0,<p>*1-3: Name mismatch</p>,1,33.333,1,33.333
1,"<p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new', courier;"">Party Name : ...",1,33.333,2,66.667
2,<p>RBA applied. Approval from JM to close the alerts.</p>,1,33.333,3,100.0


In [48]:
cleansed_alert_df.toPandas()

Unnamed: 0,ALERT_ID,STATUS_INTERNAL_ID,STATUS_NAME,ALERT_INTERNAL_ID,ENTITY_TYPE_ID,ALERT_DATE,ALERT_TYPE_ID,STATUS_ID,DELETED,HTML_FILE_KEY,P11,P12,P13,P14,P15,P16,P17,P18,P19,P20,P21,P22,P23,P24,P25,P26,P27,P28,P29,P30,P31,P32,P33,P34,P35,P36,P37,P38,P39,P40,P41,P42,P43,P44,P45,P46,P47,P48,P49,IS_CASE,BUNIT_IDENTIFIER,OWNER_INTERNAL_ID,BU_INTERNAL_ID,ORIGINAL_BU_INTERNAL_ID,FL_ARCHIVE,FL_READ,FL_READ_BY_OWNER,LAST_READ_DATE,LAST_READ_USER_ID,LAST_UPDATE_DATE,LAST_UPDATE_USER_ID,CLOSED_DATE,CREATE_DATE,ALERT_CUSTOM_ATTRIBUTES_ID,SCORE,ALERT_TYPE_VERSION,FL_MANUAL,FL_GENERATED_BY_ACM,RESOLUTION_ID,ALERT_TYPE_INTERNAL_ID,FL_HAS_ATTACHMENTS,FL_UPDATED_BY_ACM,ENTITY_ID,PREV_STATUS_INTERNAL_ID,FL_ENCRYPTED,LAST_REFRESH_MODIFED_DATE,DEADLINE_DATE,HIGHLIGHT_DATE,EMAIL_DATE,AUTO_ESC_STATUS_INTERNAL_ID,CASE_COUNT_FOR_CONFIDENTIAL,P50,GLOBAL_DEADLINE_DATE,GLOBAL_HIGHLIGHT_DATE,GLOBAL_EMAIL_DATE,GLOBAL_AUTO_ESC_STATUS_ID,RFI_STATE,FL_HAS_NOTES,FL_HAS_CONFIDENTIAL_NOTES,CONSOLIDATION_KEY,HIBERNATE_OBJECT_VERSION,OWNER_IDENTIFIER,FL_DOUBT,NUM_EXISTING_ENTITIES,WORKSPACE_INTERNAL_ID,ALERT_NAME,PRIORITY_INTERNAL_ID,DETAILS_FOR_SEARCH,DETAILS,alert_alertId,alert_alertDate,alert_alertEntityKey,alert_score,alert_ahData_alertID,alert_ahData_alertDateTime,alert_ahData_jobID,alert_ahData_jobName,alert_ahData_jobType,alert_ahData_score,alert_ahData_numberOfHits,alert_ahData_partyKey,alert_ahData_partyName,alert_ahData_entityExcludeListsNames,alert_ahData_hitExcludeListsNames,alert_partyType,alert_partyDOB,alert_partyYOB,alert_partyBirthCountry,alert_partyBirthLocation,alert_partyGender,alert_partyIds_idType,alert_partyIds_idNumber,alert_partyIds_idCountry,alert_partyNatCountries_countryCd,alert_partyAddresses_partyAddressLine1,alert_partyAddresses_partyAddressLine2,alert_partyAddresses_partyCity,alert_partyAddresses_partyPostalCd,alert_partyAddresses_partyStateProvince,alert_partyAddresses_partyCountry,hit_listId,hit_entryId,hit_listVersion,hit_entryType,hit_listUpdateDate,hit_entryCreatedDate,hit_entryUpdateDate,hit_displayName,hit_matchedName,hit_isNameBroken,hit_aliases_displayName,hit_aliases_matchedName,hit_aliases_isNameBroken,hit_aliases_matchStrength,hit_addresses_streetAddress1,hit_addresses_streetAddress2,hit_addresses_city,hit_addresses_stateProvince,hit_addresses_postalCode,hit_addresses_country,hit_ids_idType,hit_ids_idNumber,hit_ids_idCountry,hit_nationalityCountries_country,hit_placesOfBirth_birthPlace,hit_placesOfBirth_birthCountry,hit_age,hit_ageAsOfDate,hit_datesOfBirth_birthDate,hit_datesOfBirth_yearOfBirth,hit_categories_category,hit_keywords_keyword,hit_title,hit_position,hit_gender,hit_isDeceased,hit_deceasedDate,hit_cs_1,hit_cs_2,hit_cs_3,hit_cs_4,hit_cs_5,hit_cs_6,hit_cs_7,hit_cs_8,hit_cs_9,hit_cs_10,hit_cs_11,hit_cs_12,hit_cs_13,hit_cs_14,hit_cs_15,hit_cs_16,hit_cs_17,hit_cs_18,hit_additionalInfo_name,hit_additionalInfo_value,hit_score,hit_matchType,hit_scoreFactors_factorId,hit_scoreFactors_factorDesc,hit_scoreFactors_factorValue,hit_scoreFactors_factorScore,hit_scoreFactors_factorImpact,hit_scoresBreakdown_matchedName,hit_scoresBreakdown_aliases_matchedName,hit_scoresBreakdown_addresses_city,hit_scoresBreakdown_addresses_country,hit_scoresBreakdown_addresses_stateProvince,hit_scoresBreakdown_ids_idNumber,hit_explanations_matchedName_Explanation,hit_explanations_aliases_matchedName_Explanation,hit_explanations_nationalityCountries_country_Explanation,hit_explanations_address_city_Explanation,hit_explanations_address_country_Explanation,hit_explanations_addresses_stateProvince_Explanation,hit_explanations_ids_idNumber_Explanation,hit_inputExplanations_matchedName_inputExplanation,hit_inputExplanations_aliases_matchedName_inputExplanation,hit_inputExplanations_nationalityCountries_country_inputExplanation,hit_inputExplanations_address_city_inputExplanation,hit_inputExplanations_address_country_inputExplanation,hit_inputExplanations_addresses_stateProvince_inputExplanation,hit_inputExplanations_ids_idNumber_inputExplanation,ap_hit_names,wl_hit_matched_name,wl_hit_aliases_matched_name,wl_hit_names,hit_cs_1_data_points,ap_nric,last_note
0,WLF101-1363601-89626,7027,False Positive,1649364,,24-Feb-20,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-1363601-89626</elem><elem name=""alertDate"">24/02/20 15:02:12</elem><elem name=""alertEntityKe...",01-13636,SSBSAPB0025000389,,,I,NAME;,,,,,,,,,,,,,,,AML_SAP_BN01_20022415001_334,FACTIVA_PEP_SAN,CPF BOARD,,AML-EWLF,FACTIVA_SIE;,1995-12-08;,Special Interest Entity (SIE)-Other Official Lists-Bank;Special Interest Entity (SIE)-Other Official Lists-State Owned Company;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SIE; Entry ID: 1091285; Entity Type: ORGANIZATION; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,12-Jul-21,30801,12-Jul-21,24-Feb-20,865017,80,AML-EWLF/3.5.1.33,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,13,,0,,,,,,\r,,24/02/20 15:02:12,SSBSAPB0025000389,80,[WLF101-1363601-89626],[24/02/20 15:02:12],[01-13636],[AML_SAP_BN01_20022415001_334],[Self Service Batch Scan],[80],1,[SSBSAPB0025000389],[CPF BOARD],[None],[None],Organization,,,,,UNKNOWN,[None],[None],[None],[None],[],[],[],[],[],[],FACTIVA_SIE,1091285,160,ORGANIZATION,21/02/20,02/03/18,02/03/18,China Petroleum Finance,China Petroleum Finance,,"[CPF, China Petroleum Finance Co. Ltd, Zhongyou Caiwu, Zhong You Cai Wu, 中油财务有限责任公司, 中油財務有限責任公司]","[CPF, China Petroleum Finance Co. Ltd, Zhongyou Caiwu, Zhong You Cai Wu, 中油财务有限责任公司, 中油財務有限責任公司]","[False, False, False, False, False, False]","[H, H, H, H, H, H]","[No 5 Gulouwai Avenue, Xicheng District]",[None],[Beijing;;100029],[None],[None],[CN],"[Company Identification No., DUNS Number, National Tax No., Standard Industrial Classification (SIC), North American Industry Classification System (NAICS), NACE (European Union Economic Activity ...","[91110000100018558M, 544956829, 91110000100018558M, 60, 522120, 65.1]","[None, None, None, None, None, None]",[CN],[],[],,,[1995-12-08],[1995],"[Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",[2395],,,,,,4;4;,4;4;,12;14;,,,,,,,,,,,,,,,,"[ProfileNotes, Sanction References]","[IOWA BOARD OF REGENTS NOTES: SUDAN Scrutinized Companies Category: Ongoing Engagement From: 30-Jun-2011 to 30-Jun-2013, Since 30 Jun 2011, List 2395,]",80.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_singleTokenMatch]","[Matching Engine Match, Single Token Match]","[95, YES]","[95.0, -15.0]","[MEDIUM, CORRECTIVE]",0,95,0,0,0,0,[],[CPF],[],[],[],[],[],[],[CPF BOARD],[],[],[],[],[],[CPF BOARD],,[CPF],[CPF],"{'possible_nric': None, 'nric': None, 'dob': None}",[],<p>*1-3: Name mismatch</p>
1,WLF101-939701-62908,7027,False Positive,1619405,,8-Oct-18,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-939701-62908</elem><elem name=""alertDate"">08/10/18 12:31:01</elem><elem name=""alertEntityKey...",Jan-97,42823012-P560016505,1.0,1/1/89,P,NAME;,,,,,,,,,,,,,,,AML_MAGNUM_1810081211_55,FACTIVA_PEP_SAN,P ONE,:S0135242C,AML-EWLF,FACTIVA_SAN;,1992; 1994; 1993; 1995;,Special Interest Person (SIP)-Sanctions Lists;Special Interest Person (SIP)-Other Official Lists;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SAN; Entry ID: 4790496; Entity Type: PERSON; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,29-Dec-20,28202,29-Dec-20,8-Oct-18,812105,80,AML-EWLF/3.4.0.8,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,9,,0,0.0,,,,,\r,,08/10/18 12:31:01,42823012-P560016505,80,[WLF101-939701-62908],[08/10/18 12:31:01],[01-9397],[AML_MAGNUM_1810081211_55],[Self Service Batch Scan],[80],1,[42823012-P560016505],[P ONE],[None],[None],Person,01/01/89,1989.0,,,MALE,[None],[S0135242C],[None],[SG],[],[],[],[],[],[],FACTIVA_SAN,4790496,167,PERSON,06/06/18,17/08/17,17/08/17,Ali Kony,{GN=Ali}{SN=Kony},True,"[アリ・コニー, アリ・ラロボ・バシール, Bashir,Ali,Lalobo, Kapere,Otim, Kony,Ali,Mohammed, Labola,Ali,Mohammed, Labolo,Ali,Mohammad, Lalobo,Ali, Lalobo,Ali,Bashir, Lalobo,Ali,Mohammed, Salongo,Ali,Mohammed, Mohamme...","[アリ・コニー, アリ・ラロボ・バシール, {GN=Ali,Lalobo}{SN=Bashir}, {GN=Otim}{SN=Kapere}, {GN=Ali,Mohammed}{SN=Kony}, {GN=Ali,Mohammed}{SN=Labola}, {GN=Ali,Mohammad}{SN=Labolo}, {GN=Ali}{SN=Lalobo}, {GN=Ali,Bashir}...","[False, False, True, True, True, True, True, True, True, True, True, True, False, False, False, False, False, False, False, False, False]","[H, H, H, H, H, H, H, H, H, H, H, H, L, L, L, L, L, L, L, L, L]","[Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), None]","[None, None, None]","[None, None, ;Kafia Kingi]","[None, None, None]","[None, None, None]","[None, None, None]","[SECO SSID, DFAT Reference Number]","[34712, 3194]","[None, None]",[None],[None],[None],,,"[1992, 1994, 1993, 1995]","[1992, 1994, 1993, 1995]","[Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]","[1, 275, 281, 283, 1156, 1326, 2556, 2644, 3166, 3322, 3324, 3550, 3552]",,,MALE,False,,3;3;,1;2;,,,,,,,,,,,,,,,,,"[ProfileNotes, Images, Primary Occupation, Other Roles, Previous Roles, Sanction References]","[OFFICE OF FOREIGN ASSETS CONTROL (OFAC) NOTES:\n\nRegion: Kafia Kingi\n\nTreasury Sanctions Lord’s Resistance Army Commanders Salim and Ali Kony\n\n8/23/2016 ​\nWASHINGTON – Today, the U.S. Depar...",80.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_yearOfBirthMatch, AML_WLF_CF_SF_lowQualityAliases]","[Matching Engine Match, Year of Birth Match, Low Quality Aliases]","[100, MISMATCH, YES]","[100.0, -10.0, -10.0]","[HIGH, CORRECTIVE, CORRECTIVE]",0,100,0,0,0,0,[],"[One P, One P]",[],[],[],[],[],[],"[P ONE, P ONE]",[],[],[],[],[],[P ONE],,[One P],[One P],"{'possible_nric': None, 'nric': None, 'dob': None}",[S0135242C],"<p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new', courier;"">Party Name : ..."
2,WLF101-945401-62939,7027,False Positive,1619436,,10-Oct-18,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-945401-62939</elem><elem name=""alertDate"">10/10/18 11:43:55</elem><elem name=""alertEntityKey...",Jan-54,SSBPLASC550212199,1.0,5/12/62,P,NAME;,,,,,,,,,,,,,,,AML_PLAS_W_1810061800_971,FACTIVA_PEP_SAN,KIM,,AML-EWLF,FACTIVA_SAN;,1964; 1962-08-28;,Politically Exposed Person (PEP);Special Interest Person (SIP)-Sanctions Lists;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SAN; Entry ID: 1198704; Entity Type: PERSON; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,29-Dec-20,28202,29-Dec-20,10-Oct-18,812136,115,AML-EWLF/3.4.0.8,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,9,,0,0.0,,,,,,,10/10/18 11:43:55,SSBPLASC550212199,115,[WLF101-945401-62939],[10/10/18 11:43:55],[01-9454],[AML_PLAS_W_1810061800_971],[Self Service Batch Scan],[115],1,[SSBPLASC550212199],[KIM],[None],[None],Person,05/12/62,1962.0,,,UNKNOWN,[None],[None],[None],[None],[],[],[],[],[],[],FACTIVA_SAN,1198704,168,PERSON,06/06/18,06/06/18,06/06/18,Tong-Myo'ng Kim,{GN=Tong-Myo'ng}{SN=Kim},True,"[김동명, キム・トンミョン, Дон Мён Ким, キム・チンソク, Чин Сок Ким, Kim,Chin-So'k, Kim,Jin Sok, Kim,Dong Myong, Kim,Hyok Chol, Kim,, Hyok-Chol,, Tong My'ong,Kim, Jin-Sok,Kim, Tong-Myong,Kim, Chin-So'k,Kim, Kim,Ton...","[김동명, キム・トンミョン, Дон Мён Ким, キム・チンソク, Чин Сок Ким, {GN=Chin-So'k}{SN=Kim}, {GN=Jin Sok}{SN=Kim}, {GN=Dong Myong}{SN=Kim}, {GN=Hyok Chol}{SN=Kim}, Kim,, Hyok-Chol,, {GN=Kim}{SN=Tong My'ong}, {GN=Ki...","[False, False, False, False, False, True, True, True, True, False, False, True, True, True, True, True, True, True, True, True, True, True, True, False]","[H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, L]","[c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[None, None]","[Pyongyang, Pyongyang]","[None, None]","[None, None]","[KP, KP]","[Passport No., SECO SSID, DFAT Reference Number, DFAT Reference Number, OSFI North Korea ID]","[290320764, 33845, 1133, 3157, P23]","[None, None, None, None, None]","[KP, KP]",[None],[None],,,"[1964, 1962-08-28]","[1964, 1962]","[Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]","[13, 275, 281, 283, 627, 1310, 1318, 1324, 1326, 1906, 1908, 2342, 2416, 3160, 3162, 3264, 3630]",,"President, Tanchon Commercial Bank",MALE,False,,1;3;,1;,,,,,,,,,,,,,,,,,"[ProfileNotes, Images, Primary Occupation, Other Roles, Previous Roles, Sanction References]","[OFFICE OF FOREIGN ASSETS CONTROL (OFAC) NOTES:\n\nOctober 23, 2009\nTG-330\n\nTreasury Designates North Korean Bank and Banking Official\nAs Proliferators of Weapons of Mass destruction\n\nWASHIN...",115.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_yearOfBirthMatch, AML_WLF_CF_SF_singleTokenMatch]","[Matching Engine Match, Year of Birth Match, Single Token Match]","[100, MATCH, YES]","[100.0, 30.0, -15.0]","[HIGH, LOW, CORRECTIVE]",0,100,0,0,0,0,[],"[Kim,]",[],[],[],[],[],[],[KIM],[],[],[],[],[],[KIM],,"[Kim,]","[Kim,]","{'possible_nric': None, 'nric': None, 'dob': None}",[],<p>RBA applied. Approval from JM to close the alerts.</p>


__Detailed implementation__

It's rather easy to implement if the goal is just to produce the dataframe for agent to consume. Some interim data need to be created to serve the purpose of analytics.

There are 2 main categories of transformations.
1. Interface/config transformation: Activities on the agent input config/interface.
1. Data transformation: Activities on the data based on the config/interface.

Steps
1. Create the agent input config.
    1. Interface/config transformation. Define agent input template. Each agent's input is a dictionary with 4 key-value pairs.
    ```
    {
        'ap': [],
        'ap_aliases': [],
        'wl': [] ,
        'wl_aliases': []
    }
    ```

        - `ap`: The primary value(s) of alerted party's specific attribute, e.g, name, it could be from one or multiple columns.
        - `ap_aliases`: The aliases of alerted party's specific attribute, it could be from one or multiple columns.
        - So on and so forth for `wl` and `wl_aliases`.

    1. Interface/config transformation. Define the list of agents. __Each agent's name must end with `_agent`.__
    ```
    agent_list = [
        'name_agent',
        'gender_agent'
    ]
    ```

    1. Interface/config transformation. Config the agent input by specifying which column(s) should be treated as the input of which agent's which party's primary or aliase value(s). 
    ```
    {
        'name_agent': {
            'ap': ['record_name', 'short_name'],
            'ap_aliases': ['alternate_name'],
            'wl': ['name_hit'],
            'wl_aliases': []
        },
        'gender_agent': {'ap': ['record_gender'],
                         'ap_aliases': [],
                         'wl': ['additional_infos_gender'],
                         'wl_aliases': []
                        },
    }
    ```
    Certain concepts need to be defined here.
        1. `level-1-key`: The name of each agent, it's `name_agent` and `gender_agent`.
        1. `level-1-value`: The value of each agent's config, it's a dictionary, e.g, 
        ```
        {
            'ap': ['record_name', 'short_name'],
            'ap_aliases': ['alternate_name'],
            'wl': ['name_hit'],
            'wl_aliases': []
        }
        ```
        1. `level-2-key`: The key of each agent config's value, or rather the key of `level-1-value`. It's `ap`, `ap_aliases`, `wl` and `wl_aliases`.
        1. `level-2-value`: The list of column names, e.g, `['record_name', 'short_name']`.

1. Create the interim agent input config and data. The interface is standardized from here onwards.  In reality, the data format can be more complex, e.g, national IDs we need to consider both type and document number.
    1. Interface/config transformation. Prepend `level-1-key` to `level-2-key` so that `level-2-key` can be used as new column names to host the interim data for analytics and/or debugging activites. Take `name_agent` for example.
    ```
    {
        'name_agent': {
            'name_agent_ap': ['record_name', 'short_name'],
            'name_agent_ap_aliases': ['alternate_name'],
            'name_agent_wl': ['name_hit'],
            'name_agent_wl_aliases': []
        }
    }
    ```
    1. Data transformation. Merge the values from `level-2-value` columns to `level-2-key` column. Below table will be the result.
    
| uuid | record_name | short_name | alternate_name | wl_primary_name | name_hit |   name_agent_ap   | name_agent_ap_aliases | name_agent_wl | name_agent_wl_aliases |
| ---- | :---------: | :--------: | :------------: | :-------------: | :------: | :---------------: | :-------------------: | :-----------: | :-------------------: |
| 1234 |  Jim Green  |    J.G.    |      Jim       |   James Greg    |   J.G    | [Jim Green, J.G.] |          Jim          |      J.G      |         None          |

1. Create the final agent input config and data based on the standardized interface.
    1. Interface/config transformation. Now we have a consistent schema to create the 1 list of alerted party values and 1 list of watchlist party values. We no longer need to worry about the customer specific schema, e.g, `record_name`, `short_name` and etcs. They have been standardized as `name_agent_ap`, `name_agent_ap_aliases` and etcs.
    ```
    {
        'name_agent': {'ap_all_names_aggregated': ['name_agent_ap', 'name_agent_ap_aliases'],
                       'wl_all_names_aggregated': ['name_agent_wl', 'name_agent_wl_aliases']
                      }
    }
    ```
    1. Data transformation. Merge the values from the primary and alias columns. Below table will be the result.
    
| uuid | ap_all_names_aggregated | wl_all_names_aggregated |
| ---- | :---------------------: | :---------------------: |
| 1234 | [Jim Green, J.G., Jim]  |          [J.G]          |

In [49]:
# Agent input creator

import copy
input_template = {'ap': [],
                  'ap_aliases': [],
                  'wl': [] ,
                  'wl_aliases': []
                 }

agent_list = [
    'party_type_agent',
    'name_agent',
    'dob_agent',
    'pob_agent',
    'gender_agent',
    'national_id_agent',
#     'passport_agent',
    'document_number_agent',
    'nationality_agent',
    'historical_decision_name_agent',
    'pep_payment_agent',
    'hit_is_san_agent',
    'hit_is_deceased_agent',
    'hit_has_dob_id_address_agent',
    'rba_agent'
]

agent_input_config = {}

for agent in agent_list:
    new_input = copy.deepcopy(input_template)
    agent_input_config[agent] = new_input
    
agent_input_config

{'party_type_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'name_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'dob_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'pob_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'gender_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'national_id_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'document_number_agent': {'ap': [],
  'ap_aliases': [],
  'wl': [],
  'wl_aliases': []},
 'nationality_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'historical_decision_name_agent': {'ap': [],
  'ap_aliases': [],
  'wl': [],
  'wl_aliases': []},
 'pep_payment_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'hit_is_san_agent': {'ap': [], 'ap_aliases': [], 'wl': [], 'wl_aliases': []},
 'hit_is_deceased_agent': {'ap': [],
  'ap_aliases': [],
  'wl': [],
  'wl_aliases': []},
 'hit_has_dob_id_address_agent'

In [50]:
# Agent input creator

# DO NOTE the fff_format use i-based index While, python list is 0-based index
agent_input_config['party_type_agent']['ap'].extend(['alert_partyType'])
agent_input_config['party_type_agent']['wl'].extend(['hit_entryType'])

agent_input_config['name_agent']['ap'].extend(['ap_hit_names'])
agent_input_config['name_agent']['wl'].extend(['wl_hit_names'])

# agent_input_config['dob_agent']['ap'].extend(['alert_partyDOB', 'alert_partyYOB'])
# agent_input_config['dob_agent']['wl'].extend(['hit_datesOfBirth_birthDate', 'hit_datesOfBirth_yearOfBirth'])

# The alert_partyDOB has value of '31/12/99' which is actually '9999-12-31' from P14
agent_input_config['dob_agent']['ap'].extend(['P14'])
agent_input_config['dob_agent']['wl'].extend(['hit_datesOfBirth_birthDate', 'hit_cs_1_data_points.dob'])

agent_input_config['pob_agent']['ap'].extend(['alert_partyBirthCountry'])
# It's weird hit_placesOfBirth_birthPlace has country data instead of hit_placesOfBirth_birthCountry
agent_input_config['pob_agent']['wl'].extend(['hit_placesOfBirth_birthPlace'])

agent_input_config['gender_agent']['ap'].extend(['alert_partyGender'])
agent_input_config['gender_agent']['wl'].extend(['hit_gender'])

# The national ID is for SG NRIC only
agent_input_config['national_id_agent']['ap'].extend(['ap_nric'])
agent_input_config['national_id_agent']['wl'].extend(['hit_cs_1_data_points.nric'])

# agent_input_config['passport_agent']['ap'].extend(get_ap_screenable_attributes('PASSPORT'))
# agent_input_config['passport_agent']['wl'].extend(['passport'])
agent_input_config['document_number_agent']['ap'].extend(['alert_partyIds_idNumber'])
# The 'hit_cs_1_data_points.nric' is a subset of 'hit_cs_1_data_points.possible_nric'. Hence, use 'hit_cs_1_data_points.possible_nric' only
agent_input_config['document_number_agent']['wl'].extend(['hit_ids_idNumber', 'hit_cs_1_data_points.possible_nric'])

# "alert_partyNatCountries_countryCd" is alwasy empty, "alert_partyNatCountries_countryCd" is actually part of address
# but it's used by screening engine and analyst to match with WL nationality country
agent_input_config['nationality_agent']['ap'].extend(['alert_partyNatCountries_countryCd', 'alert_partyAddresses_partyCountry'])
agent_input_config['nationality_agent']['wl'].append('hit_nationalityCountries_country')

agent_input_config['historical_decision_name_agent']['ap'].extend(['alert_ahData_numberOfHits', 'STATUS_NAME', 'alert_ahData_partyName', 'ALERT_DATE'])
agent_input_config['historical_decision_name_agent']['wl'].extend(['hit_entryId'])

# agent_input_config['historical_decision_entity_key_agent']['ap'].extend(['alert_ahData_numberOfHits', 'STATUS_NAME', 'alert_alertEntityKey', 'ALERT_DATE'])
# agent_input_config['historical_decision_entity_key_agent']['wl'].extend(['hit_entryId'])

agent_input_config['pep_payment_agent']['ap'].extend(['P12'])
agent_input_config['pep_payment_agent']['wl'].extend(['hit_listID'])

# The P36 and P38 are at alert level, use `hit_listId` and `hit_categories_category` which is at hit level
agent_input_config['hit_is_san_agent']['wl'].extend(['hit_listId', 'hit_categories_category'])

agent_input_config['hit_is_deceased_agent']['wl'].extend(['hit_isDeceased'])

agent_input_config['hit_has_dob_id_address_agent']['wl'].extend(['hit_datesOfBirth_birthDate', 'hit_cs_1_data_points.dob', 'hit_ids_idNumber', 'hit_cs_1_data_points.possible_nric', 'hit_addresses_streetAddress1'])

agent_input_config['rba_agent']['ap'].extend(['alert_ahData_numberOfHits', 'STATUS_NAME', 'alert_ahData_partyName', 'ALERT_DATE', 'last_note'])
agent_input_config['rba_agent']['wl'].extend(['hit_entryId'])

In [51]:
# Agent input creator

def prepend_agent_name_to_ap_or_wl_or_aliases_key(agent_input_config):
    ''' Prepend the agent name (level 1 key) to level 2 key. So the new level 2 key will be 
    
        Input:
        { 'name_agent': {'ap': ['record_name'],
                         'ap_aliases': [],
                         'wl': ['name_hit'],
                         'wl_aliases': []
                        }
        }
        
        Output:
        { 'name_agent': {'name_agent_ap': ['record_name'],
                         'name_agent_ap_aliases': [],
                         'name_agent_wl': ['name_hit'],
                         'name_agent_wl_aliases': []
                        }
        }
    '''
    result = dict()
    for agent_name, config in agent_input_config.items():
        result[agent_name] = dict()
        
        for ap_or_wl_or_aliases, source_cols in config.items():
            prepended_key_name = '_'.join([agent_name, ap_or_wl_or_aliases])
            result[agent_name][prepended_key_name] = source_cols
            
    return result

In [52]:
# Sanity check
agent_input_prepended_agent_name_config = prepend_agent_name_to_ap_or_wl_or_aliases_key(agent_input_config)
agent_input_prepended_agent_name_config

{'party_type_agent': {'party_type_agent_ap': ['alert_partyType'],
  'party_type_agent_ap_aliases': [],
  'party_type_agent_wl': ['hit_entryType'],
  'party_type_agent_wl_aliases': []},
 'name_agent': {'name_agent_ap': ['ap_hit_names'],
  'name_agent_ap_aliases': [],
  'name_agent_wl': ['wl_hit_names'],
  'name_agent_wl_aliases': []},
 'dob_agent': {'dob_agent_ap': ['P14'],
  'dob_agent_ap_aliases': [],
  'dob_agent_wl': ['hit_datesOfBirth_birthDate', 'hit_cs_1_data_points.dob'],
  'dob_agent_wl_aliases': []},
 'pob_agent': {'pob_agent_ap': ['alert_partyBirthCountry'],
  'pob_agent_ap_aliases': [],
  'pob_agent_wl': ['hit_placesOfBirth_birthPlace'],
  'pob_agent_wl_aliases': []},
 'gender_agent': {'gender_agent_ap': ['alert_partyGender'],
  'gender_agent_ap_aliases': [],
  'gender_agent_wl': ['hit_gender'],
  'gender_agent_wl_aliases': []},
 'national_id_agent': {'national_id_agent_ap': ['ap_nric'],
  'national_id_agent_ap_aliases': [],
  'national_id_agent_wl': ['hit_cs_1_data_points.n

In [53]:
x = cleansed_alert_df.toPandas()

In [54]:
# Spark manager / Agent input creator

def spark_sql_create_agent_primary_alias_input_cols(df, agent_input_prepended_agent_name_config):
    ''' Merge the customer specific columns into standardized agent primary and alias input columns.
    
        Input:
        { 'name_agent': {'name_agent_ap': ['record_name', 'whatever_other_name'],
                         'name_agent_ap_aliases': [],
                         'name_agent_wl': ['name_hit'],
                         'name_agent_wl_aliases': []
                        }
        }
        
        Output:
        Take {'name_agent_ap': ['record_name', 'whatever_other_name']} as an example. The 2 columns
        'record_name' and 'whatever_other_name' will be merged to create a new column named 'name_agent_ap'.
    '''
    sql_expr_list = []
    
    for agent_name, config in agent_input_prepended_agent_name_config.items():
        for target_col, source_cols in config.items():
            
            sql_expr = sql_merge_to_target_col_from_source_cols(df, target_col, *source_cols)
            if sql_expr is not None:
                sql_expr_list.append(sql_expr)
                
    return sql_expr_list

In [55]:
# Agent input creator
agent_input_raw_df = cleansed_alert_df.select('*',
                                      *spark_sql_create_agent_primary_alias_input_cols(cleansed_alert_df, agent_input_prepended_agent_name_config))

In [56]:
x = agent_input_raw_df.toPandas()

In [57]:
agent_input_refined_df = agent_input_raw_df

In [58]:
# Agent input creator

def create_agent_input_agg_col_config(agent_input_prepended_agent_name_config):
    ''' Create the source and target columns based on the standardized agent input config.
    
        Input:
        { 'name_agent': {'name_agent_ap': ['record_name', 'whatever_other_name'],
                         'name_agent_ap_aliases': [],
                         'name_agent_wl': ['name_hit'],
                         'name_agent_wl_aliases': []
                        }
        }
        
        Output:
        {'name_agent': {'ap_all_names_aggregated': ['name_agent_ap', 'name_agent_ap_aliases'],
                        'wl_all_names_aggregated': ['name_agent_wl', 'name_agent_wl_aliases']
                       }
        }
    '''
    def _generate_simple_plural(word):
        if word.lower().endswith('s'):
            return word.lower() + 'es'
        elif word.lower().endswith('y') \
                and word.lower()[-2:] not in ['ay', 'ey', 'iy', 'oy', 'uy']:
            return word.lower()[:-1] + 'ies'
        else:
            return word.lower() + 's'
        
    def _get_ap_or_wl_agg_source_cols(level_1_value, party):
        source_cols = []
        for col in level_1_value.keys():
            if col.endswith(f'_{party}') or col.endswith(f'_{party}_aliases'):
                source_cols.append(col)
                
        return source_cols
        
    agent_input_agg_col_config = dict()
    
    for agent_name, config in agent_input_prepended_agent_name_config.items():
        agent_type = agent_name.split('_agent', 1)[0]
        
        agent_ap_agg_col = f"""ap_all_{_generate_simple_plural(agent_type)}_aggregated"""
        agent_wl_agg_col = f"""wl_all_{_generate_simple_plural(agent_type)}_aggregated"""
        
        agent_ap_agg_source_cols = _get_ap_or_wl_agg_source_cols(config, 'ap')
        agent_wl_agg_source_cols = _get_ap_or_wl_agg_source_cols(config, 'wl')
                
        agent_input_agg_col_config[agent_name] = dict()
        agent_input_agg_col_config[agent_name][agent_ap_agg_col] = agent_ap_agg_source_cols
        agent_input_agg_col_config[agent_name][agent_wl_agg_col] = agent_wl_agg_source_cols
            
    return agent_input_agg_col_config

In [59]:
# DO NOTE the input to the function is the config object after prepending agent name.
agent_input_agg_col_config = create_agent_input_agg_col_config(agent_input_prepended_agent_name_config)
agent_input_agg_col_config

{'party_type_agent': {'ap_all_party_types_aggregated': ['party_type_agent_ap',
   'party_type_agent_ap_aliases'],
  'wl_all_party_types_aggregated': ['party_type_agent_wl',
   'party_type_agent_wl_aliases']},
 'name_agent': {'ap_all_names_aggregated': ['name_agent_ap',
   'name_agent_ap_aliases'],
  'wl_all_names_aggregated': ['name_agent_wl', 'name_agent_wl_aliases']},
 'dob_agent': {'ap_all_dobs_aggregated': ['dob_agent_ap',
   'dob_agent_ap_aliases'],
  'wl_all_dobs_aggregated': ['dob_agent_wl', 'dob_agent_wl_aliases']},
 'pob_agent': {'ap_all_pobs_aggregated': ['pob_agent_ap',
   'pob_agent_ap_aliases'],
  'wl_all_pobs_aggregated': ['pob_agent_wl', 'pob_agent_wl_aliases']},
 'gender_agent': {'ap_all_genders_aggregated': ['gender_agent_ap',
   'gender_agent_ap_aliases'],
  'wl_all_genders_aggregated': ['gender_agent_wl', 'gender_agent_wl_aliases']},
 'national_id_agent': {'ap_all_national_ids_aggregated': ['national_id_agent_ap',
   'national_id_agent_ap_aliases'],
  'wl_all_nationa

In [60]:
# Agent input creator

# The agent_agg_cols_config will be needed later
with open('agent_input_agg_col_config.json', 'w') as outfile:
    json.dump(agent_input_agg_col_config, outfile)

In [61]:
# Spark manager, Agent input creator

def spark_sql_create_agg_cols(df, agent_input_agg_col_config):
    sql_expr_list = []
    for agent, config in agent_input_agg_col_config.items():
        for party_agg_col, party_agg_source_cols in config.items():
            target_col = party_agg_col
            source_cols = party_agg_source_cols
            
            sql_expr = sql_merge_to_target_col_from_source_cols(df, target_col, *source_cols, return_array=True)
            if sql_expr is not None:
                sql_expr_list.append(sql_expr)

    return sql_expr_list

In [62]:
%%time
# Agent input creator
agent_input_agg_df = agent_input_refined_df.select('*',
                                                   *spark_sql_create_agg_cols(agent_input_refined_df, agent_input_agg_col_config)
                                                  ) \
                                            .withColumn('_index', F.monotonically_increasing_id())

agent_input_agg_df = write_read_delta(spark,
                                     agent_input_agg_df,
                                     in_application_data_dir('agent_input_agg_df.delta')
                                    )

CPU times: user 196 ms, sys: 22.5 ms, total: 218 ms
Wall time: 2.24 s


In [63]:
agent_input_agg_df.toPandas().head()

Unnamed: 0,ALERT_ID,STATUS_INTERNAL_ID,STATUS_NAME,ALERT_INTERNAL_ID,ENTITY_TYPE_ID,ALERT_DATE,ALERT_TYPE_ID,STATUS_ID,DELETED,HTML_FILE_KEY,P11,P12,P13,P14,P15,P16,P17,P18,P19,P20,P21,P22,P23,P24,P25,P26,P27,P28,P29,P30,P31,P32,P33,P34,P35,P36,P37,P38,P39,P40,P41,P42,P43,P44,P45,P46,P47,P48,P49,IS_CASE,BUNIT_IDENTIFIER,OWNER_INTERNAL_ID,BU_INTERNAL_ID,ORIGINAL_BU_INTERNAL_ID,FL_ARCHIVE,FL_READ,FL_READ_BY_OWNER,LAST_READ_DATE,LAST_READ_USER_ID,LAST_UPDATE_DATE,LAST_UPDATE_USER_ID,CLOSED_DATE,CREATE_DATE,ALERT_CUSTOM_ATTRIBUTES_ID,SCORE,ALERT_TYPE_VERSION,FL_MANUAL,FL_GENERATED_BY_ACM,RESOLUTION_ID,ALERT_TYPE_INTERNAL_ID,FL_HAS_ATTACHMENTS,FL_UPDATED_BY_ACM,ENTITY_ID,PREV_STATUS_INTERNAL_ID,FL_ENCRYPTED,LAST_REFRESH_MODIFED_DATE,DEADLINE_DATE,HIGHLIGHT_DATE,EMAIL_DATE,AUTO_ESC_STATUS_INTERNAL_ID,CASE_COUNT_FOR_CONFIDENTIAL,P50,GLOBAL_DEADLINE_DATE,GLOBAL_HIGHLIGHT_DATE,GLOBAL_EMAIL_DATE,GLOBAL_AUTO_ESC_STATUS_ID,RFI_STATE,FL_HAS_NOTES,FL_HAS_CONFIDENTIAL_NOTES,CONSOLIDATION_KEY,HIBERNATE_OBJECT_VERSION,OWNER_IDENTIFIER,FL_DOUBT,NUM_EXISTING_ENTITIES,WORKSPACE_INTERNAL_ID,ALERT_NAME,PRIORITY_INTERNAL_ID,DETAILS_FOR_SEARCH,DETAILS,alert_alertId,alert_alertDate,alert_alertEntityKey,alert_score,alert_ahData_alertID,alert_ahData_alertDateTime,alert_ahData_jobID,alert_ahData_jobName,alert_ahData_jobType,alert_ahData_score,alert_ahData_numberOfHits,alert_ahData_partyKey,alert_ahData_partyName,alert_ahData_entityExcludeListsNames,alert_ahData_hitExcludeListsNames,alert_partyType,alert_partyDOB,alert_partyYOB,alert_partyBirthCountry,alert_partyBirthLocation,alert_partyGender,alert_partyIds_idType,alert_partyIds_idNumber,alert_partyIds_idCountry,alert_partyNatCountries_countryCd,alert_partyAddresses_partyAddressLine1,alert_partyAddresses_partyAddressLine2,alert_partyAddresses_partyCity,alert_partyAddresses_partyPostalCd,alert_partyAddresses_partyStateProvince,alert_partyAddresses_partyCountry,hit_listId,hit_entryId,hit_listVersion,hit_entryType,hit_listUpdateDate,hit_entryCreatedDate,hit_entryUpdateDate,hit_displayName,hit_matchedName,hit_isNameBroken,hit_aliases_displayName,hit_aliases_matchedName,hit_aliases_isNameBroken,hit_aliases_matchStrength,hit_addresses_streetAddress1,hit_addresses_streetAddress2,hit_addresses_city,hit_addresses_stateProvince,hit_addresses_postalCode,hit_addresses_country,hit_ids_idType,hit_ids_idNumber,hit_ids_idCountry,hit_nationalityCountries_country,hit_placesOfBirth_birthPlace,hit_placesOfBirth_birthCountry,hit_age,hit_ageAsOfDate,hit_datesOfBirth_birthDate,hit_datesOfBirth_yearOfBirth,hit_categories_category,hit_keywords_keyword,hit_title,hit_position,hit_gender,hit_isDeceased,hit_deceasedDate,hit_cs_1,hit_cs_2,hit_cs_3,hit_cs_4,hit_cs_5,hit_cs_6,hit_cs_7,hit_cs_8,hit_cs_9,hit_cs_10,hit_cs_11,hit_cs_12,hit_cs_13,hit_cs_14,hit_cs_15,hit_cs_16,hit_cs_17,hit_cs_18,hit_additionalInfo_name,hit_additionalInfo_value,hit_score,hit_matchType,hit_scoreFactors_factorId,hit_scoreFactors_factorDesc,hit_scoreFactors_factorValue,hit_scoreFactors_factorScore,hit_scoreFactors_factorImpact,hit_scoresBreakdown_matchedName,hit_scoresBreakdown_aliases_matchedName,hit_scoresBreakdown_addresses_city,hit_scoresBreakdown_addresses_country,hit_scoresBreakdown_addresses_stateProvince,hit_scoresBreakdown_ids_idNumber,hit_explanations_matchedName_Explanation,hit_explanations_aliases_matchedName_Explanation,hit_explanations_nationalityCountries_country_Explanation,hit_explanations_address_city_Explanation,hit_explanations_address_country_Explanation,hit_explanations_addresses_stateProvince_Explanation,hit_explanations_ids_idNumber_Explanation,hit_inputExplanations_matchedName_inputExplanation,hit_inputExplanations_aliases_matchedName_inputExplanation,hit_inputExplanations_nationalityCountries_country_inputExplanation,hit_inputExplanations_address_city_inputExplanation,hit_inputExplanations_address_country_inputExplanation,hit_inputExplanations_addresses_stateProvince_inputExplanation,hit_inputExplanations_ids_idNumber_inputExplanation,ap_hit_names,wl_hit_matched_name,wl_hit_aliases_matched_name,wl_hit_names,hit_cs_1_data_points,ap_nric,last_note,party_type_agent_ap,party_type_agent_wl,name_agent_ap,name_agent_wl,dob_agent_ap,dob_agent_wl,pob_agent_ap,pob_agent_wl,gender_agent_ap,gender_agent_wl,national_id_agent_ap,national_id_agent_wl,document_number_agent_ap,document_number_agent_wl,nationality_agent_ap,nationality_agent_wl,historical_decision_name_agent_ap,historical_decision_name_agent_wl,pep_payment_agent_ap,pep_payment_agent_wl,hit_is_san_agent_wl,hit_is_deceased_agent_wl,hit_has_dob_id_address_agent_wl,rba_agent_ap,rba_agent_wl,ap_all_party_types_aggregated,wl_all_party_types_aggregated,ap_all_names_aggregated,wl_all_names_aggregated,ap_all_dobs_aggregated,wl_all_dobs_aggregated,ap_all_pobs_aggregated,wl_all_pobs_aggregated,ap_all_genders_aggregated,wl_all_genders_aggregated,ap_all_national_ids_aggregated,wl_all_national_ids_aggregated,ap_all_document_numbers_aggregated,wl_all_document_numbers_aggregated,ap_all_nationalities_aggregated,wl_all_nationalities_aggregated,ap_all_historical_decision_names_aggregated,wl_all_historical_decision_names_aggregated,ap_all_pep_payments_aggregated,wl_all_pep_payments_aggregated,ap_all_hit_is_sans_aggregated,wl_all_hit_is_sans_aggregated,ap_all_hit_is_deceaseds_aggregated,wl_all_hit_is_deceaseds_aggregated,ap_all_hit_has_dob_id_addresses_aggregated,wl_all_hit_has_dob_id_addresses_aggregated,ap_all_rbas_aggregated,wl_all_rbas_aggregated,_index
0,WLF101-1363601-89626,7027,False Positive,1649364,,24-Feb-20,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-1363601-89626</elem><elem name=""alertDate"">24/02/20 15:02:12</elem><elem name=""alertEntityKe...",01-13636,SSBSAPB0025000389,,,I,NAME;,,,,,,,,,,,,,,,AML_SAP_BN01_20022415001_334,FACTIVA_PEP_SAN,CPF BOARD,,AML-EWLF,FACTIVA_SIE;,1995-12-08;,Special Interest Entity (SIE)-Other Official Lists-Bank;Special Interest Entity (SIE)-Other Official Lists-State Owned Company;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SIE; Entry ID: 1091285; Entity Type: ORGANIZATION; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,12-Jul-21,30801,12-Jul-21,24-Feb-20,865017,80,AML-EWLF/3.5.1.33,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,13,,0,,,,,,\r,,24/02/20 15:02:12,SSBSAPB0025000389,80,[WLF101-1363601-89626],[24/02/20 15:02:12],[01-13636],[AML_SAP_BN01_20022415001_334],[Self Service Batch Scan],[80],1,[SSBSAPB0025000389],[CPF BOARD],[None],[None],Organization,,,,,UNKNOWN,[None],[None],[None],[None],[],[],[],[],[],[],FACTIVA_SIE,1091285,160,ORGANIZATION,21/02/20,02/03/18,02/03/18,China Petroleum Finance,China Petroleum Finance,,"[CPF, China Petroleum Finance Co. Ltd, Zhongyou Caiwu, Zhong You Cai Wu, 中油财务有限责任公司, 中油財務有限責任公司]","[CPF, China Petroleum Finance Co. Ltd, Zhongyou Caiwu, Zhong You Cai Wu, 中油财务有限责任公司, 中油財務有限責任公司]","[False, False, False, False, False, False]","[H, H, H, H, H, H]","[No 5 Gulouwai Avenue, Xicheng District]",[None],[Beijing;;100029],[None],[None],[CN],"[Company Identification No., DUNS Number, National Tax No., Standard Industrial Classification (SIC), North American Industry Classification System (NAICS), NACE (European Union Economic Activity ...","[91110000100018558M, 544956829, 91110000100018558M, 60, 522120, 65.1]","[None, None, None, None, None, None]",[CN],[],[],,,[1995-12-08],[1995],"[Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",[2395],,,,,,4;4;,4;4;,12;14;,,,,,,,,,,,,,,,,"[ProfileNotes, Sanction References]","[IOWA BOARD OF REGENTS NOTES: SUDAN Scrutinized Companies Category: Ongoing Engagement From: 30-Jun-2011 to 30-Jun-2013, Since 30 Jun 2011, List 2395,]",80.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_singleTokenMatch]","[Matching Engine Match, Single Token Match]","[95, YES]","[95.0, -15.0]","[MEDIUM, CORRECTIVE]",0,95,0,0,0,0,[],[CPF],[],[],[],[],[],[],[CPF BOARD],[],[],[],[],[],[CPF BOARD],,[CPF],[CPF],"{'possible_nric': None, 'nric': None, 'dob': None}",[],<p>*1-3: Name mismatch</p>,Organization,ORGANIZATION,[CPF BOARD],[CPF],,[1995-12-08],,[],UNKNOWN,,[],,[None],"[91110000100018558M, 544956829, 60, 522120, 65.1]",[None],[CN],"[1, False Positive, CPF BOARD, 24-Feb-20]",1091285,SSBSAPB0025000389,FACTIVA_SIE,"[FACTIVA_SIE, Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",,"[1995-12-08, 91110000100018558M, 544956829, 60, 522120, 65.1, No 5 Gulouwai Avenue, Xicheng District]","[1, False Positive, CPF BOARD, 24-Feb-20, <p>*1-3: Name mismatch</p>]",1091285,[Organization],[ORGANIZATION],[CPF BOARD],[CPF],[],[1995-12-08],[],[],[UNKNOWN],[],[],[],[None],"[91110000100018558M, 544956829, 60, 522120, 65.1]",[None],[CN],"[1, False Positive, CPF BOARD, 24-Feb-20]",[1091285],[SSBSAPB0025000389],[FACTIVA_SIE],[],"[FACTIVA_SIE, Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",[],[],[],"[1995-12-08, 91110000100018558M, 544956829, 60, 522120, 65.1, No 5 Gulouwai Avenue, Xicheng District]","[1, False Positive, CPF BOARD, 24-Feb-20, <p>*1-3: Name mismatch</p>]",[1091285],0
1,WLF101-939701-62908,7027,False Positive,1619405,,8-Oct-18,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-939701-62908</elem><elem name=""alertDate"">08/10/18 12:31:01</elem><elem name=""alertEntityKey...",Jan-97,42823012-P560016505,1.0,1/1/89,P,NAME;,,,,,,,,,,,,,,,AML_MAGNUM_1810081211_55,FACTIVA_PEP_SAN,P ONE,:S0135242C,AML-EWLF,FACTIVA_SAN;,1992; 1994; 1993; 1995;,Special Interest Person (SIP)-Sanctions Lists;Special Interest Person (SIP)-Other Official Lists;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SAN; Entry ID: 4790496; Entity Type: PERSON; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,29-Dec-20,28202,29-Dec-20,8-Oct-18,812105,80,AML-EWLF/3.4.0.8,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,9,,0,0.0,,,,,\r,,08/10/18 12:31:01,42823012-P560016505,80,[WLF101-939701-62908],[08/10/18 12:31:01],[01-9397],[AML_MAGNUM_1810081211_55],[Self Service Batch Scan],[80],1,[42823012-P560016505],[P ONE],[None],[None],Person,01/01/89,1989.0,,,MALE,[None],[S0135242C],[None],[SG],[],[],[],[],[],[],FACTIVA_SAN,4790496,167,PERSON,06/06/18,17/08/17,17/08/17,Ali Kony,{GN=Ali}{SN=Kony},True,"[アリ・コニー, アリ・ラロボ・バシール, Bashir,Ali,Lalobo, Kapere,Otim, Kony,Ali,Mohammed, Labola,Ali,Mohammed, Labolo,Ali,Mohammad, Lalobo,Ali, Lalobo,Ali,Bashir, Lalobo,Ali,Mohammed, Salongo,Ali,Mohammed, Mohamme...","[アリ・コニー, アリ・ラロボ・バシール, {GN=Ali,Lalobo}{SN=Bashir}, {GN=Otim}{SN=Kapere}, {GN=Ali,Mohammed}{SN=Kony}, {GN=Ali,Mohammed}{SN=Labola}, {GN=Ali,Mohammad}{SN=Labolo}, {GN=Ali}{SN=Lalobo}, {GN=Ali,Bashir}...","[False, False, True, True, True, True, True, True, True, True, True, True, False, False, False, False, False, False, False, False, False]","[H, H, H, H, H, H, H, H, H, H, H, H, L, L, L, L, L, L, L, L, L]","[Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), None]","[None, None, None]","[None, None, ;Kafia Kingi]","[None, None, None]","[None, None, None]","[None, None, None]","[SECO SSID, DFAT Reference Number]","[34712, 3194]","[None, None]",[None],[None],[None],,,"[1992, 1994, 1993, 1995]","[1992, 1994, 1993, 1995]","[Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]","[1, 275, 281, 283, 1156, 1326, 2556, 2644, 3166, 3322, 3324, 3550, 3552]",,,MALE,False,,3;3;,1;2;,,,,,,,,,,,,,,,,,"[ProfileNotes, Images, Primary Occupation, Other Roles, Previous Roles, Sanction References]","[OFFICE OF FOREIGN ASSETS CONTROL (OFAC) NOTES:\n\nRegion: Kafia Kingi\n\nTreasury Sanctions Lord’s Resistance Army Commanders Salim and Ali Kony\n\n8/23/2016 ​\nWASHINGTON – Today, the U.S. Depar...",80.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_yearOfBirthMatch, AML_WLF_CF_SF_lowQualityAliases]","[Matching Engine Match, Year of Birth Match, Low Quality Aliases]","[100, MISMATCH, YES]","[100.0, -10.0, -10.0]","[HIGH, CORRECTIVE, CORRECTIVE]",0,100,0,0,0,0,[],"[One P, One P]",[],[],[],[],[],[],"[P ONE, P ONE]",[],[],[],[],[],[P ONE],,[One P],[One P],"{'possible_nric': None, 'nric': None, 'dob': None}",[S0135242C],"<p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new', courier;"">Party Name : ...",Person,PERSON,[P ONE],[One P],1/1/89,"[1992, 1994, 1993, 1995]",,[None],MALE,MALE,[S0135242C],,[S0135242C],"[34712, 3194]",[SG],[None],"[1, False Positive, P ONE, 8-Oct-18]",4790496,42823012-P560016505,FACTIVA_SAN,"[FACTIVA_SAN, Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]",False,"[1992, 1994, 1993, 1995, 34712, 3194, Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), N...","[1, False Positive, P ONE, 8-Oct-18, <p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new'...",4790496,[Person],[PERSON],[P ONE],[One P],[1/1/89],"[1992, 1994, 1993, 1995]",[],[None],[MALE],[MALE],[S0135242C],[],[S0135242C],"[34712, 3194]",[SG],[None],"[1, False Positive, P ONE, 8-Oct-18]",[4790496],[42823012-P560016505],[FACTIVA_SAN],[],"[FACTIVA_SAN, Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]",[],[False],[],"[1992, 1994, 1993, 1995, 34712, 3194, Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), N...","[1, False Positive, P ONE, 8-Oct-18, <p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new'...",[4790496],1
2,WLF101-945401-62939,7027,False Positive,1619436,,10-Oct-18,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-945401-62939</elem><elem name=""alertDate"">10/10/18 11:43:55</elem><elem name=""alertEntityKey...",Jan-54,SSBPLASC550212199,1.0,5/12/62,P,NAME;,,,,,,,,,,,,,,,AML_PLAS_W_1810061800_971,FACTIVA_PEP_SAN,KIM,,AML-EWLF,FACTIVA_SAN;,1964; 1962-08-28;,Politically Exposed Person (PEP);Special Interest Person (SIP)-Sanctions Lists;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SAN; Entry ID: 1198704; Entity Type: PERSON; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,29-Dec-20,28202,29-Dec-20,10-Oct-18,812136,115,AML-EWLF/3.4.0.8,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,9,,0,0.0,,,,,,,10/10/18 11:43:55,SSBPLASC550212199,115,[WLF101-945401-62939],[10/10/18 11:43:55],[01-9454],[AML_PLAS_W_1810061800_971],[Self Service Batch Scan],[115],1,[SSBPLASC550212199],[KIM],[None],[None],Person,05/12/62,1962.0,,,UNKNOWN,[None],[None],[None],[None],[],[],[],[],[],[],FACTIVA_SAN,1198704,168,PERSON,06/06/18,06/06/18,06/06/18,Tong-Myo'ng Kim,{GN=Tong-Myo'ng}{SN=Kim},True,"[김동명, キム・トンミョン, Дон Мён Ким, キム・チンソク, Чин Сок Ким, Kim,Chin-So'k, Kim,Jin Sok, Kim,Dong Myong, Kim,Hyok Chol, Kim,, Hyok-Chol,, Tong My'ong,Kim, Jin-Sok,Kim, Tong-Myong,Kim, Chin-So'k,Kim, Kim,Ton...","[김동명, キム・トンミョン, Дон Мён Ким, キム・チンソク, Чин Сок Ким, {GN=Chin-So'k}{SN=Kim}, {GN=Jin Sok}{SN=Kim}, {GN=Dong Myong}{SN=Kim}, {GN=Hyok Chol}{SN=Kim}, Kim,, Hyok-Chol,, {GN=Kim}{SN=Tong My'ong}, {GN=Ki...","[False, False, False, False, False, True, True, True, True, False, False, True, True, True, True, True, True, True, True, True, True, True, True, False]","[H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, L]","[c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[None, None]","[Pyongyang, Pyongyang]","[None, None]","[None, None]","[KP, KP]","[Passport No., SECO SSID, DFAT Reference Number, DFAT Reference Number, OSFI North Korea ID]","[290320764, 33845, 1133, 3157, P23]","[None, None, None, None, None]","[KP, KP]",[None],[None],,,"[1964, 1962-08-28]","[1964, 1962]","[Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]","[13, 275, 281, 283, 627, 1310, 1318, 1324, 1326, 1906, 1908, 2342, 2416, 3160, 3162, 3264, 3630]",,"President, Tanchon Commercial Bank",MALE,False,,1;3;,1;,,,,,,,,,,,,,,,,,"[ProfileNotes, Images, Primary Occupation, Other Roles, Previous Roles, Sanction References]","[OFFICE OF FOREIGN ASSETS CONTROL (OFAC) NOTES:\n\nOctober 23, 2009\nTG-330\n\nTreasury Designates North Korean Bank and Banking Official\nAs Proliferators of Weapons of Mass destruction\n\nWASHIN...",115.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_yearOfBirthMatch, AML_WLF_CF_SF_singleTokenMatch]","[Matching Engine Match, Year of Birth Match, Single Token Match]","[100, MATCH, YES]","[100.0, 30.0, -15.0]","[HIGH, LOW, CORRECTIVE]",0,100,0,0,0,0,[],"[Kim,]",[],[],[],[],[],[],[KIM],[],[],[],[],[],[KIM],,"[Kim,]","[Kim,]","{'possible_nric': None, 'nric': None, 'dob': None}",[],<p>RBA applied. Approval from JM to close the alerts.</p>,Person,PERSON,[KIM],"[Kim,]",5/12/62,"[1964, 1962-08-28]",,[None],UNKNOWN,MALE,[],,[None],"[290320764, 33845, 1133, 3157, P23]",[None],"[KP, KP]","[1, False Positive, KIM, 10-Oct-18]",1198704,SSBPLASC550212199,FACTIVA_SAN,"[FACTIVA_SAN, Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]",False,"[1964, 1962-08-28, 290320764, 33845, 1133, 3157, P23, c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[1, False Positive, KIM, 10-Oct-18, <p>RBA applied. Approval from JM to close the alerts.</p>]",1198704,[Person],[PERSON],[KIM],"[Kim,]",[5/12/62],"[1964, 1962-08-28]",[],[None],[UNKNOWN],[MALE],[],[],[None],"[290320764, 33845, 1133, 3157, P23]",[None],[KP],"[1, False Positive, KIM, 10-Oct-18]",[1198704],[SSBPLASC550212199],[FACTIVA_SAN],[],"[FACTIVA_SAN, Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]",[],[False],[],"[1964, 1962-08-28, 290320764, 33845, 1133, 3157, P23, c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[1, False Positive, KIM, 10-Oct-18, <p>RBA applied. Approval from JM to close the alerts.</p>]",[1198704],2


In [64]:
spark.read.format('delta').load(in_application_data_dir('agent_input_agg_df.delta')).count()

3

In [65]:
to_pandas(agent_input_agg_df)

Unnamed: 0,ALERT_ID,STATUS_INTERNAL_ID,STATUS_NAME,ALERT_INTERNAL_ID,ENTITY_TYPE_ID,ALERT_DATE,ALERT_TYPE_ID,STATUS_ID,DELETED,HTML_FILE_KEY,P11,P12,P13,P14,P15,P16,P17,P18,P19,P20,P21,P22,P23,P24,P25,P26,P27,P28,P29,P30,P31,P32,P33,P34,P35,P36,P37,P38,P39,P40,P41,P42,P43,P44,P45,P46,P47,P48,P49,IS_CASE,BUNIT_IDENTIFIER,OWNER_INTERNAL_ID,BU_INTERNAL_ID,ORIGINAL_BU_INTERNAL_ID,FL_ARCHIVE,FL_READ,FL_READ_BY_OWNER,LAST_READ_DATE,LAST_READ_USER_ID,LAST_UPDATE_DATE,LAST_UPDATE_USER_ID,CLOSED_DATE,CREATE_DATE,ALERT_CUSTOM_ATTRIBUTES_ID,SCORE,ALERT_TYPE_VERSION,FL_MANUAL,FL_GENERATED_BY_ACM,RESOLUTION_ID,ALERT_TYPE_INTERNAL_ID,FL_HAS_ATTACHMENTS,FL_UPDATED_BY_ACM,ENTITY_ID,PREV_STATUS_INTERNAL_ID,FL_ENCRYPTED,LAST_REFRESH_MODIFED_DATE,DEADLINE_DATE,HIGHLIGHT_DATE,EMAIL_DATE,AUTO_ESC_STATUS_INTERNAL_ID,CASE_COUNT_FOR_CONFIDENTIAL,P50,GLOBAL_DEADLINE_DATE,GLOBAL_HIGHLIGHT_DATE,GLOBAL_EMAIL_DATE,GLOBAL_AUTO_ESC_STATUS_ID,RFI_STATE,FL_HAS_NOTES,FL_HAS_CONFIDENTIAL_NOTES,CONSOLIDATION_KEY,HIBERNATE_OBJECT_VERSION,OWNER_IDENTIFIER,FL_DOUBT,NUM_EXISTING_ENTITIES,WORKSPACE_INTERNAL_ID,ALERT_NAME,PRIORITY_INTERNAL_ID,DETAILS_FOR_SEARCH,DETAILS,alert_alertId,alert_alertDate,alert_alertEntityKey,alert_score,alert_ahData_alertID,alert_ahData_alertDateTime,alert_ahData_jobID,alert_ahData_jobName,alert_ahData_jobType,alert_ahData_score,alert_ahData_numberOfHits,alert_ahData_partyKey,alert_ahData_partyName,alert_ahData_entityExcludeListsNames,alert_ahData_hitExcludeListsNames,alert_partyType,alert_partyDOB,alert_partyYOB,alert_partyBirthCountry,alert_partyBirthLocation,alert_partyGender,alert_partyIds_idType,alert_partyIds_idNumber,alert_partyIds_idCountry,alert_partyNatCountries_countryCd,alert_partyAddresses_partyAddressLine1,alert_partyAddresses_partyAddressLine2,alert_partyAddresses_partyCity,alert_partyAddresses_partyPostalCd,alert_partyAddresses_partyStateProvince,alert_partyAddresses_partyCountry,hit_listId,hit_entryId,hit_listVersion,hit_entryType,hit_listUpdateDate,hit_entryCreatedDate,hit_entryUpdateDate,hit_displayName,hit_matchedName,hit_isNameBroken,hit_aliases_displayName,hit_aliases_matchedName,hit_aliases_isNameBroken,hit_aliases_matchStrength,hit_addresses_streetAddress1,hit_addresses_streetAddress2,hit_addresses_city,hit_addresses_stateProvince,hit_addresses_postalCode,hit_addresses_country,hit_ids_idType,hit_ids_idNumber,hit_ids_idCountry,hit_nationalityCountries_country,hit_placesOfBirth_birthPlace,hit_placesOfBirth_birthCountry,hit_age,hit_ageAsOfDate,hit_datesOfBirth_birthDate,hit_datesOfBirth_yearOfBirth,hit_categories_category,hit_keywords_keyword,hit_title,hit_position,hit_gender,hit_isDeceased,hit_deceasedDate,hit_cs_1,hit_cs_2,hit_cs_3,hit_cs_4,hit_cs_5,hit_cs_6,hit_cs_7,hit_cs_8,hit_cs_9,hit_cs_10,hit_cs_11,hit_cs_12,hit_cs_13,hit_cs_14,hit_cs_15,hit_cs_16,hit_cs_17,hit_cs_18,hit_additionalInfo_name,hit_additionalInfo_value,hit_score,hit_matchType,hit_scoreFactors_factorId,hit_scoreFactors_factorDesc,hit_scoreFactors_factorValue,hit_scoreFactors_factorScore,hit_scoreFactors_factorImpact,hit_scoresBreakdown_matchedName,hit_scoresBreakdown_aliases_matchedName,hit_scoresBreakdown_addresses_city,hit_scoresBreakdown_addresses_country,hit_scoresBreakdown_addresses_stateProvince,hit_scoresBreakdown_ids_idNumber,hit_explanations_matchedName_Explanation,hit_explanations_aliases_matchedName_Explanation,hit_explanations_nationalityCountries_country_Explanation,hit_explanations_address_city_Explanation,hit_explanations_address_country_Explanation,hit_explanations_addresses_stateProvince_Explanation,hit_explanations_ids_idNumber_Explanation,hit_inputExplanations_matchedName_inputExplanation,hit_inputExplanations_aliases_matchedName_inputExplanation,hit_inputExplanations_nationalityCountries_country_inputExplanation,hit_inputExplanations_address_city_inputExplanation,hit_inputExplanations_address_country_inputExplanation,hit_inputExplanations_addresses_stateProvince_inputExplanation,hit_inputExplanations_ids_idNumber_inputExplanation,ap_hit_names,wl_hit_matched_name,wl_hit_aliases_matched_name,wl_hit_names,hit_cs_1_data_points,ap_nric,last_note,party_type_agent_ap,party_type_agent_wl,name_agent_ap,name_agent_wl,dob_agent_ap,dob_agent_wl,pob_agent_ap,pob_agent_wl,gender_agent_ap,gender_agent_wl,national_id_agent_ap,national_id_agent_wl,document_number_agent_ap,document_number_agent_wl,nationality_agent_ap,nationality_agent_wl,historical_decision_name_agent_ap,historical_decision_name_agent_wl,pep_payment_agent_ap,pep_payment_agent_wl,hit_is_san_agent_wl,hit_is_deceased_agent_wl,hit_has_dob_id_address_agent_wl,rba_agent_ap,rba_agent_wl,ap_all_party_types_aggregated,wl_all_party_types_aggregated,ap_all_names_aggregated,wl_all_names_aggregated,ap_all_dobs_aggregated,wl_all_dobs_aggregated,ap_all_pobs_aggregated,wl_all_pobs_aggregated,ap_all_genders_aggregated,wl_all_genders_aggregated,ap_all_national_ids_aggregated,wl_all_national_ids_aggregated,ap_all_document_numbers_aggregated,wl_all_document_numbers_aggregated,ap_all_nationalities_aggregated,wl_all_nationalities_aggregated,ap_all_historical_decision_names_aggregated,wl_all_historical_decision_names_aggregated,ap_all_pep_payments_aggregated,wl_all_pep_payments_aggregated,ap_all_hit_is_sans_aggregated,wl_all_hit_is_sans_aggregated,ap_all_hit_is_deceaseds_aggregated,wl_all_hit_is_deceaseds_aggregated,ap_all_hit_has_dob_id_addresses_aggregated,wl_all_hit_has_dob_id_addresses_aggregated,ap_all_rbas_aggregated,wl_all_rbas_aggregated,_index
0,WLF101-1363601-89626,7027,False Positive,1649364,,24-Feb-20,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-1363601-89626</elem><elem name=""alertDate"">24/02/20 15:02:12</elem><elem name=""alertEntityKe...",01-13636,SSBSAPB0025000389,,,I,NAME;,,,,,,,,,,,,,,,AML_SAP_BN01_20022415001_334,FACTIVA_PEP_SAN,CPF BOARD,,AML-EWLF,FACTIVA_SIE;,1995-12-08;,Special Interest Entity (SIE)-Other Official Lists-Bank;Special Interest Entity (SIE)-Other Official Lists-State Owned Company;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SIE; Entry ID: 1091285; Entity Type: ORGANIZATION; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,12-Jul-21,30801,12-Jul-21,24-Feb-20,865017,80,AML-EWLF/3.5.1.33,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,13,,0,,,,,,\r,,24/02/20 15:02:12,SSBSAPB0025000389,80,[WLF101-1363601-89626],[24/02/20 15:02:12],[01-13636],[AML_SAP_BN01_20022415001_334],[Self Service Batch Scan],[80],1,[SSBSAPB0025000389],[CPF BOARD],[None],[None],Organization,,,,,UNKNOWN,[None],[None],[None],[None],[],[],[],[],[],[],FACTIVA_SIE,1091285,160,ORGANIZATION,21/02/20,02/03/18,02/03/18,China Petroleum Finance,China Petroleum Finance,,"[CPF, China Petroleum Finance Co. Ltd, Zhongyou Caiwu, Zhong You Cai Wu, 中油财务有限责任公司, 中油財務有限責任公司]","[CPF, China Petroleum Finance Co. Ltd, Zhongyou Caiwu, Zhong You Cai Wu, 中油财务有限责任公司, 中油財務有限責任公司]","[False, False, False, False, False, False]","[H, H, H, H, H, H]","[No 5 Gulouwai Avenue, Xicheng District]",[None],[Beijing;;100029],[None],[None],[CN],"[Company Identification No., DUNS Number, National Tax No., Standard Industrial Classification (SIC), North American Industry Classification System (NAICS), NACE (European Union Economic Activity ...","[91110000100018558M, 544956829, 91110000100018558M, 60, 522120, 65.1]","[None, None, None, None, None, None]",[CN],[],[],,,[1995-12-08],[1995],"[Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",[2395],,,,,,4;4;,4;4;,12;14;,,,,,,,,,,,,,,,,"[ProfileNotes, Sanction References]","[IOWA BOARD OF REGENTS NOTES: SUDAN Scrutinized Companies Category: Ongoing Engagement From: 30-Jun-2011 to 30-Jun-2013, Since 30 Jun 2011, List 2395,]",80.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_singleTokenMatch]","[Matching Engine Match, Single Token Match]","[95, YES]","[95.0, -15.0]","[MEDIUM, CORRECTIVE]",0,95,0,0,0,0,[],[CPF],[],[],[],[],[],[],[CPF BOARD],[],[],[],[],[],[CPF BOARD],,[CPF],[CPF],"{'possible_nric': None, 'nric': None, 'dob': None}",[],<p>*1-3: Name mismatch</p>,Organization,ORGANIZATION,[CPF BOARD],[CPF],,[1995-12-08],,[],UNKNOWN,,[],,[None],"[91110000100018558M, 544956829, 60, 522120, 65.1]",[None],[CN],"[1, False Positive, CPF BOARD, 24-Feb-20]",1091285,SSBSAPB0025000389,FACTIVA_SIE,"[FACTIVA_SIE, Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",,"[1995-12-08, 91110000100018558M, 544956829, 60, 522120, 65.1, No 5 Gulouwai Avenue, Xicheng District]","[1, False Positive, CPF BOARD, 24-Feb-20, <p>*1-3: Name mismatch</p>]",1091285,[Organization],[ORGANIZATION],[CPF BOARD],[CPF],[],[1995-12-08],[],[],[UNKNOWN],[],[],[],[None],"[91110000100018558M, 544956829, 60, 522120, 65.1]",[None],[CN],"[1, False Positive, CPF BOARD, 24-Feb-20]",[1091285],[SSBSAPB0025000389],[FACTIVA_SIE],[],"[FACTIVA_SIE, Special Interest Entity (SIE)-Other Official Lists-Bank, Special Interest Entity (SIE)-Other Official Lists-State Owned Company]",[],[],[],"[1995-12-08, 91110000100018558M, 544956829, 60, 522120, 65.1, No 5 Gulouwai Avenue, Xicheng District]","[1, False Positive, CPF BOARD, 24-Feb-20, <p>*1-3: Name mismatch</p>]",[1091285],0
1,WLF101-939701-62908,7027,False Positive,1619405,,8-Oct-18,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-939701-62908</elem><elem name=""alertDate"">08/10/18 12:31:01</elem><elem name=""alertEntityKey...",Jan-97,42823012-P560016505,1.0,1/1/89,P,NAME;,,,,,,,,,,,,,,,AML_MAGNUM_1810081211_55,FACTIVA_PEP_SAN,P ONE,:S0135242C,AML-EWLF,FACTIVA_SAN;,1992; 1994; 1993; 1995;,Special Interest Person (SIP)-Sanctions Lists;Special Interest Person (SIP)-Other Official Lists;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SAN; Entry ID: 4790496; Entity Type: PERSON; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,29-Dec-20,28202,29-Dec-20,8-Oct-18,812105,80,AML-EWLF/3.4.0.8,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,9,,0,0.0,,,,,\r,,08/10/18 12:31:01,42823012-P560016505,80,[WLF101-939701-62908],[08/10/18 12:31:01],[01-9397],[AML_MAGNUM_1810081211_55],[Self Service Batch Scan],[80],1,[42823012-P560016505],[P ONE],[None],[None],Person,01/01/89,1989.0,,,MALE,[None],[S0135242C],[None],[SG],[],[],[],[],[],[],FACTIVA_SAN,4790496,167,PERSON,06/06/18,17/08/17,17/08/17,Ali Kony,{GN=Ali}{SN=Kony},True,"[アリ・コニー, アリ・ラロボ・バシール, Bashir,Ali,Lalobo, Kapere,Otim, Kony,Ali,Mohammed, Labola,Ali,Mohammed, Labolo,Ali,Mohammad, Lalobo,Ali, Lalobo,Ali,Bashir, Lalobo,Ali,Mohammed, Salongo,Ali,Mohammed, Mohamme...","[アリ・コニー, アリ・ラロボ・バシール, {GN=Ali,Lalobo}{SN=Bashir}, {GN=Otim}{SN=Kapere}, {GN=Ali,Mohammed}{SN=Kony}, {GN=Ali,Mohammed}{SN=Labola}, {GN=Ali,Mohammad}{SN=Labolo}, {GN=Ali}{SN=Lalobo}, {GN=Ali,Bashir}...","[False, False, True, True, True, True, True, True, True, True, True, True, False, False, False, False, False, False, False, False, False]","[H, H, H, H, H, H, H, H, H, H, H, H, L, L, L, L, L, L, L, L, L]","[Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), None]","[None, None, None]","[None, None, ;Kafia Kingi]","[None, None, None]","[None, None, None]","[None, None, None]","[SECO SSID, DFAT Reference Number]","[34712, 3194]","[None, None]",[None],[None],[None],,,"[1992, 1994, 1993, 1995]","[1992, 1994, 1993, 1995]","[Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]","[1, 275, 281, 283, 1156, 1326, 2556, 2644, 3166, 3322, 3324, 3550, 3552]",,,MALE,False,,3;3;,1;2;,,,,,,,,,,,,,,,,,"[ProfileNotes, Images, Primary Occupation, Other Roles, Previous Roles, Sanction References]","[OFFICE OF FOREIGN ASSETS CONTROL (OFAC) NOTES:\n\nRegion: Kafia Kingi\n\nTreasury Sanctions Lord’s Resistance Army Commanders Salim and Ali Kony\n\n8/23/2016 ​\nWASHINGTON – Today, the U.S. Depar...",80.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_yearOfBirthMatch, AML_WLF_CF_SF_lowQualityAliases]","[Matching Engine Match, Year of Birth Match, Low Quality Aliases]","[100, MISMATCH, YES]","[100.0, -10.0, -10.0]","[HIGH, CORRECTIVE, CORRECTIVE]",0,100,0,0,0,0,[],"[One P, One P]",[],[],[],[],[],[],"[P ONE, P ONE]",[],[],[],[],[],[P ONE],,[One P],[One P],"{'possible_nric': None, 'nric': None, 'dob': None}",[S0135242C],"<p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new', courier;"">Party Name : ...",Person,PERSON,[P ONE],[One P],1/1/89,"[1992, 1994, 1993, 1995]",,[None],MALE,MALE,[S0135242C],,[S0135242C],"[34712, 3194]",[SG],[None],"[1, False Positive, P ONE, 8-Oct-18]",4790496,42823012-P560016505,FACTIVA_SAN,"[FACTIVA_SAN, Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]",False,"[1992, 1994, 1993, 1995, 34712, 3194, Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), N...","[1, False Positive, P ONE, 8-Oct-18, <p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new'...",4790496,[Person],[PERSON],[P ONE],[One P],[1/1/89],"[1992, 1994, 1993, 1995]",[],[None],[MALE],[MALE],[S0135242C],[],[S0135242C],"[34712, 3194]",[SG],[None],"[1, False Positive, P ONE, 8-Oct-18]",[4790496],[42823012-P560016505],[FACTIVA_SAN],[],"[FACTIVA_SAN, Special Interest Person (SIP)-Sanctions Lists, Special Interest Person (SIP)-Other Official Lists]",[],[False],[],"[1992, 1994, 1993, 1995, 34712, 3194, Kafia Kingi (border of Sudan and South Sudan), Kafia Kingi (a territory on the border of Sudan and South Sudan whose final status has yet to be determined), N...","[1, False Positive, P ONE, 8-Oct-18, <p><span style=""font-family: 'courier new', courier;"">Number of hits : 1 </span></p>\n<p><span style=""font-family: 'courier new'...",[4790496],1
2,WLF101-945401-62939,7027,False Positive,1619436,,10-Oct-18,,,0,"<?xml version=""1.0"" encoding=""UTF16"" standalone=""no"" ?><alert><alert-header><elem name=""alertId"">WLF101-945401-62939</elem><elem name=""alertDate"">10/10/18 11:43:55</elem><elem name=""alertEntityKey...",Jan-54,SSBPLASC550212199,1.0,5/12/62,P,NAME;,,,,,,,,,,,,,,,AML_PLAS_W_1810061800_971,FACTIVA_PEP_SAN,KIM,,AML-EWLF,FACTIVA_SAN;,1964; 1962-08-28;,Politically Exposed Person (PEP);Special Interest Person (SIP)-Sanctions Lists;,Self Service Batch Scan,,,,,,,,,,Watch List: FACTIVA_SAN; Entry ID: 1198704; Entity Type: PERSON; Match Type: NAME;,0,1,,3280419,3280419,0,1,0,12-Jul-21,30801,29-Dec-20,28202,29-Dec-20,10-Oct-18,812136,115,AML-EWLF/3.4.0.8,0,1,,119,0,1,,,0,,,,,,0,,,,,,0,1,0,,9,,0,0.0,,,,,,,10/10/18 11:43:55,SSBPLASC550212199,115,[WLF101-945401-62939],[10/10/18 11:43:55],[01-9454],[AML_PLAS_W_1810061800_971],[Self Service Batch Scan],[115],1,[SSBPLASC550212199],[KIM],[None],[None],Person,05/12/62,1962.0,,,UNKNOWN,[None],[None],[None],[None],[],[],[],[],[],[],FACTIVA_SAN,1198704,168,PERSON,06/06/18,06/06/18,06/06/18,Tong-Myo'ng Kim,{GN=Tong-Myo'ng}{SN=Kim},True,"[김동명, キム・トンミョン, Дон Мён Ким, キム・チンソク, Чин Сок Ким, Kim,Chin-So'k, Kim,Jin Sok, Kim,Dong Myong, Kim,Hyok Chol, Kim,, Hyok-Chol,, Tong My'ong,Kim, Jin-Sok,Kim, Tong-Myong,Kim, Chin-So'k,Kim, Kim,Ton...","[김동명, キム・トンミョン, Дон Мён Ким, キム・チンソク, Чин Сок Ким, {GN=Chin-So'k}{SN=Kim}, {GN=Jin Sok}{SN=Kim}, {GN=Dong Myong}{SN=Kim}, {GN=Hyok Chol}{SN=Kim}, Kim,, Hyok-Chol,, {GN=Kim}{SN=Tong My'ong}, {GN=Ki...","[False, False, False, False, False, True, True, True, True, False, False, True, True, True, True, True, True, True, True, True, True, True, True, False]","[H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, H, L]","[c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[None, None]","[Pyongyang, Pyongyang]","[None, None]","[None, None]","[KP, KP]","[Passport No., SECO SSID, DFAT Reference Number, DFAT Reference Number, OSFI North Korea ID]","[290320764, 33845, 1133, 3157, P23]","[None, None, None, None, None]","[KP, KP]",[None],[None],,,"[1964, 1962-08-28]","[1964, 1962]","[Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]","[13, 275, 281, 283, 627, 1310, 1318, 1324, 1326, 1906, 1908, 2342, 2416, 3160, 3162, 3264, 3630]",,"President, Tanchon Commercial Bank",MALE,False,,1;3;,1;,,,,,,,,,,,,,,,,,"[ProfileNotes, Images, Primary Occupation, Other Roles, Previous Roles, Sanction References]","[OFFICE OF FOREIGN ASSETS CONTROL (OFAC) NOTES:\n\nOctober 23, 2009\nTG-330\n\nTreasury Designates North Korean Bank and Banking Official\nAs Proliferators of Weapons of Mass destruction\n\nWASHIN...",115.0,NAME,"[AML_WLF_CF_SF_matchingEngine, AML_WLF_CF_SF_yearOfBirthMatch, AML_WLF_CF_SF_singleTokenMatch]","[Matching Engine Match, Year of Birth Match, Single Token Match]","[100, MATCH, YES]","[100.0, 30.0, -15.0]","[HIGH, LOW, CORRECTIVE]",0,100,0,0,0,0,[],"[Kim,]",[],[],[],[],[],[],[KIM],[],[],[],[],[],[KIM],,"[Kim,]","[Kim,]","{'possible_nric': None, 'nric': None, 'dob': None}",[],<p>RBA applied. Approval from JM to close the alerts.</p>,Person,PERSON,[KIM],"[Kim,]",5/12/62,"[1964, 1962-08-28]",,[None],UNKNOWN,MALE,[],,[None],"[290320764, 33845, 1133, 3157, P23]",[None],"[KP, KP]","[1, False Positive, KIM, 10-Oct-18]",1198704,SSBPLASC550212199,FACTIVA_SAN,"[FACTIVA_SAN, Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]",False,"[1964, 1962-08-28, 290320764, 33845, 1133, 3157, P23, c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[1, False Positive, KIM, 10-Oct-18, <p>RBA applied. Approval from JM to close the alerts.</p>]",1198704,[Person],[PERSON],[KIM],"[Kim,]",[5/12/62],"[1964, 1962-08-28]",[],[None],[UNKNOWN],[MALE],[],[],[None],"[290320764, 33845, 1133, 3157, P23]",[None],[KP],"[1, False Positive, KIM, 10-Oct-18]",[1198704],[SSBPLASC550212199],[FACTIVA_SAN],[],"[FACTIVA_SAN, Politically Exposed Person (PEP), Special Interest Person (SIP)-Sanctions Lists]",[],[False],[],"[1964, 1962-08-28, 290320764, 33845, 1133, 3157, P23, c/o Tanchon Commercial Bank, Saemul 1-Dong Pyongchon, District, Linked to: Tanchon Commercial Bank, Saemul 1-Dong Pyongchon District]","[1, False Positive, KIM, 10-Oct-18, <p>RBA applied. Approval from JM to close the alerts.</p>]",[1198704],2


In [66]:
%%time
# Agent input creator
key_cols = ['_index', 'ALERT_INTERNAL_ID', 'ALERT_ID', 'hit_listId', 'hit_entryId']
for agent_name, input_agg_col_config in agent_input_agg_col_config.items():
    start = time.time()
    
    if agent_name in ['name_agent', 'dob_agent']:
        agent_input_df = agent_input_agg_df.select(*key_cols,
                                             *input_agg_col_config.keys(),
                                             'party_type_agent_ap',
                                             'party_type_agent_wl'
                                            )
    elif 'pary_type' in agent_name:
        continue
    else:
        agent_input_df = agent_input_agg_df.select(*key_cols, *input_agg_col_config.keys())
    
    # Our agent support the input list has None, hence, filter out None from all the agg columns (they will be the agent inputs)
    for agg_col_name in input_agg_col_config.keys():
        agent_input_df = agent_input_df.withColumn(agg_col_name, F.expr(f'array_except({agg_col_name}, array(null))'))
        
    agent_input_df_path = in_application_data_dir(f'agent-input/{agent_name}_input.delta')
    agent_input_df = write_read_delta(spark, agent_input_df, agent_input_df_path)
    logging.info(f'Agent: {agent}, Input written to {agent_input_df_path}, elapsed time: {time.time() - start:.2f}s')

2021/12/09 11:22:59 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/party_type_agent_input.delta, elapsed time: 1.03s
2021/12/09 11:23:00 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/name_agent_input.delta, elapsed time: 1.09s
2021/12/09 11:23:02 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/dob_agent_input.delta, elapsed time: 1.04s
2021/12/09 11:23:03 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/pob_agent_input.delta, elapsed time: 1.02s
2021/12/09 11:23:04 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/gender_agent_input.delta, elapsed time: 1.03s
2021/12/09 11:23:05 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/national_id_agent_input.delta, elapsed time: 1.04s
2021/12/09 11:23:06 - root INFO: Agent: rba_agent, Input written to ./data/4.application/agent-input/document_number_

CPU times: user 58.9 ms, sys: 5.72 ms, total: 64.6 ms
Wall time: 14.3 s


# Test agent (optional if we have time) - 3.2.0 - Agent Manager, Spark Manager

In [67]:
# Agent Manager

from google.protobuf.json_format import MessageToJson

In [68]:
# Agent Manager

def get_agent_input_output_path(agent_name):
    def _get_agent_input_path(input_dir, agent_name):
        return os.path.join(input_dir, agent_name+'_input.delta')

    def _get_agent_output_path(output_dir, agent_name):
        os.makedirs(output_dir, exist_ok=True)
        return os.path.join(output_dir, agent_name+'_output.csv')
    
    agent_input_dir_name = in_application_data_dir('agent-input')
    agent_output_dir_name = agent_input_dir_name.replace('input', 'output')

    agent_input_path = _get_agent_input_path(agent_input_dir_name, agent_name)
    agent_output_path = _get_agent_output_path(agent_output_dir_name, agent_name)
    
    return agent_input_path, agent_output_path

In [69]:
# Agent Manager
agent_name = 'name_agent'

agent_input_path, agent_output_path = get_agent_input_output_path(agent_name)
print(agent_input_path)
print(agent_output_path)

./data/4.application/agent-input/name_agent_input.delta
./data/4.application/agent-output/name_agent_output.csv


In [70]:
# Agent Manager
agent_input_df = spark.read.format('delta').load(agent_input_path)

# Limit the name agent to 6 threads only, because other agents run on single thread, e.g, there are 4 doc
# and they need to run sequentially
agent_input_df = agent_input_df.repartition(16)

instance_name_mapping = {
'PERSON': 'advname-indv',
'ORGANIZATION': 'advname-org',
}

instance_name_col = 'name_agent_config'
alerted_names_col = 'ap_all_names_aggregated'
matched_names_col = 'wl_all_names_aggregated'

In [71]:
%%time
# Agent Manager
agent_input_df = agent_input_df.withColumn(instance_name_col, F.lit(''))

for key, value in instance_name_mapping.items():

    agent_input_df = agent_input_df.withColumn(
        instance_name_col,
        F.when(F.col('party_type_agent_wl') == key, value).otherwise(F.col(instance_name_col))
    )

display(agent_input_df.limit(10).toPandas())

Unnamed: 0,_index,ALERT_INTERNAL_ID,ALERT_ID,hit_listId,hit_entryId,ap_all_names_aggregated,wl_all_names_aggregated,party_type_agent_ap,party_type_agent_wl,name_agent_config
0,1,1619405,WLF101-939701-62908,FACTIVA_SAN,4790496,[P ONE],[One P],Person,PERSON,advname-indv
1,0,1649364,WLF101-1363601-89626,FACTIVA_SIE,1091285,[CPF BOARD],[CPF],Organization,ORGANIZATION,advname-org
2,2,1619436,WLF101-945401-62939,FACTIVA_SAN,1198704,[KIM],"[Kim,]",Person,PERSON,advname-indv


CPU times: user 14.9 ms, sys: 0 ns, total: 14.9 ms
Wall time: 135 ms


In [72]:
# Agent Manager
group_count(agent_input_df, 'name_agent_config')

21/12/09 11:23:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:23:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/09 11:23:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,name_agent_config,count,percent,count_cum_sum,percent_cum_sum
0,advname-indv,2,66.667,2,66.667
1,advname-org,1,33.333,3,100.0


In [73]:
print('test')

test


In [74]:
import utils.agent_service_standalone as standalone_agents

TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "silenteight/agent/document/v1/api/document_numbers_comparer_agent.proto":
  silenteight.agent.document.v1.api.CompareDocumentNumbersRequest: "silenteight.agent.document.v1.api.CompareDocumentNumbersRequest" is already defined in file "silenteight/agent/document/v1/api/document_agent.proto".
  silenteight.agent.document.v1.api.CompareDocumentNumbersResponse: "silenteight.agent.document.v1.api.CompareDocumentNumbersResponse" is already defined in file "silenteight/agent/document/v1/api/document_agent.proto".
  silenteight.agent.document.v1.api.DocumentNumbersComparerAgent.CompareDocumentNumbers: "silenteight.agent.document.v1.api.CompareDocumentNumbersRequest" seems to be defined in "silenteight/agent/document/v1/api/document_agent.proto", which is not imported by "silenteight/agent/document/v1/api/document_numbers_comparer_agent.proto".  To use it here, please add the necessary import.
  silenteight.agent.document.v1.api.DocumentNumbersComparerAgent.CompareDocumentNumbers: "silenteight.agent.document.v1.api.CompareDocumentNumbersResponse" seems to be defined in "silenteight/agent/document/v1/api/document_agent.proto", which is not imported by "silenteight/agent/document/v1/api/document_numbers_comparer_agent.proto".  To use it here, please add the necessary import.


In [None]:
# Agent Manager

def call_name_agent(instance_name, alerted_names, matched_names):
    def _remove_none_from_list(values):
        return [v for v in values if v is not None]
    
    def _get_reason_as_json(reason):
        if reason is None:
            return '{}'
        elif isinstance(reason, (dict, list)):
            return json.dumps(reason)
        else:
            return MessageToJson(reason)
        
#     response = agentservice.call_name_agent(instance_name, alerted_names, matched_names)
    alerted_names = _remove_none_from_list(alerted_names)
    matched_names = _remove_none_from_list(matched_names)
    response = standalone_agents.call_name_agent(instance_name, alerted_names, matched_names)

    return response.result, _get_reason_as_json(response.reason)

# Spark manager
name_agent_schema = StructType([
    StructField('name_agent', StringType()),
    StructField('name_agent_reason', StringType())
])

In [None]:
# Agent Manager

call_name_agent('advname-org', ['cpf'], ['cpf board'])

In [None]:
# Spark manager

agent_output_df = agent_input_df.withColumn('name_agent_all',
                                            udf(call_name_agent, name_agent_schema)(instance_name_col, alerted_names_col, matched_names_col)
                                           ) \
                                .select('*', 'name_agent_all.*') \
                                .drop('name_agent_all')

In [None]:
# Agent Manager

agent_output_df.limit(10).toPandas()
