In [18]:
import numpy as np
import typing
from typing import Any

In [31]:
from nptyping import NDArray, Float64, Int, Shape

# StateSpace is a numpy array of integers with an unknown number of rows and one column
StateSpace = NDArray[Shape["*"], Int] 
# This is how you initialize one object: 
states = np.array([1, 2, 3])
isinstance(states, StateSpace)

# TransitionMatrix is a 2 dimensional numpy array of floats with an unknown number of rows and columns. The values are floats between 0 and 1 and the sum of each row is 1.
# An array with 2 dimensions of type float
TransitionMatrix = NDArray[Shape["*, *"], Float64] 
# This is how you initialize one object:
P = np.array([[0.5, 0.25, 0.25], [0.25, 0.5, 0.25], [0.25, 0.25, 0.5]])
isinstance(P, TransitionMatrix)


True

In [68]:
# display numbers in non scientific notation showing 4 decimal places
np.set_printoptions(suppress=True, precision=4)

In [69]:
# StateSpace Type is a numpy array of integers
StateSpace = typing.NewType('StateSpace', np.array)
# TransitionMatrix Type is a numpy array of floats between 0 and 1
TransitionMatrix = typing.NewType('TransitionMatrix', np.array)

In [73]:

def get_initial_state(state: int = 1) -> int:
    return state

# write a function to validate the transition matrix, each row must sum to 1
def validate_transition_matrix(P: TransitionMatrix) -> bool:
    # assert that the sum of each row is 1
    assert np.all(np.sum(P, axis=1) == 1), "Each row of the transition matrix must sum to 1"
    return True

# Write a comment describing what this code does, as well as any other information you think is relevant. Include any function names, variable names, or other identifiers that you think are important. You can also include any other information that you think is relevant, such as the purpose of the code, the context in which it is used, or any other information that you think is relevant.
# The code above is a simulation of a Markov chain with 4 states. We start at the state 1 and transition to other states. The function simulate_from_discrete_distribution simulates a transition from a state to another state. The function generate_random_sequence generates a random sequence of states for a given number of transitions. The function compute_time_to_return_to_state computes the time to return to a state. The function expected_revenue calculates the expected revenue for a customer in n_steps.

def get_initial_state(state: int = 1) -> int:
    return state

# write a function to validate the transition matrix, each row must sum to 1
def validate_transition_matrix(P: TransitionMatrix) -> bool:
    # assert that the sum of each row is 1
    assert np.all(np.sum(P, axis=1) == 1), "Each row of the transition matrix must sum to 1"
    return True

def simulate_from_discrete_distribution(states:StateSpace, probability_vector:np.array) -> int:
    # assert that dimensions of states and probability_vector are the same and pass the eror message
    assert states.shape == probability_vector.shape, "states and probability_vector must have the same dimensions"
    # assert that the sum of the probability vector is 1
    assert np.sum(probability_vector) == 1, "The sum of the probability vector must be 1"
    return int(np.random.choice(states, size=1, p = np.array(probability_vector)))

def generate_random_sequence(states: StateSpace,P: TransitionMatrix, n_transitions:int = 10, start_state:int = 1) -> np.array:
    # assert that the type of states is StateSpace
    # assert type(states) == StateSpace, "states must be of type StateSpace (numpy array))"
    # assert that the type of P is TransitionMatrix
    # assert type(P) == TransitionMatrix, "P must be of type TransitionMatrix (numpy array)"
    start_state = get_initial_state(start_state) # get initial state
    validate_transition_matrix(P) # validate transition matrix
    random_transitions = [start_state] # set start state to 1
    while n_transitions >0: # while number of transitions is greater than 0
        next_state = simulate_from_discrete_distribution(states, P[start_state-1]) # simulate next state
        random_transitions.append(next_state) # append next state to random transitions
        start_state = next_state # set start state to next state
        n_transitions -= 1 # decrement number of transitions
    return np.array(random_transitions)

# write a function to compute time to return to a state
def compute_time_to_return_to_state(  
        states: StateSpace,
        P: TransitionMatrix,
        state:int,
        n_transitions:int = 1000,
        ) -> int:
    # assert that the type of state is int
    assert type(state) == int, "state must be of type int"
    random_squence = generate_random_sequence(states, P, n_transitions=n_transitions, start_state=state) # generate random sequence
    # Find the indices of the elements that are equal to state
    indices = np.where(random_squence == state)[0]
    # Find the differences between consecutive indices, this gives the length of each run or the number of steps taken to return to the start state
    differences = np.diff(indices)
    # Find the frequency of each run length
    unique, counts = np.unique(differences, return_counts=True)
    # Find the expected value of the run length (Expected value is possible values multiplied by their probabilities ( = frequency/total number of runs)))
    return np.sum(unique*counts/np.sum(counts)), random_squence

# Expected revenue for a customer in n_steps
def expected_revenue(
        states: StateSpace, 
        P: TransitionMatrix,  
        revenue:np.array, 
        n_steps:int = 12, 
    ) -> float:
    # assert that the type of revenue is np.array
    assert type(revenue) == np.ndarray, "revenue must be of type np.array"
    # assert that the dimensions of states and revenue are the same
    assert states.shape == revenue.shape, "states and revenue must have the same dimensions"
    # Find P raised to the power of n_steps
    P_n_steps = np.linalg.matrix_power(P, n_steps)
    # Find the dot product of P_n_steps and revenue
    return np.dot(P_n_steps, revenue)

def simulate_from_discrete_distribution(states:StateSpace, probability_vector:np.array) -> int:
    # assert that dimensions of states and probability_vector are the same and pass the eror message
    assert states.shape == probability_vector.shape, "states and probability_vector must have the same dimensions"
    # assert that the sum of the probability vector is 1
    assert np.sum(probability_vector) == 1, "The sum of the probability vector must be 1"
    return int(np.random.choice(states, size=1, p = np.array(probability_vector)))

def generate_random_sequence(states: StateSpace,P: TransitionMatrix, n_transitions:int = 10, start_state:int = 1) -> np.array:
    # assert that the type of states is StateSpace
    # assert type(states) == StateSpace, "states must be of type StateSpace (numpy array))"
    # assert that the type of P is TransitionMatrix
    # assert type(P) == TransitionMatrix, "P must be of type TransitionMatrix (numpy array)"
    start_state = get_initial_state(start_state) # get initial state
    validate_transition_matrix(P) # validate transition matrix
    random_transitions = [start_state] # set start state to 1
    while n_transitions >0: # while number of transitions is greater than 0
        next_state = simulate_from_discrete_distribution(states, P[start_state-1]) # simulate next state
        random_transitions.append(next_state) # append next state to random transitions
        start_state = next_state # set start state to next state
        n_transitions -= 1 # decrement number of transitions
    return np.array(random_transitions)

# write a function to compute time to return to a state
def compute_time_to_return_to_state(  
        states: StateSpace,
        P: TransitionMatrix,
        state:int,
        n_transitions:int = 1000,
        ) -> int:
    # assert that the type of state is int
    assert type(state) == int, "state must be of type int"
    random_squence = generate_random_sequence(states, P, n_transitions=n_transitions, start_state=state) # generate random sequence
    # Find the indices of the elements that are equal to state
    indices = np.where(random_squence == state)[0]
    # Find the differences between consecutive indices, this gives the length of each run or the number of steps taken to return to the start state
    differences = np.diff(indices)
    # Find the frequency of each run length
    unique, counts = np.unique(differences, return_counts=True)
    # Find the expected value of the run length (Expected value is possible values multiplied by their probabilities ( = frequency/total number of runs)))
    return np.sum(unique*counts/np.sum(counts)), random_squence

# Expected revenue for a customer in n_steps
def expected_revenue(
        states: StateSpace, 
        P: TransitionMatrix,  
        revenue:np.array, 
        n_steps:int = 12, 
    ) -> float:
    # assert that the type of revenue is np.array
    assert type(revenue) == np.ndarray, "revenue must be of type np.array"
    # assert that the dimensions of states and revenue are the same
    assert states.shape == revenue.shape, "states and revenue must have the same dimensions"
    # Find P raised to the power of n_steps
    P_n_steps = np.linalg.matrix_power(P, n_steps)
    # Find the dot product of P_n_steps and revenue
    return np.dot(P_n_steps, revenue)


In [88]:
# How to simulate from discrete distribution
# use numpy random choice to generate 1 random number with possible values between 0 and 4 and probabilities given by P[3]
P, states = np.array([[0.2, 0.3, 0.4, 0.1],[0.3,0.1,0.3,0.3],[0.0,0.,1,0],[0,0,0,1]]), np.array([1,2,3,4])
P, states = np.array([[.5,.5],[1,0]]), np.array([1,2])
P, states = np.array([[.3,.7,0,0],[.7,.3,0,0],[0,0,0,1],[0,0,1,0]]), np.array([1,2,3,4])

In [92]:
# Generate a random sequence of length 1000 starting from state 1
expectedValue, seq = compute_time_to_return_to_state(states, P, state=4, n_transitions=1000)
expectedValue

2.0

In [103]:
P, states = np.array([[1,0,0,0],[0.3,0,.5,0.2],[0,0,0.6,0.4],[0,.2,.5,.3]]), np.array([1,2,3,4])
revenue = np.array([0,3,7,10])
expected_revenue(states, P, revenue=revenue, n_steps=12)

array([0.        , 4.38094013, 6.21864939, 5.87772386])