In [1]:
from operator import itemgetter
import itertools
from typing import Callable, Iterable, Iterator, TypeVar, Sequence, Tuple


import numpy as np

from src.function_approx import LinearFunctionApprox, Weights
from src.markov_decision_process import TransitionStep, NonTerminal
from src.monte_carlo import greedy_policy_from_qvf
from src.policy import DeterministicPolicy

S = TypeVar('S')
A = TypeVar('A')

In [2]:
def least_squares_tdq(
    transitions: Iterable[TransitionStep[S, A]],
    feature_functions: Sequence[Callable[[Tuple[NonTerminal[S], A]], float]],
    target_policy: DeterministicPolicy[S, A],
    γ: float,
    ε: float
) -> LinearFunctionApprox[Tuple[NonTerminal[S], A]]:
    '''Batch least-squares TD estimate of a linear Q-value function.

    For every transition the feature vector of the visited (state, action)
    pair is ``phi1`` and ``phi2 = phi1 - γ * phi(next_state, next_action)``,
    where ``next_action`` is the one chosen by ``target_policy`` (for a
    terminal next state ``phi2 = phi1``). The weights returned are
    ``A⁻¹ b`` with ``A = ε·I + Σ phi1 phi2ᵀ`` and ``b = Σ phi1 · reward``.
    ``A⁻¹`` is maintained incrementally with the Sherman–Morrison rank-one
    update, so no matrix is ever inverted explicitly.

    Args:
        transitions: finite iterable of transition steps; each step is read
            through its ``state``, ``action``, ``next_state`` and ``reward``
            attributes.
        feature_functions: scalar features of a (state, action) pair.
        target_policy: policy whose ``action_for`` picks the action used at
            the next state.
        γ: discount factor applied to the next-state features.
        ε: scale of the initial matrix ``ε·I`` (``A⁻¹`` starts as ``I / ε``).

    Returns:
        A ``LinearFunctionApprox`` over ``feature_functions`` carrying the
        solved weights.
    '''
    num_features: int = len(feature_functions)
    a_inv: np.ndarray = np.eye(num_features) / ε
    b_vec: np.ndarray = np.zeros(num_features)
    for tr in transitions:
        phi1: np.ndarray = np.array([f((tr.state, tr.action))
                                     for f in feature_functions])
        if isinstance(tr.next_state, NonTerminal):
            # Ask the policy once per transition rather than once per feature:
            # this avoids redundant calls and guarantees that every feature of
            # the next pair is evaluated at the same action, even if the
            # wrapped callable is not actually deterministic.
            next_pair = (
                tr.next_state,
                target_policy.action_for(tr.next_state.state)
            )
            phi2 = phi1 - γ * np.array([f(next_pair)
                                        for f in feature_functions])
        else:
            phi2 = phi1
        # Sherman–Morrison update of A⁻¹ for A <- A + phi1 phi2ᵀ.
        temp: np.ndarray = a_inv.T.dot(phi2)
        a_inv = a_inv - np.outer(a_inv.dot(phi1), temp) / (1 + phi1.dot(temp))
        b_vec += phi1 * tr.reward

    opt_wts: np.ndarray = a_inv.dot(b_vec)
    return LinearFunctionApprox.create(
        feature_functions=feature_functions,
        weights=Weights.create(opt_wts)
    )


def least_squares_policy_iteration(
    transitions: Iterable[TransitionStep[S, A]],
    actions: Callable[[NonTerminal[S]], Iterable[A]],
    feature_functions: Sequence[Callable[[Tuple[NonTerminal[S], A]], float]],
    initial_target_policy: DeterministicPolicy[S, A],
    γ: float,
    ε: float
) -> Iterator[LinearFunctionApprox[Tuple[NonTerminal[S], A]]]:
    '''Endless stream of Q-value estimates produced by LSPI.

    ``transitions`` must be finite: it is copied into a list on the first
    ``next()`` so the same batch can be replayed on every sweep. Each sweep
    fits a Q-value function with ``least_squares_tdq`` against the current
    target policy, replaces the target policy with the greedy policy derived
    from that fit (``greedy_policy_from_qvf`` over ``actions``), and then
    yields the fit. The generator never terminates by itself; the caller
    decides how many iterates to consume.
    '''
    replay_batch: Sequence[TransitionStep[S, A]] = list(transitions)
    current_policy: DeterministicPolicy[S, A] = initial_target_policy
    for _ in itertools.count():
        q_estimate: LinearFunctionApprox[Tuple[NonTerminal[S], A]] = \
            least_squares_tdq(
                transitions=replay_batch,
                feature_functions=feature_functions,
                target_policy=current_policy,
                γ=γ,
                ε=ε,
            )
        # Improve the policy before handing the estimate to the caller.
        current_policy = greedy_policy_from_qvf(q_estimate, actions)
        yield q_estimate

In [3]:
import numpy as np

class AmericanOptionLSPI:
    """Draft driver that learns an exercise rule for an American option via LSPI.

    Actions are encoded as 0 (hold) and 1 (exercise).

    NOTE(review): two problems remain that cannot be fixed from this cell
    alone and must be resolved against the real sampler API:
      * ``simulate_data`` expects the sampler to provide
        ``get_current_state()`` and ``step(action)``; the notebook's recorded
        run with ``WienerRainbowPutOptionSampler`` failed with
        ``AttributeError`` on ``get_current_state``.
      * ``simulate_data`` produces plain tuples, whereas
        ``least_squares_tdq`` reads ``.state``, ``.action``, ``.reward`` and
        ``.next_state`` attributes from every transition.
    """

    # The two admissible actions: 0 = hold, 1 = exercise.
    ACTIONS = (0, 1)

    def __init__(self, sampler, feature_functions, gamma, epsilon, num_iterations):
        """Store the simulator and the LSPI hyper-parameters.

        Args:
            sampler: simulator used by ``simulate_data``.
            feature_functions: forwarded to LSPI as ``feature_functions``.
            gamma: discount factor, forwarded to LSPI as ``γ``.
            epsilon: forwarded to LSPI as ``ε``.
            num_iterations: number of LSPI iterates ``learn_policy`` exposes.
        """
        self.sampler = sampler
        self.feature_functions = feature_functions
        self.gamma = gamma
        self.epsilon = epsilon
        self.num_iterations = num_iterations

    def simulate_data(self, num_samples):
        """Collect ``num_samples`` transitions under a uniformly random policy.

        Returns:
            A list of ``(state, action, reward, next_state)`` tuples.
        """
        transitions = []
        self.sampler.sample()
        for _ in range(num_samples):
            # NOTE: the actual sampler implementation differs from this call.
            state = self.sampler.get_current_state()
            action = np.random.choice(self.ACTIONS)  # random behaviour policy
            # NOTE: the actual sampler implementation differs from this call.
            reward, next_state = self.sampler.step(action)
            transitions.append((state, action, reward, next_state))
        return transitions

    def learn_policy(self, num_samples=10000):
        """Run LSPI on freshly simulated data.

        Args:
            num_samples: number of transitions to simulate (default 10000,
                the value that used to be hard-coded).

        Returns:
            A finite iterator over the first ``num_iterations`` Q-value
            estimates produced by ``least_squares_policy_iteration``; the
            last element is the most refined estimate.
        """
        transitions = self.simulate_data(num_samples)
        # NOTE(review): this starting policy is random although it is wrapped
        # in DeterministicPolicy.
        initial_policy = lambda s: np.random.choice(self.ACTIONS)

        qvf_iterates = least_squares_policy_iteration(
            transitions=transitions,
            # `actions` is a required argument of LSPI; the action set is the
            # same in every state.
            actions=lambda _state: list(self.ACTIONS),
            feature_functions=self.feature_functions,
            initial_target_policy=DeterministicPolicy(initial_policy),
            γ=self.gamma,
            ε=self.epsilon
        )
        # The underlying generator is endless; bound it by num_iterations.
        return itertools.islice(qvf_iterates, self.num_iterations)

    def price_option(self):
        """Placeholder: triggers learning but does not compute a price yet.

        TODO: derive the option price from the learned Q-value function;
        the method currently returns None.
        """
        qvf = self.learn_policy()

In [4]:
from src.samplers import WienerRainbowPutOptionSampler
# Simulator for the rainbow put option used throughout the notebook.
our_sampler = WienerRainbowPutOptionSampler(
    cnt_trajectories=10_000,  # number of trajectories
    cnt_times=30,  # number of time points
    t=1,
    time_grid=None,
    seed=345,
    sigmas=[1.0, 1.0],  # volatilities of the underlying asset prices
    strike=0.0,
)

In [5]:
def feature_functions(state_action_pair, strike=0.0):
    """Build the feature vector of one (state, action) pair.

    Args:
        state_action_pair: ``(state, action)``. ``state`` is indexed as
            ``state[0]`` and ``state[1]`` (prices of the first and second
            asset) and ``state[2]`` (time to maturity); ``action`` is 0 for
            hold and 1 for exercise.
        strike: strike subtracted from both prices. It used to be read from
            an undefined global, which raised ``NameError``; the default 0.0
            matches the strike the sampler in this notebook is created with.

    Returns:
        1-D array ``[state[0] - strike, state[1] - strike, state[2], action]``.

    NOTE(review): ``least_squares_tdq`` takes ``len(feature_functions)`` and
    calls each element to obtain a scalar, i.e. it expects a sequence of
    scalar-valued callables rather than one function returning a vector.
    """
    state, action = state_action_pair
    # Features: price distance from the strike, time to maturity, action.
    return np.array([
        state[0] - strike,  # price difference of first asset
        state[1] - strike,  # price difference of second asset
        state[2],  # time to maturity
        action,  # action taken: 0 for hold, 1 for exercise
    ])


In [6]:
# Wire the sampler and the feature map into the LSPI driver.
lspi = AmericanOptionLSPI(
    sampler=our_sampler,
    feature_functions=feature_functions,
    gamma=0.95,
    epsilon=1e-5,
    num_iterations=20,  # number of policy iterations
)

# Kick off data simulation and learning.
policy = lspi.learn_policy()


  0%|          | 0/10000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

AttributeError: 'WienerRainbowPutOptionSampler' object has no attribute 'get_current_state'

In [7]:
def price_option(policy, num_simulations=1000, sampler=None, strike=0.0):
    """Monte Carlo estimate of the option value under an exercise policy.

    Each simulation starts a fresh trajectory, follows ``policy`` for at most
    ``sampler.cnt_times`` decisions and, on the first exercise decision
    (action ``1``), collects ``max(strike - min(state), 0)``. Trajectories
    that never exercise contribute nothing. Payoffs are not discounted.

    Previously the function read ``state`` before assigning it
    (``UnboundLocalError``) and relied on undefined globals ``sampler`` and
    ``strike``; both are now explicit, optional-by-position parameters.

    Args:
        policy: callable mapping a state to an action; ``1`` means exercise.
        num_simulations: number of trajectories to average over (> 0).
        sampler: simulator providing ``cnt_times``, ``sample()``,
            ``get_current_state()`` and ``step()`` (returning the next state).
        strike: strike of the put; defaults to 0.0.

    Returns:
        Average collected payoff per simulation.

    Raises:
        ValueError: if ``sampler`` is missing or ``num_simulations`` is not
            positive.

    NOTE(review): ``sample()`` / ``get_current_state()`` mirror the protocol
    used by ``AmericanOptionLSPI.simulate_data``; the recorded notebook run
    shows ``WienerRainbowPutOptionSampler`` has no ``get_current_state`` —
    confirm the real sampler API before relying on this function.
    """
    if sampler is None:
        raise ValueError("price_option requires a sampler")
    if num_simulations <= 0:
        raise ValueError("num_simulations must be positive")

    total_payoff = 0.0
    for _ in range(num_simulations):
        # Start a new trajectory and read its initial state.
        sampler.sample()
        state = sampler.get_current_state()
        for _t in range(sampler.cnt_times):
            if policy(state) == 1:  # exercise option
                total_payoff += max(strike - min(state), 0)
                break
            state = sampler.step()  # move to next state
    return total_payoff / num_simulations

In [1]:
import numpy as np
from sklearn.preprocessing import PolynomialFeatures
from tqdm import tqdm

class LSPI:
    """Ridge-regularised least-squares fit of a linear value function.

    ``feature_functions`` is a single callable mapping a state to its feature
    vector. ``weights`` stays ``None`` until ``train`` has run once.
    """

    def __init__(self, feature_functions, gamma, regularization_alpha=1e-4):
        """Remember the feature map, discount and ridge strength."""
        self.weights = None
        self.regularization_alpha = regularization_alpha
        self.gamma = gamma
        self.feature_functions = feature_functions

    def train(self, experiences):
        """Fit ``weights`` from ``(state, next_state, reward)`` triples.

        ``experiences`` is traversed three times, so it must be re-iterable.
        On the first call (no weights yet) the regression target is the raw
        reward; on later calls it is ``reward + gamma * value(next_state)``
        computed with the weights from the previous call.
        """
        # Feature matrix of the visited states, rewards, and feature matrix
        # of the successor states (in that order).
        phi = np.array(
            [self.feature_functions(s) for s, _nxt, _r in experiences]
        )
        rewards = np.array([r for _s, _nxt, r in experiences])
        phi_next = np.array(
            [self.feature_functions(nxt) for _s, nxt, _r in experiences]
        )

        # Regression target.
        if self.weights is None:
            y = rewards
        else:
            y = rewards + self.gamma * np.dot(phi_next, self.weights)

        # Normal equations with a ridge term: (ΦᵀΦ + αI) w = Φᵀy.
        gram = np.dot(phi.T, phi)
        ridge = self.regularization_alpha * np.eye(phi.shape[1])
        self.weights = np.linalg.solve(gram + ridge, np.dot(phi.T, y))

    def value(self, state):
        """Return the dot product of the state's features with ``weights``."""
        return np.dot(self.feature_functions(state), self.weights)

class AmericanMonteCarloWithLSPI:
    """Monte Carlo pricer that uses an ``LSPI`` fit as continuation value.

    The sampler is expected to expose ``sample()``, ``markov_state``,
    ``payoff``, ``discount_factor`` and ``cnt_times``; ``markov_state`` and
    ``payoff`` are sliced as ``[:, time_index]``.
    """

    def __init__(
            self,
            sampler,
            feature_functions,
            gamma=0.95,
            regularization_alpha=1e-4
    ):
        """Store the sampler and build the internal ``LSPI`` model."""
        self.result = {}
        self.option_price = None
        self.price_history = None  # never updated by this class
        self.regularization_alpha = regularization_alpha
        self.gamma = gamma
        self.sampler = sampler
        self.lspi = LSPI(feature_functions, gamma, regularization_alpha)

    def price(self, num_episodes=1000, episode_length=30, quiet=False):
        """Train the LSPI model on simulated data, then price the option.

        Args:
            num_episodes: how many times the sampler is re-sampled for
                training.
            episode_length: training uses time indices
                ``0 .. episode_length - 2`` of every sample.
            quiet: when false, prints one line per time step saying whether
                any trajectory exercises there.

        Returns:
            ``self.result`` with keys ``'price'`` (mean) and ``'std'``
            (standard deviation) of the per-trajectory option values.
        """
        sampler = self.sampler

        # --- 1. gather (state, next_state, reward) training triples -------
        experiences = []
        for _ in tqdm(range(num_episodes)):
            sampler.sample()
            for t in range(episode_length - 1):
                experiences.append((
                    sampler.markov_state[:, t],
                    sampler.markov_state[:, t + 1],
                    sampler.payoff[:, t],  # payoff at time t used as reward
                ))

        # --- 2. fit the value model ---------------------------------------
        self.lspi.train(experiences)

        # --- 3. price on a fresh sample -----------------------------------
        sampler.sample()
        discounted_payoff = sampler.payoff * sampler.discount_factor
        self.option_price = np.zeros_like(discounted_payoff[:, -1])

        # Walk backwards in time so that the earliest exercise date at which
        # the exercise value beats the model value is the one that sticks.
        for t in reversed(range(sampler.cnt_times)):
            model_value = self.lspi.value(sampler.markov_state[:, t])
            immediate = discounted_payoff[:, t]
            exercise_now = immediate > model_value
            self.option_price = np.where(
                exercise_now, immediate, self.option_price
            )

            if not quiet:
                print(f"Time {t}: Exercise {'Yes' if np.any(exercise_now) else 'No'}")

        # --- 4. summary statistics ----------------------------------------
        self.result['price'] = float(np.mean(self.option_price))
        self.result['std'] = float(np.std(self.option_price))

        return self.result

# Feature functions should be defined based on domain knowledge or experimental tuning
def feature_functions(state):
    """Degree-2 polynomial expansion of ``state``.

    ``state`` is flattened into a single column before the expansion, so the
    result has one row per element of ``state``.

    NOTE(review): this redefines (and therefore shadows) the earlier
    ``feature_functions(state_action_pair)`` from this notebook.
    """
    expander = PolynomialFeatures(degree=2)
    as_column = state.reshape(-1, 1)
    return expander.fit_transform(as_column)
