In [1]:
import os
import glob
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType,DoubleType
from pyspark.sql.window import Window

#create spark Session
spark = SparkSession.builder.appName("PF").config("spark.sql.caseSensitive", "True").getOrCreate()

In [27]:

#read master ledger file, this file will also be the output of this notebook
#read using pandas then convert to spark dataframe
df = spark.createDataFrame(pd.read_excel('../data/other_input/Master Ledger.xlsx',sheet_name="Master Ledger"))

#change column type to the appropriate type
df = df.withColumn("ID",col("ID").cast(IntegerType()))\
        .withColumn("Amount",col("Amount").cast(DoubleType()))\
        .withColumn("Subscriptions",col("Subscriptions").cast(BooleanType()))\
        .withColumn("Return",col("Return").cast(BooleanType()))\
        .withColumn("Real Amount",col("Real Amount").cast(DoubleType()))
#change format of Date
df = df.withColumn("Date",to_date(col("Date"),"MM/dd/yyyy"))
#print Schema
df.printSchema()
#drop all rows that don't have any ID, fill NaN with blank
df = df.dropna(how="all",subset= ["ID"]).drop('ID','Limit')
df = df.replace('NaN',"")
#show dataframe
df.orderBy("ID", ascending=False).show()

root
 |-- ID: integer (nullable = true)
 |-- Account: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Categories: string (nullable = true)
 |-- Categories 2: string (nullable = true)
 |-- Real Amount: double (nullable = true)
 |-- Note: string (nullable = true)
 |-- Subscriptions: boolean (nullable = true)
 |-- Return: boolean (nullable = true)
 |-- Limit: double (nullable = true)
 |-- Account Type: string (nullable = true)
 |-- Owner: string (nullable = true)

+--------------------+--------------------+-------+----------+----------------+-------------+-------------+-----------+------------------+-------------+------+------------+-----+
|             Account|                Item| Amount|      Date|Transaction Type|   Categories| Categories 2|Real Amount|              Note|Subscriptions|Return|Account Type|Owner|
+--------------------+----------------

In [28]:
#read all supplementary inputs
acc_meta = spark.read.options(inferSchema='True',header='True').csv('../data/other_input/account_metadata.csv')

#create a category mapping based on past data to auto-assign category
category_map = df.dropDuplicates(['Account','Item','Categories','Categories 2','Transaction Type']).select(['Account','Item','Categories','Categories 2','Transaction Type'])

In [25]:
#read all csv files exported from Empower, merge into one spark dataframe
path = glob.glob('../data/empower_input/*.csv')
df2 = spark.read.options(inferSchema='True',header='True').csv(path)


#add more columns to df2 (all empower transactions), so that it matches columns in df (master ledger)
df2 = df2.join(acc_meta,on='Account')\
.drop("Limit")\
.filter(~(col('Account Type') == "Investment"))\
.withColumn("Item",col("Description")).drop("Description")\
.withColumn("Real Amount",col("Amount"))\
.withColumn("Amount",abs(col("Amount")))\
.withColumn("Transaction Type",when(col("Real Amount") <0, "Expense").otherwise("Income"))\
.drop("Category")\
.withColumn("Owner",lit(None))\
.withColumn("Subscriptions",lit(False))\
.withColumn("Return",lit(False))\
.drop("Tags")\

#print schema and show
df2.printSchema()
df2.show()

root
 |-- Account: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Account Type: string (nullable = true)
 |-- Owner: void (nullable = true)
 |-- Item: string (nullable = true)
 |-- Real Amount: double (nullable = true)
 |-- Transaction Type: string (nullable = false)
 |-- Subscriptions: boolean (nullable = false)
 |-- Return: boolean (nullable = false)

+--------------------+----------+-------+------------+-----+--------------------+-----------+----------------+-------------+------+
|             Account|      Date| Amount|Account Type|Owner|                Item|Real Amount|Transaction Type|Subscriptions|Return|
+--------------------+----------+-------+------------+-----+--------------------+-----------+----------------+-------------+------+
|Checking BoA Ritu...|2023-06-02|  12.32|       Debit| null|              Klarna|     -12.32|         Expense|        false| false|
|Checking BoA PA 7462|2023-06-02|  250.0|       Debit| null|E

In [29]:
#find the latest date in master ledger file
max_date = df.select(max("Date")).first()[0]

# union master ledger with empower, where the empower dataframe is filtered on max_date - 5. 
# This is to ensure it captures all transactions, because sometimes the transactions are updated few days after, so 5 days is a good limit. 
df3 = df.unionByName(df2.filter(col("Date")> lit(max_date)-5), allowMissingColumns=True)
df3 = df3.drop("Account Type","Owner").join(acc_meta, on = 'Account').na.fill("")

#auto-assign category using category mapping 
df4 = df3.filter(col("Categories") == lit("")).drop('Categories','Categories 2').join(category_map, on=['Account','Item','Transaction Type'])
df3 = df3.filter(~(col("Categories") == lit(""))).unionByName(df4, allowMissingColumns=True)

#further drop duplicates, in case the Note column are already filled using window partition
#group all transactions which have the same date, account, item and amount into a partition, then assign row number
#if any of them has row_number value higher than 1, and their Note columns is blank, indicate these as dup and filter them out
window = Window.partitionBy(['Date','Account','Item','Real Amount']).orderBy(col("Note").desc())
df3 = df3.withColumn("row_number",row_number().over(window))\
    .withColumn('dup',when((col('row_number')>1) & (col('Note') == ""),True).otherwise(False))\
    .filter(col('dup') == False)\
    .drop("row_number","dup")\
    .orderBy("Date")


#resort dataframe by descending date
df3.orderBy("Date", ascending=False).show(20)


+--------------------+--------------------+-------+----------+----------------+-------------+-------------+-----------+------------------+-------------+------+------------+-----+-----+
|             Account|                Item| Amount|      Date|Transaction Type|   Categories| Categories 2|Real Amount|              Note|Subscriptions|Return|Account Type|Owner|Limit|
+--------------------+--------------------+-------+----------+----------------+-------------+-------------+-----------+------------------+-------------+------+------------+-----+-----+
|Capital One Quick...|             Comcast|  39.99|2023-06-03|         Expense|      Housing|    Utilities|     -39.99|                  |        false| false|      Credit|   PA| 9000|
|Adv Plus Banking ...|Checkcard 0601 Ku...|  150.0|2023-06-02|         Expense|        Legal|  Immigration|     -150.0|                  |        false| false|       Debit|Joint|    0|
|Adv Plus Banking ...|          Craigslist|    5.0|2023-06-02|         Expe

In [5]:
#export to csv and add column ID
df3.toPandas().to_csv("../data/output/out.csv", index_label="ID")