In [None]:
from traci_backend import TraciBackend
import random
import matplotlib.pyplot as plt

In [None]:
class Agent():
    '''
    Approximate Q-learning controller for traffic light '0'.

    Each of the two actions (0 and 1) has its own linear approximator over
    a 4-element state:
        state[0] - halted vehicles on the first two observed edges
        state[1] - non-halted vehicles (total - halted) on the first two edges
        state[2] - halted vehicles on the remaining edges
        state[3] - non-halted vehicles on the remaining edges
    '''

    # Incoming edges that are observed for both the state and the evaluation.
    EDGES = ('1si', '2si', '3si', '4si')

    def __init__(self, discount, alpha, epsilon):
        '''
        discount: factor applied to the best next Q value in the TD target
        alpha:    learning rate of the weight update
        epsilon:  threshold used by eGreedy for picking the lower-valued action
        '''
        self.tb = TraciBackend()
        self.discount = discount
        self.alpha = alpha
        self.epsilon = epsilon
        self.weights1 = [1] * 4 # for action1
        self.weights2 = [1] * 4 # for action2

    def _observe(self, edges):
        '''
        Query the backend once per edge and build (state, reward) for the
        current simulation step. The reward is the sum over the edges of
        (non-halted vehicles - halted vehicles).
        '''
        tb = self.tb
        state, reward = [0] * 4, 0
        for n, edge in enumerate(edges):
            line_length = tb.get_halt_vehicle_cnt_edge(edge)
            total_vehicle_num = tb.get_vehicle_cnt_edge(edge)
            pass_vehicle_num = total_vehicle_num - line_length
            # the first two edges feed state[0:2], every later edge state[2:4]
            offset = 0 if n < 2 else 2
            state[offset] += line_length
            state[offset + 1] += pass_vehicle_num
            reward += pass_vehicle_num - line_length
        return state, reward

    def getStateRewardCum(self, edges, num_iter):
        '''
        Advance the simulation num_iter steps and observe after every step.
        Return (state, reward, is_end, evaluation) where
        state is the observation of the last step only,
        reward is accumulated over all num_iter steps,
        evaluation is the accumulated halted-vehicle count of those steps.
        With num_iter == 0 nothing is simulated and an all-zero state,
        zero reward and zero evaluation are returned.
        '''
        tb = self.tb
        # bound before the loop so that num_iter == 0 still has a state to return
        state, reward, evaluation = [0] * 4, 0, 0
        for _ in range(num_iter):
            tb.simulate_step()
            evaluation += self.getEvaluationUnit()
            state, step_reward = self._observe(edges)
            reward += step_reward
        return state, reward, tb.is_end(), evaluation

    def getStateReward(self, edges):
        '''
        Observe the current step without advancing the simulation.
        Return (state, reward, is_end); the reward comes from this single
        step, it is not accumulated.
        '''
        state, reward = self._observe(edges)
        return state, reward, self.tb.is_end()

    def getQ(self, state):
        '''
        Use the function approximator, give out the approximated value
        of both actions as (q1, q2): the dot product of the state with
        weights1 and with weights2.
        '''
        q1 = sum(s * w for s, w in zip(state, self.weights1))
        q2 = sum(s * w for s, w in zip(state, self.weights2))
        return q1, q2

    def eGreedy(self, q):
        '''
        Return the action selection 0 or 1 according to the q-value pair q.
        Equal values are decided by a coin flip. Otherwise a random draw
        below epsilon selects the lower-valued action and a draw above
        epsilon selects the higher-valued one.
        '''
        q1, q2 = q[0], q[1]
        rand = random.random()
        if q1 == q2:
            return 0 if rand < 0.5 else 1
        # action 0 is chosen either as the exploring pick (it has the lower
        # value) or as the greedy pick (it has the higher value)
        if (rand < self.epsilon and q1 < q2) or (rand > self.epsilon and q1 > q2):
            return 0
        return 1

    def updateWeight(self, curr_state, next_state, reward, action):
        '''
        Use approximate Q-learning, update weight in the approximator.
        Only the weights of the action that was taken are changed.
        Return (next_act, change): the epsilon-greedy action for next_state
        and action ^ next_act, which is non-zero when the action switches.
        '''
        q_curr = self.getQ(curr_state)[action] # current approximated q value
        q_next_act = self.getQ(next_state) # next approximated q value for both action
        next_act = self.eGreedy(q_next_act) # get the action by epsilon greedy
        # the TD target uses the best next value, not the value of next_act
        delta = reward + self.discount * max(q_next_act) - q_curr
        if action:
            self.weights2 = [w + self.alpha * delta * s
                             for w, s in zip(self.weights2, curr_state)]
        else:
            self.weights1 = [w + self.alpha * delta * s
                             for w, s in zip(self.weights1, curr_state)]
        return next_act, action ^ next_act

    def _runPhases(self, phases, final_phase):
        '''
        Set every (phase, duration) pair in turn, simulating duration steps
        under each, then set final_phase. Return the halted-vehicle count
        accumulated over the simulated steps.
        '''
        tb = self.tb
        evaluation = 0
        for phase, duration in phases:
            tb.set_light_phase('0', phase)
            for _ in range(duration):
                tb.simulate_step()
                evaluation += self.getEvaluationUnit()
        tb.set_light_phase('0', final_phase)
        return evaluation

    def executeAction(self, action, change):
        '''
        Adjust the lights in sumo given the action 0 or 1.
        Without a change the phase of the action (0 for action 0, 4 for
        action 1) is simply set again and 0 is returned. With a change the
        intermediate phases are stepped through for 3, 5 and 3 simulation
        steps before the target phase is set, and the halted-vehicle count
        accumulated during those steps is returned.
        NOTE(review): phases 1-3 and 5-7 are presumably the transition
        phases of the signal program - confirm against the network file.
        '''
        if not change:
            self.tb.set_light_phase('0', 4 if action else 0)
            return 0
        if action:
            return self._runPhases(((1, 3), (2, 5), (3, 3)), 4)
        return self._runPhases(((5, 3), (6, 5), (7, 3)), 0)

    def getEvaluationUnit(self):
        '''
        Return the number of halted vehicles summed over the observed edges
        for the current simulation step.
        '''
        tb = self.tb
        return sum(tb.get_halt_vehicle_cnt_edge(edge) for edge in self.EDGES)

    def train(self, step_size):
        '''
        Run one simulation from start to end, observing the state and the
        single-step reward every step_size steps and updating the weights
        after each observation.
        step_size is the time before observing the state and reward, should
        be smaller than 31 (original author's note).
        Return (weights_history, evaluation): the weights of both actions
        recorded before every update, and the total halted-vehicle count.
        The backend is closed even when a step raises.
        '''
        tb = self.tb
        tb.start()
        try:
            prev_state = [0] * 4
            next_act, change = 0, True
            weights_history = [[], []]
            isEnd = False
            evaluation = 0
            while not isEnd:
                evaluation += self.executeAction(next_act, change)
                for _ in range(step_size):
                    tb.simulate_step()
                    evaluation += self.getEvaluationUnit()
                state, reward, isEnd = self.getStateReward(self.EDGES)
                # snapshots, so the history never aliases the live weights
                weights_history[0].append(list(self.weights1))
                weights_history[1].append(list(self.weights2))
                next_act, change = self.updateWeight(prev_state, state, reward, next_act)
                prev_state = state
        finally:
            tb.close()
        return weights_history, evaluation

    def train2(self, step_size):
        '''
        Same as train, but the reward is accumulated over all step_size
        steps of the observation period (see getStateRewardCum) instead of
        being taken from its last step only.
        step_size is the time before observing the state and reward.
        Return (weights_history, evaluation) like train. The backend is
        closed even when a step raises.
        '''
        tb = self.tb
        tb.start()
        try:
            prev_state = [0] * 4
            next_act, change = 0, True
            weights_history = [[], []]
            isEnd = False
            evaluation = 0
            while not isEnd:
                evaluation += self.executeAction(next_act, change)
                state, reward, isEnd, evaluationUnit = \
                    self.getStateRewardCum(self.EDGES, step_size)
                weights_history[0].append(list(self.weights1))
                weights_history[1].append(list(self.weights2))
                evaluation += evaluationUnit
                next_act, change = self.updateWeight(prev_state, state, reward, next_act)
                prev_state = state
        finally:
            tb.close()
        return weights_history, evaluation
    
        

class StaticAgent:
    '''
    Baseline that only steps the simulation and never sets a light phase
    itself, so the lights follow whatever the backend runs on its own.
    '''

    def __init__(self):
        self.tb = TraciBackend()

    def train(self):
        '''
        Run one simulation from start to end and return the number of
        halted vehicles on edges '1si'..'4si' summed over every step.
        The backend is closed even when a step raises.
        '''
        tb = self.tb
        evaluation = 0
        tb.start()
        try:
            while not tb.is_end():
                tb.simulate_step()
                evaluation += sum(tb.get_halt_vehicle_cnt_edge(edge)
                                  for edge in ('1si', '2si', '3si', '4si'))
        finally:
            tb.close()
        return evaluation
            

class LongestQueueFirstAgent:
    '''
    Rule-based baseline: after every observation period the action is
    chosen by comparing the halted vehicles on edges '1si' + '2si' with
    those on '3si' + '4si'.
    '''

    # Incoming edges that are observed for both the decision and the evaluation.
    EDGES = ('1si', '2si', '3si', '4si')

    def __init__(self):
        self.tb = TraciBackend()

    def getEvaluationUnit(self):
        '''
        Return the number of halted vehicles summed over the observed edges
        for the current simulation step.
        '''
        tb = self.tb
        return sum(tb.get_halt_vehicle_cnt_edge(edge) for edge in self.EDGES)

    def _runPhases(self, phases, final_phase):
        '''
        Set every (phase, duration) pair in turn, simulating duration steps
        under each, then set final_phase. Return the halted-vehicle count
        accumulated over the simulated steps.
        '''
        tb = self.tb
        evaluation = 0
        for phase, duration in phases:
            tb.set_light_phase('0', phase)
            for _ in range(duration):
                tb.simulate_step()
                evaluation += self.getEvaluationUnit()
        tb.set_light_phase('0', final_phase)
        return evaluation

    def executeAction(self, action, change):
        '''
        Adjust the lights in sumo given the action 0 or 1.
        Without a change the phase of the action (0 for action 0, 4 for
        action 1) is simply set again and 0 is returned. With a change the
        intermediate phases are stepped through for 3, 5 and 3 simulation
        steps before the target phase is set, and the halted-vehicle count
        accumulated during those steps is returned.
        NOTE(review): phases 1-3 and 5-7 are presumably the transition
        phases of the signal program - confirm against the network file.
        '''
        if not change:
            self.tb.set_light_phase('0', 4 if action else 0)
            return 0
        if action:
            return self._runPhases(((1, 3), (2, 5), (3, 3)), 4)
        return self._runPhases(((5, 3), (6, 5), (7, 3)), 0)

    def train(self, step_size):
        '''
        Run one simulation from start to end and return the total number of
        halted vehicles over all steps.
        step_size is the number of steps simulated before each decision and
        must be at least 1, because the decision is based on the last
        simulated step of the period; ValueError is raised otherwise.
        Action 1 is picked when '1si' + '2si' hold strictly more halted
        vehicles than '3si' + '4si', action 0 otherwise.
        The backend is closed even when a step raises.
        '''
        if step_size < 1:
            raise ValueError("step_size must be at least 1")
        tb = self.tb
        evaluation, action, change = 0, 0, False
        tb.start()
        try:
            while not tb.is_end():
                evaluation += self.executeAction(action, change)
                for _ in range(step_size):
                    tb.simulate_step()
                    # index 0: '1si' + '2si', index 1: '3si' + '4si'
                    halting_nums = [0, 0]
                    for edge in self.EDGES:
                        group = 1 if edge in ('3si', '4si') else 0
                        halting_nums[group] += tb.get_halt_vehicle_cnt_edge(edge)
                    evaluation += sum(halting_nums)
                # only the counts of the period's last step drive the decision
                next_act = 1 if halting_nums[0] > halting_nums[1] else 0
                change = action ^ next_act
                action = next_act
        finally:
            tb.close()
        return evaluation

In [None]:
def test_Agent1(gamma, alpha, e):
    '''
    Build one Agent(gamma, alpha, e) and run Agent.train(20) on it 100
    times in a row, so the weights carry over from run to run.
    Return the list of the 100 evaluation totals, in run order.
    '''
    agent = Agent(gamma, alpha, e)
    # train returns (weights_history, evaluation); only the evaluation is kept
    return [agent.train(20)[1] for _ in range(100)]

def test_Agent2(gamma, alpha, e):
    '''
    Build one Agent(gamma, alpha, e) and run Agent.train2(20) on it 100
    times in a row, so the weights carry over from run to run.
    Return the list of the 100 evaluation totals, in run order.
    '''
    agent = Agent(gamma, alpha, e)
    # train2 returns (weights_history, evaluation); only the evaluation is kept
    return [agent.train2(20)[1] for _ in range(100)]

def test_StaticAgent():
    '''
    Run StaticAgent.train() 100 times on a single StaticAgent instance.
    Return the list of the 100 evaluation totals, in run order.
    '''
    baseline = StaticAgent()
    return [baseline.train() for _ in range(100)]

def test_LQFAgent():
    '''
    Run LongestQueueFirstAgent.train(20) 100 times on a single instance.
    Return the list of the 100 evaluation totals, in run order.
    '''
    baseline = LongestQueueFirstAgent()
    return [baseline.train(20) for _ in range(100)]

In [None]:
# Run every controller through its test_* driver (100 runs each) and keep the
# per-run evaluation totals. Agent arguments are (discount, alpha, epsilon).
evaluation1 = test_Agent1(0.9, 0.001, 0.05)  # Q-learning, single-step reward (Agent.train)
evaluation2 = test_Agent2(0.2, 0.001, 0.05)  # Q-learning, accumulated reward (Agent.train2)
evaluation3 = test_StaticAgent()  # baseline that never sets a light phase
evaluation4 = test_LQFAgent()  # longest-queue-first baseline

In [None]:
# Hyper-parameter sweep for Agent.train; arguments are (discount, alpha, epsilon).
# evaluation6_0 .. evaluation6_3 repeat the same setting four times, each with a
# freshly built Agent, so the runs can be compared against each other.
evaluation6_0 = test_Agent1(0.9, 0.001, 0.05)
evaluation7 = test_Agent1(0.8, 0.001, 0.05)  # lower discount
evaluation8 = test_Agent1(0.8, 0.0003, 0.05)  # lower discount and lower learning rate
evaluation9 = test_Agent1(0.3, 0.001, 0.05)  # much lower discount
evaluation6_1 = test_Agent1(0.9, 0.001, 0.05)
evaluation6_2 = test_Agent1(0.9, 0.001, 0.05)
evaluation6_3 = test_Agent1(0.9, 0.001, 0.05)

In [None]:
# One figure per sweep result: the four repeats of the 0.9 / 0.001 / 0.05
# setting first, then the remaining settings.
for curve in (evaluation6_0, evaluation6_1, evaluation6_2, evaluation6_3,
              evaluation7, evaluation8, evaluation9):
    plt.plot(curve)
    plt.show()

In [None]:
# Dump two of the sweep results as single-line, comma-separated text files.
text_best = ','.join(map(str, evaluation8))
text_ok = ','.join(map(str, evaluation6_2))
for path, payload in (("QLearning_best.txt", text_best),
                      ("QLearning_ok.txt", text_ok)):
    with open(path, "w") as f:
        f.write(payload)

In [None]:
# Dump the static baseline result as a single-line, comma-separated text file.
text_static = ','.join(map(str, evaluation3))
with open("Static.txt", "w") as f:
    f.write(text_static)

In [None]:
# One figure per controller, in the order the results were produced:
# Agent.train, Agent.train2, static baseline, longest-queue-first baseline.
for curve in (evaluation1, evaluation2, evaluation3, evaluation4):
    plt.plot(curve)
    plt.show()