# IBM Anti-Money-Laundering ETL Project (Team Epsilon)
###### Contributors: Nathon Burwick, Toyin Olaye, Cole Valentyn, Ariel Richardson, Talita Urzeda, Taylor Gibson

In [1]:
# Import Dependencies
import pandas as pd
import numpy as np
import os, fnmatch
import sqlalchemy
from pymongo import MongoClient
from sqlalchemy import Column, Integer, String, Float, MetaData, Table, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
import datetime as dt
from pprint import pprint
import re
from tqdm import tqdm, trange
from threading import Thread

In [2]:
# Define a read_file function that transforms data into a Pandas DataFrame
def read_file(filename):
    # Establish Column Names
    cols = ['timestamp', 'from_bank', 'from_account', 'to_bank', 'to_account',
            'amount_received','rec_currency', 'amount_paid', 'payment_currency',
            'payment_format', 'is_laundering', 'pattern_type', 'group_id']
    
    # Read File' 
    file = open(os.path.join('Resources', filename), 'r')
    file_data = file.readlines()
    file_data = [l.strip() for l in file_data]
    
    # Filter out blank lines
    for i, l in enumerate(file_data):
        if l == '':
            file_data.pop(i)
            
    # Create basis for group_id (file_ref + grouping number) using regex
    file_type = re.findall(r"(^.+)_",filename)[0]
    group_id = 0
    
    # Loop through to add pattern_type & group_id data
    for i, l in enumerate(file_data):
        if re.search('BEGIN', l) != None:
            group_id += 1
            begin_row = i
            pattern_type = re.findall(r"- (.+)",l)[0]
            if ':' in pattern_type:
                pattern_type = pattern_type.split(':')[0]

        elif re.search('END', l) != None:
            end_row = i
            for index in range(begin_row + 1 , end_row):
                file_data[index] = file_data[index] + f",{pattern_type},{file_type}_{group_id}"
    
    # Remove Rows containing BEGIN or END
    remove_list = [i for i, l in enumerate(file_data) if re.search(r"^BEGIN|^END", l) != None]
    
    for index in sorted(remove_list, reverse=True):
        del file_data[index]

    # Use .split() to convert data rows into list of lists
    file_data = [l.split(',') for l in file_data]        
    
    # Convert Data into DataFrame with established column names
    file_df = pd.DataFrame(file_data, columns=cols)
    
    # Establish Data Types
    dtypes_dict = {'timestamp': 'datetime64', 'from_bank': 'string', 'from_account': 'string' ,
                   'to_bank': 'string', 'to_account': 'string', 'amount_received': 'float64', 
                   'rec_currency': 'string', 'amount_paid': 'float64', 'payment_currency': 'string',
                   'payment_format': 'string', 'is_laundering': 'int64', 'pattern_type': 'string', 'group_id': 'string'}
    
    # Convert Data Types
    file_df = file_df.astype(dtypes_dict)
    
    # Push a CSV
    os.makedirs('Outputs', exist_ok=True)
    file_df.to_csv(os.path.join('Outputs', f'{filename.split(".")[0]}_clean.csv'), index=False)

    # Return DataFrame to variable
    return file_df

In [3]:
# Read All .txt Files into DataFrame for Transformation
data_frames = [read_file(file) for file in os.listdir(os.path.join('Resources')) if fnmatch.fnmatch(file, '*.txt')]

In [4]:
# Display Preview a DataFrame
data_frames[0]

Unnamed: 0,timestamp,from_bank,from_account,to_bank,to_account,amount_received,rec_currency,amount_paid,payment_currency,payment_format,is_laundering,pattern_type,group_id
0,2022-09-01 02:38:00,001812,80279F810,0110,8000A94C0,10154.74,Australian Dollar,10154.74,Australian Dollar,ACH,1,FAN-IN,LI-Small_1
1,2022-09-02 14:36:00,022595,80279F8B0,0110,8000A94C0,5326.79,Australian Dollar,5326.79,Australian Dollar,ACH,1,FAN-IN,LI-Small_1
2,2022-09-03 14:09:00,001120,800E36A50,0110,8000A94C0,4634.81,Australian Dollar,4634.81,Australian Dollar,ACH,1,FAN-IN,LI-Small_1
3,2022-09-01 03:17:00,003671,801BF8E70,002557,8016B3750,8099.96,Euro,8099.96,Euro,ACH,1,FAN-IN,LI-Small_2
4,2022-09-01 06:27:00,015,80074C7E0,002557,8016B3750,10468.56,Euro,10468.56,Euro,ACH,1,FAN-IN,LI-Small_2
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1018,2022-09-14 09:36:00,027,80BF623F0,0027009,80B880140,12499.20,US Dollar,12499.20,US Dollar,ACH,1,FAN-OUT,LI-Small_116
1019,2022-09-14 10:06:00,027,80BF623F0,016934,805F5B360,3902.74,Euro,3902.74,Euro,ACH,1,FAN-OUT,LI-Small_116
1020,2022-09-14 14:10:00,027,80BF623F0,00531,8057B5070,15565.88,US Dollar,15565.88,US Dollar,ACH,1,FAN-OUT,LI-Small_116
1021,2022-09-10 22:10:00,015516,8026EA390,025788,8026EA1A0,3431.61,Euro,3431.61,Euro,ACH,1,GATHER-SCATTER,LI-Small_117


In [5]:
# Display Data Types
data_frames[0].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1023 entries, 0 to 1022
Data columns (total 13 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   timestamp         1023 non-null   datetime64[ns]
 1   from_bank         1023 non-null   string        
 2   from_account      1023 non-null   string        
 3   to_bank           1023 non-null   string        
 4   to_account        1023 non-null   string        
 5   amount_received   1023 non-null   float64       
 6   rec_currency      1023 non-null   string        
 7   amount_paid       1023 non-null   float64       
 8   payment_currency  1023 non-null   string        
 9   payment_format    1023 non-null   string        
 10  is_laundering     1023 non-null   int64         
 11  pattern_type      1023 non-null   string        
 12  group_id          1023 non-null   string        
dtypes: datetime64[ns](1), float64(2), int64(1), string(9)
memory usage: 104.0 KB


In [6]:
# Number of DataFrames Processed
print(f"Number of files were converted into DataFrames: {len(data_frames)}")

Number of files were converted into DataFrames: 6


In [7]:
# Concatenate DataFrames
merged_df = pd.concat(data_frames, ignore_index=True, axis=0).sort_values('group_id').reset_index(drop=True)
merged_df

Unnamed: 0,timestamp,from_bank,from_account,to_bank,to_account,amount_received,rec_currency,amount_paid,payment_currency,payment_format,is_laundering,pattern_type,group_id
0,2022-08-09 05:14:00,00952,8139F54E0,0111632,8062C56E0,5331.44,US Dollar,5331.44,US Dollar,ACH,1,STACK,HI-Large_1
1,2022-08-15 14:19:00,013729,801CF2E60,0123621,81A7090F0,1467.94,US Dollar,1467.94,US Dollar,ACH,1,STACK,HI-Large_1
2,2022-08-13 12:40:00,0024750,81363F410,0213834,808757B00,16898.29,US Dollar,16898.29,US Dollar,ACH,1,STACK,HI-Large_1
3,2022-08-22 06:34:00,0213834,808757B00,000,800073EF0,17607.19,US Dollar,17607.19,US Dollar,ACH,1,STACK,HI-Large_1
4,2022-08-15 07:40:00,0118693,823D5EB90,013729,801CF2E60,1400.54,US Dollar,1400.54,US Dollar,ACH,1,STACK,HI-Large_1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
188276,2022-09-09 08:58:00,01776,8017D1980,02860,8016A5E40,5992.39,US Dollar,5992.39,US Dollar,ACH,1,FAN-OUT,LI-Small_99
188277,2022-09-09 09:13:00,01776,8017D1980,021566,801B7DB90,17650.29,US Dollar,17650.29,US Dollar,ACH,1,FAN-OUT,LI-Small_99
188278,2022-09-10 17:30:00,01776,8017D1980,011222,80095D960,5877.68,US Dollar,5877.68,US Dollar,ACH,1,FAN-OUT,LI-Small_99
188279,2022-09-11 12:17:00,01776,8017D1980,021939,8011E83E0,17696.72,US Dollar,17696.72,US Dollar,ACH,1,FAN-OUT,LI-Small_99


In [8]:
# Find Unique Items in Currency Types, Payment Formats, Pattern Types and Create IDs
# Define Function
def get_unique(df, column_name):
    try:
        temp_df = pd.DataFrame(df[column_name].unique().tolist(), columns=[column_name]).sort_values(column_name).reset_index(drop=True)
        return temp_df
    except KeyError:
        print(f'KeyError: {column_name} does not exist in specified DataFrame')

In [9]:
# Create Currency DataFrame        
currencies = get_unique(merged_df, 'rec_currency')
currencies

Unnamed: 0,rec_currency
0,Australian Dollar
1,Bitcoin
2,Brazil Real
3,Canadian Dollar
4,Euro
5,Mexican Peso
6,Ruble
7,Rupee
8,Saudi Riyal
9,Shekel


In [10]:
# Create Payment Format DataFrame  
payment_formats =  get_unique(merged_df, 'payment_format')
payment_formats

Unnamed: 0,payment_format
0,ACH
1,Bitcoin
2,Wire


In [11]:
# Create Pattern Types DataFrame  
pattern_types =  get_unique(merged_df, 'pattern_type')
pattern_types

Unnamed: 0,pattern_type
0,BIPARTITE
1,CYCLE
2,FAN-IN
3,FAN-OUT
4,GATHER-SCATTER
5,RANDOM
6,SCATTER-GATHER
7,STACK


In [12]:
# Create Group IDs DataFrame  
group_ids =  get_unique(merged_df, 'group_id')
group_ids

Unnamed: 0,group_id
0,HI-Large_1
1,HI-Large_10
2,HI-Large_100
3,HI-Large_1000
4,HI-Large_10000
...,...
22379,LI-Small_95
22380,LI-Small_96
22381,LI-Small_97
22382,LI-Small_98


In [13]:
# Create Aggregated DataFrame for MongoDB Records
agg_df = merged_df[merged_df['group_id'].str.contains('Small')].copy().reset_index(drop=True)
agg_df

Unnamed: 0,timestamp,from_bank,from_account,to_bank,to_account,amount_received,rec_currency,amount_paid,payment_currency,payment_format,is_laundering,pattern_type,group_id
0,2022-09-01 04:33:00,021174,800737690,020,80020C5B0,8630.40,Euro,8630.40,Euro,ACH,1,FAN-OUT,HI-Small_1
1,2022-09-01 09:56:00,021174,800737690,00220,8007A5B70,5738987.96,US Dollar,5738987.96,US Dollar,ACH,1,FAN-OUT,HI-Small_1
2,2022-09-04 15:48:00,021174,800737690,011056,8007486D0,15224.53,Euro,15224.53,Euro,ACH,1,FAN-OUT,HI-Small_1
3,2022-09-04 12:03:00,021174,800737690,010,800692550,2724.99,US Dollar,2724.99,US Dollar,ACH,1,FAN-OUT,HI-Small_1
4,2022-09-04 09:10:00,021174,800737690,011852,800AEBA90,300.93,Euro,300.93,Euro,ACH,1,FAN-OUT,HI-Small_1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
4227,2022-09-09 08:58:00,01776,8017D1980,02860,8016A5E40,5992.39,US Dollar,5992.39,US Dollar,ACH,1,FAN-OUT,LI-Small_99
4228,2022-09-09 09:13:00,01776,8017D1980,021566,801B7DB90,17650.29,US Dollar,17650.29,US Dollar,ACH,1,FAN-OUT,LI-Small_99
4229,2022-09-10 17:30:00,01776,8017D1980,011222,80095D960,5877.68,US Dollar,5877.68,US Dollar,ACH,1,FAN-OUT,LI-Small_99
4230,2022-09-11 12:17:00,01776,8017D1980,021939,8011E83E0,17696.72,US Dollar,17696.72,US Dollar,ACH,1,FAN-OUT,LI-Small_99


In [14]:
# Create Bank/Account Pairs
pairs_df = pd.DataFrame([[row['from_bank'], row['from_account']] for i, row in agg_df.iterrows()],
                        columns=['from_bank', 'from_account']).drop_duplicates().reset_index(drop=True)
display(pairs_df.head())
print(pairs_df.shape)

Unnamed: 0,from_bank,from_account
0,21174,800737690
1,14290,801BA48E0
2,3489,80211F620
3,21745,800A75B90
4,1,8010AA4F0


(2783, 2)


In [15]:
import multiprocessing

#build process pool
num_processes = multiprocessing.cpu_count()

chunk_size = int(pairs_df.shape[0]/num_processes)

#chunks
chunks = [pairs_df.iloc[pairs_df.index[i:i + chunk_size]] for i in range(0, pairs_df.shape[0], chunk_size)]

# Create Process Pool
pool = multiprocessing.Pool(processes=num_processes)

# of chunks
print(f'There are {len(chunks)} chunks')

There are 11 chunks


In [16]:
chunks[0]

Unnamed: 0,from_bank,from_account
0,021174,800737690
1,014290,801BA48E0
2,003489,80211F620
3,021745,800A75B90
4,001,8010AA4F0
...,...,...
273,011,8000631E0
274,020,800102120
275,001,8001093C0
276,011,8000EF670


In [17]:
# Define Function for Threading
df_list = []

def get_nests(chunk):
    import warnings
    warnings.filterwarnings("ignore")
   
    # Create Empty Column for nesting
    chunk['destinations'] = None
    # loop through chunk rows to filter by bank & account
    for index, row in chunk.iterrows():
        temp_df = pd.DataFrame(row.to_dict(), index=[0])
        df = pd.merge(merged_df, temp_df, how='inner', on=['from_bank', 'from_account'])
        df = df[['to_bank','to_account', 'timestamp', 'amount_paid', 'payment_currency', 'payment_format']]
        nest = df.to_dict('records')
        chunk.loc[index, 'destinations'] = nest
        
    # Concat new DF
    updated_df = chunk
    
    # Return New DF
    return df_list.append(updated_df)

In [18]:
%%time

# Empty lists
threads = []

for chunk in chunks:
    thread = Thread(target=get_nests, kwargs={'chunk' : chunk})
    thread.start()
    threads.append(thread)
    
for thread in threads:
    thread.join()
    


CPU times: user 10min 19s, sys: 20.5 s, total: 10min 40s
Wall time: 10min 22s


In [19]:
# Preview First DataFrame
display(df_list[0])
print(f'{len(df_list)} chunks processed')

Unnamed: 0,from_bank,from_account,destinations
2780,2514,800450E00,"[{'to_bank': '014', 'to_account': '800330A10',..."
2781,14,80012D490,"[{'to_bank': '014', 'to_account': '800283000',..."
2782,1776,8017D1980,"[{'to_bank': '01217', 'to_account': '8001D67F0..."


11 chunks processed


In [22]:
# Nest Records in Pairs DataFrame
pairs_df = pd.concat(df_list, axis=0, ignore_index=True)

In [23]:
# Display pairs_df with Nested Data
pairs_df

Unnamed: 0,from_bank,from_account,destinations
0,02514,800450E00,"[{'to_bank': '014', 'to_account': '800330A10',..."
1,014,80012D490,"[{'to_bank': '014', 'to_account': '800283000',..."
2,01776,8017D1980,"[{'to_bank': '01217', 'to_account': '8001D67F0..."
3,00410,800248110,"[{'to_bank': '012', 'to_account': '800052EF0',..."
4,0027365,80DE18E90,"[{'to_bank': '0041407', 'to_account': '80F4549..."
...,...,...,...
2778,003514,801F08FB0,"[{'to_bank': '00220', 'to_account': '8028EFE80..."
2779,025536,8043410B0,"[{'to_bank': '0113935', 'to_account': '807B6EA..."
2780,017729,80720B3B0,"[{'to_bank': '003149', 'to_account': '8032FADC..."
2781,022,8078DC6E0,"[{'to_bank': '017222', 'to_account': '8041EDC7..."


In [24]:
# Create MongoDB and Collection for Data
mongo = MongoClient()
AML_db = mongo['AML_DB']
AML_coll = AML_db['AML_Data']
AML_agg = AML_db['AML_Aggregated']

In [25]:
AML_coll.insert_many(merged_df.to_dict('records'))
AML_agg.insert_many(pairs_df.to_dict('records'))

<pymongo.results.InsertManyResult at 0x1576af2c0>

In [26]:
# Check for DataBase and Collection creation
print("DataBases in LocalHost MongoDB Client:")
print(mongo.list_database_names())
print()
print('Collections Created in AML_DB:')
print(AML_db.list_collection_names())


DataBases in LocalHost MongoDB Client:
['AML_DB', 'admin', 'config', 'demoDB', 'local', 'shoes', 'uk_food']

Collections Created in AML_DB:
['AML_Data', 'AML_Aggregated']


In [27]:
print(f"There were {'{:,}'.format(AML_coll.count_documents({}))} records added into the {AML_coll.name} collection")
print(f"There were {'{:,}'.format(AML_agg.count_documents({}))} records added into the {AML_agg.name} collection")

There were 188,281 records added into the AML_Data collection
There were 2,783 records added into the AML_Aggregated collection


In [None]:
# # Reset Mongo Client
# mongo.drop_database('AML_DB')
# mongo.list_database_names()

In [28]:
# Create SQLite DB Engine
engine = sqlalchemy.create_engine(f"sqlite:///{os.path.join('Outputs', 'AML.sqlite')}", echo=False)

# Establish MetaData Creation Variable
meta = MetaData()
Base = declarative_base()

In [29]:
# Create Tables in SQLite DB
currencies_table = Table('Currencies', meta,
                         Column('Currency', String(255), primary_key=True))

pay_types = Table('Payment_Formats', meta,
                  Column('Payment_Format', String(255), primary_key=True))

patterns = Table('Pattern_Types', meta,
                 Column('Pattern_Type', String(255), primary_key=True))

groups = Table('Group_IDs', meta,
               Column('Group_ID', String(255), primary_key=True))

AML_Table = Table('AML_Data', meta,
                  Column('id', Integer, primary_key=True),
                  Column('timestamp', String(255)),
                  Column('from_bank', String(255)),
                  Column('from_account', String(255)),
                  Column('to_bank', String(255)),
                  Column('to_account', String(255)),
                  Column('amount_received', Float),
                  Column('rec_currency', String(255), ForeignKey('Currencies.Currency', ondelete='CASCADE'), nullable=False),
                  Column('amount_paid', Float),
                  Column('payment_currency', String(255), ForeignKey('Currencies.Currency', ondelete='CASCADE'), nullable=False),
                  Column('payment_format', String(255), ForeignKey('Payment_Formats.Payment_Format', ondelete='CASCADE'), nullable=False),
                  Column('is_laundering', Integer),
                  Column('pattern_type', String(255), ForeignKey('Pattern_Types.Pattern_Type', ondelete='CASCADE'), nullable=False),
                  Column('group_id', String(255),ForeignKey('Group_IDs.Group_ID', ondelete='CASCADE'), nullable=False))

meta.create_all(engine)

In [30]:
from sqlalchemy import inspect

In [31]:
inspector = inspect(engine)
schema = inspector.get_schema_names()

tables = inspector.get_table_names(schema=schema[0])
print(tables)

['AML_Data', 'Currencies', 'Group_IDs', 'Pattern_Types', 'Payment_Formats']


In [33]:
# Insert Data into DataBase from Pandas
conn = engine.connect()
table = 'AML_Data'
merged_df.to_sql(table, conn, if_exists='fail')

ValueError: Table 'AML_Data' already exists.

In [None]:
# # Reset SQLite DB
# for tbl in meta.sorted_tables:
#     print(tbl.name)
#     tbl.drop(engine)