In [1]:
#packages
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
import math as m
from pyspark.ml.stat import Correlation
import numpy as np
import pandas as pd

In [2]:
# Build (or reuse) the Spark session that every later cell runs its jobs on.
spark = (
    SparkSession.builder
    .appName("Project 1")
    # Session time zone is pinned so date handling does not depend on the host.
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.sql.parquet.cacheMetadata", "true")
    # Later cells end with a bare DataFrame name and rely on the notebook rendering it.
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/15 16:46:12 WARN Utils: Your hostname, Lachys-Laptop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/09/15 16:46:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/15 16:46:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read in the raw data

# Parquet lookup tables
tbl_merchants_raw = spark.read.parquet("data/tables/merchant_data/tbl_merchants.parquet")
consumer_user_details = spark.read.parquet("data/tables/merchant_data/consumer_user_details.parquet")

# CSV tables, first line used as the header.
# NOTE(review): tbl_consumer.csv is read with the default delimiter, so it arrives
# as a single pipe-delimited column that a later cell splits by hand.
con_fraud_prob = spark.read.csv("data/tables/merchant_data/consumer_fraud_probability.csv", header=True)
merch_fraud_prob = spark.read.csv("data/tables/merchant_data/merchant_fraud_probability.csv", header=True)
tbl_consumer_raw = spark.read.csv("data/tables/merchant_data/tbl_consumer.csv", header=True)

# Three transaction snapshots, stacked into one frame by column name
transactions21 = spark.read.parquet("data/tables/transaction_data/transactions_20210228_20210827_snapshot/")
transactions2122 = spark.read.parquet("data/tables/transaction_data/transactions_20210828_20220227_snapshot/")
transactions22 = spark.read.parquet("data/tables/transaction_data/transactions_20220228_20220828_snapshot/")

transactions = (
    transactions21
    .unionByName(transactions2122)
    .unionByName(transactions22)
)

                                                                                

In [4]:
#Functions
def OHE_variables(data, cat_nom_columns, cat_ord_columns):
    """Index and encode categorical features.

    Nominal columns are string-indexed and then one-hot encoded into
    ``<column>_OHE``; the original column and the intermediate
    ``<column>_index`` column are dropped. Ordinal columns are string-indexed
    into ``<column>_index`` and the original column is dropped.

    Args:
        data: Spark DataFrame containing the categorical columns.
        cat_nom_columns: iterable of nominal column names to one-hot encode.
        cat_ord_columns: iterable of ordinal column names to index only.

    Returns:
        A new DataFrame with the encoded columns; ``data`` itself is not
        modified. With both column lists empty, ``data`` is returned as is.
    """
    # Spark DataFrames are immutable: transform() and drop() return new frames,
    # so every step has to be assigned back to `data` to take effect.
    for c in cat_nom_columns:
        index_col = str(c) + '_index'
        ohe_col = str(c) + '_OHE'

        indexer = StringIndexer(inputCol=c, outputCol=index_col)
        data = indexer.fit(data).transform(data)

        encoder = OneHotEncoder(inputCol=index_col, outputCol=ohe_col)
        data = encoder.fit(data).transform(data)

        # Keep only the encoded vector; the raw and indexed columns are redundant.
        data = data.drop(c, index_col)

    for c in cat_ord_columns:
        indexer = StringIndexer(inputCol=c, outputCol=str(c) + '_index')
        data = indexer.fit(data).transform(data).drop(c)

    return data

def find_NULL(dfs):
    """Show and count the rows that contain a NULL in any column.

    For each DataFrame in ``dfs`` the rows with at least one NULL column are
    printed with ``show()``.

    Args:
        dfs: iterable of Spark DataFrames to inspect.

    Returns:
        Total number of rows with at least one NULL, summed over all frames
        in ``dfs`` (0 for an empty iterable). For a single-frame list this is
        that frame's count.
    """
    total_null_rows = 0
    for df in dfs:
        # Start from False and OR in one isNull() test per column.
        condition = f.lit(False)
        for col_name in df.columns:
            condition = condition | f.col(col_name).isNull()

        # Build the filtered frame once and reuse it for display and count.
        null_rows = df.filter(condition)
        null_rows.show()
        total_null_rows += null_rows.count()
    return total_null_rows

def filter_outliers(data, variables):
    """Filter outliers of continuous columns with a scaled IQR rule.

    For each column in ``variables`` the approximate quartiles are computed
    and only rows inside ``[Q1 - scale * IQR, Q3 + scale * IQR]`` are kept,
    where ``scale = max(sqrt(ln(n)) - 0.5, 3)`` and ``n`` is the row count of
    the input. A negative lower bound is replaced by 0, so negative values of
    a filtered column are always removed. Columns are filtered one after
    another, so later quartiles are computed on the already-filtered data.

    Args:
        data: Spark DataFrame to filter.
        variables: iterable of numeric column names.

    Returns:
        The filtered DataFrame. An empty input is returned unchanged, and a
        column for which no quartiles can be computed is skipped.
    """
    n = data.count()
    if n == 0:
        # log(0) is undefined and there is nothing to filter anyway.
        return data

    # from ADS lecture slides, n>>100; the factor never goes below 3.
    # It depends only on n, so it is computed once outside the loop.
    scale = max(m.sqrt(m.log(n)) - 0.5, 3)

    for feature in variables:
        # Calculate Q1 and Q3
        quantiles = data.approxQuantile(feature, [0.25, 0.75], 0.01)
        if len(quantiles) < 2:
            # approxQuantile returns an empty list for an all-null column.
            continue
        q1, q3 = quantiles
        iqr = q3 - q1

        # A negative lower bound is clamped to zero.
        lower_bound = max(q1 - scale * iqr, 0)
        upper_bound = q3 + scale * iqr
        data = data.filter((col(feature) >= lower_bound) & (col(feature) <= upper_bound))

    return data

def corr_func(data, CORR_COLS):
    """Return the correlation matrix between the given columns.

    Args:
        data: Spark DataFrame containing the columns in ``CORR_COLS``.
        CORR_COLS: list of column names to correlate.

    Returns:
        The correlation matrix as a nested list of floats, with rows and
        columns in the order of ``CORR_COLS``.
    """
    features = "correlation_features"

    # Pack the selected columns into the single vector column Correlation expects.
    assembler = VectorAssembler(
        inputCols=CORR_COLS,
        outputCol=features
    )
    feature_vector = assembler.transform(data).select(features)

    corr_matrix_dense = Correlation.corr(feature_vector, features)
    # Collect once: each collect() re-runs the correlation job.
    corr_matrix = corr_matrix_dense.collect()[0][0].toArray().tolist()

    return corr_matrix

In [5]:
# Name of the single raw consumer column: the CSV header line itself,
# because the file is pipe-delimited but was read with the default separator.
string = "name|address|state|postcode|gender|consumer_id"

# Clean consumer table: split the raw column on "|" and keep one named
# column per field, then discard the raw column.
tbl_consumer = (
    tbl_consumer_raw
    .select(
        "*",
        *[
            f.split(col(string), "\\|").getItem(position).alias(field_name)
            for position, field_name in enumerate(
                ["cust_name", "address", "state", "postcode", "gender", "consumer_id"]
            )
        ],
    )
    .drop(string)
)


# Clean merchants table: unpack the bracketed `tags` string into three columns.
tbl_merchants = (
    tbl_merchants_raw
    # strip the outer pair of brackets: a leading (( ([ [( [[ and a trailing )) ]) )] ]]
    .withColumn("tags_clean", f.regexp_replace("tags", r"^\(\(|^\(\[|^\[\(|^\[\[|\)\)$|\]\)$|\)\]$|\]\]$", ""))
    # break the remainder into its groups on `), (` or `], [`
    # NOTE(review): mixed separators such as `], (` are not split; confirm none occur in the data
    .withColumn("tags_array", f.split("tags_clean", r"\)\s*,\s*\(|\]\s*,\s*\["))
    # group 0: business description, lower-cased
    .withColumn("biz_tags", f.lower(col("tags_array").getItem(0)))
    # group 1: revenue band
    .withColumn("rev_band", col("tags_array").getItem(1))
    # group 2: the number following "take rate:", kept as extracted (no cast applied here)
    .withColumn(
        "take_rate",
        f.regexp_extract(col("tags_array").getItem(2), r"take rate:\s*([0-9.]+)", 1),
    )
    # only the derived columns are kept
    .drop("tags", "tags_clean", "tags_array")
)

In [6]:
# Attach merchant details to every transaction; transactions whose ABN has no
# merchant record keep NULLs in the merchant columns.
merchant_transactions = transactions.join(tbl_merchants, "merchant_abn", "left")

# Show the rows with missing values and display how many there are.
find_NULL([merchant_transactions])

                                                                                

+------------+-------+------------------+--------------------+--------------+----+--------+--------+---------+
|merchant_abn|user_id|      dollar_value|            order_id|order_datetime|name|biz_tags|rev_band|take_rate|
+------------+-------+------------------+--------------------+--------------+----+--------+--------+---------+
| 29566626791|      8| 74.15732460440282|71a81652-cc91-4bf...|    2021-08-20|NULL|    NULL|    NULL|     NULL|
| 32234779638|  18490|107.14809429376949|20149572-a55b-41f...|    2021-08-20|NULL|    NULL|    NULL|     NULL|
| 67202032418|     20| 55.46394975814555|a29071b4-29b3-4f2...|    2021-08-20|NULL|    NULL|    NULL|     NULL|
| 32461318592|     23| 613.9306657410166|4b2e2154-65d8-44f...|    2021-08-20|NULL|    NULL|    NULL|     NULL|
| 32234779638|     25| 87.15685629102919|e6763664-e95f-4eb...|    2021-08-20|NULL|    NULL|    NULL|     NULL|
| 23633724513|     26|3459.2423030023524|fee9ead7-9ce2-4a4...|    2021-08-20|NULL|    NULL|    NULL|     NULL|
|

                                                                                

580830

In [7]:
# Discard every row that has a NULL in any column.
merchant_transactions = merchant_transactions.na.drop()

In [16]:
# Preview the first 20 consumer addresses at full width.
tbl_consumer.select(col("address")).show(n=20, truncate=False)

+-----------------------------+
|address                      |
+-----------------------------+
|413 Haney Gardens Apt. 742   |
|3764 Amber Oval              |
|40693 Henry Greens           |
|00653 Davenport Crossroad    |
|9271 Michael Manors Suite 651|
|2706 Stewart Oval Suite 588  |
|122 Brandon Cliff            |
|6804 Wright Crest Suite 311  |
|5813 Denise Land Suite 690   |
|461 Ryan Common Suite 734    |
|33983 Kevin Drive Suite 628  |
|13706 Kimberly Port          |
|0236 Mills Land Suite 203    |
|8943 Kenneth Camp            |
|60495 Ryan Hill              |
|9671 Jacob Harbors Suite 431 |
|44353 Nathan Ridge           |
|89400 Torres Fort            |
|68657 Johnson Glen Suite 266 |
|790 Ramos Landing            |
+-----------------------------+
only showing top 20 rows


In [11]:
# Filter outliers by revenue band: per-band quartiles and IQR-based bounds

n = merchant_transactions.count()
# Compute the scale factor with the same rule as filter_outliers():
# sqrt(ln(n)) - 0.5, never below 3. An empty frame (n == 0) falls back to 3
# because log(0) is undefined.
scale = max(m.sqrt(m.log(n)) - 0.5, 3) if n > 0 else 3

stats_by_band = (
    merchant_transactions
    .groupby('rev_band')
    .agg(
        f.expr("percentile_approx(dollar_value, 0.25)").alias("Q1"),
        f.expr("percentile_approx(dollar_value, 0.75)").alias("Q3"),
    )
    .withColumn("IQR", f.col("Q3") - f.col("Q1"))
    .withColumn("lower_bound", f.col("Q1") - scale * f.col("IQR"))
    .withColumn("upper_bound", f.col("Q3") + scale * f.col("IQR"))
    # IQR is only needed to derive the bounds
    .drop('IQR')
)

                                                                                

In [12]:
# Keep only transactions whose dollar_value lies inside its revenue band's
# [lower_bound, upper_bound] (both ends inclusive), then return to the
# original transaction columns.
merchant_transactions = (
    merchant_transactions
    .join(stats_by_band, on="rev_band", how="left")
    .where(col("dollar_value").between(col("lower_bound"), col("upper_bound")))
    .select(merchant_transactions["*"])
)

In [17]:
# The merchant `name` column becomes `business`; order_id is not needed further.
merchant_transactions = (
    merchant_transactions
    .withColumnRenamed("name", "business")
    .drop("order_id")
)

In [18]:
# Bare expression as the cell's last line: the notebook renders a preview of the frame.
merchant_transactions

                                                                                

merchant_abn,user_id,dollar_value,order_datetime,business,biz_tags,rev_band,take_rate
19249968599,24,72.76262499748037,2021-08-20,Nunc Pulvinar Ind...,computer programm...,d,1.31
42500153308,18545,261.96248992726936,2021-08-20,Vestibulum Accums...,digital goods: bo...,d,0.62
32825608803,82,25.374214703723062,2021-08-20,Dapibus Quam Quis...,lawn and garden s...,d,1.4
73804469952,18650,46.2865155495329,2021-08-20,Egestas Duis LLP,digital goods: bo...,d,1.17
19249968599,18662,371.8161422972321,2021-08-20,Nunc Pulvinar Ind...,computer programm...,d,1.31
32825608803,18705,72.26547797242499,2021-08-20,Dapibus Quam Quis...,lawn and garden s...,d,1.4
62464569593,18763,30.404324060534837,2021-08-20,Est Ac LLC,digital goods: bo...,d,1.06
17523010120,255,62.866404914497146,2021-08-20,Urna Convallis Fo...,tent and awning s...,d,0.79
80982848464,18777,9.629774037523674,2021-08-20,Tempor Augue Limited,florists supplies...,d,1.05
62464569593,342,67.73760028341513,2021-08-20,Est Ac LLC,digital goods: bo...,d,1.06


In [15]:
# Bare expression as the cell's last line: the notebook renders a preview of the cleaned consumer table.
tbl_consumer

                                                                                

cust_name,address,state,postcode,gender,consumer_id
Yolanda Williams,413 Haney Gardens...,WA,6935,Female,1195503
Mary Smith,3764 Amber Oval,NSW,2782,Female,179208
Jill Jones MD,40693 Henry Greens,NT,862,Female,1194530
Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female,154128
Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female,712975
Karen Chapman,2706 Stewart Oval...,NSW,2033,Female,407340
Andrea Jones,122 Brandon Cliff,QLD,4606,Female,511685
Stephen Williams,6804 Wright Crest...,WA,6056,Male,448088
Stephanie Reyes,5813 Denise Land ...,NSW,2482,Female,650435
Jillian Gonzales,461 Ryan Common S...,VIC,3220,Female,1058499


In [None]:
# Attach consumer details: user_id -> consumer_id via consumer_user_details,
# then consumer_id -> the cleaned consumer table. The join key and order_id are
# dropped afterwards.
merch_tran_cust = (
    merchant_transactions
    .join(consumer_user_details, on="user_id", how="left")
    .join(tbl_consumer, on="consumer_id", how="left")
    .drop("consumer_id", "order_id")
)

In [None]:
# Merchant-level fraud probability, matched on merchant and order date.
# The column is renamed before the next join because the consumer fraud table
# uses the same `fraud_probability` name.
mtc_fraud1 = (
    merch_tran_cust
    .join(merch_fraud_prob, on=["merchant_abn", "order_datetime"], how="left")
    .withColumnRenamed("fraud_probability", "merch_fraud_prob")
)

# Consumer-level fraud probability, matched on user and order date.
mtc_fraud = (
    mtc_fraud1
    .join(con_fraud_prob, on=["user_id", "order_datetime"], how="left")
    .withColumnRenamed("fraud_probability", "con_fraud_prob")
)

In [None]:
#mtc_fraud.write.parquet("data/curated/joined_dataset", mode="overwrite")

In [19]:
# Persist the cleaned merchant transactions, replacing any previous output.
merchant_transactions.write.mode("overwrite").parquet("data/curated/merchants_transactions")

                                                                                