In [None]:
import os

# Configure S3A access for the reads/writes below.
# Credentials come from the environment instead of being hard-coded in the notebook,
# where they would leak through version control and shared exports.
# NOTE(review): variable names follow the usual AWS convention — adjust if the cluster
# provides them differently (e.g. via a Databricks secret scope).
_hadoop_conf = spark._jsc.hadoopConfiguration()
_hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
_hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
_hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

In [None]:
# Raw transaction stream (CSV with a header row).
transactions_df = spark.read.option("header", True).csv("s3a://databricksbucket79/stream_input/")

# Customer-importance table. Its customer column is called "Source"; rename it so it
# lines up with the "customer" column used everywhere else.
importance_df = (
    spark.read
    .option("header", True)
    .csv("s3a://databricksbucket79/CustomerImportance.csv")
    .withColumnRenamed("Source", "customer")
)
importance_df.show(5)

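PatId1 (UPGRADE): top-1% customers by transaction count at merchants with more than 50,000 transactions, whose weight is in the bottom 1%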

from pyspark.sql.functions import col

# CSV columns are read as strings; the patterns compare amounts and weights
# numerically, so cast both to float up front.
_amount_as_float = col("amount").cast("float")
_weight_as_float = col("weight").cast("float")
transactions_df = transactions_df.withColumn("amount", _amount_as_float)
importance_df = importance_df.withColumn("weight", _weight_as_float)

In [None]:
from pyspark.sql.functions import count

# PatId1 only considers merchants with more than this many transactions in total.
MERCHANT_MIN_TXNS = 50000

# Merchant-level totals, kept only for the busy merchants (columns: merchant, count).
merchant_txn_counts = (
    transactions_df
    .groupBy("merchant")
    .count()
    .filter(col("count") > MERCHANT_MIN_TXNS)
)

# Per-(merchant, customer) transaction counts, restricted to those busy merchants.
# The aggregate above has no "customer" column, so the counts must be taken from the
# transactions themselves; the left-semi join keeps only rows whose merchant is busy.
txn_counts = (
    transactions_df
    .join(merchant_txn_counts.select("merchant"), on="merchant", how="left_semi")
    .groupBy("merchant", "customer")
    .count()
)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, percent_rank

# Within each merchant, order customers by transaction count, busiest first.
window_spec = Window.partitionBy("merchant").orderBy(col("count").desc())

# percent_rank() is 0.0 for the busiest customer(s) and grows toward 1.0 down the
# ordering, so "percentile <= 0.01" keeps the top 1% of customers per merchant.
ranked_customers = txn_counts.withColumn("percentile", percent_rank().over(window_spec))
top_1_percent_customers = ranked_customers.where(col("percentile") <= 0.01)

In [None]:
# Intentionally left without code: the per-merchant top-1% ranking
# (window_spec, ranked_customers, top_1_percent_customers) is computed in the
# preceding cell. A verbatim copy of that cell used to live here; it only
# redefined the same names and risked the two copies drifting apart.

In [None]:
# 1st-percentile threshold of "weight" across the importance table.
# approxQuantile(column, probabilities, relativeError): [0.01] requests the 1st
# percentile and relativeError=0.0 requests the exact value. It returns one value per
# probability, or an empty list when the column has no non-null values.
_weight_quantiles = importance_df.approxQuantile("weight", [0.01], 0.0)

if _weight_quantiles:
    percentile_value = _weight_quantiles[0]
    bottom_1_percent_weight_df = importance_df.filter(col("weight") <= percentile_value)
else:
    # No usable weights: no customer can be in the bottom 1%, so keep the schema but
    # no rows (instead of failing with an IndexError on [0]).
    percentile_value = None
    bottom_1_percent_weight_df = importance_df.limit(0)

bottom_1_percent_weight_df.show()

In [None]:
# Keep each top-1% (merchant, customer) row once if that customer also appears in the
# bottom-1% weight set. An inner join on the non-unique "customer" key would repeat the
# row once per matching importance record, duplicating detections; a left-semi join
# only filters.
# NOTE(review): customers are matched on "customer" alone, not per merchant — confirm
# this is the intended rule.
final_upgrade = top_1_percent_customers.join(
    bottom_1_percent_weight_df, on="customer", how="left_semi"
)


In [None]:
from pyspark.sql.functions import current_timestamp, lit

# Project PatId1 matches onto the shared detection output layout.
_upgrade_columns = [
    current_timestamp().alias("YStartTime"),
    current_timestamp().alias("detectionTime"),
    lit("PatId1").alias("patternId"),
    lit("UPGRADE").alias("ActionType"),
    col("customer"),
    col("merchant"),
]
upgrade_detections = final_upgrade.select(*_upgrade_columns)

In [None]:
from pyspark.sql.functions import floor, col
from pyspark.sql import Row
from pyspark.sql.types import LongType, StructField, StructType

# Rows per output batch; each batch is written to its own batch_id=<n> directory.
DETECTION_BATCH_SIZE = 50

# Give each detection a consecutive 0-based index, then derive its batch number.
rdd_with_index = upgrade_detections.rdd.zipWithIndex()
indexed_rdd = rdd_with_index.map(
    lambda pair, size=DETECTION_BATCH_SIZE: tuple(pair[0]) + (pair[1] // size,)
)

# Explicit schema (detection columns + batch_id): schema inference on an empty RDD
# fails with "can not infer schema from empty dataset" when nothing was detected.
_batched_schema = StructType(
    upgrade_detections.schema.fields + [StructField("batch_id", LongType(), False)]
)
indexed_df = spark.createDataFrame(indexed_rdd, schema=_batched_schema)

indexed_df.write.partitionBy("batch_id").mode("overwrite").option("header", True) \
    .csv("s3a://databricksbucket79/output_detections/patid1_all_batches/")

PatId2 (CHILD): customers with at least 80 transactions at a merchant and an average amount below 23

In [None]:
from pyspark.sql.functions import col

# Re-assert the float type of "amount" so this section can run on its own;
# casting a column that is already float leaves it unchanged.
transactions_df = transactions_df.withColumn("amount", transactions_df["amount"].cast("float"))


In [None]:
from pyspark.sql.functions import count, avg

# For every (merchant, customer) pair: number of transactions and their mean amount.
_pair_aggregates = [
    count("*").alias("txn_count"),
    avg("amount").alias("avg_amount"),
]
customer_merchant_stats = transactions_df.groupBy("merchant", "customer").agg(*_pair_aggregates)

In [None]:
# PatId2 (CHILD): at least 80 transactions with the merchant, average amount below 23.
_is_frequent = col("txn_count") >= 80
_is_low_value = col("avg_amount") < 23
patid2_df = customer_merchant_stats.where(_is_frequent & _is_low_value)


In [None]:
from pyspark.sql.functions import current_timestamp, lit

# Project PatId2 matches onto the shared detection output layout.
_child_columns = [
    current_timestamp().alias("YStartTime"),
    current_timestamp().alias("detectionTime"),
    lit("PatId2").alias("patternId"),
    lit("CHILD").alias("ActionType"),
    col("customer"),
    col("merchant"),
]
patid2_detections = patid2_df.select(*_child_columns)

In [None]:
from pyspark.sql.functions import floor, col
from pyspark.sql import Row
from pyspark.sql.types import LongType, StructField, StructType

# Rows per output batch; each batch is written to its own batch_id=<n> directory.
DETECTION_BATCH_SIZE = 50

# Give each detection a consecutive 0-based index, then derive its batch number.
rdd_with_index = patid2_detections.rdd.zipWithIndex()
indexed_rdd = rdd_with_index.map(
    lambda pair, size=DETECTION_BATCH_SIZE: tuple(pair[0]) + (pair[1] // size,)
)

# Explicit schema (detection columns + batch_id): schema inference on an empty RDD
# fails with "can not infer schema from empty dataset" when nothing was detected.
_batched_schema = StructType(
    patid2_detections.schema.fields + [StructField("batch_id", LongType(), False)]
)
indexed_df = spark.createDataFrame(indexed_rdd, schema=_batched_schema)

indexed_df.write.partitionBy("batch_id").mode("overwrite").option("header", True) \
    .csv("s3a://databricksbucket79/output_detections/patid2_all_batches/")



PatId3 (DEI-NEEDED): merchants with more than 100 female customers but fewer female than male customers

In [None]:
# Show the raw gender codes present in the data.
transactions_df.select("gender").distinct().show()

# Keep only the two recognised codes, which are stored with literal single quotes.
_known_gender_codes = ["'F'", "'M'"]
valid_gender_df = transactions_df.where(col("gender").isin(_known_gender_codes))

In [None]:
from pyspark.sql.functions import countDistinct

# Distinct customers per merchant, one column per gender code ("'F'" and "'M'").
# Passing the pivot values explicitly saves Spark a separate job to discover them. It
# also guarantees both columns exist even if one gender is absent from the data, which
# the next cell's fillna/filter rely on.
gender_stats = (
    valid_gender_df
    .groupBy("merchant")
    .pivot("gender", ["'F'", "'M'"])
    .agg(countDistinct("customer"))
)

In [None]:
# A merchant with no customers of one gender gets null in that pivot column; count it as 0.
gender_stats = gender_stats.fillna(0, subset=["'F'", "'M'"])

# PatId3 (DEI-NEEDED): more than 100 female customers, yet fewer female than male customers.
dei_needed_df = gender_stats.filter((col("'F'") > 100) & (col("'F'") < col("'M'")))

# Attach each flagged merchant's customers. Join against distinct (merchant, customer)
# pairs, not raw transactions: otherwise every transaction produces its own duplicate
# detection row for the same customer and merchant.
new_dei = dei_needed_df.join(
    transactions_df.select("merchant", "customer").distinct(),
    on="merchant",
    how="inner",
)
new_dei.show()

In [None]:
from pyspark.sql.functions import current_timestamp, lit

# Project PatId3 matches onto the shared detection output layout.
_dei_columns = [
    current_timestamp().alias("YStartTime"),
    current_timestamp().alias("detectionTime"),
    lit("PatId3").alias("patternId"),
    lit("DEI-NEEDED").alias("ActionType"),
    col("customer"),
    col("merchant"),
]
patid3_detections = new_dei.select(*_dei_columns)

In [None]:
from pyspark.sql.functions import floor, col
from pyspark.sql import Row
from pyspark.sql.types import LongType, StructField, StructType

# Rows per output batch; each batch is written to its own batch_id=<n> directory.
DETECTION_BATCH_SIZE = 50

# Give each detection a consecutive 0-based index, then derive its batch number.
rdd_with_index = patid3_detections.rdd.zipWithIndex()
indexed_rdd = rdd_with_index.map(
    lambda pair, size=DETECTION_BATCH_SIZE: tuple(pair[0]) + (pair[1] // size,)
)

# Explicit schema (detection columns + batch_id): schema inference on an empty RDD
# fails with "can not infer schema from empty dataset" when nothing was detected.
_batched_schema = StructType(
    patid3_detections.schema.fields + [StructField("batch_id", LongType(), False)]
)
indexed_df = spark.createDataFrame(indexed_rdd, schema=_batched_schema)

indexed_df.write.partitionBy("batch_id").mode("overwrite").option("header", True) \
    .csv("s3a://databricksbucket79/output_detections/patid3_all_batches/")
