Задание 1

In [1]:
from abc import ABC, abstractmethod
import math
from datetime import datetime
import time
from scipy.stats import norm


class AbstractAsset(ABC):
    def __init__(self, name: str, price: float):
        self.name = name
        self.price = price

    def __repr__(self) -> str:
        return f"Stock: {self.name}, \nPrice: {self.price}"


class Stock(AbstractAsset):
    __registry: dict[str, "Stock"] = {}

    @classmethod
    def get_from_registry(cls, name: str) -> "Stock":
        if (stock := cls.__registry.get(name)) is None:
            raise KeyError(f"Stock {name} doesn't exist")
        return stock

    def __new__(cls, name: str, price: float):
        if name in cls.__registry:
            return cls.__registry[name]
        instance = super().__new__(cls)
        cls.__registry[name] = instance
        return instance

    def __init__(self, name: str, price: float):
        if not hasattr(self, "_initialized"):
            super().__init__(name, price)
            self._initialized = True


class Derivative(AbstractAsset):
    FORMAT_CODE = "%Y-%m-%d"

    def __init__(self, *args, expiration_date: str, underlying_name: str, **kwargs):
        super().__init__(*args, **kwargs)
        self.expiration_date = expiration_date
        self.__underlying: Stock = Stock.get_from_registry(underlying_name)

    @property
    def underlying(self) -> Stock:
        return self.__underlying


class Option(Derivative):
    def __init__(
        self,
        *args,
        strike_price: float,
        risk_free_rate: float,
        volatility: float,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        if strike_price <= 0:
            raise ValueError("Strike price must be positive")
        if risk_free_rate < 0:
            raise ValueError("Risk-free rate cannot be negative")
        if volatility <= 0:
            raise ValueError("Volatility must be positive")

        expiration_timestamp = datetime.strptime(
            self.expiration_date, self.FORMAT_CODE
        ).timestamp()
        if expiration_timestamp <= time.time():
            raise ValueError("Expiration date must be in the future")

        self.strike_price = strike_price
        self.risk_free_rate = risk_free_rate
        self.volatility = volatility
        self.__payoff = None

    @abstractmethod
    def exercise(self): ...

    def _calculate_time_to_expiration(self) -> float:
        expiration_timestamp = datetime.strptime(
            self.expiration_date, self.FORMAT_CODE
        ).timestamp()
        current_timestamp = time.time()
        T = (expiration_timestamp - current_timestamp) / (60 * 60 * 24 * 365)
        return T
    
class EuropeanCallOption(Option):
    @property
    def payoff(self) -> float:
        T = self._calculate_time_to_expiration()
        d1 = (math.log(self.underlying.price / self.strike_price) + 
              (self.risk_free_rate + self.volatility**2 / 2) * T) / \
             (self.volatility * math.sqrt(T))
        d2 = d1 - (self.volatility * math.sqrt(T))
        c = self.underlying.price * norm.cdf(d1) - self.strike_price * \
            math.exp(-self.risk_free_rate * T) * norm.cdf(d2)
        return c
        
    def exercise(self):
        return None

class EuropeanPutOption(Option):
    @property
    def payoff(self) -> float:
        T = self._calculate_time_to_expiration()
        d1 = (math.log(self.underlying.price / self.strike_price) + 
              (self.risk_free_rate + self.volatility**2 / 2) * T) / \
             (self.volatility * math.sqrt(T))
        d2 = d1 - (self.volatility * math.sqrt(T))
        p = self.strike_price * math.exp(-self.risk_free_rate * T) * norm.cdf(-d2) - \
            self.underlying.price * norm.cdf(-d1)
        return p
        
    def exercise(self):
        current_price = self.underlying.price
        exercise_value = max(0, self.strike_price - current_price)
        return exercise_value

class AmericanCallOption(Option):
    @property
    def payoff(self) -> float:
        intrinsic_value = max(0, self.underlying.price - self.strike_price)
        
        T = self._calculate_time_to_expiration()
        d1 = (math.log(self.underlying.price / self.strike_price) + 
              (self.risk_free_rate + self.volatility**2 / 2) * T) / \
             (self.volatility * math.sqrt(T))
        d2 = d1 - (self.volatility * math.sqrt(T))
        european_value = self.underlying.price * norm.cdf(d1) - self.strike_price * \
            math.exp(-self.risk_free_rate * T) * norm.cdf(d2)
            
        return max(intrinsic_value, european_value)
        
    def exercise(self, current_price: float = None) -> float:
        if current_price is None:
            current_price = self.underlying.price
            
        exercise_value = max(0, current_price - self.strike_price)
        
        T = self._calculate_time_to_expiration()
        if T > 0:
            d1 = (math.log(current_price / self.strike_price) + 
                  (self.risk_free_rate + self.volatility**2 / 2) * T) / \
                 (self.volatility * math.sqrt(T))
            d2 = d1 - (self.volatility * math.sqrt(T))
            european_value = current_price * norm.cdf(d1) - self.strike_price * \
                math.exp(-self.risk_free_rate * T) * norm.cdf(d2)
            
            if exercise_value > european_value:
                return exercise_value
                
        return exercise_value

class AmericanPutOption(Option):
    @property
    def payoff(self) -> float:
        intrinsic_value = max(0, self.strike_price - self.underlying.price)
        
        T = self._calculate_time_to_expiration()
        if T <= 0:
            return intrinsic_value
            
        d1 = (math.log(self.underlying.price / self.strike_price) + 
              (self.risk_free_rate + self.volatility**2 / 2) * T) / \
             (self.volatility * math.sqrt(T))
        d2 = d1 - (self.volatility * math.sqrt(T))
        
        european_value = self.strike_price * math.exp(-self.risk_free_rate * T) * norm.cdf(-d2) - \
            self.underlying.price * norm.cdf(-d1)
            
        return max(intrinsic_value, european_value)
        
    def exercise(self, current_price: float = None) -> float:
        if current_price is None:
            current_price = self.underlying.price
            
        exercise_value = max(0, self.strike_price - current_price)
        
        T = self._calculate_time_to_expiration()
        if T > 0:
            d1 = (math.log(current_price / self.strike_price) + 
                  (self.risk_free_rate + self.volatility**2 / 2) * T) / \
                 (self.volatility * math.sqrt(T))
            d2 = d1 - (self.volatility * math.sqrt(T))
            european_value = self.strike_price * math.exp(-self.risk_free_rate * T) * norm.cdf(-d2) - \
                current_price * norm.cdf(-d1)
            
            if exercise_value > european_value:
                return exercise_value
                
        return exercise_value

In [2]:
vtbr = Stock(name="VTBR", price=70)

eur_call_option = EuropeanCallOption(
    name="Default Name",
    strike_price=70,
    risk_free_rate=0.18,
    volatility=0.3,
    expiration_date="2025-12-31",
    price=7,
    underlying_name="VTBR",
)

Задание 2

In [None]:
class BinaryOption(Option):
    def __init__(self, *args, payout_amount: float, **kwargs):
        super().__init__(*args, **kwargs)
        if payout_amount <= 0:
            raise ValueError("Payout amount must be positive")
        self.payout_amount = payout_amount

    @property
    def payoff(self) -> float:
        T = self._calculate_time_to_expiration()
        d2 = (
            math.log(self.underlying.price / self.strike_price)
            + (self.risk_free_rate - 0.5 * self.volatility**2) * T
        ) / (self.volatility * math.sqrt(T))

        probability_in_the_money = norm.cdf(d2)
        expected_payout = (
            self.payout_amount
            * math.exp(-self.risk_free_rate * T)
            * probability_in_the_money
        )
        return expected_payout

    def exercise(self) -> float:
        current_price = self.underlying.price
        if current_price >= self.strike_price:
            return self.payout_amount
        return 0.0


class BarrierOption(Option):
    BARRIER_TYPES = ["up_and_in", "up_and_out", "down_and_in", "down_and_out"]

    def __init__(self, *args, barrier_level: float, barrier_type: str, **kwargs):
        super().__init__(*args, **kwargs)

        if barrier_level <= 0:
            raise ValueError("Barrier level must be positive")
        if barrier_type not in self.BARRIER_TYPES:
            raise ValueError(f"Barrier type must be one of {self.BARRIER_TYPES}")

        self.barrier_level = barrier_level
        self.barrier_type = barrier_type

    def _monte_carlo_simulation(self, num_simulations: int = 10000) -> float:
        T = self._calculate_time_to_expiration()
        steps = 100
        dt = T / steps
        sqrt_dt = math.sqrt(dt)

        total_payoff = 0.0

        for _ in range(num_simulations):
            price_path = [self.underlying.price]
            hit_barrier = False

            for step in range(steps):
                Z = norm.rvs()
                next_price = price_path[-1] * math.exp(
                    (self.risk_free_rate - 0.5 * self.volatility**2) * dt
                    + self.volatility * sqrt_dt * Z
                )
                price_path.append(next_price)

                if self._check_barrier_condition(next_price):
                    hit_barrier = True

            final_price = price_path[-1]
            payoff = self._calculate_barrier_payoff(final_price, hit_barrier)
            total_payoff += payoff

        expected_payoff = total_payoff / num_simulations
        discounted_payoff = expected_payoff * math.exp(-self.risk_free_rate * T)
        return discounted_payoff

    def _check_barrier_condition(self, price: float) -> bool:
        if self.barrier_type.startswith("up"):
            return price >= self.barrier_level
        else:
            return price <= self.barrier_level

    def _calculate_barrier_payoff(self, final_price: float, hit_barrier: bool) -> float:
        vanilla_payoff = max(0, final_price - self.strike_price)

        if self.barrier_type.endswith("_in"):
            return vanilla_payoff if hit_barrier else 0.0
        else:
            return vanilla_payoff if not hit_barrier else 0.0

    @property
    def payoff(self) -> float:
        return self._monte_carlo_simulation()

    def exercise(self, current_price: float = None) -> float:
        if current_price is None:
            current_price = self.underlying.price

        hit_barrier = self._check_barrier_condition(current_price)
        vanilla_payoff = max(0, current_price - self.strike_price)

        return self._calculate_barrier_payoff(vanilla_payoff, hit_barrier)

Задание 3

In [None]:
class Portfolio:
    def __init__(self, name: str):
        self.name = name
        self.assets = []

    def __repr__(self) -> str:
        return f"Portfolio('{self.name}', assets={self.assets})"

    def __getitem__(self, index):
        return self.assets[index]

    def __contains__(self, asset):
        return asset in self.assets

Задание 5

In [None]:
from typing import Optional, List, Union
class Portfolio:

    def __init__(self, name: str):
        self.name = name
        self.assets = []

    def __repr__(self) -> str:
        return f"Portfolio('{self.name}', assets={self.assets})"

    def __getitem__(self, index):
        return self.assets[index]

    def __contains__(self, asset):
        return asset in self.assets

    def __str__(self) -> str:
        if not self.assets:
            return f"Portfolio '{self.name}' is empty"

        assets_str = "\n".join([f"  - {asset}" for asset in self.assets])
        return f"Portfolio: {self.name}\nAssets:\n{assets_str}\nTotal Value: ${self.total_value}"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Portfolio):
            return False
        return (
            self.name == other.name
            and self.assets == other.assets
        )

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, Portfolio):
            return NotImplemented
        return sum(i.price for i in self.assets) < sum(i.price for i in other.assets)

    def __le__(self, other: object) -> bool:
        if not isinstance(other, Portfolio):
            return NotImplemented
        return sum(i.price for i in self.assets) <= sum(i.price for i in other.assets)

    def __mul__(self, factor: float) -> "Portfolio":
        if not isinstance(factor, (int, float)):
            return NotImplemented

        scaled_assets: List[AbstractAsset] = []
        for asset in self.assets:
            scaled_asset = type(asset)(name=asset.name, price=asset.price * factor)
            scaled_assets.append(scaled_asset)

        return Portfolio(f"{self.name} (scaled {factor}x)", scaled_assets)

    def __bool__(self) -> bool:
        return len(self.assets) > 0


Задание 6

In [None]:
import pickle
from typing import List, Any, Optional, Iterator, Callable
from contextlib import contextmanager
from functools import wraps
from datetime import datetime


def validate_option_params(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
    @wraps(func)
    def wrapper(self, *args, **kwargs) -> Any:
        strike_price: float = kwargs.get(
            "strike_price", getattr(self, "strike_price", 0)
        )
        risk_free_rate: float = kwargs.get(
            "risk_free_rate", getattr(self, "risk_free_rate", 0)
        )
        volatility: float = kwargs.get("volatility", getattr(self, "volatility", 0))
        expiration_date: str = kwargs.get(
            "expiration_date", getattr(self, "expiration_date", "")
        )

        if strike_price <= 0:
            raise ValueError("Strike price must be positive")
        if risk_free_rate < 0:
            raise ValueError("Risk-free rate cannot be negative")
        if volatility <= 0:
            raise ValueError("Volatility must be positive")


        expiration_timestamp: float = datetime.strptime(
            expiration_date, "%Y-%m-%d"
        ).timestamp()
        if expiration_timestamp <= time.time():
            raise ValueError("Expiration date must be in the future")

        return func(self, *args, **kwargs)

    return wrapper


class PortfolioManager:
    def __init__(self, portfolios: List[Any]) -> None:
        self.portfolios: List[Any] = portfolios
        self._original_state: Optional[List[Any]] = None

    def __enter__(self) -> "PortfolioManager":
        self._original_state = [
            pickle.dumps(portfolio) for portfolio in self.portfolios
        ]
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self._original_state:
            for i, serialized_portfolio in enumerate(self._original_state):
                if i < len(self.portfolios):
                    self.portfolios[i] = pickle.loads(serialized_portfolio)

    def sum_all(self) -> Any:
        if not self.portfolios:

            return Portfolio("Empty Sum")

        total_assets: List[Any] = []
        for portfolio in self.portfolios:
            total_assets.extend(portfolio.assets)

        return Portfolio("Sum of All Portfolios", total_assets)

    def market_neutral(self) -> "PortfolioManager":
        neutral_portfolios: List[Any] = []
        for portfolio in self.portfolios:
            neutral_assets: List[Any] = []
            for asset in portfolio.assets:
                from copy import copy

                neutral_asset = copy(asset)
                neutral_asset.price = -neutral_asset.price
                neutral_assets.append(neutral_asset)

            neutral_portfolio = Portfolio(f"Neutral {portfolio.name}", neutral_assets)
            neutral_portfolios.append(neutral_portfolio)

        return PortfolioManager(neutral_portfolios)

    def save_portfolios(self, filename: str) -> None:
        with open(filename, "wb") as f:
            pickle.dump(self.portfolios, f)

    def load_portfolios(self, filename: str) -> None:
        with open(filename, "rb") as f:
            self.portfolios = pickle.load(f)

    def __getitem__(self, index: int) -> Any:
        return self.portfolios[index]

    def __len__(self) -> int:
        return len(self.portfolios)

    def add_portfolio(self, portfolio: Any) -> None:
        self.portfolios.append(portfolio)

    def remove_portfolio(self, portfolio: Any) -> None:
        self.portfolios.remove(portfolio)


@contextmanager
def temporary_portfolio_changes(
    portfolio: Any, temporary_assets: List[Any]
) -> Iterator[Any]:
    original_assets: List[Any] = portfolio.assets.copy()
    try:
        portfolio.assets = temporary_assets
        yield portfolio
    finally:
        portfolio.assets = original_assets  