MIKE MINNEBACH - 500903092 - FinTech Systems Development

In [658]:
import re #to write regex
import pandas as pd #for the dataframe
import datetime #for date formatting
import os #to list the files in the directory that contains the SWIFT messages
import itertools #to split strings where letters and digits alternate


Convert an MT103 SWIFT message
(https://www2.swift.com/knowledgecentre/products/Standards%20MT) into the
following data structure, writing the code in your chosen language. The notEmpty fields are mandatory (25 points).
Converting each mandatory field correctly carries 1 point and converting each
non-mandatory field carries 0.5 points (15 + 8.5 + 1.5). A further 1.5 points are given for coding
style.
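As an illustration of the conversion's first step, here is a minimal sketch that turns the text block (block 4) of an MT103 into a tag-to-value dictionary. It assumes the usual MT layout: block 4 opens with `{4:`, every field starts on its own line with a tag such as `:20:` or `:32A:`, and the block closes with `-}`. The helper `parse_block4` and the sample message are hypothetical, and repeatable fields (such as 71F) would overwrite each other in this simple dictionary.

```python
import re

# A field tag is ':' + two digits + an optional option letter + ':' at the start of a line
TAG_RE = re.compile(r'^:(\d{2}[A-Z]?):', re.MULTILINE)


def parse_block4(message: str) -> dict:
    """Return {tag: value} for the fields in block 4 of a SWIFT MT message (hypothetical helper)."""
    start = message.find('{4:')
    body = message[start + 3:] if start != -1 else message
    body = body.split('\n-}')[0]  # drop the end-of-text marker and anything after it
    matches = list(TAG_RE.finditer(body))
    fields = {}
    for current, following in zip(matches, matches[1:] + [None]):
        end = following.start() if following else len(body)
        fields[current.group(1)] = body[current.end():end].strip('\r\n')
    return fields


sample = """{1:F01BANKBEBBAXXX0000000000}{2:I103BANKDEFFXXXXN}{4:
:20:REF12345
:23B:CRED
:32A:230115EUR1000,50
:50K:/NL91ABNA0417164300
JOHN DOE
MAIN STREET 1
:59:/GB29NWBK60161331926819
JANE SMITH
:71A:SHA
-}"""

fields = parse_block4(sample)
print(fields['32A'])              # 230115EUR1000,50
print(fields['50K'].split('\n'))  # ['/NL91ABNA0417164300', 'JOHN DOE', 'MAIN STREET 1']
```

Multi-line fields such as 50K and 59 keep their line breaks, so account, name and address lines can be picked apart with a plain `split('\n')`.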

In [659]:
file = open("MT103.txt", 'r') #open the template file for reading (it is closed again further below)

In [660]:
#initiate lookup variables
transaction_re = re.compile(r'transaction_|instrument_|originator_|beneficiary_|ingoing_|outgoing_*') #regex to identify the items which are needed in the dataframe


In [661]:
#####################################################################################################
# Create dataframe based on the MT103.txt example                                                   #
#####################################################################################################

columns = [] #initiate columns list
values = [] #initiate values list

for line in file.read().split('\n'): #start for loop for each row in the .txt file
    if transaction_re.search(line): #keep the row if it contains one of the field prefixes in the regex
        item = line.split() #split row to extract the item (transaction id etc.) and corresponding value. This returns a list ['item', 'value']
        columns.append(item[0].replace(":","")) #append item to columns, remove ":" for tidiness
        try: #some rows have no value, so item[1] does not always exist
            values.append(item[1]) #append value to values
        except IndexError: #raised when item[1] does not exist, i.e. no value was entered
            values.append("No value entered") #if there is no value, append "No value entered" 

df = pd.DataFrame([columns, values], index=['item', 'value']).T.explode('value') #code from https://stackoverflow.com/questions/66615474/create-a-pandas-dataframe-from-two-lists-column-1-is-first-list-column-2-is-se
#explode() method is used to transform each element of a list to a separate record.

df.head() #print first five rows of the df to see what we are dealing with


                   item             value
0      transaction_date       xDateTimeTz
1        transaction_id          notEmpty
2   transaction_message          notEmpty
3  transaction_currency  No value entered
4    transaction_amount          notEmpty


In [662]:
#####################################################################################################
# Transforming the df                                                                               #
#####################################################################################################
 
#transpose
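df = df.transpose() #row 0 now holds the field names ('item'), row 1 the template values ('value')

#column names are the first row
df.columns = df.iloc[0] 

#keep only the header: drop both the 'item' row and the template 'value' row (e.g. 'notEmpty'),
#which could not be cast to the datatypes below. The empty frame is filled with one row per message later.
df = df.iloc[0:0]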

#transform date to date format
#df["transaction_date"][0] = datetime.datetime.strptime(df["transaction_date"][0],"%d/%m/%Y").date() #code from https://stackoverflow.com/questions/2803852/python-date-string-to-date-object

#drop the '//' column, which was extracted erroneously
df.drop("//",axis=1,inplace=True)

#df.head()

In [663]:
#####################################################################################################
# Set the right datatypes                                                                           #
##################################################################################################### 

df['transaction_date'] = pd.to_datetime(df['transaction_date'])

df = df.astype({'transaction_id':'string',
                'transaction_message':'string',
                'transaction_currency':'string',
                'transaction_amount':'float',
                'transaction_type':'string',
                'transaction_direction':'string',
                'transaction_status':'string',
                'instrument_type':'string',
                'originator_full_name':'string',
                'originator_first_name':'string',
                'originator_middle_names_patronymic':'string',
                'originator_last_name':'string',
                'originator_address':'string',
                'originator_country':'string',
                'originator_account_number':'string',
                'originator_branch_id':'string',
                'originator_bic':'string',
                'originator_fi_name':'string',
                'originator_fi_country':'string',
                'outgoing_intermediary_fi_bic':'string',
                'beneficiary_full_name':'string',
                'beneficiary_first_name':'string',
                'beneficiary_middle_names_patronymic':'string',
                'beneficiary_last_name':'string',
                'beneficiary_address':'string',
                'beneficiary_country':'string',
                'beneficiary_account_number':'string',
                'beneficiary_branch_id':'string',
                'beneficiary_bic':'string',
                'beneficiary_fi_name':'string',
                'beneficiary_fi_country':'string'})

df.dtypes

item
transaction_date                       datetime64[ns]
transaction_id                                 string
transaction_message                            string
transaction_currency                           string
transaction_amount                            float64
transaction_type                               string
transaction_direction                          string
transaction_status                             string
instrument_type                                string
originator_full_name                           string
originator_first_name                          string
originator_middle_names_patronymic             string
originator_last_name                           string
originator_address                             string
originator_country                             string
originator_account_number                      string
originator_branch_id                           string
originator_bic                                 string
originator_fi_name     

In [664]:
#close the file 
file.close()

Fill the dataframe with the SWIFT MT103 example messages.

In [665]:
#set directory with files to loop through
swift_messages = r'G:\Mijn Drive\School\Master in Digital Driven Business - AUAS Amsterdam\Q3\1. Fintech Systems dev\Assignment\examples'

In [666]:
#####################################################################################################
# First, every field variable is reset to an empty string so that a value (content of the SWIFT     #
# message) can be assigned to it later. This happens on every iteration of the loop, so no value    #
# from the previous message carries over.                                                           #
#####################################################################################################     

for message in os.listdir(swift_messages): #start looping over all the files in the directory. Set '' for each variable.
    transaction_date = ''
    transaction_id = ''
    transaction_message = ''
    transaction_currency=''
    transaction_amount=''
    transaction_type=''
    transaction_direction=''
    transaction_status=''
    instrument_type=''
    originator_full_name=''
    originator_first_name=''
    originator_middle_names_patronymic=''
    originator_last_name=''
    originator_address =''
    originator_country=''
    originator_account_number=''
    originator_branch_id=''
    originator_bic=''
    originator_fi_name=''
    originator_fi_country=''
    outgoing_intermediary_fi_bic=''
    beneficiary_full_name=''
    beneficiary_first_name=''
    beneficiary_middle_names_patronymic=''
    beneficiary_last_name = ''
    beneficiary_address = ''
    beneficiary_country = '' 
    beneficiary_account_number = '' 
    beneficiary_branch_id=''
    beneficiary_bic=''
    beneficiary_fi_name=''
    beneficiary_fi_country=''

    #############################################################################################    
    # General information retrieval, and splitting of the values                                #                         
    #############################################################################################

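    swift_message = open(os.path.join(swift_messages, message), 'r') #open the swift message
    raw_text = swift_message.read()
    item_string = repr(raw_text) #repr() turns each newline into the literal characters '\' + 'n', which the splits below rely on
    items = re.split(r".(:[A-Za-z0-9]+:)",item_string) #split on field tags such as ':20:' or ':32A:'; the group keeps the tag, the leading '.' consumes the 'n' of the preceding '\n' (leaving a trailing '\' that is removed later)
    items = items[1:] #drop everything before the first tag (blocks 1-3 and the '{4:' opener)
    res = [i+j for i,j in zip(items[::2], items[1::2])] #re-join each tag with its content, e.g. ':20:' + 'REF123\' #code from https://stackoverflow.com/questions/5850986/joining-pairs-of-elements-of-a-list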
    
    ids = []

    for id_cont in res:                
        mt103_identifier = id_cont[:4].replace(':','')
        ids.append(mt103_identifier)

    #############################################################################################    
    # Transaction content                                                                       #                         
    #############################################################################################
#transaction_id
    if '20' in ids:
        for i in res:
            if i[:4].replace(':','') == '20':
                transaction_id = (i[4:]).replace('\\','')
    elif '20' not in ids:
        transaction_id = 'Not found'

#transaction_type
    if '23B' in ids:
        for i in res:
            if i[:4].replace(':','') == '23B':
                transaction_type = (i[4:]).replace('\\','').replace(':','')
    else:
        transaction_type = 'Not found'

#transaction_date, #transaction currency, transaction amount
    if '32A' in ids:
        for i in res:
            if i[:4].replace(':','') == '32A':
                value_32a = i[4:].replace('\\','').replace(':','') #e.g. '230115EUR1000,50': date, currency, amount
                match_32a = re.match(r'(\d{6})([A-Z]{3})([\d,]+)', value_32a)
                transaction_date = datetime.datetime.strptime(match_32a.group(1),'%y%m%d') #SWIFT dates are YYMMDD
                transaction_currency = match_32a.group(2)
                transaction_amount = float(match_32a.group(3).replace(',', '.')) #the comma is the decimal mark, not a thousands separator
    elif '32A' not in ids:
        transaction_date, transaction_currency, transaction_amount = 'Not found', 'Not found', 'Not found'

#transaction message
    if '70' in ids:
        for i in res:
            if i[:4].replace(':','') == '70':
                transaction_message = i[4:].replace("\\n",' ').replace(':','').replace('}','').replace('-','')
    else:
        #no field 70: fall back to a free-text field in the '7' range (e.g. 72) that contains white spaces
        for i in res:
            if i[:4].replace(':','')[0] == '7' and ' ' in i:
                transaction_message = i[4:].replace("\\n",' ').replace(':','').replace('}','').replace('-','')
    if transaction_message == '':
        transaction_message = 'Not found' #nothing usable in field 70 or the other '7' fields

    #############################################################################################    
    # Originator content                                                                        #                         
    #############################################################################################

    if '50A' in ids or '50F' in ids or '50K' in ids:
        for i in res:
            if i[:4].replace(':','') == '50A' or i[:4].replace(':','') == '50F' or i[:4].replace(':','') == '50K':
                ori_list = (i[4:].split("\\n"))
                ori_list = list(map(lambda x: x.replace(':', '').replace('/','').replace('\\',''), ori_list)) #code from https://www.geeksforgeeks.org/how-to-replace-values-in-a-list-in-python/
                originator_account_number = ori_list[0]
                originator_country = ori_list[0][:2]
                ori_list.remove(originator_account_number)
            #find bic
                for bic in ori_list:
                    if(re.match('^[A-Z]{6}[A-Z0-9]{2}([A-Z0-9]{3})?$',bic)):
                        originator_bic = bic
                if originator_bic in ori_list: #only remove the BIC if one was found
                    ori_list.remove(originator_bic)
                originator_full_name = ori_list[0]
                originator_last_name = originator_full_name.split()[-1]
                originator_first_name = originator_full_name.split()[0]
            #middle name
                ori_list.remove(originator_full_name)
                split_fn = originator_full_name.split()
                if len(split_fn) >= 3:
                    originator_middle_names_patronymic = ' '.join(split_fn[1:-1]) #everything between the first and last name
                else:
                    originator_middle_names_patronymic = ''
            #address
                originator_address = ' '.join(ori_list)                
            #bic branch
                if (len(originator_bic)) >= 10:
                    originator_branch_id = (originator_bic[-3:])
                else:
                    originator_branch_id = 'Not found'
    else: 
        originator_account_number = 'Not found'
        originator_country = 'Not found'
        originator_full_name = 'Not found'
        originator_last_name = 'Not found'
        originator_first_name = 'Not found'
        originator_middle_names_patronymic = 'Not found'
        originator_address = 'Not found'
        originator_bic = 'Not found'
        originator_branch_id = 'Not found'

    #############################################################################################    
    # Beneficiary content                                                                       #                         
    #############################################################################################

    if '59' in ids:
        for i in res:
            if i[:4].replace(':','') == '59':
                ben_list = i[4:].split("\\n")
                ben_list = list(map(lambda x: x.replace(':', '').replace('/','').replace('\\',''), ben_list)) #clean the lines of field 59
                beneficiary_full_name = ben_list[1] #line 0 holds the account, line 1 the name
                beneficiary_last_name = beneficiary_full_name.split()[-1]
                beneficiary_first_name = beneficiary_full_name.split()[0]
#middle name
                split_bn = beneficiary_full_name.split()
                if len(split_bn) >= 3:
                    beneficiary_middle_names_patronymic = ' '.join(split_bn[1:-1]) #everything between the first and last name
                else:
                    beneficiary_middle_names_patronymic = ''
                ben_list.remove(beneficiary_full_name)
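#BIC: look for a BIC in field 59 first, otherwise fall back to field 57A
        bic_candidates = [x for x in ben_list if re.match('^[A-Z]{6}[A-Z0-9]{2}([A-Z0-9]{3})?$', x)]
        if bic_candidates:
            beneficiary_bic = bic_candidates[0]
            ben_list.remove(beneficiary_bic) #removed after the search, so no list element is skipped
        elif '57A' in ids:
            for i in res:
                if i[:4].replace(':','') == '57A':
                    lines_57a = [x.replace(':','').replace('/','').replace('\\','') for x in i[4:].split("\\n")]
                    for line in lines_57a:
                        if re.match('^[A-Z]{6}[A-Z0-9]{2}([A-Z0-9]{3})?$', line):
                            beneficiary_bic = line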
#Branch
        if (len(beneficiary_bic)) >= 10:
            beneficiary_branch_id = (beneficiary_bic[-3:])
        else:
            beneficiary_branch_id = 'Not found'
#IBAN
        iban_candidates = [x for x in ben_list if re.match('[a-zA-Z]{2}[0-9]{2}[a-zA-Z0-9]{4}[0-9]{7}[a-zA-Z0-9]{0,16}', x)]
        if iban_candidates:
            beneficiary_account_number = iban_candidates[0]
            ben_list.remove(beneficiary_account_number) #removed after the search, so no list element is skipped
        else:
            beneficiary_account_number = "Not found"
#address: whatever remains of field 59 after removing the name, BIC and account
        beneficiary_address = ' '.join(ben_list)
#beneficiary_country
        if beneficiary_account_number == "Not found":
            beneficiary_country = "Not found"
        else:
            beneficiary_country = beneficiary_account_number[:2]
    else:
        beneficiary_full_name = 'Not found'
        beneficiary_last_name = 'Not found'
        beneficiary_first_name = 'Not found'
        beneficiary_middle_names_patronymic = ''
        beneficiary_bic = 'Not found'
        beneficiary_branch_id = 'Not found'
        beneficiary_account_number = 'Not found'
        beneficiary_address = 'Not found'
        beneficiary_country = "Not found"

    #############################################################################################    
    # Other (originator_fi, Beneficiary_fi and outgoing_intermediary) content                   #                         
    #############################################################################################

#originator_fi
    if '53A' in ids:
        for i in res:
            if i[:4].replace(':','') == '53A':
                originator_fi_name = ((i).split("\\n"))[1]
                originator_fi_country = (i).split("\\n")[-1].replace('\\','')
    else:
        originator_fi_name, originator_fi_country = 'Not found', 'Not found'
#beneficiary_fi
    if '54A' in ids:
        for i in res:
            if i[:4].replace(':','') == '54A':
                beneficiary_fi_name = (i).split("\\n")[1]
                beneficiary_fi_country = (i).split("\\n")[-1].replace('\\','')
    else:
        beneficiary_fi_name, beneficiary_fi_country = 'Not found', 'Not found'
#outgoing_intermediary_fi_bic
    if '56A' in ids:
        for i in res:
            if i[:4].replace(':','') == '56A':
                fi_bic = i[4:].split("\\n")
                fi_bic = list(map(lambda x: x.replace(':', '').replace('/','').replace('\\',''), fi_bic))
                for bic in fi_bic:
                    if (re.match('^[A-Z]{6}[A-Z0-9]{2}([A-Z0-9]{3})?$',bic)) != None: #if match is found (thus not None)
                        outgoing_intermediary_fi_bic = bic
    else:
        outgoing_intermediary_fi_bic = 'Not found'

    #############################################################################################    
    # Append the variables to the DF                                                            #                         
    #############################################################################################

    df.loc[message] = [transaction_date,                
                transaction_id,                         
                transaction_message,                    
                transaction_currency,                   
                transaction_amount,                     
                transaction_type,                       
                transaction_direction,
                transaction_status,
                instrument_type,
                originator_full_name,                   
                originator_first_name,                  
                originator_middle_names_patronymic,     
                originator_last_name,                   
                originator_address,                     
                originator_country,                     
                originator_account_number,              
                originator_branch_id,                   
                originator_bic,                        
                originator_fi_name,
                originator_fi_country,
                outgoing_intermediary_fi_bic,
                beneficiary_full_name,                  
                beneficiary_first_name,                 
                beneficiary_middle_names_patronymic,    
                beneficiary_last_name,                  
                beneficiary_address,                    
                beneficiary_country,                    
                beneficiary_account_number,             
                beneficiary_branch_id,                  
                beneficiary_bic,                        
                beneficiary_fi_name,                    
                beneficiary_fi_country]
    swift_message.close()

#df.to_excel('MT103_analysis.xlsx')

#df.head()
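The loop accepts any beneficiary account that matches an IBAN-shaped regular expression and takes its first two characters as the country. A shape check lets typos through; ISO 13616 IBANs also carry mod-97 check digits. A minimal sketch of that check, assuming the hypothetical helper name `is_valid_iban` (it does not verify the country-specific IBAN length):

```python
def is_valid_iban(iban: str) -> bool:
    """Check the ISO 13616 mod-97 check digits of an IBAN (country-specific lengths are not checked)."""
    iban = iban.replace(' ', '').upper()
    if len(iban) < 15 or not (iban.isascii() and iban.isalnum()):
        return False
    # Move country code + check digits to the end, then map letters to numbers (A=10 ... Z=35)
    rearranged = iban[4:] + iban[:4]
    digits = ''.join(str(int(ch, 36)) for ch in rearranged)
    return int(digits) % 97 == 1


print(is_valid_iban('GB82 WEST 1234 5698 7654 32'))  # True
print(is_valid_iban('GB82WEST12345698765433'))       # False (last digit changed)
```

Applied to the dataframe it could look like `df['beneficiary_account_number'].apply(is_valid_iban)`; values such as 'Not found' simply return False.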

Use this structure to write either Python functions or SQL code that identify the following risk
patterns (75 points)

> a. Round Amount payments - 15 points

In [667]:
def roundamount(value):

    #####################################################################################
    # This function checks if the amount in 'transaction_amount' is a multiple of 100. #
    # If it is, return True, otherwise return False (also for values such as           #
    # 'Not found' that cannot be converted to a number).                                #
    #####################################################################################

    try:
        return float(value) % 100 == 0 #float() also accepts decimal amounts, int() would fail on '1000.50'
    except (TypeError, ValueError):
        return False

df['isRound'] = df['transaction_amount'].apply(roundamount)

print(df['isRound'])

example_1.txt     True
example_2.txt     True
example_3.txt    False
Name: isRound, dtype: bool
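Field 32A writes amounts with a comma as the decimal mark (e.g. `1000,50`), so the comma has to become a decimal point rather than be dropped: dropping it would turn 1000,50 into 100050. A minimal sketch that parses the amount with `decimal.Decimal` (no float rounding) and makes the "round" multiple a parameter; the name `is_round_amount` and the default multiple of 100 are assumptions:

```python
from decimal import Decimal, InvalidOperation


def is_round_amount(raw_amount, multiple=100):
    """True if an amount such as '5000,' or '1000,50' is a whole multiple of `multiple`."""
    try:
        amount = Decimal(str(raw_amount).replace(',', '.'))
    except InvalidOperation:
        return False  # e.g. 'Not found'
    return amount % multiple == 0


print(is_round_amount('5000,'))    # True
print(is_round_amount('1000,50'))  # False
print(is_round_amount('1234,00'))  # False
```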


> b. Payments from high-risk countries (https://www.fatfgafi.org/en/countries/black-and-grey-lists.html) to tax havens 
(https://fsi.taxjustice.no/fsi/2022/world/score/top) - 15 points

In [668]:
import pandas as pd

def is_high_risk(df, originator_col, beneficiary_col):

    #############################################################################################    
    # This function checks if the originator country is in a list called 'high_risk_countries'  #        
    # and if the beneficiary country is in a list called 'tax_havens'.                          #
    # If both countries are in the lists, it returns True. Otherwise, it returns False.         #
    ############################################################################################# 
    
    high_risk_countries = ['KP', 'IR', 'MM', 'AL', 'BB', 'BF', 'KY', 'CD', 'GI', 'HT', 'JM', 'JO', 'ML', 'MZ', 'NG', 'PA', 'PH', 'SN', 'ZA', 'SS', 'SY', 'TZ', 'TR', 'UG', 'AE', 'YE']
    tax_havens = ['VN', 'AO', 'BO', 'AE', 'DZ', 'PR', 'KN', 'AG', 'CW', 'VU', 'LK', 'TC', 'BS', 'AI', 'MV', 'BZ', 'GT', 'BD', 'KW', 'MS', 'BB', 'QA', 'MC', 'OM', 'BN', 'LR', 'WS', 'PA', 'GM', 'KY', 'LC', 'LI', 'SC', 'RW', 'JO', 'VE', 'VI', 'NM', 'MH', 'AW', 'VG', 'GG', 'FJ', 'GU', 'CM', 'BM', 'MU', 'CH', 'TH', 'CK', 'AS', 'SA', 'TT', 'XK', 'TZ', 'EG', 'BH', 'US', 'SG', 'PH', 'GI', 'KE', 'VC', 'CN', 'PK', 'PY', 'MA', 'GD', 'MY', 'DM', 'IM', 'HK', 'NG', 'DO', 'NL', 'LB', 'KR', 'JE', 'JP', 'MO', 'NZ', 'KZ', 'MK', 'CY', 'TR', 'ME', 'SV', 'SM', 'TW', 'ZA', 'CL', 'RU', 'TN', 'RO', 'IL', 'NR', 'UA', 'UY', 'PT', 'BW', 'DE', 'ES', 'AU', 'ID', 'CR', 'LV', 'HU', 'LU', 'AD', 'IT', 'IN', 'AT', 'MT', 'AL', 'RS', 'CO', 'PE', 'NO', 'SK', 'HR', 'MX', 'GR', 'BG', 'GH', 'BE', 'EC', 'FI', 'CA', 'LT', 'CZ', 'BR', 'AR', 'DK', 'FR', 'IE', 'GB', 'PL', 'SE', 'EE', 'IS', 'SI'] 

    
    # Flag rows where the originator country is high-risk AND the beneficiary country is a tax haven
    high_risk_mask = df[originator_col].isin(high_risk_countries) & df[beneficiary_col].isin(tax_havens)
    
    return high_risk_mask

df['highRisk'] = is_high_risk(df, 'originator_country', 'beneficiary_country') #Append the outcome to the df

print(df['highRisk'])


example_1.txt    False
example_2.txt    False
example_3.txt    False
Name: highRisk, dtype: bool


> c. Smurfing - 10 points

In [669]:
#sum per originator_account_number
#if over the past X days at least N cash deposits were made with an amount between the fixed threshold (10k) and [100 - CD(n) * 2%] of that amount (38:29)
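The rule in the comments above is not implemented in this cell. A minimal sketch of a structuring check, under stated assumptions: MT103 payments stand in for cash deposits, the lower bound is a fixed `margin` below the 10k threshold instead of the `[100 - CD(n) * 2%]` formula, and `window_days`, `min_count` and `margin` are hypothetical parameters:

```python
import pandas as pd


def flag_smurfing(df, threshold=10_000, margin=0.2, window_days=7, min_count=3):
    """Flag rows whose originator account made at least `min_count` payments in the band
    [threshold * (1 - margin), threshold) within `window_days` days (defaults are hypothetical)."""
    data = df[['originator_account_number', 'transaction_date', 'transaction_amount']].copy()
    data['transaction_amount'] = pd.to_numeric(data['transaction_amount'], errors='coerce')
    data['transaction_date'] = pd.to_datetime(data['transaction_date'], errors='coerce')

    # Keep only dated payments that fall just below the threshold
    in_band = data['transaction_amount'].between(threshold * (1 - margin), threshold, inclusive='left')
    data = data[in_band].dropna(subset=['transaction_date'])

    flagged = set()
    for account, group in data.groupby('originator_account_number'):
        dates = group['transaction_date'].sort_values().reset_index(drop=True)
        # For each payment, count the in-band payments in the window starting on its date
        for start in range(len(dates)):
            window_end = dates[start] + pd.Timedelta(days=window_days)
            if ((dates >= dates[start]) & (dates < window_end)).sum() >= min_count:
                flagged.add(account)
                break
    return df['originator_account_number'].isin(flagged)


sample = pd.DataFrame({
    'originator_account_number': ['A', 'A', 'A', 'B', 'B'],
    'transaction_date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-01', '2023-02-01'],
    'transaction_amount': [9500, 9000, 9900, 9500, 9500],
})
print(flag_smurfing(sample).tolist())  # [True, True, True, False, False]
```

In the notebook it could be stored as `df['smurfing'] = flag_smurfing(df)`; rows with 'Not found' dates or amounts are coerced to missing and never counted.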


> d. Nesting - 10 points

In [670]:
#for nesting, we count the transfers that each beneficiary account receives from originator accounts.

def count_transfers(df, beneficiary_col, originator_col):
    
    #############################################################################################    
    # This function counts the number of times money is transferred to                          #
    # each unique beneficiary account number. The function returns a new pandas dataframe with  #
    # the counts for each beneficiary account number.                                           #
    ############################################################################################# 
    
    # Group the dataframe by beneficiary account number and count the number of transfers
    transfer_counts = df.groupby(beneficiary_col)[originator_col].count()
    
    # Create a new dataframe with the counts and the beneficiary account numbers as the index
    counts_df = pd.DataFrame(transfer_counts)
    
    # Transfers without a usable beneficiary account are reported as 'N/a' instead of a count
    if 'Not found' in counts_df.index:
        counts_df.loc['Not found'] = 'N/a'
    
    return counts_df

counts_df = count_transfers(df, 'beneficiary_account_number', 'originator_account_number')

print(counts_df)


                           originator_account_number
beneficiary_account_number                          
CN123456789012345678                               1
GB57METR12345678901234                             1
Not found                                        N/a
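`count_transfers` counts transfers per beneficiary, so one originator paying the same account five times counts as five. Counting distinct originator accounts instead is closer to the many-to-one pattern the nesting check is after. A minimal sketch; the function name `flag_many_to_one` and the threshold of 3 distinct senders are hypothetical:

```python
import pandas as pd


def flag_many_to_one(df, min_originators=3):
    """Flag rows whose beneficiary account receives money from at least
    `min_originators` distinct originator accounts (threshold is hypothetical)."""
    known = df[(df['beneficiary_account_number'] != 'Not found')
               & (df['originator_account_number'] != 'Not found')]
    # Count distinct sending accounts per receiving account
    senders = known.groupby('beneficiary_account_number')['originator_account_number'].nunique()
    hubs = senders[senders >= min_originators].index
    return df['beneficiary_account_number'].isin(hubs)


sample = pd.DataFrame({
    'originator_account_number': ['A', 'B', 'C', 'A', 'A', 'Not found'],
    'beneficiary_account_number': ['X', 'X', 'X', 'Y', 'Y', 'Z'],
})
print(flag_many_to_one(sample).tolist())  # [True, True, True, False, False, False]
```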


> e. Non-adherence to FATF Recommendation 16 - 10 points 
<br>
> f. (https://www.fatfgafi.org/content/dam/recommandations/pdf/FATF%20Recommendations%202012.pdf.coredownload.inline.pdf) - 5 points

In [671]:
def FATF_compliance(df):
    
    #############################################################################################    
    # This function checks, per message, whether any column contains a missing or 'Not found'   #
    # value, excluding a list of columns that are allowed to miss a value. It prints the        #
    # affected columns and returns a boolean Series (True = no checked value is missing).       #
    ############################################################################################# 

    allowed_cols=['transaction_direction', 'transaction_status', 'instrument_type', 'originator_middle_names_patronymic', 'beneficiary_middle_names_patronymic']
    flag_cols = ['isRound', 'highRisk', 'ComplFATF'] # result columns of the risk checks, not MT103 fields
    checked_cols = [col for col in df.columns if col not in allowed_cols + flag_cols]

    # True where a value is missing or 'Not found'; comparing as strings works for every column dtype
    missing = df[checked_cols].isnull() | (df[checked_cols].astype(str) == 'Not found')

    # Print the names of the columns that contain at least one missing value
    missing_cols = missing.columns[missing.any().values].tolist()
    if missing_cols:
        print('Columns with missing values (excluding allowed columns):', missing_cols)

    # A message complies only if none of its checked columns is missing
    return ~missing.any(axis=1)

df["ComplFATF"] = (FATF_compliance(df))


Columns with missing values (excluding allowed columns): ['originator_branch_id', 'originator_fi_name', 'originator_fi_country', 'outgoing_intermediary_fi_bic', 'beneficiary_country', 'beneficiary_account_number', 'beneficiary_branch_id', 'beneficiary_fi_name', 'beneficiary_fi_country']
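Recommendation 16 itself names specific items that should accompany a qualifying wire transfer: the originator's name, account number (or a unique transaction reference), and address, national identity number, customer identification number, or date and place of birth; and the beneficiary's name and account number (or a unique transaction reference). A per-message sketch that checks only those items; the mapping to this dataframe's columns is an assumption, with `originator_address` as the only available identifying item and `transaction_id` (field 20) standing in for the unique transaction reference:

```python
import pandas as pd

MISSING = {'', 'Not found', 'No value entered'}


def r16_complete(row):
    """Check the core R16 originator/beneficiary items for one message (column mapping is an assumption)."""
    def present(col):
        value = row.get(col)
        return pd.notna(value) and str(value).strip() not in MISSING

    # Account number, or the transaction reference as a stand-in for a unique transaction reference
    originator_ok = (present('originator_full_name')
                     and (present('originator_account_number') or present('transaction_id'))
                     and present('originator_address'))
    beneficiary_ok = (present('beneficiary_full_name')
                      and (present('beneficiary_account_number') or present('transaction_id')))
    return bool(originator_ok and beneficiary_ok)


sample = pd.DataFrame([
    {'transaction_id': 'REF1', 'originator_full_name': 'JOHN DOE',
     'originator_account_number': 'NL91ABNA0417164300', 'originator_address': 'MAIN STREET 1',
     'beneficiary_full_name': 'JANE SMITH', 'beneficiary_account_number': 'Not found'},
    {'transaction_id': 'REF2', 'originator_full_name': 'JOHN DOE',
     'originator_account_number': 'NL91ABNA0417164300', 'originator_address': 'Not found',
     'beneficiary_full_name': 'JANE SMITH', 'beneficiary_account_number': 'GB29NWBK60161331926819'},
])
print(sample.apply(r16_complete, axis=1).tolist())  # [True, False]
```

In the notebook this could be stored per message with `df['R16_complete'] = df.apply(r16_complete, axis=1)`.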




> g. Shell company characteristics using address and name data - 5 points
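No check is implemented for this pattern yet. One possible sketch, using only name and address data: flag company-like names (a legal-form suffix) whose address either looks like a mailbox or is shared by several different names, a registered-agent pattern. The keyword lists, the function name `shell_company_flags` and the threshold of 2 names per address are all hypothetical:

```python
import pandas as pd

# Hypothetical indicator patterns (non-capturing groups, matched on upper-cased text)
LEGAL_FORMS = r'\b(?:LTD|LIMITED|LLC|INC|CORP|BV|GMBH|SA|HOLDINGS?)\b'
MAILBOX_HINTS = r'\b(?:P\.?\s?O\.?\s?BOX|C/O|SUITE)\b'


def shell_company_flags(df, name_col='beneficiary_full_name',
                        address_col='beneficiary_address', min_names_per_address=2):
    """Flag company-like names whose address looks like a mailbox or is shared by several names."""
    names = df[name_col].fillna('').astype(str).str.upper()
    addresses = df[address_col].fillna('').astype(str).str.upper()

    is_company = names.str.contains(LEGAL_FORMS, regex=True)
    mailbox_address = addresses.str.contains(MAILBOX_HINTS, regex=True)

    # Number of different names at the same address
    names_per_address = names.groupby(addresses).transform('nunique')
    known_address = ~addresses.isin(['', 'NOT FOUND'])
    shared_address = known_address & (names_per_address >= min_names_per_address)

    return is_company & (mailbox_address | shared_address)


sample = pd.DataFrame({
    'beneficiary_full_name': ['ALPHA TRADING LTD', 'BETA HOLDINGS LLC', 'JANE SMITH', 'GAMMA BV'],
    'beneficiary_address': ['1 HARBOUR ROAD TORTOLA', '1 HARBOUR ROAD TORTOLA',
                            '1 HARBOUR ROAD TORTOLA', 'PO BOX 42 ROTTERDAM'],
})
print(shell_company_flags(sample).tolist())  # [True, True, False, True]
```

The same function can be pointed at the originator side with `name_col='originator_full_name', address_col='originator_address'`.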

> h. Trade-based money laundering - 5 points
<br>
(https://www.fatf-gafi.org/content/fatfgafi/en/publications/Methodsandtrends/Trade-based-money-launderingindicators.html
<br>
https://stats.wto.org/dashboard/tradeconnectivity_en.html)
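An MT103 carries no invoice prices or quantities, so only a rough proxy is possible from this dataframe. A sketch under stated assumptions: a payment counts as trade-related when field 70 mentions invoice-like keywords, and it is flagged when the (originator country, beneficiary country) pair is missing from `expected_corridors`, a hypothetical set of known trade routes that could, for example, be derived from the WTO trade connectivity data linked above. The keyword list and the function name are hypothetical:

```python
import pandas as pd

# Hypothetical keywords that suggest a payment settles a trade invoice
TRADE_TERMS = r'\b(?:INV|INVOICE|GOODS|SHIPMENT|CONTRACT)\b'


def flag_unusual_trade_corridor(df, expected_corridors):
    """Flag trade-related payments whose (originator country, beneficiary country)
    pair is not in `expected_corridors`."""
    message = df['transaction_message'].fillna('').astype(str).str.upper()
    is_trade = message.str.contains(TRADE_TERMS, regex=True)
    pairs = zip(df['originator_country'], df['beneficiary_country'])
    unexpected = pd.Series([pair not in expected_corridors for pair in pairs], index=df.index)
    return is_trade & unexpected


sample = pd.DataFrame({
    'transaction_message': ['INVOICE 123 STEEL', 'BIRTHDAY GIFT', 'INV 77 MACHINERY'],
    'originator_country': ['NL', 'NL', 'NL'],
    'beneficiary_country': ['DE', 'VG', 'VG'],
})
expected = {('NL', 'DE')}  # illustrative only
print(flag_unusual_trade_corridor(sample, expected).tolist())  # [False, False, True]
```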

In [672]:
df.to_excel('MT103_analysis.xlsx') #export to Excel