In [41]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DateType
import datetime, math

# NOTE(review): `sc` is not assigned in any visible cell; presumably it is the
# SparkContext injected by the pyspark-launched kernel — TODO confirm. In a plain
# Python kernel this line would raise NameError.
print(sc)

<pyspark.context.SparkContext object at 0x10221b470>


In [42]:
# Reuse the active SparkSession when there is one, otherwise build it; the warehouse
# dir controls where managed tables are written.
spark = SparkSession.builder \
    .appName("Catch you merchant") \
    .config("spark.sql.warehouse.dir", "/opt/jupyter_workspace/spark-warehouse") \
    .getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x10522ce48>


In [43]:
import os

# Directory holding the mock CSV extracts. Set MERCHANT_INSIGHT_DATA_DIR to point at
# another location instead of editing this machine-specific default.
workspace = os.environ.get("MERCHANT_INSIGHT_DATA_DIR",
                           "/Users/AUM/Desktop/MerchantInsight/mock_data/")

# header: first row holds column names; inferSchema: let Spark pick column types.
df = (spark
     .read
     .option("header", "true")
     .option("inferSchema", "true")
     .csv(os.path.join(workspace, "deposit_mock.csv")))

In [44]:
# Discard every row that has a null in any column before deriving features.
df = df.dropna(how="any")

df.show(20)

+-----+-----------+-------+---------+---------+------+--------+------+------+--------+
|ar_id|fm_to_ar_id|txn_amt|svc_br_no|opm_tp_cd|txn_cd|ptn_yyyy|ptn_mm|ptn_dd|  txn_tm|
+-----+-----------+-------+---------+---------+------+--------+------+------+--------+
|    A|          B|     10|      900|       DR|     0|    2017|     1|     3| 5:00:00|
|    B|          A|     10|      900|       CR|     0|    2017|     1|     3| 5:00:00|
|    A|          B|    100|      900|       CR|     0|    2017|     1|     3|11:00:00|
|    B|          A|    100|      900|       DR|     0|    2017|     1|     3|11:00:00|
|    A|          B|     10|      900|       DR|     0|    2017|     1|     3|17:00:00|
|    B|          A|     10|      900|       CR|     0|    2017|     1|     3|17:00:00|
|    A|          B|    100|      900|       CR|     0|    2017|     1|     3|23:00:00|
|    B|          A|    100|      900|       DR|     0|    2017|     1|     3|23:00:00|
|    A|          B|     10|      900|      

In [45]:
# define useful variable

# txn_cd values used by the filters below.
transfer_code = 0
withdraw_code = 1
deposit_code = 2

# NOTE(review): not referenced by any visible cell.
number_of_months = 10

In [46]:
# Re-organize the original data set

def no_days_in_month(month, year):
    """Return the number of days in ``month`` (1-12) of ``year``, honouring leap years.

    :param month: month number, 1-12.
    :param year: four-digit year.
    :return: 28, 29, 30 or 31.
    :raises calendar.IllegalMonthError: (a ValueError) for a month outside 1-12.
    """
    import calendar  # local import: not part of the notebook's top import cell
    # monthrange returns (weekday of the 1st, number of days in the month).
    return calendar.monthrange(year, month)[1]
        
def day_of_week_code(day_of_week):
    """Bucket a ``datetime.weekday()`` value (Monday == 0).

    :return: 0 for values below 4 (Mon-Thu), 2 for values above 4 (weekend),
        1 otherwise (Friday).
    """
    return 0 if day_of_week < 4 else (2 if day_of_week > 4 else 1)
    
def quarter_code(date, month):
    """Map a day of the month to the "quarter" of the month (1-4) it falls in.

    Boundaries depend on the month length:
      * 31-day months: days 1-8 -> 1, 9-15 -> 2, 16-23 -> 3, 24-31 -> 4
      * 30-day months: days 1-8 -> 1, 9-15 -> 2, 16-22 -> 3, 23-30 -> 4
      * February:      days 1-7 -> 1, 8-14 -> 2, 15-21 -> 3, 22-29 -> 4
    A day outside the first three ranges (including out-of-range values) maps to 4.

    :param date: day of the month (1-based).
    :param month: month number; anything not a 30/31-day month is treated as February.
    :return: int in 1..4 (always an int, so an IntegerType UDF never yields null).
    """
    month_31_days = [1, 3, 5, 7, 8, 10, 12]
    month_30_days = [4, 6, 9, 11]
    # Exclusive upper bounds of quarters 1, 2 and 3.
    if month in month_31_days:
        bounds = (9, 16, 24)
    elif month in month_30_days:
        bounds = (9, 16, 23)
    else:  # February: four 7-day blocks, Feb 29 folds into the last quarter
        bounds = (8, 15, 22)
    # Builtin min/max are avoided on purpose: a later cell imports Spark's min/max
    # over them, and this function is shipped to Spark as a UDF.
    if date in range(1, bounds[0]):
        return 1
    if date in range(bounds[0], bounds[1]):
        return 2
    if date in range(bounds[1], bounds[2]):
        return 3
    return 4

def period_code(time):
    """Bucket an ``H:MM:SS`` / ``HH:MM:SS`` time string into a 6-hour period.

    :return: 0 for hours 0-5, 1 for 6-11, 2 for 12-17, 3 for every other hour.
    """
    # Stripping the trailing ":MM:SS" (6 characters) leaves only the hour digits.
    hour = int(time[:-6])
    if 0 <= hour < 18:
        return hour // 6
    return 3

# Row-at-a-time Python UDF wrappers around the helpers above.
date = udf(lambda yyyy, mm, dd: datetime.datetime(yyyy, mm, dd), DateType())
day_of_week = udf(lambda d: int(d.weekday()), IntegerType())
day_of_week_code_udf = udf(day_of_week_code, IntegerType())
quarter_code_udf = udf(quarter_code, IntegerType())
period_code_udf = udf(period_code, IntegerType())

# Columns are referenced by name so every step can see the columns added before it.
df = (df
      .withColumn("date", date("ptn_yyyy", "ptn_mm", "ptn_dd"))
      .withColumn("day_of_week", day_of_week("date"))
      .withColumn("day_of_week_code", day_of_week_code_udf("day_of_week"))
      .withColumn("quarter_code", quarter_code_udf("ptn_dd", "ptn_mm"))
      .withColumn("period_code", period_code_udf("txn_tm")))


# df = df.select("ar_id", "fm_to_ar_id", "txn_amt", "svc_br_no", "opm_tp_cd", "txn_cd", "day_of_week_code")
# df.show()

+-----+-----------+-------+---------+---------+------+--------+------+------+--------+----------+-----------+----------------+------------+-----------+
|ar_id|fm_to_ar_id|txn_amt|svc_br_no|opm_tp_cd|txn_cd|ptn_yyyy|ptn_mm|ptn_dd|  txn_tm|      date|day_of_week|day_of_week_code|quarter_code|period_code|
+-----+-----------+-------+---------+---------+------+--------+------+------+--------+----------+-----------+----------------+------------+-----------+
|    A|          B|     10|      900|       DR|     0|    2017|     1|     3| 5:00:00|2017-01-03|          1|               0|           1|          0|
|    B|          A|     10|      900|       CR|     0|    2017|     1|     3| 5:00:00|2017-01-03|          1|               0|           1|          0|
|    A|          B|    100|      900|       CR|     0|    2017|     1|     3|11:00:00|2017-01-03|          1|               0|           1|          1|
|    B|          A|    100|      900|       DR|     0|    2017|     1|     3|11:00:00|20

In [47]:
df.show(n=10)

+-----+-----------+-------+---------+---------+------+--------+------+------+--------+----------+-----------+----------------+------------+-----------+
|ar_id|fm_to_ar_id|txn_amt|svc_br_no|opm_tp_cd|txn_cd|ptn_yyyy|ptn_mm|ptn_dd|  txn_tm|      date|day_of_week|day_of_week_code|quarter_code|period_code|
+-----+-----------+-------+---------+---------+------+--------+------+------+--------+----------+-----------+----------------+------------+-----------+
|    A|          B|     10|      900|       DR|     0|    2017|     1|     3| 5:00:00|2017-01-03|          1|               0|           1|          0|
|    B|          A|     10|      900|       CR|     0|    2017|     1|     3| 5:00:00|2017-01-03|          1|               0|           1|          0|
|    A|          B|    100|      900|       CR|     0|    2017|     1|     3|11:00:00|2017-01-03|          1|               0|           1|          1|
|    B|          A|    100|      900|       DR|     0|    2017|     1|     3|11:00:00|20

In [48]:
# define all users

# Every account seen on either side of a transaction. union() matches columns by
# position, so the combined column keeps the name "ar_id".
user_data = df.select("ar_id").union(df.select("fm_to_ar_id")).distinct()
user_data.show()

+-----+
|ar_id|
+-----+
|    B|
|    A|
+-----+



In [49]:
# define feature extraction functions

def noDepositBranchVisit(is_unique):
    """Per-account count of branch visits in the global transaction frame ``df``.

    :param is_unique: if True, count distinct branches (``svc_br_no``) per ``ar_id``
        into ``noDepositBranchVisitUnique``; otherwise count every row with a branch
        number into ``noDepositBranchVisit``.
    :return: DataFrame with columns ``ar_id`` and the count column.
    """
    visits = df.select("ar_id", "svc_br_no")
    if is_unique:
        # Collapse to one row per (account, branch) pair before counting.
        return visits.distinct().groupby("ar_id").agg({"svc_br_no": "count"}) \
            .withColumnRenamed("count(svc_br_no)", "noDepositBranchVisitUnique")
    return visits.groupby("ar_id").agg({"svc_br_no": "count"}) \
        .withColumnRenamed("count(svc_br_no)", "noDepositBranchVisit")
    
def noDepositTransferIn(is_unique):
    """Per-account count of transfer-in rows (``opm_tp_cd = 'CR'`` and ``txn_cd = 0``).

    :param is_unique: if True, count distinct counterparties (``fm_to_ar_id``) per
        ``ar_id`` into ``noDepositTransferInUnique``; otherwise count every matching
        row into ``noDepositTransferIn``.
    :return: DataFrame with one row per ``ar_id``: ``ar_id`` plus the count column.
    """
    transfers_in = df.filter("opm_tp_cd = 'CR' and txn_cd = 0")
    if is_unique:
        # Reduce to (account, counterparty) pairs so each counterparty counts once;
        # grouping by both keys would instead return one row per pair.
        return transfers_in.select("ar_id", "fm_to_ar_id").distinct() \
            .groupby("ar_id").agg({"fm_to_ar_id": "count"}) \
            .withColumnRenamed("count(fm_to_ar_id)", "noDepositTransferInUnique")
    return transfers_in.groupby("ar_id").agg({"*": "count"}) \
        .withColumnRenamed("count(1)", "noDepositTransferIn")
    
def noDepositTransferOut(is_unique):
    """Per-account count of transfer-out rows (``opm_tp_cd = 'DR'`` and ``txn_cd = 0``).

    :param is_unique: if True, count distinct counterparties (``fm_to_ar_id``) per
        ``ar_id`` into ``noDepositTransferOutUnique``; otherwise count every matching
        row into ``noDepositTransferOut``.
    :return: DataFrame with one row per ``ar_id``: ``ar_id`` plus the count column.
    """
    transfers_out = df.filter("opm_tp_cd = 'DR' and txn_cd = 0")
    if is_unique:
        # Reduce to (account, counterparty) pairs so each counterparty counts once.
        return transfers_out.select("ar_id", "fm_to_ar_id").distinct() \
            .groupby("ar_id").agg({"fm_to_ar_id": "count"}) \
            .withColumnRenamed("count(fm_to_ar_id)", "noDepositTransferOutUnique")
    return transfers_out.groupby("ar_id").agg({"*": "count"}) \
        .withColumnRenamed("count(1)", "noDepositTransferOut")

def noDeposit():
    """Per-account number of deposits, split by quarter of the month.

    Deposits are rows with ``opm_tp_cd = 'CR'`` and ``txn_cd = 2`` (``deposit_code``).

    :return: DataFrame ``ar_id, noDepositQ1..noDepositQ4``; a quarter with no deposits is 0.
    """
    # Pivot over the fixed quarter codes so the four output columns always exist and
    # are in Q1..Q4 order; counting happens inside the pivot (a crosstab over an
    # already-aggregated frame would only report 0/1 presence).
    return df.filter("opm_tp_cd = 'CR' and txn_cd = 2") \
        .groupby("ar_id") \
        .pivot("quarter_code", [1, 2, 3, 4]) \
        .count() \
        .na.fill(0) \
        .toDF('ar_id', 'noDepositQ1', 'noDepositQ2', 'noDepositQ3', 'noDepositQ4')
def noWithdraw():
    """Per-account number of withdrawals, split by quarter of the month.

    Withdrawals are rows with ``opm_tp_cd = 'DR'`` and ``txn_cd = 1`` (``withdraw_code``).

    :return: DataFrame ``ar_id, noWithdrawQ1..noWithdrawQ4``; a quarter with none is 0.
    """
    # Fixed pivot values keep exactly four columns in Q1..Q4 order.
    return df.filter("opm_tp_cd = 'DR' and txn_cd = 1") \
        .groupby("ar_id") \
        .pivot("quarter_code", [1, 2, 3, 4]) \
        .count() \
        .na.fill(0) \
        .toDF('ar_id', 'noWithdrawQ1', 'noWithdrawQ2', 'noWithdrawQ3', 'noWithdrawQ4')
def depositAmount():
    """Total deposited amount per account.

    Deposits are rows with ``opm_tp_cd = 'CR'`` and ``txn_cd = 2`` (``deposit_code``);
    ``txn_cd = 0`` rows are transfers and are summed by ``transferInAmount`` instead.

    :return: DataFrame ``ar_id, depositAmount``.
    """
    return df.filter("opm_tp_cd = 'CR' and txn_cd = 2").groupby("ar_id").agg({"txn_amt" : "sum"}).withColumnRenamed("sum(txn_amt)", "depositAmount")

def withdrawAmount():
    """Total withdrawn amount per account.

    Withdrawals are rows with ``opm_tp_cd = 'DR'`` and ``txn_cd = 1`` (``withdraw_code``);
    ``txn_cd = 0`` rows are transfers and are summed by ``transferOutAmount`` instead.

    :return: DataFrame ``ar_id, withdrawAmount``.
    """
    return df.filter("opm_tp_cd = 'DR' and txn_cd = 1").groupby("ar_id").agg({"txn_amt" : "sum"}).withColumnRenamed("sum(txn_amt)", "withdrawAmount")

def transferInAmount():
    """Sum of ``txn_amt`` per account over rows with ``opm_tp_cd = 'CR'`` and ``txn_cd = 0``."""
    transfers = df.filter("opm_tp_cd = 'CR' and txn_cd = 0")
    totals = transfers.groupby("ar_id").agg({"txn_amt": "sum"})
    return totals.withColumnRenamed("sum(txn_amt)", "transferInAmount")
    
def transferOutAmount():
    """Sum of ``txn_amt`` per account over rows with ``opm_tp_cd = 'DR'`` and ``txn_cd = 0``."""
    transfers = df.filter("opm_tp_cd = 'DR' and txn_cd = 0")
    totals = transfers.groupby("ar_id").agg({"txn_amt": "sum"})
    return totals.withColumnRenamed("sum(txn_amt)", "transferOutAmount")

In [50]:
# define udf functions for all about ratio
from pyspark.sql.types import DoubleType


def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator`` as a float; None when either side is null or the denominator is 0."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    # float() keeps true division even under Python 2 integer semantics.
    return float(numerator) / denominator


# An explicit DoubleType is required: a udf() without a returnType defaults to
# StringType, which turned every ratio into text.
ratioDepositBranchVisit = udf(_safe_ratio, DoubleType())
ratioTransferIn = udf(_safe_ratio, DoubleType())
ratioTransferOut = udf(_safe_ratio, DoubleType())

In [65]:
# Quarter code example

def get_df_with_quarter_code(data, col_name, agg_func, agg_col):
    """Aggregate ``data`` per ``ar_id``, both overall and per quarter of the month.

    :param data: DataFrame with ``ar_id`` and ``quarter_code`` (1-4) columns.
    :param col_name: base output name, e.g. ``"depositAmount"``.
    :param agg_func: ``"sum"`` sums ``agg_col``; any other value counts rows.
    :param agg_col: column to sum or count; ``"*"`` counts all rows.
    :return: DataFrame ``ar_id, all<ColName>, <colName>Q1..<colName>Q4``
        (inner-joined on ``ar_id``; a quarter with no rows for an account is null).
    """
    from pyspark.sql import functions as F  # aliased: avoids clashing with bare min/max/sum names
    total_name = "all" + col_name[:1].upper() + col_name[1:]
    # Explicit pivot values keep exactly four columns in Q1..Q4 order even when some
    # quarter has no rows at all (and spare Spark a distinct pass to discover them).
    by_quarter = data.groupby("ar_id").pivot("quarter_code", [1, 2, 3, 4])
    if agg_func == "sum":
        totals = data.groupby("ar_id").agg(F.sum(agg_col).alias(total_name))
        per_quarter = by_quarter.sum(agg_col)
    else:
        # Alias directly: Spark only auto-names the count "count(1)" when agg_col is "*".
        totals = data.groupby("ar_id").agg(F.count(agg_col).alias(total_name))
        per_quarter = by_quarter.count()
    per_quarter = per_quarter.toDF('ar_id', col_name + 'Q1', col_name + 'Q2',
                                   col_name + 'Q3', col_name + 'Q4')
    return totals.join(per_quarter, "ar_id")

# Deposits (opm_tp_cd = 'CR', txn_cd = 2) per account, overall and per quarter of the month.
data = df.filter("opm_tp_cd = 'CR' and txn_cd = 2")

# Via the helper ...
get_df_with_quarter_code(data, "depositAmount", "sum", "txn_amt").show()

# ... and spelled out by hand; the two tables should match.
(data.groupby("ar_id")
     .agg({"txn_amt": "sum"})
     .withColumnRenamed("sum(txn_amt)", "allDepositAmount")
     .join(data.groupby("ar_id")
               .pivot("quarter_code")
               .sum("txn_amt")
               .toDF('ar_id', 'depositAmountQ1', 'depositAmountQ2', 'depositAmountQ3', 'depositAmountQ4'),
           "ar_id")
     .show())

+-----+----------------+---------------+---------------+---------------+---------------+
|ar_id|allDepositAmount|depositAmountQ1|depositAmountQ2|depositAmountQ3|depositAmountQ4|
+-----+----------------+---------------+---------------+---------------+---------------+
|    A|            2190|           1760|            210|            200|             20|
+-----+----------------+---------------+---------------+---------------+---------------+

+-----+----------------+---------------+---------------+---------------+---------------+
|ar_id|allDepositAmount|depositAmountQ1|depositAmountQ2|depositAmountQ3|depositAmountQ4|
+-----+----------------+---------------+---------------+---------------+---------------+
|    A|            2190|           1760|            210|            200|             20|
+-----+----------------+---------------+---------------+---------------+---------------+



In [72]:
# Period code example

def get_df_with_period_code(data, col_name, agg_func, agg_col):
    """Aggregate ``data`` per ``ar_id``, both overall and per 6-hour period of the day.

    :param data: DataFrame with ``ar_id`` and ``period_code`` (0-3) columns.
    :param col_name: base output name, e.g. ``"noDeposit"``.
    :param agg_func: ``"sum"`` sums ``agg_col``; any other value counts rows.
    :param agg_col: column to sum or count; ``"*"`` counts all rows.
    :return: DataFrame ``ar_id, all<ColName>, <colName>P1..<colName>P4`` where P1..P4
        correspond to period codes 0..3 (inner-joined on ``ar_id``; a period with no
        rows for an account is null).
    """
    from pyspark.sql import functions as F  # aliased: avoids clashing with bare min/max/sum names
    total_name = "all" + col_name[:1].upper() + col_name[1:]
    # Explicit pivot values keep exactly four columns in P1..P4 order even when some
    # period has no rows at all.
    by_period = data.groupby("ar_id").pivot("period_code", [0, 1, 2, 3])
    if agg_func == "sum":
        totals = data.groupby("ar_id").agg(F.sum(agg_col).alias(total_name))
        per_period = by_period.sum(agg_col)
    else:
        # Alias directly: Spark only auto-names the count "count(1)" when agg_col is "*".
        totals = data.groupby("ar_id").agg(F.count(agg_col).alias(total_name))
        per_period = by_period.count()
    per_period = per_period.toDF('ar_id', col_name + 'P1', col_name + 'P2',
                                 col_name + 'P3', col_name + 'P4')
    return totals.join(per_period, "ar_id")

# Number of deposits (opm_tp_cd = 'CR', txn_cd = 2) per account, overall and per period of the day.
data = df.filter("opm_tp_cd = 'CR' and txn_cd = 2")

# Via the helper ...
get_df_with_period_code(data, "noDeposit", "count", "*").show()

# ... and spelled out by hand; the two tables should match.
(data.groupby("ar_id")
     .agg({"*": "count"})
     .withColumnRenamed("count(1)", "allNoDeposit")
     .join(data.groupby("ar_id")
               .pivot("period_code")
               .count()
               .toDF('ar_id', 'noDepositP1', 'noDepositP2', 'noDepositP3', 'noDepositP4'),
           "ar_id")
     .show())

+-----+------------+-----------+-----------+-----------+-----------+
|ar_id|allNoDeposit|noDepositP1|noDepositP2|noDepositP3|noDepositP4|
+-----+------------+-----------+-----------+-----------+-----------+
|    A|          39|          9|         10|         10|         10|
+-----+------------+-----------+-----------+-----------+-----------+

+-----+------------+-----------+-----------+-----------+-----------+
|ar_id|allNoDeposit|noDepositP1|noDepositP2|noDepositP3|noDepositP4|
+-----+------------+-----------+-----------+-----------+-----------+
|    A|          39|          9|         10|         10|         10|
+-----+------------+-----------+-----------+-----------+-----------+



In [None]:
# Monthly example
## Find oldest & youngest transaction month per user
from pyspark.sql.functions import from_unixtime, unix_timestamp
# Module alias instead of importing min/max bare, which would shadow Python's builtins
# for every later cell (and for UDFs that call them).
from pyspark.sql import functions as F

# "yyyy-MM-dd HH:mm:ss" -> datetime for the 1st of that month (day and time are dropped).
str_to_date = udf(lambda date_str : datetime.datetime.strptime(date_str[:-12], "%Y-%m"), DateType())
# Inclusive number of calendar months between two dates (same month -> 1).
# Named distinctly so the function below does not shadow it.
number_of_month_udf = udf(lambda oldest, youngest : (youngest.year - oldest.year) * 12 + (youngest.month - oldest.month) + 1, IntegerType())


def get_user_number_of_month(user_df):
    """Per-account span of activity, in calendar months.

    :param user_df: transaction DataFrame with ``ar_id`` and ``date`` columns
        (e.g. ``df`` after the feature-engineering cell).
    :return: DataFrame with ``ar_id``, ``min_date``/``max_date`` (strings from
        ``from_unixtime``), ``oldest_month``/``youngest_month`` (dates) and
        ``user_number_of_month`` (int, inclusive month count).
    """
    span_df = (user_df
               .select("ar_id", "date")
               .withColumn("unix_date", unix_timestamp("date"))
               .groupby("ar_id")
               .agg(from_unixtime(F.min("unix_date")).alias("min_date"),
                    from_unixtime(F.max("unix_date")).alias("max_date")))
    span_df = span_df.withColumn("oldest_month", str_to_date(span_df["min_date"]))
    span_df = span_df.withColumn("youngest_month", str_to_date(span_df["max_date"]))
    return span_df.withColumn(
        "user_number_of_month",
        number_of_month_udf(span_df["oldest_month"], span_df["youngest_month"]))

In [53]:
# FIXME: `date_df` is not defined in any visible cell (hidden kernel state); presumably
# it held the per-user min/max-date frame from the monthly example — TODO confirm.
# Inspects the Python type Spark returns for `min_date` (the saved output shows str).
type(date_df.select("min_date").collect()[0].min_date)

str

In [17]:
# Join section

# Every known account with its deposit total; accounts without deposits get null.
# NOTE(review): the saved output below comes from a different dataset (ids like 11a/12b) — re-run.
user_data.join(depositAmount(), on="ar_id", how="left_outer").show()

+-----+-------------+
|ar_id|depositAmount|
+-----+-------------+
|  13c|           10|
|  11a|          100|
|  12b|       205000|
+-----+-------------+

+-----+-------------+
|ar_id|depositAmount|
+-----+-------------+
|  13c|           10|
|  11a|          100|
|  12b|       205000|
|  16d|         null|
|  15c|         null|
|  14d|         null|
|  13c|           10|
|  11b|         null|
+-----+-------------+

