In [1]:
import pandas as pd
from itertools import product
from amplify import BinaryPoly, gen_symbols, sum_poly, Solver, decode_solution
from amplify.constraint import equal_to, greater_equal
from amplify.client import FixstarsClient
import numpy as np
from collections import defaultdict

# 各店舗の要求人数情報の読み込み
dict_req = dict(location=["tenjin", "hakata"], employee=[2, 3])

# 各従業員の勤務希望情報の読み込み
dict_worker_loc = dict(
    worker_id=[0, 1, 2, 3, 4], tenjin=[2, 2, 1, 0, 1], hakata=[1, 1, 1, 1, 0]
)

df_req = pd.DataFrame.from_dict(dict_req, orient="index").T
print("各店舗の従業員要求人数")
display(df_req.style.hide_index())
df_worker_loc = pd.DataFrame.from_dict(dict_worker_loc, orient="index").T
print("\n各従業員の勤務希望情報")
display(df_worker_loc.style.hide_index())

## 店舗名の取得
workers = df_worker_loc["worker_id"].values
locations = df_req["location"].values

## 各データ長を取得
num_workers = len(workers)
num_locations = len(locations)

## 店舗名<->店舗番号の変換をするdictを作成
idx2loc = dict((i, v) for i, v in enumerate(df_req["location"].values))
loc2idx = dict((v, i) for i, v in enumerate(df_req["location"].values))

# 決定変数を用意
location_variables = gen_symbols(BinaryPoly, num_workers, num_locations)
# 勤務不可能店舗に関しては変数を０へ
for i, l in product(range(num_workers), locations):
    worker_req = df_worker_loc.iloc[i][l]
    if worker_req == 0:
        # 勤務不可
        location_variables[i][loc2idx[l]] = BinaryPoly(0)

# 充足率の計算
w = [
    (sum_poly(num_workers, lambda i: location_variables[i][l])) / df_req["employee"][l]
    for l in range(num_locations)
]

# 充足率の平均の-1倍
average_fill_rate_cost = -((sum_poly(w) / len(w)) )

# 充足率の分散
variance_fill_rate_cost = (
    sum_poly(len(w), lambda i: w[i] ** 2) / len(w) - (sum_poly(w) / num_locations) ** 2
)

# 従業員の希望度の-1倍
location_cost = -sum_poly(
    num_workers,
    lambda i: sum_poly(
        num_locations,
        lambda l: df_worker_loc.loc[i][idx2loc[l]] * location_variables[i][l],
    ),
)

# 従業員iは同時に1店舗のみ勤務できる
location_constarints = sum(
    [equal_to(sum_poly(location_variables[i]), 1) for i in range(num_workers)]
)
# 店舗の合計人数は要求人数以上
require_constraints = sum(
    [
        greater_equal(
            sum_poly(num_workers, lambda i: location_variables[i][l]),
            df_req["employee"][l],
        )
        for l in range(num_locations)
    ]
)

# それぞれの目的関数の係数
loc_priority = 1
ave_fill_priority = 1
var_fill_priority = 10

# 目的関数
cost_func = (
    loc_priority * location_cost
    + ave_fill_priority * average_fill_rate_cost
    + var_fill_priority * variance_fill_rate_cost
)

# 制約条件を表すペナルティ関数の重み
constraint_weight = 10

# 制約条件
constraints = constraint_weight * (location_constarints + require_constraints)

# 最適化モデル
model = cost_func + constraints

# クライアントの設定
client = FixstarsClient()
client.token = "YPUHk3Oh0pIVYdwFB43uzcLFkEiq9zDf"
client.parameters.timeout = 1000  #  タイムアウト1秒

# ソルバーを定義して実行
solver = Solver(client)
result = solver.solve(model)

# 制約条件チェック
if len(result) == 0:
    raise RuntimeError("The given constraints are not satisfied")
values = result[0].values
energy = result[0].energy

# 勤務地に関する変数の解
location_solutions = decode_solution(location_variables, values, 0)

# 結果を格納する変数を生成
location_index_list = np.where(np.array(location_solutions) == 1)[1]
dict_df = defaultdict(list)

for i, loc_ind in enumerate(location_index_list):
    ## 配属勤務地
    worker_id = df_worker_loc.loc[i]["worker_id"]
    loc = locations[loc_ind]
    dict_df["worker_id"].append(worker_id)
    dict_df["location"].append(loc)

df_result = pd.DataFrame.from_dict(dict_df, orient="index").T
display(df_result.style.hide_index())

各店舗の従業員要求人数


location,employee
tenjin,2
hakata,3



各従業員の勤務希望情報


worker_id,tenjin,hakata
0,2,1
1,2,1
2,1,1
3,0,1
4,1,0


worker_id,location
0,hakata
1,tenjin
2,hakata
3,hakata
4,tenjin
