# Data Prep - Credit Card Transactions
This notebook documents the data processing steps I performed in `PySpark` to transform the [Synthetic Credit Card Transactions](https://data.world/ealtman/synthetic-credit-card-transactions) dataset into a format better suited for modeling and data analysis.

In [1]:
# installing dependencies:
!pip install -r ../configs/dependencies/dataprep_requirements.txt >> ../configs/dependencies/package_installation.txt

In [2]:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

In [3]:
###### Loading the necessary libraries #########

# PySpark dependencies:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
import pyspark.sql.types as T
from pyspark.sql.window import Window

# pandas
import pandas as pd
from tqdm import tqdm

# other relevant libraries:
import warnings
from datetime import datetime, timedelta
import re
import itertools
import os
import shutil
import json
from glob import glob

# trust me, this will make a lot of sense very soon
import emoji

# setting global parameters for visualizations:
warnings.filterwarnings("ignore")
pd.set_option("display.precision", 4)
pd.set_option("display.float_format", lambda x: "%.2f" % x)

# 0. Configuring Spark

In [4]:
# loading the configurations needed for Spark
def init_spark(app_name):

    spark = (
        SparkSession.builder.appName(app_name)
        .config("spark.files.overwrite", "true")
        .config("spark.sql.repl.eagerEval.enabled", True)
        .config("spark.sql.repl.eagerEval.maxNumRows", 5)
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .enableHiveSupport()
        .getOrCreate()
    )

    return spark


# init the spark session:
spark = init_spark("Credit Card Transactions")

In [5]:
# verifying the session
spark

# 1. Helper Functions

In [6]:
def save_to_filesystem(df, target_path, parquet_path, filename):
    """Helper function to save a PySpark dataframe as a single parquet file, similar to writing a local file

    Args:
        df (pyspark.sql.dataframe.DataFrame): dataframe to be saved
        target_path (str): directory that will store the file
        parquet_path (str): name of the temporary directory Spark writes its part files to
        filename (str): name of the resulting file

    Returns:
        bool: True once the file has been saved
    """
    PARQUET_FILE = f"{target_path}/{parquet_path}"
    OUTPUT_FILE = f"{target_path}/{filename}"

    # if the temporary directory already exists, remove it
    # (shutil.rmtree raises an error when the path does not exist, hence the check)
    if os.path.exists(PARQUET_FILE):
        shutil.rmtree(PARQUET_FILE)

    # saves the dataframe as a single parquet part file (coalesce(1) -> one partition):
    df.coalesce(1).write.parquet(PARQUET_FILE)

    # retrieves file resulting from the saving procedure:
    original_file = glob(f"{PARQUET_FILE}/*.parquet")[0]

    # renames the resulting file and saves it to the target directory:
    os.rename(original_file, OUTPUT_FILE)

    shutil.rmtree(PARQUET_FILE)

    return True


def apply_category_map(category_map):
    """Helper function to build a UDF that converts strings given a map

    Note:
        This function follows a function-factory (closure) pattern: it returns a
        PySpark UDF that captures `category_map`, so it is applied as
        `apply_category_map(my_map)(F.col("my_column"))`

    Args:
        category_map (dict): the hash table or dictionary for converting the values

    Returns:
        pyspark UDF: a string-typed UDF that maps each value to its new category,
            or to None when the value is not a key of `category_map`

    """

    def func(row):
        # values missing from the map (including nulls) become None
        return category_map.get(row)

    return F.udf(func, T.StringType())


def get_datetime_features(df, time_col):
    """Function to extract time-based features from pyspark dataframes

    Args:
        df (pyspark.sql.dataframe.DataFrame): the original dataframe that needs to be enriched
        time_col (str): the string name of the column containing the date object

    Returns:
        df (pyspark.sql.dataframe.DataFrame): resulting pyspark dataframe with the added features
            -> day_of_week (Spark convention: 1 = Sunday, ..., 7 = Saturday), day_of_month,
               day_of_year, week_of_year, month, quarter, year

    """

    # applying date-related functions:

    # day-level attributes:
    df = df.withColumn("day_of_week", F.dayofweek(F.col(time_col)))

    df = df.withColumn("day_of_month", F.dayofmonth(F.col(time_col)))

    df = df.withColumn("day_of_year", F.dayofyear(F.col(time_col)))

    # week-level attributes:
    df = df.withColumn("week_of_year", F.weekofyear(F.col(time_col)))

    # month-level attributes:
    df = df.withColumn("month", F.month(F.col(time_col)))

    df = df.withColumn("quarter", F.quarter(F.col(time_col)))

    # year-level attributes:
    df = df.withColumn("year", F.year(F.col(time_col)))

    return df


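@udf(T.StringType())
def remove_duplicate_elements(sequence):
    """Helper UDF to remove consecutive duplicate elements in MCC emoji sequences"""
    if sequence is None:
        return None
    # treat a character plus an optional variation selector (U+FE0F) as one emoji
    emojis = re.findall("[^\ufe0f]\ufe0f?", sequence)
    # groupby collapses each run of identical consecutive emojis into one
    return "".join(key for key, _ in itertools.groupby(emojis))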
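Two of the features built by `get_datetime_features` follow Spark's conventions, which differ from Python's `datetime` defaults:

- `dayofweek` returns 1 for Sunday through 7 for Saturday.
- `weekofyear` uses ISO-8601 week numbers.

The sketch below is a hypothetical plain-Python helper, not part of the pipeline. It reproduces the same features for a single date, which can help when sanity-checking the PySpark output.

```python
from datetime import date


def spark_style_date_features(d: date) -> dict:
    """Mirror get_datetime_features for one date, following Spark's conventions."""
    return {
        # Spark: 1 = Sunday ... 7 = Saturday; isoweekday: 1 = Monday ... 7 = Sunday
        "day_of_week": d.isoweekday() % 7 + 1,
        "day_of_month": d.day,
        "day_of_year": d.timetuple().tm_yday,
        # Spark's weekofyear follows ISO-8601, like isocalendar()
        "week_of_year": d.isocalendar()[1],
        "month": d.month,
        "quarter": (d.month - 1) // 3 + 1,
        "year": d.year,
    }


print(spark_style_date_features(date(2002, 9, 7)))
```

For `2002-09-07` (a Saturday) this yields `day_of_week=7`, `day_of_year=250`, `week_of_year=36` and `quarter=3`. These match the values Spark produces for that date in the timeline output of section 5.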

# 2. Loading and Inspecting the Data

In [7]:
# loading the raw dataset:
RAW_DATA_DIR = "../data/raw/"

# reading the raw transactions data:
df_transactions = spark.read.csv(RAW_DATA_DIR + "card_transactions.csv", header=True)

In [8]:
# counting the number of records:
df_transactions.count()  # that's a lot of records!

24386900

In [9]:
# verifying a sample of the dataset:
df_transactions

User,Card,Year,Month,Day,Time,Amount,Use Chip,Merchant Name,Merchant City,Merchant State,Zip,MCC,Errors?,Is Fraud?
0,0,2002,9,1,06:21,$134.09,Swipe Transaction,3527213246127876953,La Verne,CA,91750.0,5300,,No
0,0,2002,9,1,06:42,$38.48,Swipe Transaction,-727612092139916043,Monterey Park,CA,91754.0,5411,,No
0,0,2002,9,2,06:22,$120.34,Swipe Transaction,-727612092139916043,Monterey Park,CA,91754.0,5411,,No
0,0,2002,9,2,17:45,$128.95,Swipe Transaction,3414527459579106770,Monterey Park,CA,91754.0,5651,,No
0,0,2002,9,3,06:23,$104.71,Swipe Transaction,5817218446178736267,La Verne,CA,91750.0,5912,,No


In [10]:
# looking at the schema of the dataset:
df_transactions.printSchema()

root
 |-- User: string (nullable = true)
 |-- Card: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Use Chip: string (nullable = true)
 |-- Merchant Name: string (nullable = true)
 |-- Merchant City: string (nullable = true)
 |-- Merchant State: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- MCC: string (nullable = true)
 |-- Errors?: string (nullable = true)
 |-- Is Fraud?: string (nullable = true)



From the dataset, we can point out a few necessary data preparation steps:
1. Every column is read as a `string`, which does not match the nature of the data. We need to convert the columns into appropriate types.
2. The date-related attributes are split into parts (`Year`, `Month`, ...). It is better to store them in a single timestamp column, which makes sorting the dataframe easier.
3. The `Amount` column is stored as a formatted currency string that includes the dollar sign (e.g. `$134.09`). It needs to be cleaned so that it can be represented as a `double`.
4. We can also normalize the column names for easier manipulation.

In [11]:
# normalizing the column names:
for col in df_transactions.columns:
    df_transactions = df_transactions.withColumnRenamed(
        col, col.lower().strip().replace(" ", "_").replace("?", "")
    )

In [12]:
# columns normalized become:
for col in df_transactions.columns:
    print(col)

user
card
year
month
day
time
amount
use_chip
merchant_name
merchant_city
merchant_state
zip
mcc
errors
is_fraud


# 3. Converting Data Types

## 3.1 Amount

In [13]:
# removing the leading dollar sign and then casting to a double fixes this column
df_transactions = df_transactions.withColumn(
    "amount", F.expr("substring(amount, 2, length(amount))").cast("double")
)
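Spark SQL's `substring` is 1-based, so `substring(amount, 2, length(amount))` keeps everything after the leading `$`. The sketch below is a plain-Python equivalent of that per-value cleanup, written as a hypothetical helper for illustration. Like Spark's `cast` with ANSI mode off, it returns `None` instead of raising on unparseable input.

```python
from typing import Optional


def parse_amount(raw: Optional[str]) -> Optional[float]:
    """Drop the leading currency symbol (e.g. "$134.09") and convert to float."""
    if raw is None:
        return None
    try:
        # raw[1:] mirrors substring(amount, 2, length(amount))
        return float(raw[1:])
    except ValueError:
        # a non-ANSI Spark cast yields null instead of raising
        return None


print(parse_amount("$134.09"), parse_amount("$abc"))  # 134.09 None
```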

## 3.2 Year, month, day, time
To improve the date-based columns, we need to:

1. Left-pad the month and day values with zeros to length 2 when needed;
2. Append the missing seconds (`:00`) to the `time` values;
3. Concatenate the parts into a single string;
4. Convert the value to a `timestamp` object.

In [14]:
# padding the month and day columns:
df_transactions = df_transactions.withColumn("month", F.lpad(F.col("month"), 2, "0"))

df_transactions = df_transactions.withColumn("day", F.lpad(F.col("day"), 2, "0"))

In [15]:
# right-padding the time column with seconds, e.g. "06:21" -> "06:21:00"
df_transactions = df_transactions.withColumn("time", F.rpad(F.col("time"), 8, ":00"))

In [16]:
# concatenating the columns:
df_transactions = df_transactions.withColumn(
    "timestamp",
    F.concat(
        F.col("year"),
        F.lit("-"),
        F.col("month"),
        F.lit("-"),
        F.col("day"),
        F.lit(" "),
        F.col("time"),
    ),
)

df_transactions = df_transactions.withColumn(
    "timestamp", F.to_timestamp(F.col("timestamp"))
)
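For a single record, the padding and concatenation steps amount to the plain-Python sketch below. `build_timestamp` is a hypothetical helper, and `datetime.strptime` stands in for `F.to_timestamp`.

```python
from datetime import datetime


def build_timestamp(year: str, month: str, day: str, time: str) -> datetime:
    """Rebuild a timestamp from the raw string parts, mirroring the PySpark steps."""
    month = month.rjust(2, "0")  # like F.lpad(month, 2, "0")
    day = day.rjust(2, "0")  # like F.lpad(day, 2, "0")
    time = time + ":00"  # like F.rpad(time, 8, ":00") for "HH:MM" values
    text = f"{year}-{month}-{day} {time}"
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


print(build_timestamp("2002", "9", "1", "06:21"))  # 2002-09-01 06:21:00
```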

## 3.3 Fraud Indicator
We will convert this column to a boolean as it is essentially a binary indicator for fraud. 

In [17]:
df_transactions = df_transactions.withColumn(
    "is_fraud", F.when(F.col("is_fraud") == "Yes", True).otherwise(False)
)

## 3.4 Zip codes
The raw zip codes are stored as floating-point strings (e.g. `91750.0`). We will fix them by casting them to integers, which drops the decimal part, and then left-padding them with zeros when needed.

In [18]:
# converting zip codes to integers:
df_transactions = df_transactions.withColumn(
    "zip", F.col("zip").cast("integer")
).withColumn("zip", F.col("zip").cast("string"))

In [19]:
# left-padding the zipcodes
df_transactions = df_transactions.withColumn("zip", F.lpad(F.col("zip"), 5, "0"))
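The two cells above turn values such as `91750.0` into `91750` and restore any leading zeros lost when the column was stored as a float. The sketch below shows the same per-value logic in plain Python. `normalize_zip` is a hypothetical helper, and the `2108.0` input is illustrative rather than taken from the dataset.

```python
from typing import Optional


def normalize_zip(raw: Optional[str]) -> Optional[str]:
    """Turn a float-formatted zip such as "91750.0" into a 5-character string."""
    if raw is None or raw == "":
        return None  # missing zip codes stay missing
    # int(float(...)) drops the ".0" suffix, like cast("integer")
    digits = str(int(float(raw)))
    # zfill(5) left-pads with zeros, like F.lpad(zip, 5, "0")
    return digits.zfill(5)


print(normalize_zip("91750.0"), normalize_zip("2108.0"))  # 91750 02108
```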

# 4. Preparing the Preprocessed Output
For further use of the dataset, I will drop the columns I won't be needing for the remainder of the project.

In [20]:
df_transactions = df_transactions.select(
    F.col("user").alias("user_id"),
    F.col("card").alias("card_id"),
    F.col("timestamp").alias("transaction_timestamp"),
    F.col("amount").alias("transaction_amount"),
    F.col("use_chip").alias("transaction_type"),
    F.col("mcc").alias("merchant_category_code"),
    F.col("merchant_name"),
    F.col("merchant_city"),
    F.col("merchant_state"),
    F.col("zip").alias("zip_code"),
    F.col("is_fraud"),
)

In [21]:
# reading the MCC lookup table:
with open(RAW_DATA_DIR + "mcc_lookup.json", "r") as file:
    mcc_list = json.load(file)

lookup = {}
for item in mcc_list:
    mcc = item["mcc"]
    desc = item["description"]

    lookup[mcc] = desc

In [22]:
# mapping each MCC to its description using the lookup table built above
df_transactions = df_transactions.withColumn(
    "merchant_category_description",
    apply_category_map(lookup)(F.col("merchant_category_code")),
)
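The loop above assumes `mcc_lookup.json` holds a list of objects with `mcc` and `description` keys. Because `merchant_category_code` is still a string column, the dictionary keys must also be strings. Otherwise every lookup would silently fall back to `None`.

The sketch below uses hypothetical entries; the descriptions are illustrative. It builds the same dictionary with a comprehension that normalizes the keys, and shows the missing-key behaviour of `apply_category_map`.

```python
import json

# hypothetical excerpt shaped like mcc_lookup.json; one code is stored as a JSON number
raw = '[{"mcc": "5411", "description": "Groceries"}, {"mcc": 5942, "description": "Bookshops"}]'
example_list = json.loads(raw)

# str() guards against codes stored as numbers
example_lookup = {str(item["mcc"]): item["description"] for item in example_list}

# dict.get mirrors the UDF: unknown codes map to None
for code in ["5411", "5942", "9999"]:
    print(code, "->", example_lookup.get(code))
```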

In [8]:
# saving the intermediary preprocessed output:
PROCESSED_DATA_DIR = "../data/processed/"

save_to_filesystem(
    df_transactions, PROCESSED_DATA_DIR, "tb_transactions", "tb_transactions.parquet"
)

# 5. Converting the Dataset into Timeline Format
For this project, I will study the sequence of transactions a user makes with their credit card, so the dataset needs to be arranged sequentially. This can be accomplished by ordering each user's transactions by `transaction_timestamp`, using the `day` as the smallest unit of a sequence.

In [23]:
# defining a few attributes about the time of the day
df_timeline = df_transactions.withColumn(
    "time_of_day",
    F.when((F.hour(F.col("transaction_timestamp")) < 6), "Early Morning")
    .when(
        (F.hour(F.col("transaction_timestamp")) >= 6)
        & (F.hour(F.col("transaction_timestamp")) < 13),
        "Morning",
    )
    .when(
        (F.hour(F.col("transaction_timestamp")) >= 13)
        & (F.hour(F.col("transaction_timestamp")) < 18),
        "Afternoon",
    )
    .otherwise("Night"),
)
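The buckets above split the hour of the day (0 to 23) into four ranges:

- `[0, 6)`: Early Morning
- `[6, 13)`: Morning
- `[13, 18)`: Afternoon
- `[18, 24)`: Night

The hypothetical plain-Python helper below applies the same rule and is useful for checking the boundary hours.

```python
def time_of_day(hour: int) -> str:
    """Bucket an hour (0 to 23) the same way as the F.when chain above."""
    if hour < 6:
        return "Early Morning"
    if hour < 13:
        return "Morning"
    if hour < 18:
        return "Afternoon"
    return "Night"


print([time_of_day(h) for h in (5, 6, 12, 13, 17, 18, 23)])
# ['Early Morning', 'Morning', 'Morning', 'Afternoon', 'Afternoon', 'Night', 'Night']
```

Because `F.when` branches are evaluated in order, the lower-bound checks (`>= 6`, `>= 13`) in the PySpark version are redundant. This sketch relies on the same ordering.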

In [24]:
# adding a date helper column and defining the per-user, per-day window:
df_timeline = df_timeline.withColumn(
    "transaction_date", F.to_date(F.col("transaction_timestamp"))
)


timeline = Window.partitionBy("user_id", "transaction_date").orderBy(
    "transaction_timestamp"
)

In [25]:
# generating the sequence of events:
df_timeline = df_timeline.withColumn("transaction_order", F.row_number().over(timeline))

# retrieving the previous transaction's timestamp (same user and day) as a helper column
df_timeline = df_timeline.withColumn(
    "timestamp_last_transaction",
    F.lag(F.col("transaction_timestamp"), 1).over(timeline),
)

# flagging the first transaction of each user's day
df_timeline = df_timeline.withColumn(
    "is_first_transaction_of_day", (F.col("timestamp_last_transaction").isNull())
)

# generating the time between events:
df_timeline = df_timeline.withColumn(
    "time_between_purchases_in_seconds",
    F.when(F.col("timestamp_last_transaction").isNull(), None).otherwise(
        F.floor(
            (
                F.col("transaction_timestamp").cast("long")
                - F.col("timestamp_last_transaction").cast("long")
            )
        )
    ),
)
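Over the `(user_id, transaction_date)` window, `row_number` numbers each user's transactions within a day and `lag` fetches the previous transaction's timestamp. The first transaction of each day therefore has no predecessor.

The sketch below applies that logic in plain Python to a hypothetical, unsorted list of `(user_id, timestamp)` records. The 2002-09-07 timestamps are the ones shown for user `0` in the output of this section.

```python
from datetime import datetime
from itertools import groupby

records = [
    ("0", datetime(2002, 9, 7, 9, 39)),
    ("0", datetime(2002, 9, 7, 6, 16)),
    ("0", datetime(2002, 9, 7, 6, 34)),
    ("0", datetime(2002, 11, 3, 5, 36)),
]


def partition_key(row):
    """Equivalent of Window.partitionBy("user_id", "transaction_date")."""
    user_id, ts = row
    return (user_id, ts.date())


def add_daily_order(rows):
    """Return (user_id, timestamp, order, seconds_since_previous) per transaction."""
    result = []
    # sort by partition key, then by timestamp (the window's orderBy)
    ordered = sorted(rows, key=lambda r: (partition_key(r), r[1]))
    for _, group in groupby(ordered, key=partition_key):
        previous = None  # lag(...) is null for the first row of each partition
        for order, (user_id, ts) in enumerate(group, start=1):
            gap = None if previous is None else int((ts - previous).total_seconds())
            result.append((user_id, ts, order, gap))
            previous = ts
    return result


for row in add_daily_order(records):
    print(row)  # gaps in order: None, 1080, 11100, None
```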

In [26]:
# adding time-based features about the day:
df_timeline = get_datetime_features(df_timeline, "transaction_date")

In [27]:
# dropping helper columns:
df_timeline = df_timeline.drop(
    "timestamp_last_transaction",
)

In [28]:
# verifying the results:
df_timeline

user_id,card_id,transaction_timestamp,transaction_amount,transaction_type,merchant_category_code,merchant_name,merchant_city,merchant_state,zip_code,is_fraud,merchant_category_description,time_of_day,transaction_date,transaction_order,is_first_transaction_of_day,time_between_purchases_in_seconds,day_of_week,day_of_month,day_of_year,week_of_year,month,quarter,year
0,0,2002-09-07 06:16:00,117.05,Swipe Transaction,5411,-727612092139916043,Monterey Park,CA,91754,False,Groceries and sup...,Morning,2002-09-07,1,True,,7,7,250,36,9,3,2002
0,0,2002-09-07 06:34:00,45.3,Swipe Transaction,5942,-5475680618560174533,Monterey Park,CA,91755,False,Bookshops,Morning,2002-09-07,2,False,1080.0,7,7,250,36,9,3,2002
0,0,2002-09-07 09:39:00,29.34,Swipe Transaction,7538,4055257078481058705,La Verne,CA,91750,False,Automotive servic...,Morning,2002-09-07,3,False,11100.0,7,7,250,36,9,3,2002
0,0,2002-11-03 05:36:00,150.08,Swipe Transaction,5300,3527213246127876953,La Verne,CA,91750,False,Wholesale clubs,Early Morning,2002-11-03,1,True,,1,3,307,44,11,4,2002
0,0,2002-11-03 06:05:00,180.58,Swipe Transaction,5912,5817218446178736267,La Verne,CA,91750,False,Drug stores and p...,Morning,2002-11-03,2,False,1740.0,1,3,307,44,11,4,2002


In [35]:
PROCESSED_DATA_DIR = "../data/processed/"

save_to_filesystem(
    df_timeline, PROCESSED_DATA_DIR, "tb_timeline", "tb_timeline.parquet"
)

True

# 6. Representing Sequences of Transactions
There are many ways of representing the sequences of transactions a customer makes. We can consider them by day, for example, or at the user level (that is, the all-time sequence of a single customer).

I will generate datasets for each of these representations, as well as one that combines both: I will first gather the sequences by day, and then group those daily sequences by user. These three structures might help us uncover different kinds of patterns in further analysis.

In [9]:
# reading the timeline from cache:
PROCESSED_DATA_DIR = "../data/processed/"

df_timeline = spark.read.parquet(PROCESSED_DATA_DIR + "tb_timeline.parquet")

In [10]:
# generating the representation of the order at the user-date level
df_seq = df_timeline.groupby("user_id", "transaction_date").agg(
    F.collect_list("merchant_category_code")
)

In [11]:
# the sequence can be verified as follows:
df_seq

user_id,transaction_date,collect_list(merchant_category_code)
0,2002-09-07,"[5411, 5942, 7538]"
0,2002-11-03,"[5300, 5912]"
0,2004-11-10,[5300]
0,2005-06-17,"[5912, 5411, 5651..."
0,2005-08-20,"[4829, 5912, 5815..."
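One caveat: Spark documents `collect_list` as non-deterministic, because the order of the collected values depends on row order, which may change after a shuffle. The sequences above look ordered, but a safer pattern is to collect `(timestamp, code)` pairs and sort them before keeping only the codes. In PySpark, that could be done with `F.sort_array(F.collect_list(F.struct("transaction_timestamp", "merchant_category_code")))`.

The plain-Python sketch below illustrates the idea on hypothetical, deliberately shuffled rows.

```python
from collections import defaultdict
from datetime import datetime

# hypothetical rows in arbitrary order, as they might arrive after a shuffle
rows = [
    ("0", "2002-09-07", datetime(2002, 9, 7, 9, 39), "7538"),
    ("0", "2002-09-07", datetime(2002, 9, 7, 6, 16), "5411"),
    ("0", "2002-09-07", datetime(2002, 9, 7, 6, 34), "5942"),
]

# collect (timestamp, code) pairs per (user, date), like collect_list(struct(...))
pairs = defaultdict(list)
for user_id, day, ts, mcc in rows:
    pairs[(user_id, day)].append((ts, mcc))

# sorting tuples compares the timestamp first, like sort_array on structs
sequences = {key: [mcc for _, mcc in sorted(values)] for key, values in pairs.items()}
print(sequences)  # {('0', '2002-09-07'): ['5411', '5942', '7538']}
```

The same consideration applies to the `collect_list` calls used to build the sequence datasets in section 7.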


The MCCs themselves, however, are not easily interpretable. They require an understanding of the **ISO 18245** specification, which is not always easy to interpret on its own. The codes effectively represent *"ideas"* or *"situations"* regarding the purchase the client made. Each one encapsulates both a kind of merchant and what the merchant is allowed to sell (ISO 18245 is used, for example, to regulate the fees merchants pay to credit card processors as part of a transaction).

What we can do here is convert these series of *situations* into something more meaningful and easier to interpret, both by humans and by any algorithms we use to extract information from them. This is quite similar to the way ideograms work, and an easily interpretable way to convert text or code into an ideogram is to use **emojis**.

Emojis are **Unicode-compliant** characters that can also be interpreted visually. They can serve as a surrogate representation of the merchant category codes and allow us to apply *standard text processing techniques* to the sequences.
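To make the idea concrete, the sketch below maps a day's MCCs to a string of emojis. It then counts emoji bigrams with `collections.Counter`, one example of a standard text-processing technique. The three-entry mapping is a hypothetical excerpt that matches codes shown in this notebook's outputs, not the contents of `mcc_encoding.json`.

```python
from collections import Counter

# hypothetical excerpt of an MCC -> emoji mapping (single-code-point emojis)
example_mcc_emoji = {"5411": "🛒", "5942": "📚", "7538": "🚗"}


def encode(mccs):
    """Concatenate the emoji for each MCC into one 'sentence'."""
    return "".join(example_mcc_emoji[code] for code in mccs)


day = encode(["5411", "5942", "7538", "5411", "5942"])
tokens = list(day)  # one token per emoji, valid because each is a single code point
bigrams = Counter(zip(tokens, tokens[1:]))
print(bigrams.most_common(1))  # [(('🛒', '📚'), 2)]
```

Some emojis (for example `✈️`, which carries a variation selector) span several code points. For those, `list(day)` would split a single emoji, so a real tokenizer is better applied to the mapped tokens than to the raw characters.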

In [30]:
# let's load the mcc to emoji hash table and mcc to description:
with open(RAW_DATA_DIR + "mcc_encoding.json", "r") as file:
    mcc_encoding = json.load(file)

with open(RAW_DATA_DIR + "mcc_description.json", "r") as file:
    mcc_description = json.load(file)

In [31]:
# retrieving the dictionary of emojis:
emojis = emoji.EMOJI_ALIAS_UNICODE_ENGLISH

# lowercasing the aliases so the lookup is case-insensitive:
emojis_clean = {key.lower(): val for key, val in emojis.items()}

In [32]:
# let's generate the mapping in terms of emojis:
mcc_emoji = {}

for mcc, shortcut in mcc_encoding.items():
    try:
        mcc_emoji[mcc] = emojis_clean[shortcut]
    except Exception as e:
        print(e)

In [16]:
# visualizing a few of the encodings:
# using `symbol` as the loop variable avoids shadowing the `emoji` module
for mcc, symbol in list(mcc_emoji.items())[:10]:
    print(f"{mcc} -> {symbol}")

1711 -> 🔨
3000 -> ✈
3001 -> ✈
3005 -> ✈
3006 -> ✈
3007 -> ✈
3008 -> ✈
3009 -> ✈
3058 -> ✈
3066 -> ✈


In [33]:
# applying the mapping to the timeline dataframe:
df_timeline = df_timeline.withColumn(
    "merchant_category_encoding",
    apply_category_map(mcc_emoji)(F.col("merchant_category_code")),
)

df_timeline = df_timeline.withColumn(
    "merchant_category_description",
    apply_category_map(mcc_description)(F.col("merchant_category_code")),
)

In [18]:
# visualizing how some of the emojis relate to their MCCs and descriptions:
df_timeline.select(
    "merchant_category_code",
    "merchant_category_description",
    "merchant_category_encoding",
).distinct()

merchant_category_code,merchant_category_description,merchant_category_encoding
4214,Motor freight car...,🚚
5192,"Books, periodical...",📚
7393,Detective agencie...,👮
7549,Towing services,🚗
3780,"Lodging — hotels,...",🏨


In [19]:
# putting the results into perspective for a single customer:
df_timeline.filter(F.col("user_id") == "0").select(
    "user_id",
    "merchant_category_code",
    "transaction_amount",
    "merchant_category_encoding",
)

user_id,merchant_category_code,transaction_amount,merchant_category_encoding
0,5411,117.05,🛒
0,5942,45.3,📚
0,7538,29.34,🚗
0,5300,150.08,🏬
0,5912,180.58,💊


# 7. Building Sequence Datasets
To analyze the customer transactions from a sequence perspective, we need to generate a few different views of the timeline dataset:

1. The daily transactional history for each customer;
2. The entire transactional history for each customer;
3. The entire transactional history for each customer, preserving the boundaries between daily sequences.
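The three views can be illustrated in plain Python with a hypothetical timeline that is already sorted by user and timestamp. The emojis match the user `0` sample shown earlier. The separators follow the notebook:

- no separator within a day;
- `,` between transactions in the user history;
- `,` between days in the tokenized history.

```python
from collections import defaultdict
from itertools import groupby

# hypothetical (user_id, date, emoji) rows, already ordered by timestamp
timeline = [
    ("0", "2002-09-07", "🛒"),
    ("0", "2002-09-07", "📚"),
    ("0", "2002-09-07", "🚗"),
    ("0", "2002-11-03", "🏬"),
    ("0", "2002-11-03", "💊"),
]

# 1. daily history: one string per (user, date), no separator
daily = {
    key: "".join(e for _, _, e in rows)
    for key, rows in groupby(timeline, key=lambda r: (r[0], r[1]))
}

# 2. user history: every transaction of the user, comma-separated
user_history = {
    user: ",".join(e for _, _, e in rows)
    for user, rows in groupby(timeline, key=lambda r: r[0])
}

# 3. tokenized history: the daily strings joined by commas, one token per day
per_user = defaultdict(list)
for (user, _), seq in daily.items():
    per_user[user].append(seq)
tokens = {user: ",".join(seqs) for user, seqs in per_user.items()}

print(user_history)  # {'0': '🛒,📚,🚗,🏬,💊'}
print(tokens)  # {'0': '🛒📚🚗,🏬💊'}
```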

## 7.1 User Daily Transactional Sequence

In [20]:
# user-daily level representation:
df_daily_history = df_timeline.groupby("user_id", "transaction_date").agg(
    F.concat_ws("", F.collect_list("merchant_category_encoding")).alias("sequence")
)

In [21]:
# verifying the results:
df_daily_history

user_id,transaction_date,sequence
0,2002-09-07,🛒📚🚗
0,2002-11-03,🏬💊
0,2004-11-10,🏬
0,2005-06-17,💊🛒👕💊
0,2005-08-20,💰💊📚📱


Given the nature of the dataset (being synthetic), it is prone to some errors and biases. One problem this specific dataset has is that it repeats the same MCC in consecutive purchases. This is unlikely to happen in the real world, so we need to clean up those cases. For our purposes, I will remove duplicate entries that occur consecutively within a sequence, using the `remove_duplicate_elements` helper UDF.
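Removing only *consecutive* repeats is not the same as removing every repeat: in `💊🛒👕💊`, the former keeps both `💊`. Working on a list of emoji tokens, such as the per-day `collect_list` output before it is joined, also avoids splitting emojis made of several code points.

The sketch below shows consecutive-only deduplication with `itertools.groupby`, compared with `dict.fromkeys`. `collapse_consecutive` is a hypothetical helper, not the UDF used in this notebook.

```python
from itertools import groupby


def collapse_consecutive(tokens):
    """Keep one element per run of identical consecutive tokens."""
    return [token for token, _ in groupby(tokens)]


day = ["💊", "💊", "🛒", "👕", "💊"]
print(collapse_consecutive(day))  # ['💊', '🛒', '👕', '💊']
print(list(dict.fromkeys(day)))  # ['💊', '🛒', '👕'], every later repeat is dropped
```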

In [22]:
# applying the cleaning function to the sequence column:
df_daily_history = df_daily_history.withColumn(
    "sequence", remove_duplicate_elements(F.col("sequence"))
)

In [38]:
save_to_filesystem(
    df_daily_history,
    PROCESSED_DATA_DIR,
    "tb_user_daily_sequence",
    "tb_user_daily_sequence.parquet",
)

True

## 7.2 User Transactional History

In [27]:
# user-level representation: the all-time sequence of each customer
df_user_history = df_timeline.groupby("user_id").agg(
    F.concat_ws(",", F.collect_list("merchant_category_encoding")).alias("sequence")
)

In [28]:
# intermediary results become:
df_user_history

user_id,sequence
1090,"⛽,🏬,💰,🛒,🛒,💰,..."
1159,"📱,🛒,🍽,🪑,💐,🍽..."
1436,"🍽,💊,🏬,🏬,🏨,🏬..."
1512,"🏬,💊,🏠,🧾,🧾,🛒..."
1572,"🛒,🛒,🛒,🍽,🛒,🌯..."


In [29]:
save_to_filesystem(
    df_user_history,
    PROCESSED_DATA_DIR,
    "tb_user_history",
    "tb_user_history.parquet",
)

True

## 7.3 Tokenized User Transactional History

In [45]:
# grouping the daily sequences into user sequences by separator:
df_tokens = df_daily_history.groupby("user_id").agg(
    F.concat_ws(",", F.collect_list(F.col("sequence"))).alias("sequence")
)

In [46]:
# the final results can be illustrated as:
df_tokens.filter(F.col("user_id") == "0")

user_id,sequence
0,"🛒📚🚗,🏬💊,🏬,💊..."


In [47]:
save_to_filesystem(
    df_tokens,
    PROCESSED_DATA_DIR,
    "tb_user_token_history",
    "tb_user_token_history.parquet",
)

True