In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, avg, max, min, count, sum, when, round


In [None]:
# 1️⃣ Create Spark session
# getOrCreate() hands back the active session when one already exists,
# otherwise it builds a new one under this application name.
spark = (
    SparkSession.builder
    .appName("Digital Payments Analytics")
    .getOrCreate()
)

In [None]:
# 2️⃣ Read the dataset
file_path = "9f7f9ec1-ab86-4a3c-96e0-7bff9f1c1596.csv"  # change path if needed

# The first row is treated as the header; column types are inferred from the
# data rather than declared up front.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Show what the reader inferred before any analysis relies on it.
print("Schema:")
df.printSchema()

print("\nSample data:")
df.show(5)



In [None]:
# 4️⃣ Number of unique customers
# A global aggregate yields exactly one row; its single column is the
# distinct count of CustomerID values.
distinct_count_row = df.agg(countDistinct("CustomerID")).first()
unique_customers = distinct_count_row[0]
print(f"Number of unique customers: {unique_customers}")


In [None]:
# 5️⃣ Total transaction amount per state
print("\nTotal transaction amount per state:")

# Sum Amount within each State, rounded to 2 decimal places.
state_totals = df.groupBy("State").agg(
    round(sum("Amount"), 2).alias("Total_Amount")
)

# Largest totals first.
state_totals.orderBy("Total_Amount", ascending=False).show()



In [None]:
# 6️⃣ Most used payment mode
print("\nMost used payment mode:")

# Count transactions per payment mode, most frequent first.
# PaymentMode is a secondary sort key so that modes with equal counts come out
# in a stable order; without it the table shown below and the row fetched by
# first() are computed by separate jobs and could disagree on the winner.
payment_mode_counts = df.groupBy("PaymentMode").count().orderBy(
    col("count").desc(),
    col("PaymentMode").asc(),
)
payment_mode_counts.show(5)

# first() returns None when the DataFrame has no rows; guard against that
# instead of failing with a TypeError on an empty dataset.
top_mode_row = payment_mode_counts.first()
most_used = top_mode_row["PaymentMode"] if top_mode_row is not None else None
print(f"Most used payment mode: {most_used}")



In [None]:
# 7️⃣ Min / Max / Avg transaction amounts per merchant
print("\nMin/Max/Avg transaction amounts per merchant:")

# Each (aggregate function, output column) pair becomes one rounded
# aggregate over Amount. min/max here are the Spark column functions
# imported at the top of the notebook, not the Python builtins.
amount_aggregates = [
    round(agg_fn("Amount"), 2).alias(out_name)
    for agg_fn, out_name in (
        (min, "Min_Amount"),
        (max, "Max_Amount"),
        (avg, "Avg_Amount"),
    )
]

merchant_stats = df.groupBy("Merchant").agg(*amount_aggregates)

# Show the ten merchants with the highest average amount.
merchant_stats.orderBy("Avg_Amount", ascending=False).show(10)



In [None]:
# 8️⃣ Transactions above average amount per payment mode
print("\nTransactions above average per payment mode:")

# Average Amount for each payment mode.
avg_amounts = df.groupBy("PaymentMode").agg(avg("Amount").alias("AvgAmount"))

# Attach each transaction's payment-mode average to it (inner join on the
# shared PaymentMode column).
txns_with_mode_avg = df.join(avg_amounts, on="PaymentMode")

# Keep only transactions strictly above their mode's average, then drop the
# helper column by selecting the original transaction fields.
above_avg_df = txns_with_mode_avg.where(
    col("Amount") > col("AvgAmount")
).select(
    "TransactionID",
    "CustomerID",
    "PaymentMode",
    "Amount",
    "Merchant",
    "State",
)

above_avg_df.show(10)



In [None]:
# 9️⃣ Increase transaction amount by 5% for VIP or frequent customers
# Define "frequent" as customers with more than 10 transactions.
# Only the transaction count is consulted below; no separate VIP flag is read.

# Number of transactions per customer.
customer_counts = df.groupBy("CustomerID").agg(count("*").alias("TxnCount"))

# Every transaction row gains its customer's TxnCount.
vip_df = df.join(customer_counts, on="CustomerID")

is_frequent = col("TxnCount") > 10
boosted_amount = round(col("Amount") * 1.05, 2)

# Frequent customers get the boosted (rounded) amount; everyone else keeps
# the original Amount unchanged.
vip_updated = vip_df.withColumn(
    "UpdatedAmount",
    when(is_frequent, boosted_amount).otherwise(col("Amount")),
)

print("\nTransaction amounts increased by 5% for frequent customers (TxnCount > 10):")
vip_updated.select("CustomerID", "TxnCount", "Amount", "UpdatedAmount").show(10)



In [None]:
# 10️⃣ Save filtered results to new CSV
# Spark's CSV writer treats output_path as a directory and writes part files
# inside it; "overwrite" replaces whatever is already at that path.
output_path = "frequent_customers_transactions.csv"

# Transactions of frequent customers only (TxnCount > 10), with the adjusted
# amount in place of the original one.
frequent_txns = vip_updated.where(col("TxnCount") > 10).select(
    "TransactionID",
    "CustomerID",
    "PaymentMode",
    "UpdatedAmount",
    "Merchant",
    "State",
)

frequent_txns.write.mode("overwrite").option("header", True).csv(output_path)

print(f"\n✅ Filtered results saved successfully to: {output_path}")

# Stop session
spark.stop()