In [14]:
import os 
import numpy as np
import pandas as pd
import seaborn as sns 
from math import isnan
import multiprocessing

import matplotlib.pyplot as plt

from graphframes import *

########## START - PYSPARK ##########
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import col, expr, count, to_timestamp, monotonically_increasing_id, \
    desc, sum as _sum, min, max, rand, when, \
    datediff, dayofmonth, weekofyear, month, year, hour, dayofweek, \
    unix_timestamp, array, lit, round

from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import StandardScaler, VectorAssembler, StringIndexer 
########## END - PYSPARK ##########

In [2]:
# Count available cores
cores = multiprocessing.cpu_count()
# One executor per core; each executor therefore gets cores // instances == 1 core.
instances = cores

spark = SparkSession.builder \
          .appName("AMD-SM2L Joint Project") \
          .config("spark.driver.memory", "3g") \
          .config("spark.executor.memory", "4g") \
          .config("spark.executor.instances", instances) \
          .config("spark.executor.cores", cores // instances) \
          .config("spark.sql.shuffle.partitions", cores) \
          .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
          .getOrCreate()
# NOTE: the legacy key "spark.sql.execution.arrow.enabled" is deprecated in favour of
# "spark.sql.execution.arrow.pyspark.enabled" (set above), so it is not repeated here.

spark.sparkContext.setLogLevel("OFF")
# Raw transactions; `header` is a CSV option and has no meaning for parquet, so it is not passed.
dataframe = spark.read.parquet("src/datasets/my_HI-Small_Trans.parquet")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/18 14:57:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

# Feature computing

In [None]:
class FeatureManager:
    """Derive per-transaction features from the raw transactions and split them.

    State kept on the instance:
    - ``origDF``: the raw transactions DataFrame passed to the constructor;
    - ``df_features``: whole-dataframe features built by ``compute_features_of_whole_df``;
    - ``ach_mapping``: payment-format label -> index assigned by the StringIndexer;
    - ``train_df`` / ``test_df``: the split built by ``split_original_datasets``,
      later enriched by ``compute_timestamp_features``.
    """

    def __init__(self, dataframe):
        # Raw transactions DataFrame (not a file path); every other method derives from it.
        self.origDF = dataframe

    @staticmethod
    def _label_index(labels):
        """Return ``{label: index}`` for the ordered labels of a fitted StringIndexer."""
        return {label: index for index, label in enumerate(labels)}

    def compute_features_of_whole_df(self, output_path=None):
        """Index the categorical columns and add pairwise-equality features.

        Stores the result in ``self.df_features`` (columns listed in ``column_order``)
        and the payment-format label -> index map in ``self.ach_mapping``.

        :param output_path: optional parquet path; when given, ``self.df_features``
            is also written there. Defaults to ``None`` (nothing is written).
        """
        renamed = self.origDF\
            .withColumnRenamed('receiving_currency', 'rec_cur')\
            .withColumnRenamed('payment_currency', 'pay_cur')\
            .withColumnRenamed('payment_format', 'pay_for')

        # A single currency index is fitted on the union of received and paid
        # currencies, so a currency gets the same index in both columns.
        currencies = renamed.select('rec_cur').distinct()\
            .union(renamed.select('pay_cur').distinct())
        rec_currency_model = StringIndexer(inputCol='rec_cur', outputCol='receiving_currency')\
            .fit(currencies)
        # copy() before re-targeting the columns: the setters mutate the model they are called on.
        pay_currency_model = rec_currency_model.copy()\
            .setInputCol('pay_cur')\
            .setOutputCol('payment_currency')
        indexed = pay_currency_model.transform(rec_currency_model.transform(renamed))

        payment_format_model = StringIndexer(inputCol='pay_for', outputCol='payment_format')\
            .fit(indexed)
        indexed = payment_format_model.transform(indexed)
        self.ach_mapping = self._label_index(payment_format_model.labels)

        column_order = ['id', 'timestamp',
                        'from_account', 'to_account', 'same_account',
                        'from_bank', 'to_bank', 'same_bank',
                        'amount_received', 'amount_paid', 'amount_difference', 'same_amounts',
                        'receiving_currency', 'payment_currency', 'same_currency',
                        'payment_format', 'is_laundering']

        self.df_features = indexed\
            .withColumn('same_bank', (col('from_bank') == col('to_bank')).cast('integer'))\
            .withColumn('same_account', (col('from_account') == col('to_account')).cast('integer'))\
            .withColumn('same_currency', (col('receiving_currency') == col('payment_currency')).cast('integer'))\
            .withColumn('same_amounts', (col('amount_received') == col('amount_paid')).cast('integer'))\
            .withColumn('amount_difference', (col('amount_paid') - col('amount_received')))\
            .select(column_order)

        if output_path is not None:
            self.df_features.write.parquet(output_path)

    def compute_timestamp_features(self, train=True):
        """Add calendar columns and per-window transaction counters to one split.

        Adds ``week``, ``day_of_month``, ``day_of_week`` and ``hour`` derived from
        ``timestamp``. It then adds, for the sender (suffix ``fa``) and for the
        sender/receiver pair (suffix ``fata``), the number of transactions sharing the
        same hour, day of month and week (``transactions_same_{hour,day,week}_{fa,fata}``).
        Counts are computed within the selected split only.

        :param train: True to update ``self.train_df``, False to update ``self.test_df``.
        """
        source = self.train_df if train else self.test_df

        base = source\
            .withColumn('week', weekofyear("timestamp"))\
            .withColumn('day_of_month', dayofmonth("timestamp"))\
            .withColumn('day_of_week', dayofweek("timestamp"))\
            .withColumn('hour', hour("timestamp"))

        # Same join order as the hand-written version: all sender windows, then all pair windows.
        enriched = base
        for keys, suffix in ((['from_account'], 'fa'), (['from_account', 'to_account'], 'fata')):
            for period, label in (('hour', 'hour'), ('day_of_month', 'day'), ('week', 'week')):
                join_keys = keys + [period]
                counts = base.groupBy(*join_keys)\
                    .agg(count('*').alias('transactions_same_{}_{}'.format(label, suffix)))
                enriched = enriched.join(counts, join_keys, 'left')

        if train:
            self.train_df = enriched
        else:
            self.test_df = enriched

    def dataset_sampling(self):
        """Placeholder for a day-based sampling strategy; currently a no-op returning None.

        TODO: it may make sense to sample only the days whose transactions are not too
        class-imbalanced, e.g. from day 11 onwards plus a couple of days of the
        (the original note is truncated here).
        """
        return None

    def drop_columns(self):
        """Drop raw categorical strings and account/bank identifiers from ``self.df_features``.

        Spark's ``drop`` ignores names that are not present, so columns already removed
        (e.g. the ``rec_cur``/``pay_cur``/``pay_for`` helpers) are harmless.
        NOTE(review): targets ``df_features``, the only whole-dataframe attribute this
        class assigns; confirm this is the intended frame.
        """
        cols = ('rec_cur', 'pay_cur', 'pay_for', 'from_bank', 'to_bank', 'from_account', 'to_account')
        self.df_features = self.df_features.drop(*cols)

    def split_original_datasets(self, seed=None):
        """Split into a class-balanced train set and a test set holding everything else.

        All laundering transactions plus an equally sized random sample of
        non-laundering ones form the balanced sample; about 80% of it becomes
        ``self.train_df``. ``self.test_df`` is the remaining part of the sample plus
        every transaction that was not sampled, so the test set is not rebalanced.
        ``from_bank``/``to_bank`` are dropped from both (``self.origDF`` is left untouched).

        :param seed: optional seed for the random sample and the split; ``None``
            (default) keeps them non-reproducible.
        """
        base = self.origDF.drop('from_bank', 'to_bank')
        launderings = base.filter('is_laundering==1')

        print('\nSplitting dataframe into train and test set...')
        sample_non_laundering = base.filter('is_laundering==0')\
            .orderBy(rand(seed))\
            .limit(launderings.count())
        # Cached because it is consumed twice (randomSplit and subtract); this avoids
        # re-drawing the random sample between the two uses.
        sample_dataset = sample_non_laundering.union(launderings).cache()

        self.train_df, test_sample = sample_dataset.randomSplit([0.8, 0.2], seed)

        not_sampled = base.subtract(sample_dataset)
        self.test_df = test_sample.union(not_sampled)
        print('Dataframe splitted')

    def save_dataframes(self, path):
        """Write the train and test splits to parquet.

        :param path: format string with one ``{}`` slot, filled with
            ``'train_set'`` and ``'test_set'``.
        """
        self.train_df.write.parquet(path.format('train_set'))
        self.test_df.write.parquet(path.format('test_set'))

## Compute features of the whole dataframe

In [None]:
# Derive the whole-dataframe features (indexed categoricals, equality flags)
# from the raw transactions loaded in the setup cell.
manager = FeatureManager(dataframe=dataframe)
manager.compute_features_of_whole_df()

                                                                                

In [None]:
# Balanced train/test split; the time-window counters are then computed
# separately on each split before both are written to parquet.
manager.split_original_datasets()
manager.compute_timestamp_features(train=True)
manager.compute_timestamp_features(train=False)
manager.save_dataframes(path='src/datasets/HI_Small_{}.parquet')


Splitting dataframe into train and test set...


                                                                                

Dataframe splitted


                                                                                

# Compute features of the graph
The next step is to understand the structure of the different patterns in order to identify further features.


<img src="src/images/patterns.png" style="width: 600px">


To do that, I decided that the best solution was to process the dataset with GraphFrames, an Apache Spark package that provides DataFrame-based graphs.

Looking at the image above, it would be beneficial to compute certain features for each node in the graph to gain valuable insights into the transactions:
1. **Compute the number of in-out edges (fan-in, fan-out)** <br>
    A transaction involves an exchange between two accounts, and it would be valuable to calculate the connection degrees for each account:
    * In-out degrees for the sender account
    * In-out degrees for the receiver account
    <br><br>
2. **Identify intermediary transactions (scatter-gather)** <br>
    By analyzing the flow of transactions, we can identify intermediary transactions: transactions that facilitate the movement of funds between multiple accounts.
    <br><br>
3. **Detect forwarding transactions** <br>
    An account receives a sum of money and then forwards it to another account
    <br><br>
4. **Check for intermediate transactions between two transactions** <br>
    We can check if certain transactions act as intermediaries between two other transactions
    <br><br>

____

In [133]:
class MyGraph:
    """GraphFrames view of a transactions DataFrame plus graph-derived features.

    Accounts are vertices and transactions are edges (``from_account`` -> ``to_account``).
    Per-transaction features accumulate in ``self.ids`` (keyed by ``id``); the
    pattern-specific tables (``forwards``, ``same``, ``similar``, ``fans``, ``cycles``)
    are stored as attributes by the corresponding methods.
    NOTE: ``min``, ``max`` and ``round`` below are the pyspark.sql.functions versions
    imported at the top of the notebook (they shadow the builtins).
    """

    def __init__(self, df):
        # Build the graph from the vertices and edges of the given split (train or test).
        self.df = df
        self.ids = self.df.select('id', 'from_account', 'to_account')
        self.vertices, self.edges, self.g = self.create_graph()
        self.compute_inOut_degrees()

    def create_graph(self, init=True):
        """Build the GraphFrame for ``self.df``.

        :param init: when True every transaction is an edge; when False only
            transactions between different accounts, in a single currency and with
            payment format "ACH" are kept (used to shrink the motif searches).
        :return: ``(vertices, edges, graph)``
        """
        senders = self.df.select(col('from_account').alias('id'))
        receivers = self.df.select(col('to_account').alias('id'))
        vertices = senders.union(receivers).distinct()

        edges = self.df\
            .withColumnRenamed('from_account', 'src')\
            .withColumnRenamed('to_account', 'dst')
        if not init:
            # Filter on the renamed columns: the account columns no longer exist after the rename.
            edges = edges\
                .filter('src != dst and receiving_currency == payment_currency and payment_format == "ACH"')\
                .select('id', 'timestamp', 'src', 'dst', 'payment_currency', 'payment_format')

        g = GraphFrame(vertices, edges)
        return vertices, edges, g

    def compute_inOut_degrees(self):
        """Attach the in/out degree of both endpoint accounts to every row of ``self.ids``.

        Adds ``from_account_inDegree``, ``from_account_outDegree``,
        ``to_account_inDegree`` and ``to_account_outDegree``; an account missing from
        one of the degree tables gets 0 for that degree.
        """
        degrees = self.g.inDegrees.join(self.g.outDegrees, 'id', 'fullouter').fillna(0)
        for account in ('from_account', 'to_account'):
            self.ids = self.ids\
                .join(degrees.withColumnRenamed('id', account), account, 'left')\
                .withColumnRenamed('inDegree', account + '_inDegree')\
                .withColumnRenamed('outDegree', account + '_outDegree')

    def get_forwards(self):
        """Find transactions whose receiver forwards the same amount to another account.

        Stores in ``self.forwards`` one row per involved transaction id with:
        - ``before_forward``: 1 if the transaction is the one that gets forwarded;
        - ``forward``: 1 if the transaction is the forwarding one.
        The flag that does not apply is null.
        """
        motif = "(a)-[e]->(b); (b)-[e2]->(c)"
        forwards = self.g.find(motif).filter(
            "e.amount_received == e2.amount_paid and e.timestamp <= e2.timestamp and a!=b and b!=c")

        # distinct: each id is flagged once (an occurrence count would be an alternative feature)
        before_forward = forwards.select(col('e.id').alias('id'))\
            .distinct()\
            .withColumn('before_forward', lit(1))
        forward = forwards.select(col('e2.id').alias('id'))\
            .distinct()\
            .withColumn('forward', lit(1))

        # fullouter: a left join would drop forwarding transactions that are not
        # themselves followed by another forward.
        self.forwards = before_forward.join(forward, 'id', 'fullouter')

    def same_or_similar(self):
        """Flag transactions that have a twin between the same two accounts.

        - ``self.same`` (``exists_same``): another transaction with the same timestamp,
          currencies and payment format but a different ``amount_paid``;
        - ``self.similar`` (``exists_similar``): another transaction with the same
          currencies and payment format but a different timestamp and ``amount_paid``.
        """
        pairs = self.g.find("(a)-[t1]->(b); (a)-[t2]->(b)")
        shared = ['t1.payment_currency == t2.payment_currency',
                  't1.receiving_currency == t2.receiving_currency',
                  't1.payment_format == t2.payment_format',
                  't1.amount_paid != t2.amount_paid']

        same_where = ' and '.join(['t1.timestamp == t2.timestamp'] + shared + ['t1.id != t2.id'])
        similar_where = ' and '.join(['t1.timestamp != t2.timestamp'] + shared)

        self.same = pairs.filter(same_where).select('t1.id').withColumn('exists_same', lit(1)).distinct()
        self.similar = pairs.filter(similar_where).select('t1.id').withColumn('exists_similar', lit(1)).distinct()

########## START - FAN PATTERN ##########
    def compute_fan_in(self, max_days=4):
        """Fan-in degree of each transaction (see undestand_pattern.ipynb).

        For a transaction t1 into account b, counts the transactions t2 into the same
        b (t1 itself included) with the same receiving currency, payment currency and
        payment format, at most ``max_days`` days apart from t1.
        NOTE: no bank condition is applied here.

        :return: DataFrame ``(id, fan_in_degree)``
        """
        motif = "(a)-[t1]->(b); (c)-[t2]->(b)"
        fan_in_query = ' and '.join([
            'abs(datediff(t1.timestamp, t2.timestamp)) <= {}'.format(max_days),
            't1.payment_currency == t2.payment_currency',
            't1.receiving_currency == t2.receiving_currency',
            't1.payment_format == t2.payment_format',
        ])

        fan_in = self.g.find(motif).filter(fan_in_query).select('a', 'b', 't1')
        fan_in = fan_in.groupBy('a', 'b', 't1').count().select('t1.id', col('count').alias('fan_in_degree'))
        return fan_in

    def compute_fan_out(self, max_days=4):
        """Fan-out degree of each transaction (see undestand_pattern.ipynb).

        Works on the reduced graph of ``create_graph(False)``: no self transactions,
        single-currency ACH transactions only. For a transaction t1 from a to b, counts
        the distinct receivers c (different from a and b) that a paid through another
        transaction at most ``max_days`` days apart, plus one for t1 itself.
        Transactions without any such partner are absent from the result.

        :return: DataFrame ``(id, fan_out_degree)``
        """
        _, _, g = self.create_graph(False)

        motif = "(a)-[t1]->(b); (a)-[t2]->(c)"
        fan_out_query = ' and '.join([
            'abs(datediff(t1.timestamp, t2.timestamp)) <= {}'.format(max_days),
            'a != b', 'a != c', 'b != c',
            't1.id != t2.id',
        ])

        fan_out = g.find(motif).filter(fan_out_query).select('a', 'b', 'c', 't1.id')
        fan_out = fan_out.groupBy('a', 'b', 'c', 'id').count()
        fan_out = fan_out.groupBy('id').agg(count('*').alias('fan_out_degree'))\
            .withColumn('fan_out_degree', col('fan_out_degree') + 1)
        return fan_out

    def compute_fan(self):
        """Store in ``self.fans`` the full outer join of the fan-in and fan-out degrees by id."""
        fan_in = self.compute_fan_in()
        fan_out = self.compute_fan_out()
        self.fans = fan_in.join(fan_out, 'id', 'fullouter')
########## END - FAN PATTERN ##########

########## START - CYCLE PATTERN ##########
    def generate_combinations(self, accounts):
        """Return a filter requiring every distinct account name to differ from the others.

        Each unordered pair appears once, in first-seen order, e.g.
        ``['a', 'b', 'b', 'a']`` -> ``'a!=b'``. Fewer than two distinct names yield ``''``.
        """
        unique = list(dict.fromkeys(accounts))
        return ' and '.join(
            '{}!={}'.format(unique[i], unique[j])
            for i in range(len(unique))
            for j in range(i + 1, len(unique))
        )

    def build_rules_of_cycles(self, max_iter):
        """Build one motif query per cycle length, from 2 up to ``max_iter + 1`` transactions.

        :return: list of ``(motif, filter, transaction_names, degree)`` tuples. For
            ``max_iter=1`` this is the 2-cycle ``(a)-[t1]->(b); (b)-[t2]->(a)`` whose
            filter requires non-decreasing timestamps and distinct accounts.
        """
        alphabet = [chr(code) for code in range(97, 123)]  # 'a'..'z'
        rules = []
        for i in range(1, max_iter + 1):
            transactions = ['t{}'.format(j + 1) for j in range(i + 1)]
            motif_parts = []
            accounts = []
            for j, transaction in enumerate(transactions):
                starting_account = alphabet[j]
                # the last hop closes the cycle back to the first account
                receiving_account = alphabet[j + 1] if j < i else alphabet[0]
                motif_parts.append('({})-[{}]->({})'.format(starting_account, transaction, receiving_account))
                accounts += [starting_account, receiving_account]

            ordered = ' and '.join('{}.timestamp <= {}.timestamp'.format(prev, nxt)
                                   for prev, nxt in zip(transactions, transactions[1:]))
            query = ' and '.join([ordered, self.generate_combinations(accounts)])
            rules.append(('; '.join(motif_parts), query, list(transactions), i + 1))

        return rules

    def find_cycles(self, max_iter):
        """Find cycles of 2 up to ``max_iter`` ACH transactions (at least 2) and store ``self.cycles``.

        ``self.cycles`` has one row per transaction id involved in some cycle with:
        - ``min_cycle`` / ``max_cycle``: smallest / largest degree of the cycles the
          transaction starts (null if it never starts one);
        - ``involved``: always 1 here (absent ids get a value only if filled downstream).
        """
        _, _, g = self.create_graph(False)

        max_iter = max_iter - 1 if max_iter - 2 >= 1 else 1

        starting_cycles = None
        intermediaries_cycles = None
        for motif, query, transactions, degree in self.build_rules_of_cycles(max_iter):
            names = ['start'] + ['t{}_id'.format(k + 1) for k in range(1, len(transactions))]
            degree_cycle = g.find(motif).filter(query).select(
                [col('{}.id'.format(t)).alias(name) for t, name in zip(transactions, names)])

            degree_cycle_start = degree_cycle.select(col('start').alias('id')).distinct()
            degree_cycle_involved = degree_cycle\
                .select(array([col(name) for name in names[1:]]).alias('id'))\
                .selectExpr('explode(id) as id')\
                .distinct()

            startings = self.in_cycle(degree_cycle_start, degree)
            print("adding cycles of degree {}...".format(degree))
            if starting_cycles is None:
                starting_cycles = startings
                intermediaries_cycles = degree_cycle_involved
            else:
                starting_cycles = starting_cycles.union(startings)
                intermediaries_cycles = intermediaries_cycles.union(degree_cycle_involved)

        starting_cycles = starting_cycles.groupBy('id').agg(
            min("min_cycle").alias("min_cycle"),
            max("max_cycle").alias("max_cycle")
        )
        intermediaries_cycles = intermediaries_cycles.distinct()

        self.cycles = starting_cycles.join(intermediaries_cycles, 'id', 'fullouter').withColumn('involved', lit(1))

    def in_cycle(self, cycle_subset, degree):
        """Tag every row of ``cycle_subset`` with ``min_cycle`` = ``max_cycle`` = ``degree``."""
        return cycle_subset\
            .withColumn('min_cycle', lit(degree))\
            .withColumn('max_cycle', lit(degree))
########## END - CYCLE PATTERN ##########

    def page_rank(self):
        """Attach PageRank scores of both endpoints and the edge weight to ``self.ids``.

        Adds ``fa_pagerank`` / ``ta_pagerank`` (sender / receiver PageRank) and
        ``weight_t`` (the ``weight`` column of GraphFrames' pageRank edges), all
        rounded to 2 decimals.
        """
        res = self.g.pageRank(resetProbability=0.15, tol=0.1)
        edges = res.edges.select('src', 'dst', 'weight', 'id')\
            .withColumnRenamed('src', 'from_account').withColumnRenamed('dst', 'to_account')\
            .withColumn('weight_t', round(col('weight'), 2))\
            .drop('weight')

        vertices = res.vertices.withColumn('page_rank', round(col('pagerank'), 2))\
            .select('id', 'page_rank').withColumnRenamed('id', 'from_account')
        edges = vertices.join(edges, 'from_account', 'right').withColumnRenamed('page_rank', 'fa_pagerank')
        vertices = vertices.withColumnRenamed('from_account', 'to_account')
        edges = vertices.join(edges, 'to_account', 'right').withColumnRenamed('page_rank', 'ta_pagerank')\
            .select('id', 'fa_pagerank', 'ta_pagerank', 'weight_t')

        self.ids = self.ids.join(edges, 'id', 'left')

    def join_ids(self):
        """Left-join the forward, similar/same and fan tables onto ``self.ids`` (dropping the account columns)."""
        self.ids = self.ids.drop('from_account', 'to_account')
        self.ids = self.ids.join(self.forwards, 'id', 'left')
        self.ids = self.ids.join(self.similar, 'id', 'left').join(self.same, 'id', 'left')
        self.ids = self.ids.join(self.fans, 'id', 'left')

    def join_dataframe(self):
        """Left-join ``self.ids`` onto ``self.df``.

        Missing ``exists_similar`` becomes 0; missing fan degrees become 1 (the
        transaction alone).
        """
        self.df = self.df.join(self.ids, 'id', 'left')\
            .na.fill(value=0, subset=['exists_similar'])\
            .na.fill(value=1, subset=['fan_out_degree', 'fan_in_degree'])


In [134]:
# Train/test splits produced by FeatureManager.save_dataframes
# (`header` is a CSV-only option, meaningless for parquet, so it is not passed).
train = spark.read.parquet("src/datasets/HI_Small_train_set.parquet")
test = spark.read.parquet("src/datasets/HI_Small_test_set.parquet")
# Whole-dataframe features, joined back onto each split by id further below.
# NOTE: no cell of this notebook currently writes HI-Small_features.parquet,
# so it must already exist on disk (e.g. written from FeatureManager.df_features).
dataframe = spark.read.parquet("src/datasets/HI-Small_features.parquet").select(
    'id', 'same_account', 'same_bank', 'amount_difference', 'same_amounts',
    'receiving_currency', 'payment_currency', 'same_currency', 'payment_format', 'is_laundering',
)

## Train set

In [140]:
# Graph of the training split (degrees are attached on construction),
# then the forward, same/similar and fan pattern features.
train_graph = MyGraph(df=train)
train_graph.get_forwards()
train_graph.same_or_similar()
train_graph.compute_fan()

In [141]:
# Collect every graph feature per transaction id, discard the forward/same flags,
# add PageRank, then merge back onto the training transactions.
train_graph.join_ids()

train_graph.ids = train_graph.ids.drop('before_forward', 'forward', 'exists_same')
train_graph.page_rank()

# Left join + null filling (exists_similar -> 0, fan degrees -> 1): exactly what
# MyGraph.join_dataframe implements, so reuse it instead of repeating it inline.
train_graph.join_dataframe()

In [142]:
# seleziono solo alcuni campi, dato che le features calcolate dal mygraph sono sporche (in quanto il dataset viene campionato troppo dividendolo in train e set)
# Keep only some of the columns: the features computed by MyGraph are noisy,
# because splitting into train and test subsamples the dataset heavily.
train = (
    train_graph.df
    .select([
        'id',
        'week', 'day_of_month', 'hour',
        'amount_received', 'amount_paid',
        'day_of_week',
        'transactions_same_hour_fa', 'transactions_same_day_fa', 'transactions_same_week_fa',
        'transactions_same_hour_fata', 'transactions_same_day_fata', 'transactions_same_week_fata',
        'from_account_inDegree', 'from_account_outDegree',
        'to_account_inDegree', 'to_account_outDegree',
        'exists_similar',
        'fan_in_degree', 'fan_out_degree',
        'fa_pagerank', 'ta_pagerank', 'weight_t',
    ])
    .join(dataframe, 'id', 'left')
)
#train.write.parquet("src/datasets/train_all_feats.parquet")

In [143]:
# Persist the enriched training set (fails if the path already exists: default save mode).
train.write.parquet(path="src/datasets/train_page_rank.parquet")

                                                                                

## Test set

In [147]:
# Same graph features as for the training split, computed on the test split alone.
test_graph = MyGraph(df=test)
test_graph.get_forwards()
test_graph.same_or_similar()
test_graph.compute_fan()

In [None]:
# Collect every graph feature per transaction id, discard the forward/same flags,
# add PageRank, then merge back onto the test transactions.
test_graph.join_ids()

test_graph.ids = test_graph.ids.drop('before_forward', 'forward', 'exists_same')
test_graph.page_rank()

# Left join + null filling (exists_similar -> 0, fan degrees -> 1): exactly what
# MyGraph.join_dataframe implements, so reuse it instead of repeating it inline.
test_graph.join_dataframe()

# As for the training set, keep only some of the columns: the features computed by
# MyGraph are noisy, because splitting into train and test subsamples the dataset heavily.
test = test_graph.df.select('id',
 'week',
 'day_of_month',
 'hour',
 'amount_received',
 'amount_paid',
 'day_of_week',
 'transactions_same_hour_fa',
 'transactions_same_day_fa',
 'transactions_same_week_fa',
 'transactions_same_hour_fata',
 'transactions_same_day_fata',
 'transactions_same_week_fata',
 'from_account_inDegree',
 'from_account_outDegree',
 'to_account_inDegree',
 'to_account_outDegree',
 'exists_similar',
 'fan_in_degree',
 'fan_out_degree',
 'fa_pagerank','ta_pagerank','weight_t')

test = test.join(dataframe, 'id', 'left')
#test.write.parquet("src/datasets/test_all_feats.parquet")
test.write.parquet("src/datasets/test_page_rank.parquet")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/simonemalesardi/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/simonemalesardi/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/simonemalesardi/opt/anaconda3/envs/amd_sm2l/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 