In [1]:
import numpy as np
import plotly.graph_objects as go
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from functools import reduce
from math import ceil
sns.set_style('darkgrid')

In [2]:
TAB_SIZE = 4
EPS = 1e-9

In [3]:
def AVAR(q, w, alpha):
    eval = [(w[k], q[k])  for k in q.keys()]
    res = 0
    a = alpha
    for wk, qk in sorted(eval, reverse=True):
        if np.isclose(alpha, 0, atol = EPS):
            break
        if alpha >= qk:
            res += wk*qk
            alpha -= qk
        else:
            res += wk*alpha
            alpha = 0
    return res/a

def add_qw(qw1, qw2):
    # bad error for now
    Q_sum = {}
    w_sum = {}
    for x1, q1 in qw1[0].items():
        for x2, q2 in qw2[0].items():
            qs = q1*q2
            temp = qw1[1][x1] + qw2[1][x2]
            if isinstance(x1, int) or isinstance(x1, np.int32):
                x1 = (x1,)
            if isinstance(x2, int) or isinstance(x2, np.int32):
                x2 = (x2,)
            xs = x1 + x2
            Q_sum[xs] = qs 
            w_sum[xs] = temp
    return (Q_sum, w_sum)

def AVAR_of_sum(list_qw, alpha):
    return AVAR(*reduce(add_qw, list_qw), alpha)

def sum_of_AVAR(list_qw, alpha):
    return sum([AVAR(*qw, alpha) for qw in list_qw])

In [4]:
class Node:
    def __init__(self, id, model, parent = None, Q = {}, cost = {}):
        self.id = id
        self.parent = parent
        self.Q = Q
        self.cost = cost
        self.w = {}
        self.children = []
        self.policy = {}
        self.terminal = True
        self.model = model
        self.name = str(id)
        self.one_step = None
    
    def add_child(self, node):
        self.children.append(node)
        node.parent = self
        self.terminal = False
        
    def get_w(self):
        if self.w:
            return self.w
        if self.terminal:
            for x in self.model.X:
                self.w[x] = self.cost[x]
            return self.w
        self.calc_policy()
        return self.w    

    def calc_policy(self):
        """
            Calculate the optimal policy and value for the maximal subtree rooted here
        """
        if self.terminal:
            raise Error("calc_policy ran on terminal nodes!")
        def net_one_step(x, u):
            res = self.cost[u][x] + self.one_step([(child.Q[u][x], child.get_w()) for child in self.children])
            return res
        self.policy = {x: min(self.model.U, key=lambda u: net_one_step(x, u)) for x in self.model.X}
        self.w = {x: net_one_step(x, self.policy[x]) for x in self.model.X}
    
    def print_tree(self, level = 0):
        print(" " * TAB_SIZE * level + self.name)
        for child in self.children:
            child.print_tree(level+1)

In [5]:
class Model:
    def __init__(self, lo, hi, U, alpha):
        """
            State space X = [lo, hi] of interval size = 1
            Action space U
            VaR calculation alpha
            Assume that 0 is root node
        """
        self.X = range(lo, hi + 1)
        self.lo = lo
        self.hi = hi
        self.U = U
        self.alpha = alpha
        self.nodes = [Node(0, self)]
        self.root = self.nodes[0]
        self.construct_graph()
        self.construct_risks()
         
    def construct_graph(self):
        raise NotImplementedError("construct_graph has not been properly implemented!")
    
    def construct_risks(self):
        raise NotImplementedError("construct_risks has not been properly implemented!")
        
    
    def bound(self, q):
        q_res = {x : 0 for x in self.X}
        for k, qk in q.items():
            q_res[max(self.lo, min(k, self.hi))] += qk

        return q_res
    
    def draw_edge(self, parent_i, child_i):
        if max(parent_i, child_i) >= len(self.nodes):
            # add nodes appropriately
            self.nodes += [Node(i, self) for i in range(len(self.nodes), max(parent_i, child_i) + 1)]
        self.nodes[parent_i].add_child(self.nodes[child_i])

In [6]:
class RDModel(Model):
    def __init__ (self, lo, hi, U, alpha, T, investment_cost = 1):
        """
            q0(x, u), q1(x, u)
            c0(x, u), c1(x)
        """
        self.T = T
        self.n = 2 * T + 2
        self.investment_cost = investment_cost
        super().__init__(lo, hi, U, alpha)

    def construct_graph(self):
        """
            customize graph structure here
        """
        for i in range(self.T):
            self.draw_edge(2*i, 2*i + 1)
            self.draw_edge(2*i, 2*i + 2)
        self.draw_edge(2*self.T, 2*self.T + 1)
    
    def construct_risks(self):
        """
            set Q, c, and one_step for each node
        """
        def q0(x, u, t):
            if u == 0:
                return {x-2: 0.2, x-1: 0.2, x: 0.2, x+1: 0.2, x+2: 0.2}
            # u == 1
            return {x+1: 0.4, x+2: 0.2, x+3: 0.4}

        def q1(x, u, t):
            if u == 0:
                return {x-1: 0.6, x: 0.2, x+1: 0.2}
            # u == 1
            return {x-1: 0.2, x: 0.4, x+1: 0.4}

        def c0(x, u, t):
            if u == 0:
                return 0
            return self.investment_cost

        def c1(x, u, t):
            # x = state
            # a = action of this node
            return np.exp(-x/20)
        
        for i in range(self.n):
            self.nodes[i].t = ceil(float(self.nodes[i].id)/2)
        for node in self.nodes:
            node.t = ceil(float(node.id)/2)
            if node.terminal:
                node.cost = {x : c1(x, None, node.t) for x in self.X}
                node.Q = {u : {x : self.bound(q1(x, u, node.t)) for x in self.X} for u in self.U}
            else:
                node.cost = {u : {x : c0(x, u, node.t) for x in self.X} for u in self.U}
                node.Q = {u : {x : self.bound(q0(x, u, node.t)) for x in self.X} for u in self.U}
            #!customize one step here
            node.one_step = lambda list_qw : sum_of_AVAR(list_qw, self.alpha)
        
    # customized functions for this particular model
    def policy_change(self, policy):
        res = self.lo
        for k, v in policy.items():
            if v == 1:
                res = k
        return res

In [7]:
models = []
LO, HI = -100, 100
T_range = range(10, 21, 10)
investment_costs = range(1, 5, 2)
alphas = [0.1, 0.2, 0.3, 0.4, 0.5]

### Plot against investment_costs, fixed alpha = 0.3

In [8]:
"""
    Plot against investment_costs
"""
frames = []
for investment_cost in investment_costs:
    frame = []
    for T in T_range:
        model = RDModel(LO, HI, [0, 1], 0.3, T, investment_cost)
        model.root.calc_policy()
        x = [node.t for node in model.nodes if node.policy]
        y = [model.policy_change(node.policy) for node in model.nodes if node.policy]
        frame.append(go.Scatter(x = x, y = y, mode = 'markers', name=f'T = {T}'))
    frames.append(go.Frame(data = frame, name = f'Cost {investment_cost}'))

In [9]:
# Create the figure with the frames and the slider
fig = go.Figure(
    data=frames[0].data,
    layout=go.Layout(
        title='Changes by Time with Cost Slider',
        xaxis=dict(title='Time'),
        yaxis=dict(title='Value of Change'),
        yaxis_range=[-50, 60],
        updatemenus=[dict(
            type="buttons",
            showactive=False,
            buttons=[dict(label="Play",
                          method="animate",
                          args=[None, dict(frame=dict(duration=500, redraw=True), fromcurrent=True, mode="immediate")]),
                     dict(label="Pause",
                          method="animate",
                          args=[[None], dict(frame=dict(duration=0, redraw=False), mode="immediate")])]
        )]
    ),
    frames=frames
)

# Configure the sliders
fig.update_xaxes(showline=True, linewidth=1, linecolor='black', mirror=True)
fig.update_yaxes(showline=True, linewidth=1, linecolor='black', mirror=True)
fig.update_layout(
    yaxis=dict(dtick=10),
    xaxis=dict(dtick=10),
    sliders=[{
        'steps': [{'args': [[f.name], {'frame': {'duration': 300, 'redraw': True}, 'mode': 'immediate'}],
                   'label': f'{f.name}',
                   'method': 'animate'} for f in frames],
        'transition': {'duration': 300},
    }]
)
fig.show()

### Plot against alphas, fixed investment_cost = 5

In [12]:
"""
    Plot against alphas
"""
frames = []
for alpha in alphas:
    frame = []
    for T in T_range:
        model = RDModel(LO, HI, [0, 1], alpha, T, 5)
        model.root.calc_policy()
        x = [node.t for node in model.nodes if node.policy]
        y = [model.policy_change(node.policy) for node in model.nodes if node.policy]
        frame.append(go.Scatter(x = x, y = y, mode = 'markers', name=f'T = {T}'))
    frames.append(go.Frame(data = frame, name = f'Alpha {alpha}'))

In [13]:
# Create the figura with the frames and the slider
fig = go.Figure(
    data=frames[0].data,
    layout=go.Layout(
        title='Changes by Time with Alpha Slider and cost = 5',
        xaxis=dict(title='Time'),
        yaxis=dict(title='Value of Change'),
        yaxis_range=[-50, 60],
        updatemenus=[dict(
            type="buttons",
            showactive=False,
            buttons=[dict(label="Play",
                          method="animate",
                          args=[None, dict(frame=dict(duration=500, redraw=True), fromcurrent=True, mode="immediate")]),
                     dict(label="Pause",
                          method="animate",
                          args=[[None], dict(frame=dict(duration=0, redraw=False), mode="immediate")])]
        )]
    ),
    frames=frames
)

# Configure the sliders
fig.update_xaxes(showline=True, linewidth=1, linecolor='black', mirror=True)
fig.update_yaxes(showline=True, linewidth=1, linecolor='black', mirror=True)
fig.update_layout(
    yaxis=dict(dtick=10),
    xaxis=dict(dtick=10),
    sliders=[{
        'steps': [{'args': [[f.name], {'frame': {'duration': 300, 'redraw': True}, 'mode': 'immediate'}],
                   'label': f'{f.name}',
                   'method': 'animate'} for f in frames],
        'transition': {'duration': 300},
    }]
)
fig.show()