## Code & Logic for Determining Closed Accounts

Consider the following hypothesis:

---
> $H_1$: It is possible to predict (with some metric) if an account will be closed within the next 30-days using at minimum the following feature set:

>> {
    prior 30-transactions,
    is joint account?,
    age of account owner
}
---
### STEP 1: (logical breakdown of hypothesis exploration)
> When considering $H_1$ or $H_0$ how do we find accounts which are closed?

For the aforementioned hypothesis:
This Jupyter Notebook is concerned with determining **closed accounts**, solely, where additional separate files are to follow for subsequent steps.

#### IMPORTANT NOTE: Exploration carried out via BELLCO_PROD_DNA.OSIBANK, Bellco Credit Union's institutional data.

In [1]:
import numpy as np
import pandas as pd
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import col, lit, sum as sum_, max as max_
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType, TimestampType
import snowflake.snowpark.functions as f
import yaml
import os

# Functions to Establish & Work With Snowflake Connection

In [2]:
def create_snowpark_session():
    """Function to establish a Snowflake-Snowpark session.
    Authentication is required and accomplished via web browser -- a new tab will be opened 
    which can be closed after authentication is completed.
    NOTE: 'account' and 'user' must be specified.  
    """
    # open yaml config file containing user, authenticator, etc.
    with open('config.yml', 'r') as file:
        config = yaml.safe_load(file)
    
    return Session.builder.configs(config).create()


def get_session_info(session):
    """Function to printout current Snowflake session information."""
    
    print(f"""
{'account':<15}{session.get_current_account()}
{'role':<15}{session.get_current_role()}
{'warehouse':<15}{session.get_current_warehouse()}
{'database':<15}{session.get_current_database()}
{'schema':<15}{session.get_current_schema()}
""")
    
    
def get_view(session, view, ret_rows=1, printSession=True):
    """Function to examine chosen VIEW including printing the number of rows found, 
    while also while printing out current session info if desired."""
    
    if printSession:
        get_session_info(snow_session)
    
    print(f"\nrecords returned from VIEW={view}: {snow_session.table(view).count():,}")
    
    return snow_session.table(view).select("*").limit(ret_rows).collect()

In [4]:
snow_session = create_snowpark_session()

# set database & schema to be used for session
snow_session.use_database("BELLCO_PROD_DNA")
snow_session.use_schema("OSIBANK")

get_session_info(snow_session)

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...

account        "otscuso"
role           "OTS_INNOVATION"
warehouse      "OTS_INNOVATION_WH"
database       "BELLCO_PROD_DNA"
schema         "OSIBANK"



# Important VIEW exploration

- **VIEW** - a virtual table which does not retain data, rather it saves a SQL statement/query to be run later, which given the sheer volume of data in this case is a memory savings opportunity.

## `ACCTSTAT`

**Naive approach:** find all closed accounts by CLS code, alone, found in this table; this would find all accounts with a registered closure however *not necessarily* a final closure disposition.

This VIEW effectively provides a **key for account status types** via columns `ACCTSTATCD` & `ACCTSTATDESC`

Closed accounts can be interpreted to mean accounts with the following status, however per direction provided **Closed** was used:
- **Closed**
- **Closed with Balances Remaining**
- **Chargeoff** - meaning an account (possibly a loan) which was a loss to the Financial Institute (FI), likley meaning the account was closed

Other important observations
- Assumed - interpreted to mean the FI took control of the account, likely indicating the account was forcibly handed over & possibly closed, although not necessarily (consider judge-mandated terms of a bankruptcy, etc.)

In [5]:
pd.DataFrame(
    get_view(snow_session, view="ACCTSTAT", ret_rows=11, printSession=False)
)[['ACCTSTATCD', 'ACCTSTATDESC', 'DATELASTMAINT']]    # grab important columns, only, from underlying VIEW


records returned from VIEW=ACCTSTAT: 11


Unnamed: 0,ACCTSTATCD,ACCTSTATDESC,DATELASTMAINT
0,APPR,Approved,1994-01-01 00:00:00
1,CLS,Closed,1994-01-01 00:00:00
2,CO,Chargeoff,1994-01-01 00:00:00
3,CWB,Closed with Balances Remaining,1994-01-01 00:00:00
4,DORM,Dormant,1994-01-01 00:00:00
5,IACT,Inactive,1994-01-01 00:00:00
6,ORIG,Originating,1994-01-01 00:00:00
7,ACT,Active,1994-01-01 00:00:00
8,NPFM,Non-Accrual,1996-04-22 11:37:41
9,DENI,Loan Denied,1995-12-28 12:45:40


## `ACCTACCTSTATHIST`

The specific VIEW `ACCTACCTSTATHIST` captures account status via `ACCTSTAT.ACCTSTATCD == 'CLS'`
- ACCTNBR - member account number
- ACCSTATCD - account status (Active, Closed, etc.) 
- EFFDATETIME - effective datetime of account status change. **i.e. the provided datetime when an account was closed, etc.**
- TIMEUNIQUEEXTN - The Time Unique Extension Number is a system assigned primary key. SYSTEM USE ONLY!

BELLCO's Snowflake database has been chosen here for the sake of simplicity, however, so long as the other CU's databases follow the same naming convention and structure they may be accessed similarly.

## Determine which column(s) can be used to ID most recent Member Account status
Limit rows returned from DF to *more than 2 transactions per account* for especially pronounced exploration 

In [6]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

test_obj = get_view(snow_session, view="ACCTACCTSTATHIST", ret_rows=10000, printSession=True)
test_df = pd.DataFrame(test_obj)      
# test_df.sort_values(['ACCTNBR', 'EFFDATETIME'])

test_df[test_df['ACCTNBR'].isin(
    test_df.groupby('ACCTNBR').count()[test_df.groupby('ACCTNBR').count()['EFFDATETIME'] > 2].index.values)].sort_values(
    ['ACCTNBR', 'EFFDATETIME'], ascending=False)

In [7]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

test_df[test_df['ACCTNBR'].isin(
    test_df.groupby('ACCTNBR').count()[test_df.groupby('ACCTNBR').count()['EFFDATETIME'] > 2].index.values)].sort_values(
    ['ACCTNBR', 'DATELASTMAINT'], ascending=False)

In [8]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

test_df[test_df['ACCTNBR'].isin(
    test_df.groupby('ACCTNBR').count()[test_df.groupby('ACCTNBR').count()['EFFDATETIME'] > 2].index.values)].sort_values(
    ['ACCTNBR', 'TIMEUNIQUEEXTN'], ascending=False)

`TIMEUNIQUEEXTN` values appears unique and the higher the number for a given account appears to represent more recent account activity.

In [28]:
print(test_df['EFFDATETIME'].dtype)
print(test_df['TIMEUNIQUEEXTN'].dtype)
print(test_df['DATELASTMAINT'].dtype)

datetime64[ns]
int64
datetime64[ns]


#### Test groupby aggregation equivalence for 3 select columns EFFDATETIME, TIMEUNIQUEEXTN, DATELASTMAINT to determine best variable for max aggregation approach

In [10]:
from pandas.util.testing import assert_frame_equal


# EFFDATETIME & TIMEUNIQUEEXTN will be the same; however, some variation may occur w/ DATELASTMAINT as it's system update
# per direction provided by Cindy Lee on the Database team
A = test_df.groupby('ACCTNBR').max('EFFDATETIME')
B = test_df.groupby('ACCTNBR').max('TIMEUNIQUEEXTN')
C = test_df.groupby('ACCTNBR').max('DATELASTMAINT')

def test_equivalence(df1, df2):
    try:
        assert_frame_equal(df1, df2)
        return True
    except: 
        return False
    

print("A == B:", test_equivalence(A, B))
print("A == C:", test_equivalence(A, C))
print("B == C:", test_equivalence(B, C))    # if A == B & A == C then B == C by Transitive Property

A == B: True
A == C: True
B == C: True


  from pandas.util.testing import assert_frame_equal


##### Outcome:
Any of `EFFDATETIME`, `TIMEUNIQUEEXTN`, or `DATELASTMAINT` are suitable.

## `ACCT`
Contains transactional information

In [9]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

test_obj = get_view(snow_session, view="acct".upper(), ret_rows=10000, printSession=True)
test_df = pd.DataFrame(test_obj)      
print(test_df.columns.values)
test_df

# Pandas DataFrame Implementation/Exploration for Determining Closed Accounts
#### Local machine Implementation leveraging Pandas familiarity (not ideal given volume of data)
* See Snowflake implementation below for distributed computing version

In [10]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

# limit our returned number of rows to 100,000 vs 12.6+ million
# consider: filtering prior to limit

view_obj = get_view(snow_session, view="ACCTACCTSTATHIST", ret_rows=100000, printSession=True)
view_df = pd.DataFrame(view_obj)
# print(view_df.columns.to_list())    # print column headers if needed for larger VIEWs
view_df

#### IMPORTANT NOTE:
`EFFDATETIME` is the effective datetime of the status change, meaning when an account was closed the status was switched to 'CLS' (closed) for the given datetime captured.

In [11]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

# Closed accounts: CLS or CWB - closed w/ remaining balance; chargeoffs would also be closed, but allowed to pay
clsd_acct = view_df[
#     (view_df['ACCTSTATCD'] == "CLS") | (view_df['ACCTSTATCD'] == "Chargeoff") | (view_df['ACCTSTATCD'] == "CWB")
    (view_df['ACCTSTATCD'] == "CLS")
]

# clsd_acct.groupby('ACCTNBR').max('EFFDATETIME')) returns a Series which needs to be merged/joined back to full DF
clsd_df = pd.DataFrame(clsd_acct.groupby('ACCTNBR').max('TIMEUNIQUEEXTN')).merge(
    clsd_acct, 
    on="TIMEUNIQUEEXTN", 
    how='inner'
)[['ACCTNBR', 'ACCTSTATCD', 'TIMEUNIQUEEXTN', 'EFFDATETIME', 'POSTDATE']]
clsd_df[clsd_df['EFFDATETIME'].between(pd.to_datetime('01/01/2022'), pd.to_datetime('08/01/2022'))]

In [17]:
clsd_acct['EFFDATETIME'].info()
clsd_acct['ACCTSTATCD'].value_counts()

<class 'pandas.core.series.Series'>
Int64Index: 20851 entries, 12 to 99999
Series name: EFFDATETIME
Non-Null Count  Dtype         
--------------  -----         
20851 non-null  datetime64[ns]
dtypes: datetime64[ns](1)
memory usage: 325.8 KB


CLS    20851
Name: ACCTSTATCD, dtype: int64

# Snowflake DataFrame Implementation for Determining Closed Accounts
#### Snowflake DataFrame construct
https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes#label-snowpark-python-dataframe-construct
#### Snowflake efficiency
https://docs.snowflake.com/en/user-guide/querying-persisted-results

"When a query is executed, the result is persisted (i.e. cached) for a period of time. At the end of the time period, the result is purged from the system."

### Distributed implementation via Snowflake

In [12]:
# https://docs.snowflake.com/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.groupBy.html

def get_closed_accounts_for_timeframe(session, begin, end, lim=200):
    """
    This function represents a Snowflake-Snowpark implementation to find closed accounts for a given look-back period.  
    
    Accounts w/ the status code 'CLS' (OSIBANK-DNA system code for closed) for the given timeframe are found.
    These accounts are then aggregated by account number where the most recent/highest unique 'TIMEUNIQUEEXTN' column value
    is determined for an account via the aggregate function 'max'. The most recent [unique] value for the period 
    is needed as it's possible to encounter accounts which were closed but subsequently re-opened in the given period.
    'TIMEUNIQUEEXTN' represents a unique system key generated for account status changes.
    After this query is carried out then a join is performed on accounts to pull full rows of data matching 'TIMEUNIQUEEXTN'.
    
    Parameters:
        session (Snowflake session object): used to establish a connection to the desired Snowflake instance.
        begin (string: MM/DD/YYYY): a date which specifies the beginning of the lookback period inclusively.
        end (string: MM/DD/YYYY): a date which specifies the end of the lookback period inclusively.
        lim (int): an integer which specifies a query return result limit, a row return max.
        
    Returns:
        Snowflake DataFrame: A DataFrame containing the closed account information.
    """    
    # session.table() creates a Snowflake DataFrame from a table, view or stream
    df_table = session.table("ACCTACCTSTATHIST")
    
    # get accounts which match ultimate/final account status 'CLS'
    latest_status = (df_table
                     .filter((col('ACCTSTATCD') == 'CLS'))
#                      .filter(col('ACCTSTATCD') == 'CLS') | (col('ACCTSTATCD') == 'Chargeoff') | (col('ACCTSTATCD') == 'CWB')
                     .group_by('ACCTNBR')
                     .agg(max_(col('TIMEUNIQUEEXTN')).alias('TIMEUNIQUEEXTN'))
                    )
    
    # merge latest_status back to ACCTACCTSTATHIST DataFrame created from VIEW
    return (latest_status
            .join(
                df_table, 
                on='TIMEUNIQUEEXTN', 
                how='inner', 
                lsuffix='_'
            )
            .select(
                col('ACCTNBR'), 
                col('ACCTSTATCD'), 
                col('TIMEUNIQUEEXTN'), 
                col('EFFDATETIME')
            )
            .filter((f.to_date('EFFDATETIME') >= begin) & (f.to_date('EFFDATETIME') <= end))
            .limit(lim)
           )

In [13]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

closed_accounts = get_closed_accounts_for_timeframe(snow_session, '1/1/2022', '8/1/2022', lim=None)
closed_accounts.show()
closed_accounts.dtypes

While several account status codes exist indicating account closure, simply "Closed" appears to be the most common code encountered by far (99.99+%).

## Instantiate/establish new Snowflake session for writing closed accounts to Snowflake table
By creating a new Snowflake session I am able to keep the transactional data separate.  

In [14]:
# instantiate/establish snowflake session
snow_session_ots_innov = create_snowpark_session()

# set database & schema to be used for session
snow_session_ots_innov.use_database("OTS_INNOVATION_DB")
snow_session_ots_innov.use_schema("JOEL_C")

get_session_info(snow_session_ots_innov)

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...

account        "otscuso"
role           "OTS_INNOVATION"
warehouse      "OTS_INNOVATION_WH"
database       "OTS_INNOVATION_DB"
schema         "JOEL_C"



# Write closed accounts  back to Snowflake
The following Snowflake table OTS_INNOVATION_DB.JOEL_C.SAMPLE_ACCOUNTS is used

In [15]:
%%capture
# THE FOLLOWING OUTPUT IS WITHELD DUE TO SENSITIVE Personally identifiable information (PII)

pd_df = closed_accounts.to_pandas()

# using a local timezone is needed to change the default datetime conversion, a known snowflake_dataframe.to_pandas() issue
# https://github.com/snowflakedb/snowflake-connector-python/issues/319#issuecomment-764145625
pd_df['EFFDATETIME'] = pd_df['EFFDATETIME'].dt.tz_localize('UTC')

# pd_df = pd_df.to_frame()
pd_df['acct_open'] = 0
pd_df['effective_date'] = pd_df['EFFDATETIME'].dt.date
pd_df['window_start'] = pd.to_datetime('1/1/2022')
pd_df['window_end'] = pd.to_datetime('8/1/2022')

pd_df[['ACCTNBR', 'acct_open', 'effective_date', 'window_start', 'window_end']]

In [45]:
# New Snowflake session below references the new DB & Schema.

pd_df = closed_accounts.to_pandas()

# using a local timezone is needed to change the default datetime conversion, a known snowflake_df.to_pandas() issue
# https://github.com/snowflakedb/snowflake-connector-python/issues/319#issuecomment-764145625
pd_df['EFFDATETIME'] = pd_df['EFFDATETIME'].dt.tz_localize('UTC')

# pd_df = pd_df.to_frame()
pd_df['acct_open'] = 0
pd_df['effective_date'] = pd_df['EFFDATETIME'].dt.date
pd_df['window_start'] = pd.to_datetime('1/1/2022')
pd_df['window_end'] = pd.to_datetime('8/1/2022')

# code below overcomes known datetime issue when writing a Pandas DataFrame to a Snowpark DataFrame
pd_df['window_start'] = pd_df['window_start'].dt.tz_localize('UTC').dt.date
pd_df['window_end'] = pd_df['window_end'].dt.tz_localize('UTC').dt.date


# cls_acct_schema = StructType(
#     [
#         StructField( 'ACCTNBR', IntegerType() ), 
#         StructField( 'ACCTSTATCD', StringType() ), 
#         StructField( 'TIMEUNIQUEEXTN', IntegerType() ), 
#         StructField( 'EFFDATETIME', TimestampType() ) 
#     ]
# )

(snow_session_ots_innov    
#  .create_dataframe(pd_df, schema=cls_acct_schema)
 .create_dataframe(pd_df[['ACCTNBR', 'acct_open', 'effective_date', 'window_start', 'window_end']])
 .write
 .mode("overwrite")
 .save_as_table("SAMPLE_ACCOUNTS"))

### Check write results
* We get results and create a Pandas DataFrame from them for the ease of viewing, as DataFrame output is displayed nicely

In [16]:
output_df = pd.DataFrame(snow_session_ots_innov.sql("""
SELECT acct_open, COUNT(acct_open)
FROM OTS_INNOVATION_DB.JOEL_C.SAMPLE_ACCOUNTS 
GROUP BY acct_open;
""").collect())

output_df.style.hide_index()

  output_df.style.hide_index()


ACCT_OPEN,COUNT(ACCT_OPEN)
0,58000
1,58000


# Close Session

In [17]:
snow_session.close()

# LESSONS LEARNED
* Snowflake is nearly identical syntactically to Apache Spark/PySpark which is covered in DU's CS4334 Parallel & Distributed Computing course through the use of DataBricks which uses/builds on Apache Spark
    - follows functional progamming paradigm
    - Snowflake follows Python in using underscores '_' intead of camel case which Apache Spark uses for function calls (Scala/Java origin)
    - A parallel computing DataFrame implementaion is available w/in Snowflake's Snowpark library
* The naive implementation for determining closed accounts given the underlying database construction involves filtering for accounts which only have the "CLS" status code
    - There is much nuance to accounts found w/in the financial system transactional databases, for example accounts which have been closed during a lookback period can be subsequently reopened at any time and therefore we need the ultimate account disposition, only.  Much forethought and familiarity with the underlying database construction is required to properly query.
* In order to determine closed accounts for a lookback period an aggregation was needed using .group_by() on the account number followed by finding the maximum unique transaction number to determine the final disposition of an account during said period.
* Given the volume of transactional data a parallel implementation is really the only viable approach while working with millions of rows of data, as opposed to using Pandas DataFrames on a local machine.
    - after Snowpark DataFrame filtering we can convert the Snowpark DataFrame to a Pandas DataFrame for use
* When a Snowflake SQL query is executed the result is persisted (i.e. cached) for a period of time **automatically**, unlike Apache Spark/DataBricks which requires function calls to achieve this end such as .persist()
* The sheer volume of database tables involved with a financial transactional system (a CU or a bank, etc.) is extraordinary and will require a data dictionary to effectively utilize, or otherwise subject matter expertise (SME).  Unfortunately, OTS' database department informed me a formal data dictionary was unavailble and therefore I relied primarily on my own observations
* Snowflake offers contract terms with pay per query, so unecessary queries should be avoided where possible (think millions of rows of data), although I had encountered no issues with this as I purposefully chose to limit returned results while examining database views.
* Snowflake has a built-in function for writing Snowflake DataFrames to a database table w/in your Snowflake instance.  We do not need to use SQLAlchemy or a similar SQL library (SQLite, etc.) for Python.
* Snowflake database tables (or SQL tables for that matter) can have comments attached to columns which can then be read out simultaneously via SQL query
* In a production environment the best practice is to use a config file which holds sensitive login/authorization credentials separate from the main file(s).  There are several Python libraries available for reading in these files including 1 for YAML files which has been used here.