<a href="https://colab.research.google.com/github/soitakahashi/ga-scheduling/blob/master/Schedule_ADSaitai_v0_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [55]:
!pip install deap
!pip3 install scoop



In [0]:
import random
from scoop import futures

from deap import base
from deap import creator
from deap import tools
from deap import cma

from google.colab import files
import pandas as pd
import numpy as np

In [0]:
# 従業員を表すクラス
class Employee(object):
  def __init__(self, no, name, priority, prevcount, sex, general, wills, costs):
    self.no = no
    self.name = name
    self.priority = priority
    self.prevcount = prevcount
    self.sex = sex
    self.general = general
    self.wills = wills
    self.costs = costs

  def get_cost(self, box_index):
    return(self.costs[box_index])
    # return(self.costs[SHIFT_BOXES.index(box_name)])

  def is_applicated(self, box_name):
    return (box_name in self.wills)

In [0]:
  # インプットパラメータの定義
  DURATION_DAYS = 10
  DAYS_PER_PEOPLE = 2

  POPULATION_GROUP = 40


In [0]:
# シフトを表すクラス
# 内部的には 3(朝昼晩) * 7日 * 10人 = 210次元のタプルで構成される
class Shift(object):

  SHIFT_BOXES = []
  for i in range(DURATION_DAYS):
    SHIFT_BOXES.append('d' + str(i))

  # 各コマの想定人数
  NEED_PEOPLE = [DAYS_PER_PEOPLE] * DURATION_DAYS

  def __init__(self, list):
    if list == None:
      self.make_sample()
    else:
      self.list = list
    self.employees = []

  # ランダムなデータを生成
  def make_sample(self):
    sample_list = []
    for num in range(DURATION_DAYS * POPULATION_GROUP):
      sample_list.append(random.randint(0, 1))
    self.list = tuple(sample_list)

  # タプルを1ユーザ単位に分割
  def slice(self):
    sliced = []
    start = 0
    for num in range(POPULATION_GROUP):
      sliced.append(self.list[start:(start + DURATION_DAYS)])
      start = start + DURATION_DAYS
    return tuple(sliced)

  # ユーザ別にアサインコマ名を出力する
  def print_inspect(self):
    user_no = 0
    for line in self.slice():
      printf("ユーザ%d" % user_no)
      print(line)
      user_no = user_no + 1

      index = 0
      for e in line:
        if e == 1:
          print(self.SHIFT_BOXES[index])
        index = index + 1

  # CSV形式でアサイン結果の出力をする
  def print_csv(self):
    for line in self.slice():
      print(','.join(map(str, line)))

  # TSV形式でアサイン結果の出力をする
  def print_tsv(self):
    for line in self.slice():
      print("\t".join(map(str, line)))

  # ユーザ番号を指定してコマ名を取得する
  def get_boxes_by_user(self, user_no):
    line = self.slice()[user_no]
    return self.line_to_box(line)

  # 1ユーザ分のタプルからコマ名を取得する
  def line_to_box(self, line):
    result = []
    index = 0
    for e in line:
      if e == 1:
        result.append(self.SHIFT_BOXES[index])
      index = index + 1
    return result    

  # コマ番号を指定してアサインされているユーザ番号リストを取得する
  def get_user_nos_by_box_index(self, box_index):
    user_nos = []
    index = 0
    for line in self.slice():
      if line[box_index] == 1:
        user_nos.append(index)
      index += 1
    return user_nos

  # コマ名を指定してアサインされているユーザ番号リストを取得する
  def get_user_nos_by_box_name(self, box_name):
    box_index = self.SHIFT_BOXES.index(box_name)
    return self.get_user_nos_by_box_index(box_index)

  # 想定人数と実際の人数の差分を取得する
  def abs_people_between_need_and_actual(self):
    result = []
    index = 0
    for need in self.NEED_PEOPLE:
      actual = len(self.get_user_nos_by_box_index(index))
      result.append(abs(need - actual))
      index += 1
    return result

  # 調整コストの量を取得する
  def sum_total_cost(self):
    count = 0
    for box_name in self.SHIFT_BOXES:
      user_nos = self.get_user_nos_by_box_name(box_name)
      for user_no in user_nos:
        e = self.employees[user_no]
        count += e.get_cost(self.SHIFT_BOXES.index(box_name))
    return count

  # 過去の参加回数の総量を取得する
  def sum_previous_count(self):
    count = 0
    for box_name in self.SHIFT_BOXES:
      user_nos = self.get_user_nos_by_box_name(box_name)
      for user_no in user_nos:
        e = self.employees[user_no]
        count += e.prevcount
    return count

  # 優先度の総量を取得する
  def sum_priority(self):
    count = 0
    for box_name in self.SHIFT_BOXES:
      user_nos = self.get_user_nos_by_box_name(box_name)
      for user_no in user_nos:
        e = self.employees[user_no]
        count += e.priority
    return count

  # 応募していないコマにアサインされている件数を取得する
  def not_applicated_assign(self):
    count = 0
    for box_name in self.SHIFT_BOXES:
      user_nos = self.get_user_nos_by_box_name(box_name)
      for user_no in user_nos:
        e = self.employees[user_no]
        if not e.is_applicated(box_name):
          count += 1
    return count

  # 同一期間中に2回以上参加している人を取得する
  def multi_assigned_user(self):
    result = []
    for user_no in range(POPULATION_GROUP):
      if sum(self.slice()[user_no]) > 1:
        result.append(user_no)
    return len(result)


In [60]:
uploaded = files.upload()
file_name = list(uploaded.keys())[0]
df = pd.read_csv(file_name, dtype={'name': str})
df_np = np.asarray(df)
employees=[]
for no,name,priority,prevcount,sex,general,wills,costs in df_np:
  employees.append(Employee(no,name,priority,prevcount,sex,general,eval(wills),eval(costs)))

Saving employees.csv to employees (21).csv


In [61]:
creator.create("FitnessPeopleCount", base.Fitness, weights=(-10.0, -100.0, -10.0, -10.0, -10.0, -50.0))
creator.create("Individual", list, fitness=creator.FitnessPeopleCount)

toolbox = base.Toolbox()

toolbox.register("map", futures.map)

toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, (DURATION_DAYS * POPULATION_GROUP))
toolbox.register("population", tools.initRepeat, list, toolbox.individual)



In [0]:
def evalShift(individual):
  s = Shift(individual)
  s.employees = employees

  # 想定人数とアサイン人数の差
  people_count_sub_sum = sum(s.abs_people_between_need_and_actual())
  # 応募していない時間帯へのアサイン数
  not_applicated_count = s.not_applicated_assign()
  # 調整コストの総量
  total_cost = s.sum_total_cost() / (POPULATION_GROUP)
  # 過去の参加回数の総量
  previous_count = s.sum_previous_count() / (POPULATION_GROUP)
  # 優先度の総量
  total_priority = s.sum_priority() / (POPULATION_GROUP)
  # 2回以上アサインされた人数
  people_count_multi_assign = s.multi_assigned_user()

  return (not_applicated_count, people_count_sub_sum, total_cost, previous_count, total_priority, people_count_multi_assign)


In [0]:
toolbox.register("evaluate", evalShift)
# 交叉関数を定義(二点交叉)
toolbox.register("mate", tools.cxTwoPoint)

# 変異関数を定義(ビット反転、変異隔離が5%)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)

# 選択関数を定義(トーナメント選択、tournsizeはトーナメントの数？)
toolbox.register("select", tools.selTournament, tournsize=3)

In [64]:
if __name__ == '__main__':
    # 初期集団を生成する
    pop = toolbox.population(n=300)
    CXPB, MUTPB, NGEN = 0.6, 0.5, 3000 # 交叉確率、突然変異確率、進化計算のループ回数

    print("進化開始")

    # 初期集団の個体を評価する
    fitnesses = list(map(toolbox.evaluate, pop))
    for ind, fit in zip(pop, fitnesses):  # zipは複数変数の同時ループ
        # 適合性をセットする
        ind.fitness.values = fit

    print("  %i の個体を評価" % len(pop))

     # 進化計算開始
    for g in range(NGEN):
        print("-- %i 世代 --" % g)

        # 選択
        # 次世代の個体群を選択
        offspring = toolbox.select(pop, len(pop))
        # 個体群のクローンを生成
        offspring = list(map(toolbox.clone, offspring))

        # 選択した個体群に交差と突然変異を適応する

        # 交叉
        # 偶数番目と奇数番目の個体を取り出して交差
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                # 交叉された個体の適合度を削除する
                del child1.fitness.values
                del child2.fitness.values

        # 変異
        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        # 適合度が計算されていない個体を集めて適合度を計算
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        print("  %i の個体を評価" % len(invalid_ind))

        # 次世代群をoffspringにする
        pop[:] = offspring

        # すべての個体の適合度を配列にする
        index = 1
        for v in ind.fitness.values:
          fits = [v for ind in pop]

          length = len(pop)
          mean = sum(fits) / length
          sum2 = sum(x*x for x in fits)
          std = abs(sum2 / length - mean**2)**0.5

          print("* パラメータ%d" % index)
          print("  Min %s" % min(fits))
          print("  Max %s" % max(fits))
          print("  Avg %s" % mean)
          print("  Std %s" % std)
          index += 1

    print("-- 進化終了 --")

    best_ind = tools.selBest(pop, 1)[0]
    print("最も優れていた個体: %s, %s" % (best_ind, best_ind.fitness.values))
    s = Shift(best_ind)
    s.print_csv()
    s.print_tsv()

進化開始


IndexError: ignored

In [0]:
print(s.abs_people_between_need_and_actual())
# print(s.not_applicated_assign() / (DURATION_DAYS * POPULATION_GROUP))
# print( s.sum_total_cost() / (DURATION_DAYS * POPULATION_GROUP))
# print(s.sum_previous_count() / (DURATION_DAYS * POPULATION_GROUP))
# print( s.sum_priority() / (DURATION_DAYS * POPULATION_GROUP))
print( s.multi_assigned_user())


In [0]:
user_no = 0
ans = []
for i in s.slice():
  cnt = 0
  for j in i:
    if j == 1:
      tmp = []
      tmp.append("d" + str(cnt))
      tmp.append(employees[user_no].name)
      tmp.append(employees[user_no].prevcount)
      tmp.append(employees[user_no].priority)
      tmp.append(employees[user_no].costs[cnt])
      tmp.append(("d" + str(cnt)) in employees[user_no].wills)
      ans.append(tmp)
    cnt += 1
  user_no += 1
print(ans)