# In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F


# Create (or reuse an already running) Spark session for this analysis.
spark = (
    SparkSession.builder
    .appName("FinancialTransactionAnalysis")
    .getOrCreate()
)

# Load the raw transactions. Column names come from the header row, and Spark
# guesses column types (inferSchema).
csv_file_path = "/Users/amitjha/Downloads/AccountTransaction.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# TransactionDate is a compact yyyyMMdd value. Cast it to text first, in case
# inferSchema read it as a number, and keep that text copy. Then replace
# TransactionDate with a real date parsed from the text.
raw_date_text = F.col("TransactionDate").cast("string")
df = df.withColumn("TransactionDateString", raw_date_text)
df = df.withColumn(
    "TransactionDate",
    F.to_date(F.col("TransactionDateString"), "yyyyMMdd"),
)
df.show(10)

# Keep only rows that carry every field needed to compute a balance.
df_clean = df.dropna(subset=["TransactionDate", "AccountNumber", "TransactionType", "Amount"])

# Signed transaction value: credits add to the balance and debits subtract.
# TransactionType is trimmed because values can carry stray whitespace
# (e.g. "Credit ", "Debit ").
# Amount is cast to a fixed-point decimal rather than "integer". An integer
# cast truncates fractional amounts (100.75 -> 100), which silently corrupts
# every running balance downstream.
txn_type_col = F.trim(F.col("TransactionType"))
amount_col = F.col("Amount").cast("decimal(18,2)")
df_with_value = df_clean.withColumn(
    "TransactionValue",
    when(txn_type_col == "Credit", amount_col)
    .when(txn_type_col == "Debit", -amount_col)
    .otherwise(0),  # unexpected TransactionType contributes nothing
)

# Rank transactions within each (account, day) so same-day rows get a stable
# order. Ordering this window by TransactionDate, which is itself a partition
# key and therefore identical for every row in the partition, lets row_number()
# pick any order. Intermediate balances of same-day transactions could then
# differ between runs. Instead, tag each row with monotonically_increasing_id()
# before the window's shuffle and order by that tag.
# NOTE(review): this reproduces CSV read order for a single input file, but
# Spark does not formally guarantee it across multiple files. A real
# sequence/timestamp column in the source would be a sturdier tie-breaker.
df_with_row = df_with_value.withColumn("_ReadSeq", F.monotonically_increasing_id())
window_spec_unique = Window.partitionBy("AccountNumber", "TransactionDate").orderBy("_ReadSeq")
df_with_row = (
    df_with_row
    .withColumn("TransactionOrder", F.row_number().over(window_spec_unique))
    .drop("_ReadSeq")  # helper column only; keep the original schema
)

# Running balance: cumulative sum of signed values per account, walking
# chronologically with the intra-day rank as tie-breaker.
window_spec = (
    Window.partitionBy("AccountNumber")
    .orderBy("TransactionDate", "TransactionOrder")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_with_balance = df_with_row.withColumn(
    "CurrentBalance",
    F.sum("TransactionValue").over(window_spec),
)

# Build the report. Sort before projecting, because the intra-day tie-breaker
# (TransactionOrder) is not part of the output columns.
final_df = (
    df_with_balance
    .orderBy("AccountNumber", "TransactionDate", "TransactionOrder")
    .select(
        "TransactionDate",
        "AccountNumber",
        # Without the alias the column (and the CSV header) would be named
        # "trim(TransactionType)".
        F.trim("TransactionType").alias("TransactionType"),
        "Amount",
        "CurrentBalance",
    )
)

final_df.show()
# Note: Spark writes a directory with this name containing part-*.csv files,
# not a single CSV file.
final_df.write.csv("/Users/amitjha/Downloads/updated_balances.csv", header=True, mode="overwrite")

spark.stop()


# Captured output of df.show(10) (raw rows after date parsing):
# +---------------+-------------+---------------+------+---------------------+
# |TransactionDate|AccountNumber|TransactionType|Amount|TransactionDateString|
# +---------------+-------------+---------------+------+---------------------+
# |     2023-01-01|          100|         Credit|  1000|             20230101|
# |     2023-01-02|          100|        Credit |  1500|             20230102|
# |     2023-01-03|          100|         Debit |  1000|             20230103|
# |     2023-01-02|          200|         Credit|  3500|             20230102|
# |     2023-01-03|          200|         Debit |  2000|             20230103|
# |     2023-01-04|          200|         Credit|  3500|             20230104|
# |     2023-01-13|          300|        Credit |  4000|             20230113|
# |     2023-01-14|          300|          Debit|  4500|             20230114|
# |     2023-01-15|          300|         Credit|  1500|             20230115|
# +---------------+-------------+---------------+------+---------------------+