###Problem: **Given a bank statement, build a system that:**
* Predicts whether user is Salaried.
* if YES, then estimate the monthly salary

----

In [7]:
!pip install msoffcrypto-tool

Collecting msoffcrypto-tool
  Downloading msoffcrypto_tool-6.0.0-py3-none-any.whl.metadata (10 kB)
Collecting olefile>=0.46 (from msoffcrypto-tool)
  Downloading olefile-0.47-py2.py3-none-any.whl.metadata (9.7 kB)
Downloading msoffcrypto_tool-6.0.0-py3-none-any.whl (48 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.8/48.8 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading olefile-0.47-py2.py3-none-any.whl (114 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.6/114.6 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: olefile, msoffcrypto-tool
Successfully installed msoffcrypto-tool-6.0.0 olefile-0.47


In [21]:
import pandas as pd
import numpy as np
from datetime import datetime
import re
from typing import Dict, Optional, List
from difflib import SequenceMatcher
import warnings
warnings.filterwarnings('ignore')

###Assumptions
Strategy: Define some certain parameters that a bank statement must follow to be classified as 'Salaried'

In [6]:
assumptions = {
    'min_occurences': 3,
    'min_months': 3,
    'regularity_threshold': 0.6,
    'amount_variance_threshold': 0.15,
    'min_salary_amt': 10000,
    'max_salary_amt': 400000
}

I asked some questions:
* Minimum how many months of data is needed?
* Minimum how many times should salary must appear?
* How regular credits must be (0-1)?
* How much can the salary amount realistically vary? i.e. CV
* What's the minimum realistic monthly salary?
* What's the maximum realistic monthly salary?

###Extracting the Bank Statement
Bank statements are usually **encrypted** (simply, password-protected).

So, we'll do this by using a helper library called `msoffcrypto-tool` to unlock the file into memory, and then `pandas` to extract the specific date.

In [2]:
import pandas as pd
import msoffcrypto
import io
import getpass

In [9]:
file_path = '/content/drive/MyDrive/Salary Predictor/AccountStatement.xlsx'
file_password = getpass.getpass()

··········


In [10]:
def extract_statement(file_path: str, file_password: str) -> pd.DataFrame:
  path = file_path
  password = file_password

  decrypted_file = io.BytesIO()

  with open(path, 'rb') as file:
    office_file = msoffcrypto.OfficeFile(file)
    office_file.load_key(password=password)
    office_file.decrypt(decrypted_file)

  decrypted_file.seek(0)

  df = pd.read_excel(
      decrypted_file,
      usecols = 'A:F',
      skiprows=17,
      nrows=130,
      header=0
  )

  return df

Here's a breakdown of what we did:
* Setup file path and password
* Decrypt the file into memory
* Read the specific range using pandas
  * usecols= 'A:F' grabs columns A through F
  * skiprows= 17 skips the first 17 rows, starting from row 18
  * nrows= 130 reads 130 rows after skipping
  * header= 0 assumes the first row contains column names

###Explore the data

In [None]:
df.head()

Unnamed: 0,Date,Details,Ref No/Cheque No,Debit,Credit,Balance
0,01/10/2025,WDL TFR UPI/DR/527472660403/Kamdhenu/YESB/q...,,100.0,,1035.4
1,02/10/2025,WDL TFR UPI/DR/527541238410/BOUDIR M/BARB/s...,,15.0,,1020.4
2,02/10/2025,WDL TFR UPI/DR/527504966304/85830533/UCBA/8...,,795.0,,225.4
3,02/10/2025,WDL TFR UPI/DR/527502384306/SPENCERS/HDFC/s...,,18.5,,206.9
4,03/10/2025,DEP TFR UPI/CR/527529892583/SUBHADIP/UCBA/p...,,,100.0,306.9


In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 129 entries, 0 to 128
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   Date              129 non-null    object 
 1   Details           129 non-null    object 
 2   Ref No/Cheque No  0 non-null      float64
 3   Debit             93 non-null     float64
 4   Credit            36 non-null     float64
 5   Balance           129 non-null    float64
dtypes: float64(4), object(2)
memory usage: 6.2+ KB


In [None]:
df.select_dtypes(include='object').describe()

Unnamed: 0,Date,Details
count,129,129
unique,55,129
top,22/12/2025,WDL TFR UPI/DR/527472660403/Kamdhenu/YESB/q...
freq,8,1


In [None]:
df.select_dtypes(include='float64').describe()

Unnamed: 0,Ref No/Cheque No,Debit,Credit,Balance
count,0.0,93.0,36.0,129.0
mean,,184.534946,457.555556,571.803876
std,,642.436202,1035.781843,1213.077378
min,,1.0,1.0,2.95
25%,,15.0,52.25,95.55
50%,,35.0,200.0,235.5
75%,,170.4,500.0,527.9
max,,6000.0,6250.0,6575.7


###Data Preprocessing
Prepare the data only for what's needed

In [11]:
def prepare_data(df: pd.DataFrame) -> pd.DataFrame:
  try:
    df = df.copy()

    df.columns = df.columns.str.lower()

    if 'ref no/cheque no' in df.columns:
      df.drop(columns=['ref no/cheque no'], inplace=True)
    else:
      warnings.warn("'Ref No/Cheque No' column not found")

    df = df[(df['credit'].notna() | df['debit'].notna())]

    if len(df) == 0:
      raise ValueError("No valid transactions found after filtering")

    required_cols = ['date','details','credit','debit','balance']
    missing_cols = [col for col in required_cols if col not in df.columns]

    if missing_cols:
      raise ValueError(f"Required columns not found : {missing_cols}")

    df['date'] = pd.to_datetime(df['date'],dayfirst=True, errors='coerce')

    if df['date'].isna().sum() > 0:
      df = df.dropna(subset=['date'])
      print(f"Dropped {df['date'].isna().sum()} rows, they had invalid dates")

    df['details'] = df['details'].astype(str)

    df['credit'] = pd.to_numeric(df['credit'], errors='coerce')
    df['debit'] = pd.to_numeric(df['debit'],errors = 'coerce')
    df['balance'] = pd.to_numeric(df['balance'],errors='coerce')

    return df

  except Exception as e:
    print(f"Error during data preparation {e}")
    raise

Here's a breakdown of what we did:
* Drop unnecessary columns
* Check for rows with unnecessary transactions and remove them if needed
* Check and validate missing columns
* Parse dates to handle various date formats
* Check if any dates failed
* Convert `Details` column into `str` datatype
* Convert amount columns to `numeric`
* Added comprehensive error handling for each scenario

###Transaction Parsing
Extract transaction type according to specifics

In [29]:
def get_transaction_type(details: str) -> str:
  if pd.isna(details):
    return 'UNKNOWN'

  details_upper = str(details).upper()

  if 'NEFT' in details_upper: return 'NEFT'
  if 'RTGS' in details_upper: return 'RTGS'
  if 'IMPS' in details_upper: return 'IMPS'
  if 'UPI' in details_upper: return 'UPI'
  if 'ATM' in details_upper: return 'ATM'
  if 'CASH DEPOSIT' in details_upper: return 'CASH DEPOSIT'
  if any(word in details_upper for word in ['CHQ','CHEQUE']):
    return 'CHEQUE'
  if any(word in details_upper for word in ['SALARY','SAL']):
    return 'SALARY'
  if any(word in details_upper for word in ['CASH','DEP']):
    return 'CASH'

  return 'OTHER'

Here's a breakdowno of what we did:
* Check if the details column has `NaN` value
* Convert details into uppercase for clean narration text
* Check keywords: `NEFT`,`RTGS`,`IMPS`,`CHEQUE`,`CASH`

###Building Core Detection Logic
We'll build a pure rule-based salary detection that works on single user data.

Here'e the framework:
* Find transactions that could be a salary
* Calculate how regular transactions are on a scale of 1-10
* Consider both amount consistency and timing consistency
* Get credit summary statistics for debugging
* Keep a standard error response format if things go wrong
* Test using synthetic bank statements

###Find Potential Salary Transactions
Strategy: Find the most frequent cluster of high-value credits

In [13]:
def find_salary_candidates(credit_df: pd.DataFrame)->Optional[pd.DataFrame]:
  threshold = credit_df['credit'].quantile(0.5)
  large_credits = credit_df[credit_df['credit'] > threshold].copy()

  if len(large_credits) < assumptions['min_occurences']:
    return None

  large_credits = large_credits.sort_values('credit')
  clusters = []
  current_cluster = [large_credits.iloc[0]]

  for i in range(1, len(large_credits)):
    current_amount = large_credits.iloc[1]['credit']
    cluster_mean = np.mean([c['credit'] for c in current_cluster])

    if abs(current_amount - cluster_mean) <= 0.15 * cluster_mean:
      current_cluster.append(large_credits.iloc[i])
    else:
      if len(current_cluster) >= assumptions['min_occurences']:
        clusters.append(current_cluster)
      current_cluster = [large_credits.iloc[i]]

  if len(current_cluster) >= assumptions['min_occurences']:
    clusters.append(current_cluster)

  if not clusters:
    return None

  largest_cluster = max(clusters, key=len)
  return pd.DataFrame(largest_cluster)

Here's a breakdown of what we did:
* Filter to top 50% of credits by amount
* Group similar amounts within 15% (simple clustering)
* If the value lies within 15% of cluster mean then add to cluster, else save that to start a new one
* Get the largest cluster among all

###Calculate Regularity of Transactions
Strategy: Calculate how regular transactions are (0-1) scale ; Consider both amount and time consistency

In [14]:
def calculate_regularity_score(transaction_df: pd.DataFrame) -> float:
  if len(transaction_df) < 2:
    return 0

  amount_cv = transaction_df['credit'].std() / transaction_df['credit'].mean()
  amount_score = max(0, 1 - amount_cv)

  days = transaction_df['day'].values
  day_std = np.std(days)
  time_score = max(0, 1 - (day_std/15))

  months = transaction_df['month'].nunique()
  total_months = (transaction_df['date'].max() - transaction_df['date'].min()).days / 30
  occurence_score = min(1.0, months/max(1, total_months))

  weighted_avg = 0.4 * amount_score + 0.4 * time_score + 0.2 * occurence_score
  return weighted_avg

Here's a breakdown of what we did:
* Calculated amount regularity (inverse of coefficient of variation)
* Calculated time regularity (days of month consistency)
* Show monthly occurence regularity score
* Find the weighted average (40% amount + 40%time + 20%occurence)

###Source Consistency Check
Strategy: Check if the money is coming from same entity/source everytime

In [15]:
def check_source_consistency(clustered_df: pd.DataFrame) -> float:
  if len(clustered_df) <2:
    return 0.0

  descriptions = clustered_df['details'].astype(str).tolist()

  similarity_scores = []
  base_desc = descriptions[0]

  for desc in descriptions[1:]:
    ratio = SequenceMatcher(None, base_desc, desc).ratio()
    similarity_scores.append(ratio)

  return np.mean(similarity_scores)

Here's a breakdown of what I did:
* Extract the `details` column
* Compare each description with the first one in the cluster
* Use a sequence-matcher to find similarity ratio
* Returns a score (0-1) indicating how similar the transaction descriptions are:
  * 1.0 → identical descriptions
  * 0.0 → completely different descriptions

###Get Credit Summary
Strategy: Get summary statistics for debugging

In [16]:
def get_credit_summary(credit_df: pd.DataFrame) -> Dict:
  return {
      'total_credits': len(credit_df),
      'total_months': credit_df['month'].nunique(),
      'avg_credits_per_month': round(len(credit_df) / credit_df['month'].nunique(), 1),
      'credit_amt_range': (int(credit_df['credit'].min()), int(credit_df['credit'].max())),
      'median_credit': int(credit_df['credit'].median())
  }

Here's a breakdown of what we did:
* Total credits
* Total months
* Average credits per month
* Credit Amount Range
* Median Credit

###Error Handling
Strategy: Build a standard error response format if things go wrong

In [17]:
def error_handling(text: str, confidence:float= 0.0) -> Dict:
  return {
      'is_salaried': False,
      'estimated_salary': None,
      'reason': text,
      'details': {},
      'confidence_score': confidence
  }

###Implementing Detection Logic
Strategy: Building a function that applies the salary detection logic

In [30]:
def detect_salary(df: pd.DataFrame) -> Dict:
  if df is None or len(df) == 0:
    return error_handling("No valid transaction found")

  credits = df[df['credit'].notna()].copy()

  if len(credits) == 0:
    return error_handling("No credit transactions found")

  credits['month'] = credits['date'].dt.to_period('M')
  credits['day'] = credits['date'].dt.day
  credits['txntype'] = credits['details'].apply(get_transaction_type)

  n_months = credits['month'].nunique()
  if n_months < assumptions['min_months']:
    return error_handling(f"Insufficient Data: {n_months} months of data")

  salary_candidates = find_salary_candidates(credits)

  if salary_candidates is None or len(salary_candidates) < assumptions['min_occurences']:
    return {
        'is_salaried': False,
        'estimated_salary': None,
        'reason': 'No regular high-value credits detected',
        'details': get_credit_summary(credits)
    }


  regularity_score = calculate_regularity_score(salary_candidates)
  amount_cv = salary_candidates['credit'].std() / salary_candidates['credit'].mean()
  amount_consistency_score = max(0.0, 1.0 - amount_cv)

  source_consistency = check_source_consistency(salary_candidates)

  confidence = round(np.mean([regularity_score,amount_consistency_score,source_consistency]), 2)

  is_salaried = (
      regularity_score >= assumptions['regularity_threshold'] and
      amount_cv <= assumptions['amount_variance_threshold'] and
      source_consistency > 0.8
  )

  if is_salaried:
    estimated_salary = int(salary_candidates['credit'].median())

    return {
        'is_salaried': True,
        'confidence_score': confidence,
        'estimated_salary': estimated_salary,
        'salary_range': (
        int(salary_candidates['credit'].min()),
        int(salary_candidates['credit'].max()))
    }

  else:
    reason_parts = []
    if regularity_score < assumptions['regularity_threshold']:
      reason_parts.append(f'regularity score too low (score: {regularity_score:.2f}, threshold: {assumptions['regularity_threshold']})')
    if amount_cv > assumptions['amount_variance_threshold']:
      reason_parts.append(f'amount variance too high (CV: {amount_cv:.2f}, threshold: {assumptions['amount_variance_threshold']})')

    reason_str = ''
    if len(reason_parts) == 1:
        reason_str = f'Credits not regular enough: {reason_parts[0]}'
    elif len(reason_parts) == 2:
        reason_str = f'Credits not regular enough: {reason_parts[0]} and {reason_parts[1]}'
    else:
        reason_str = f'Credits not regular enough (score: {regularity_score:.2f}, threshold: {assumptions['regularity_threshold']})'

    return {
        'is_salaried': False,
        'confidence_score': confidence,
        'estimated_salary': None,
        'reason': reason_str,
        'details': get_credit_summary(credits),
    }

Here's a breakdown of what I did:
* Validate inputs and check if dataframe is empty
* Extract only CREDIT transactions
* Add columns [`Month`,`Day`,`TxnType`] for
* Check the minimum data requirement
* Find potential salary transactions
* Calculate Regularity Score & how much the amount varies(amount_cv)
* Apply Decision Logic
  * Regularity score exceeds given regularity threshold
  * amount_cv is less than given amount variance threshold
* Once decision logic is satisfied, we calculate:
  * `is_salaried`: boolean value (yes or no)
  * `estimated_salary`: median earning value of the salaried candidate
  * `salary_range`: potential range where salary lies
* For non-salaried cases, we'll just add:
  * `reason`: Mentioning the regularity score and threshold
  * `details`: credit summary statistics

###Generate sample bank statements for testing

In [19]:
def generate_test_data(user_type: str = 'salaried', months: int = 6) -> pd.DataFrame:

    transactions = []
    start_date = pd.Timestamp('2024-01-01')
    current_balance = 10000

    if user_type == 'salaried':
        base_salary = 40000
        for month in range(months):
            salary_date = start_date + pd.DateOffset(months=month)

            credit_val = base_salary + np.random.normal(0, 500)
            current_balance += credit_val
            transactions.append({
                'Date': salary_date.strftime('%d/%m/%Y'),
                'Details': 'NEFT/SALARY/XYZ_CORP/INR',
                'Credit': credit_val,
                'Debit': np.nan,
                'Balance': current_balance
            })

            for _ in range(np.random.randint(3, 8)):
                noise_date = salary_date + pd.Timedelta(days=np.random.randint(1, 28))

                credit_noise = np.nan
                debit_noise = np.nan

                if np.random.random() > 0.7:
                    credit_noise = np.random.uniform(100, 3000)
                    current_balance += credit_noise

                if np.random.random() > 0.3:
                    debit_noise = np.random.uniform(100, 5000)
                    current_balance -= debit_noise

                transactions.append({
                    'Date': noise_date.strftime('%d/%m/%Y'),
                    'Details': f'UPI/MERCHANT{np.random.randint(1,100)}',
                    'Credit': credit_noise,
                    'Debit': debit_noise,
                    'Balance': current_balance
                })

    elif user_type == 'freelancer':
        for month in range(months):
            base_date = start_date + pd.DateOffset(months=month)

            for _ in range(np.random.randint(2, 6)):
                payment_date = base_date + pd.Timedelta(days=np.random.randint(1, 28))

                credit_val = np.random.uniform(5000, 50000)
                current_balance += credit_val
                transactions.append({
                    'Date': payment_date.strftime('%d/%m/%Y'),
                    'Details': f'UPI/CLIENT{np.random.randint(1,20)}/PAYMENT',
                    'Credit': credit_val,
                    'Debit': np.nan,
                    'Balance': current_balance
                })

    elif user_type == 'student':
        for month in range(months):
            base_date = start_date + pd.DateOffset(months=month)
            for _ in range(np.random.randint(1, 4)):
                txn_date = base_date + pd.Timedelta(days=np.random.randint(1, 28))

                credit_val = np.nan
                debit_val = np.nan

                if np.random.random() > 0.5:
                    credit_val = np.random.uniform(500, 5000)
                    current_balance += credit_val

                if np.random.random() > 0.3:
                    debit_val = np.random.uniform(100, 2000)
                    current_balance -= debit_val

                transactions.append({
                    'Date': txn_date.strftime('%d/%m/%Y'),
                    'Details': 'UPI/PARENT/TRANSFER' if np.random.random() > 0.5 else 'ATM/CASH_DEPOSIT',
                    'Credit': credit_val,
                    'Debit': debit_val,
                    'Balance': current_balance
                })

    transactions.sort(key=lambda x: datetime.strptime(x['Date'], '%d/%m/%Y'))

    return pd.DataFrame(transactions)

####Let's test out this system using a sample salaried bank statement

In [35]:
freelance_df = generate_test_data(user_type='freelancer',months=4)
preprocessed_data = prepare_data(freelance_df)
result = detect_salary(preprocessed_data)

In [36]:
result

{'is_salaried': False,
 'confidence_score': np.float64(0.8),
 'estimated_salary': None,
 'reason': 'Credits not regular enough: amount variance too high (CV: 0.24, threshold: 0.15)',
 'details': {'total_credits': 14,
  'total_months': 4,
  'avg_credits_per_month': 3.5,
  'credit_amt_range': (5963, 45616),
  'median_credit': 22284}}

In [37]:
student_df = generate_test_data(user_type='student',months=5)
preprocessed_Data = prepare_data(student_df)
result = detect_salary(preprocessed_data)

In [38]:
result

{'is_salaried': False,
 'confidence_score': np.float64(0.8),
 'estimated_salary': None,
 'reason': 'Credits not regular enough: amount variance too high (CV: 0.24, threshold: 0.15)',
 'details': {'total_credits': 14,
  'total_months': 4,
  'avg_credits_per_month': 3.5,
  'credit_amt_range': (5963, 45616),
  'median_credit': 22284}}

In [43]:
salaried_df = generate_test_data(user_type='salaried',months = 6)
preprocessed_data = prepare_data(salaried_df)
result = detect_salary(preprocessed_data)

In [44]:
result

{'is_salaried': True,
 'confidence_score': np.float64(1.0),
 'estimated_salary': 39977,
 'salary_range': (39431, 40631)}

####Let's test with my own bank statement

In [45]:
bank_statement = extract_statement('/content/drive/MyDrive/Salary Predictor/AccountStatement.xlsx','TRIDE03042001')
preprocessed_data = prepare_data(bank_statement)
result = detect_salary(preprocessed_data)

In [46]:
result

{'is_salaried': False,
 'confidence_score': np.float64(0.81),
 'estimated_salary': None,
 'reason': 'Credits not regular enough: amount variance too high (CV: 0.21, threshold: 0.15)',
 'details': {'total_credits': 36,
  'total_months': 3,
  'avg_credits_per_month': 12.0,
  'credit_amt_range': (1, 6250),
  'median_credit': 200}}