imports

In [62]:

### open lib
import pandas as pd
import os

# Spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, TimestampType
from pyspark.sql.functions import col, desc

In [35]:
### self defined lib
from ingestion_lib.analyse import parse_dataset, infer_column_type, add_key_column, dupe_check
from ingestion_lib.main import records, categories, tbl_parsed_dict

Setup local SparkSession

In [3]:
# Ensures proper Python worker binding
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

# Optional: Increase timeout if needed
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

spark = SparkSession.builder \
    .appName("local Session") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "localhost") \
    .config("spark.network.timeout", "100s") \
    .config("spark.python.worker.reuse", "false") \
    .getOrCreate()


Ingest as Spark df

In [36]:
categories_df = spark.createDataFrame(categories)
categories_df.show()

+--------------------+--------+----------------+--------------------+----+------+
|        category_key|    type|payment_category|         subcategory|item|nature|
+--------------------+--------+----------------+--------------------+----+------+
|Expenses:Food & D...|Expenses|   Food & Drinks|                 NaN| NaN|  Need|
|Expenses:Restaura...|Expenses|   Food & Drinks|    Restaurants, bar| NaN|  Need|
|  Expenses:Groceries|Expenses|   Food & Drinks|           Groceries| NaN|  Need|
|  Expenses:Fast-food|Expenses|   Food & Drinks|           Fast-food| NaN|  Need|
|   Expenses:Shopping|Expenses|        Shopping|                 NaN| NaN|  Want|
|Expenses:Clothes ...|Expenses|        Shopping|     Clothes & shoes| NaN|  Want|
|Expenses:Drug-sto...|Expenses|        Shopping| Drug-store, chemist| NaN|  Want|
|Expenses:Electron...|Expenses|        Shopping|Electronics, acce...| NaN|  Want|
|  Expenses:Free time|Expenses|        Shopping|           Free time| NaN|  Want|
| Expenses:Gifts

In [37]:
records_df = spark.createDataFrame(records)
records_df.show()

+--------------------+--------------------+-------+--------------------+--------+--------+-------------------+--------+-------------------+--------+
|          record_key|        category_key|account|            category|currency|  amount|ref_currency_amount|    type|               date|transfer|
+--------------------+--------------------+-------+--------------------+--------+--------+-------------------+--------+-------------------+--------+
|2025-03-31 23:31:...|  Fast-food:Expenses|Savings|           Fast-food|     INR|  -660.0|             -660.0|Expenses|2025-03-31 23:31:51|   false|
|2025-03-31 23:31:...|Restaurants, bar:...|Savings|    Restaurants, bar|     INR|  -660.0|             -660.0|Expenses|2025-03-31 23:31:51|   false|
|2025-03-31 16:30:...|  Fast-food:Expenses|Savings|           Fast-food|     INR|  -280.0|             -280.0|Expenses|2025-03-31 16:30:15|   false|
|2025-03-30 19:32:...|Loan, interests:E...|Savings|     Loan, interests|     INR| -2500.0|            -250

In [None]:
records_df = records_df.withColumn('timestamp', col('date').cast(TimestampType()))
records_df = records_df.select('record_key', 'category_key', 'account', 'category', 'amount', 'ref_currency_amount', 'type','timestamp','transfer')

categories_df = categories_df.select('category_key', 'type', 'payment_category', 'subcategory', 'item', 'nature').distinct()

Validation for categories

In [43]:
categories_df.select('type').distinct().show()
categories_df.select('nature').distinct().show()

+--------+
|    type|
+--------+
|Expenses|
|  Income|
+--------+

+------+
|nature|
+------+
|  Want|
|  Need|
|  Must|
|   NaN|
+------+



In [42]:
categories_df.select('payment_category').distinct().show()

+--------------------+
|    payment_category|
+--------------------+
|            Shopping|
|       Food & Drinks|
|             Housing|
|      Transportation|
|             Vehicle|
|Life & Entertainment|
|   Communication, PC|
|  Financial expenses|
|         Investments|
|              Income|
|            TRANSFER|
|              Others|
+--------------------+



In [67]:
categories_df.where('payment_category is null').count()

0

Validation for records

In [53]:
records_df.select('type').distinct().show()
records_df.select('transfer').distinct().show()
records_df.select('account').distinct().show()

+--------+
|    type|
+--------+
|  Income|
|Expenses|
+--------+

+--------+
|transfer|
+--------+
|    true|
|   false|
+--------+

+-------+
|account|
+-------+
|Savings|
|  Zerdh|
|   Card|
|   Cash|
| Equity|
| Salary|
+-------+



In [59]:
print(records_df.count())
print(records_df.where('transfer is True').count())

1893
294


In [60]:
records_df.select('category').distinct().count()

65

In [66]:
records_df.groupBy('category').count().orderBy(desc('count')).show()

+--------------------+-----+
|            category|count|
+--------------------+-----+
|            TRANSFER|  294|
|           Fast-food|  242|
|                Fuel|  121|
|     Loan, interests|   85|
|    Lending, renting|   77|
|             Missing|   62|
|           Groceries|   59|
|    Restaurants, bar|   57|
|Books, audio, sub...|   55|
|   Health and beauty|   46|
|      Charity, gifts|   44|
|Telephone, mobile...|   43|
|              Income|   42|
|       Food & Drinks|   37|
|     Clothes & shoes|   36|
|             Parking|   33|
|           Auto/taxi|   30|
|Software, apps, g...|   26|
|Holiday, trips, h...|   26|
|                  RD|   24|
+--------------------+-----+
only showing top 20 rows



Validation for joining

In [70]:
join_df = records_df.join(categories_df, how='left').where('payment_category is null')
print(join_df.count())

0


In [None]:
join_df.groupBy('payment_category').agg('amount').show()

+----------------+-----------+
|payment_category|sum(amount)|
+----------------+-----------+
+----------------+-----------+



Validation for datatypes

In [50]:
print('\ncategories\n')
categories_col_dict = {field.name: str(field.dataType).replace('Type()','') for field in categories_df.schema.fields}
for col, type in categories_col_dict.items():
    print(col, ' : ', type)
    
print('\nrecords\n')
records_col_dict = {field.name: str(field.dataType).replace('Type()','') for field in records_df.schema.fields}
for col, type in records_col_dict.items():
    print(col, ' : ', type)


categories

category_key  :  String
type  :  String
payment_category  :  String
subcategory  :  String
item  :  String
nature  :  String

records

record_key  :  String
category_key  :  String
account  :  String
category  :  String
currency  :  String
amount  :  Double
ref_currency_amount  :  Double
type  :  String
timestamp  :  Timestamp
transfer  :  Boolean


In [None]:
# spark.stop