In [1]:
import numpy as np
from cvxopt import matrix, solvers,modeling
import pandas as pd
from matplotlib import pyplot as plt
from scipy import linalg
solvers.options['show_progress'] = False

In [2]:
class action:
    """A single player move on the 2x4 soccer grid.

    ``value`` encodes the move: 0 = North, 1 = South, 2 = East, 3 = West.
    Any other value is treated as "stick" (stay in place).
    """

    def __init__(self, value):
        self.value = value

    def result(self, input_pos):
        """Return the ``(row, col)`` tuple reached from ``input_pos``.

        A move is ignored when the player already stands on the boundary it
        points at (row 0 for North, row 1 for South, column 3 for East,
        column 0 for West).
        """
        row = input_pos[0]
        col = input_pos[1]
        move = self.value

        if move == 0:  # North
            if row != 0:
                row = row - 1
        elif move == 1:  # South
            if row != 1:
                row = row + 1
        elif move == 2:  # East
            if col != 3:
                col = col + 1
        elif move == 3:  # West
            if col != 0:
                col = col - 1
        # Anything else: stick, coordinates stay as they are.

        return (row, col)

In [3]:
class soccer_action_space:
    """Joint action space: each of the two players chooses one of five moves."""

    def __init__(self):
        # One `action` object per move id 0..4.
        self.actions = [action(move_id) for move_id in range(5)]
        self.n = 5

    def sample(self):
        """Draw one action per player with two independent ``np.random.choice`` calls.

        Returns a ``(first_action, second_action)`` tuple of `action` objects.
        """
        picks = [np.random.choice(self.actions) for _ in range(2)]
        return (picks[0], picks[1])

In [4]:
class state:
    """Joint game state: both player positions plus who holds the ball.

    Attributes set on each instance:
        player0_pos, player1_pos: ``(row, col)`` positions as passed in.
        ball_possession: 0 or 1; elsewhere in this notebook 0 is read as
            "player 0 holds the ball" and anything else as player 1.
        value: the flat tuple ``(row0, col0, row1, col1, ball_possession)``.
        id: dense integer id of ``value`` looked up in ``id_mapping``.

    Raises:
        KeyError: if ``value`` is not a key of ``id_mapping`` (e.g. a position
            outside the 2x4 grid).
    """

    # Maps every (row0, col0, row1, col1, ball_possession) tuple to an id in
    # 0..127, numbered in nested-loop order (row0 outermost, ball innermost).
    # Built with a comprehension so no loop temporaries end up as class
    # attributes.
    state_id_mapping = {
        key: idx
        for idx, key in enumerate(
            (a, b, c, d, e)
            for a in range(2)
            for b in range(4)
            for c in range(2)
            for d in range(4)
            for e in range(2)
        )
    }

    def __init__(self, player0_pos, player1_pos, ball_possession, id_mapping=state_id_mapping):
        self.player0_pos = player0_pos
        self.player1_pos = player1_pos
        self.ball_possession = ball_possession
        self.value = (self.player0_pos[0], self.player0_pos[1],
                      self.player1_pos[0], self.player1_pos[1],
                      ball_possession)
        self.id = id_mapping[self.value]

In [5]:
class soccer_observation_space:
    """Shape descriptor exposed by the environment as ``observation_space``."""

    def __init__(self):
        # presumably 128 joint states by 5 actions per player -- confirm against users
        first_dim, second_dim = 128, 5
        self.shape = [first_dim, second_dim]

In [6]:
class SoccerGame:
    """Two-player grid-soccer environment on a 2x4 board.

    Player 0 is called "Player A" and player 1 "Player B" below. Each step the
    two players move one after the other in a random order; an episode ends as
    soon as the ball carrier stands in column 0 (reward +100) or column 3
    (reward -100).
    """

    def __init__(self):
        self.action_space = soccer_action_space()
        self.observation_space = soccer_observation_space()
        # These two attributes are initialised here but not updated by `step`.
        self.reward = None
        self.done = False

    def reset(self):
        """Return the fixed start state: A at (0, 2), B at (0, 1), B holds the ball."""
        player_a_start = (0, 2)
        player_b_start = (0, 1)
        return state(player_a_start, player_b_start, 1)

    def get_action_output(self, first_mover_action, second_mover_action, first_mover_pos,
                          second_mover_pos, first_mover_ball_possession):
        """Resolve both moves in order and return the resulting positions and ball flag.

        A player whose target square is occupied by the opponent does not move,
        and the ball flag is set in favour of the opponent: 0 when the first
        mover is blocked, 1 when the second mover is blocked.

        Returns:
            ``(first_mover_pos, second_mover_pos, first_mover_ball_possession)``
            where the flag is 1 if the first mover ends up with the ball.
        """
        # First mover tries to go from first_mover_pos to its target.
        attempted = first_mover_action.result(first_mover_pos)
        if attempted != second_mover_pos:
            first_mover_pos = attempted
        else:
            first_mover_ball_possession = 0

        # Second mover is checked against the first mover's *updated* position.
        attempted = second_mover_action.result(second_mover_pos)
        if attempted != first_mover_pos:
            second_mover_pos = attempted
        else:
            first_mover_ball_possession = 1

        return first_mover_pos, second_mover_pos, first_mover_ball_possession

    def step(self, actions, current_state):
        """Advance the game by one joint action.

        Args:
            actions: pair ``(player_A_action, player_B_action)``.
            current_state: the `state` both players act from.

        Returns:
            ``(next_state, reward, done, None)``.
        """
        # A single random draw picks the move order: 0 = Player A first, 1 = Player B first.
        lead = 0 if np.random.rand() > 0.5 else 1
        follow = 1 - lead
        positions = (current_state.player0_pos, current_state.player1_pos)

        lead_has_ball = 1 if current_state.ball_possession == lead else 0
        lead_pos, follow_pos, lead_has_ball = self.get_action_output(
            actions[lead],
            actions[follow],
            positions[lead],
            positions[follow],
            lead_has_ball,
        )
        ball_possession = lead if lead_has_ball == 1 else follow

        # `state` always takes player 0's position first, whatever the move order was.
        if lead == 0:
            next_state = state(lead_pos, follow_pos, ball_possession)
        else:
            next_state = state(follow_pos, lead_pos, ball_possession)

        if next_state.ball_possession == 0:
            carrier_pos = next_state.player0_pos
        else:
            carrier_pos = next_state.player1_pos
        ball_column = carrier_pos[1]

        if ball_column == 0:
            reward, done = 100, True
        elif ball_column == 3:
            reward, done = -100, True
        else:
            reward, done = 0, False

        return next_state, reward, done, None

In [7]:
# Module-level environment instance; the training loop in this cell calls
# env.reset(), env.step(...) and env.action_space on it.
env = SoccerGame()
def lp_corr(lp_state, Player1_Q, Player2_Q):
    """Solve for a utilitarian correlated equilibrium of the stage game at ``lp_state``.

    The LP variable is the joint distribution ``p[a1, a2]`` flattened row-major
    (index ``a1 * 5 + a2``). It is constrained by

    * player 1 rationality: for every recommended ``a1`` and deviation ``a1'``,
      ``sum_a2 p[a1, a2] * (Q1[a1', a2] - Q1[a1, a2]) <= 0`` (20 rows);
    * player 2 rationality, the mirror image on ``Q2`` (20 rows);
    * ``p >= 0`` (25 rows) and ``sum(p) == 1``.

    Among all such distributions the one maximising the players' summed
    expected value ``sum p * (Q1 + Q2)`` is requested.

    Args:
        lp_state: object whose ``id`` attribute indexes the first axis of the
            Q tables.
        Player1_Q, Player2_Q: arrays indexed
            ``[state_id, player1_action, player2_action]`` with 5 actions per
            player. Both tables must use this same layout.

    Returns:
        A ``(5, 5)`` array ``combined_prob[a1, a2]`` that sums to 1, or
        ``None`` when the solver returns no solution (or a solution with no
        usable probability mass).
    """
    n_actions = 5
    n_joint = n_actions * n_actions

    Player1_Q_Action = np.asarray(Player1_Q[lp_state.id], dtype=float)  # [a1, a2]
    Player2_Q_Joint = np.asarray(Player2_Q[lp_state.id], dtype=float)   # [a1, a2]
    # Player 2's view: own action on the rows, opponent's on the columns.
    Player2_Q_Action = Player2_Q_Joint.T                                # [a2, a1]

    Player1_index = []  # rows to keep: drops the all-zero "deviate to the same action" rows
    Player2_index = []  # column permutation from player-2 order (a2*5+a1) to joint order (a1*5+a2)
    Player1_diff = []
    Player2_diff = []
    for i in range(n_actions):
        # Row k of each block holds Q[k, :] - Q[i, :]: the gain from playing k
        # instead of the recommended action i, per opponent action.
        Player1_diff.append(Player1_Q_Action - Player1_Q_Action[i, :])
        Player2_diff.append(Player2_Q_Action - Player2_Q_Action[i, :])
        for j in range(n_actions):
            if i != j:
                Player1_index.append(i * n_actions + j)
            Player2_index.append(j * n_actions + i)

    player1_cons = linalg.block_diag(*Player1_diff)[Player1_index, :]
    player2_cons = linalg.block_diag(*Player2_diff)[Player1_index, :][:, Player2_index]

    G_rows = np.append(player1_cons, player2_cons, axis=0)
    G_rows = np.append(G_rows, -np.identity(n_joint), axis=0)  # -p <= 0
    G = matrix(G_rows)
    h = matrix(np.zeros(G_rows.shape[0]))

    # solvers.lp *minimises* c'x, so the summed value is negated to maximise
    # it. Q2 is added in the same [a1, a2] layout as Q1 (no transpose) so each
    # coefficient lines up with the joint action it belongs to.
    c = matrix(-(Player1_Q_Action + Player2_Q_Joint).reshape(n_joint))

    A = matrix(np.ones((1, n_joint)))
    b = matrix(1.0)
    solution = solvers.lp(c, G, h, A, b, solver="glpk")

    x = solution['x']
    if x is None:
        return None

    # abs() removes tiny negative entries left by the solver before renormalising.
    raw_prob = np.abs(np.array(x).reshape((n_actions, n_actions)))
    sum_prob = raw_prob.sum()
    if not sum_prob > 0:  # also rejects NaN
        return None
    return raw_prob / sum_prob


# --- Hyper-parameters and bookkeeping ---------------------------------------
steps = 1e6                # total number of environment steps to simulate
current_step = 0
alpha = 1                  # learning rate, decayed every step down to ~0.001
alpha_decay = 0.999995
gamma = 0.9                # discount factor
epsilon = 1                # exploration rate, decayed every step down to ~0.001
epsilon_decay = 0.999995
step_list = []
error_list = []
# Both tables are indexed [state_id, player1_action, player2_action].
Player1_Q = np.zeros((128, 5, 5))
Player2_Q = np.zeros((128, 5, 5))
# Flattened joint action distribution used for on-policy action selection.
combined_Prob_List = [1 / 25] * 25
# Bound up-front so the first update cannot hit an undefined name if the LP
# yields no solution.
Player1_V = 0.0
Player2_V = 0.0


def _ce_policy_and_values(game_state):
    """Solve the stage game at ``game_state`` with the current Q tables.

    Returns ``(joint_probs, v1, v2)``: the flattened 25-entry joint action
    distribution and both players' expected values under it. Returns ``None``
    when ``lp_corr`` finds no solution.
    """
    joint = lp_corr(game_state, Player1_Q, Player2_Q)
    if joint is None:
        return None
    # Both tables share the [a1, a2] layout of `joint`, so neither is transposed.
    v1 = np.sum(joint * Player1_Q[game_state.id])
    v2 = np.sum(joint * Player2_Q[game_state.id])
    return list(joint.reshape(25)), v1, v2


while current_step < steps:
    initial_state = env.reset()
    current_state = initial_state
    done = False
    total_rewards = 0

    # Action selection must use the equilibrium of the state being acted from,
    # so refresh the policy for the start state of every episode.
    solved = _ce_policy_and_values(current_state)
    if solved is not None:
        combined_Prob_List = solved[0]

    while not done:
        Player1_random_action, Player2_random_action = env.action_space.sample()
        index = np.random.choice(list(range(25)), 1, p=combined_Prob_List)[0]
        greedy_action_values = np.unravel_index(index, shape=(5, 5))
        Player1_greedy_action = env.action_space.actions[greedy_action_values[0]]
        Player2_greedy_action = env.action_space.actions[greedy_action_values[1]]

        # Epsilon-greedy: explore with a uniformly random joint action,
        # otherwise follow the joint action sampled from the equilibrium.
        if np.random.random() < epsilon:
            current_action = (Player1_random_action, Player2_random_action)
        else:
            current_action = (Player1_greedy_action, Player2_greedy_action)
        next_state, reward, done, _ = env.step(current_action, current_state)

        # The TD target bootstraps from the value of the *successor* state.
        if done:
            # No action is ever taken from a terminal state, so its value is 0.
            Player1_V = 0.0
            Player2_V = 0.0
        else:
            solved = _ce_policy_and_values(next_state)
            if solved is not None:
                # The same distribution drives action selection on the next
                # iteration, where next_state becomes current_state.
                combined_Prob_List, Player1_V, Player2_V = solved
            # else: best effort -- keep the last successfully computed
            # policy and values.

        Player1_Q_Key = (current_state.id, current_action[0].value, current_action[1].value)
        Player1_Q_before = Player1_Q[Player1_Q_Key]
        Player1_Q[Player1_Q_Key] = Player1_Q[Player1_Q_Key] + alpha * (
            reward + gamma * Player1_V - Player1_Q[Player1_Q_Key])
        Player1_Q_after = Player1_Q[Player1_Q_Key]

        # Zero-sum game: player 2 receives the negated reward.
        player2_reward = reward * -1
        Player2_Q_Key = (current_state.id, current_action[0].value, current_action[1].value)
        Player2_Q[Player2_Q_Key] = Player2_Q[Player2_Q_Key] + alpha * (
            player2_reward + gamma * Player2_V - Player2_Q[Player2_Q_Key])

        # Track the Q-value change for the start state with player 1 moving
        # South and player 2 sticking.
        # NOTE(review): only strictly positive changes are recorded; decreases
        # are dropped -- confirm this filter is intended.
        error = Player1_Q_after - Player1_Q_before
        if (current_state.value == (0, 2, 0, 1, 1)
                and current_action[0].value == 1
                and current_action[1].value == 4
                and error > 0):
            step_list.append(current_step)
            error_list.append(abs(error))

        current_state = next_state
        current_step += 1
        if current_step % 10000 == 0:
            print(f'Step {current_step} completed.')
        epsilon = epsilon * epsilon_decay if epsilon > 0.001 else epsilon
        alpha = alpha * alpha_decay if alpha > 0.001 else alpha
output_df = pd.DataFrame({'Step': step_list, 'Error': error_list})

ValueError: domain error

In [None]:
# Plot the recorded Q-value changes against the simulation step and save the figure.
ax = plt.gca()
ax.set(
    title="Correlated-Q Learning",
    xlabel='Simulation Iteration',
    ylabel='Q-value Difference',
    ylim=(0, 0.5),
)
ax.plot(output_df['Step'], output_df['Error'])
ax.figure.savefig("CorrQ.png")