# Loading data

In [None]:
from pyspark.sql import SparkSession
# Reuse the already-active Spark session if there is one; otherwise start one.
session = SparkSession.builder.getOrCreate()

# Input records for the whole notebook, read from a Parquet dataset.
input_path = "fraud-all.parquet"
df = session.read.parquet(input_path)

## Cleaning data

These data are mostly clean, but we need to add a new field for transaction interarrival time: the time elapsed between a transaction and the same user's previous transaction.

In [None]:
# Print the column names and types of the loaded DataFrame before transforming it.
df.printSchema()

In [None]:
import pyspark.sql.window as W
import pyspark.sql.functions as F

# Each user's transactions, in timestamp order.
interarrival_spec = W.Window.partitionBy("user_id").orderBy("timestamp")

# Timestamp of the same user's preceding transaction; lag() yields null for
# the first row of each partition, so a user's first transaction has no
# previous timestamp (and therefore a null interarrival).
previous_timestamp_col = F.lag(F.col("timestamp")).over(interarrival_spec)

# Gap between consecutive transactions of a user, cast to an integer.
# NOTE(review): the unit depends on the type of `timestamp`, which is not
# visible here -- confirm against the schema printed above.
interarrival_col = (
    F.col("timestamp") - F.col("previous_timestamp")
).cast("int")

df_interarrival = (
    df
    .withColumn("previous_timestamp", previous_timestamp_col)
    .withColumn("interarrival", interarrival_col)
)

In [None]:
%%time
# show() is an action, so this is where Spark actually runs the lag/window
# computation defined above; %%time reports how long that takes. Only the
# first 100 rows are printed.
df_interarrival.show(100)

In [None]:
# Never computed (no action is ever called on it); kept only as a reference
# point for comparison with the rank-based column built in the next cell.

# Each user's transactions ordered by raw amount.
amount_by_user_spec = W.Window.partitionBy("user_id").orderBy("amount")

# cume_dist(): fraction of the user's rows whose amount is <= this row's.
df_dist_unused = (
    df_interarrival
    .withColumn("amount_quantile", F.cume_dist().over(amount_by_user_spec))
)

In [None]:
# RAPIDS Accelerator setting; presumably needed so the float -> int cast in
# the ordering expression below can run on the GPU -- confirm against the
# RAPIDS configuration reference for the deployed plugin version.
session.conf.set("spark.rapids.sql.castFloatToIntegralTypes.enabled", True)

# Ordered window: each user's transactions sorted by amount in whole cents.
# Used for rank() only.
windowSpec = \
    W.Window.partitionBy("user_id").orderBy(
        (F.col("amount") * 100).cast("int")
    )

# Unordered window over the same partition, used for the per-user total.
# This must NOT reuse windowSpec: once a window has an ORDER BY, Spark's
# default frame is "unbounded preceding .. current row", so count() over it
# is a running count rather than the partition size, and rank / count
# collapses to 1.0 for every row without ties.
userWindowSpec = W.Window.partitionBy("user_id")

# amount_rank = rank / n, where rank is 1 + the number of the user's
# transactions with a strictly smaller amount (in cents) and n is the user's
# total number of transactions. Not identical to cume_dist, which counts the
# rows that are less than OR EQUAL to the current row.

df_dist = df_interarrival.\
    withColumn("amount_rank",
        (F.rank().over(windowSpec) /
         F.count("user_id").over(userWindowSpec)).cast("float")
    )

In [None]:
# Print the schema again to check the columns added so far
# (previous_timestamp, interarrival, amount_rank) and their types.
df_dist.printSchema()

In [None]:
# Output column types: Spark "float" is a 32-bit float, "int" a 32-bit integer.
amount_as_float = F.col("amount").cast("float")
user_id_as_int = F.col("user_id").cast("int")
merchant_id_as_int = F.col("merchant_id").cast("int")

# Drop the helper column that was only needed to derive `interarrival`, then
# overwrite the three columns in place with their cast versions.
df_out = (
    df_dist
    .drop("previous_timestamp")
    .withColumn("amount", amount_as_float)
    .withColumn("user_id", user_id_as_int)
    .withColumn("merchant_id", merchant_id_as_int)
)

In [None]:
import time
# Whole-second Unix timestamp, embedded in the output name so that each run
# writes to a fresh path (also reused by the sample-writing cell below).
output_time = int(time.time())

cleaned_path = f"fraud-cleaned-{output_time}.parquet"
df_out.write.parquet(cleaned_path)

In [None]:
# Write a ~5% row sample next to the full output. `fraction` is a per-row
# inclusion probability, so the sample size is approximate, not exact.
# A fixed seed replaces the previous unseeded call, which drew a different
# sample on every run.
# NOTE(review): a seeded sample is only repeatable when the input rows are
# laid out the same way across runs -- confirm this holds for this job.
SAMPLE_SEED = 42
sample_path = f"fraud-cleaned-{output_time}-sample.parquet"
df_out.sample(fraction=0.05, seed=SAMPLE_SEED).write.parquet(sample_path)