In [0]:
from pyspark.sql.functions import col, mean, stddev, when, to_timestamp
import pyspark.sql.functions as F

#load data
spark.conf.set(
  "fs.azure.account.key.storageaccount9613.dfs.core.windows.net",
  "storage-account-key"
)
data_path = "abfss://fraud-detection-container@storageaccount9613.dfs.core.windows.net/fraud_data"
df = spark.read.format("delta").load(data_path)

#calculate the mean and standard deviation of the transactions
statistics = df.select(mean("amount").alias("mean"), stddev("amount").alias("stddev")).collect()[0]
mean_amount = statistics['mean']
standard_deviation = statistics['stddev']

print(f"Mean: {mean_amount}, Standard Deviation: {standard_deviation}")

Mean: 350.2966773162939, Standard Deviation: 1158.5110815003147


In [0]:
#use z-score to do the anomaly detection - if transactions have |z_score| > 3 will be flagged as anomalies
zscores_df = df.withColumn("z_score", (col("amount") - mean_amount)/standard_deviation)

#flag anomalies
zscores_df = zscores_df.withColumn(
    "anomaly_flag",
    when(F.abs(col("z_score")) > 3, 1).otherwise(0)
)

#sanity check
zscores_df.select("transaction_id", "amount", "location", "z_score", "anomaly_flag").show(10)

+--------------+------+--------+--------------------+------------+
|transaction_id|amount|location|             z_score|anomaly_flag|
+--------------+------+--------+--------------------+------------+
|    txn_758552|191.29|      MA| -0.1372508902637119|           0|
|    txn_218333|243.18|      CT| -0.0924606410994134|           0|
|    txn_230549|  30.5|      NJ|-0.27604110346717214|           0|
|    txn_461771| 67.71|      NY|-0.24392229114488373|           0|
|    txn_200241| 68.04|      NJ|-0.24363744276900745|           0|
|    txn_467565| 78.13|      NJ|-0.23492798788236705|           0|
|    txn_507949|102.33|      OH|-0.21403910698477555|           1|
|    txn_950740| 67.45|      MA|-0.24414671713799835|           0|
|    txn_609614| 227.8|      NJ|-0.10573630176903974|           0|
|    txn_230062|249.76|      NJ|-0.08678093711982035|           0|
+--------------+------+--------+--------------------+------------+
only showing top 10 rows



In [0]:
#save outputs when using static data
output_path = "abfss://fraud-detection-container@storageaccount9613.dfs.core.windows.net/anomaly_data"
zscores_df.write.format("delta").mode("overwrite").save(output_path)  

In [0]:
#save outputs when streaming data
output_path = "abfss://fraud-detection-container@storageaccount9613.dfs.core.windows.net/anomaly_data"
checkpoint_path = "abfss://fraud-detection-container@storageaccount9613.dfs.core.windows.net/checkpoints/anomaly"

query = (
    zscores_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .start(output_path)
)

# Wait until stream is terminated (or stop after debug)
query.awaitTermination()


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8975461228797872>, line 6[0m
[1;32m      2[0m output_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124mabfss://fraud-detection-container@storageaccount9613.dfs.core.windows.net/anomaly_data[39m[38;5;124m"[39m
[1;32m      3[0m checkpoint_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124mabfss://fraud-detection-container@storageaccount9613.dfs.core.windows.net/checkpoints/anomaly[39m[38;5;124m"[39m
[1;32m      5[0m query [38;5;241m=[39m (
[0;32m----> 6[0m     zscores_df[38;5;241m.[39mwriteStream
[1;32m      7[0m     [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)
[1;32m      8[0m     [38;5;241m.[39moutputMode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m)
[1;32m      9[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;1