In [49]:
import pandas as pd
import numpy as np
from pathlib import Path

In [50]:
securities_file_path = "security.txt"
bhav_copy_files = [
    "BhavCopy_NSE_CM (1).csv",
    "BhavCopy_NSE_CM (2).csv",
    "BhavCopy_NSE_CM (3).csv",
]

fno_underlyings_file_path = "FNO_Underlyings.csv"
var_data_file_path = "C_VAR1_06112025_6112025.csv"

output_csv_file = "securities_categorized.csv"
symbol_column = "SYMBOL"
series_column = "SERIES"
traded_value_column = "TtlTrfVal"
price_range_column = "PRICE_RANGE"
bhav_symbol_column = "TckrSymb"
bhav_series_column = "SctySrs"


In [51]:
def extract_price_band_percentage(price_range_string):
    if pd.isna(price_range_string):
        return 0.0

    if not isinstance(price_range_string, str):

        return 0.0

    if "-" not in price_range_string and "–" not in price_range_string:

        return 0.0

    separator = "-" if "-" in price_range_string else "–"
    parts = price_range_string.split(separator)

    if len(parts) != 2:

        return 0.0

    try:
        lower_limit = float(str(parts[0]).strip())
        upper_limit = float(str(parts[1]).strip())
        midpoint = (lower_limit + upper_limit) / 2.0

        if midpoint == 0:
            return 0.0
        band_percentage = (upper_limit - midpoint) / midpoint * 100.0
        return band_percentage

    except ValueError:
        return 0.0

In [52]:
def load_and_aggregate_bhav_data(bhav_file_list):
    bhav_data_frames = []
    for file_path in bhav_file_list:
        if not Path(file_path).exists():
            print(f"Bhav file not found, skipping: {file_path}")
            continue
        bhav_dataframe = pd.read_csv(file_path)
        required_columns = {bhav_symbol_column, bhav_series_column, traded_value_column}
        missing_columns = required_columns - set(bhav_dataframe.columns)
        if missing_columns:
            raise ValueError(f"Bhav file {file_path} is missing columns: {missing_columns}")
        bhav_cleaned = bhav_dataframe[[bhav_symbol_column, bhav_series_column, traded_value_column]].copy()
        bhav_cleaned.columns = [symbol_column, series_column, "traded_value"]
        bhav_data_frames.append(bhav_cleaned)

    if not bhav_data_frames:
        print("No Bhav files were successfully loaded. Average traded values will be NaN.")
        return pd.DataFrame(columns=[symbol_column, "average_traded_value", "series_from_bhav"])
    combined_bhav_data = pd.concat(bhav_data_frames, ignore_index=True)
    aggregated_bhav = (combined_bhav_data.groupby(symbol_column).agg(average_traded_value=("traded_value", "mean"),series_from_bhav=(series_column, lambda x: x.dropna().iloc[-1] if len(x.dropna()) > 0 else np.nan),).reset_index())
    return aggregated_bhav

In [53]:
def load_fno_underlyings_list(fno_file_path):
    if not Path(fno_file_path).exists():
        print(f"[WARNING] F&O underlyings file not found: {fno_file_path}")
        return set()

    fno_dataframe = pd.read_csv(fno_file_path)
    if symbol_column not in fno_dataframe.columns:
        raise ValueError(f"F&O file must contain '{symbol_column}' column. Found: {list(fno_dataframe.columns)}")

    fno_symbols = set(fno_dataframe[symbol_column].astype(str).str.strip().unique())
    return fno_symbols


In [54]:
def load_variance_data_from_var_file(var_file_path):
    if not Path(var_file_path).exists():
        print(f"VAR file not found: {var_file_path}")
        return pd.DataFrame(columns=[symbol_column, "variance"])

    variance_records = []
    try:
        with open(var_file_path, 'r', encoding='utf-8') as file_handle:
            for line in file_handle:
                fields = line.strip().split(',')
                if len(fields) >= 5:
                    record_type = fields[0]
                    if record_type == '20':
                        symbol = fields[1].strip()
                        variance_value_str = fields[4].strip()
                        try:
                            variance_value = float(variance_value_str)
                            variance_records.append({symbol_column: symbol,"variance": variance_value})

                        except ValueError:
                            continue             

    except Exception as error:
        print(f"[WARNING] Error reading VAR file {var_file_path}: {error}")
        return pd.DataFrame(columns=[symbol_column, "variance"])

    if not variance_records:
        print("[WARNING] VAR file contains no valid data rows.")
        return pd.DataFrame(columns=[symbol_column, "variance"])

    variance_dataframe = pd.DataFrame(variance_records)
    variance_dataframe = variance_dataframe.drop_duplicates(subset=[symbol_column], keep='first')

    return variance_dataframe

In [55]:
def categorize_securities_into_groups(dataframe):
    required_columns = ["average_traded_value", "price_band_percentage", "is_fno_stock", "variance", series_column]

    for column_name in required_columns:

        if column_name not in dataframe.columns:

            raise ValueError(f"Missing required column for categorization: {column_name}")

    dataframe["average_traded_value"] = dataframe["average_traded_value"].fillna(0.0)
    dataframe["price_band_percentage"] = dataframe["price_band_percentage"].fillna(0.0)
    dataframe["variance"] = dataframe["variance"].fillna(9999.0) 

    is_valid_series = dataframe[series_column].isin(["EQ", "BE", "BZ"])
    is_fno_security = dataframe["is_fno_stock"] == True
    qualifies_for_5x_standard = ( is_valid_series & (dataframe["average_traded_value"] > 50_00_000) &  (dataframe["price_band_percentage"] > 5.0))
    qualifies_for_3x_standard = ( is_valid_series & (dataframe["average_traded_value"] > 20_00_000) & (dataframe["price_band_percentage"] > 5.0))
    qualifies_for_5x_fno = is_fno_security & (dataframe["variance"] <= 20.0)
    qualifies_for_3x_fno = is_fno_security & (dataframe["variance"] <= 33.33) & (~qualifies_for_5x_fno)
    dataframe["category"] = "Only Delivery"
    dataframe.loc[qualifies_for_3x_standard | qualifies_for_3x_fno, "category"] = "3x"
    dataframe.loc[qualifies_for_5x_standard | qualifies_for_5x_fno, "category"] = "5x"
    return dataframe

In [56]:
if not Path(securities_file_path).exists():
    raise FileNotFoundError(f"Securities file not found: {securities_file_path}")
nse_headers_file = Path("nse security headers.txt")

if nse_headers_file.exists():
    import ast
    headers_text = nse_headers_file.read_text(encoding="utf-8")
    header_start = headers_text.find('[')
    header_end = headers_text.find(']', header_start)
    nse_column_headers = ast.literal_eval(headers_text[header_start:header_end+1])
    nse_column_headers = [header.upper() for header in nse_column_headers]
    securities_dataframe = pd.read_csv(
        securities_file_path, 
        sep="|", 
        header=None, 
        names=nse_column_headers, 
        dtype=str
    )
else:
    securities_dataframe = pd.read_csv(securities_file_path, sep="|", header=None, dtype=str)
    print("[WARNING] NSE headers file not found. Using numeric column indexes.")

if symbol_column not in securities_dataframe.columns:
    raise ValueError(f"Symbol column '{symbol_column}' not found. Available columns: {list(securities_dataframe.columns)}")
securities_dataframe[symbol_column] = securities_dataframe[symbol_column].astype(str).str.strip()
print(len(securities_dataframe))


40657


In [57]:
bhav_aggregated_data = load_and_aggregate_bhav_data(bhav_copy_files)
combined_data = securities_dataframe.merge(bhav_aggregated_data, on=symbol_column, how="left")
print(combined_data.shape)

(40657, 56)


In [58]:
if series_column not in combined_data.columns:
    if "series_from_bhav" in combined_data.columns:
        combined_data[series_column] = combined_data["series_from_bhav"]

    else:
        print(f"[WARNING] Series column not found. Setting to NaN.")
        combined_data[series_column] = pd.NA

else:
    if "series_from_bhav" in combined_data.columns:
        combined_data[series_column] = combined_data[series_column].fillna(combined_data["series_from_bhav"])

if "series_from_bhav" in combined_data.columns:
    combined_data.drop(columns=["series_from_bhav"], inplace=True)


In [59]:
if "average_traded_value" not in combined_data.columns:
    print("[WARNING] Average traded value not found. Setting to 0.")
    combined_data["average_traded_value"] = 0.0


In [60]:
if price_range_column not in combined_data.columns:
    print(f" Price range column '{price_range_column}' not found in securities file. Defaulting to 0.")
    combined_data["price_band_percentage"] = 0.0
else:
    combined_data["price_band_percentage"] = combined_data[price_range_column].apply(extract_price_band_percentage)


 Price range column 'PRICE_RANGE' not found in securities file. Defaulting to 0.


In [61]:
fno_symbol_set = load_fno_underlyings_list(fno_underlyings_file_path)
combined_data["is_fno_stock"] = combined_data[symbol_column].isin(fno_symbol_set)
print(combined_data.shape)

(40657, 57)


In [62]:
variance_dataframe = load_variance_data_from_var_file(var_data_file_path)
combined_data = combined_data.merge(variance_dataframe, on=symbol_column, how="left")


VAR file not found: C_VAR1_06112025_6112025.csv


In [63]:
securities_categorized = categorize_securities_into_groups(combined_data)

securities_categorized.to_csv(output_csv_file, index=False)
print(f"[INFO] Results saved to: {output_csv_file}")
print("CATEGORIZATION SUMMARY")

category_counts = securities_categorized["category"].value_counts(dropna=False)
print("\nNumber of securities in each category:")
print(category_counts)
print("DATA QUALITY REPORT")

missing_avg_traded_value = securities_categorized["average_traded_value"].isna().sum()
missing_price_band = securities_categorized["price_band_percentage"].isna().sum()
missing_variance = securities_categorized["variance"].isna().sum()

print(f"\nSecurities with missing 3-day average traded value: {missing_avg_traded_value}")
print(f"Securities with missing price band percentage: {missing_price_band}")
print(f"Securities with missing variance data: {missing_variance}")
print("PROCESSING ASSUMPTIONS")
print("""1. If a security has no Bhav Copy data in the last 3 trading days, its average traded value is treated as 0.
2. If price band range is missing or malformed in the securities file, price band percentage is treated as 0%.
3. If a security is not listed in the F&O underlyings file, it is treated as a non-F&O security (regular equity only).
4. If a security has no matching entry in the VAR file, it is not eligible for F&O-based 5x/3x overrides.
5. Categorization precedence: 5x > 3x > Only Delivery. A security is placed in the highest applicable category.""")

  dataframe["variance"] = dataframe["variance"].fillna(9999.0)


[INFO] Results saved to: securities_categorized.csv
CATEGORIZATION SUMMARY

Number of securities in each category:
category
Only Delivery    40657
Name: count, dtype: int64
DATA QUALITY REPORT

Securities with missing 3-day average traded value: 0
Securities with missing price band percentage: 0
Securities with missing variance data: 0
PROCESSING ASSUMPTIONS
1. If a security has no Bhav Copy data in the last 3 trading days, its average traded value is treated as 0.
2. If price band range is missing or malformed in the securities file, price band percentage is treated as 0%.
3. If a security is not listed in the F&O underlyings file, it is treated as a non-F&O security (regular equity only).
4. If a security has no matching entry in the VAR file, it is not eligible for F&O-based 5x/3x overrides.
5. Categorization precedence: 5x > 3x > Only Delivery. A security is placed in the highest applicable category.
