In [1]:
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, functions
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import zipfile

In [2]:
from pyspark.sql.types import IntegerType, DateType, ArrayType, StringType, FloatType, TimestampType

In [3]:
import pandas as pd
pd.options.display.max_rows=None
pd.options.display.max_columns=None

In [4]:
spark = SparkSession.builder.appName("Fraud UseCase").getOrCreate()

In [5]:
input_data_df = spark.read.options(header='true').csv("one_lakh_customers_new.csv")


In [6]:
input_data_df.columns

['Class',
 'card_number',
 'customer_name',
 'Income',
 'card_type',
 'Has Security Pin',
 'physical_card_type',
 'frequency',
 'time',
 'transaction_type',
 'transaction_type_sub',
 'merchant_id',
 'amount',
 'merchant_city',
 'merchant_state',
 'product_type',
 'merchant_end',
 'One Time Heavy Loot Score',
 'Drastic Transaction Change Pattern Score',
 'Parasitic Fraud Score',
 'Purchase Behavior Variation Score',
 'Preferred Merchant Variation Score',
 'Multiple Location Fraud Score',
 'Fraudster Merchant Score',
 'fraud_risk',
 'transaction_id']

In [7]:
# input_data_df.count()

# Utility Functions

In [8]:
def get_customer_timeseries_feature(history,current_time):
    last_hour_amount_history = [0,]
    last_6hour_amount_history = [0,]
    last_12hour_amount_history = [0,]
    last_24hour_amount_history = [0,]
    last_3day_amount_history = [0,]
    last_1week_amount_history = [0,]
    last_2week_amount_history = [0,]
    last_1month_amount_history = [0,]
       
    history = sorted(history,key=lambda a: a[1])
    last_hour_time = current_time - timedelta(hours=1) 
    last_6hour_time = current_time - timedelta(hours=6)
    last_12hour_time = current_time - timedelta(hours=12)
    last_24hour_time = current_time - timedelta(hours=24)
    last_3day_time = current_time - timedelta(hours=72)
    last_1week_time = current_time - timedelta(days=7)
    last_2week_time = current_time - timedelta(days=14)
    last_1month_time = current_time - relativedelta(months=1)
    
    for item in history:
        if (item[1] < current_time) and (item[1] >= last_hour_time):
            last_hour_amount_history.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_6hour_time):
            last_6hour_amount_history.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_12hour_time):
            last_12hour_amount_history.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_24hour_time):
            last_24hour_amount_history.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_3day_time):
            last_3day_amount_history.append(item[2])
        
        if (item[1] < current_time) and (item[1] >= last_1week_time):
            last_1week_amount_history.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_2week_time):
            last_2week_amount_history.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_1month_time):
            last_1month_amount_history.append(item[2])
        
        # amount_feature = [sum, count, avg, max]
#     return [last_hour_amount_history,last_6hour_amount_history]
    last_hour_amount_feature = [sum(last_hour_amount_history),len(last_hour_amount_history)-1,max(last_hour_amount_history)]
    last_6hour_amount_feature = [sum(last_6hour_amount_history),len(last_6hour_amount_history)-1,max(last_6hour_amount_history)]
    last_12hour_amount_feature = [sum(last_12hour_amount_history),len(last_12hour_amount_history)-1,max(last_12hour_amount_history)]
    last_24hour_amount_feature = [sum(last_24hour_amount_history),len(last_24hour_amount_history)-1,max(last_24hour_amount_history)]
    last_3day_amount_feature = [sum(last_3day_amount_history),len(last_3day_amount_history)-1,max(last_3day_amount_history)]
    last_1week_amount_feature = [sum(last_1week_amount_history),len(last_1week_amount_history)-1,max(last_1week_amount_history)]
    last_2week_amount_feature = [sum(last_2week_amount_history),len(last_2week_amount_history)-1,max(last_2week_amount_history)]
    last_1month_amount_feature = [sum(last_1month_amount_history),len(last_1month_amount_history)-1,max(last_1month_amount_history)]




    return [last_hour_amount_feature,last_6hour_amount_feature,last_12hour_amount_feature,last_24hour_amount_feature,last_3day_amount_feature,last_1week_amount_feature,last_2week_amount_feature,last_1month_amount_feature]

def get_feature_vector(a,feature_type,timespan):
    return int(a[timespan][feature_type])

def get_avg_feature_vector(x,timespan):
    a= x[timespan][0]
    b= x[timespan][1]
    if b==0:
        return float(a/1)
    else:
        return float(a/b)



In [9]:
def get_merchant_timeseries_feature(history,current_time,current_transaction_id):
    last_hour_amount_history = [0,]
    last_6hour_amount_history = [0,]
    last_12hour_amount_history = [0,]
    last_24hour_amount_history = [0,]
    last_3day_amount_history = [0,]    
           
    history = sorted(history,key=lambda a: a[1])
    last_hour_time = current_time - timedelta(hours=1) 
    last_6hour_time = current_time - timedelta(hours=6)
    last_12hour_time = current_time - timedelta(hours=12)
    last_24hour_time = current_time - timedelta(hours=24)
    last_3day_time = current_time - timedelta(hours=72)
    
    for item in history:
        if item[0]!=current_transaction_id:
            if (item[1] <= current_time) and (item[1] >= last_hour_time):
                last_hour_amount_history.append(item[2])

            if (item[1] <= current_time) and (item[1] >= last_6hour_time):
                last_6hour_amount_history.append(item[2])

            if (item[1] <= current_time) and (item[1] >= last_12hour_time):
                last_12hour_amount_history.append(item[2])

            if (item[1] <= current_time) and (item[1] >= last_24hour_time):
                last_24hour_amount_history.append(item[2])

            if (item[1] <= current_time) and (item[1] >= last_3day_time):
                last_3day_amount_history.append(item[2])

        # amount_feature = [sum, count, avg, max]
#     return [last_hour_amount_history,last_6hour_amount_history]
    last_hour_amount_feature = [sum(last_hour_amount_history),len(last_hour_amount_history)-1,max(last_hour_amount_history)]
    last_6hour_amount_feature = [sum(last_6hour_amount_history),len(last_6hour_amount_history)-1,max(last_6hour_amount_history)]
    last_12hour_amount_feature = [sum(last_12hour_amount_history),len(last_12hour_amount_history)-1,max(last_12hour_amount_history)]
    last_24hour_amount_feature = [sum(last_24hour_amount_history),len(last_24hour_amount_history)-1,max(last_24hour_amount_history)]
    last_3day_amount_feature = [sum(last_3day_amount_history),len(last_3day_amount_history)-1,max(last_3day_amount_history)]

    return [last_hour_amount_feature,last_6hour_amount_feature,last_12hour_amount_feature,last_24hour_amount_feature,last_3day_amount_feature]

# def get_feature_vector(a,feature_type,timespan):
#     return int(a[timespan][feature_type])

# def get_avg_feature_vector(x,timespan):
#     a= x[timespan][0]
#     b= x[timespan][1]
#     if b==0:
#         return float(a/1)
#     else:
#         return float(a/b)


In [10]:
def get_product_type_timeseries_feature(history,product_type,current_time):
    
    last_2month_product_type_amount_history = {'Electrical Appliances':[0,],'Groceries':[0,],'Investments':[0,],'Car Rentals':[0,],'Hospitals and Healthcare':[0,], \
                                           'PetCare':[0,], 'Furniture':[0,], 'Textiles, Accessories and Footwear':[0,], 'Hotels and Restaurants':[0,], \
                                           'Airlines':[0,],'Water, Electricity and Telecommunication bills':[0,],'Cinema, Shows and other Entertainment':[0,], \
                                           'Electronics Gadgets':[0,],'Others':[0,]}
    last_2month_product_type_feature = []
    
    history = sorted(history,key=lambda a: a[2])
    
    last_2month_time = current_time - relativedelta(months=2)    
    
    for item in history:
        if item[1] not in list(last_2month_product_type_amount_history.keys()):
            item[1]='Others'
            
        if (item[2] < current_time) and (item[2] >= last_2month_time):
            last_2month_product_type_amount_history[item[1]].append(item[3])
    
    for productItem in last_2month_product_type_amount_history.keys():
        product_purchase_total_amount = sum(last_2month_product_type_amount_history[productItem])
        product_purchase_count = len(last_2month_product_type_amount_history[productItem]) - 1
        if product_purchase_count== 0:
            product_purchase_avg = float(0)
        else:
            product_purchase_avg = float(product_purchase_total_amount/product_purchase_count)        
            
        last_2month_product_type_feature.append([float(product_purchase_count),product_purchase_avg])
    
    return last_2month_product_type_feature


def get_feature_vector_prod_type(a,product_type,feature_type):
    return a[product_type][feature_type]



In [11]:
def get_trans_city_timeseries_feature(history,current_time,merchant_city):
    # first element is transaction count in the current_merchant_city 
    # and second element is total transaction count in that time period
    last_hour_amount_history = [0,0]
    last_6hour_amount_history = [0,0]
    last_12hour_amount_history = [0,0]
    last_24hour_amount_history = [0,0]
    last_3day_amount_history = [0,0]
    last_1week_amount_history = [0,0]
    last_2week_amount_history = [0,0]
    last_1month_amount_history = [0,0]
       
    history = sorted(history,key=lambda a: a[1])
    last_hour_time = current_time - timedelta(hours=1) 
    last_6hour_time = current_time - timedelta(hours=6)
    last_12hour_time = current_time - timedelta(hours=12)
    last_24hour_time = current_time - timedelta(hours=24)
    last_3day_time = current_time - timedelta(hours=72)
    last_1week_time = current_time - timedelta(days=7)
    last_2week_time = current_time - timedelta(days=14)
    last_1month_time = current_time - relativedelta(months=1)
    
    for item in history:
        if (item[1] <= current_time) and (item[1] >= last_hour_time):
            if item[2]==merchant_city:
                last_hour_amount_history[0] += 1
            last_hour_amount_history[1] += 1
            
        if (item[1] <= current_time) and (item[1] >= last_6hour_time):
            if item[2]==merchant_city:
                last_6hour_amount_history[0] += 1
            last_6hour_amount_history[1] += 1
            
        if (item[1] <= current_time) and (item[1] >= last_12hour_time):
            if item[2]==merchant_city:
                last_12hour_amount_history[0] += 1
            last_12hour_amount_history[1] += 1
            
        if (item[1] <= current_time) and (item[1] >= last_24hour_time):
            if item[2]==merchant_city:
                last_24hour_amount_history[0] += 1
            last_24hour_amount_history[1] += 1
            
        if (item[1] <= current_time) and (item[1] >= last_3day_time):
            if item[2]==merchant_city:
                last_3day_amount_history[0] += 1
            last_3day_amount_history[1] += 1
        
        if (item[1] <= current_time) and (item[1] >= last_1week_time):
            if item[2]==merchant_city:
                last_1week_amount_history[0] += 1
            last_1week_amount_history[1] += 1
            
        if (item[1] <= current_time) and (item[1] >= last_2week_time):
            if item[2]==merchant_city:
                last_2week_amount_history[0] += 1
            last_2week_amount_history[1] += 1
            
        if (item[1] <= current_time) and (item[1] >= last_1month_time):
            if item[2]==merchant_city:
                last_1month_amount_history[0] += 1
            last_1month_amount_history[1] += 1
            
#     return [last_hour_amount_history,last_6hour_amount_history,last_12hour_amount_history,last_24hour_amount_history,last_3day_amount_history,last_1week_amount_history,last_2week_amount_history,last_1month_amount_history]


    last_hour_amount_feature = float(last_hour_amount_history[0]/last_hour_amount_history[1])
    last_6hour_amount_feature = float(last_6hour_amount_history[0]/last_6hour_amount_history[1])
    last_12hour_amount_feature = float(last_12hour_amount_history[0]/last_12hour_amount_history[1])
    last_24hour_amount_feature = float(last_24hour_amount_history[0]/last_24hour_amount_history[1])
    last_3day_amount_feature = float(last_3day_amount_history[0]/last_3day_amount_history[1])
    last_1week_amount_feature = float(last_1week_amount_history[0]/last_1week_amount_history[1])
    last_2week_amount_feature = float(last_2week_amount_history[0]/last_2week_amount_history[1])
    last_1month_amount_feature = float(last_1month_amount_history[0]/last_1month_amount_history[1])


    return [last_hour_amount_feature,last_6hour_amount_feature,last_12hour_amount_feature,last_24hour_amount_feature,last_3day_amount_feature,last_1week_amount_feature,last_2week_amount_feature,last_1month_amount_feature]

def get_feature_vector_trans_city(a,timespan):
    return float(a[timespan])


In [12]:
def get_trans_type_timeseries_feature(history, transaction_type, current_time):
    last_hour_amount_history_offline = [0,]
    last_6hour_amount_history_offline = [0,]
    last_12hour_amount_history_offline = [0,]
    last_24hour_amount_history_offline = [0,]
    last_3day_amount_history_offline = [0,]
    last_1week_amount_history_offline = [0,]
    last_2week_amount_history_offline = [0,]
    last_1month_amount_history_offline = [0,]
    
    last_hour_amount_history_online = [0,]
    last_6hour_amount_history_online = [0,]
    last_12hour_amount_history_online = [0,]
    last_24hour_amount_history_online = [0,]
    last_3day_amount_history_online = [0,]
    last_1week_amount_history_online = [0,]
    last_2week_amount_history_online = [0,]
    last_1month_amount_history_online = [0,]
       
    history = sorted(history,key=lambda a: a[1])
    last_hour_time = current_time - timedelta(hours=1) 
    last_6hour_time = current_time - timedelta(hours=6)
    last_12hour_time = current_time - timedelta(hours=12)
    last_24hour_time = current_time - timedelta(hours=24)
    last_3day_time = current_time - timedelta(hours=72)
    last_1week_time = current_time - timedelta(days=7)
    last_2week_time = current_time - timedelta(days=14)
    last_1month_time = current_time - relativedelta(months=1)
    
    for item in history:
        if (item[1] < current_time) and (item[1] >= last_hour_time):
            if transaction_type=='offline':
                last_hour_amount_history_offline.append(item[2])
            else:
                last_hour_amount_history_online.append(item[2])
                
        if (item[1] < current_time) and (item[1] >= last_6hour_time):
            if transaction_type=='offline':
                last_6hour_amount_history_offline.append(item[2])
            else:
                last_6hour_amount_history_online.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_12hour_time):
            if transaction_type=='offline':
                last_12hour_amount_history_offline.append(item[2])
            else:
                last_12hour_amount_history_online.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_24hour_time):
            if transaction_type=='offline':
                last_24hour_amount_history_offline.append(item[2])
            else:
                last_24hour_amount_history_online.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_3day_time):
            if transaction_type=='offline':
                last_3day_amount_history_offline.append(item[2])
            else:
                last_3day_amount_history_online.append(item[2])
        
        if (item[1] < current_time) and (item[1] >= last_1week_time):
            if transaction_type=='offline':
                last_1week_amount_history_offline.append(item[2])
            else:
                last_1week_amount_history_online.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_2week_time):
            if transaction_type=='offline':
                last_2week_amount_history_offline.append(item[2])
            else:
                last_2week_amount_history_online.append(item[2])
            
        if (item[1] < current_time) and (item[1] >= last_1month_time):
            if transaction_type=='offline':
                last_1month_amount_history_offline.append(item[2])
            else:
                last_1month_amount_history_offline.append(item[2])
        
        # amount_feature = [sum, count, avg, max]
#     return [last_hour_amount_history,last_6hour_amount_history]
    last_hour_amount_feature = [sum(last_hour_amount_history_offline),len(last_hour_amount_history_offline)-1,max(last_hour_amount_history_offline), sum(last_hour_amount_history_online),len(last_hour_amount_history_online)-1,max(last_hour_amount_history_online)]
    last_6hour_amount_feature = [sum(last_6hour_amount_history_offline),len(last_6hour_amount_history_offline)-1,max(last_6hour_amount_history_offline), sum(last_6hour_amount_history_online),len(last_6hour_amount_history_online)-1,max(last_6hour_amount_history_online)]
    last_12hour_amount_feature = [sum(last_12hour_amount_history_offline),len(last_12hour_amount_history_offline)-1,max(last_12hour_amount_history_offline),sum(last_12hour_amount_history_online),len(last_12hour_amount_history_online)-1,max(last_12hour_amount_history_online)]
    last_24hour_amount_feature = [sum(last_24hour_amount_history_offline),len(last_24hour_amount_history_offline)-1,max(last_24hour_amount_history_offline), sum(last_24hour_amount_history_online),len(last_24hour_amount_history_online)-1,max(last_24hour_amount_history_online)]
    last_3day_amount_feature = [sum(last_3day_amount_history_offline),len(last_3day_amount_history_offline)-1,max(last_3day_amount_history_offline), sum(last_3day_amount_history_online),len(last_3day_amount_history_online)-1,max(last_3day_amount_history_online)]
    last_1week_amount_feature = [sum(last_1week_amount_history_offline),len(last_1week_amount_history_offline)-1,max(last_1week_amount_history_offline), sum(last_1week_amount_history_online),len(last_1week_amount_history_online)-1,max(last_1week_amount_history_online)]
    last_2week_amount_feature = [sum(last_2week_amount_history_offline),len(last_2week_amount_history_offline)-1,max(last_2week_amount_history_offline), sum(last_2week_amount_history_online),len(last_2week_amount_history_online)-1,max(last_2week_amount_history_online)]
    last_1month_amount_feature = [sum(last_1month_amount_history_offline),len(last_1month_amount_history_offline)-1,max(last_1month_amount_history_offline), sum(last_1month_amount_history_online),len(last_1month_amount_history_online)-1,max(last_1month_amount_history_online)]




    return [last_hour_amount_feature,last_6hour_amount_feature,last_12hour_amount_feature,last_24hour_amount_feature,last_3day_amount_feature,last_1week_amount_feature,last_2week_amount_feature,last_1month_amount_feature]

def get_feature_vector_trans_type(a,transaction_type,feature_type,timespan):
    offset=3
    
    if transaction_type=='offline':
        val = int(a[timespan][feature_type])
    else:
        val = int(a[timespan][offset+feature_type])
    return val

def get_avg_feature_vector_trans_type(x,transaction_type,timespan):
    if transaction_type=='offline':
        a= x[timespan][0]
        b= x[timespan][1]
    else:
        a= x[timespan][3]
        b= x[timespan][4]
        
    if b==0:
        return float(a/1)
    else:
        return float(a/b)


## Data Transformation and Feature Generation

In [13]:
from pyspark.sql.functions import pandas_udf, PandasUDFType,collect_set

def get_customer_static_info(input_df):
    customer_static = input_df.select(["customer_name", "card_number", "card_type"])
    return customer_static

def get_merchant_static_info(input_df):
    merchant_static_temp = input_df.select(["merchant_id", "product_type"])
    merchant_static_data = input_df.select(["merchant_id", "merchant_city", "merchant_state"])
    merchant_static_temp = merchant_static_temp.groupBy("merchant_id").agg(collect_set("product_type").alias("list_of_product"))
    merchant_static = merchant_static_data.join(merchant_static_temp, on='merchant_id', how='left')
    return merchant_static

In [14]:
def get_customer_ts(df):
    df = df.withColumn("time", df["time"].cast(TimestampType()))
    df = df.withColumn("amount", df["amount"].cast(IntegerType()))
    df = df.select(["transaction_id", "time", "customer_name", "card_number", "amount"])
    
    df1 = df.withColumn("tuplecol", functions.struct([functions.col("transaction_id") , functions.col("time") , functions.col("amount")]))
    
    df2 = df1.groupby("card_number").agg(functions.collect_list("tuplecol"))
    final_customer_df = df.join(df2, on='card_number', how = 'inner')
    final_customer_df = final_customer_df.withColumnRenamed('collect_list(tuplecol)', 'tuplecol')
    
    time_udf = functions.udf(get_customer_timeseries_feature, ArrayType(ArrayType(IntegerType())))
    final_customer_df=final_customer_df.withColumn("results",time_udf(functions.col("tuplecol"),functions.col("time")))
    
    final_customer_df = final_customer_df.select(["results", "transaction_id"])
    df = df.select(["transaction_id", "time", "customer_name", "card_number"])
    final_customer_df = df.join(final_customer_df, on='transaction_id', how = 'inner')
    
    feature_vector_udf = functions.udf(get_feature_vector, IntegerType())
    avg_feature_vector_udf = functions.udf(get_avg_feature_vector, FloatType())
    
    feature_type = ['sum','count','max', 'avg']
    timespan = ['one_hour','six_hours','twelve_hours', 'twenty_four_hours', 'three_days', 'one_week', 'two_week', 'one_month']
    for i in range(0,len(timespan)):
        for j in range(0,len(feature_type)):
            if feature_type[j]=='avg':
                final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + "_total_customer_transaction_" + feature_type[j],avg_feature_vector_udf(functions.col("results"),functions.lit(i)))
            else:
                final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + "_total_customer_transaction_" + feature_type[j],feature_vector_udf(functions.col("results"),functions.lit(j),functions.lit(i)))
    
    final_customer_df = final_customer_df.drop('results')
    
    return final_customer_df


In [15]:
def get_merchant_ts(df):
    df = df.withColumn("time", df["time"].cast(TimestampType()))
    df = df.withColumn("amount", df["amount"].cast(IntegerType()))
    # df = df.select(["transaction_id", "time", "customer_name", "card_number", "amount"])
    df = df.select(['transaction_id',"time", 'merchant_id', 'merchant_city', 'merchant_state',"amount"])
    
    df1 = df.withColumn("tuplecol", functions.struct([functions.col("transaction_id") , functions.col("time") , functions.col("amount")]))
    
    df2 = df1.groupby("merchant_id").agg(functions.collect_list("tuplecol"))
    final_customer_df = df.join(df2, on='merchant_id', how = 'inner')
    final_customer_df = final_customer_df.withColumnRenamed('collect_list(tuplecol)', 'tuplecol')
    
    time_udf = functions.udf(get_merchant_timeseries_feature, ArrayType(ArrayType(IntegerType())))
    final_customer_df=final_customer_df.withColumn("results",time_udf(functions.col("tuplecol"),functions.col("time"),functions.col("transaction_id")))
    
    final_customer_df = final_customer_df.select(["results", "transaction_id"])
    df = df.select(['transaction_id', 'merchant_id', 'merchant_city', 'merchant_state'])
    final_customer_df = df.join(final_customer_df, on='transaction_id', how = 'inner')
    
    feature_vector_udf = functions.udf(get_feature_vector, IntegerType())
    avg_feature_vector_udf = functions.udf(get_avg_feature_vector, FloatType())
    
    feature_type = ['sum','count','max', 'avg']
    timespan = ['one_hour','six_hour','twelve_hour', 'twenty_four_hour', 'three_days']
    for i in range(0,5):
        for j in range(0,4):
            if feature_type[j]=='avg':
                final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + "_total_merchant_transaction_" + feature_type[j],avg_feature_vector_udf(functions.col("results"),functions.lit(i)))
            else:
                final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + "_total_merchant_transaction_" + feature_type[j],feature_vector_udf(functions.col("results"),functions.lit(j),functions.lit(i)))

    final_customer_df = final_customer_df.drop('results')
    
    return final_customer_df
                

In [16]:
def get_product_type_ts(df):
    df = df.withColumn("time", df["time"].cast(TimestampType()))
    df = df.withColumn("amount", df["amount"].cast(IntegerType()))
    df = df.select(["transaction_id", "time", 'product_type', "customer_name", "card_number", "amount"])
    
    product_list = [item.product_type for item in df.select("product_type").distinct().collect()]
    product_list.append('Others')
    
    df1 = df.withColumn("tuplecol", functions.struct([functions.col("transaction_id") , functions.col("product_type") , functions.col("time") , functions.col("amount")]))
    
    df2 = df1.groupby("card_number").agg(functions.collect_list("tuplecol"))
    final_customer_df = df.join(df2, on='card_number', how = 'inner')
    final_customer_df = final_customer_df.withColumnRenamed('collect_list(tuplecol)', 'tuplecol')
    
    time_udf = functions.udf(get_product_type_timeseries_feature, ArrayType(ArrayType(FloatType())))
    final_customer_df=final_customer_df.withColumn("results",time_udf(functions.col("tuplecol"),functions.col("product_type"), functions.col("time")))
    
    final_customer_df = final_customer_df.select(["results", "transaction_id"])
    df = df.select(["transaction_id", "time", "customer_name", "card_number"])
    final_customer_df = df.join(final_customer_df, on='transaction_id', how = 'inner')
    
    feature_vector_udf = functions.udf(get_feature_vector_prod_type, FloatType())

    feature_type = ['count', 'avg_amount']
    for i in range(0,len(product_list)):
        for j in range(0,len(feature_type)):
            final_customer_df = final_customer_df.withColumn("last_2_months_" + product_list[i] + "_" + feature_type[j],feature_vector_udf(functions.col("results"),functions.lit(i),functions.lit(j)))

    final_customer_df = final_customer_df.drop('results')
    
    return final_customer_df


In [17]:
def get_transaction_city_ts(df):
    df = df.withColumn("time", df["time"].cast(TimestampType()))
    df = df.withColumn("amount", df["amount"].cast(IntegerType()))
    df = df.select(["transaction_id", "time", "customer_name", "card_number", "merchant_city"])
    
    df1 = df.withColumn("tuplecol", functions.struct([functions.col("transaction_id") , functions.col("time") , functions.col("merchant_city")]))
    
    df2 = df1.groupby("card_number").agg(functions.collect_list("tuplecol"))
    final_customer_df = df.join(df2, on='card_number', how = 'inner')
    final_customer_df = final_customer_df.withColumnRenamed('collect_list(tuplecol)', 'tuplecol')
    
    time_udf = functions.udf(get_trans_city_timeseries_feature, ArrayType(FloatType()))
    final_customer_df=final_customer_df.withColumn("results",time_udf(functions.col("tuplecol"),functions.col("time"),functions.col("merchant_city")))
    
    final_customer_df = final_customer_df.select(["results", "transaction_id"])
    df = df.select(["transaction_id", "time", "customer_name", "card_number"])
    final_customer_df = df.join(final_customer_df, on='transaction_id', how = 'inner')
    
    feature_vector_udf = functions.udf(get_feature_vector_trans_city, FloatType())

    timespan = ['one_hour','six_hours','twelve_hours', 'twenty_four_hours', 'three_days', 'one_week', 'two_week', 'one_month']

    for i in range(0,len(timespan)):
        final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + "_City_Score",feature_vector_udf(functions.col("results"),functions.lit(i)))

    final_customer_df = final_customer_df.drop('results')   
    
    return final_customer_df
        

In [18]:
def get_transaction_type_ts(df):
    df = df.withColumn("time", df["time"].cast(TimestampType()))
    df = df.withColumn("amount", df["amount"].cast(IntegerType()))
    df = df.select(["transaction_id", "time", "customer_name", "card_number", "transaction_type", "amount"])
    
    transaction_types = [item.transaction_type for item in df.select("transaction_type").distinct().collect()]

    df1 = df.withColumn("tuplecol", functions.struct([functions.col("transaction_id"), functions.col("time"), functions.col("amount")]))
    
    df2 = df1.groupby("card_number").agg(functions.collect_list("tuplecol"))
    final_customer_df = df.join(df2, on='card_number', how = 'inner')
    final_customer_df = final_customer_df.withColumnRenamed('collect_list(tuplecol)', 'tuplecol')
    
    time_udf = functions.udf(get_trans_type_timeseries_feature, ArrayType(ArrayType(IntegerType())))
    final_customer_df=final_customer_df.withColumn("results", time_udf(functions.col("tuplecol"), functions.col("transaction_type"), functions.col("time")))

    final_customer_df = final_customer_df.select(["results", "transaction_id"])
    df = df.select(["transaction_id", "time", "customer_name", "card_number"])
    final_customer_df = df.join(final_customer_df, on='transaction_id', how = 'inner')
    
    feature_vector_udf = functions.udf(get_feature_vector_trans_type, IntegerType())
    avg_feature_vector_udf = functions.udf(get_avg_feature_vector_trans_type, FloatType())
    
    transaction_modes = ['offline', 'online']
    feature_type = ['sum','count','max', 'avg']
    timespan = ['one_hour','six_hours','twelve_hours', 'twenty_four_hours', 'three_days', 'one_week', 'two_week', 'one_month']

    for mode in transaction_modes:
        for i in range(0,len(timespan)):
            for j in range(0,len(feature_type)):
                if feature_type[j]=='avg':
                    final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + '_' + mode + "_total_transaction_" + feature_type[j],avg_feature_vector_udf(functions.col("results"),functions.lit(mode),functions.lit(i)))
                else:
                    final_customer_df = final_customer_df.withColumn("last_" + timespan[i] + '_' + mode + "_total_transaction_" + feature_type[j],feature_vector_udf(functions.col("results"),functions.lit(mode),functions.lit(j),functions.lit(i)))

    final_customer_df = final_customer_df.drop('results')
    
    return final_customer_df


## Unit Tests

In [19]:
customer_static = get_customer_static_info(input_data_df)
customer_static = customer_static.dropDuplicates()


In [20]:
merchant_static = get_merchant_static_info(input_data_df)
merchant_static = merchant_static.dropDuplicates()


In [21]:
customer_ts = get_customer_ts(input_data_df)


In [22]:
merchant_ts = get_merchant_ts(input_data_df)


In [23]:
product_type_ts = get_product_type_ts(input_data_df)


In [24]:
transaction_city_ts = get_transaction_city_ts(input_data_df)


In [25]:
transaction_type_ts = get_transaction_type_ts(input_data_df)


# Merging DataFrames

In [26]:
# merged_customer_df = customer_ts.join(customer_static, (customer_ts.card_number==customer_static.card_number)&(customer_ts.customer_name==customer_static.customer_name))
merged_customer_df = customer_ts.join(customer_static, on=['card_number','customer_name'], how='inner')


In [27]:
merged_merchant_df = merchant_ts.join(merchant_static, on=['merchant_id','merchant_city','merchant_state'], how='inner')


In [29]:
merged_df = merged_customer_df.join(merged_merchant_df, on='transaction_id', how='inner')


In [31]:
merged_df1 = merged_df.join(product_type_ts, on=['transaction_id','time','customer_name','card_number'], how='inner')


In [32]:
merged_df2 = merged_df1.join(transaction_city_ts, on=['transaction_id','time','customer_name','card_number'], how='inner')


In [33]:
final_merged_df = merged_df2.join(transaction_type_ts, on=['transaction_id','time','customer_name','card_number'], how='inner')


# Feature Engineering

In [43]:
def get_ratio(a, b):
    if a==0 and b==0:
        return 0.0
    else:
        return a/b

t1_to_t2_udf = functions.udf(get_ratio, FloatType())


In [44]:
final_merged_df = final_merged_df.withColumn("customer_last_hour_to_last_24_hours_amt_ratio", t1_to_t2_udf(functions.col("last_one_hour_total_customer_transaction_sum"),functions.col("last_twenty_four_hours_total_customer_transaction_sum")))
final_merged_df = final_merged_df.withColumn("customer_last_hour_to_last_12_hours_amt_ratio", t1_to_t2_udf(functions.col("last_one_hour_total_customer_transaction_sum"),functions.col("last_twelve_hours_total_customer_transaction_sum")))
final_merged_df = final_merged_df.withColumn("customer_last_hour_to_last_6_hours_amt_ratio", t1_to_t2_udf(functions.col("last_one_hour_total_customer_transaction_sum"),functions.col("last_six_hours_total_customer_transaction_sum")))

final_merged_df = final_merged_df.withColumn("customer_last_hour_to_last_24_hours_count_ratio", t1_to_t2_udf(functions.col("last_one_hour_total_customer_transaction_count"),functions.col("last_twenty_four_hours_total_customer_transaction_count")))
final_merged_df = final_merged_df.withColumn("customer_last_hour_to_last_12_hours_count_ratio", t1_to_t2_udf(functions.col("last_one_hour_total_customer_transaction_count"),functions.col("last_twelve_hours_total_customer_transaction_count")))
final_merged_df = final_merged_df.withColumn("customer_last_hour_to_last_6_hours_count_ratio", t1_to_t2_udf(functions.col("last_one_hour_total_customer_transaction_count"),functions.col("last_six_hours_total_customer_transaction_count")))

final_merged_df = final_merged_df.withColumn("customer_last_day_to_last_3_days_amt_ratio", t1_to_t2_udf(functions.col("last_twenty_four_hours_total_customer_transaction_sum"),functions.col("last_three_days_total_customer_transaction_sum")))
final_merged_df = final_merged_df.withColumn("customer_last_day_to_last_3_days_count_ratio", t1_to_t2_udf(functions.col("last_twenty_four_hours_total_customer_transaction_count"),functions.col("last_three_days_total_customer_transaction_count")))

final_merged_df = final_merged_df.withColumn("customer_last_week_to_last_2_weeks_amt_ratio", t1_to_t2_udf(functions.col("last_one_week_total_customer_transaction_sum"),functions.col("last_two_week_total_customer_transaction_sum")))
final_merged_df = final_merged_df.withColumn("customer_last_week_to_last_2_weeks_count_ratio", t1_to_t2_udf(functions.col("last_one_week_total_customer_transaction_count"),functions.col("last_two_week_total_customer_transaction_count")))


final_merged_df = final_merged_df.withColumn("hour_1_City_score_to_hour_24_City_score_ratio", t1_to_t2_udf(functions.col("last_one_hour_City_Score"),functions.col("last_twenty_four_hours_City_Score")))
final_merged_df = final_merged_df.withColumn("hour_1_City_score_to_hour_12_City_score_ratio", t1_to_t2_udf(functions.col("last_one_hour_City_Score"),functions.col("last_twelve_hours_City_Score")))
final_merged_df = final_merged_df.withColumn("hour_1_City_score_to_hour_6_City_score_ratio", t1_to_t2_udf(functions.col("last_one_hour_City_Score"),functions.col("last_six_hours_City_Score")))

final_merged_df = final_merged_df.withColumn("day_1_City_score_to_day_3_City_score_ratio", t1_to_t2_udf(functions.col("last_twenty_four_hours_City_Score"),functions.col("last_three_days_City_Score")))

final_merged_df = final_merged_df.withColumn("week_1_City_score_to_week_2_City_score_ratio", t1_to_t2_udf(functions.col("last_one_week_City_Score"),functions.col("last_two_week_City_Score")))

final_merged_df = final_merged_df.withColumn("day_1_City_score_to_month_1_City_score_ratio", t1_to_t2_udf(functions.col("last_twenty_four_hours_City_Score"),functions.col("last_one_month_City_Score")))





In [45]:
# final_merged_df.columns

In [46]:
# final_merged_df.show(n=2,truncate=False,vertical=True)

In [47]:
# final_merged_df.count(), len(final_merged_df.columns)

In [48]:
input_data_df = input_data_df.select(['transaction_id',
 'time',
 'customer_name',
 'card_number',
 'card_type',
 'physical_card_type',
 'merchant_id',
 'merchant_city',
 'merchant_state',
 'transaction_type',
 'transaction_type_sub',
 'amount',
 'product_type',
 'fraud_risk'])

In [49]:
final_merged_df = input_data_df.join(final_merged_df, on=['transaction_id', 'time', 'customer_name', 'card_number', 'card_type', 'merchant_id', 'merchant_city', 'merchant_state'], how='inner')



In [50]:
final_merged_df.count(), len(final_merged_df.columns)

(3854112, 189)

In [None]:
final_merged_df.show(n=1,truncate=False,vertical=True)

# Normalization

In [56]:
columns_to_ignore = ["transaction_id", "time", "customer_name", "card_number", "card_type", "merchant_id", "merchant_city", "merchant_state", "physical_card_type", "transaction_type", "transaction_type_sub", "product_type", "list_of_product", "fraud_risk"]
all_columns = list(final_merged_df.columns)
columns_to_scale = [i for i in all_columns if i not in columns_to_ignore]

from copy import deepcopy
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# temp = final_merged_df.withColumn("amount", final_merged_df["amount"].cast(FloatType()))
final_merged_df = final_merged_df.withColumn("amount", final_merged_df["amount"].cast(FloatType()))

assembler = VectorAssembler().setInputCols(columns_to_scale).setOutputCol("features")
transformed = assembler.transform(final_merged_df)
scaler = StandardScaler(inputCol="features",\
         outputCol="scaledFeatures")
scalerModel =  scaler.fit(transformed.select("features"))
scaledData = scalerModel.transform(transformed)
# temp.columns


AnalysisException: "Reference 'amount' is ambiguous, could be: amount, amount.;"

In [None]:
# scaledData.show(n=1,truncate=False,vertical=True)

In [None]:
def extract(row):
    return (row.transaction_id, )+tuple(row.scaledFeatures.toArray().tolist())

normalized_data = scaledData.select("transaction_id","scaledFeatures").rdd.map(extract).toDF(["transaction_id"]+[i+"_scaled" for i in columns_to_scale])



In [45]:
final_merged_df = final_merged_df.select(columns_to_ignore)
normalized_data = final_merged_df.join(normalized_data, on='transaction_id', how='inner')
normalized_data.show(n=1,truncate=False,vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------
 transaction_id                                                                 | 06ac8241-5162-4476-bfb3-427b4192918f 
 time                                                                           | 2018-05-27 12:50:00                  
 customer_name                                                                  | Elizabeth Brawley                    
 card_number                                                                    | db51e1c9-4876-421e-9398-3bee1a24165e 
 card_type                                                                      | savings_debit_card                   
 merchant_id                                                                    | 1000324                              
 merchant_city                                                                  | Recluse                              
 merchant_state                         

In [46]:
normalized_data.count(), len(normalized_data.columns)


(2202, 183)