# Analysis of MF Data from AMFI and Portfolio Development

This notebook aims to analyze all listed MFs under AMFI (Association of Mutual Funds of India) to provide insights on volatility and historical performance of each fund and leverage the same to suggest recommendations for investment.

In [19]:
# Import neccessary libraries
import pandas as pd
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from datetime import date, datetime
from kiteconnect import KiteConnect

# Import rapids specific libraries
def load_rapids_env():
    import cudf
    import cupy as cp
    import connectorx as cx
    return cudf, cp, cx

# Initialize environment
load_dotenv()
amfi_data_batchA = os.getenv('amfi_data_batchA')
amfi_data_batchB = os.getenv('amfi_data_batchB')
railway_db_url = os.getenv('railway_db_url')
engine = create_engine(railway_db_url, connect_args={'options': '-c search_path="FINANCIAL_ANALYSIS"'})

# Select execution option
option = input('Select program to run: 1-Data_Load, 2-Funds_Analysis_Master_Data_Development, 3-Funds_Analysis_Feature_Engineering, 4-Model_Development, 5-Model_Deployment: ')

## Data load program - to be executed only once in the beginning

In [20]:
# Data load program
if option == '1':
    try:
        # Load data from batch A into dataframe and correct date format
        df_batchA = pd.read_csv(amfi_data_batchA)
        df_batchA['date'] = pd.to_datetime(df_batchA['date'], infer_datetime_format= True, errors = 'coerce')
        print(f'Data for top 5 rows from batch A: \n{df_batchA.head(5)}')
        errors_batchA = df_batchA['date'].isna()
        i_batchA = [rows for rows, val in enumerate(errors_batchA) if val == True]
        print(f'Number of records with dates in string and not updated by Pandas in batch A: {len(i_batchA)}')
        df_batchA = df_batchA.dropna(subset=['date'])
        print(f'Number of records in batch A: {len(df_batchA['SNo.'])}')

        # Load data from batch B into dataframe and correct date format
        df_batchB = pd.read_csv(amfi_data_batchB)
        df_batchB['date'] = pd.to_datetime(df_batchB['date'], infer_datetime_format= True, errors='coerce')
        print(f'Data for top 5 rows from batch B: \n{df_batchB.head(5)}')
        errors_batchB = df_batchB['date'].isna()
        i_batchB = [rows for rows, val in enumerate(errors_batchB) if val == True]
        print(f'Number of records with dates in string and not updated by Pandas in batch B: {len(i_batchB)}')
        df_batchB = df_batchB.dropna(subset=['date'])
        print(f'Number of records in batch B: {len(df_batchB['SNo.'])}')

        # Combine batch A and B data
        df_combined = pd.concat([df_batchA, df_batchB], ignore_index=True)
        df_combined = df_combined.rename(columns={'date': 'trx_date'})
        df_combined = df_combined.rename(columns={'SNo.': 's_no'})
        print(f'Data for top 5 rows from consolidated dataframe: \n{df_combined.head(5)}')
        print(f'Number of records in consolidated data: {len(df_combined['s_no'])}')

        # Populate data into database
        with engine.connect() as database_connection:
            for records_start in range(0, len(df_combined), 1000000):
                records_end = records_start + 1000000
                df_chunks = df_combined.iloc[records_start:records_end]
                df_chunks.to_sql(
                    'amfi_database',
                    con=database_connection,
                    schema='FINANCIAL_ANALYSIS',
                    if_exists='append',
                    index=False,
                    method='multi'
                    )
                database_connection.commit()
                print(f'{len(df_chunks)} Committed.')

        query = text('select * from amfi_database;')
        with engine.connect() as database_connection:
            df = pd.read_sql(sql=query, con=database_connection, index_col='trx_id')

        processed_records = len(df['s_no'])
        print(f'Successfully entered {processed_records} into database.')
    except Exception as e:
        print(f'Error Encountered During Data Load: {e}')
else:
    print(f'Selected option 2. Proceeding to execute funds analysis program.')

Selected option 2. Proceeding to execute funds analysis program.


## Funds analysis program

### 1. Initial data load and transformation using RAPIDS to generate final dataset for analysis.

In [21]:
# Funds analysis program - data pull and load
if option == '2':
    try:
        # Initialize environment and obtain necessary keys
        cudf, cp, cx = load_rapids_env()
        kite_api = os.getenv('kite_connect_api')
        rapids_data_to_path = os.getenv('rapids_data_to_path')
        final_dataset = os.getenv('final_dataset')
        parquet_data = os.getenv('parquet_data')
        
        # Pull data into arrow table using connectorx and convert to GPU enabled dataframe.
        query = 'select * from "FINANCIAL_ANALYSIS".amfi_database where trading_symbol_reinvestment is not null and trading_symbol_growth is not null;'
        nav_historical_arrow_tbl = cx.read_sql(
            conn=railway_db_url, 
            query=query, 
            return_type='arrow',
            partition_on='trx_id',
            partition_range=(1, 20000000),
            partition_num=7
            )
        df_nav_historical_data = cudf.DataFrame.from_arrow(nav_historical_arrow_tbl)
        df_nav_historical_data = df_nav_historical_data.sort_values(by='s_no')
        print(f'MFs with available trading symbols:{len(df_nav_historical_data)}')
        df_nav_historical_data_20_year = df_nav_historical_data[
                df_nav_historical_data['trx_date'].dt.year>=2005
            ]
        print(f'20 year historical records: {len(df_nav_historical_data_20_year)}')

        # Pull data from kite on MF details (not available in AMFI)
        kc = KiteConnect(api_key=kite_api)
        kc_mf_instruments = kc.mf_instruments()

        # Mutating the kite date columns to GPU friendly datetime type.
        for rec in kc_mf_instruments:
            for items, value in rec.items():
                if isinstance(value, (date, datetime)):
                    rec[items] = value.isoformat()

        df_kc_mf_data = cudf.DataFrame(kc_mf_instruments)
        df_kc_mf_data.index.name = 'SNo.'
        df_kc_mf_data['last_price_date'] = df_kc_mf_data['last_price_date'].astype('str')
        df_kc_mf_data['last_price_date'] = cudf.to_datetime(df_kc_mf_data['last_price_date'])
        print(f'Total records pulled from kite on MF instruments: {len(df_kc_mf_data)}')

        # Merging AMFI and KTIE data into final master data for future analysis.
        df_amfi_kc_merged = cudf.merge(
            df_nav_historical_data_20_year, 
            df_kc_mf_data, 
            left_on='trading_symbol_growth', 
            right_on='tradingsymbol', 
            how='left'
            )
        
        # Pushing final data into CSV.
        df_amfi_kc_merged.to_csv(f'{rapids_data_to_path}/00.AMFI_KITE_FINAL_MF_20_YEAR_DATA.csv')
        print(f'Data merge complete. {len(df_amfi_kc_merged)} records in final dataset.')

        # Load and cleanse data
        df_final_dataset = cudf.read_csv(final_dataset)
        df_final_dataset = df_final_dataset.dropna(subset='tradingsymbol') # Removing all rows with missing Kite data
        missing_records = df_final_dataset['tradingsymbol'].isna().sum()
        df_final_dataset['trx_date'] = cudf.to_datetime(df_final_dataset['trx_date']) # Converting all dates to datetime format
        print(f'Number of records with missing Zerodha data: {missing_records}')
        print(f'Total remaining records in final cleansed dataset: {len(df_final_dataset)}')

        # Create unique datasets based on fund names
        df_final_dataset.to_parquet(f'{rapids_data_to_path}/parquet_data/', partition_cols=['scheme_name'])

    except Exception as e:
        print(f'Error Encountered During Funds Analysis Master Data Development: {e}')

elif option == '1':
    print('Selected option 1, running the data load program.')

else:
    print(f'Selected option 3, running the data analysis program.')

Selected option 3, running the data analysis program.


### 2. Feature engineering on final dataset to enable fund volatility analysis computation. 

In [22]:
# Funds analysis program - feature engineering
if option == '3':
    try:
        # Initilaize environment
        cudf, cd, cx = load_rapids_env()
        load_dotenv()
        parquet_data = os.getenv('parquet_data')
        rapids_data_to_path = os.getenv('rapids_data_to_path')
        final_dataset = os.getenv('final_dataset')

        # Load parquet data and generate %NAV change (proxy for return calculations)
        df_final_dataset = cudf.read_csv(final_dataset)
        scheme_name = df_final_dataset['scheme_name'].unique().to_arrow().to_pylist()
        for scheme_id, scheme in enumerate(scheme_name):
            try:
                print(f'Running generate program for scheme: {scheme_id} out of {len(scheme_name)}')
                df_parquet_dataset = cudf.read_parquet(parquet_data, filters=[('scheme_name', '==', scheme)])
                df_parquet_dataset = df_parquet_dataset.sort_values(by=['scheme_name', 'trx_date'], ascending=False)
                df_parquet_dataset['nav_returns_%age'] = ((df_parquet_dataset['nav'].pct_change())*100).astype('float64').round(4)
                df_parquet_dataset = df_parquet_dataset[['scheme_name', 'fund_name', 'tradingsymbol', 'scheme_type', 'settlement_type', 'trx_date', 'nav', 'nav_returns_%age']]
                df_parquet_dataset.index.name = 'SNo.'
                df_parquet_dataset.to_csv(f'{rapids_data_to_path}/processed_files/01.nav_returns_dataset_{scheme}.csv')
                print(f'File generation complete for scheme: {scheme}')
            except Exception as e:
                print(f'Error generating file: {e}')
                continue

        # Generate risk and return profiles for each scheme
        

    except Exception as e:
        print(f'Error Encountered During Fund Analysis: {e}')

else:
    print(f'Selected option 4, running the model development program.')

Selected option 4, running the model development program.


### 3. Model development

In [23]:
if option == '4':
    try:
        # initialize environment
        cudf, cd, cx = load_rapids_env()
        processed_data = os.getenv('processed_data')

        # data load
        df_processed_data = cudf.read_csv(processed_data)

        # data description
        print(f'Data stats for conolodated nav returns:\n{df_processed_data.describe()}')

    except Exception as e:
        print(f'Error Encountered: {e}')
else:
    print(f'Selected option 5, running the model deployment program.')

Error Encountered: read_csv does not support multiple sources, got: ['/home/anurag_sarangi/projects/datasets/Rapids_Output/processed_files/01.nav_returns_dataset_360 ONE Dynamic Bond Fund Direct Plan Monthly Dividend.csv', '/home/anurag_sarangi/projects/datasets/Rapids_Output/processed_files/01.nav_returns_dataset_360 ONE Dynamic Bond Fund Direct Plan Quarterly Dividend.csv', '/home/anurag_sarangi/projects/datasets/Rapids_Output/processed_files/01.nav_returns_dataset_360 ONE Dynamic Bond Fund Regular Plan Half Yearly Dividend.csv', '/home/anurag_sarangi/projects/datasets/Rapids_Output/processed_files/01.nav_returns_dataset_360 ONE Dynamic Bond Fund Regular Plan Monthly Dividend.csv', '/home/anurag_sarangi/projects/datasets/Rapids_Output/processed_files/01.nav_returns_dataset_360 ONE Dynamic Bond Fund Regular Plan Quarterly Dividend.csv', '/home/anurag_sarangi/projects/datasets/Rapids_Output/processed_files/01.nav_returns_dataset_360 ONE Focused Fund - Direct Plan - Dividend.csv', '/hom

### 4. Model deployment

In [24]:
if option == '5':
    print('Test')
else:
    print(f'Invalid option {option} selected. Please enter either 1, 2, 3, 4 or 5 as input.')

Invalid option 4 selected. Please enter either 1, 2, 3, 4 or 5 as input.
