In [3]:
from pyspark.sql.functions import col
import pandas as pd

# Load the raw CSV file from Lakehouse file path
df = spark.read.option("header", True).csv("Files/sample_mpesa_business_6months.csv")

# Show first few rows
df.show(30)


StatementMeta(, 4cb7b21b-fd66-475d-b35a-2cbbe82cef0a, 5, Finished, Available, Finished)

+----------+-----+--------------------+--------+---------+--------+--------------------+
|      Date| Time|    Transaction Type|  Amount|Direction| Balance|           Narration|
+----------+-----+--------------------+--------+---------+--------+--------------------+
|2024-10-09|16:38|      Salary Payment|  3117.3|      Out| 11882.7|Salary Payment ex...|
|2024-10-09| 9:34|Paybill Payment -...| 2577.35|      Out| 9305.35|Paybill Payment -...|
|2024-10-09|10:20|    Customer Payment|12537.29|       In|21842.64|Customer Payment ...|
|2024-10-09| 8:46|      Loan Repayment|12538.96|      Out| 9303.68|Loan Repayment ex...|
|2024-10-09|18:58|    Customer Payment|11545.49|       In|20849.17|Customer Payment ...|
|2024-10-10|17:12|           Buy Goods| 5421.95|      Out|15427.22|  Buy Goods executed|
|2024-10-10|11:42|Utility Bill Payment| 4776.95|      Out|10650.27|Utility Bill Paym...|
|2024-10-10|17:34|Paybill Payment -...| 5373.64|      Out| 5276.63|Paybill Payment -...|
|2024-10-10|18:03|   

In [4]:
# Clean column names
for colname in df.columns:
    df = df.withColumnRenamed(colname, colname.strip().lower().replace(" ", "_"))

# Convert numeric fields
df = df.withColumn("amount", col("amount").cast("double"))
df = df.withColumn("balance", col("balance").cast("double"))

# Preview cleaned schema
df.printSchema()


StatementMeta(, 4cb7b21b-fd66-475d-b35a-2cbbe82cef0a, 6, Finished, Available, Finished)

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- direction: string (nullable = true)
 |-- balance: double (nullable = true)
 |-- narration: string (nullable = true)



In [5]:
# Save cleaned data as a managed table
df.write.mode("overwrite").saveAsTable("mpesa_cleaned")


StatementMeta(, 4cb7b21b-fd66-475d-b35a-2cbbe82cef0a, 7, Finished, Available, Finished)

In [7]:
df = spark.read.table("mpesa_cleaned")
df.show(30)


StatementMeta(, 4cb7b21b-fd66-475d-b35a-2cbbe82cef0a, 9, Finished, Available, Finished)

+----------+-----+--------------------+--------+---------+--------+--------------------+
|      date| time|    transaction_type|  amount|direction| balance|           narration|
+----------+-----+--------------------+--------+---------+--------+--------------------+
|2024-10-09|16:38|      Salary Payment|  3117.3|      Out| 11882.7|Salary Payment ex...|
|2024-10-09| 9:34|Paybill Payment -...| 2577.35|      Out| 9305.35|Paybill Payment -...|
|2024-10-09|10:20|    Customer Payment|12537.29|       In|21842.64|Customer Payment ...|
|2024-10-09| 8:46|      Loan Repayment|12538.96|      Out| 9303.68|Loan Repayment ex...|
|2024-10-09|18:58|    Customer Payment|11545.49|       In|20849.17|Customer Payment ...|
|2024-10-10|17:12|           Buy Goods| 5421.95|      Out|15427.22|  Buy Goods executed|
|2024-10-10|11:42|Utility Bill Payment| 4776.95|      Out|10650.27|Utility Bill Paym...|
|2024-10-10|17:34|Paybill Payment -...| 5373.64|      Out| 5276.63|Paybill Payment -...|
|2024-10-10|18:03|   

In [8]:
from pyspark.sql.functions import sum, when

summary = df.groupBy("direction").agg(sum("amount").alias("total"))
summary.show()


StatementMeta(, 4cb7b21b-fd66-475d-b35a-2cbbe82cef0a, 10, Finished, Available, Finished)

+---------+-----------------+
|direction|            total|
+---------+-----------------+
|       In|2535333.839999999|
|      Out|2554142.030000001|
+---------+-----------------+



In [9]:
daily_cashflow = df.groupBy("date", "direction").agg(sum("amount").alias("total"))
daily_cashflow = daily_cashflow.orderBy("date")
daily_cashflow.show(30)


StatementMeta(, 4cb7b21b-fd66-475d-b35a-2cbbe82cef0a, 11, Finished, Available, Finished)

+----------+---------+------------------+
|      date|direction|             total|
+----------+---------+------------------+
|2024-10-09|       In|          24082.78|
|2024-10-09|      Out|          18233.61|
|2024-10-10|      Out|          18944.33|
|2024-10-11|       In|          26991.37|
|2024-10-11|      Out|           5563.12|
|2024-10-12|      Out|           5600.89|
|2024-10-12|       In|           1846.27|
|2024-10-13|      Out|          18575.41|
|2024-10-13|       In|          15672.12|
|2024-10-14|      Out|          15072.73|
|2024-10-15|      Out|15452.230000000001|
|2024-10-15|       In|          23269.49|
|2024-10-16|      Out|           5369.62|
|2024-10-16|       In|          18607.85|
|2024-10-17|       In|           20679.8|
|2024-10-17|      Out|          19039.42|
|2024-10-18|      Out|          20640.38|
|2024-10-19|      Out|            3043.3|
|2024-10-19|       In|          29430.63|
|2024-10-20|      Out|          29782.64|
|2024-10-21|       In|26643.879999