In [137]:
import numpy as np
import cvxpy as cp
import numpy as np
import pandas as pd
import time
from scipy.optimize import minimize
import random

# 行動分類のラベル
large = ["必需行動", "拘束行動", "自由行動", "その他"] # 大分類
 
middle = ["睡眠", "食事", "身のまわりの用事", "療養・静養", "仕事", "仕事関連", "学業", "学校外の学習", 
  "家事", "子どもの世話", "家庭雑事", "通勤", "通学", "社会参加", "会話・交際", "スポーツ", 
  "行楽・散策", "趣味・娯楽・教養 (インターネット除く)", "趣味・娯楽・教養のインターネット (動画除く)", 
  "インターネット動画", "テレビ", "録画番組・DVD", "ラジオ", "新聞", "雑誌・マンガ・本", "音楽", "休息", "その他", "不明"]
 # 中分類
 
small = ["睡眠", "食事", "身のまわりの用事", "療養・静養", "仕事", "仕事のつきあい", "授業", "学校外の学習", 
  "炊事・掃除・洗濯", "買い物", "子どもの世話", "家庭雑事", "通勤", "通学", "社会参加", "会話・交際", 
  "スポーツ", "行楽・散策", "趣味・娯楽・教養 (インターネット除く)", "趣味・娯楽・教養のインターネット (動画除く)", 
  "インターネット動画", "テレビ", "録画番組・DVD", "ラジオ", "新聞", "雑誌・マンガ・本", "音楽", "休息", "その他", "不明"]
 # 小分類

def calc_list_diff(list1:list, list2:list):
    # リストの長さが異なる場合のエラーチェック
    if len(list1) != len(list2):
        raise ValueError("リストの長さが異なります。")
    
    # 各要素の差分を計算して新しいリストを生成
    difference = [a - b for a, b in zip(list1, list2)]
    return difference

# not P_iの計算
def get_P_i(Initial_P_i,Transition_rates,Daily_table):
    # 初期の行為者率ベクトル（例としてランダムに設定）
    initial_rate = np.array(Initial_P_i)

    # 正規化
    initial_rate = initial_rate / np.sum(initial_rate)

    P_i_list = []
    for i in range(len(Daily_table)):

        # 行列の加工
        n = len(initial_rate)
        one_vector = np.ones(n)
        modified_transition_rates = []

        for A in Transition_rates:
            A_i = A.copy()
            A_i[i, :] = 0  # 行 i を全て 0 に置き換える
            modified_transition_rates.append(A_i)

        # 初期行為者率ベクトルの加工
        modified_initial_rate = initial_rate.copy()
        modified_initial_rate[i] = 0

        # 最終的な行為者率ベクトルの計算
        final_rate = modified_initial_rate

        for A_i in modified_transition_rates:
            final_rate = A_i @ final_rate

        # 行為 i を一度も行わない確率の計算
        P_not_i = one_vector @ final_rate

        P_i = 1-P_not_i

        P_i_list.append(P_i)

    return P_i_list

# 遷移確率の計算
def calc_transition_rates(no_of_elem:int,beta_i:np.ndarray,time_list:list,time_table:pd.DataFrame):

    transition_rates = []
    for i in range(len(time_list)):
        time_t      = time_list[i]
        time_t1     = time_list[(i+1)%24]
        y_t     = np.array(time_table.query("Time == @time_t")["Rate"])
        y_t1    = np.array(time_table.query("Time == @time_t1")["Rate"])

        # 変数の定義
        a = cp.Variable((no_of_elem, no_of_elem), nonneg=True)  # 非負制約を持つ変数行列

        # パラメータの設定
        delta = np.eye(no_of_elem)  # クロネッカーのデルタ

        # 目的関数の定義（要素ごとの積を使用）
        objective = cp.Minimize(cp.sum(cp.multiply(1 - beta_i[:, None] * delta, cp.square(a))))

        # 制約条件の定義
        constraints = [
            cp.sum(a, axis=0) == 1,  # 制約 (6b)
            a >= 0,  # 制約 (6c)
        ]

        # 正規化
        y_t = y_t / np.sum(y_t)
        y_t1 = y_t1 / np.sum(y_t1)

        # 制約 (6d)
        constraints.append(y_t1 - a @ y_t == 0)

        # 問題の定義と解法
        problem = cp.Problem(objective, constraints)
        problem.solve()

        # 結果の格納
        transition_rates.append(a.value)

    return transition_rates

def roulette_wheel_selection(weights):
    """
    Performs roulette wheel selection on a list of weights.
    
    Args:
    weights (list of float): The weights or probabilities for each item.
    
    Returns:
    int: The index of the selected item.
    """
    # Calculate the cumulative sum of weights
    cumulative_sum = [sum(weights[:i+1]) for i in range(len(weights))]
    total_sum = cumulative_sum[-1]
    
    # Generate a random number in the range [0, total_sum)
    random_num = random.uniform(0, total_sum)
    
    # Find the index where the random number would fit in the cumulative sum
    for i, cum_sum in enumerate(cumulative_sum):
        if random_num < cum_sum:
            return i

In [138]:
"""国民生活時間調査のデータからの成型"""
# daily
daily_df = pd.read_csv('2020_4shihyo_all.csv')
daily_df = daily_df.set_axis(["Day","Group","Activity","Rate","AvgTimeAct","AvgTimeAll","StdDevAll"],axis=1)
daily_df["Rate"] = daily_df["Rate"]/100
# daily_table = daily_df.query("Day == '平日' and Group == '男１０代'").query(f"Activity in @middle").filter(items=["Activity","Rate"])
daily_table = daily_df.query("Day == '平日'").query(f"Activity in @middle").filter(items=["Activity","Rate"])
# time
time_df = pd.read_csv('2020_jikoku_all.csv')
time_df = time_df.set_axis(["Day","Group","Activity","Time","Rate"],axis=1)
time_df["Rate"] = time_df["Rate"]/100
# time_table = time_df.query("Day == '平日' and Group == '男１０代'").query(f"Activity in @middle").filter(items=["Activity","Time","Rate"])
time_table = time_df.query("Day == '平日'").query(f"Activity in @middle").filter(items=["Activity","Time","Rate"])
# get time list
time_list = list(time_table["Time"].unique())

orig_P_i = daily_table["Rate"].to_list() # オリジナルの一日の行為者率

In [139]:
"""SA法によるβのパラメータフィッティング(事前知識なし)"""
def objective(N,Beta,Time_list,Time_table,Daily_table,Orig_P_i):
    transition_rates = calc_transition_rates(N,Beta,Time_list,Time_table)
    initial_P_i = time_table.query("Time == '0:00'")["Rate"].to_list()
    calclated_P_i = get_P_i(initial_P_i,transition_rates,Daily_table)
    diff_2 = sum([e**2 for e in calc_list_diff(Orig_P_i,calclated_P_i)])
    return diff_2

def simulated_annealing(objective, bounds, n_iterations, step_size, temp, actN, time_list, time_table, daily_table,orig_P_i):
    # 初期解を生成
    initial_beta = np.random.uniform(bounds[:, 0], bounds[:, 1])
    # 初期評価値を計算
    best_eval = objective(actN,initial_beta,time_list,time_table,daily_table,orig_P_i)
    curr, curr_eval = initial_beta, best_eval
    
    for i in range(n_iterations):
        # 新しい候補解を生成
        candidate = curr + np.random.uniform(-step_size, step_size, len(bounds))
        # 境界チェック
        candidate = np.clip(candidate, bounds[:, 0], bounds[:, 1])
        # 新しい評価値を計算
        candidate_eval = objective(actN,candidate,time_list,time_table,daily_table,orig_P_i)
        # 温度の減衰
        t = temp / float(i + 1)
        # 新しい解を受け入れるかの確率を計算
        if candidate_eval < curr_eval or np.exp((curr_eval - candidate_eval) / t) > np.random.rand():
            curr, curr_eval = candidate, candidate_eval
        # ベスト解を更新
        if candidate_eval < best_eval:
            best, best_eval = candidate, candidate_eval
            print(f"Iteration {i+1}, Best Evaluation: {best_eval}")

    return best, best_eval

# パラメータの設定
act_n = len(daily_table) # 行為の分類数
print("number of acitivities is "+str(act_n))

bounds = np.array([[0.0,1.0] for i in range(act_n)])
n_iterations = 1000
step_size = 0.1
temp = 100.0

# 最適化の実行
best_solution, best_evaluation = simulated_annealing(objective, bounds, n_iterations, step_size, temp, act_n, time_list, time_table, daily_table, orig_P_i)

print(f'Best Solution: {best_solution}')
print(f'Best Evaluation: {best_evaluation}')

number of acitivities is 24
Iteration 12, Best Evaluation: 0.7288613690623583
Iteration 25, Best Evaluation: 0.6794394021624456
Iteration 31, Best Evaluation: 0.6653627014580581
Iteration 34, Best Evaluation: 0.6324872194233818
Iteration 35, Best Evaluation: 0.5889857513841759
Iteration 36, Best Evaluation: 0.5352596188998077


KeyboardInterrupt: 

In [140]:
"""SA法によるβのパラメータフィッティング(事前知識加味)"""
def objective(N,Beta,Time_list,Time_table,Daily_table,Orig_P_i):
    transition_rates = calc_transition_rates(N,Beta,Time_list,Time_table)
    initial_P_i = time_table.query("Time == '0:00'")["Rate"].to_list()
    calclated_P_i = get_P_i(initial_P_i,transition_rates,Daily_table)
    diff_1 = [e for e in calc_list_diff(Orig_P_i,calclated_P_i)]
    diff_2 = sum([e**2 for e in diff_1])
    return diff_2,diff_1

def simulated_annealing(objective, bounds, n_iterations, step_size, temp, actN, time_list, time_table, daily_table,orig_P_i):
    # 初期解を生成
    initial_beta = np.random.uniform(bounds[:, 0], bounds[:, 1])
    # 初期評価値を計算
    best_eval, best_eval_raw= objective(actN,initial_beta,time_list,time_table,daily_table,orig_P_i)
    curr, curr_eval = initial_beta, best_eval
    
    for i in range(n_iterations):
        # 新しい候補解を生成
        candidate = curr + np.random.uniform(-step_size, step_size, len(bounds))
        """候補を事前知識から補正(βは大きいほど同行為の継続確率が大きくなる)"""
        candidate_updated = []
        for candidate_,eval_ in zip(list(candidate),best_eval_raw):
            new_candidate = 0
            if (eval_ > 0): # オリジナルの行為者率よりも高い場合→遷移しすぎ→βを小さく(遷移のしやすさを抑制)
                new_candidate = abs(candidate_)*(-1)
            elif (eval_ < 0): # オリジナルの行為者率よりも低い場合→遷移しなさすぎ→βを大きく(遷移のしやすさを促進)
                new_candidate = abs(candidate_)   
            else:
                new_candidate = candidate_
            candidate_updated.append(new_candidate)
        candidate = candidate_updated

        # 境界チェック
        candidate = np.clip(candidate, bounds[:, 0], bounds[:, 1])
        # 新しい評価値を計算
        candidate_eval, candidate_eval_raw = objective(actN,candidate,time_list,time_table,daily_table,orig_P_i)
        # 温度の減衰
        t = temp / float(i + 1)
        # 新しい解を受け入れるかの確率を計算
        if candidate_eval < curr_eval or np.exp((curr_eval - candidate_eval) / t) > np.random.rand():
            curr, curr_eval = candidate, candidate_eval
        # ベスト解を更新
        if candidate_eval < best_eval:
            best, best_eval, best_eval_raw = candidate, candidate_eval, candidate_eval_raw
        print(f"Iteration {i+1}, Best Evaluation: {best_eval}")

    return best, best_eval

# パラメータの設定
act_n = len(daily_table) # 行為の分類数
print("number of acitivities is "+str(act_n))

bounds = np.array([[0.0,1.0] for i in range(act_n)])
n_iterations = 500
step_size = 0.1
temp = 100.0

# 最適化の実行
best_solution, best_evaluation = simulated_annealing(objective, bounds, n_iterations, step_size, temp, act_n, time_list, time_table, daily_table, orig_P_i)

print(f'Best Solution: {best_solution}')
print(f'Best Evaluation: {best_evaluation}')

number of acitivities is 24
Iteration 1, Best Evaluation: 0.6268198663524098
Iteration 2, Best Evaluation: 0.6268198663524098
Iteration 3, Best Evaluation: 0.6268198663524098
Iteration 4, Best Evaluation: 0.6268198663524098
Iteration 5, Best Evaluation: 0.6268198663524098
Iteration 6, Best Evaluation: 0.6268198663524098
Iteration 7, Best Evaluation: 0.6268198663524098
Iteration 8, Best Evaluation: 0.6268198663524098
Iteration 9, Best Evaluation: 0.6268198663524098
Iteration 10, Best Evaluation: 0.6268198663524098
Iteration 11, Best Evaluation: 0.6268198663524098
Iteration 12, Best Evaluation: 0.6082690490774157
Iteration 13, Best Evaluation: 0.6082690490774157
Iteration 14, Best Evaluation: 0.583776562166655
Iteration 15, Best Evaluation: 0.5516767990524535
Iteration 16, Best Evaluation: 0.5331316686158137
Iteration 17, Best Evaluation: 0.5331316686158137
Iteration 18, Best Evaluation: 0.5331316686158137
Iteration 19, Best Evaluation: 0.5331316686158137
Iteration 20, Best Evaluation: 0

In [143]:
best_solution

array([0.68923324, 0.        , 0.        , 0.        , 0.56552108,
       0.12056058, 0.63979637, 0.67160974, 0.18092713, 0.28148257,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.40349906, 0.        , 1.        ,
       0.        , 0.15892245, 0.52168882, 0.        ])

In [144]:
"""算出した遷移確率から一個票を生成"""
# 推定したβをもとに遷移確率を算出
Transition_rates = calc_transition_rates(act_n,test,time_list,time_table)
# 初期行動の規定
time_table_0 = time_table.query("Time == '0:00'") # 0:00における行為者率を抽出
action_idx = roulette_wheel_selection(time_table_0["Rate"].to_list()) # その時刻における行為者率を重みとしてルーレット選択
Actions =[] # 選択した行為種別の記録
Actions.append(action_idx)
for t in range(4*24): # 15min間隔で24時間分繰り返す。
    transition_r_t = Transition_rates[t] # 時刻tの遷移確率行列を抽出
    action_idx = roulette_wheel_selection(list(transition_r_t[action_idx,:])) # 遷移確率をもとにt+1における行動種別を決定
    Actions.append(action_idx) # 行動種別履歴に追加
acts = daily_table["Activity"].to_list()
actMaster = dict(zip(list(range(len(acts))),acts))
[actMaster[a] for a in Actions]

['睡眠',
 '子どもの世話',
 '行楽・散策',
 '通学',
 '社会参加',
 'スポーツ',
 '療養・静養',
 '新聞',
 '食事',
 '通学',
 '学校外の学習',
 '行楽・散策',
 '行楽・散策',
 '社会参加',
 '行楽・散策',
 '社会参加',
 'スポーツ',
 '通学',
 '社会参加',
 '通学',
 '社会参加',
 '社会参加',
 '通学',
 '通学',
 '会話・交際',
 'インターネット動画',
 '家庭雑事',
 '行楽・散策',
 '雑誌・マンガ・本',
 '社会参加',
 'インターネット動画',
 '療養・静養',
 '療養・静養',
 '行楽・散策',
 '会話・交際',
 '休息',
 'インターネット動画',
 '社会参加',
 '会話・交際',
 '社会参加',
 '雑誌・マンガ・本',
 '雑誌・マンガ・本',
 '社会参加',
 '雑誌・マンガ・本',
 '学校外の学習',
 '通学',
 'インターネット動画',
 '通学',
 '学校外の学習',
 '新聞',
 '音楽',
 '社会参加',
 '通勤',
 '雑誌・マンガ・本',
 '雑誌・マンガ・本',
 '雑誌・マンガ・本',
 '雑誌・マンガ・本',
 '学校外の学習',
 '音楽',
 '社会参加',
 '音楽',
 '睡眠',
 '仕事関連',
 '社会参加',
 '社会参加',
 '会話・交際',
 '食事',
 '療養・静養',
 'スポーツ',
 '通学',
 'ラジオ',
 'ラジオ',
 'ラジオ',
 '雑誌・マンガ・本',
 '会話・交際',
 'スポーツ',
 '雑誌・マンガ・本',
 '雑誌・マンガ・本',
 '療養・静養',
 '通学',
 '社会参加',
 '睡眠',
 'テレビ',
 '通学',
 '家庭雑事',
 '通学',
 'ラジオ',
 'スポーツ',
 '療養・静養',
 '療養・静養',
 'スポーツ',
 '療養・静養',
 'スポーツ',
 '社会参加',
 'スポーツ',
 '家庭雑事',
 '通学']