CS 543 Fall 2023

Project #1

Date: 10/13/2023

Group Number: 4

**Data Preprocessing**

In [1]:
#imports
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, sum, mean, stddev, median, year, month, count, concat_ws, sha2, desc, asc, lag, when, lit, round, avg, min, max
from pyspark.sql.window import Window

In [2]:
#initialize Spark
spark = SparkSession(sc)

In [3]:
#define the path to the CSV file with transactions data
csv_file_path = '/common/users/shared/cs543_group4/HI-Large_Trans.csv'

#load the data as a spark df
data_df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

#rename columns
data_df = data_df.withColumnRenamed('Timestamp', 'timestamp') \
                 .withColumnRenamed('From Bank', 'bank_from') \
                 .withColumnRenamed('Account2', 'account_from') \
                 .withColumnRenamed('To Bank', 'bank_to') \
                 .withColumnRenamed('Account4', 'account_to') \
                 .withColumnRenamed('Amount Received', 'amount_to') \
                 .withColumnRenamed('Receiving Currency', 'currency_to') \
                 .withColumnRenamed('Amount Paid', 'amount_from') \
                 .withColumnRenamed('Payment Currency', 'currency_from') \
                 .withColumnRenamed('Payment Format', 'payment_format') \
                 .withColumnRenamed('Is Laundering', 'is_laundering')

#convert timestamp column to date format
data_df = data_df.withColumn("timestamp", to_date(data_df["timestamp"], "yyyy/MM/dd HH:mm"))

#add month and year columns
data_df = data_df.withColumn('month', month('timestamp'))
data_df = data_df.withColumn('year', year('timestamp'))

#check count of df
raw_data_count = data_df.count()
print('df with %d rows imported' %raw_data_count)



df with 179702229 rows imported


                                                                                

In [4]:
#check for nulls
null_counts = data_df.select([sum(col(c).isNull().cast('int')).alias(c) for c in data_df.columns])
null_counts.show()

23/10/18 05:27:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Timestamp, From Bank, Account, To Bank, Account, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
 Schema: Timestamp, From Bank, Account2, To Bank, Account4, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
Expected: Account2 but found: Account
CSV file: file:///common/users/shared/cs543_group4/HI-Large_Trans.csv

+---------+---------+------------+-------+----------+---------+-----------+-----------+-------------+--------------+-------------+-----+----+
|timestamp|bank_from|account_from|bank_to|account_to|amount_to|currency_to|amount_from|currency_from|payment_format|is_laundering|month|year|
+---------+---------+------------+-------+----------+---------+-----------+-----------+-------------+--------------+-------------+-----+----+
|        0|        0|           0|      0|         0|        0|          0|          0|            0|             0|            0|    0|   0|
+---------+---------+------------+-------+----------+---------+-----------+-----------+-------------+--------------+-------------+-----+----+



                                                                                

In [5]:
#check for duplicate records and store them in a df
#duplicate_df = data_df.exceptAll(data_df.dropDuplicates())

#count the number of duplicate rows
#num_duplicates = duplicate_df.count()
#print('Number of duplicate rows: %d' %num_duplicates)

In [6]:
#check for outliers (+/- 3SD) in amount_to, amount_from columns
amount_from_stats = data_df.select(mean('amount_from').alias('mean_from'), stddev('amount_from').alias('stddev_from'), median('amount_from').alias('median_from')).first()
amount_to_stats = data_df.select(mean('amount_to').alias('mean_to'), stddev('amount_to').alias('stddev_to'), median('amount_to').alias('median_to')).first()

#create outlier dfs that have records with +/- 3SD payment amounts
from_minus_3sd_df = data_df.filter((col('amount_from') < (amount_from_stats.mean_from - 3 * amount_from_stats.stddev_from)))
from_plus_3sd_df = data_df.filter((col('amount_from') > (amount_from_stats.mean_from + 3 * amount_from_stats.stddev_from)))
to_minus_3sd_df = data_df.filter((col('amount_to') < (amount_to_stats.mean_to - 3 * amount_to_stats.stddev_to)))
to_plus_3sd_df = data_df.filter((col('amount_to') > (amount_to_stats.mean_to + 3 * amount_to_stats.stddev_to)))

print('outliers for amount_from are %f +/- %f.  the median for amount_from is %f' %(amount_from_stats.mean_from, amount_from_stats.stddev_from, amount_from_stats.median_from))
print('outliers for amount_to are %f +/- %f.  the median for amount_from is %f' %(amount_to_stats.mean_to, amount_to_stats.stddev_to, amount_to_stats.median_to))

[Stage 13:>                                                         (0 + 1) / 1]

outliers for amount_from are 3974061.499561 +/- 1389274856.969850.  the median for amount_from is 1399.300000
outliers for amount_to are 5606773.502230 +/- 1815450373.623863.  the median for amount_from is 1397.240000


                                                                                

In [7]:
#calculate average conversion rates for each currency pair
conversion_rates = data_df.groupBy('currency_from', 'currency_to').agg(
    avg(data_df['amount_to'] / data_df['amount_from']).alias('conversion_rate'),
    count(lit(1)).alias('num_transactions'),
    min(data_df['amount_to'] / data_df['amount_from']).alias('min_conversion_rate'),
    max(data_df['amount_to'] / data_df['amount_from']).alias('max_conversion_rate'),
    median(data_df['amount_to'] / data_df['amount_from']).alias('median_conversion_rate')
)

conversion_rates.sort(asc('num_transactions')).show() #check to make sure we have a reasonable sample (5+ transactions) to calculate conversion rate

conversion_rates = conversion_rates.withColumn('conversion_rate_diff', col('max_conversion_rate') - col('min_conversion_rate'))
conversion_rates = conversion_rates.withColumn('conversion_rate_diff_perc', col('conversion_rate_diff') / col('max_conversion_rate'))

conversion_rates.sort(desc('conversion_rate_diff_perc')).show() #check to see where the max and min conversion rate differs the most

                                                                                

+-----------------+-----------------+-------------------+----------------+-------------------+-------------------+----------------------+
|    currency_from|      currency_to|    conversion_rate|num_transactions|min_conversion_rate|max_conversion_rate|median_conversion_rate|
+-----------------+-----------------+-------------------+----------------+-------------------+-------------------+----------------------+
|      Brazil Real|            Ruble| 13.779621325296002|               9| 13.778697906146906| 13.781498534951863|    13.779279454722493|
|      Saudi Riyal|            Ruble| 20.721147685411374|               9| 20.571428571428573| 20.746235606731624|     20.74162632294102|
|          Bitcoin|           Shekel| 40123.289047052545|              10|  40102.30179028133|  40137.64624913971|    40124.251069247766|
|      Saudi Riyal|      Swiss Franc|0.24399733382330274|              15|0.24385245901639344|0.24453024453024452|    0.2439227986745251|
|          Bitcoin|      Swiss Fra



+---------------+------------+--------------------+----------------+--------------------+--------------------+----------------------+--------------------+-------------------------+
|  currency_from| currency_to|     conversion_rate|num_transactions| min_conversion_rate| max_conversion_rate|median_conversion_rate|conversion_rate_diff|conversion_rate_diff_perc|
+---------------+------------+--------------------+----------------+--------------------+--------------------+----------------------+--------------------+-------------------------+
|           Yuan|   US Dollar| 0.14930356143378667|          296239| 0.11111111111111112| 0.33333333333333337|   0.14930721458906726| 0.22222222222222227|       0.6666666666666667|
|            Yen|        Yuan| 0.06377803534474567|            1817|0.045454545454545456|               0.125|   0.06354459358836462| 0.07954545454545454|       0.6363636363636364|
|           Yuan|     Bitcoin|1.306865196903374...|            3664|9.090909090909091E-6|2.4999

                                                                                

In [8]:
#convert transactions to usd so they can be scaled properly for decision trees
conversion_rates_usd = conversion_rates.select(col('currency_from').alias('currency_from_conv'),col('conversion_rate')).filter((col('currency_to') == 'US Dollar'))
#conversion_rates_usd.show()

data_df_usd = data_df.join(conversion_rates_usd, data_df.currency_from == conversion_rates_usd.currency_from_conv, 'inner')
data_df_usd = data_df_usd.withColumnRenamed('conversion_rate', 'conversion_rate_from').drop('currency_from_conv').withColumn('amount_from_usd', col('amount_from')*col('conversion_rate_from'))

data_df_usd = data_df_usd.join(conversion_rates_usd, data_df_usd.currency_to == conversion_rates_usd.currency_from_conv, 'inner')
data_df_usd = data_df_usd.withColumnRenamed('conversion_rate', 'conversion_rate_to').drop('currency_from_conv').withColumn('amount_to_usd', col('amount_to')*col('conversion_rate_to'))
data_df_usd.show()

23/10/18 05:32:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Timestamp, From Bank, Account, To Bank, Account, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
 Schema: Timestamp, From Bank, Account2, To Bank, Account4, Amount Received, Receiving Currency, Amount Paid, Payment Currency, Payment Format, Is Laundering
Expected: Account2 but found: Account
CSV file: file:///common/users/shared/cs543_group4/HI-Large_Trans.csv

+----------+---------+------------+-------+----------+----------+-----------+-----------+-------------+--------------+-------------+-----+----+--------------------+------------------+--------------------+------------------+
| timestamp|bank_from|account_from|bank_to|account_to| amount_to|currency_to|amount_from|currency_from|payment_format|is_laundering|month|year|conversion_rate_from|   amount_from_usd|  conversion_rate_to|     amount_to_usd|
+----------+---------+------------+-------+----------+----------+-----------+-----------+-------------+--------------+-------------+-----+----+--------------------+------------------+--------------------+------------------+
|2022-08-01|    34377|   801BEB6D0|  34377| 801BEB6D0|  99492.58|        Yen|   99492.58|          Yen|  Reinvestment|            0|    8|2022|0.009474283158489959| 942.6208750887149|0.009474283158489959| 942.6208750887149|
|2022-08-01|    34371|   801CA4900|  34371| 801CA4900| 163760.09|        Yen|  163760.09|          Yen| 

                                                                                

In [9]:
#recheck for outliers (+/- 3SD) with amounts in usd
amount_from_usd_stats = data_df_usd.select(mean('amount_from_usd').alias('mean_from_usd'), stddev('amount_from_usd').alias('stddev_from_usd'), median('amount_from_usd').alias('median_from_usd')).first()
amount_to_usd_stats = data_df_usd.select(mean('amount_to_usd').alias('mean_to_usd'), stddev('amount_to_usd').alias('stddev_to_usd'), median('amount_to_usd').alias('median_to_usd')).first()

#create outlier dfs that have records with +/- 3SD payment amounts
from_minus_3sd_df = data_df_usd.filter((col('amount_from_usd') < (amount_from_usd_stats.mean_from_usd - 3 * amount_from_usd_stats.stddev_from_usd)))
from_plus_3sd_df = data_df_usd.filter((col('amount_from_usd') > (amount_from_usd_stats.mean_from_usd + 3 * amount_from_usd_stats.stddev_from_usd)))
to_minus_3sd_df = data_df_usd.filter((col('amount_to_usd') < (amount_to_usd_stats.mean_to_usd - 3 * amount_to_usd_stats.stddev_to_usd)))
to_plus_3sd_df = data_df_usd.filter((col('amount_to_usd') > (amount_to_usd_stats.mean_to_usd + 3 * amount_to_usd_stats.stddev_to_usd)))

print('outliers for amount_from_usd are %f +/- %f.  the median for amount_from_usd is %f' %(amount_from_usd_stats.mean_from_usd, amount_from_usd_stats.stddev_from_usd, amount_from_usd_stats.median_from_usd))
print('outliers for amount_to_usd are %f +/- %f.  the median for amount_from_usd is %f' %(amount_to_usd_stats.mean_to_usd, amount_to_usd_stats.stddev_to_usd, amount_to_usd_stats.median_to_usd))

                                                                                

outliers for amount_from_usd are 308769.701271 +/- 38648241.196831.  the median for amount_from_usd is 833.329229
outliers for amount_to_usd are 308766.488507 +/- 38649611.481540.  the median for amount_from_usd is 833.336258


In [10]:
#create unique id for bank + account number by hashing the bank and account number
data_df_usd = data_df_usd.withColumn("unique_id_from", sha2(concat_ws("_", "bank_from", "account_from"), 256))
data_df_usd = data_df_usd.withColumn("unique_id_to", sha2(concat_ws("_", "bank_to", "account_to"), 256))


In [11]:
#check if receiving currency and payment currency have the same values for encoding

distinct_currency_from = data_df_usd.select('currency_from').distinct().orderBy('currency_from').rdd.flatMap(lambda x: x).collect()
distinct_currency_to = data_df_usd.select('currency_to').distinct().orderBy('currency_to').rdd.flatMap(lambda x: x).collect()

print('Distinct currencies in paid and received are the same: ' + str(distinct_currency_from == distinct_currency_to))



Distinct currencies in paid and received are the same: True


                                                                                

In [12]:
#check number of records by month
count_by_month = data_df_usd.groupBy('month', 'year').agg(count('*').alias('count')).sort(asc('year'), asc('month'))
count_by_month.show()

#documentation shows data is generated for 8/1-11/5 so we can drop all data after 11/5 
#documentation mentions that data after 11/5 are subsequent laundering transactions for chains that started within the timeframe but did not end within the timeframe
aug_nov_df = data_df_usd.filter((col('month') <= 11) & (col('month') >=8))

#get new total count of records
num_rows = aug_nov_df.count()

#get new counts by month and percentage of total data
count_by_month = aug_nov_df.groupBy('month', 'year').agg(count('*').alias('count'), (count('*')/num_rows).alias('percentage')).sort(asc('year'), asc('month'))
count_by_month.show()

                                                                                

+-----+----+--------+
|month|year|   count|
+-----+----+--------+
|    8|2022|59468136|
|    9|2022|56137121|
|   10|2022|54821376|
|   11|2022| 9269215|
|   12|2022|    6132|
|    1|2023|     249|
+-----+----+--------+





+-----+----+--------+-------------------+
|month|year|   count|         percentage|
+-----+----+--------+-------------------+
|    8|2022|59468136|  0.330937729846713|
|    9|2022|56137121| 0.3124007684362301|
|   10|2022|54821376|0.30507870165147055|
|   11|2022| 9269215|0.05158280006558638|
+-----+----+--------+-------------------+



                                                                                

In [13]:
#convert timestamp to min max


In [14]:
#encode categorical columns

#scale continuous columns