In [1]:
from datetime import datetime

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import when, regexp_replace, col, udf, date_format, sum, row_number, to_date
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

In [2]:
def clean_blank_spaces(df: DataFrame) -> DataFrame:
    """
    Remove every space character from every column of the dataframe.

    Each column is run through ``regexp_replace`` with the pattern ``' '``
    and an empty replacement, then aliased back to its original name, so the
    result has the same columns in the same order.

    Args:
        df (DataFrame): Dataframe whose values may contain spaces.

    Returns:
        (Dataframe): New dataframe with the spaces removed from all values.
    """
    stripped_columns = []
    for name in df.columns:
        # Keep the original column name on the cleaned expression.
        without_spaces = regexp_replace(col(name), ' ', '')
        stripped_columns.append(without_spaces.alias(name))
    return df.select(stripped_columns)

In [3]:
def pre_process_data(df: DataFrame)-> DataFrame:
    """
    Prepare the data to be processed.

    Two conversions are applied:

    * ``TransactionDate`` is parsed from ``yyyyMMdd`` text and rendered back
      as a ``yyyy-MM-dd`` string.
    * ``Amount`` is cast to ``double`` and given a sign: rows whose
      ``TransactionType`` equals ``'Debit'`` become negative, all other rows
      keep the value as is.

    ``AccountNumber`` and ``TransactionType`` are passed through unchanged.

    Args:
        df (DataFrame): Dataframe with the columns TransactionDate,
            AccountNumber, TransactionType and Amount.

    Returns:
        (Dataframe): New dataframe prepared to be transformed.
    """
    # Built-in to_date instead of a Python strptime UDF: a null date stays
    # null instead of raising TypeError inside the UDF, and no per-row Python
    # call is needed.
    # NOTE(review): a malformed date is handled by Spark's parser (null or an
    # error depending on the session's ANSI / parser-policy settings) rather
    # than by a ValueError from strptime - confirm this suits the input data.
    transaction_date = date_format(to_date(col('TransactionDate'), 'yyyyMMdd'), 'yyyy-MM-dd')

    # 'double' rather than 32-bit 'float': monetary values keep their
    # precision, and the later running sum does not expose float rounding
    # noise.
    amount = col('Amount').cast('double')
    signed_amount = when(col('TransactionType') == 'Debit', amount * (-1)).otherwise(amount)

    return df.select(
        transaction_date.alias('TransactionDate'),
        col('AccountNumber'),
        col('TransactionType'),
        signed_amount.alias('Amount'),
    )

In [4]:
def process_data(df: DataFrame)-> DataFrame:
    """
    Calculate the current (running) balance per AccountNumber.

    The signed ``Amount`` is summed cumulatively inside each account, ordered
    by ``TransactionDate``. The output then shows ``Amount`` as a
    non-negative value again (negative values are multiplied by -1) next to
    the new ``CurrentBalance`` column.

    Rows of one account that share the same ``TransactionDate`` are peers in
    the window frame, so they all receive the same ``CurrentBalance`` (the
    balance after all transactions of that date).

    Args:
        df (DataFrame): Dataframe with the columns TransactionDate,
            AccountNumber, TransactionType and a signed Amount.

    Returns:
        (Dataframe): New dataframe with the columns TransactionDate,
            AccountNumber, TransactionType, Amount and CurrentBalance.
    """
    # 'AccountNumber' spelled exactly like the column used everywhere else;
    # the previous 'Accountnumber' only resolved under case-insensitive
    # column resolution.
    # The frame is written out explicitly; it is the one Spark applies by
    # default when a window has an ordering.
    running_window = (
        Window.partitionBy('AccountNumber')
        .orderBy('TransactionDate')
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # `sum` is the aggregate imported from pyspark.sql.functions at the top
    # of the notebook, not the Python builtin.
    with_balance = df.withColumn('CurrentBalance', sum(col('Amount')).over(running_window))

    # Show the amount without its sign again; TransactionType still tells
    # debit from credit.
    unsigned_amount = when(col('Amount') < 0, col('Amount') * (-1)).otherwise(col('Amount'))

    return with_balance.select(
        col('TransactionDate'),
        col('AccountNumber'),
        col('TransactionType'),
        unsigned_amount.alias('Amount'),
        col('CurrentBalance'),
    )

In [5]:
def save_to_csv(df: DataFrame, delimiter = ';', mode = 'overwrite', output = 'output'):
    """
    Write the dataframe as CSV, with a header row.

    Args:
        df (DataFrame): Dataframe to store.
        delimiter (String): Character that separates the fields. By default (;).
        mode (String): Save mode handed to the writer. By default (overwrite).
        output (String): Folder where the csv is stored. By default (output).

    Returns:
        None
    """
    # Configure the writer step by step, then trigger the write.
    writer = df.write.option('header', True)
    writer = writer.option('delimiter', delimiter)
    writer = writer.mode(mode)
    writer.csv(output)
    

In [6]:
def main(file: str, sheet: str):
    """
    Run the whole pipeline: read the Excel sheet, compute the Current Balance
    per AccountNumber and store the result as csv.

    Args:
        file (String): input file.
        sheet (String): input file sheet.

    Returns:
        None
    """
    # Spark session (created, or reused if one already exists)
    session_builder = SparkSession.builder.master("local[2]").appName("Financial_Transactions")
    spark = session_builder.getOrCreate()

    # The Excel sheet is read with pandas and then handed over to Spark
    excel_frame = pd.read_excel(file, sheet_name=sheet)
    transactions_raw = spark.createDataFrame(excel_frame)

    # Stage 1: remove blank spaces from every value
    transactions_clean = clean_blank_spaces(transactions_raw)
    # Stage 2: fix data types and sign the amounts
    transactions_prepared = pre_process_data(transactions_clean)
    # Stage 3: running balance per account
    transactions_balanced = process_data(transactions_prepared)
    # Stage 4: store the result as csv (default delimiter, mode and folder)
    save_to_csv(transactions_balanced)

In [7]:
# Script execution: run the pipeline on the 'AccountTransaction' sheet of
# 'Financial_Transactions.xlsm'. main() stores the result through save_to_csv
# using that function's default delimiter, mode and output folder.
main(file = 'Financial_Transactions.xlsm',sheet = 'AccountTransaction')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 13:38:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/27 13:38:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                