In [None]:
# `date` lives in the standard-library `datetime` module; there is no top-level
# `date` module, and the configuration cell calls `date.today()`.
from datetime import date

from awsglue.context import GlueContext
from awsglue.transforms import *
from pyspark.context import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Wrap the active SparkContext (created on demand) in a GlueContext; the
# catalog read in a later cell goes through this object.
glueContext = GlueContext(SparkContext.getOrCreate())

In [None]:
# Destination bucket for the generated train/test CSV files.
S3_BUCKET = "aml-project-storage"
# Glue Data Catalog coordinates of the source table.
DATABASE = "historical_data"
TABLE_NAME = "original"

# Run-specific output folder: today's date formatted as YYYYMMDD.
FOLDER = date.today().strftime("%Y%m%d")

In [None]:
# Read the catalog table identified by DATABASE / TABLE_NAME through the
# GlueContext; the result is the input to the column clean-up cell.
df = glueContext.create_dynamic_frame.from_catalog(
    table_name=TABLE_NAME,
    database=DATABASE,
)

In [None]:
# Remove the fields that are not used downstream.
transactions = df.drop_fields(["step", "nameorig", "isflaggedfraud"])

# Rename the remaining balance / label fields to snake_case names,
# one field per step, in the same order as before.
transactions = transactions.rename_field("oldbalanceorg", "balance_source_old")
transactions = transactions.rename_field("newbalanceorig", "balance_source_new")
transactions = transactions.rename_field("oldbalancedest", "balance_dest_old")
transactions = transactions.rename_field("newbalancedest", "balance_dest_new")
transactions = transactions.rename_field("isfraud", "is_fraud")

In [None]:
# One-hot encode transaction types
def encode_type(record):
    """One-hot encode the transaction type of a single record.

    Every record receives all five indicator fields (``is_cash_in``,
    ``is_cash_out``, ``is_cash_debit``, ``is_payment``, ``is_transfer``),
    with exactly one of them set to 1 and the others set to 0. Writing the
    zeros explicitly keeps the set of output columns independent of which
    transaction types happen to occur in the data.

    Args:
        record: Record exposing its fields as attributes; ``record.type``
            is read and the indicator fields are written back onto it.
            Presumably a Glue DynamicRecord -- confirm against the caller.

    Returns:
        The same ``record`` object, updated in place.
    """
    indicator_by_type = {
        "CASH_IN": "is_cash_in",
        "CASH_OUT": "is_cash_out",
        "DEBIT": "is_cash_debit",
        "PAYMENT": "is_payment",
    }
    # Any type not listed above is treated as a transfer, matching the
    # original fall-through branch.
    active = indicator_by_type.get(record.type, "is_transfer")

    for indicator in (*indicator_by_type.values(), "is_transfer"):
        setattr(record, indicator, 1 if indicator == active else 0)
    return record


# Run encode_type over the frame via its `map` method. The result is bound
# back to `transactions`, so this cell consumes its own output when re-run.
transactions = transactions.map(encode_type)

In [None]:
# Convert to a Spark DataFrame for the column-level feature work below.
transactions_df = transactions.toDF()
# Fill NA/Null values. A numeric fill value is applied to numeric columns
# only, so null strings (e.g. in `namedest`) are still null after this step.
transactions_df = transactions_df.na.fill(0)

# Encode destination: 1 when the destination name starts with "M", else 0.
# Null or empty names are encoded as 0 instead of raising inside the UDF.
encode_merchant = udf(
    lambda name: 1 if name and name.startswith("M") else 0, IntegerType()
)
transactions_df = transactions_df.withColumn(
    "is_merchant_dest", encode_merchant(transactions_df["namedest"])
)

# Create percentage_amount_source column: amount / balance_source_old, or 0.0
# when the old balance is zero or either input is missing.
# The return type is declared explicitly; without it the UDF defaults to a
# string column. Every branch returns a float so the value matches the
# declared "double" type.
calculate_balance_percentage = udf(
    lambda amount, old_balance: (
        float(amount) / float(old_balance)
        if amount is not None and old_balance
        else 0.0
    ),
    "double",
)
transactions_df = transactions_df.withColumn(
    "percentage_amount_source",
    calculate_balance_percentage(
        transactions_df["amount"], transactions_df["balance_source_old"]
    ),
)

# The raw name and type columns are replaced by the encoded features above.
transactions_df = transactions_df.drop("namedest", "type")

In [None]:
# Re-order columns - target must be first for XGBoost training.
# The remaining columns follow in alphabetical order.
columns = sorted(transactions_df.columns)
columns.remove("is_fraud")
transactions_df = transactions_df.select(["is_fraud", *columns])

In [None]:
# Split data: weights 0.8 / 0.2 for training / test.
# A fixed seed makes the split repeatable across runs (for the same input
# data and partitioning) instead of producing a new random split on every run.
SPLIT_SEED = 42
training_df, test_df = transactions_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Write both splits as CSV under a dated folder in the bucket, replacing any
# output already present at the same path.
training_df.write.csv(f"s3a://{S3_BUCKET}/train/{FOLDER}/", mode="overwrite")
test_df.write.csv(f"s3a://{S3_BUCKET}/test/{FOLDER}/", mode="overwrite")