# Markov Model class definition

This notebook defines a constructor for a Markov model class.

In [1]:
# Put all the imports at the beginning

import pandas as pd
import numpy as np

from collections import Counter

# Silence NumPy's divide-by-zero warning: the model code below takes
# np.log of probabilities that can be exactly 0 (giving -inf).
np.seterr(divide='ignore')

{'divide': 'warn', 'invalid': 'warn', 'over': 'warn', 'under': 'ignore'}

Before the Markov model class, I also need a MarkovState class. The MarkovState class will contain the information about the state following one or more transitional steps from the MM.

This class needs the following data:

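1. An index of the states considered in the MarkovState (taken from the index of the distribution below rather than stored separately)

2. A (logged) probability distribution over the current state (currentLogState_sr, a pandas Series indexed by state)

3. A structure containing the historical paths to each node in the index (paths_df, a pandas DataFrame indexed by state).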

In [2]:
class MarkovState:
    
    def __init__(self, currentLogState_sr, paths_df):
        
        self.myCurrentLogState_sr=currentLogState_sr
        self.myPaths_df=paths_df
        
    def get_log_current_state_distribution(self):
        return self.myCurrentLogState_sr
    
    def get_current_state_distribution(self):
        return np.exp(self.myCurrentLogState_sr)
    
    def get_index(self):
        return list(self.myCurrentLogState_sr.index)
    
    def get_path_dataframe(self):
        return self.myPaths_df
        
    def most_likely_path(self, state):
        '''
        Find the most likely path to the current state,
        as found from the path history
        '''
        # Hacky, but seems to work
        path_ls=[self.myPaths_ls[-1][self.myIndex_ls.index(state)],
                 self.myIndex_ls.index(state)]

        i=len(self.myPaths_ls)-1
        
        while i>0:
            path_ls[0:0]=[self.myPaths_ls[i-1][path_ls[0]]]
            i-=1

        return [self.myIndex_ls[i] for i in path_ls]


    def most_likely_state(self, n=1):
        '''
        Return the n most likely states to have ended up in.
        '''
        return [y[1] for y in sorted([x for x in zip(self.get_current_state_distribution(), self.get_index())],
                                     reverse=True)
               ][:n]

Although it's not the only way of defining a Markov model, for the moment, I'm going to do the definition by taking arguments in the constructor that represent a distribution of transitions.

In [19]:
class MarkovModel:
    '''
    A Markov model estimated from observed transitions.

    Attributes set by the constructor:
      stateIndex_ls          - sorted list of all states
      initialState_sr        - Series of initial-state probabilities
      logInitialState_sr     - its elementwise log
      transitionMatrix_df    - DataFrame, row = from-state,
                               column = to-state, rows sum to 1
                               (or to 0 for states with no leaving arcs)
      logTransitionMatrix_df - its elementwise log
    '''

    def __init__(self, transitions_ls, initialStates_ls=None):
        '''
        Take a list of initial states, and a list of pairs of transitions
        between states. Create a Markov model based on the distribution of
        initial states, and distribution of transitions.

        If initialStates_ls is empty (or None), assume an equal distribution
        over all the states obtained from the transitions.
        '''
        # Materialise both inputs: they are iterated more than once,
        # and a None default avoids a shared mutable default argument.
        transitions_ls=[(x, y) for (x, y) in transitions_ls]
        initialStates_ls=list(initialStates_ls) if initialStates_ls is not None else []

        # First, build the list of all states in the model
        self.stateIndex_ls=sorted(set(initialStates_ls) |
                                  {x for (x, y) in transitions_ls} |
                                  {y for (x, y) in transitions_ls})

        # Now build a series that contains the initial states,
        # falling back to one count per state when none are given
        startCounts=Counter(initialStates_ls if initialStates_ls
                            else self.stateIndex_ls)
        self.initialState_sr=pd.Series(dict(startCounts), dtype=float). \
                                reindex(self.stateIndex_ls, fill_value=0.0)

        # and normalise the values so the prob.s sum to 1
        self.initialState_sr=self.initialState_sr/self.initialState_sr.sum()

        # Now build a (float) frame that counts the transitions.
        # .at writes straight into the frame; chained .loc[x][y]
        # may write into a temporary copy instead.
        self.transitionMatrix_df=pd.DataFrame(0.0,
                                              index=self.stateIndex_ls,
                                              columns=self.stateIndex_ls)
        for ((x, y), count) in Counter(transitions_ls).items():
            self.transitionMatrix_df.at[x, y]=count

        # Normalise each row to sum to 1. Rows with no leaving arcs are
        # divided by 1 instead of 0, so they stay all-zero.
        rowTotals_sr=self.transitionMatrix_df.sum(axis=1)
        self.transitionMatrix_df=self.transitionMatrix_df.div(
            rowTotals_sr.where(rowTotals_sr!=0, 1.0), axis=0)

        # Take the logs of the transition matrix and initial states to
        # make calculations more accurate; log(0) = -inf is intended.
        with np.errstate(divide='ignore'):
            self.logTransitionMatrix_df=np.log(self.transitionMatrix_df)
            self.logInitialState_sr=np.log(self.initialState_sr)

    def create_markov_state(self, statesIn_ls):
        '''
        Helper function to convert a list of states
        to a MarkovState object. Usually used as the
        first step of input to the apply method.

        The resulting distribution is proportional to how often each
        state occurs in statesIn_ls. Raises ValueError if the list is
        empty or names a state that is not in the model.
        '''
        statesIn_ls=list(statesIn_ls)
        states_ix=self.transitionMatrix_df.index

        if not statesIn_ls:
            raise ValueError('statesIn_ls must contain at least one state')
        unknown_ls=[s for s in statesIn_ls if s not in states_ix]
        if unknown_ls:
            raise ValueError('states not in model: %r' % (unknown_ls,))

        statesDist_sr=pd.Series(dict(Counter(statesIn_ls)), dtype=float). \
                         reindex(states_ix, fill_value=0.0)
        statesDist_sr=statesDist_sr/statesDist_sr.sum()

        with np.errstate(divide='ignore'):
            logStatesDist_sr=np.log(statesDist_sr)

        # No steps taken yet, so the path frame has no columns
        return MarkovState(logStatesDist_sr,
                           pd.DataFrame(index=states_ix))

    def apply_1(self, stateIn_ms):
        '''
        Helper function to apply: applies the transition matrix
        for self to stateIn_ms once. Returns a new MarkovState holding,
        for each state, the log probability of the most probable way of
        arriving there, and a path frame extended by one column giving
        the previous state from which it is arrived at.
        '''
        states_ix=self.logTransitionMatrix_df.index
        logStateIn_ar=stateIn_ms.get_log_current_state_distribution(). \
                         reindex(states_ix).values

        # candidates_ar[s, t] is the log probability of being in state s
        # and then moving to state t (a product, taken in log space)
        candidates_ar=logStateIn_ar[:, np.newaxis]+ \
                      self.logTransitionMatrix_df.values

        # For every target state, pick the most probable previous state
        argmax_ar=candidates_ar.argmax(axis=0)
        logStateOut_sr=pd.Series(
            candidates_ar[argmax_ar, np.arange(len(states_ix))],
            index=states_ix)
        previousState_sr=pd.Series([states_ix[i] for i in argmax_ar],
                                   index=states_ix)

        # Append the back-pointers as a new column, labelled with the
        # step number, leaving the input state's frame untouched
        pathsOut_df=stateIn_ms.get_path_dataframe().copy()
        pathsOut_df[pathsOut_df.shape[1]]=previousState_sr

        return MarkovState(logStateOut_sr, pathsOut_df)

    def apply(self, stateIn_ms, steps=1):
        '''
        Takes an input MarkovState, and returns the output
        MarkovState following steps applications.

        Can also take a list of states as an alternative to
        the input MarkovState, in which case it will be
        converted as necessary (see create_markov_state).

        TODO: Raise an error if the index of a supplied MarkovState
              doesn't match the model's.
        '''

        # First, if the given argument is not a MarkovState,
        # generate one based on the input
        if not isinstance(stateIn_ms, MarkovState):
            stateIn_ms=self.create_markov_state(stateIn_ms)

        stateOut_ms=stateIn_ms
        for _ in range(steps):
            stateOut_ms=self.apply_1(stateOut_ms)

        return stateOut_ms

    def transition_weight(self, state1, state2):
        '''
        Return the weight (usually a probability, if the weight
        of all leaving arcs sum to 1) of the arc from state1
        to state2. Raises KeyError for a state not in the model.
        '''
        return self.transitionMatrix_df.at[state1, state2]

    def get_states(self):
        '''
        Return list of states in model
        '''
        return list(self.transitionMatrix_df.index)

I'm going to put rather more test cases here now. We can always comment out the tests before exporting.

In [20]:
# 30 observed transitions, 10 leaving each of 'a', 'b' and 'c';
# the repeat counts set the relative transition frequencies.
transitions_ls=([('a', 'a')] + [('a', 'b')]*4 + [('a', 'c')]*5 +
                [('b', 'a')] + [('b', 'b')]*8 + [('b', 'c')] +
                [('c', 'a')]*3 + [('c', 'b')]*4 + [('c', 'c')]*3)

In [21]:
# Model built from the transitions above, with 'a' as the only initial state
test1_mm=MarkovModel(transitions_ls, ['a'])

In [22]:
# Show the transition matrix (row = from-state, column = to-state)
test1_mm.transitionMatrix_df

Unnamed: 0,a,b,c
a,0.1,0.4,0.5
b,0.1,0.8,0.1
c,0.3,0.4,0.3


In [23]:
# Show the normalised initial-state distribution
test1_mm.initialState_sr

a    1.0
b    0.0
c    0.0
dtype: float64

In [24]:
# Same counts as before for 'a' and 'c', but no transitions leave 'b'
# ('b' only ever appears as a target).
transitions_ls=([('a', 'a')] + [('a', 'b')]*4 + [('a', 'c')]*5 +
                [('c', 'a')]*3 + [('c', 'b')]*4 + [('c', 'c')]*3)

In [25]:
# No initial states are passed, so the constructor uses an equal
# distribution over all states
test2_mm=MarkovModel(transitions_ls)

In [26]:
# Show the transition matrix; 'b' has no leaving arcs in this data,
# so its row is expected to stay all zero
test2_mm.transitionMatrix_df

Unnamed: 0,a,b,c
a,0.1,0.4,0.5
b,0.0,0.0,0.0
c,0.3,0.4,0.3


In [27]:
# Show the default (equal) initial-state distribution
test2_mm.initialState_sr

a    0.333333
b    0.333333
c    0.333333
dtype: float64

In [28]:
# Convert a plain list of states into a MarkovState for test1_mm
test1_mm.create_markov_state(['b'])

<__main__.MarkovState at 0x113808320>

In [None]:
# Scratch: a frame that has a row index but no columns yet
u=pd.DataFrame(index=list('abc'))
u

In [None]:
# Add a column from a partial mapping. np.int no longer exists in NumPy,
# and an integer Series cannot hold the NaN that the missing 'b' key
# would produce, so fill the gap with 0 while reindexing instead.
u['g']=pd.Series({'a':3, 'c':4}).reindex(u.index, fill_value=0)

In [None]:
# Show u after adding column 'g'
u

Now we're going to try to write some code to merge two different Markov models. This isn't mathematically particularly well founded, but what the heck...

I'm also going to write a subclass of the MarkovModel class here, which will be initialised using the transition matrix etc., rather than the distribution of state transitions:

In [None]:
class MarkovModelFromArrays(MarkovModel):
    '''
    A MarkovModel initialised directly from a transition matrix and an
    initial-state distribution, rather than from observed transitions.
    '''

    def __init__(self, transitions_ar, initialStates_ar, index_ls):
        '''
        Give the arrays directly. Note, this is not generally
        intended to be called by the user.

        transitions_ar:   square array, entry [i][j] being the weight
                          of the arc from index_ls[i] to index_ls[j].
        initialStates_ar: array of initial-state weights, one per state.
        index_ls:         sorted list of the states.

        Raises ValueError if the array shapes don't match index_ls.
        '''
        # provided index should be sorted
        index_ls=list(index_ls)
        assert index_ls==sorted(index_ls), 'index_ls must be sorted'
        self.stateIndex_ls=index_ls

        # Store the array that contains the initial states
        self.initialState_ar=np.asarray(initialStates_ar, dtype=float)
        # Now store the array that encodes the transitions
        self.transitionMatrix_ar=np.asarray(transitions_ar, dtype=float)

        n=len(index_ls)
        if self.initialState_ar.shape!=(n,):
            raise ValueError('initialStates_ar must have one entry per state')
        if self.transitionMatrix_ar.shape!=(n, n):
            raise ValueError('transitions_ar must be square, one row and column per state')

        # and take the logs; log(0) = -inf is intended for zero weights
        with np.errstate(divide='ignore'):
            self.logInitialState_ar=np.log(self.initialState_ar)
            self.logTransitionMatrix_ar=np.log(self.transitionMatrix_ar)

        # Also keep labelled pandas forms of the same data: inherited
        # MarkovModel methods such as get_states and create_markov_state
        # read transitionMatrix_df.
        self.initialState_sr=pd.Series(self.initialState_ar, index=index_ls)
        self.logInitialState_sr=pd.Series(self.logInitialState_ar, index=index_ls)
        self.transitionMatrix_df=pd.DataFrame(self.transitionMatrix_ar,
                                              index=index_ls,
                                              columns=index_ls)
        self.logTransitionMatrix_df=pd.DataFrame(self.logTransitionMatrix_ar,
                                                 index=index_ls,
                                                 columns=index_ls)

In [None]:
def _transition_frame(model):
    '''Return model's transition matrix as a DataFrame labelled by state.'''
    if hasattr(model, 'transitionMatrix_df'):
        return model.transitionMatrix_df
    return pd.DataFrame(model.transitionMatrix_ar,
                        index=model.stateIndex_ls,
                        columns=model.stateIndex_ls)


def _initial_series(model):
    '''Return model's initial-state distribution as a Series labelled by state.'''
    if hasattr(model, 'initialState_sr'):
        return model.initialState_sr
    return pd.Series(model.initialState_ar, index=model.stateIndex_ls)


def merge(model1, model2, weight, normalise=True):
    '''
    Combine the transition matrices of model1 and model2 into
    an averaged model, in which model1 has weight, and model2
    has (1-weight). The initial-state distributions are combined
    in the same way.

    The merged model covers the union of both models' states; a state
    unknown to one model contributes zero weight from that model.
    Each model may hold its data either as pandas objects
    (transitionMatrix_df / initialState_sr) or as arrays
    (transitionMatrix_ar / initialState_ar) alongside stateIndex_ls.

    If normalise==True, then adjust the model so that the
    outputs from all nodes sum to 1 (or zero if no leaving
    arcs).

    Raises ValueError if weight is not between 0 and 1.
    '''
    if not 0<=weight<=1:
        raise ValueError('weight must be between 0 and 1, got %r' % (weight,))

    mergedIndex_ls=sorted(set(model1.stateIndex_ls) | set(model2.stateIndex_ls))

    # Align both matrices on the merged index (missing arcs count as 0)
    # and take the weighted sum
    transitions1_df=_transition_frame(model1).reindex(index=mergedIndex_ls,
                                                      columns=mergedIndex_ls,
                                                      fill_value=0.0)
    transitions2_df=_transition_frame(model2).reindex(index=mergedIndex_ls,
                                                      columns=mergedIndex_ls,
                                                      fill_value=0.0)
    mergedTransitionMatrix_ar=(weight*transitions1_df +
                               (1-weight)*transitions2_df).values.astype(float)

    if normalise:
        # Divide each row by its total; rows that sum to 0 are skipped
        # by the where mask and so stay all-zero
        rowTotals_ar=mergedTransitionMatrix_ar.sum(axis=1, keepdims=True)
        np.divide(mergedTransitionMatrix_ar, rowTotals_ar,
                  out=mergedTransitionMatrix_ar, where=rowTotals_ar!=0)

    # Combine the initial states like the transition matrices
    initial1_sr=_initial_series(model1).reindex(mergedIndex_ls, fill_value=0.0)
    initial2_sr=_initial_series(model2).reindex(mergedIndex_ls, fill_value=0.0)
    initialStates_ar=(weight*initial1_sr +
                      (1-weight)*initial2_sr).values.astype(float)

    if normalise:
        total=initialStates_ar.sum()
        if total!=0:
            initialStates_ar=initialStates_ar/total

    return MarkovModelFromArrays(mergedTransitionMatrix_ar,
                                 initialStates_ar,
                                 mergedIndex_ls)
    
    

In [None]:
# Scratch: start from a 3x3 array of zeros
a=np.zeros((3, 3))
a

In [None]:
# Fill a in place with distinguishable values: entry [i][j] = 0.2*i + 0.3*j
for i in range(3):
    for j in range(3):
        a[i][j]=i*0.2+j*0.3
a

In [None]:
# Pad a with two extra rows of zeros (3x3 -> 5x3)
b=np.concatenate([a, np.zeros([2, 3])], axis=0)
b

In [None]:
# Pad b with two extra columns of zeros (5x3 -> 5x5)
c=np.concatenate([b, np.zeros([5, 2])], axis=1)
c

Now I want to sort the rows and columns so that the index is ['a', 'b', 'c', 'd', 'e'] rather than ['a', 'c', 'e', 'b', 'd']

In [None]:
# NOTE(review): argsort returns, for each row, the positions that would sort
# that row's values; it does not reorder rows/columns by state label.
c.argsort()

In [None]:
import pandas as pd

In [None]:
# Scratch frame with labels a, e, c on both axes.
# Note: this rebinds the name a, which held a numpy array above.
a=pd.DataFrame({'a':[0, 2, 1], 'e':[6, 8, 7], 'c':[3, 5, 4]},
                index=['a', 'e', 'c'])
a

In [None]:
# Second scratch frame with labels e, d, a: it shares only some
# labels with the frame a, and lists them in a different order.
b=pd.DataFrame({'d':[12, 11, 10], 'e':[15, 16, 14], 'a':[19, 17, 18]},
                index=['e', 'd', 'a'])
b

In [None]:
# Plain addition aligns on row and column labels; any cell that is
# missing from either frame comes out as NaN
a+b

In [None]:
# With fill_value=0 a cell missing from just one frame is treated as 0;
# cells missing from both frames are still NaN
a.add(b, fill_value=0)

In [None]:
import pandas as pd

In [None]:
# Scratch index of four state labels
i_ls=list('abcd')

In [None]:
# Check that np.log can be applied directly to an integer DataFrame
df=pd.DataFrame(3, index=i_ls, columns=['g', 'h', 'i', 'j'], dtype=int)
np.log(df)

In [None]:
# Sum of the values in row 'b'
np.sum(df.loc['b'])

In [None]:
# Count occurrences per label in i_ls. np.int no longer exists in NumPy,
# and an integer Series cannot hold NaN for the labels with no count, so
# reindex with fill_value=0 ('e' is not in i_ls and is dropped).
u=pd.Series(Counter(['b', 'b', 'd', 'd', 'd', 'e', 'e'])).reindex(i_ls, fill_value=0)

In [None]:
# Show the counts per label
u

In [None]:
# Normalise the counts so they sum to 1.
# Note: this overwrites u, so re-running the cell divides the
# already-normalised values again.
u=u/np.sum(u)

In [None]:
# True when at least one entry of u is non-zero
not all(u==0)