# Window function usage in PySpark
***

- Author: ASHISH GARG
- Date:   29 Aug 2022

## Create reference data
***

In [0]:
# Reference transactions, one tuple per record in the field order given below.
_ACCOUNT_FIELDS = ('AccountNumber', 'TransactionDateTime', 'Status')
_account_rows = [
  (1001, '2021-01-21T16:12:01.587+0000', 'Open for Deposits'),
  (1001, '2021-08-14T01:14:00.587+0000', 'Open for Deposits'),
  (1001, '2021-08-22T17:22:00.587+0000', 'Open for Deposits'),
  (1001, '2021-09-22T01:00:00.587+0000', 'Open for Deposits'),
  (1001, '2021-09-22T05:22:00.587+0000', 'Open for Payments'),
  (1001, '2021-09-22T12:22:00.587+0000', 'Open for Payments'),
  (1001, '2021-09-22T23:22:00.587+0000', 'Closed'),
  (1002, '2021-01-21T16:12:01.587+0000', 'Open for Payments'),
  (1002, '2021-04-21T16:12:01.587+0000', 'Open for Payments'),
  (1002, '2021-10-21T16:12:01.587+0000', 'Open for Payments'),
  (1003, '2021-10-21T16:12:01.587+0000', 'Open for Payments'),
  (1004, '2021-10-21T16:12:01.587+0000', 'Open for Payments'),
]

# Expand each tuple into a dict keyed by field name (one dict per transaction).
account_data = [dict(zip(_ACCOUNT_FIELDS, row)) for row in _account_rows]

## Create spark dataframe.
  Bring data from the source to the spark dataframe
***

In [0]:
# Hand the in-memory records to Spark as an RDD, then build a DataFrame from it.
# No schema is passed, so the columns come from the dict keys of each record.
account_data_rdd = sc.parallelize(account_data)
account_data_df = spark.createDataFrame(data=account_data_rdd)

display(account_data_df)

AccountNumber,Status,TransactionDateTime
1001,Open for Deposits,2021-01-21T16:12:01.587+0000
1001,Open for Deposits,2021-08-14T01:14:00.587+0000
1001,Open for Deposits,2021-08-22T17:22:00.587+0000
1001,Open for Deposits,2021-09-22T01:00:00.587+0000
1001,Open for Payments,2021-09-22T05:22:00.587+0000
1001,Open for Payments,2021-09-22T12:22:00.587+0000
1001,Closed,2021-09-22T23:22:00.587+0000
1002,Open for Payments,2021-01-21T16:12:01.587+0000
1002,Open for Payments,2021-04-21T16:12:01.587+0000
1002,Open for Payments,2021-10-21T16:12:01.587+0000


## Account record count. 
  Check the number of records present for each account in the account table
***

In [0]:
# Number of transaction records held for each account.
records_per_account = account_data_df.groupBy("AccountNumber").count()
display(records_per_account)

AccountNumber,count
1001,7
1002,3
1004,1
1003,1


## Data type check.
  Check the data type of each column.
***

In [0]:
# Inspect the column types before any conversion
# (TransactionDateTime was supplied as string values in account_data).
account_data_df.dtypes

### Datatype conversion
  Convert the `TransactionDateTime` column from string to timestamp.
***

In [0]:
# Explicit imports replace `from pyspark.sql.functions import *`: the wildcard
# rebinds Python builtins such as max, min and sum to Spark column functions.
# `F` is also bound here so cells that reference `F.<function>` work on a fresh kernel.
import pyspark.sql.functions as F
from pyspark.sql.functions import col, to_timestamp

# Parse the string column into a timestamp column (same column name, new type).
account_data_df = account_data_df.withColumn("TransactionDateTime", to_timestamp("TransactionDateTime"))

# Show the column types again to confirm the conversion.
account_data_df.dtypes

## Try GroupBy function
Use `groupBy` with an aggregate function to try to get the latest transaction for each account.
***

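This does not work: once the data is grouped by `AccountNumber` and aggregated, the result only contains `AccountNumber` and the aggregated value, so the original per-row `TransactionDateTime` column can no longer be referenced.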

In [0]:
# Imported in this cell so it runs on a fresh kernel (Restart & Run All).
import pyspark.sql.functions as F
from pyspark.sql.utils import AnalysisException

# Intentional failure demo: after groupBy(...).agg(...) the result only has
# AccountNumber and MaxTransactionDateTime, so the filter below refers to a
# column (TransactionDateTime) that no longer exists.
try:
    df = (account_data_df
      .groupBy('AccountNumber')
      .agg(F.max('TransactionDateTime').alias('MaxTransactionDateTime'))
      .filter(F.col('MaxTransactionDateTime') == F.col('TransactionDateTime'))
      .drop("MaxTransactionDateTime"))
except AnalysisException as error:
    # Report the error instead of aborting, so the remaining cells still execute.
    print(f"groupBy approach failed as expected: {error}")

## Alternate approach: Window function
***

A window function computes the aggregate per partition while keeping every row, so all of the original columns remain available.

In [0]:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Window keyed on AccountNumber: groups rows like groupBy does, but the
# aggregate is attached to each row instead of collapsing the rows.
account_number_window = Window.partitionBy('AccountNumber')

# Latest transaction timestamp within each account's partition.
latest_per_account = F.max('TransactionDateTime').over(account_number_window)

# Keep the rows whose timestamp equals their account's latest one, remove exact
# duplicate rows, then drop the helper column.
df = (
    account_data_df
    .withColumn("MaxTransactionDateTime", latest_per_account)
    .where(F.col("MaxTransactionDateTime") == F.col("TransactionDateTime"))
    .distinct()
    .drop("MaxTransactionDateTime")
)

display(df)

AccountNumber,Status,TransactionDateTime
1001,Closed,2021-09-22T23:22:00.587+0000
1002,Open for Payments,2021-10-21T16:12:01.587+0000
1003,Open for Payments,2021-10-21T16:12:01.587+0000
1004,Open for Payments,2021-10-21T16:12:01.587+0000
