In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
'''Consider the discrete-time asset allocation example in section 8.4 of Rao and Jelvis. Suppose the single-time-step return of the risky asset from time t to t+1 as Yt = a, prob = p
and b, prob = (1-p). Suppose that T=10, use the TD method to find the Q function and hence the optimal strategy'''

'Consider the discrete-time asset allocation example in section 8.4 of Rao and Jelvis. Suppose the single-time-step return of the risky asset from time t to t+1 as Yt = a, prob = p\nand b, prob = (1-p). Suppose that T=10, use the TD method to find the Q function and hence the optimal strategy'

In [18]:
from __future__ import annotations
## Setting up - State, Reward Transition, Actions, Reward
from abc import ABC, abstractmethod
from dataclasses import dataclass
from collections import defaultdict
from scipy.stats import binom
import numpy as np
from typing import Dict, Sequence
import itertools
import random
import operator

'''
Initial Assumptions
- Discrete Time
- Discrete Returns: +/-1 (daily)
- 1 Stock - Follows Binomial Distribution payout
- Utility - CARA function for final step reward Constant Absolute Risk Aversion
- T = 10 based on Assignment specification
- No fractional investments (i.e. only units)
- Assuming W_0 initial wealth = 0
- Assume NO CONSUMPTION of wealth at any Time t<T
- Unconstrained allocation amount
'''

ITERATIONS=3
GAMMA=1
COEFFICIENT_OF_ARA=1
INVESTMENT_LIMIT=1 #Buy 1 stock per day
RISK_FREE_RATE=0 #Assuming you're just letting your money sit there for now so you will never lose really
PROBABILITY_ALPHA=0.5 #Probability of risky asset increasing
INCREASE=1
DECREASE=-1
PRICE_RANGE=1 #[-1,1]

'''State'''
@dataclass(frozen=True)
class WealthState:
    time: int #time state I am in
    wealth: int #for now it will be int
    risky_asset_allocation: int
    termination_time:int = ITERATIONS
    
    def __eq__(self, other):
        return (self.time == other.time) and \
               (self.wealth == other.wealth) and \
               (self.risky_asset_allocation == other.risky_asset_allocation)
    
    def isTerminal(self):
        return self.time == self.termination_time

'''ACTIONS'''
'''Only buy 1 stock for a given time step'''
@dataclass(frozen=True)
class InvestmentAction:
    risky_investment_amount: int
    
    def __eq__(self, other):
        return self.risky_investment_amount == other.risky_investment_amount
    
    #Can't short yet#
    def action_to_next_state(self, ws: WealthState) -> WealthState:
        new_state = WealthState(time=ws.time+1, 
                                wealth=ws.wealth,
                                risky_asset_allocation=ws.risky_asset_allocation + self.risky_investment_amount
                               )
        return new_state
    
    @staticmethod
    def get_all_actions(investment_limit: int) -> Sequence[InvestmentAction]:
        all_actions = list()
        for i in range(investment_limit+1):
            action = InvestmentAction(risky_investment_amount=i) #you can choose not to invest too
            all_actions.append(action)
        
        return all_actions
    
'''REWARD'''
'''CARA UTILITY'''
def cara_func(x :int, alpha=COEFFICIENT_OF_ARA) -> int:
    return - np.exp(- alpha * x)/alpha

def wealth_func(ws: WealthState, risky_asset_return: int) -> int:
    risky_return = ws.risky_asset_allocation * risky_asset_return
    #when negative that means im borrowing money to buy stocks
    risk_free_return = max(0,(ws.wealth - ws.risky_asset_allocation)) * (1 + RISK_FREE_RATE) 
    return risky_return + risk_free_return  

#Binomial Returns
def expected_binomial_returns(time=ITERATIONS, probability=PROBABILITY_ALPHA) -> Dict:
    values = [[0]]
    for i in range(time):
        root = values[i]
        new_vals = []
        for val in root:
            new_vals += [val + 1, val -1]
        
        values += [new_vals] #Per index gives you the combinations on each iteration - building the binomial tree
    
    combinations = values[time]
    sorted_prices = sorted({v for v in combinations})
    
    ##Probabilities
    price_probs = {price: round(binom.pmf(index, time, probability),3) 
                   for index, price in enumerate(sorted_prices)}
        
    return price_probs

#probability passed in for testing purposes - defaulted to global variable
def reward_function(ws: WealthState, probability=PROBABILITY_ALPHA, alpha=COEFFICIENT_OF_ARA) -> float:
    if ws.isTerminal():
        expected_returns = expected_binomial_returns(ws.time,probability)
        utility = 0
        for price, probability in list(expected_returns.items()):
            price_utility = cara_func(wealth_func(ws,price),alpha)
            # print(price_utility)
            #Remember price is stochastic as well - so calculating the expected price given the 
            #binomial returns at the very end
            utility += price_utility * probability

        return utility
    else:
        return 0.

'''TRANSITIONS'''
'''Returns a sequence indexed by time. Essentially, its a sequence of possible states at a given time assuming all actions
are applied to each state'''
def get_all_state_actions(iterations, available_actions, initial_state) -> Sequence[Dict[WealthState, Dict[WealthState, float]]]:
    all_states = [{initial_state:{(a,a.action_to_next_state(initial_state)): \
                    1/len(available_actions) for a in available_actions}}]
    prev_states = [initial_state]
    for i in range(iterations-1):
        next_states = [a.action_to_next_state(prev) for a in available_actions for prev in prev_states]
        next_states_mapping = {}
        for next_state in next_states:
            #Assuming equal probability of moving into the next state
            transitioned_states = {(a,a.action_to_next_state(next_state)):
                           1/len(available_actions) for a in available_actions}
            next_states_mapping[next_state]= transitioned_states
        
        all_states.append(next_states_mapping)
        prev_states = next_states
        
    return all_states

'''
MDP PROCESS
1. Decrement from time T back down to 0 to calculate the values for each state.
2. each time_step: T-1 = r + discount * E[G_T|S] (but r = 0 for non terminal states so... :)
'''
initial_state = ws = WealthState(time=0,wealth=-1,risky_asset_allocation=0)
all_actions = InvestmentAction.get_all_actions(investment_limit=INVESTMENT_LIMIT)
all_state_actions = get_all_state_actions(ITERATIONS, all_actions, ws)

#Define a state action reward mapping - for each time - state:{(action,next_state): reward}
#Probably applicable for finite states only.
def get_state_action_value_map(all_state_actions, reward_func=reward_function, gamma=GAMMA, iterations=ITERATIONS):
    state_action_value_map = defaultdict(float)
    for i in reversed(range(iterations)):
        states_at_t = all_state_actions[i]
        for (state_at_t) in states_at_t:
            next_state_actions = list(states_at_t[state_at_t].items())
            discount = gamma ** (iterations - i)
            ##The assumption is becasue this is finite state, the state_value_map will have computed the
            ##rewards at t+1 already cause this is going reverse time. Need to think about approximating later
            next_states = [(action_state, probability) for action_state , probability in next_state_actions]
            state_rewards = {s_a:reward_func(s_a[1]) * prob * discount if s_a[1].isTerminal()
                             else sum(list(state_action_value_map[s_a[1]].values())) * prob * discount #not adding r because 0 for non-terminal
                             for s_a, prob in next_states}
            state_action_value_map[state_at_t] = state_rewards
    
    return state_action_value_map

state_action_value_map=get_state_action_value_map(all_state_actions)
#Initialize a random policy - randomly choose an action to determine an initial policy
policy_0 = defaultdict(float)

for _, states_actions_map in enumerate(all_state_actions):
    [*states] = states_actions_map.keys()
    for state in states:
        action_states = [*states_actions_map[state].keys()]
        # print(action_states)
        probabilities = [*states_actions_map[state].values()]
        chosen_action_state = random.choices(action_states, weights=probabilities)
        chosen_action = [action for (action,state) in chosen_action_state][0]
        policy_0[state] = chosen_action
    
###Policy Evaluation - policy for s -> action: I want value function
def policy_evaluation(policy, state_action_value_map):
    q_function = defaultdict(float)
    for (state, action) in policy.items():
        state_action_values = state_action_value_map[state]
        action_value = state_action_values[(action, action.action_to_next_state(state))]
        q_function[(state,action)] = action_value
    
    return q_function

policy_evaluation(policy_0,state_action_value_map)

###Value Iteration - greedy policy - choose the action from value function that gives you the highest expected reward
def converged(q_function_1,q_function_2):
    return max(abs(q_function_1[s] - q_function_2[s]) for s in q_function_1) < 1e-4

q_function_0 = policy_evaluation(policy_0,state_action_value_map)
q_function = q_function_0
greedy_q_function = defaultdict(float)
while not converged(q_function,greedy_q_function):
    #Evaluate the greedy_q_function
    greedy_policy = defaultdict(float)
    for state_actions in all_state_actions:
        for (state, actions) in state_actions.items():
            action_values = {action: state_action_value_map[state][(action,action.action_to_next_state(state))] 
                         for action, _ in actions}
            max_action = [action for action, value in action_values.items() if value == max(action_values.values())][0]
            greedy_policy[state]=max_action
    
    q_function.update(greedy_q_function)
    greedy_q_function.update(policy_evaluation(greedy_policy,state_action_value_map))

print(greedy_q_function)
print("")
print(greedy_policy)
        

defaultdict(<class 'float'>, {(WealthState(time=0, wealth=-1, risky_asset_allocation=0, termination_time=3), InvestmentAction(risky_investment_amount=0)): -7.699875968629806, (WealthState(time=1, wealth=-1, risky_asset_allocation=0, termination_time=3), InvestmentAction(risky_investment_amount=1)): 0.0, (WealthState(time=1, wealth=-1, risky_asset_allocation=1, termination_time=3), InvestmentAction(risky_investment_amount=0)): -14.231195443495643, (WealthState(time=2, wealth=-1, risky_asset_allocation=0, termination_time=3), InvestmentAction(risky_investment_amount=0)): -0.5, (WealthState(time=2, wealth=-1, risky_asset_allocation=1, termination_time=3), InvestmentAction(risky_investment_amount=1)): 0.0, (WealthState(time=2, wealth=-1, risky_asset_allocation=2, termination_time=3), InvestmentAction(risky_investment_amount=1)): 0.0, (WealthState(time=1, wealth=-1, risky_asset_allocation=0, termination_time=3), InvestmentAction(risky_investment_amount=0)): -1.1685564937639685, (WealthState

### Test Functions

In [30]:
import unittest

def assert_lists_no_order(test_case, list1,list2):
    for i in list1:
        test_case.assertIn(i,list2)
        
    for i in list2:
        test_case.assertIn(i,list1)
        
class TestWealthState(unittest.TestCase):
    ws = WealthState(time=0,wealth=10,risky_asset_allocation=0)
    
    def testWealthState(self):
        self.assertEqual(self.ws.time,0)
        self.assertEqual(self.ws.wealth,10)
        self.assertEqual(self.ws.risky_asset_allocation,0)

class TestInvestmentAction(unittest.TestCase):
    ia = InvestmentAction(risky_investment_amount=2)
    ws = WealthState(time=0,wealth=10,risky_asset_allocation=0)
    
    def testNextState(self):
        ns = self.ia.action_to_next_state(self.ws)
        self.assertEqual(ns.time,1)
        self.assertEqual(ns.wealth,10)
        self.assertEqual(ns.risky_asset_allocation,2)

    def testGetAllActions_Investment_Limit_1(self):
        ia = InvestmentAction(risky_investment_amount=1)
        actions = InvestmentAction.get_all_actions(1)
        expected_actions = [
            InvestmentAction(risky_investment_amount=1),
            InvestmentAction(risky_investment_amount=0)
        ]
        assert_lists_no_order(self,actions,expected_actions)
        
class TestCARAUtility(unittest.TestCase):
     def testCARA(self):
        # -( e ^ (- alpha * Wt) )/alpha
        self.assertEqual(cara_func(1, alpha=1),-1/np.e)
        self.assertEqual(cara_func(1, alpha=2), - ((np.e ** (-2 * 1))/2))

class TestWealthFunction(unittest.TestCase):
    def testWealthFunction_Price_0_0_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=0)
        self.assertEqual(wealth_func(ws,0),10)
        
    def testWealthFunction_Price_1_0_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=0)
        self.assertEqual(wealth_func(ws,0),10)

    def testWealthFunction_Price_0_1_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=1)
        self.assertEqual(wealth_func(ws,1),10)
        
    def testWealthFunction_Price_1_1_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=1)
        self.assertEqual(wealth_func(ws,1),10)
        
    def testWealthFunction_Price_2_1_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=1)
        self.assertEqual(wealth_func(ws,2),11)
        
    def testWealthFunction_Price_3_1_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=1)
        self.assertEqual(wealth_func(ws,3),12)

    def testWealthFunction_Price_3_2_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=2)
        self.assertEqual(wealth_func(ws,3),14)
        
    def testWealthFunction_Price_neg_3_2_Risky_Asset(self):
        ws = WealthState(time=0,wealth=10,risky_asset_allocation=2)
        self.assertEqual(wealth_func(ws,-3),2)

    def testWealthFunction_Zero_Initial_Price_neg_3_2_Risky_Asset(self):
        ws = WealthState(time=0,wealth=0,risky_asset_allocation=2)
        self.assertEqual(wealth_func(ws,-3),-6)

class TestBinomialReturns(unittest.TestCase):
    def testBinomialReturnsWhenT_equals_1(self):
        #either +1 or -1 with 0.5 chance
        returns = expected_binomial_returns(1,0.5)
        self.assertEqual(returns,{1:0.5,-1:0.5})

    def testBinomialReturnsWhenT_equals_2(self):
        returns = expected_binomial_returns(2,0.5)
        self.assertEqual(returns,{2:0.25,0:0.5,-2:0.25})

    def testBinomialReturnsWhenT_equals_2_Uneven_Chance(self):
        returns = expected_binomial_returns(2,0.6)
        self.assertEqual(returns,{2:0.36,0:0.48,-2:0.16})
        
    def testBinomialReturnsWhenT_equals_3(self):
        returns = expected_binomial_returns(3,0.5)
        self.assertEqual(returns,{3:0.125,1:0.375,-1:0.375,-3:0.125})

class TestRewardFunction(unittest.TestCase):
    def testRewardWhenStateIsNonTerminal(self):
        ws= WealthState(time=0,wealth=0,risky_asset_allocation=0)
        self.assertEqual(reward_function(ws),0)
    
    def testRewardWhenStateIsTerminal_at_1(self):
        ws= WealthState(time=1,wealth=0,risky_asset_allocation=1,termination_time=1)
        ## 0.5 * -e(-1 * -1)/1 + 0.5 * -e(-1 * 1)/1 = expected utility given the prices
        self.assertEqual(reward_function(ws,probability=0.5,alpha=1), (-np.e * 0.5) + (1/-np.e * 0.5))
        
    def testRewardWhenStateIsTerminal_at_2(self):
        ws= WealthState(time=2,wealth=0,risky_asset_allocation=1,termination_time=2)
        '''
        0.25 * -e(-1 * -2)
        0.5 * -e(-1 * 0)
        0.25 * -e(-1 * 2)
        '''
        expected_utility = round((0.25 * -np.e ** (-1 * -2)) + (0.5 * -np.e ** (-1 * 0)) + (0.25 * -np.e ** (-1 * 2)),5)
        self.assertEqual(round(reward_function(ws,probability=0.5,alpha=1),5), expected_utility)
        
    def testRewardWhenStateIsTerminal_at_1_Uneven_Chance(self):
        ws= WealthState(time=1,wealth=0,risky_asset_allocation=1,termination_time=1)
        ''' 0.6 * -e(-1 * -1)/1 + 0.4 * -e(-1 * 1)/1 = expected utility given the prices'''
        reward = round(reward_function(ws,probability=0.6,alpha=1),5)
        expected_reward = round((-np.e * 0.4) + (1/-np.e * 0.6),5)
        self.assertEqual(reward,expected_reward)
        
class TestGetAllStateActions(unittest.TestCase):
    def testGet_All_State_Actions_1_time_step(self):
        ws= WealthState(time=0,wealth=0,risky_asset_allocation=0,termination_time=1)
        actions = [InvestmentAction(risky_investment_amount=1),InvestmentAction(risky_investment_amount=0)]
        all_state_actions = get_all_state_actions(1,actions,ws)
        expected_state_actions = [{
            ws: {
                (InvestmentAction(risky_investment_amount=1),WealthState(time=1,wealth=0,risky_asset_allocation=1)):0.5,
                (InvestmentAction(risky_investment_amount=0),WealthState(time=1,wealth=0,risky_asset_allocation=0)):0.5
            }
        }]
        self.assertEqual(expected_state_actions,all_state_actions)
        
    def testGet_All_State_Actions_2_time_step(self):
        ws= WealthState(time=0,wealth=0,risky_asset_allocation=0,termination_time=1)
        actions = [InvestmentAction(risky_investment_amount=1),InvestmentAction(risky_investment_amount=0)]
        all_state_actions = get_all_state_actions(2,actions,ws)
        expected_state_actions = [{
            ws: {
                (InvestmentAction(risky_investment_amount=1),WealthState(time=1,wealth=0,risky_asset_allocation=1)):0.5,
                (InvestmentAction(risky_investment_amount=0),WealthState(time=1,wealth=0,risky_asset_allocation=0)):0.5
            }
        },
            {
            WealthState(time=1,wealth=0,risky_asset_allocation=1): {
                (InvestmentAction(risky_investment_amount=1),WealthState(time=2,wealth=0,risky_asset_allocation=2)):0.5,
                (InvestmentAction(risky_investment_amount=0),WealthState(time=2,wealth=0,risky_asset_allocation=1)):0.5
            },
            WealthState(time=1,wealth=0,risky_asset_allocation=0): {
                (InvestmentAction(risky_investment_amount=1),WealthState(time=2,wealth=0,risky_asset_allocation=1)):0.5,
                (InvestmentAction(risky_investment_amount=0),WealthState(time=2,wealth=0,risky_asset_allocation=0)):0.5
            }
        },
        ]
        self.assertEqual(expected_state_actions,all_state_actions)

class TestGetStateActionValueMap(unittest.TestCase):
    def testGetStateActionValueMap_1_iteration(self):
        gamma = 1
        probability = 0.5
        state_actions = [{
            WealthState(time=0,wealth=0,risky_asset_allocation=0,termination_time=1): {
                (InvestmentAction(risky_investment_amount=1),WealthState(time=1,wealth=0,risky_asset_allocation=1,termination_time=1)):0.5,
                (InvestmentAction(risky_investment_amount=0),WealthState(time=1,wealth=0,risky_asset_allocation=0,termination_time=1)):0.5
            }
        }]
        reward_function = lambda ws: -np.e ** ws.risky_asset_allocation
        expected_state_action_value_map = defaultdict(float)
        expected_state_action_value_map.update({
            WealthState(time=0,wealth=0,risky_asset_allocation=0,termination_time=1):{
                (InvestmentAction(risky_investment_amount=1),WealthState(time=1,wealth=0,risky_asset_allocation=1,termination_time=1)): -gamma * np.e ** 1 * probability,
                (InvestmentAction(risky_investment_amount=0),WealthState(time=1,wealth=0,risky_asset_allocation=0,termination_time=1)): -gamma * np.e ** 0 * probability
            }
        })
        result = get_state_action_value_map(state_actions,reward_func=reward_function,gamma=1,iterations=1)
        print(result)
        print(expected_state_action_value_map)
        self.assertEqual(expected_state_action_value_map,result)
        
unittest.main(argv=[''], verbosity=2, exit=False)

testBinomialReturnsWhenT_equals_1 (__main__.TestBinomialReturns) ... ok
testBinomialReturnsWhenT_equals_2 (__main__.TestBinomialReturns) ... ok
testBinomialReturnsWhenT_equals_2_Uneven_Chance (__main__.TestBinomialReturns) ... ok
testBinomialReturnsWhenT_equals_3 (__main__.TestBinomialReturns) ... ok
testCARA (__main__.TestCARAUtility) ... ok
testGet_All_State_Actions_1_time_step (__main__.TestGetAllStateActions) ... ok
testGet_All_State_Actions_2_time_step (__main__.TestGetAllStateActions) ... ok
testGetStateActionValueMap_1_iteration (__main__.TestGetStateActionValueMap) ... ok
testGetAllActions_Investment_Limit_1 (__main__.TestInvestmentAction) ... ok
testNextState (__main__.TestInvestmentAction) ... ok
testRewardWhenStateIsNonTerminal (__main__.TestRewardFunction) ... ok
testRewardWhenStateIsTerminal_at_1 (__main__.TestRewardFunction) ... ok
testRewardWhenStateIsTerminal_at_1_Uneven_Chance (__main__.TestRewardFunction) ... ok
testRewardWhenStateIsTerminal_at_2 (__main__.TestRewardF

defaultdict(<class 'float'>, {WealthState(time=0, wealth=0, risky_asset_allocation=0, termination_time=1): {(InvestmentAction(risky_investment_amount=1), WealthState(time=1, wealth=0, risky_asset_allocation=1, termination_time=1)): -1.3591409142295225, (InvestmentAction(risky_investment_amount=0), WealthState(time=1, wealth=0, risky_asset_allocation=0, termination_time=1)): -0.5}})
defaultdict(<class 'float'>, {WealthState(time=0, wealth=0, risky_asset_allocation=0, termination_time=1): {(InvestmentAction(risky_investment_amount=1), WealthState(time=1, wealth=0, risky_asset_allocation=1, termination_time=1)): -1.3591409142295225, (InvestmentAction(risky_investment_amount=0), WealthState(time=1, wealth=0, risky_asset_allocation=0, termination_time=1)): -0.5}})


ok

----------------------------------------------------------------------
Ran 24 tests in 0.053s

OK


<unittest.main.TestProgram at 0x7fdc56a60190>