<a href="https://colab.research.google.com/github/inforeqd512/QLearning/blob/main/Tribe_Q_learning_v9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [414]:
import numpy as np
import copy #for deepcopy()
# importing functools for reduce()
import functools

import unittest
!python -V

Python 3.7.11


# Business Rules

In [415]:
class Business_rules:
  def __init__(self):
    self.target_dr_per_lead = 4
    self.target_max_reassign = 4
    self.target_max_promote = 4
    self.target_max_hire = 4
    self.actions = ['reassign', 'promote_cascade',  'promote_leaders', 'hire']

    #for simulations
    self.ceil_total_people_in_grade = {"3" : 4, "4.1" : 10, "4.2" : 16, "4.3" : 64, "4.4" : 64, "5.1" : 26, "5.2": 16, "MS":100}

    return

# Chapter

In [416]:
class Allocated_leads:
  def __init__(self):
    self.allocated_leaders_gradewise = {"3":0, "4.1":0, "4.2":0}
    return

  def total_leaders_leading(self):
    total = functools.reduce(lambda x, value:x + value, self.allocated_leaders_gradewise.values(), 0)
    return total 

  def allocate_leaders(self, allocated_leaders_gradewise = {"3":0, "4.1":0, "4.2":0}):
    for grade in allocated_leaders_gradewise:
      self.allocated_leaders_gradewise[grade] += allocated_leaders_gradewise[grade]
    return

  def promote_leaders(self, higher_grade, lower_grade, target):
    if higher_grade in self.allocated_leaders_gradewise and \
      lower_grade in self.allocated_leaders_gradewise:
      num_in_lower_grade = self.allocated_leaders_gradewise[lower_grade]
      actually_promote = target
      if num_in_lower_grade < target:
        actually_promote = num_in_lower_grade
      self.allocated_leaders_gradewise[higher_grade] += actually_promote
      self.allocated_leaders_gradewise[lower_grade] -= actually_promote
    return

In [417]:
class Successor:
  def __init__(self, 
               people_remaining_gradewise = {"3":0, "4.1":2, "4.2":0, "4.3":0, "4.4":0, "5.1":0, "5.2":0, "MS":0},
               leader_successor_grades = {"3":"4.1", "4.1":"4.2", "4.2":"4.3"}
               ):
    self.people_remaining_gradewise = people_remaining_gradewise
    self.leader_successor_grades = leader_successor_grades
    self.engineer_successor_grades = {"4.3":"4.4", "4.4":"5.1", "5.1":"5.2"}
    return

  def successor_ratio_for_grade(self, kv_tuple):
    k = kv_tuple[0]
    v = kv_tuple[1]
    total_successors = self.people_remaining_gradewise[v]
    total_leaders = self.people_remaining_gradewise[k]
    ratio = 0
    if total_leaders <= 0:
      total_leaders = 1
    ratio = total_successors / total_leaders
    return ratio

  def dict_successor_per_lead(self):
    d = dict(map(lambda kv: (kv[0], self.successor_ratio_for_grade(kv)) , self.leader_successor_grades.items()))
    return d  


  def total_successors_for_lead_grades(self):
    d = self.dict_successor_per_lead()
    total = functools.reduce(lambda x, value: x + value, d.values(), 0)
    return total


  def dict_successor_per_engineer(self):
    d = dict(map(lambda kv: (kv[0], self.successor_ratio_for_grade(kv)) , self.engineer_successor_grades.items()))
    return d  


  def total_successors_for_engineer_grades(self):
    d = self.dict_successor_per_engineer()
    total = functools.reduce(lambda x, value: x + value, d.values(), 0)
    return total


  def diff_successor_ratio_for_leader_engineer_grades(self):
    l = self.total_successors_for_lead_grades()
    e = self.total_successors_for_engineer_grades()
    diff = l - e
    return diff

  def is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade(self):
    promotable_to_higher_grade = self.total_successors_for_lead_grades()
    promotable_to_lower_grade = self.total_successors_for_engineer_grades()
    diff = promotable_to_higher_grade - promotable_to_lower_grade
    rule = diff > 0
    return rule  

  def is_num_immediate_promotable_to_leader_grades_gt_zero(self):
    rule = self.total_successors_for_lead_grades() > 0
    return rule  

In [418]:
class DR_per_lead:
  def __init__(self, allocated_leads, business_rules, people_remaining_gradewise = {"3":0, "4.1":2, "4.2":0, "4.3":0, "4.4":0, "5.1":0, "5.2":0, "MS":0}):
    self.people_remaining_gradewise = people_remaining_gradewise
    self.allocated_leads = allocated_leads
    return

  def total_people_remaining(self):
    total = functools.reduce(lambda x, value:x + value, self.people_remaining_gradewise.values(), 0)
    return total

  def dr_per_lead(self):
    total = self.allocated_leads.total_leaders_leading()
    ratio = 999 #some infinitely big number
    if total > 0:
      ratio = self.total_people_remaining() / total
    return ratio

  def is_overall_dr_per_lead_gt_target(self):
    rule = self.dr_per_lead() > self.business_rules.target_dr_per_lead
    return rule

In [419]:
class DR_per_lead_for_this_leader_grade_wrt_higher_grades:
  def __init__(self, allocated_leads, business_rules, ranked_grades=["3","4.1","4.2"]):
    self.allocated_leads = allocated_leads
    self.ranked_grades = ranked_grades
    self.business_rules = business_rules
    return

  def dr_per_lead_for_this_grade_wrt_higher_grades(self, grade):
    total_leading_in_grade = self.allocated_leads.allocated_leaders_gradewise[grade]
    total_leading_in_higher_grades = 0
    for local_grade in self.ranked_grades:
      if local_grade == grade:
        break
      total_leading_in_higher_grades += self.allocated_leads.allocated_leaders_gradewise[local_grade]

    dr_per_lead_higher_grade = 0
    if total_leading_in_higher_grades > 0:
      dr_per_lead_higher_grade = total_leading_in_grade / total_leading_in_higher_grades

    return dr_per_lead_higher_grade

  def dict_dr_per_lead_for_this_grade_wrt_higher_grade(self):
    dict_dr = {}
    for grade in self.ranked_grades:
      dr_per_lead_higher_grade = self.dr_per_lead_for_this_grade_wrt_higher_grades(grade)
      dict_dr[grade] = dr_per_lead_higher_grade
    return dict_dr

  def max_dr_per_lead_in_leader_grades(self):
    vals = self.dict_dr_per_lead_for_this_grade_wrt_higher_grade().values()
    max_val = max(vals)
    if max_val > 10:
      print("max_dr_per_lead_in_leader_grades :", max_val)
    return max_val

  def is_max_dr_per_lead_in_leader_grades_gt_target(self):
    rule = False
    second_highest_grade = self.ranked_grades[1]
    max_val = self.max_dr_per_lead_in_leader_grades()
    if max_val > self.business_rules.target_dr_per_lead:
       if max_val == self.dict_dr_per_lead_for_this_grade_wrt_higher_grade()[second_highest_grade]:
         #no more promotions possible
         rule = False
       else:
         rule = True
    return rule

In [420]:
class Chapter:
  def __init__(self, people_gradewise = {"3":0, "4.1":2, "4.2":0, "4.3":0, "4.4":0, "5.1":0, "5.2":0, "MS":0}):
    self.people_gradewise = people_gradewise
    self.allocated_leads = Allocated_leads()
    self.business_rules = Business_rules()
    self.people_remaining_gradewise = copy.copy(self.people_gradewise)
    self.leader_successor_grades = {"3":"4.1", "4.1":"4.2", "4.2":"4.3"}
    self.ranked_leader_grades = list(self.leader_successor_grades.keys())
    self.ranked_perm_promotable_grades = list(self.people_gradewise.keys())[:-1]
    self.successor = Successor(people_remaining_gradewise = self.people_remaining_gradewise, 
                   leader_successor_grades = self.leader_successor_grades)
    self.dr_per_lead = DR_per_lead(people_remaining_gradewise = self.people_remaining_gradewise,
                                   business_rules = self.business_rules,
                                   allocated_leads = self.allocated_leads)
    self.dr_per_lead_for_this_leader_grade_wrt_higher_grades = DR_per_lead_for_this_leader_grade_wrt_higher_grades(allocated_leads = self.allocated_leads,
                                                                                                                   business_rules = self.business_rules)
    return
  
  def allocate_leaders(self, allocated_leaders_gradewise = {"3":0, "4.1":0, "4.2":0}):
    self.allocated_leads.allocate_leaders(allocated_leaders_gradewise=allocated_leaders_gradewise)
    for grade in allocated_leaders_gradewise:
      self.people_remaining_gradewise[grade] -= allocated_leaders_gradewise[grade]
    self.update_chapter_state()

  def update_chapter_state(self):
    self.successor = Successor(people_remaining_gradewise = self.people_remaining_gradewise, 
                   leader_successor_grades = self.leader_successor_grades)
    self.dr_per_lead = DR_per_lead(people_remaining_gradewise = self.people_remaining_gradewise,
                                  business_rules = self.business_rules,
                                  allocated_leads = self.allocated_leads)
    self.dr_per_lead_for_this_leader_grade_wrt_higher_grades = DR_per_lead_for_this_leader_grade_wrt_higher_grades(allocated_leads = self.allocated_leads,
                                                                                                                business_rules = self.business_rules)

  def total_people(self):
    total = functools.reduce(lambda x, value:x + value, self.people_gradewise.values(), 0)
    return total
    
  #Hire section :-
  def hire(self):
    idx = len(self.ranked_leader_grades) - 1 #always hire the lowest leader grades
    grade = self.ranked_leader_grades[idx]
    if grade in self.people_remaining_gradewise:
      self.people_remaining_gradewise[grade] += self.business_rules.target_max_hire

    self.update_chapter_state()

  #Promote section :-
  def promote_cascade(self):
    if self.successor.is_num_immediate_promotable_to_leader_grades_gt_zero():
      n = len(self.ranked_perm_promotable_grades)-1 #ignore grade for the most junior
      for i in range(1, n): #ignore grade for top boss
        grade = self.ranked_perm_promotable_grades[i]
        lower_grade = self.ranked_perm_promotable_grades[i+1]
        num_in_grade = self.people_remaining_gradewise[grade]
        num_in_lower_grade = self.people_remaining_gradewise[lower_grade]
        target_num_to_promote = self.business_rules.target_max_promote
        num_to_promote = target_num_to_promote
        if num_in_lower_grade < target_num_to_promote:
          num_to_promote = num_in_lower_grade

        self.people_remaining_gradewise[grade] += num_to_promote
        self.people_remaining_gradewise[lower_grade] -= num_to_promote

      self.update_chapter_state()


  def promote_leaders(self):
    if self.dr_per_lead_for_this_leader_grade_wrt_higher_grades.is_max_dr_per_lead_in_leader_grades_gt_target():
      n = len(self.ranked_leader_grades)-1 #ignore grade for the most junior
      for i in range(1, n): #ignore grade for top boss
        grade = self.ranked_perm_promotable_grades[i]
        lower_grade = self.ranked_perm_promotable_grades[i+1]
        self.allocated_leads.promote_leaders("4.1", "4.2", self.business_rules.target_max_promote)

      self.update_chapter_state()
    

  #Reassign section :-
  def reassign_leaders(self):
    d = self.dict_leaders_that_can_be_reassigned()
    self.allocate_leaders(allocated_leaders_gradewise = d)
    return

  def dict_leaders_that_can_be_reassigned(self):
    d = dict(map(lambda k: (k, self.people_remaining_gradewise[k]) , self.ranked_leader_grades))
    return d

  def total_leaders_that_can_be_reassigned(self):
    total = functools.reduce(lambda x, k:x + self.people_remaining_gradewise[k], self.ranked_leader_grades, 0)
    return total


  #State producer :-
  def is_overall_dr_per_lead_gt_target(self):
    rule = self.dr_per_lead.dr_per_lead() > self.business_rules.target_dr_per_lead
    return rule

  def is_num_reassignable_leaders_gt_zero(self):
    rule = self.total_leaders_that_can_be_reassigned() > 0
    return rule

  def produce_state(self):
    state = State()
    state.is_overall_dr_per_lead_gt_target = self.is_overall_dr_per_lead_gt_target()
    state.is_num_reassignable_leaders_gt_zero = self.is_num_reassignable_leaders_gt_zero()
    state.is_num_immediate_promotable_to_leader_grades_gt_zero = self.successor.is_num_immediate_promotable_to_leader_grades_gt_zero()
    state.is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade = self.successor.is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade()
    state.is_max_dr_per_lead_in_leader_grades_gt_target = False #self.dr_per_lead_for_this_leader_grade_wrt_higher_grades.is_max_dr_per_lead_in_leader_grades_gt_target()
    return state

  def favorability(self):
    target_dr_per_lead = self.business_rules.target_dr_per_pead
    dr_per_lead = abs(target_dr_per_lead - self.dr_per_lead.dr_per_lead())
    punishment = -dr_per_lead

  def print_info(self):
    print(self.people_remaining_gradewise)
    print(self.allocated_leads.allocated_leaders_gradewise)

# State

In [421]:
class State:

  def __init__(self, is_overall_dr_per_lead_gt_target = False, 
                is_num_reassignable_leaders_gt_zero = False,
                is_num_immediate_promotable_to_leader_grades_gt_zero = False,
                is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade = False,
                is_max_dr_per_lead_in_leader_grades_gt_target = False):
    self.is_overall_dr_per_lead_gt_target = is_overall_dr_per_lead_gt_target
    self.is_num_reassignable_leaders_gt_zero = is_num_reassignable_leaders_gt_zero
    self.is_num_immediate_promotable_to_leader_grades_gt_zero = is_num_immediate_promotable_to_leader_grades_gt_zero 
    self.is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade = is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade
    self.is_max_dr_per_lead_in_leader_grades_gt_target = is_max_dr_per_lead_in_leader_grades_gt_target

  def state_copy(self):
    new_state = copy.deepcopy(self)
    return new_state

  def state_tuple(self):
    return ((self.is_overall_dr_per_lead_gt_target, 
                        self.is_num_reassignable_leaders_gt_zero, 
                        self.is_num_immediate_promotable_to_leader_grades_gt_zero, 
                        self.is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade,
                        self.is_max_dr_per_lead_in_leader_grades_gt_target))

  def state_hash(self):
    hash_value = hash((self.is_overall_dr_per_lead_gt_target, 
                        self.is_num_reassignable_leaders_gt_zero, 
                        self.is_num_immediate_promotable_to_leader_grades_gt_zero, 
                        self.is_num_people_immediate_promotable_to_higher_grade_lt_those_to_lower_grade,
                        self.is_max_dr_per_lead_in_leader_grades_gt_target))
    return hash_value



# Action

In [422]:
class Action:

  def __init__(self, action_string):
    self.action = action_string
    self.action_hash = hash(action_string)
    return

  def print_info(self):
    print(self.action, self.action_hash)
    return

  @classmethod
  def action_from_string(self, action_string):
    if action_string == "reassign":
      return Reassign()
    elif action_string == "promote_cascade":
      return Promote_cascade()
    elif action_string == "promote_leaders":
      return Promote_leaders()
    elif action_string == "hire":
      return Hire()

In [423]:
class Reassign(Action):

  def __init__(self):
    super().__init__("reassign")
    return

  def run(self, state, chapter):
    chapter.reassign_leaders()
    new_state = chapter.produce_state()
    return new_state

In [424]:
class Promote_cascade(Action):

  def __init__(self):
    super().__init__("promote_cascade")
    return

  def run(self, state,  chapter):
    chapter.promote_cascade()
    new_state = chapter.produce_state()
    return new_state

In [425]:
class Promote_leaders(Action):

  def __init__(self):
    super().__init__("promote_leaders")
    return

  def run(self, state, chapter):
    chapter.promote_leaders()
    new_state = chapter.produce_state()
    return new_state

In [426]:
class Promote_junior_grades:

  def __init__(self):
    super().__init__("promote_junior_grades")
    return

  def run(self, state):
    new_state = state.state_copy()
    return new_state

In [427]:
class Hire(Action):

  def __init__(self):
    super().__init__("hire")
    return

  def run(self, state, chapter):
    chapter.hire()
    new_state = chapter.produce_state()
    return new_state

In [428]:
class Hire_junior_grades:

  def __init__(self):
    super().__init__("hire_junior_grades")
    return

  def run(self, state):
    new_state = state.state_copy()
    return new_state

# Reward

In [429]:
class Reward:

  environment_actions = [
      "reassign",
      "promote_cascade",
      "promote_leaders",
      "promote_junior_grades",
      "hire",
      "hire_junior_grades"
  ]

  #this is per my dry run, the actions that will provide the most reward in this state
  environment_state_action_reward = {
      (True, True, False, True, False)    : {"reassign":1},
      (True, True, False, False, False)   : {"reassign":1},

      (True, True, True, True, False)     : {"reassign":1},
      (True, True, True, False, False)    : {"reassign":1},

      (True, False, False, True, False)   : {"promote_cascade": 1}, #, "promote_junior_grades": 1, "hire_junior_grades": 1},
      (True, False, False, False, False)  : {"promote_cascade": 1}, #, "promote_junior_grades": 1, "hire_junior_grades": 1},

      (True, False, True, True, False)    : {"promote_cascade" : 1}, #, "promote_junior_grades" : 1},
      (True, False, True, False, False)   : {"promote_cascade" : 1}, #, "promote_junior_grades" : 1},

      (False, True, False, True, False)   : {"success" : 100},
      (False, True, False, False, False)  : {"success" : 100},

      (False, True, True, True, False)    : {"success" : 100},
      (False, True, True, False, False)   : {"success" : 100},

      (False, False, False, True, False)  : {"success" : 100},
      (False, False, False, False, False) : {"success" : 100},

      (False, False, True, True, False)    : {"success" : 100},
      (False, False, True, False, False)   : {"success" : 100},

      (True, True, False, True, True)    : {"reassign":1},
      (True, True, False, False, True)   : {"reassign":1},

      (True, True, True, True, True)     : {"reassign":1},
      (True, True, True, False, True)    : {"reassign":1},

      (True, False, False, True, True)   : {"promote_cascade": 1}, #, "promote_junior_grades": 1, "hire_junior_grades": 1},
      (True, False, False, False, True)  : {"promote_cascade": 1}, #, "promote_junior_grades": 1, "hire_junior_grades": 1},

      (True, False, True, True, True)    : {"promote_cascade" : 1}, #, "promote_junior_grades" : 1},
      (True, False, True, False, True)   : {"promote_cascade" : 1}, #, "promote_junior_grades" : 1},

      (False, True, False, True, True)   : {"promote_leaders": 1},
      (False, True, False, False, True)  : {"promote_leaders": 1},

      (False, True, True, True, True)    : {"promote_leaders": 1},
      (False, True, True, False, True)   : {"promote_leaders": 1},

      (False, False, False, True, True)  : {"promote_leaders": 1},
      (False, False, False, False, True) : {"promote_leaders": 1},

      (False, False, True, True, True)    : {"promote_leaders": 1},
      (False, False, True, False, True)   : {"promote_leaders": 1},
        }

  def __init__(self):
    return  

  def reward_for_state_action(self, old_state, action_string, chapter):

    chapter_copy = copy.deepcopy(chapter)
    action = Action.action_from_string(action_string)
    new_state = action.run(old_state, chapter_copy)
    new_state_tuple = new_state.state_tuple()
    
    reward_actions = self.environment_state_action_reward[new_state_tuple]

    reward = -1
    if "success" in reward_actions:
      reward = reward_actions["success"]
    elif action_string in reward_actions:
      reward = -1 #reward_actions[action_string]
    
    return (reward, new_state, chapter_copy)


# Reinforcement Learning

# Q-table

In [430]:
class Q_table:
  """
  dict[key(state hash, action hash)] = q-values
  """
  
  def __init__(self):
    self.Q = {}
    return

  def getStateActionKey(self, state_tuple, action_string):
    return (state_tuple, action_string)

  def getValueQ(self, state_tuple, action_string):
    """ Get expected reward given an action in a given state,
        returns 0 if the state-action pair has not been seen before.
        Input is state and action key                          """

    state_action_key = self.getStateActionKey(state_tuple, action_string)
    if state_action_key in self.Q:
        return self.Q.get(state_action_key)
    else:
        self.Q[state_action_key] = 0
        return 0

  def setValueQ(self, state_tuple, action_string, value):
    """ Set value in Q """
    state_action_key = self.getStateActionKey(state_tuple, action_string)
    self.Q[state_action_key] = value

  def getBestAction(self, state_tuple, list_action_strings):
    """ Get best action given a set of possible actions in a given state """

    # Pick a random action at first - imp as there are many states and there should be a way to randomly choose other than just first 
    random_idx = np.random.choice(len(list_action_strings),1,None)
    best_action = list_action_strings[random_idx[0]]
  
    # Find action that given largest Q in given state
    maxQ = 0
    for action_string in list_action_strings:
        tmpQ = self.getValueQ(state_tuple, action_string)
        if maxQ < tmpQ:
          maxQ = tmpQ
          best_action = action_string

    return best_action

  def getMaxQ(self, state_tuple, list_action_strings):
      """ Returns the maximum Q value given a state and list of actions (input is hash keys) """
      maxQ = 0
      for a in list_action_strings:
          tmpQ = self.getValueQ(state_tuple, a) 
          if maxQ < tmpQ:
              maxQ = tmpQ
      return maxQ


  def updateQ(self, old_state_tuple, action_to_perform_string, reward, new_state_tuple, list_next_possible_action_strings, discount_factor, learning_rate):
    """ Implements Q-learning iterative algorithm """

    # Get current Q Value
    old_q_value = self.getValueQ(old_state_tuple, action_to_perform_string)

    # Find max Q value given the possible set of actions in the next state
    max_nextQ = self.getMaxQ(new_state_tuple, list_next_possible_action_strings) 
    
    temporal_difference = reward + (discount_factor * max_nextQ) - old_q_value

    #update the Q-value for the previous state and action pair
    new_q_value = old_q_value + (learning_rate * temporal_difference)

    self.setValueQ(old_state_tuple, action_to_perform_string, new_q_value)

  def print_info(self):
    print("\n\nQ_table")
    print(self.Q)
    return

# Episode

In [431]:
class Episode:

  preconditioned_episodes = [
            {"people_gradewise" : {"3":0, "4.1":1, "4.2":6, "4.3":6, "4.4":1, "5.1":0, "5.2":2, "MS":7} },
            {"people_gradewise" : {"3":0, "4.1":0, "4.2":5, "4.3":6, "4.4":2, "5.1":0, "5.2":0, "MS":14} },
            {"people_gradewise" : {"3":0, "4.1":0, "4.2":4, "4.3":4, "4.4":4, "5.1":0, "5.2":0, "MS":9} },
            {"people_gradewise" : {"3":2.5, "4.1":3, "4.2":4, "4.3":6, "4.4":5, "5.1":1, "5.2":2, "MS":27} },
            {"people_gradewise" : {"3":0, "4.1":1, "4.2":3.5, "4.3":0.5, "4.4":0.8, "5.1":0, "5.2":0, "MS":3} },
            {"people_gradewise" : {"3":0, "4.1":0, "4.2":2, "4.3":1, "4.4":1, "5.1":0, "5.2":0, "MS":2} },
            {"people_gradewise" : {"3":1, "4.1":1, "4.2":1, "4.3":7, "4.4":4, "5.1":0, "5.2":0, "MS":3} },
            {"people_gradewise" : {"3":0, "4.1":0, "4.2":1.5, "4.3":3, "4.4":2, "5.1":1, "5.2":0, "MS":18} },
            {"people_gradewise" : {"3":2.5, "4.1":3, "4.2":0, "4.3":0, "4.4":5, "5.1":1, "5.2":2, "MS":27} },
            {"people_gradewise" : {"3":3, "4.1":3, "4.2":0, "4.3":0, "4.4":5, "5.1":1, "5.2":2, "MS":70} }
          ]

  def __init__(self, episode_number):
    self.episode_number = episode_number
    if self.episode_number < len(self.preconditioned_episodes):
      d = self.preconditioned_episodes[self.episode_number]
      self.chapter = Chapter(people_gradewise = d["people_gradewise"])
      print("\n\n****episode : ",  d["people_gradewise"])
    else:
      business_rules = Business_rules()
      people_gradewise={"3":0, "4.1":0, "4.2":0, "4.3":0, "4.4":0, "5.1":0, "5.2":0, "MS":0}
      #distribute the simulation
      if episode_number % 2 == 0:
        #simulate for hire 
        for grade in people_gradewise:
          people_gradewise[grade] = 0 
        people_gradewise["MS"] = 100
      else:
        #simulate for all tribe distribution
        for grade in people_gradewise:
          probabilities = [0.2,0.6,0.2]
          random_idx = np.random.choice(business_rules.ceil_total_people_in_grade[grade],1,probabilities)
          people_gradewise[grade] = random_idx[0]

      self.chapter = Chapter(people_gradewise)
      print("\n\n****episode : ", people_gradewise)
    return

# Environment

In [432]:
class Environment:

  def __init__(self):
    self.business_rules = Business_rules()
    self.action_list = map(lambda ac_str: Action.action_from_string(ac_str), self.business_rules.actions)
    self.reward = Reward()

  def possible_actions_in_state(self, state):
    return self.action_list

  def hasReachedTerminalState(self, reward):
    if reward == 100 or reward == -100:
      return True
    else:
      return False

# Trainer

In [433]:
class Trainer:

  def __init__(self):
    self.q_table = Q_table()
    self.environment = Environment()
    self.business_rules = Business_rules()

    #define training parameters
    self.epsilon = 0.5 #the percentage of time when we should take the best action (instead of a random action)
    self.discount_factor = 0.9 #discount factor for future rewards
    self.learning_rate = 0.9 #the rate at which the AI agent should learn
    self.num_episodes = 5
    return

  
  #define an epsilon greedy algorithm that will choose which action to take next 
  def choose_next_action(self, state_tuple, list_action_strings, epsilon):
    #if a randomly chosen value between 0 and 1 is less than epsilon, 
    #then choose the most promising value from the Q-table for this state.
    if np.random.random() < epsilon:
      best_action = self.q_table.getBestAction(state_tuple, list_action_strings)
      return best_action
    else: #choose a random action
      random_idx = np.random.choice(len(list_action_strings))
      random_action = list_action_strings[random_idx]
      return random_action

  def train(self, num_episodes = 0, num_loops = 0):
    if num_episodes > 0:
      self.num_episodes = num_episodes
    #run through 1000 training episodes
    for episode_number in range(self.num_episodes):
      #get the starting state for this episode
      episode = Episode(episode_number)
      print("******episode_number :*******", episode_number)
      #continue taking actions  until we reach a terminal state
      #(i.e., until we reach the final team structure or hit a bad structure)
      reward = 0
      i=0
      while not self.environment.hasReachedTerminalState(reward):
        if num_loops > 0:
          i +=1
          if i >= num_loops:
            break
        #choose which action to take 
        old_state = episode.chapter.produce_state()
        old_state_tuple = old_state.state_tuple()
        list_action_strings = self.business_rules.actions
        action_to_perform_string = self.choose_next_action(old_state_tuple, list_action_strings, self.epsilon)

        #perform the chosen action, and transition to the next state 
        #receive the reward for moving to the new state
        reward, new_state, chapter_modified = self.environment.reward.reward_for_state_action(old_state, action_to_perform_string, episode.chapter)

        new_state_tuple = new_state.state_tuple()
        list_next_possible_actions = self.environment.possible_actions_in_state(new_state)
        list_next_possible_action_strings = map(lambda ac: ac.action, list_next_possible_actions)
        self.q_table.updateQ(old_state_tuple, action_to_perform_string, reward, new_state_tuple, list_next_possible_action_strings, self.discount_factor, self.learning_rate)

        #assign new state to old
        episode.chapter = chapter_modified

 #Define a function that will get the shortest path between the starting out team structure and the most favorable team structure 
  def get_shortest_path(self, chapter):
    i=0
    shortest_path = []
    shortest_path.append(chapter)
    reward = 0 
    old_chapter = chapter
    while not self.environment.hasReachedTerminalState(reward):

      old_state = old_chapter.produce_state()
      old_state_tuple = old_state.state_tuple()
      list_action_strings = self.business_rules.actions
      action_to_perform_string = self.choose_next_action(old_state_tuple, list_action_strings, 1.) #always the best action

      reward, new_state, chapter_modified = self.environment.reward.reward_for_state_action(old_state, action_to_perform_string, old_chapter)
      print(old_state_tuple, action_to_perform_string, reward, new_state.state_tuple())
            
      shortest_path.append(chapter_modified)

      old_chapter = chapter_modified
    
    print("\nshortest_path : \n")
    i=1
    for chapter in shortest_path:
      print("\n step :", i)
      chapter.print_info()
      i += 1

# State Tests

In [434]:
class State_Test(unittest.TestCase):

    def test_state_tuple(self):
      st = State()
      st_tuple = st.state_tuple()
      self.assertTrue(st_tuple == (False, False, False, False, False))

    def test_hash_exists(self):
      st = State()
      st_hash = st.state_hash()
      self.assertTrue(st_hash > 0)

    def test_hash_exists(self):
      st = State()
      st_cp = st.state_copy()
      st_hash = st.state_hash()
      st_cp_hash = st_cp.state_hash()
      self.assertTrue(st_hash == st_cp_hash)

# unittest.main(verbosity=2, exit=False)
suite = unittest.TestLoader().loadTestsFromTestCase(State_Test)
unittest.TextTestRunner().run(suite)


..
----------------------------------------------------------------------
Ran 2 tests in 0.004s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>

# Reward Tests

In [435]:
class Reward_Tests(unittest.TestCase):

    def test_reward_action(self):
      rw = Reward()
      st = State(*(True, True, False, True, False))
      ac = Action("reassign")   
      rw_val = rw.reward_for_state_action(st, ac)
      self.assertTrue(rw_val == 1)

    def test_punish_action(self):
      rw = Reward()
      st = State(*(True, True, False, True, False))
      ac = Action("promote_cascade")   
      rw_val = rw.reward_for_state_action(st, ac)
      self.assertTrue(rw_val == -1)

    def test_success_state(self):
      rw = Reward()
      st = State(*(False, True, False, True, False))
      ac = Action("promote_cascade")   
      rw_val = rw.reward_for_state_action(st, ac)
      self.assertTrue(rw_val == 100)

# unittest.main(verbosity=2, exit=False)
suite = unittest.TestLoader().loadTestsFromTestCase(Reward_Tests)
unittest.TextTestRunner().run(suite)

EEE
ERROR: test_punish_action (__main__.Reward_Tests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-435-0983fa837258>", line 14, in test_punish_action
    rw_val = rw.reward_for_state_action(st, ac)
TypeError: reward_for_state_action() missing 1 required positional argument: 'chapter'

ERROR: test_reward_action (__main__.Reward_Tests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-435-0983fa837258>", line 7, in test_reward_action
    rw_val = rw.reward_for_state_action(st, ac)
TypeError: reward_for_state_action() missing 1 required positional argument: 'chapter'

ERROR: test_success_state (__main__.Reward_Tests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-435-0983fa837258>", line 21, in test_success_state
    rw_val = rw.reward_for_state_ac

<unittest.runner.TextTestResult run=3 errors=3 failures=0>

# Chapter Tests

In [436]:

class Chapter_Tests(unittest.TestCase):

    def setUp(self):
      self.ch = Chapter(people_gradewise = {"3":3, "4.1":3, "4.2":4, "4.3":6, "4.4":5, "5.1":1, "5.2":2, "MS":100})

    def test_total_people(self):
      self.assertTrue(self.ch.total_people() == 124)

    def test_total_leaders_leading(self):
      self.assertTrue(self.ch.allocated_leads.total_leaders_leading() == 0)

    def test_total_leaders_that_can_be_reassigned(self):
      self.assertTrue(self.ch.total_leaders_that_can_be_reassigned() == 10)

    def test_dict_leaders_that_can_be_reassigned(self):
      self.assertTrue(self.ch.dict_leaders_that_can_be_reassigned() == {'3': 3, '4.1': 3, '4.2': 4})

suite = unittest.TestLoader().loadTestsFromTestCase(Chapter_Tests)
unittest.TextTestRunner().run(suite)

class Chapter_Tests_After_Reassign(unittest.TestCase):

    def setUp(self):
      self.ch = Chapter(people_gradewise = {"3":3, "4.1":3, "4.2":4, "4.3":6, "4.4":5, "5.1":1, "5.2":2, "MS":100})
      self.ch.reassign_leaders()

    def test_total_leaders_leading_after_reassign(self):
      self.assertTrue(self.ch.allocated_leads.total_leaders_leading() == 10)

    def test_dict_successor_per_lead(self):
      self.assertTrue(self.ch.successor.dict_successor_per_lead() == {'3': 0.0, '4.1': 0.0, '4.2': 6.0})

    def test_dict_successor_per_engineer(self):
      self.assertTrue(self.ch.successor.dict_successor_per_engineer() == {'4.3': 0.8333333333333334, '4.4': 0.2, '5.1': 2.0})

    def test_total_successors_for_lead_grades(self):
      self.assertTrue(self.ch.successor.total_successors_for_lead_grades() == 6.0)

    def test_total_successors_for_engineer_grades(self):
      self.assertTrue(self.ch.successor.total_successors_for_engineer_grades() == 3.033333333333333)

    def test_diff_successor_ratio_for_leader_engineer_grades(self):
      self.assertTrue(self.ch.successor.diff_successor_ratio_for_leader_engineer_grades() == 2.966666666666667)

    def test_total_people_remaining(self):
      self.assertTrue(self.ch.dr_per_lead.total_people_remaining() == 114)

    #state 
    def test_is_overall_dr_per_lead_gt_target(self):
      self.assertTrue(self.ch.dr_per_lead.dr_per_lead() == 11.4)
      self.assertTrue(self.ch.is_overall_dr_per_lead_gt_target() == True)

    def test_is_num_reassignable_leaders_gt_zero(self):
      self.assertTrue(self.ch.total_leaders_that_can_be_reassigned() == 0)
      self.assertTrue(self.ch.is_num_reassignable_leaders_gt_zero() == False)

    def test_is_num_immediate_promotable_to_leader_grades_gt_zero(self):
      self.assertTrue(self.ch.successor.total_successors_for_lead_grades() == 6)
      self.assertTrue(self.ch.successor.is_num_immediate_promotable_to_leader_grades_gt_zero() == True)

    def test_is_max_dr_per_lead_in_leader_grades_gt_target(self):
      self.assertTrue(self.ch.dr_per_lead_for_this_leader_grade_wrt_higher_grades.max_dr_per_lead_in_leader_grades() == 1)
      self.assertTrue(self.ch.dr_per_lead_for_this_leader_grade_wrt_higher_grades.dict_dr_per_lead_for_this_grade_wrt_higher_grade() == {'3': 0, '4.1': 1.0, '4.2': 0.6666666666666666})
      self.assertTrue(self.ch.dr_per_lead_for_this_leader_grade_wrt_higher_grades.is_max_dr_per_lead_in_leader_grades_gt_target() == False)

    def test_produce_state(self):
      st=ch.produce_state()
      self.assertTrue(st.state_tuple() == (True, False, True, True, False))

suite = unittest.TestLoader().loadTestsFromTestCase(Chapter_Tests_After_Reassign)
unittest.TextTestRunner().run(suite)



....
----------------------------------------------------------------------
Ran 4 tests in 0.007s

OK
.......F....
FAIL: test_produce_state (__main__.Chapter_Tests_After_Reassign)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-436-7283a8a4b9b4>", line 69, in test_produce_state
    self.assertTrue(st.state_tuple() == (True, False, True, True, False))
AssertionError: False is not true

----------------------------------------------------------------------
Ran 12 tests in 0.028s

FAILED (failures=1)


<unittest.runner.TextTestResult run=12 errors=0 failures=1>

# Reassign Tests

In [437]:
class Reassign_Tests(unittest.TestCase):

    def setUp(self):
      self.ch = Chapter(people_gradewise = {"3":3, "4.1":3, "4.2":4, "4.3":6, "4.4":5, "5.1":1, "5.2":2, "MS":100})

    def test_before_reassign(self):
      st = self.ch.produce_state()
      self.assertTrue(st.state_tuple() == (True, True, True, True, False))

    def test_after_reassign(self):
      action = Reassign()
      st2 = action.run(st, self.ch)
      self.assertTrue(st2.state_tuple() == (True, False, True, True, False))


suite = unittest.TestLoader().loadTestsFromTestCase(Reassign_Tests)
unittest.TextTestRunner().run(suite)

..
----------------------------------------------------------------------
Ran 2 tests in 0.004s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>

# Promote Cascade Tests

In [438]:
class Promote_Cascade_Tests(unittest.TestCase):

    def setUp(self):
      self.ch = Chapter(people_gradewise = {"3":3, "4.1":3, "4.2":4, "4.3":6, "4.4":5, "5.1":1, "5.2":2, "MS":100})
      action = Reassign()
      st2 = action.run(st, self.ch)

    def test_before_promote(self):
      st = self.ch.produce_state()
      self.assertTrue(st.state_tuple() == (True, False, True, True, False))

    def test_after_promote(self):
      action = Promote_cascade()
      st2 = action.run(st, self.ch)
      self.assertTrue(st2.state_tuple() == (True, True, True, True, False))


suite = unittest.TestLoader().loadTestsFromTestCase(Promote_Cascade_Tests)
unittest.TextTestRunner().run(suite)

..
----------------------------------------------------------------------
Ran 2 tests in 0.008s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>

# Promote Leaders Tests

In [439]:
class Promote_Leaders_Tests(unittest.TestCase):

    def setUp(self):
      self.ch = Chapter(people_gradewise = {"3":0, "4.1":0, "4.2":0, "4.3":0, "4.4":0, "5.1":0, "5.2":0, "MS":100})
      self.ch.allocated_leads.allocated_leaders_gradewise = {"3":0, "4.1":1, "4.2":24}

    def test_before_promote(self):
      st = self.ch.produce_state()
      self.assertTrue(st.state_tuple() == (False, False, False, False, True))

    def test_after_promote(self):
      action = Promote_leaders()
      st2 = action.run(st, self.ch)
      self.assertTrue(st2.state_tuple() == (False, False, False, False, False))

suite = unittest.TestLoader().loadTestsFromTestCase(Promote_Leaders_Tests)
unittest.TextTestRunner().run(suite)

.F

max_dr_per_lead_in_leader_grades : 24.0



FAIL: test_before_promote (__main__.Promote_Leaders_Tests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-439-458997bec5d1>", line 9, in test_before_promote
    self.assertTrue(st.state_tuple() == (False, False, False, False, True))
AssertionError: False is not true

----------------------------------------------------------------------
Ran 2 tests in 0.005s

FAILED (failures=1)


<unittest.runner.TextTestResult run=2 errors=0 failures=1>

# Hire Tests

In [440]:
class Hire_Tests(unittest.TestCase):

    def setUp(self):
      self.ch = Chapter(people_gradewise = {"3":0, "4.1":0, "4.2":0, "4.3":1, "4.4":1, "5.1":1, "5.2":1, "MS":100})
      self.ch.allocated_leads.allocated_leaders_gradewise = {"3":0, "4.1":1, "4.2":11}

    def test_before_hire(self):
      st = self.ch.produce_state()
      self.assertTrue(st.state_tuple() == (True, False, True, False, True))

    def test_after_hire(self):
      action = Hire()
      st2 = action.run(st, self.ch)
      self.assertTrue(st2.state_tuple() == (True, True, True, True, True))
      self.assertTrue(self.ch.people_remaining_gradewise == {"3":0, "4.1":0, "4.2":4, "4.3":1, "4.4":1, "5.1":1, "5.2":1, "MS":100})

suite = unittest.TestLoader().loadTestsFromTestCase(Hire_Tests)
unittest.TextTestRunner().run(suite)

FF
FAIL: test_after_hire (__main__.Hire_Tests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-440-2f5f79b895b5>", line 14, in test_after_hire
    self.assertTrue(st2.state_tuple() == (True, True, True, True, True))
AssertionError: False is not true

FAIL: test_before_hire (__main__.Hire_Tests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-440-2f5f79b895b5>", line 9, in test_before_hire
    self.assertTrue(st.state_tuple() == (True, False, True, False, True))
AssertionError: False is not true

----------------------------------------------------------------------
Ran 2 tests in 0.004s

FAILED (failures=2)


<unittest.runner.TextTestResult run=2 errors=0 failures=2>

# Environment Tests

In [441]:
env  = Environment()

print(env.business_rules.actions)
st = State()
lst = env.possible_actions_in_state(st)
print(lst) #['reassign', 'promote_cascade', 'promote_leaders', 'hire']
for action in lst:
  print(action.action)
  # reassign
  # promote_cascade
  # promote_leaders
  # hire

print(env.hasReachedTerminalState(100)) #True
print(env.hasReachedTerminalState(-100)) #True
print(env.hasReachedTerminalState(-1)) #False


['reassign', 'promote_cascade', 'promote_leaders', 'hire']
<map object at 0x7f74dda42050>
reassign
promote_cascade
promote_leaders
hire
True
True
False


# Episode Tests

In [442]:
#preconditioned episode
ep = Episode(1)
ch = ep.chapter
print(ch.people_remaining_gradewise) #{'3': 0, '4.1': 0, '4.2': 5, '4.3': 6, '4.4': 2, '5.1': 0, '5.2': 0, 'MS': 14}
print(ch.produce_state().state_tuple())

#simulated episode
ep = Episode(11)
ch = ep.chapter
print(ch.people_remaining_gradewise) 
print(ch.produce_state().state_tuple())

#simulated hire episode
ep = Episode(12)
ch = ep.chapter
print(ch.people_remaining_gradewise) 
print(ch.produce_state().state_tuple())




****episode :  {'3': 0, '4.1': 0, '4.2': 5, '4.3': 6, '4.4': 2, '5.1': 0, '5.2': 0, 'MS': 14}
{'3': 0, '4.1': 0, '4.2': 5, '4.3': 6, '4.4': 2, '5.1': 0, '5.2': 0, 'MS': 14}
(True, True, True, True, False)


****episode :  {'3': 0, '4.1': 7, '4.2': 14, '4.3': 11, '4.4': 33, '5.1': 7, '5.2': 11, 'MS': 31}
{'3': 0, '4.1': 7, '4.2': 14, '4.3': 11, '4.4': 33, '5.1': 7, '5.2': 11, 'MS': 31}
(True, True, True, True, False)


****episode :  {'3': 0, '4.1': 0, '4.2': 0, '4.3': 0, '4.4': 0, '5.1': 0, '5.2': 0, 'MS': 100}
{'3': 0, '4.1': 0, '4.2': 0, '4.3': 0, '4.4': 0, '5.1': 0, '5.2': 0, 'MS': 100}
(True, False, False, False, False)


# Q-table tests

In [443]:
q = Q_table()
q.setValueQ((True, True, True, True, True), "reassign", 1)
print(q.Q) #{((True, True, True, True, True), 'reassign'): 1}
value = q.getValueQ((True, True, True, True, True), "reassign")
print(value) #1

b = Business_rules()
lst_acs = b.actions
print(b.actions) #['reassign', 'promote_cascade', 'promote_leaders', 'hire']
acs = q.getBestAction((True, True, True, True, True), b.actions) #reassign
print(acs)

mq = q.getMaxQ((True, True, True, True, True), lst_acs)
print(mq) #1

{((True, True, True, True, True), 'reassign'): 1}
1
['reassign', 'promote_cascade', 'promote_leaders', 'hire']
reassign
1


# Trainer tests

In [444]:
trainer = Trainer()

state_tuple = (True, True, False, True, False)

br = Business_rules()
list_action_strings = br.actions

ac = trainer.choose_next_action(state_tuple,list_action_strings,epsilon=0.9)
print(ac)



reassign


In [445]:
trainer = Trainer()
trainer.train(50)
print(trainer.q_table.Q)




****episode :  {'3': 0, '4.1': 1, '4.2': 6, '4.3': 6, '4.4': 1, '5.1': 0, '5.2': 2, 'MS': 7}
******episode_number :******* 0


****episode :  {'3': 0, '4.1': 0, '4.2': 5, '4.3': 6, '4.4': 2, '5.1': 0, '5.2': 0, 'MS': 14}
******episode_number :******* 1


****episode :  {'3': 0, '4.1': 0, '4.2': 4, '4.3': 4, '4.4': 4, '5.1': 0, '5.2': 0, 'MS': 9}
******episode_number :******* 2


****episode :  {'3': 2.5, '4.1': 3, '4.2': 4, '4.3': 6, '4.4': 5, '5.1': 1, '5.2': 2, 'MS': 27}
******episode_number :******* 3


****episode :  {'3': 0, '4.1': 1, '4.2': 3.5, '4.3': 0.5, '4.4': 0.8, '5.1': 0, '5.2': 0, 'MS': 3}
******episode_number :******* 4


****episode :  {'3': 0, '4.1': 0, '4.2': 2, '4.3': 1, '4.4': 1, '5.1': 0, '5.2': 0, 'MS': 2}
******episode_number :******* 5


****episode :  {'3': 1, '4.1': 1, '4.2': 1, '4.3': 7, '4.4': 4, '5.1': 0, '5.2': 0, 'MS': 3}
******episode_number :******* 6


****episode :  {'3': 0, '4.1': 0, '4.2': 1.5, '4.3': 3, '4.4': 2, '5.1': 1, '5.2': 0, 'MS': 18}
***

# Shortest Path per policy

In [446]:
#preconditioned episode
# ep = Episode(1)
# ch = ep.chapter


ch = Chapter({'3': 1, '4.1': 6, '4.2': 1, '4.3': 63, '4.4': 53, '5.1': 14, '5.2': 2, 'MS': 12})
trainer.get_shortest_path(ch)

(True, True, True, True, False) reassign -1 (True, False, True, True, False)
(True, False, True, True, False) promote_cascade -1 (True, True, True, True, False)
(True, True, True, True, False) reassign -1 (True, False, True, True, False)
(True, False, True, True, False) promote_cascade -1 (True, True, True, True, False)
(True, True, True, True, False) reassign -1 (True, False, True, True, False)
(True, False, True, True, False) reassign -1 (True, False, True, True, False)
(True, False, True, True, False) promote_cascade -1 (True, True, True, True, False)
(True, True, True, True, False) reassign -1 (True, False, True, True, False)
(True, False, True, True, False) promote_cascade -1 (True, True, True, True, False)
(True, True, True, True, False) reassign -1 (True, False, True, True, False)
(True, False, True, True, False) promote_leaders -1 (True, False, True, True, False)
(True, False, True, True, False) promote_leaders -1 (True, False, True, True, False)
(True, False, True, True, False