In [6]:
import pandas as pd
import random
import numpy as np
import time
import copy

### Main function

In [7]:
# Main driver cell: load the duty sheet, validate it, then build and rank
# candidate schedules for every requested duty type.
# NOTE(review): data_cleansing, is_violation, preliminary_gen2 and
# optimization2 are defined in cells further down this notebook, so those
# cells have to be executed before this one on a fresh kernel.

# load file
df = pd.read_excel('test.xlsm')

# Duty-type number -> display name used in the progress / error messages.
TYPES_OF_DUTY = {0:'Test', 1:'CT/MR', 3:'ER', 4:'CR', 5:'VS', 
                 6:'Other', 7:'Other', 8:'Other', 9:'Other'}  # no type 2

# Normalise the sheet and read which duty types were requested, then validate.
df, type_to_generate = data_cleansing(df)
violation, df_updated = is_violation(df,type_to_generate)

# if no violation, then start generating list
# optimized_list maps duty type -> ranked result returned by optimization2.
optimized_list = {}
if violation == False:
    for duty_type in type_to_generate:
        # 12000 is the number of random candidate schedules to generate per type.
        preliminary_list = preliminary_gen2(df_updated, duty_type, 12000)
        optimized_list[duty_type] = optimization2(preliminary_list, df_updated, duty_type)

# Print the ranked result of every duty type (nothing is printed when
# validation failed, because optimized_list stays empty).
for key, value in optimized_list.items():
    print(value)

檢查輸入資料...
檢查輸入資料...OK
正在建立 [type1,CT/MR班] 初步清單...
>----------- 8.4%
=>---------- 16.7%
==>--------- 25.0%
===>-------- 33.4%
====>------- 41.7%
=====>------ 50.0%
[type1,CT/MR班] 初步清單已建立完成
正在尋找 [type1,CT/MR班] 最佳排班...
已完成 [type1,CT/MR班] 最佳排班排序
正在建立 [type3,ER班] 初步清單...
>----------- 8.4%
=>---------- 16.7%
==>--------- 25.0%
===>-------- 33.4%
====>------- 41.7%
=====>------ 50.0%
[type3,ER班] 初步清單已建立完成
正在尋找 [type3,ER班] 最佳排班...
已完成 [type3,ER班] 最佳排班排序
正在建立 [type4,CR班] 初步清單...
>----------- 8.4%
=>---------- 16.7%
==>--------- 25.0%
===>-------- 33.4%
====>------- 41.7%
=====>------ 50.0%
[type4,CR班] 初步清單已建立完成
正在尋找 [type4,CR班] 最佳排班...
已完成 [type4,CR班] 最佳排班排序
[[[['13', '15'], ['12', '16'], ['14', '11'], ['15', '13'], ['16', '12'], ['15', '17'], ['12', '11'], ['14', '13'], ['17', '16'], ['11', '15'], ['12', '14'], ['16', '13'], ['11', '14'], ['17', '15'], ['11', '16'], ['14', '12'], ['11', '13'], ['15', '13'], ['14', '11'], ['16', '15'], ['14', '12'], ['17', '13'], ['11', '12'], ['14', '15'], ['1

### Data cleansing

In [1]:
def data_cleansing(df):
    """
    Normalise the raw duty sheet and read which duty types should be generated.

    input: df, the whole sheet as loaded from the Excel file.  The first five
           columns are taken positionally as Name, Code, Type, Holiday and
           Weekday; index rows 0, 1 and 2 are relabelled 'Weekday_ch',
           'Weekday_num' and 'is_holiday'; every remaining column whose label
           does not contain 'Unnamed' is treated as one day of the month.
    return: (df, type_to_generate)
            df               -- a cleansed copy (the argument is not modified):
                                unnamed columns dropped, resident codes (rows
                                from position 4 down) converted to str, string
                                cells in the day columns lower-cased, and the
                                day columns renamed '1', '2', ...
            type_to_generate -- sorted list of distinct duty-type digits
                                (0-9 except 2) found in the
                                ['is_holiday', 'Name'] cell.
    """
    # rename columns and index
    df = df.rename(columns={df.columns[0]: 'Name',
                            df.columns[1]: 'Code',
                            df.columns[2]: 'Type',
                            df.columns[3]: 'Holiday',
                            df.columns[4]: 'Weekday'})
    df = df.rename(index={0: 'Weekday_ch', 1: 'Weekday_num', 2: 'is_holiday'})

    # Drop the columns that are still unnamed.  Some labels are date objects
    # rather than strings, hence str().  The explicit copy makes the result an
    # independent frame, so the assignments below really land in it.
    named_columns = [col for col in df if 'Unnamed' not in str(col)]
    df = df[named_columns].copy()

    # The ['is_holiday', 'Name'] cell lists the duty types to run as digits
    # (0-9 except 2).  A set removes repeated digits so that no type is
    # generated twice.
    requested = str(df.loc['is_holiday', 'Name'])
    type_to_generate = sorted({int(ch) for ch in requested if ch in '134567890'})
    # Blank the cell (single .loc call, no chained indexing) so it does not
    # interfere with the count of holidays taken from this row later on.
    df.loc['is_holiday', 'Name'] = np.nan

    # Convert the resident codes (rows from position 4 down) to str.
    if len(df.index) > 4:
        code_pos = df.columns.get_loc('Code')
        # Make sure the column can hold strings whatever dtype Excel produced.
        df['Code'] = df['Code'].astype(object)
        df.iloc[4:, code_pos] = [str(code) for code in df.iloc[4:, code_pos]]

    # Lower-case every string cell in the day columns (e.g. 'X' -> 'x');
    # non-string cells are left untouched.
    def _lower_if_str(item):
        return item.lower() if isinstance(item, str) else item

    if len(df.index) > 4 and len(df.columns) > 5:
        df.iloc[4:, 5:] = df.iloc[4:, 5:].apply(lambda col: col.map(_lower_if_str))

    # Rename the day columns (position 5 onwards) to '1', '2', ... in one step.
    df.columns = list(df.columns[:5]) + [str(day) for day in range(1, len(df.columns) - 4)]

    return df, type_to_generate

    ## ==== now the data has been cleansed === ##




In [2]:
def is_violation(df, duty_type_array):
    """
    Validate the cleansed duty sheet before any schedule is generated.

    input:
        df              -- dataframe after cleansing (index rows 'Weekday_num'
                           and 'is_holiday', columns Name/Code/Type/Holiday/
                           Weekday followed by one column per day named
                           '1', '2', ...).  It is modified IN PLACE: for every
                           resident row (position 4 down) the days directly
                           before and after a reserved duty (cell == 1) are
                           marked 'x'.
        duty_type_array -- duty types to check.
    output: (violation, df)
        violation -- True  -> at least one problem was found (each one is printed)
                     False -> no violation
        df        -- the same dataframe object, with the 'x' marks added.
    raises: AssertionError when duty_type_array is empty.

    Checks per duty type: enough holiday/weekday duties in total, enough
    residents free on every day (two for type 1, one otherwise) and not more
    reservations on a day than there are slots.
    """

    print('檢查輸入資料...')

    # No duty type requested -> nothing to run.
    if len(duty_type_array) == 0:
        raise AssertionError('請輸入要執行的班別，再執行程式')

    has_violation = False
    # Non-null cells in the 'Weekday_num' row = number of days in the month.
    days_in_month = int(df.loc['Weekday_num'].notnull().sum())
    num_of_holiday = int(df.loc['is_holiday'].notnull().sum())
    num_of_weekday = days_in_month - num_of_holiday

    # A reserved duty blocks the day before and the day after (no duties on
    # consecutive days).  The marks do not depend on the duty type, so they are
    # written once, before the per-type checks.  Cells are visited left to
    # right, so a reservation that directly follows another one is overwritten
    # by 'x' before it is looked at.
    first_col = 5
    last_col = len(df.columns) - 1
    for i in range(4, len(df.index)):
        for j in range(first_col, last_col + 1):
            if df.iloc[i, j] == 1:
                if j < last_col:
                    df.iloc[i, j + 1] = 'x'
                if j > first_col:
                    df.iloc[i, j - 1] = 'x'

    # iterate through every duty type
    for duty_type in duty_type_array:
        # CT/MR (type 1) puts two residents on duty every day, other types one.
        per_day = 2 if duty_type == 1 else 1

        roster = df[df['Type'] == duty_type]   # residents of this duty type
        day_cells = roster.iloc[:, 5:]         # their day columns only
        num_of_r = len(roster.index)           # number of residents

        # Duties that have to be covered vs. duties the residents were given.
        num_of_holiday_duty = num_of_holiday * per_day
        num_of_weekday_duty = num_of_weekday * per_day
        num_of_duties_h = roster['Holiday'].sum()
        num_of_duties_w = roster['Weekday'].sum()

        if num_of_duties_h < num_of_holiday_duty:
            print(f'{TYPES_OF_DUTY[duty_type]} 班假日值班總數不足，缺少{num_of_holiday_duty-num_of_duties_h}班')
            has_violation = True
        if num_of_duties_w < num_of_weekday_duty:
            print(f'{TYPES_OF_DUTY[duty_type]} 班平日值班總數不足，缺少{num_of_weekday_duty-num_of_duties_w}班')
            has_violation = True

        # Is there a day on which too few residents are free?  ('X' was
        # already lower-cased during cleansing.)
        for i in range(1, days_in_month + 1):
            num_of_exclude = (day_cells[str(i)] == 'x').sum()
            num_available = num_of_r - num_of_exclude
            if num_available <= 0:
                print(f'{TYPES_OF_DUTY[duty_type]} 班{i}號所有人均無法值班')
                has_violation = True
            elif num_available < per_day:
                # Only reachable for type 1: one resident is free but two are
                # needed, which the candidate generator can never satisfy.
                print(f'{TYPES_OF_DUTY[duty_type]} 班{i}號可值班人數不足{per_day}人')
                has_violation = True

        # Is there a day with more reservations than duty slots?
        for i in range(1, days_in_month + 1):
            num_of_reservation = (day_cells[str(i)] == 1).sum()
            if num_of_reservation > per_day:
                print(f'{TYPES_OF_DUTY[duty_type]} 班{i}號有超過{per_day}人預約要值班')
                has_violation = True

    if not has_violation:
        print('檢查輸入資料...OK')
    else:
        print('請修正以上資料後再執行程式')

    return has_violation, df



In [8]:

# Re-run the validation on the cleansed frame; needs `df` and
# `type_to_generate` from the main cell to exist in the kernel.
violation, df_updated = is_violation(df,type_to_generate)

檢查輸入資料...
檢查輸入資料...OK


### Construct the list by condition

In [7]:
# Time the generation of 40000 candidate schedules for duty type 3 and print
# the elapsed seconds.
# NOTE(review): the saved output under this cell mentions type 4, so it looks
# like it came from a different run than the call shown here -- confirm.
# Earlier call, kept for reference (preliminary_gen is not defined in the
# visible cells):
# preliminary_list = preliminary_gen(df_updated, 3)
start = time.time()
preliminary_list = preliminary_gen2(df_updated, 3, 40000)
print(time.time()-start)

正在建立 type[4,CR] 初步清單...
type [4,CR] 初步清單已建立完成


### finding the best solution

In [3]:
def preliminary_gen2(df_updated, duty_type, count_start, seed=None):
    """
    Randomly generate candidate schedules for one duty type.

    input:
        df_updated  -- cleansed dataframe; it should already carry the 'x'
                       marks written by is_violation so that the days next to
                       a reserved duty are blocked.
        duty_type   -- duty type to schedule.  Type 1 puts two residents on
                       duty per day, every other type one.
        count_start -- number of candidates to generate.
        seed        -- optional seed for reproducible output; None (default)
                       uses the module-level random generator.
    generate: preliminary_list, a list of count_start candidates.  A candidate
              has one entry per day: a resident code, or for type 1 a list of
              two codes.
    raises:
        ValueError   -- duplicated resident codes, a day on which too few
                        residents are free, or more than two reservations on
                        one day for type 1.
        RuntimeError -- a very long run of consecutive failed attempts, i.e.
                        the constraints cannot be met in practice.

    Rules applied: a resident marked 'x' on a day is never picked for it, a
    reserved duty (cell == 1) is always honoured, and nobody is on duty on two
    consecutive days.  The Holiday/Weekday quotas are not enforced here.
    """
    print(f'正在建立 [type{duty_type},{TYPES_OF_DUTY[duty_type]}班] 初步清單...')

    # Private generator when a seed is given, module-level functions otherwise.
    rng = random.Random(seed) if seed is not None else random
    per_day = 2 if duty_type == 1 else 1
    # Give up after this many failed attempts in a row instead of looping forever.
    MAX_CONSECUTIVE_FAILURES = 1_000_000

    DAYS = len(df_updated.columns[5:])            # number of days in the month
    DAY_LIST = [str(i + 1) for i in range(DAYS)]  # day labels, e.g. ['1'...'28']
    # Select the rows with the frame that was passed in, not a notebook global.
    df_work = df_updated[df_updated['Type'] == duty_type]

    # Codes of the residents that take this duty type, e.g. ['31', '32'].
    CODE_LIST = df_work['Code'].tolist()
    if len(set(CODE_LIST)) != len(CODE_LIST):
        raise ValueError(f'duplicated resident codes for duty type {duty_type}: {CODE_LIST}')

    # Per day: who may be picked and who reserved the duty.
    available_code = {}
    reservation_dict = {}
    for day in DAY_LIST:
        marks = df_work[day].tolist()
        # 'x' = reserved NOT to be on duty -> not available that day.
        available_code[day] = [code for code, mark in zip(CODE_LIST, marks)
                               if not (mark == 'x')]
        # 1 = reserved to be on duty that day.
        reservation_dict[day] = [code for code, mark in zip(CODE_LIST, marks)
                                 if mark == 1]
        if duty_type != 1 and reservation_dict[day]:
            # One resident per day: the reservation is the only choice (the
            # last one wins when several exist).
            available_code[day] = [reservation_dict[day][-1]]

        if count_start > 0:
            # Fail fast on input that no random attempt can ever satisfy.
            if duty_type == 1 and len(reservation_dict[day]) > 2:
                raise ValueError(f'duty type 1: more than 2 reservations on day {day}')
            if len(available_code[day]) < per_day:
                raise ValueError(
                    f'duty type {duty_type}: fewer than {per_day} resident(s) available on day {day}')

    # Generate count_start candidates that satisfy all the rules.
    preliminary_list = []
    count = count_start

    # for progress bar
    total_step = 12  # set 12 intervals
    interval = int(count_start/total_step)
    progress = [i*interval for i in range(1, total_step+1)]

    failures = 0
    while count > 0:  # generate till count_start candidates exist
        # progress bar
        if progress and (count_start-count-1) > progress[0]:
            del progress[0]
            prefix = '='*(total_step-len(progress)-1) + '>'
            prefix = "{:-<12}".format(prefix)
            print("{s} {r:0.1%}".format(s=prefix, r=(1-count/count_start)))

        candidate_list = []
        # Fresh per-attempt copy; the codes are immutable, so copying each
        # list is enough.
        pool = {day: codes[:] for day, codes in available_code.items()}

        for pos, day in enumerate(DAY_LIST):
            todays = pool[day]
            if len(todays) < per_day:  # not enough residents left to choose from
                break

            if duty_type == 1:
                booked = reservation_dict[day]
                if any(code not in todays for code in booked):
                    # A reserved resident was already on duty the day before.
                    break
                if len(booked) == 2:
                    # Both slots are reserved.  Copy the list so the
                    # reservation table is never shared with a candidate.
                    picked = list(booked)
                elif len(booked) == 1:
                    # Reserved resident first, partner drawn from the others;
                    # built as a new list so reservation_dict stays untouched.
                    others = [code for code in todays if code != booked[0]]
                    picked = [booked[0], rng.choice(others)]
                else:
                    # Nobody reserved: draw two residents at random.
                    picked = rng.sample(todays, 2)
                candidate_list.append(picked)  # add ['X','Y'] to the candidate
                on_duty = picked
            else:
                picked = rng.choice(todays)
                candidate_list.append(picked)
                on_duty = [picked]

            # Whoever is on duty today cannot be on duty tomorrow.
            if pos + 1 < DAYS:
                tomorrow = pool[DAY_LIST[pos + 1]]
                for code in on_duty:
                    if code in tomorrow:
                        tomorrow.remove(code)

        if len(candidate_list) == DAYS:  # every day was filled
            preliminary_list.append(candidate_list)
            count -= 1
            failures = 0
        else:
            failures += 1
            if failures >= MAX_CONSECUTIVE_FAILURES:
                raise RuntimeError(
                    f'duty type {duty_type}: no valid schedule found in '
                    f'{MAX_CONSECUTIVE_FAILURES} consecutive attempts')

    print(f'[type{duty_type},{TYPES_OF_DUTY[duty_type]}班] 初步清單已建立完成')
    return preliminary_list




### optimizing the list

In [10]:
# Reload and re-validate the sheet before the optimisation experiments below
# (same steps as the first part of the main cell).
# NOTE(review): the saved output under this cell shows optimisation messages,
# which these statements do not print -- it looks stale; confirm.

# load file
df = pd.read_excel('test.xlsm')

# Duty-type number -> display name used in the progress / error messages.
TYPES_OF_DUTY = {0:'Test', 1:'CT/MR', 3:'ER', 4:'CR', 5:'VS', 
                 6:'Other', 7:'Other', 8:'Other', 9:'Other'}  # no type 2

df, type_to_generate = data_cleansing(df)
violation, df_updated = is_violation(df,type_to_generate)

正在尋找 type[4,CR] 最佳排班...
已完成 type[4,CR] 最佳排班排序


In [None]:
# Generate 20000 candidate schedules for duty type 1 (two residents per day).
preliminary_list = preliminary_gen2(df_updated, 1, 20000)

In [None]:
# Time optimization2 on the type-1 candidates and print the elapsed seconds.
# Notes kept from earlier runs -- presumably timings in seconds for two
# variants of the membership test, and 8236 a resulting index; TODO confirm:
# 8236 
# in method 17.297198057174683
# == method 17.78391122817993  8236
start = time.time()
list_location_std_sorted = optimization2(preliminary_list, df_updated, 1)
print(time.time()-start)

In [11]:
# Print every entry of optimized_list.
# NOTE(review): the main cell builds optimized_list as a dict keyed by duty
# type, so this loop prints the keys; the saved output below shows a
# [schedule, index, std] entry and looks like it came from an earlier,
# list-based version -- confirm.
for item in optimized_list:
    print(item)

[['A', 'C', 'B', 'D', 'C', 'A', 'C', 'B', 'A', 'C', 'B', 'D', 'C', 'A', 'B', 'D', 'A', 'B', 'D', 'C', 'B', 'D', 'A', 'B', 'C', 'D', 'A', 'B', 'C', 'D'], 14865, 0.4977712860815109]


In [5]:
def optimization2(preliminary_list, df_updated, duty_type):
    """
    Rank candidate schedules and keep the best ones.

    input:
        preliminary_list -- candidates as produced by preliminary_gen2: one
                            entry per day, a resident code or (type 1) a list
                            of two codes.
        df_updated       -- cleansed dataframe; used to look up the resident
                            codes of this duty type.
        duty_type        -- what type of duty.
    output: list_location_std_sorted, at most three entries
            [candidate, index in preliminary_list, spread], best first.
            An empty preliminary_list gives an empty result.

    Ranking:
    1. Keep only the candidates with the fewest QOD occurrences (the same
       resident on duty on day i and on day i+2).
    2. For each of them compute, per resident, the standard deviation of the
       day positions the resident is on duty, then the standard deviation of
       those values ("spread").  A small spread means the residents' duties
       are scattered to a similar degree.  Sort ascending; candidates whose
       spread is undefined (a resident without any duty) come last.
    """
    if len(preliminary_list) == 0:
        return []

    DAYS = len(preliminary_list[0])  # number of days in the month
    # Select the rows with the frame that was passed in, not a notebook global.
    df_work = df_updated[df_updated['Type'] == duty_type]
    CODE_LIST = df_work['Code'].tolist()  # code list, ['31','32']

    print(f'正在尋找 [type{duty_type},{TYPES_OF_DUTY[duty_type]}班] 最佳排班...')

    pair_duty = duty_type == 1  # type 1: each day holds a list of two codes

    # 1. count the QOD occurrences of every candidate
    qod = []
    for candidate in preliminary_list:
        if pair_duty:
            day_count = sum(1
                            for i in range(DAYS-2)
                            for code in candidate[i]
                            if code in candidate[i+2])
        else:
            day_count = sum(1
                            for i in range(DAYS-2)
                            if candidate[i] == candidate[i+2])
        qod.append(day_count)

    # indexes of the candidates with the minimal number of QODs
    min_qod = min(qod)
    min_qod_index = [index for index, value in enumerate(qod) if value == min_qod]

    # 2. spread of the per-resident standard deviations
    list_location_std = []
    for index in min_qod_index:
        candidate = preliminary_list[index]
        std_value = []
        for code in CODE_LIST:
            if pair_duty:
                # membership in the day's pair
                list_location = [location for location, duty in enumerate(candidate)
                                 if code in duty]
            else:
                # Exact match: `code in duty` on a string would be a substring
                # test and count code '1' as being on duty whenever '11' is.
                list_location = [location for location, duty in enumerate(candidate)
                                 if duty == code]
            # A resident without any duty has no defined deviation.
            std_value.append(np.std(list_location, ddof=0) if list_location else np.nan)
        spread = float(np.std(std_value, ddof=0)) if std_value else float('nan')
        list_location_std.append([candidate, index, spread])  # [list, location, std value]

    # Sort by the std value; NaN cannot be ordered, so those go to the end.
    def _sort_key(entry):
        undefined = bool(np.isnan(entry[2]))
        return (undefined, 0.0 if undefined else entry[2])

    list_location_std_sorted = sorted(list_location_std, key=_sort_key)

    # keep three at most
    list_location_std_sorted = list_location_std_sorted[0:3]

    print(f'已完成 [type{duty_type},{TYPES_OF_DUTY[duty_type]}班] 最佳排班排序')

    return list_location_std_sorted


