# Notebook Outline 🐳

- **Notebook**: Consolidated Codebase of all Replicated Risk factors (16)
- **Replicated Risk Factors**:
  > Replicated 11 Risk Factors
  - Customer Under SubPoena
  - Enhanced PEP
- **Data Used**:
  - Static Excel File: Risk Factors - Static Data.xlsx
  - Reference Excel File: Risk Factors - Reference Data.xlsx


## Import Libraries and Configurations


In [138]:
import pandas as pd
import numpy as np
import logging
import csv


# Set up logging
logging.basicConfig(level=logging.INFO)


## Global Declarations (File Paths)


In [139]:
# File path and sheet for In-scope lookup data
lookup_file_path = r"C:/Users/KadamatiV/Desktop/CRR TUNING CODE CONSOLIDATION/Data/Risk Factors - Reference Data.xlsx"
lookup_sheet_name = "In scope Risk Factors"


# File path for reference and static data
file_path_reference = r"C:/Users/KadamatiV/Desktop/CRR TUNING CODE CONSOLIDATION/Data/"
file_name_reference = r"Risk Factors - Reference Data.xlsx"

file_path_static = r"C:/Users/KadamatiV/Desktop/CRR TUNING CODE CONSOLIDATION/Data/"
file_name_static = r"Risk Factors - Static Data.xlsx"


## Utility Functions 🐎


#### Util 1: Loading the data and file path resolution


In [140]:
def load_risk_factor_data(risk_factor_name, risk_factor_data):
    global file_path_reference, file_name_reference, file_path_static, file_name_static

    if not risk_factor_data:
        logging.info(f"No data found for {risk_factor_name}")
        return None, None

    reference_tables = []
    for table in risk_factor_data["reference"]["tables"]:
        reference_tables.append(
            pd.read_excel(
                file_path_reference + file_name_reference,
                sheet_name=risk_factor_data["reference"]["sheet_name"],
                skiprows=table["skiprows"],
                nrows=table.get("nrows"),
                usecols=table.get("usecols"),
            )
        )

    static_tables = []
    for table in risk_factor_data["static"]["tables"]:
        static_tables.append(
            pd.read_excel(
                file_path_static + file_name_static,
                sheet_name=risk_factor_data["static"]["sheet_name"],
                skiprows=table["skiprows"],
                nrows=table.get("nrows"),
                usecols=table.get("usecols"),
            )
        )

    return reference_tables, static_tables


#### Util 2 : Lookup & Process only Inscope Risk Factors


In [141]:
def load_lookup_data(lookup_file_path, lookup_sheet_name):
    lookup_df = pd.read_excel(lookup_file_path, sheet_name=lookup_sheet_name)
    return lookup_df


def process_risk_factors(lookup_df, risk_factor_functions):
    final_risk_factor_result_dict = {}  # Initialize the dictionary

    for index, row in lookup_df.iterrows():
        risk_factor_name = row["Risk Factor Name"]
        run_flag = row["In Scope (Y/N)"]

        if run_flag == "Y":
            current_scoring_function = risk_factor_functions.get(risk_factor_name)
            if current_scoring_function:
                current_risk_factor_result_df = current_scoring_function()
                final_risk_factor_result_dict[risk_factor_name] = (
                    current_risk_factor_result_df  # Store DataFrame in dict by name
                )
            else:
                logging.info(f"No scoring function found for {risk_factor_name}")

    return final_risk_factor_result_dict  # Return the dictionary


#### Util 3 : Final Dataframe Creation (EXCEL/CSV)


In [142]:
def create_final_df(risk_factor_name, df_to_transform):
    def transform_score_value(row):
        if row["IS_ACTIVE"] == 1:
            if row["PARTY_TYPE_CD"] == "P":
                return "Active Individual"
            elif row["PARTY_TYPE_CD"] == "B":
                return "Active Business"
            else:  # Handle other potential PARTY_TYPE_CD values
                return "Active (Unknown Type)"
        else:
            return "Customer Inactive"

    final_df = pd.DataFrame(
        {
            "PARTY_KEY": df_to_transform["PARTY_KEY"],
            "RISK_SEGMENT": df_to_transform["RISK_SEGMENT"],
            "Score_Value": df_to_transform.apply(transform_score_value, axis=1),
            "Risk_Factor": risk_factor_name,
            "Score": df_to_transform["SCORE"],
        }
    )

    return final_df


## Risk Factor Implementations 🐨


#### Replication 1: Enhanced PEP Risk Factor


#### Replication 2: Customer Under SubPoena Risk Factor


In [143]:
def customer_under_subpoena_rf():

    # TODO: Load Data for subPoena using global/ functional input

    customer_under_subpoena_tables = {
        "reference": {
            "sheet_name": "Cust Under Subp RF - Reference",
            "tables": [{"skiprows": 1, "nrows": None, "usecols": [0, 1, 2]}],
        },
        "static": {
            "sheet_name": "Cust Under Subp RF - Static",
            "tables": [{"skiprows": 1, "nrows": None, "usecols": [0, 1, 2, 3, 4]}],
        },
    }

    reference_tables, static_tables = load_risk_factor_data(
        "Customer Under Subpoena", customer_under_subpoena_tables
    )

    reference_df, static_df = reference_tables[0], static_tables[0]

    def calculate_score(row):

        num_subpoena = row["NUMBER_OF_SUBPOENA"]

        if row["IS_ACTIVE"] == 0:
            return 0

        if num_subpoena <= 1:
            score_range = 0
        elif num_subpoena <= 4:
            score_range = 2
        else:
            score_range = 5

        default_none_values = {0: 0, 2: 5, 5: 3}
        default_value = default_none_values.get(score_range)

        risk_segment = row["RISK_SEGMENT"]
        score_row = reference_df[
            (reference_df["NUMBER_OF_SUBPOENA"] == score_range)
            & (reference_df["RISK_SEGMENT"] == risk_segment)
        ]

        if score_row.empty:
            return default_value

        score_row["SCORE"].iloc[0]
        # Output the final dataframe with all required columns
        return score_row["SCORE"].iloc[0]

    static_df["SCORE"] = static_df.apply(calculate_score, axis=1)
    customer_under_subpoena_org_static_data = static_df

    risk_factor_name = "Customer Under SubPoena"
    final_df_customer_under_subpoena = create_final_df(
        risk_factor_name, customer_under_subpoena_org_static_data
    )
    return final_df_customer_under_subpoena


## Risk Factor Engine : Main Execution 🐢


In [144]:
def main():

    global lookup_file_path, lookup_sheet_name

    lookup_df = load_lookup_data(lookup_file_path, lookup_sheet_name)

    risk_factor_functions = {
        "Customer Under Subpoena": customer_under_subpoena_rf,
        "Enhanced PEP": enhanced_pep_rf,
        # ... Add mappings for your other risk factors
    }

    final_risk_factor_result_dict = process_risk_factors(
        lookup_df, risk_factor_functions
    )

    # Sample usage to access the final DataFrame for a specific risk factor
    # risk_factor_name = "Customer Under Subpoena"
    # final_df = final_risk_factor_result_dict.get(risk_factor_name)
    # print(final_df)


## Run Main Function 🐰


In [145]:
if __name__ == "__main__":
    main()


NameError: name 'enhanced_pep_rf' is not defined

## Test Engine


1. Test the lookup inscope function whether it is process the correct risk factors or not
   - create a hardcoded list with entries of Y | N so accordingly verify using test cases
     - Only 1 risk factor is processed
     - Only 2 risk factors are processed
     - All risk factors are processed
2.
