## The Risk Measure DataSet generation engine codebase

This engine contains the following key components:

1.	First, a *singleton* object, *CurveManager*, that holds a collection of curves identifiable by name and date. Each curve is a specialization of an abstract Curve class. The simplest kind stores a vector of double values indexed by dates and, when queried with a date, returns the interpolated value.
2.	Second, a singleton object, *ScenarioManager*, that holds a collection of market scenarios identifiable by name and date. Each scenario takes the complete set of curves from the CurveManager as the BASE and perturbs each curve up and down in a prescribed way, so that the scenario contains three sets of curves: BASE, UP (perturbed up), and DOWN (perturbed down).
3.	Third, a singleton object, *SecurityManager*, that holds a collection of financial securities identifiable by an id. Each security is a specialization of an abstract Security class, is constructed from a dictionary of <attribute, value> pairs, and has a function, NPV, that takes a scenario object and returns a value.
4.	Fourth, a generic function that iterates over every security in the SecurityManager and every scenario in the ScenarioManager, calling the security's NPV function three times to obtain one value for each of the three curve sets in the scenario. The three values are then pushed into a data store together with all the relevant identifiers.
5.	Tying these together is a workflow manager, or *task dispatcher*, that drives the codebase. It takes a JSON configuration document that specifies where the input files (the curve, security, and scenario definitions) are located and which use case the job should run. It first pre-processes the inputs to build the three containers, CurveManager, ScenarioManager, and SecurityManager, then dispatches the workflow to the relevant use case, and finally writes the results in the data store to a CSV file.
6.	Finally, and most important for completing the picture: *info logging*! A logger runs through the code so that each step is logged and, if an exception occurs, its error message is captured in the log.
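
A minimal sketch of item 1, assuming curves are keyed by the (name, date) pair the description mentions. The names `LinearCurve`, `as_of`, `add`, and `get` are hypothetical and differ from the notebook code (which keys curves by name only and interpolates with scipy's `interp1d`). Here `numpy.interp` performs the linear interpolation, which holds the end values flat outside the pillar range instead of extrapolating.

```python
from abc import ABC, abstractmethod
from datetime import date
from typing import List

import numpy as np


class Curve(ABC):
    """Abstract curve: a named, dated object that maps a date to a value."""

    def __init__(self, name: str, as_of: date):
        self.name = name
        self.as_of = as_of  # the curve date, half of the (name, date) key

    @abstractmethod
    def value(self, d: date) -> float:
        """Return the curve value at date d."""


class LinearCurve(Curve):
    """Simplest curve: values on pillar dates, linearly interpolated in between."""

    def __init__(self, name: str, as_of: date, dates: List[date], values: List[float]):
        super().__init__(name, as_of)
        # Dates become ordinal day numbers so numpy can interpolate on them.
        self.x = np.array([d.toordinal() for d in dates], dtype=float)
        self.y = np.array(values, dtype=float)

    def value(self, d: date) -> float:
        # np.interp holds the first/last value flat outside the pillar range.
        return float(np.interp(d.toordinal(), self.x, self.y))


class CurveManager:
    """Singleton registry of curves keyed by (name, date)."""

    _instance = None

    def __new__(cls):
        # Every call returns the same instance, so all components share one registry.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.curves = {}
        return cls._instance

    def add(self, curve: Curve) -> None:
        self.curves[(curve.name, curve.as_of)] = curve

    def get(self, name: str, as_of: date) -> Curve:
        return self.curves[(name, as_of)]


cm = CurveManager()
cm.add(LinearCurve("USD_DF", date(2024, 1, 1),
                   [date(2024, 1, 1), date(2025, 1, 1)], [1.0, 0.95]))
print(cm.get("USD_DF", date(2024, 1, 1)).value(date(2024, 7, 2)))  # ≈ 0.975, halfway between pillars
```

Because `__new__` always hands back the same object, any component that calls `CurveManager()` sees the curves loaded earlier without the registry being passed around explicitly.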
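
Item 2 can be sketched as follows, under the simplifying assumption that a curve is just a list of pillar values. The default bump scales values by ±10%, mirroring the ×1.1/×0.9 factors in the notebook's `Scenario._generate_perturbations`. `relative_bump`, `parallel_shift`, and the `bump`/`size` parameters are hypothetical; the point of the sketch is that the "certain fashion" of perturbing can be passed in rather than hard-coded.

```python
from typing import Callable, Dict, List

Curves = Dict[str, List[float]]  # simplified stand-in: curve name -> pillar values
Bump = Callable[[List[float], float], List[float]]


def relative_bump(values: List[float], size: float) -> List[float]:
    """Scale every pillar by (1 + size); size=0.10 reproduces the notebook's x1.1 / x0.9."""
    return [v * (1.0 + size) for v in values]


def parallel_shift(values: List[float], size: float) -> List[float]:
    """Alternative perturbation: add `size` to every pillar (e.g. 0.0001 = 1bp on a rate curve)."""
    return [v + size for v in values]


class Scenario:
    """A named scenario holding BASE, UP, and DOWN curve sets."""

    def __init__(self, name: str, base: Curves, bump: Bump = relative_bump, size: float = 0.10):
        self.name = name
        self.base = base
        # UP and DOWN are built once from BASE; BASE itself is never modified.
        self.up = {k: bump(v, +size) for k, v in base.items()}
        self.down = {k: bump(v, -size) for k, v in base.items()}

    def curve_sets(self) -> Dict[str, Curves]:
        return {"BASE": self.base, "UP": self.up, "DOWN": self.down}


s = Scenario("S1", {"USD_DF": [1.0, 0.9]})
rates = Scenario("S2", {"USD_ZERO": [0.05, 0.06]}, bump=parallel_shift, size=0.0001)
```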
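
For item 3, the sketch below builds securities from an <attribute, value> dictionary through a small type registry. The `type` and `id` keys, `ZeroCouponBond`, `SECURITY_TYPES`, and `make_security` are hypothetical, and a curve set is modeled as a mapping from curve name to a discount-factor function of time in years. One design choice deliberately differs from the description: `npv` receives a single curve set (BASE, UP, or DOWN) rather than a whole scenario, so a caller can price all three sets without rebuilding scenario objects.

```python
import math
from abc import ABC, abstractmethod
from typing import Callable, Dict, Union

Attr = Union[str, float, int]
CurveSet = Dict[str, Callable[[float], float]]  # curve name -> discount factor as a function of t (years)


class Security(ABC):
    def __init__(self, security_id: str, attributes: Dict[str, Attr]):
        self.security_id = security_id
        self.attributes = attributes

    @abstractmethod
    def npv(self, curves: CurveSet) -> float:
        """Price the security against one curve set (BASE, UP, or DOWN)."""


class ZeroCouponBond(Security):
    """Pays `notional` at `maturity` (years): NPV = notional * DF(maturity)."""

    def npv(self, curves: CurveSet) -> float:
        df = curves[str(self.attributes["curve_name"])](float(self.attributes["maturity"]))
        return float(self.attributes["notional"]) * df


# Hypothetical registry: the "type" attribute selects which Security subclass to build.
SECURITY_TYPES = {"ZCB": ZeroCouponBond}


def make_security(attributes: Dict[str, Attr]) -> Security:
    attrs = dict(attributes)  # copy so the caller's dictionary is not modified
    cls = SECURITY_TYPES[str(attrs.pop("type"))]
    return cls(str(attrs.pop("id")), attrs)


curves = {"USD": lambda t: math.exp(-0.05 * t)}  # flat 5% continuously compounded curve
bond = make_security({"type": "ZCB", "id": "B1", "curve_name": "USD",
                      "notional": 100.0, "maturity": 2.0})
print(round(bond.npv(curves), 4))  # 90.4837
```

Adding a new instrument then means writing one subclass and one registry entry; the loader that reads the security file does not change.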
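
Item 4 reduces to a double loop. In this sketch (all names hypothetical) each security is represented only by a pricing function of one curve set, and each scenario only by its BASE/UP/DOWN curve sets. As in the notebook's `calculate_npv_for_all`, a failing security/scenario pair is logged and skipped so the rest of the batch still runs; here `logging.exception` also records the traceback.

```python
import logging
from typing import Callable, Dict, List, Tuple

CurveSet = Dict[str, float]            # simplified: curve name -> flat discount factor
Pricer = Callable[[CurveSet], float]   # a security's NPV as a function of one curve set
ScenarioKey = Tuple[str, str]          # (scenario name, scenario date)


def run_all(securities: Dict[str, Pricer],
            scenarios: Dict[ScenarioKey, Dict[str, CurveSet]]) -> List[dict]:
    """Price every security under the BASE, UP, and DOWN sets of every scenario."""
    store = []
    for (scen_name, scen_date), curve_sets in scenarios.items():
        for sec_id, price in securities.items():
            try:
                row = {"security_id": sec_id, "scenario_name": scen_name, "scenario_date": scen_date}
                for label in ("BASE", "UP", "DOWN"):
                    row[f"NPV_{label}"] = price(curve_sets[label])
                store.append(row)
            except Exception:
                # Log the failing pair with its traceback and keep going with the batch.
                logging.exception("NPV failed for security %s, scenario %s", sec_id, scen_name)
    return store


scenarios = {("S1", "2024-01-01"): {"BASE": {"USD": 0.95}, "UP": {"USD": 0.96}, "DOWN": {"USD": 0.94}}}
securities = {
    "B1": lambda cs: 100.0 * cs["USD"],
    "B2": lambda cs: 100.0 * cs["EUR"],  # no EUR curve: raises KeyError, logged and skipped
}
rows = run_all(securities, scenarios)
```

The list of dictionaries maps directly onto a table, for example via `pandas.DataFrame(rows)`, which is how the notebook turns its data store into the CSV output.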
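
One way to get the per-step logging of item 6 without repeating try/except blocks is a decorator. The sketch assumes a named logger (`risk_engine`, hypothetical) and uses `logger.exception`, which logs at ERROR level and attaches the current traceback. The in-memory `ListHandler` exists only to make the records inspectable; in the notebook, `logging.basicConfig` sends records to `task_dispatcher.log` instead.

```python
import functools
import logging

logger = logging.getLogger("risk_engine")  # hypothetical logger name
logger.setLevel(logging.INFO)


def logged_step(func):
    """Log the start and end of a pipeline step; on failure, log the traceback and re-raise."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("START %s", func.__name__)
        try:
            result = func(*args, **kwargs)
        except Exception:
            logger.exception("FAILED %s", func.__name__)  # ERROR level, traceback attached
            raise
        logger.info("DONE %s", func.__name__)
        return result
    return wrapper


class ListHandler(logging.Handler):
    """Demo-only handler that keeps records in memory instead of writing a log file."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


capture = ListHandler()
logger.addHandler(capture)


@logged_step
def load_curves():
    return 3


@logged_step
def load_securities():
    raise ValueError("bad security row")


load_curves()
try:
    load_securities()
except ValueError:
    pass  # the dispatcher would decide here whether to abort or continue
```

Re-raising after logging keeps the decision to abort with the caller, while the log still holds the full error.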

This file implements the workflow manager; its *main()* entry point is the `if __name__ == "__main__":` block at the end.
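
The dispatcher's configuration contract can be sketched independently of the managers. The key names below are the ones the TaskDispatcher code reads, except `curve_definition_file`, which is an assumed name for the curve input. `validate_config` and `dispatch` are hypothetical helpers, and the file names in the example are placeholders. Looking the handler up in a dictionary makes an unknown `use_case` fail loudly instead of silently doing nothing.

```python
import json
from typing import Callable, Dict

# Keys read by the TaskDispatcher, plus an assumed "curve_definition_file".
REQUIRED_KEYS = ("curve_definition_file", "scenario_definition_file",
                 "security_definition_file", "use_case", "output_file")


def validate_config(text: str) -> dict:
    """Parse the JSON configuration and fail early if an expected key is missing."""
    config = json.loads(text)
    missing = [k for k in REQUIRED_KEYS if k not in config]
    if missing:
        raise KeyError(f"configuration is missing keys: {missing}")
    return config


def dispatch(config: dict, handlers: Dict[str, Callable[[dict], str]]) -> str:
    """Route the job to the handler registered for config['use_case']."""
    use_case = config["use_case"]
    if use_case not in handlers:
        raise ValueError(f"unsupported use_case: {use_case!r}")
    return handlers[use_case](config)


example = """{
  "curve_definition_file": "curves.csv",
  "scenario_definition_file": "scenarios.csv",
  "security_definition_file": "securities.csv",
  "use_case": "NPV_CALCULATION",
  "output_file": "npv_results.csv"
}"""

cfg = validate_config(example)
handlers = {"NPV_CALCULATION": lambda c: f"NPVs would be written to {c['output_file']}"}
print(dispatch(cfg, handlers))  # NPVs would be written to npv_results.csv
```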

import logging
import json
import pandas as pd # type: ignore
from datetime import datetime
from abc import ABC, abstractmethod
from typing import Dict, Union, List, Optional, Tuple
from scipy.interpolate import interp1d
import numpy as np
import csv

# Configure logging
logging.basicConfig(
    filename="task_dispatcher.log",
    filemode="a",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Abstract Classes and Singleton Managers

class Curve(ABC):
    """Abstract base class for a curve: a named set of (date, value) pillars."""
    def __init__(self, name: str, dates: List[Union[datetime, int]], values: List[float]):
        self.name = name
        self.dates = list(dates)
        self.values = [float(v) for v in values]

    @abstractmethod
    def get_value(self, date: Union[datetime, int]) -> float:
        """Return the curve value at `date`."""


def _to_number(date: Union[datetime, int, float]) -> float:
    """Map a date to a number for interpolation: datetimes become ordinal day counts,
    numeric dates (as read by read_curves_from_csv) are used as-is."""
    if isinstance(date, datetime):
        return float(date.toordinal())
    return float(date)


class SimpleCurve(Curve):
    """A simple curve with linear interpolation (and linear extrapolation beyond the end pillars)."""
    def __init__(self, name: str, dates: List[Union[datetime, int]], values: List[float]):
        super().__init__(name, dates, values)
        x = np.array([_to_number(d) for d in self.dates])
        self.interpolator = interp1d(x, np.array(self.values), kind="linear", fill_value="extrapolate")

    def get_value(self, date: Union[datetime, int]) -> float:
        # interp1d returns a 0-d array; convert it to a plain float.
        return float(self.interpolator(_to_number(date)))


class CurveManager:
    """Singleton class managing curves, keyed by curve name."""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(CurveManager, cls).__new__(cls)
            cls._instance.curves = {}
        return cls._instance

    def load_curves(self, curve_file: str):
        """Load curves from a CSV with columns curve_name, dates, values, where dates
        (YYYY-MM-DD) and values are ';'-separated lists inside a single cell."""
        curves_data = pd.read_csv(curve_file)
        for _, row in curves_data.iterrows():
            dates = [datetime.strptime(d, "%Y-%m-%d") for d in row["dates"].split(";")]
            values = [float(v) for v in row["values"].split(";")]
            self.add_curve(SimpleCurve(name=row["curve_name"], dates=dates, values=values))

    def read_curves_from_csv(self, file_path: str):
        """Alternative loader for a block-formatted CSV: a curve-name row followed by
        <date, discount factor> rows with integer dates; blank rows separate curves."""
        try:
            with open(file_path, "r", newline="") as file:
                reader = csv.reader(file)
                curve_name = None
                dates = []
                values = []

                for row in reader:
                    if not row:  # Blank row ends the current curve
                        if curve_name and dates and values:
                            self.add_curve(SimpleCurve(curve_name, dates, values))
                        curve_name, dates, values = None, [], []
                    elif curve_name is None:  # Header row containing the curve name
                        curve_name = row[0]
                    else:  # Data row: <date, discount factor>
                        dates.append(int(row[0]))
                        values.append(float(row[1]))

                # Add the last curve if the file doesn't end with a blank row
                if curve_name and dates and values:
                    self.add_curve(SimpleCurve(curve_name, dates, values))

            logging.info(f"Successfully read and added curves from CSV: {file_path}")

        except Exception as e:
            logging.error(f"Error reading curves from CSV file {file_path}: {e}")
            raise

    def add_curve(self, curve: Curve):
        key = (curve.name,)
        self.curves[key] = curve
        logging.info(f"Added curve: {curve.name}")

    def get_curve(self, name: str) -> Optional[Curve]:
        return self.curves.get((name,))


class Scenario:
    """Class representing a market scenario with BASE, UP, and DOWN perturbed curves."""
    def __init__(self, name: str, date: datetime, base_curves: Dict[Tuple[str], Curve]):
        self.name = name
        self.date = date
        self.base_curves = base_curves
        self.up_curves = {}
        self.down_curves = {}
        self._generate_perturbations()

    def _generate_perturbations(self):
        """Generates UP and DOWN perturbed curves."""
        for key, curve in self.base_curves.items():
            up_values = [v * 1.1 for v in curve.values]
            down_values = [v * 0.9 for v in curve.values]
            self.up_curves[key] = SimpleCurve(curve.name, curve.dates, up_values)
            self.down_curves[key] = SimpleCurve(curve.name, curve.dates, down_values)


class ScenarioManager:
    """Singleton class managing scenarios."""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ScenarioManager, cls).__new__(cls)
            cls._instance.scenarios = {}
        return cls._instance

    def create_scenario(self, name: str, date: datetime):
        curve_manager = CurveManager()
        base_curves = curve_manager.curves
        scenario = Scenario(name, date, base_curves)
        self.scenarios[(name, date)] = scenario
        logging.info(f"Created scenario: {name} on {date}")


class Security(ABC):
    """Abstract class for a financial security."""
    def __init__(self, security_id: str, attributes: Dict[str, Union[str, float, int]]):
        self.security_id = security_id
        self.attributes = attributes

    @abstractmethod
    def NPV(self, scenario: Scenario) -> float:
        pass


class Bond(Security):
    """A simple bond implementation."""
    def NPV(self, scenario: Scenario) -> float:
        curve_key = (self.attributes["curve_name"],)
        base_curve = scenario.base_curves.get(curve_key)

        if not base_curve:
            raise ValueError(f"Curve {curve_key} not found in scenario.")

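        cash_flows = self.attributes["cash_flows"]
        npv = 0.0
        for cash_flow_date, cash_flow_amount in cash_flows:
            # Assumes the curve holds discount factors (as the <date, discount factor>
            # rows in read_curves_from_csv indicate), so PV = amount * DF(date).
            discount_factor = base_curve.get_value(cash_flow_date)
            npv += cash_flow_amount * discount_factor
        return npv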


class SecurityManager:
    """Singleton class managing securities."""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(SecurityManager, cls).__new__(cls)
            cls._instance.securities = {}
        return cls._instance

    def add_security(self, security: Security):
        self.securities[security.security_id] = security
        logging.info(f"Added security: {security.security_id}")


# Generic Calculation Function

def calculate_npv_for_all(security_manager: SecurityManager, scenario_manager: ScenarioManager) -> pd.DataFrame:
    data_store = []
    for (scenario_name, scenario_date), scenario in scenario_manager.scenarios.items():
        for security_id, security in security_manager.securities.items():
            try:
                base_npv = security.NPV(scenario)
                up_npv = security.NPV(Scenario(name=scenario.name, date=scenario.date, base_curves=scenario.up_curves))
                down_npv = security.NPV(Scenario(name=scenario.name, date=scenario.date, base_curves=scenario.down_curves))
                data_store.append({
                    "Security ID": security_id,
                    "Scenario Name": scenario_name,
                    "Scenario Date": scenario_date,
                    "NPV_BASE": base_npv,
                    "NPV_UP": up_npv,
                    "NPV_DOWN": down_npv,
                })
                logging.info(f"Calculated NPVs for Security: {security_id}, Scenario: {scenario_name}")
            except Exception as e:
                logging.error(f"Error calculating NPV for Security: {security_id}, Scenario: {scenario_name}. Error: {e}")
    return pd.DataFrame(data_store)


# Task Dispatcher

class TaskDispatcher:
    def __init__(self, config_file: str):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.curve_manager = CurveManager()
        self.scenario_manager = ScenarioManager()
        self.security_manager = SecurityManager()

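    def load_curves(self):
        # "curve_definition_file" is an assumed key, named after the other *_definition_file keys.
        curve_file = self.config["curve_definition_file"]
        self.curve_manager.load_curves(curve_file)
        logging.info(f"Loaded {len(self.curve_manager.curves)} curves from {curve_file}")

    def load_scenarios(self):
        scenario_file = self.config["scenario_definition_file"]
        scenarios_data = pd.read_csv(scenario_file)
        for _, row in scenarios_data.iterrows():
            self.scenario_manager.create_scenario(row["scenario_name"], datetime.strptime(row["scenario_date"], "%Y-%m-%d"))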

    def load_securities(self):
        security_file = self.config["security_definition_file"]
        securities_data = pd.read_csv(security_file)
        for _, row in securities_data.iterrows():
            attributes = {
                "curve_name": row["curve_name"],
                "cash_flows": [
                    (datetime.strptime(d, "%Y-%m-%d"), float(a))
                    for d, a in zip(row["cash_flow_dates"].split(";"), row["cash_flow_amounts"].split(";"))
                ]
            }
            self.security_manager.add_security(Bond(security_id=row["security_id"], attributes=attributes))

    def execute_use_case(self):
        use_case = self.config["use_case"]
        if use_case == "NPV_CALCULATION":
            results = calculate_npv_for_all(self.security_manager, self.scenario_manager)
            results.to_csv(self.config["output_file"], index=False)
            logging.info(f"Results saved to {self.config['output_file']}")
        else:
            raise ValueError(f"Unsupported use case: {use_case}")

    def run(self):
        try:
            logging.info("Starting Task Dispatcher...")
            self.load_curves()
            self.load_scenarios()
            self.load_securities()
            self.execute_use_case()
            logging.info("Task Dispatcher completed successfully.")
        except Exception:
            # logging.exception records the error message together with the full traceback.
            logging.exception("Task Dispatcher encountered an error")

# Example Usage
if __name__ == "__main__":
    # Path to the configuration file
    config_file = "config.json"
    
    # Initialize and run the TaskDispatcher
    dispatcher = TaskDispatcher(config_file)
    dispatcher.run()
    
