In [1]:
import pickle
import numpy as np
from hmmlearn import hmm
import cvxpy as cp
import matplotlib.pyplot as plt

In [2]:
# Team roster: identifiers 'player1' .. 'player4'.
players = ['player%d' % number for number in range(1, 5)]

# Sizes of the hidden-state and observation alphabets.
n_components = 3
n_features = 3

# Observation symbols: 0 = under-, 1 = average-, 2 = over-performance.
Os = list(range(n_features))
# Hidden (mental) states, indexed the same way as the observations.
Hs = list(range(n_components))

# Length of every simulated sequence.
T = 100
# Number of additional random restarts used when fitting each per-player HMM.
learning_iterations = 1000

In [3]:
class TeamModel:
    """Generative model of a team whose players' hidden states are coupled.

    At every step a player's next hidden state is drawn from a mixture,
    weighted by ``R[player]``, of
      * the player's own state-to-state transition row in ``N`` and
      * one observation-to-state row of ``M`` per teammate, selected by that
        teammate's previous observation.
    The player's observation is then drawn from the emission row of the new
    hidden state.

    The class reads these notebook-level globals: ``players``, ``Hs``, ``Os``,
    ``n_components``, ``n_features``, ``learning_iterations``, ``init_trans``
    and ``init_emission``.
    """

    def __init__(self, initial_dist, M, N, R, emission_prob):
        """Store the model parameters and reset the simulation clock.

        Args:
            initial_dist: distribution over ``Hs`` used to draw every player's
                first hidden state.
            M: dict of row-stochastic observation-to-state matrices, keyed
                either by ``(player, teammate)`` or by ``player`` alone.
            N: dict, key = player, value = row-stochastic state-to-state matrix.
            R: per-player mixture weights (tie strength); entry 0 weighs the
                player's own previous hidden state, entry ``k + 1`` weighs the
                previous observation of ``players[k]``.
            emission_prob: dict, key = player, value = row-stochastic emission
                matrix (hidden state -> observation).
        """
        self.initial_dist = initial_dist

        # Transition matrices
        self.M = M  # observation-to-state transitions
        self.N = N  # state-to-state transitions

        # Graph weights - tie strength
        self.R = R

        # Emission matrices
        self.emission_prob = emission_prob

        # Latest hidden and observed states (dicts keyed by player once started)
        self.H = None
        self.O = None

        self.t = 0

    def check_input_parameters(self):
        """Placeholder; no validation is currently performed."""
        pass

    def next(self):
        """Advance the simulation by one step.

        Returns:
            ``(H, O)``: two dicts keyed by player holding the new hidden state
            and the new observation of every player.
        """
        if self.t == 0:
            # First step: hidden states come from the initial distribution.
            self.H = {p: np.random.choice(Hs, p=self.initial_dist) for p in players}
            self.O = {p: np.random.choice(Os, p=self.emission_prob[p][self.H[p]]) for p in players}
            self.t = 1

            return self.H, self.O

        H_new = {}
        O_new = {}
        for p in players:
            R_ = self.R[p]
            emission_ = self.emission_prob[p]

            # Probability of each candidate next hidden state h
            dist = []
            for h in Hs:
                # P(H_t^p = h | H_{t-1}^p = H[p])
                v = [self.N[p][self.H[p]][h]]
                for teammate in players:
                    # Look the key up in this model's own M (not a notebook
                    # global), falling back to a per-player matrix.
                    if (p, teammate) in self.M:
                        # P(H_t^p = h | O_{t-1}^teammate = O[teammate])
                        v.append(self.M[(p, teammate)][self.O[teammate]][h])
                    else:
                        v.append(self.M[p][self.O[teammate]][h])

                dist.append(np.dot(R_, np.array(v)))

            h_new = np.random.choice(Hs, p=dist)

            # Emit the observation for the new hidden state
            o_new = np.random.choice(Os, p=emission_[h_new])

            H_new[p] = h_new
            O_new[p] = o_new

        # All players are updated simultaneously from the previous step's values
        self.H = H_new
        self.O = O_new

        self.t += 1

        return H_new, O_new

    def get_data(self, T=1000):
        """Restart the model and simulate ``T`` steps.

        Returns:
            A list of ``T`` tuples ``(H, O)`` as returned by :meth:`next`.
        """
        self.restart()
        data = []
        for _ in range(T):
            H, O = self.next()
            data.append((H, O))
        return data

    def restart(self):
        """Forget the current states and reset the clock to 0."""
        self.H = None
        self.O = None
        self.t = 0

    def learn_model(self, observations):
        """Estimate M, N, R and the emission matrices from observations.

        1. Fit one ``CategoricalHMM`` per player, keeping the best-scoring of
           ``learning_iterations + 1`` random initialisations, and decode the
           hidden states with ``predict``.
        2. Estimate E, N and M by counting over the decoded states; every
           count starts at 1 (add-one smoothing).
        3. Fit the mixture weights R with :meth:`learn_R`.

        Args:
            observations: dict, key = player, value = NumPy array of
                observation indices (it is reshaped with ``reshape(-1, 1)``).

        Returns:
            ``(M, N, H, R_optimized, E)`` where ``H`` maps each player to the
            decoded hidden-state sequence.
        """
        H = dict()
        O = dict()
        print('Fitting HMMs...')
        for p in players:
            model = hmm.CategoricalHMM(n_components=n_components, init_params='', params='te')
            # Use this model's own initial distribution, not a notebook global.
            model.startprob_ = self.initial_dist

            X = observations[p]
            O[p] = X  # save observations
            X = X.reshape(-1, 1)

            model.transmat_ = init_trans()
            model.emissionprob_ = init_emission()

            model.fit(X)
            opt_score = model.score(X)
            # Copy so the kept optimum cannot alias arrays touched by later fits.
            opt_transmat_ = np.array(model.transmat_, copy=True)
            opt_emissionprob_ = np.array(model.emissionprob_, copy=True)

            # Random restarts: keep the parameters with the best score
            for _ in range(learning_iterations):
                model.transmat_ = init_trans()
                model.emissionprob_ = init_emission()
                model.fit(X)
                score = model.score(X)
                if score > opt_score:
                    opt_score = score
                    opt_transmat_ = np.array(model.transmat_, copy=True)
                    opt_emissionprob_ = np.array(model.emissionprob_, copy=True)

            model.transmat_ = opt_transmat_
            model.emissionprob_ = opt_emissionprob_

            H[p] = model.predict(X)  # decoded hidden states

        M = dict()  # transition probabilities from previous observations
        N = dict()  # transition probabilities from previous hidden state
        E = dict()  # emission probabilities

        print('Estimating transition matrices...')
        for player1 in players:
            for player2 in players:
                # M[j][i] = Pr(H_t^player1 = i | O_{t-1}^player2 = j)
                M[(player1, player2)] = np.ones((n_features, n_components))
            # N[j][i] = Pr(H_t^player1 = i | H_{t-1}^player1 = j)
            N[player1] = np.ones((n_components, n_components))
            E[player1] = np.ones((n_components, n_features))

            hidden_states = H[player1]

            # Emission counts: (hidden state, observation) at the same step
            for (h, o) in zip(hidden_states, observations[player1]):
                E[player1][h][o] += 1

            E[player1] = E[player1] / E[player1].sum(axis=1, keepdims=True)

            # N counts: h1 = next state, h2 = previous state
            for (h1, h2) in zip(hidden_states[1:], hidden_states):
                N[player1][h2][h1] += 1

            N[player1] = N[player1] / N[player1].sum(axis=1, keepdims=True)

            # M counts: h = next state, o = teammate's previous observation
            for player2 in players:
                observed_states = O[player2]

                for (h, o) in zip(hidden_states[1:], observed_states):
                    M[(player1, player2)][o][h] += 1

                M[(player1, player2)] = M[(player1, player2)] / M[(player1, player2)].sum(axis=1, keepdims=True)

        R_optimized = self.learn_R(M, N, H, observations)

        return M, N, H, R_optimized, E

    def learn_R(self, M, N, H, O):
        """Fit the mixture weights R by maximising the transition log-likelihood.

        For each player the weights are non-negative and constrained to sum
        to 1. The objective is the sum over players and steps of
        ``log(R[p] @ v_t)``, where ``v_t`` holds the N term followed by one M
        term per teammate.

        Args:
            M, N: estimated transition matrices (M keyed by player pair).
            H: dict, key = player, value = hidden-state sequence.
            O: dict, key = player, value = observation sequence.

        Returns:
            dict, key = player, value = the solver's value for that player's
            weight vector.
        """
        print('Learning R...')
        R = {p: cp.Variable(len(players) + 1, nonneg=True) for p in players}

        def obj(R):
            objective = 0
            for p in players:
                # Iterate over the actual sequence length instead of the
                # notebook-level constant T.
                for t in range(1, len(H[p])):
                    h_t = H[p][t]  # state of player p at time t
                    v_t = [N[p][H[p][t-1]][h_t]]
                    for teammate in players:
                        v_t.append(M[(p, teammate)][O[teammate][t-1]][h_t])
                    v_t = np.array(v_t)
                    prod = R[p] @ v_t
                    objective += cp.log(prod)
            return objective

        constraints = [cp.sum(R[p]) == 1 for p in R]
        prob = cp.Problem(cp.Maximize(obj(R)), constraints)
        prob.solve()

        R_optimized = {p: R[p].value for p in players}
        return R_optimized

In [4]:
def init_trans():
    """Draw a random initial state-transition matrix for an HMM fit.

    Each state stays put with probability drawn uniformly from [0.5, 1).
    The outer states (0 and 2) can only move to state 1; state 1 splits its
    remaining mass equally between states 0 and 2.
    """
    matrix = np.zeros((n_components, n_components))

    # State 0: stay, or move to state 1
    stay = np.random.uniform(0.5, 1)
    matrix[0, 0] = stay
    matrix[0, 1] = 1 - stay
    matrix[0, 2] = 0.0

    # State 1: stay, or move to either neighbour with equal probability
    stay = np.random.uniform(0.5, 1)
    matrix[1, 1] = stay
    matrix[1, 0] = (1 - stay) / 2
    matrix[1, 2] = (1 - stay) / 2

    # State 2: stay, or move to state 1
    stay = np.random.uniform(0.5, 1)
    matrix[2, 2] = stay
    matrix[2, 0] = 0.0
    matrix[2, 1] = 1 - stay

    return matrix

def init_emission():
    """Draw a random initial emission matrix for an HMM fit.

    Hidden state k emits observation k with probability drawn uniformly from
    [0.5, 1). States 0 and 2 put the remainder on observation 1; state 1
    splits the remainder equally between observations 0 and 2.
    """
    matrix = np.zeros((n_components, n_features))

    # Hidden state 0
    match = np.random.uniform(0.5, 1)
    matrix[0, 0] = match
    matrix[0, 1] = 1 - match
    matrix[0, 2] = 0.0

    # Hidden state 1
    match = np.random.uniform(0.5, 1)
    matrix[1, 1] = match
    matrix[1, 0] = (1 - match) / 2
    matrix[1, 2] = (1 - match) / 2

    # Hidden state 2
    match = np.random.uniform(0.5, 1)
    matrix[2, 2] = match
    matrix[2, 1] = 1 - match
    matrix[2, 0] = 0.0

    return matrix

## Parameters
The parameters of our model are:
- M, $M^{player}[i][j] = Prob(H_t^{player} = j | O_{t-1}^{teammate} = i)$ for all teammates
- N, $N^{player}[i][j] = Prob(H_t^{player} = j | H_{t-1}^{player} = i)$
- R
- emission_prob
- initial_dist

Here we give some 'intuitive' instantiations of the above parameters. \
Combining them we can build interesting team structures.

In [5]:
# === M ===
# Observation-to-state matrices: row = previous observation, column = next
# hidden state. Every row sums to 1.
# "Average" influence: the next state loosely follows the observation.
avg_transO = np.array([[0.5, 0.3, 0.2],
                       [0.25, 0.5, 0.25],
                       [0.2, 0.3, 0.5]])

# "Star" influence: the next state follows the observation much more closely.
star_transO = np.array([[0.9, 0.1, 0],
                        [0.1, 0.8, 0.1],
                        [0, 0.1, 0.9]])
# === N ===
# State-to-state matrix: row = previous hidden state, column = next hidden
# state. States 0 and 2 cannot reach each other in a single step.
avg_transH = np.array([[0.8, 0.2, 0.0],
                       [0.1, 0.8, 0.1],
                       [0.0, 0.2, 0.8]])

# === emission_prob ===
# Emission matrix: row = hidden state, column = observation.
avg_emission = np.array([[0.9, 0.1, 0.0],
                       [0.05, 0.9, 0.05],
                       [0.0, 0.1, 0.9]])

# ===  R ===
# Weight vectors have len(players) + 1 entries: entry 0 weighs the player's own
# previous hidden state, the remaining entries weigh each player's previous
# observation. R_singleH puts all weight on the own hidden state.
R_singleH = np.array([1] + [0] * len(players))
def R_singleHO(player):
    """Weights for a player influenced only by their own hidden state and observation.

    Entry 0 (own previous hidden state) gets 0.6 and the entry of the player's
    own previous observation gets 0.4; all other entries are 0.

    The observation slot is the player's position in ``players`` plus one,
    the same order in which TeamModel.next builds its mixture vector. The
    original parsed the last character of the name, which breaks for
    identifiers with more than one digit (e.g. 'player10').

    Raises:
        ValueError: if ``player`` is not in ``players``.
    """
    arr = [0] * (len(players) + 1)
    arr[0] = 0.6
    arr[players.index(player) + 1] = 0.4
    return np.array(arr)

def R_star(player, star):
    """Weights for a team in which everybody follows the star's observation.

    The star gets ``R_singleH`` (all weight on their own previous hidden
    state). Every other player puts all weight on the star's previous
    observation.

    The star's slot is its position in ``players`` plus one, so identifiers
    with more than one digit are handled correctly.

    Raises:
        ValueError: if ``star`` is not in ``players`` (only checked for
            non-star players).
    """
    if player == star:
        return R_singleH
    arr = [0] * (len(players) + 1)
    arr[players.index(star) + 1] = 1
    return np.array(arr)
    
# Equal weight on every entry: the own hidden state and each player's observation.
R_uniform = np.array([1/(1 + len(players))] * (len(players) + 1))

In [6]:
def collapsed(H):
    """Count the time steps at which every player is in hidden state 0.

    Args:
        H: dict, key = player, value = sequence of hidden states.

    Returns:
        Number of steps whose joint state is all zeros.
    """
    states = list(zip(*[H[p] for p in players]))
    # list.count takes a single value: the all-zero joint state as one tuple.
    return states.count(tuple(0 for _ in players))

In [7]:
def generate_data(model, T, seed=73):
    """Simulate ``T`` steps from ``model`` and split the result per player.

    The global NumPy RNG is seeded with ``seed`` before sampling.

    Returns:
        ``(observations, true_hidden)``: two dicts keyed by player, each
        holding a NumPy array with one entry per simulated step.
    """
    np.random.seed(seed)
    trajectory = model.get_data(T)

    observations = {}
    true_hidden = {}
    for player in players:
        # Each trajectory item is (hidden-state dict, observation dict)
        true_hidden[player] = np.array([hidden[player] for hidden, _ in trajectory])
        observations[player] = np.array([observed[player] for _, observed in trajectory])
    return observations, true_hidden

In [8]:
def print_params(M, N, R, E):
    """Print M, N, R and E for every player, rounded to two decimals.

    M is keyed by (player, teammate) pairs; N, R and E are keyed by player.
    """
    def rounded(values):
        return np.round(values, 2)

    print('=== M ===')
    for pair in [(first, second) for first in players for second in players]:
        print(f'{pair}:\n {rounded(M[pair])}')

    # (header, table, separator between the player name and the values)
    sections = (
        ('=== N ===', N, ':\n '),
        ('=== R ===', R, ': '),
        ('=== E ===', E, ':\n '),
    )
    for header, table, separator in sections:
        print(header)
        for name in players:
            print(f'{name}{separator}{rounded(table[name])}')

In [26]:
def loglikelihood(R, M, N, H, O):
    """Log-likelihood of the hidden-state transitions under mixture weights R.

    For every player p and step t >= 1 this adds ``log(R[p] . v_t)``, where
    ``v_t`` holds the N term for p's own previous hidden state followed by one
    M term per teammate (selected by the teammate's previous observation).

    Args:
        R: dict, key = player, value = weight vector of length len(players) + 1.
        M: dict keyed by (player, teammate) of observation-to-state matrices.
        N: dict keyed by player of state-to-state matrices.
        H: dict keyed by player of hidden-state sequences.
        O: dict keyed by player of observation sequences.

    Returns:
        The summed log-likelihood (0 when every sequence has fewer than two steps).
    """
    objective = 0
    for p in players:
        N_player = N[p]
        states = H[p]
        # Bound the loop by the sequence itself rather than the notebook-level
        # constant T, which silently truncated or overran other lengths.
        for t in range(1, len(states)):
            h_t = states[t]  # state of player at time t
            v_t = [N_player[states[t-1]][h_t]]
            for teammate in players:
                v_t.append(M[(p, teammate)][O[teammate][t-1]][h_t])
            prod = np.dot(R[p], np.array(v_t))
            objective += np.log(prod)
    return objective

# Learning experiments

## SingleHidden, $R_i$ = 1
Each player's state only depends on his previous hidden state.
That is, each player is modeled by a standard HMM.

In [10]:
# Ground-truth parameters: every player (and every player pair) shares the
# same "average" matrices.
M = {(p1, p2): avg_transO for p1 in players for p2 in players}
N = {player: avg_transH for player in players}
emission_prob = {player: avg_emission for player in players}
# All weight on the own previous hidden state, so the M terms get weight 0.
R = {player: R_singleH for player in players}
# Every player starts in hidden state 1.
initial_dist = np.array([0, 1, 0])

model = TeamModel(initial_dist, M, N, R, emission_prob)

In [11]:
# Generate data
# NOTE: generate_data() calls np.random.seed(seed) itself (default seed=73),
# so the seed set on the next line is overwritten before any sampling happens.
np.random.seed(42)
observations, true_hidden = generate_data(model, T)

In [12]:
# learn_model returns (M, N, decoded hidden states, R, E).
# It fits learning_iterations + 1 HMMs per player.
M_, N_, predicted_hidden, R_, E_  = model.learn_model(observations)

Fitting HMMs...
Estimating transition matrices...
Learning R...


In [13]:
# Show the learned parameters, rounded to two decimals.
print_params(M_, N_, R_, E_)

=== M ===
('player1', 'player1'):
 [[0.56 0.4  0.04]
 [0.21 0.66 0.13]
 [0.03 0.22 0.75]]
('player1', 'player2'):
 [[0.19 0.44 0.37]
 [0.36 0.54 0.11]
 [0.19 0.41 0.41]]
('player1', 'player3'):
 [[0.21 0.47 0.32]
 [0.27 0.46 0.27]
 [0.19 0.42 0.38]]
('player1', 'player4'):
 [[0.23 0.54 0.23]
 [0.17 0.45 0.38]
 [0.31 0.43 0.26]]
('player2', 'player1'):
 [[0.36 0.36 0.28]
 [0.45 0.23 0.32]
 [0.5  0.06 0.44]]
('player2', 'player2'):
 [[0.91 0.07 0.02]
 [0.29 0.5  0.21]
 [0.03 0.14 0.84]]
('player2', 'player3'):
 [[0.21 0.29 0.5 ]
 [0.56 0.12 0.31]
 [0.54 0.23 0.23]]
('player2', 'player4'):
 [[0.54 0.23 0.23]
 [0.4  0.21 0.4 ]
 [0.48 0.19 0.33]]
('player3', 'player1'):
 [[0.2  0.6  0.2 ]
 [0.34 0.43 0.23]
 [0.39 0.33 0.28]]
('player3', 'player2'):
 [[0.19 0.47 0.35]
 [0.25 0.57 0.18]
 [0.54 0.3  0.16]]
('player3', 'player3'):
 [[0.76 0.21 0.03]
 [0.17 0.65 0.19]
 [0.04 0.35 0.62]]
('player3', 'player4'):
 [[0.38 0.46 0.15]
 [0.32 0.43 0.25]
 [0.31 0.43 0.26]]
('player4', 'player1'):
 [[0.2

## SingleHiddenObserved, $R_i$ = 0.6 and $R_{ii}$ = 0.4
Each player's state only depends on his previous hidden and observed states.
Note that each player is independent of the other players.

In [14]:
# Ground-truth parameters: "star" observation-to-state matrices for every pair.
M = {(p1, p2): star_transO for p1 in players for p2 in players}
N = {player: avg_transH for player in players}
emission_prob = {player: avg_emission for player in players}
# 0.6 on the own previous hidden state, 0.4 on the own previous observation.
R = {player: R_singleHO(player) for player in players}
# Every player starts in hidden state 1.
initial_dist = np.array([0, 1, 0])

model = TeamModel(initial_dist, M, N, R, emission_prob)

In [15]:
# Generate data
# NOTE: generate_data() calls np.random.seed(seed) itself (default seed=73),
# so the seed set on the next line is overwritten before any sampling happens.
np.random.seed(42)
observations, true_hidden = generate_data(model, T)

In [16]:
# learn_model returns (M, N, decoded hidden states, R, E).
# It fits learning_iterations + 1 HMMs per player.
M_, N_, predicted_hidden, R_, E_  = model.learn_model(observations)

Fitting HMMs...
Estimating transition matrices...
Learning R...


In [17]:
# Show the learned parameters, rounded to two decimals.
print_params(M_, N_, R_, E_)

=== M ===
('player1', 'player1'):
 [[0.79 0.18 0.03]
 [0.22 0.59 0.19]
 [0.03 0.19 0.78]]
('player1', 'player2'):
 [[0.31 0.38 0.31]
 [0.22 0.38 0.41]
 [0.47 0.21 0.32]]
('player1', 'player3'):
 [[0.21 0.32 0.47]
 [0.48 0.32 0.2 ]
 [0.27 0.35 0.38]]
('player1', 'player4'):
 [[0.53 0.32 0.16]
 [0.27 0.35 0.39]
 [0.32 0.3  0.38]]
('player2', 'player1'):
 [[0.44 0.18 0.38]
 [0.35 0.38 0.27]
 [0.49 0.19 0.32]]
('player2', 'player2'):
 [[0.86 0.12 0.02]
 [0.28 0.59 0.12]
 [0.03 0.09 0.88]]
('player2', 'player3'):
 [[0.34 0.29 0.37]
 [0.43 0.25 0.32]
 [0.54 0.19 0.27]]
('player2', 'player4'):
 [[0.32 0.11 0.58]
 [0.37 0.24 0.39]
 [0.55 0.32 0.12]]
('player3', 'player1'):
 [[0.06 0.74 0.21]
 [0.38 0.38 0.24]
 [0.54 0.19 0.27]]
('player3', 'player2'):
 [[0.29 0.4  0.31]
 [0.31 0.47 0.22]
 [0.41 0.41 0.18]]
('player3', 'player3'):
 [[0.76 0.21 0.03]
 [0.14 0.66 0.2 ]
 [0.04 0.35 0.62]]
('player3', 'player4'):
 [[0.32 0.58 0.11]
 [0.33 0.45 0.22]
 [0.35 0.32 0.32]]
('player4', 'player1'):
 [[0.2

## Single-star team 
Each player's state only depends on the observed state of the star of the team.
The star's state only depends on his hidden state.

In [18]:
# Ground-truth parameters: "star" observation-to-state matrices for every pair.
M = {(p1, p2): star_transO for p1 in players for p2 in players}
N = {player: avg_transH for player in players}

emission_prob = {player: avg_emission for player in players}
# 'player1' is the star: it follows its own hidden state, everybody else puts
# all weight on player1's previous observation.
R = {player: R_star(player, 'player1') for player in players}
# Every player starts in hidden state 1.
initial_dist = np.array([0, 1, 0])

model = TeamModel(initial_dist, M, N, R, emission_prob)

In [19]:
# Generate data
# NOTE: generate_data() calls np.random.seed(seed) itself (default seed=73),
# so the seed set on the next line is overwritten before any sampling happens.
np.random.seed(42)
observations, true_hidden = generate_data(model, T)

In [20]:
# learn_model returns (M, N, decoded hidden states, R, E).
# It fits learning_iterations + 1 HMMs per player.
M_, N_, predicted_hidden, R_, E_  = model.learn_model(observations)

Fitting HMMs...
Estimating transition matrices...
Learning R...


In [21]:
# Show the learned parameters, rounded to two decimals.
print_params(M_, N_, R_, E_)

=== M ===
('player1', 'player1'):
 [[0.56 0.4  0.04]
 [0.21 0.66 0.13]
 [0.03 0.22 0.75]]
('player1', 'player2'):
 [[0.41 0.56 0.04]
 [0.24 0.49 0.27]
 [0.08 0.33 0.58]]
('player1', 'player3'):
 [[0.38 0.57 0.05]
 [0.26 0.5  0.24]
 [0.09 0.3  0.61]]
('player1', 'player4'):
 [[0.38 0.58 0.04]
 [0.28 0.47 0.25]
 [0.11 0.37 0.52]]
('player2', 'player1'):
 [[0.76 0.2  0.04]
 [0.23 0.7  0.06]
 [0.03 0.33 0.64]]
('player2', 'player2'):
 [[0.63 0.33 0.04]
 [0.24 0.67 0.09]
 [0.08 0.31 0.61]]
('player2', 'player3'):
 [[0.57 0.38 0.05]
 [0.28 0.54 0.19]
 [0.12 0.39 0.48]]
('player2', 'player4'):
 [[0.54 0.42 0.04]
 [0.36 0.5  0.14]
 [0.09 0.46 0.46]]
('player3', 'player1'):
 [[0.6  0.36 0.04]
 [0.11 0.83 0.06]
 [0.03 0.36 0.61]]
('player3', 'player2'):
 [[0.48 0.48 0.04]
 [0.13 0.71 0.16]
 [0.06 0.44 0.5 ]]
('player3', 'player3'):
 [[0.52 0.43 0.05]
 [0.13 0.74 0.13]
 [0.09 0.36 0.55]]
('player3', 'player4'):
 [[0.46 0.5  0.04]
 [0.17 0.72 0.11]
 [0.07 0.48 0.46]]
('player4', 'player1'):
 [[0.8

## Balanced team
Each player's state depends equally on the observed state of all players (including himself) and his hidden state.

In [22]:
# Ground-truth parameters: "average" matrices for every player and pair.
M = {(p1, p2): avg_transO for p1 in players for p2 in players}
N = {player: avg_transH for player in players}
emission_prob = {player: avg_emission for player in players}
# Equal weight on the own hidden state and on every player's observation.
R = {player: R_uniform for player in players}
# Every player starts in hidden state 1.
initial_dist = np.array([0, 1, 0])

model = TeamModel(initial_dist, M, N, R, emission_prob)

In [23]:
# Generate data
# NOTE: generate_data() calls np.random.seed(seed) itself (default seed=73),
# so the seed set on the next line is overwritten before any sampling happens.
np.random.seed(42)
observations, true_hidden = generate_data(model, T)

In [24]:
# learn_model returns (M, N, decoded hidden states, R, E).
# It fits learning_iterations + 1 HMMs per player.
M_, N_, predicted_hidden, R_, E_  = model.learn_model(observations)

Fitting HMMs...
Estimating transition matrices...
Learning R...


In [25]:
# Show the learned parameters, rounded to two decimals.
print_params(M_, N_, R_, E_)

=== M ===
('player1', 'player1'):
 [[0.64 0.12 0.24]
 [0.55 0.21 0.24]
 [0.03 0.52 0.45]]
('player1', 'player2'):
 [[0.52 0.25 0.22]
 [0.43 0.33 0.24]
 [0.23 0.23 0.54]]
('player1', 'player3'):
 [[0.46 0.23 0.31]
 [0.45 0.24 0.31]
 [0.29 0.42 0.29]]
('player1', 'player4'):
 [[0.42 0.25 0.33]
 [0.42 0.27 0.3 ]
 [0.41 0.31 0.28]]
('player2', 'player1'):
 [[0.39 0.52 0.09]
 [0.33 0.5  0.17]
 [0.33 0.55 0.12]]
('player2', 'player2'):
 [[0.6  0.35 0.05]
 [0.29 0.67 0.05]
 [0.08 0.54 0.38]]
('player2', 'player3'):
 [[0.31 0.66 0.03]
 [0.43 0.41 0.16]
 [0.25 0.54 0.21]]
('player2', 'player4'):
 [[0.5  0.42 0.08]
 [0.24 0.73 0.03]
 [0.31 0.44 0.26]]
('player3', 'player1'):
 [[0.3  0.67 0.03]
 [0.31 0.67 0.02]
 [0.36 0.61 0.03]]
('player3', 'player2'):
 [[0.35 0.62 0.02]
 [0.36 0.62 0.02]
 [0.23 0.73 0.04]]
('player3', 'player3'):
 [[0.51 0.46 0.03]
 [0.27 0.71 0.02]
 [0.17 0.79 0.04]]
('player3', 'player4'):
 [[0.39 0.58 0.03]
 [0.33 0.64 0.03]
 [0.26 0.72 0.03]]
('player4', 'player1'):
 [[0.1