In [1]:
import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import bigquery
import time
import calendar
from datetime import datetime, timedelta,date
import seaborn as sns
import pickle
import cloudpickle
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 500)
client = bigquery.Client()
from tqdm import tqdm
import logging
import sys
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
client = bigquery.Client()
from sklearn.preprocessing import StandardScaler, LabelEncoder,OneHotEncoder
import warnings as w
w.filterwarnings('ignore')

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql import types as st
from pyspark.sql.window import Window

In [3]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName('SMS Feature Mart') \
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0') \
    .config('spark.jars', '/home/jupyter/jar_folder/gcs-connector-hadoop3-latest.jar') \
    .config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem') \
    .config('spark.hadoop.google.cloud.auth.service.account.enable', 'true')\
    .config('spark.executor.instances', '8') \
    .config('spark.executor.memory', '64g') \
    .config('spark.driver.memory', '16g') \
    .config('spark.memory.fraction', '0.8') \
    .config('spark.sql.shuffle.partitions', '1000') \
    .config('spark.shuffle.spill', 'true') \
    .config('spark.hadoop.fs.gs.system.bucket', 'alternate_credit_score_model_files') \
    .getOrCreate()

# Step 2: Set up GCS configurations
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("google.cloud.auth.service.account.enable", "true")
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", "/home/jupyter/jar_folder/abcd-dataplatform-4a30abb96536.json")
print("Spark Session created")

:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jupyter/.ivy2/cache
The jars for the packages stored in: /home/jupyter/.ivy2/jars
com.google.cloud.spark#spark-bigquery-with-dependencies_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b36aa803-e53d-46ca-91a5-afbd03ffd523;1.0
	confs: [default]
	found com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.29.0 in central
:: resolution report :: resolve 371ms :: artifacts dl 10ms
	:: modules in use:
	com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.29.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	--------------------------------------------------------------

25/06/11 09:17:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/06/11 09:17:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/06/11 09:17:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Spark Session created


In [4]:
#creating date variable first such as end_of_date,strt_of_date,7 day wrt to as_end_date,15 day wrt to as_end_date
as_of_end_dt = pd.to_datetime('today').normalize()
as_of_end_dt

Timestamp('2025-06-11 00:00:00')

In [5]:
#defining date variable
last_3months = as_of_end_dt - pd.DateOffset(months=3)
last_6months = as_of_end_dt - pd.DateOffset(months=6)
last_9months = as_of_end_dt - pd.DateOffset(months=9)
last_12months = as_of_end_dt - pd.DateOffset(months=12)

# Model Base

In [6]:
model_base_query = f"""
    SELECT
        mobilenumber,
        id AS customer_id
    FROM 
        abcd-dataplatform-prod.abcd_mobileapp_transformed.ABCDPRODDB_t_customer
        """
model_base = client.query(model_base_query).to_dataframe()
model_base['mobilenumber'] = pd.to_numeric(model_base['mobilenumber'], errors='coerce')
model_base = model_base.dropna(subset=['mobilenumber'])
model_base['mobilenumber'] = model_base['mobilenumber'].astype('int64')
model_base = model_base[model_base['mobilenumber'].between(10**9, 10**10 - 1)]
model_base.info()

<class 'pandas.core.frame.DataFrame'>
Index: 0 entries
Data columns (total 2 columns):
 #   Column        Non-Null Count  Dtype
---  ------        --------------  -----
 0   mobilenumber  0 non-null      int64
 1   customer_id   0 non-null      Int64
dtypes: Int64(1), int64(1)
memory usage: 0.0 bytes


# Investment Product Buy from On-us

In [7]:
business_query = f"""
SELECT 
    product,purchase_date,amount,mobilenumber
FROM 
    `abcd-dataplatform.abcd_data_model.abcd_app_business`
"""
df_prod = client.query(business_query).to_dataframe()
df_prod.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 203289 entries, 0 to 203288
Data columns (total 4 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   product        203289 non-null  object 
 1   purchase_date  203289 non-null  dbdate 
 2   amount         203289 non-null  float64
 3   mobilenumber   203263 non-null  object 
dtypes: dbdate(1), float64(1), object(2)
memory usage: 6.2+ MB


In [8]:
df_prod['product'].unique()

array(['STOCKS', 'HEALTH INSURANCE', 'POCKET INSURANCE', 'DIGIGOLD',
       'DIGI SILVER', 'MUTUAL FUND', 'MOTOR INSURANCE',
       'TRAVEL INSURANCE', 'FIXED DEPOSIT', 'LIFE INSURANCE',
       'PERSONAL LOAN', 'LAS', 'GOLD LOAN', 'BL', 'HOME_LOAN'],
      dtype=object)

In [9]:
df_prod

Unnamed: 0,product,purchase_date,amount,mobilenumber
0,STOCKS,2023-11-14,0.0,9840359735
1,STOCKS,2023-12-18,0.0,9869707918
2,STOCKS,2024-01-15,0.0,9819245861
3,STOCKS,2024-02-20,0.0,9321827343
4,STOCKS,2024-02-27,0.0,8099715515
...,...,...,...,...
203284,HOME_LOAN,2025-04-04,4300000.0,
203285,HOME_LOAN,2024-10-25,4500000.0,
203286,HOME_LOAN,2025-04-07,4600411.0,
203287,HOME_LOAN,2024-12-24,5484970.0,


# focusing only on investment product

In [10]:
investment_products = [
    'STOCKS',
    'MUTUAL FUND',
    'DIGIGOLD',
    'DIGI SILVER',
    'FIXED DEPOSIT'
]

In [11]:
# Cleaning mobilenumber in business matrix table
df_prod['mobilenumber'] = pd.to_numeric(df_prod['mobilenumber'], errors='coerce')
df_prod = df_prod.dropna(subset=['mobilenumber'])
df_prod['mobilenumber'] = df_prod['mobilenumber'].astype('int64')
df_prod = df_prod[df_prod['mobilenumber'].between(10**9, 10**10 - 1)]

In [12]:
df_prod.rename(columns={'amount':'total_amount'},inplace=True)

In [13]:
df_prod.info()

<class 'pandas.core.frame.DataFrame'>
Index: 203260 entries, 0 to 203283
Data columns (total 4 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   product        203260 non-null  object 
 1   purchase_date  203260 non-null  dbdate 
 2   total_amount   203260 non-null  float64
 3   mobilenumber   203260 non-null  int64  
dtypes: dbdate(1), float64(1), int64(1), object(1)
memory usage: 7.8+ MB


In [14]:
#Filtering investment products
df_investments = df_prod[df_prod['product'].isin(investment_products)].copy()
df_investments.sample(10)

Unnamed: 0,product,purchase_date,total_amount,mobilenumber
92629,MUTUAL FUND,2024-11-28,1500.0,8506805520
119144,FIXED DEPOSIT,2025-05-06,5000.0,7709912272
180814,DIGIGOLD,2025-03-13,10001.0,8867990851
151462,DIGIGOLD,2025-03-07,5001.0,8999101080
107775,DIGIGOLD,2024-06-22,2001.0,9819601993
177220,DIGIGOLD,2024-05-10,7722.46,9702770033
125064,DIGIGOLD,2024-09-24,5001.0,9266485487
81059,DIGIGOLD,2025-06-10,501.0,8754784301
34929,DIGIGOLD,2024-11-20,18.0,9845604423
92908,MUTUAL FUND,2024-11-07,1500.0,8388029185


In [15]:
#Renaming columns 
df_investments.rename(columns={
    'purchase_date': 'buy_date',
    'product': 'type_of_investment'
}, inplace=True)

In [16]:
df_investments.head()

Unnamed: 0,type_of_investment,buy_date,total_amount,mobilenumber
0,STOCKS,2023-11-14,0.0,9840359735
1,STOCKS,2023-12-18,0.0,9869707918
2,STOCKS,2024-01-15,0.0,9819245861
3,STOCKS,2024-02-20,0.0,9321827343
4,STOCKS,2024-02-27,0.0,8099715515


In [17]:
df_investments.info()

<class 'pandas.core.frame.DataFrame'>
Index: 187318 entries, 0 to 203255
Data columns (total 4 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   type_of_investment  187318 non-null  object 
 1   buy_date            187318 non-null  dbdate 
 2   total_amount        187318 non-null  float64
 3   mobilenumber        187318 non-null  int64  
dtypes: dbdate(1), float64(1), int64(1), object(1)
memory usage: 7.1+ MB


In [18]:
df_investments['buy_date'] = pd.to_datetime(df_investments['buy_date'])
df_investments['buy_date'].head()

0   2023-11-14
1   2023-12-18
2   2024-01-15
3   2024-02-20
4   2024-02-27
Name: buy_date, dtype: datetime64[ns]

In [19]:
#Creating 'month_year' column 
df_investments['month_year'] = df_investments['buy_date'].dt.to_period('M').astype(str)
df_investments['month_year'].head()

0    2023-11
1    2023-12
2    2024-01
3    2024-02
4    2024-02
Name: month_year, dtype: object

In [20]:
df_investments.head()

Unnamed: 0,type_of_investment,buy_date,total_amount,mobilenumber,month_year
0,STOCKS,2023-11-14,0.0,9840359735,2023-11
1,STOCKS,2023-12-18,0.0,9869707918,2023-12
2,STOCKS,2024-01-15,0.0,9819245861,2024-01
3,STOCKS,2024-02-20,0.0,9321827343,2024-02
4,STOCKS,2024-02-27,0.0,8099715515,2024-02


In [None]:
#Filtering data from April 2024 to April 2025
df_filtered = df_investments[
    (df_investments['buy_date'] >= '2024-04-01') & 
    (df_investments['buy_date'] <= '2025-04-30')
].copy()

In [22]:
df_filtered.head()

Unnamed: 0,type_of_investment,buy_date,total_amount,mobilenumber,month_year
431,STOCKS,2025-05-24,0.0,9880426604,2025-05
28482,DIGIGOLD,2025-05-01,10.0,7739338747,2025-05
28483,DIGIGOLD,2025-05-01,10.0,8726925380,2025-05
28484,DIGIGOLD,2025-05-01,10.0,9917180393,2025-05
28485,DIGIGOLD,2025-05-01,10.0,8838314870,2025-05


In [23]:
monthly_product_summary  = df_filtered.groupby(['mobilenumber', 'month_year', 'type_of_investment'])['total_amount'].agg(
    total_investment='sum',
    avg_investment='mean'
).reset_index()

In [24]:
monthly_product_summary.head()

Unnamed: 0,mobilenumber,month_year,type_of_investment,total_investment,avg_investment
0,6000175688,2025-05,MUTUAL FUND,5000.0,5000.0
1,6000426705,2025-05,DIGI SILVER,5001.0,5001.0
2,6000581615,2025-05,DIGIGOLD,500.0,500.0
3,6000651261,2025-05,DIGI SILVER,2000.0,2000.0
4,6001351830,2025-05,DIGIGOLD,1051.0,1051.0


In [25]:
monthly_product_summary.mobilenumber.nunique(),monthly_product_summary.shape[0]

(7736, 8246)

In [26]:
# monthly_product_summary['result'] = (
#     monthly_product_summary['total_investment'] == monthly_product_summary['avg_investment']
# ).astype(str)

In [27]:
def convert_to_snapshot_period(month_year):
    return pd.to_datetime(month_year).strftime('%Y-%b').lower()

# Add snapshot_period column
monthly_product_summary['snapshot_period'] = monthly_product_summary['month_year'].apply(convert_to_snapshot_period)

In [28]:
monthly_product_summary.head()

Unnamed: 0,mobilenumber,month_year,type_of_investment,total_investment,avg_investment,snapshot_period
0,6000175688,2025-05,MUTUAL FUND,5000.0,5000.0,2025-may
1,6000426705,2025-05,DIGI SILVER,5001.0,5001.0,2025-may
2,6000581615,2025-05,DIGIGOLD,500.0,500.0,2025-may
3,6000651261,2025-05,DIGI SILVER,2000.0,2000.0,2025-may
4,6001351830,2025-05,DIGIGOLD,1051.0,1051.0,2025-may


In [29]:
monthly_product_summary[monthly_product_summary['type_of_investment']=='MUTUAL FUND']

Unnamed: 0,mobilenumber,month_year,type_of_investment,total_investment,avg_investment,snapshot_period
0,6000175688,2025-05,MUTUAL FUND,5000.0,5000.000000,2025-may
5,6001939240,2025-05,MUTUAL FUND,2000.0,2000.000000,2025-may
14,6200096665,2025-05,MUTUAL FUND,2000.0,2000.000000,2025-may
30,6203006871,2025-05,MUTUAL FUND,1500.0,1500.000000,2025-may
44,6204497098,2025-05,MUTUAL FUND,1500.0,1500.000000,2025-may
...,...,...,...,...,...,...
8233,9999177513,2025-05,MUTUAL FUND,2500.0,2500.000000,2025-may
8236,9999397696,2025-05,MUTUAL FUND,5000.0,5000.000000,2025-may
8237,9999432162,2025-05,MUTUAL FUND,5500.0,1833.333333,2025-may
8243,9999820567,2025-05,MUTUAL FUND,1500.0,1500.000000,2025-may


In [30]:
# pivot_df  = monthly_product_summary.pivot_table(
#     index='month_year',
#     columns='type_of_investment',
#     values='total_investment',
#     aggfunc='sum',
#     fill_value=0
# ).reset_index()

In [31]:
# customers_per_month = monthly_product_summary.groupby('month_year')['mobilenumber'].nunique().reset_index()
# customers_per_month

In [32]:
#customers_per_month.rename(columns={'mobilenumber': 'unique_customers'}, inplace=True)

In [33]:
# final_df = pivot_df.merge(customers_per_month, on='month_year')
# final_df

In [34]:
# final_df.to_csv('ABCD_monthly_product_summary.csv',index=False)

# Investment product buy from off-us so using sms data

# Extracting Investment related feature for last 1yrs

In [35]:
# Set Google Cloud project ID
project_id = 'abcd-dataplatform-prod'
dataset_id = "abcd_digitap_transformed"
table_id = "users_digitap_sms_data_transformed"
table_name = f"{project_id}.{dataset_id}.{table_id}"

sms_data = spark.read.format("bigquery") \
    .option('table', table_name) \
    .load()

In [36]:
sms_data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- year_month: string (nullable = true)
 |-- year_month_date: date (nullable = true)
 |-- total_inflow: string (nullable = true)
 |-- total_expense: string (nullable = true)
 |-- balances: struct (nullable = true)
 |    |-- bank_accounts: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- balance: string (nullable = true)
 |    |    |    |-- monthly_credit: string (nullable = true)
 |    |    |    |-- monthly_debit: string (nullable = true)
 |    |    |    |-- bank: string (nullable = true)
 |    |    |    |-- account: string (nullable = true)
 |    |    |    |-- last_txn_date: string (nullable = true)
 |    |-- year_month: string (nullable = true)
 |    |-- total_inflow: string (nullable = true)
 |    |-- loan_dues: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- emi_due: string (nullable = true)
 |    |  

In [37]:
test_columns = ['user_id', 'ingestion_time', 'device_id', 'year_month', 'year_month_date']
print(''.join([f"    sf.col('{col}'),\n" for col in test_columns]))

    sf.col('user_id'),
    sf.col('ingestion_time'),
    sf.col('device_id'),
    sf.col('year_month'),
    sf.col('year_month_date'),



In [38]:
investment_base = sms_data.select(test_columns + ['balances.investment'])
investment_base.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- ingestion_time: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- year_month: string (nullable = true)
 |-- year_month_date: date (nullable = true)
 |-- investment: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- amount: string (nullable = true)
 |    |    |-- folio_no: string (nullable = true)
 |    |    |-- last_txn_date: string (nullable = true)



In [39]:
# Explode the investment data
investments_df = investment_base.select(
    sf.col("user_id"),
    sf.col('year_month'),
    sf.col("year_month_date"),
    sf.explode(sf.col("investment")).alias("investment"))

In [40]:
# Fetch only the last 1 years of investment data
max_date = investments_df.agg(sf.max("year_month_date")).collect()[0][0]
twelve_months_ago = sf.add_months(sf.lit(max_date), -12)

investments_df = investments_df.filter(
    (sf.col("year_month_date") > twelve_months_ago) & (sf.col("year_month_date") <= sf.lit(max_date))
)

                                                                                

In [41]:
# Select required columns with necessary transformations
investments_df = investments_df.select(
    sf.col("user_id"),
    sf.col('year_month_date'),
    sf.col("investment.type").alias("investment_type"),
    sf.regexp_replace(sf.col("investment.amount"), ',', '').cast("float").alias("investment_amount"),
    sf.col("investment.folio_no"),
    sf.col("investment.last_txn_date")
)

In [42]:
investments_df.select("investment_type").distinct().show(truncate=False)



+-----------------------+
|investment_type        |
+-----------------------+
|SHARE & TRADING        |
|RECURRING DEPOSIT      |
|RD BOOKING             |
|FIXED DEPOSIT          |
|TD BOOKING             |
|RD CLOSURE/ RD MATURE  |
|NPS CONTRIBUTION       |
|NPS INVESTED SUM       |
|TERM DEPOSIT           |
|TD CLOSURE/ TD MATURE  |
|FD BOOKING             |
|FD CLOSURE/ FD MATURE  |
|EMPLOYEE PROVIDENT FUND|
|MF REDEMPTION          |
|MF EXPENSE             |
|Mutual Fund            |
+-----------------------+



                                                                                

In [43]:
#adding new feature to pyspark investment df 
investments_df = investments_df.withColumn("investment_type", sf.lower(sf.col("investment_type")))
investments_df = investments_df.withColumn("last_txn_date", sf.col("last_txn_date").cast(st.DateType()))

In [44]:
investments_df = investments_df.withColumn(
    "grouped_investment_type",
    sf.when(sf.col("investment_type").rlike(r"(?i)(share & trading)"), "trading")
    .when(sf.col("investment_type").rlike(r"(?i)(recurring deposit|rd booking)"), "recurring_deposit")
    .when(sf.col("investment_type").rlike(r"(?i)(fixed deposit|fd booking)"), "fixed_deposit")
    .when(sf.col("investment_type").rlike(r"(?i)(term deposit|td booking)"), "term_deposit")
    .when(sf.col("investment_type").rlike(r"(?i)(employee provident fund)"), "provident_fund")
    .when(sf.col("investment_type").rlike(r"(?i)(nps contribution|nps invested sum)"), "nps")
    .when(sf.col("investment_type").rlike(r"(?i)(mutual fund)"), "mutual_fund")

    .otherwise("Others")
)

In [45]:
inv_query = f"""
WITH exploded_investments AS (
  SELECT 
    SAFE_CAST(RIGHT(REPLACE(user_id, ',', ''), 10) AS INT64) AS user_id,
    PARSE_DATE('%Y-%m-%d', inv.last_txn_date) AS buy_date,
    CASE 
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"share & trading") THEN "trading"
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"recurring deposit|rd booking") THEN "recurring_deposit"
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"fixed deposit|fd booking") THEN "fixed_deposit"
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"term deposit|td booking") THEN "term_deposit"
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"employee provident fund") THEN "provident_fund"
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"nps contribution|nps invested sum") THEN "nps"
      WHEN REGEXP_CONTAINS(LOWER(inv.type), r"mutual fund") THEN "mutual_fund"
      ELSE "others"
    END AS type_of_investment,
    SAFE_CAST(REPLACE(inv.amount, ',', '') AS FLOAT64) AS amount
  FROM `abcd-dataplatform-prod.abcd_digitap_transformed.users_digitap_sms_data_transformed` AS a,
       UNNEST(balances.investment) AS inv
  WHERE year_month_date BETWEEN DATE_SUB(DATE('{as_of_end_dt}'), INTERVAL 12 MONTH) AND DATE('{as_of_end_dt}')
),

aggregated_investments AS (
  SELECT
    user_id,
    type_of_investment,
    MAX(buy_date) AS buy_date,
    SUM(amount) AS total_amount
  FROM exploded_investments
  WHERE type_of_investment != 'others'
  GROUP BY user_id, type_of_investment
)

SELECT 
  user_id AS mobilenumber,
  type_of_investment,
  buy_date,
  total_amount
FROM aggregated_investments

"""

invst_customers_df = client.query(inv_query).to_dataframe()
invst_customers_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   mobilenumber        0 non-null      Int64  
 1   type_of_investment  7 non-null      object 
 2   buy_date            7 non-null      dbdate 
 3   total_amount        7 non-null      float64
dtypes: Int64(1), dbdate(1), float64(1), object(1)
memory usage: 359.0+ bytes


In [46]:
invst_customers_df.sample(5)

Unnamed: 0,mobilenumber,type_of_investment,buy_date,total_amount
6,,term_deposit,2025-06-09,5119161000.0
1,,trading,2025-06-08,1290407000.0
3,,provident_fund,2025-06-11,239157000000.0
4,,mutual_fund,2024-09-01,10000.0
5,,fixed_deposit,2025-06-10,17336370000.0


In [47]:
invst_customers_df.mobilenumber.nunique(),invst_customers_df.shape[0]

(0, 7)

In [48]:
invst_customers_df['buy_date'] = pd.to_datetime(invst_customers_df['buy_date'])

In [49]:
invst_customers_df['month_year'] = invst_customers_df['buy_date'].dt.to_period('M').astype(str)

In [50]:
invst_customers_df.head()

Unnamed: 0,mobilenumber,type_of_investment,buy_date,total_amount,month_year
0,,nps,2025-06-05,146353500.0,2025-06
1,,trading,2025-06-08,1290407000.0,2025-06
2,,recurring_deposit,2025-06-08,583807700.0,2025-06
3,,provident_fund,2025-06-11,239157000000.0,2025-06
4,,mutual_fund,2024-09-01,10000.0,2024-09


In [51]:
#Filtering data from April 2024 to April 2025
invst_customers_df = invst_customers_df[
    (invst_customers_df['month_year'] >= '2024-04') & 
    (invst_customers_df['month_year'] <= '2025-04')
]
invst_customers_df.head()

Unnamed: 0,mobilenumber,type_of_investment,buy_date,total_amount,month_year
4,,mutual_fund,2024-09-01,10000.0,2024-09


In [52]:
sms_monthly_product_summary  = invst_customers_df.groupby(['mobilenumber', 'month_year', 'type_of_investment'])['total_amount'].agg(
    total_investment='sum',
    avg_investment='mean'
).reset_index()

In [53]:
sms_monthly_product_summary.head()

Unnamed: 0,mobilenumber,month_year,type_of_investment,total_investment,avg_investment


In [54]:
sms_monthly_product_summary.mobilenumber.nunique(),sms_monthly_product_summary.shape[0]

(0, 0)

In [55]:
def convert_to_snaconcatot_period1(month_year):
    return pd.to_datetime(month_year).strftime('%Y-%b').lower()
# Add snapshot_period column
sms_monthly_product_summary['snapshot_period'] = sms_monthly_product_summary['month_year'].apply(convert_to_snapshot_period1)

NameError: name 'convert_to_snapshot_period1' is not defined

In [None]:
sms_monthly_product_summary.mobilenumber.nunique(),sms_monthly_product_summary.shape[0]

In [None]:
# pivot_sms = sms_monthly_product_summary.pivot_table(
#     index='month_year',
#     columns='type_of_investment',
#     values='total_investment',
#     aggfunc=['sum'],
#     fill_value=0
# )
# pivot_sms.head()

In [None]:
# pivot_sms.columns = [f"{agg}_{col}".lower() for agg, col in pivot_sms.columns]
# pivot_sms = pivot_sms.reset_index()

In [None]:
# unique_mobile_count_sms = sms_monthly_product_summary.groupby('month_year')['mobilenumber'].nunique().reset_index()
# unique_mobile_count_sms.rename(columns={'mobilenumber': 'unique_mobilenumber_count'}, inplace=True)

In [None]:
# final_sms_summary = pivot_sms.merge(unique_mobile_count_sms, on='month_year')
# final_sms_summary

In [None]:
# final_sms_summary.to_csv('DigiTap_monthly_product_summary.csv',index=False)

In [None]:
.

In [None]:
final_monthly_summary = pd.concat(
    [monthly_product_summary, sms_monthly_product_summary],
    ignore_index=True
)
final_monthly_summary.head()

In [None]:
final_monthly_summary.mobilenumber.nunique(),final_monthly_summary.shape[0]

In [None]:
final_monthly_summary.to_gbq(
        destination_table='abcd_data_science_app.INVESTMENT_MONTH_ON_MONTH_DATA', #here we have to change
        project_id='abcd-dataplatform',
        if_exists='append'
    )

In [None]:
target_base_query = f"""
     SELECT DISTINCT
        mobilenumber,
        month_year,
        type_of_investment,
        total_investment
    FROM 
        `abcd-dataplatform.abcd_data_science_app.INVESTMENT_MONTH_ON_MONTH_DATA`
   
        """
target_base = client.query(target_base_query).to_dataframe()
target_base.info()

In [None]:
target_base.type_of_investment.unique()

In [None]:
target_base_query = f"""
            SELECT DISTINCT
                mobilenumber,
                month_year,
                type_of_investment,
                total_investment
            FROM 
                `abcd-dataplatform.abcd_data_science_app.INVESTMENT_MONTH_ON_MONTH_DATA`
            WHERE
                LOWER(type_of_investment) IN ('fixed deposit','fixed_deposit') 
                AND month_year in  ('2024-10','2024-11','2024-12','2025-01')
            
        """
target_base = client.query(target_base_query).to_dataframe()
target_base.info()

In [None]:
target_base.mobilenumber.nunique(),target_base.shape[0]

In [None]:
.

In [None]:
invst_customers_df.drop_duplicates(subset='mobilenumber',keep='first',inplace=True,ignore_index=True)

In [None]:
invst_customers_df.mobilenumber.nunique(),invst_customers_df.shape[0]

In [None]:
invst_customers_df.type_of_investment.unique()

In [None]:
invst_customers_df.rename(columns={'latest_buy_date':'buy_date'},inplace=True)

In [None]:
invst_customers_df.head()

In [None]:
invst_customers_df.info()

In [None]:
#Ensuring both DataFrames have the same column structure
df_investment = df_investments[['mobilenumber', 'type_of_investment', 'buy_date', 'total_amount']]
invst_customers_df = invst_customers_df[['mobilenumber', 'type_of_investment', 'buy_date', 'total_amount']]

#Concatenate both DataFrames
df_concat = pd.concat([df_investments, invst_customers_df], ignore_index=True)
df_concat.info()

In [None]:
df_concat.mobilenumber.nunique(),df_concat.shape[0]

In [None]:
agg_df = df_concat.groupby(['mobilenumber', 'type_of_investment']).agg({
    'total_amount': 'sum',
    'buy_date': 'max'
}).reset_index()

In [None]:
agg_df.mobilenumber.nunique(),agg_df.shape[0]

In [None]:
agg_df.drop_duplicates(subset='mobilenumber',keep='first',inplace=True,ignore_index=True)

In [None]:
agg_df.sample(10)

In [None]:
agg_df.mobilenumber.nunique(),agg_df.shape[0]

In [None]:
agg_df.to_csv('Investment Holding Level data.csv',index=False)

In [None]:
model_base = model_base.merge(
    agg_df,
    on='mobilenumber',
    how='left'
)
model_base.info()

In [None]:
model_base.customer_id.nunique(),model_base.shape[0]

In [None]:
model_base[model_base.type_of_investment.notna()].sample(10)