## 時間情報のない集団データ

### 方針

・内部に問題ごとの依存関係を定義し、その関係をもとに遷移させる。

・条件付き確率（遷移確率）と周辺分布の積の和から次の周辺分布を求める。

・まずは遷移過程が部分的にしか分からない人工データを用意し、モデルを学習する。

## 人工データ生成

In [1]:
import numpy as np

# 問題の依存関係の行列 A 
A = np.array([
    [0, 0, 0, 0, 0, 0],  # 初期状態
    [1, 0, 0, 0, 0, 0],  # 問題1は初期状態のみに依存
    [1, 0, 0, 0, 0, 0],  # 問題2は問題1に依存
    [0, 1, 1, 0, 0, 0],  # 問題3は問題2に依存
    [0, 0, 0, 1, 0, 0],  # 問題4は問題2、問題3に依存
    [0, 0, 0, 0, 1, 0]   # 問題4は問題2、問題3に依存
], dtype = float)*3

# 0でない要素の数で割る処理
nonzero_counts = np.count_nonzero(A[1:], axis=1, keepdims=True)
A[1:] = np.where(nonzero_counts != 0, A[1:] / nonzero_counts, 0)

# 遷移確率を計算する関数
def calculate_transition_probabilities(A, X):
    n = len(X)
    raw_probabilities = np.zeros(n)  # 遷移確率の元となる値
    
    # 不正解の問題に対して遷移確率を計算
    for i in range(n):
        if X[i] == 0:  # まだ正解していない問題のみ対象
            required_problems = A[i, :]  # i番目の問題に必要な依存関係
            
            solved_problems = X * required_problems  # 現状解けている
            
            num_solved = np.sum(solved_problems)      # 実際に解けた問題の数
            
            raw_probabilities[i] = np.exp(num_solved)
    
    # 総和で割って正規化
    total_sum = np.sum(raw_probabilities)  # expの総和
    if total_sum > 0:  # 総和が0でなければ正規化
        probabilities = raw_probabilities / total_sum
    else:
        probabilities = raw_probabilities  # 総和が0ならそのまま
    
    return probabilities

In [2]:
# 教師データセットを生成する関数
def generate_training_data(A, initial_X, num_correct_problems, num_data_per_step):
    n = len(initial_X)  # 問題数
    dataset = []
    
    # 各ステップでデータを生成
    for i in range(num_correct_problems, num_correct_problems+1):  # 1問以上の正解を対象にする
        for j in range(num_data_per_step):  # 各ステップごとにデータ数
            X = initial_X.copy()  # 初期状態からスタート
            
            # i問正解させる
            for k in range(i):
                input_X = X.copy()    # 遷移前状態を保持
                probabilities = calculate_transition_probabilities(A, X)
                
                if np.sum(probabilities) > 0:  # 正規化された確率がある場合
                    # 確率に基づいて次に正解させる問題を選択
                    next_correct_problem = np.random.choice(n, p=probabilities)
                    X[next_correct_problem] = 1  # 選ばれた問題を正解に遷移させる
            
                # 初期状態と1ステップ後の状態の差分を教師データとして使用
                target_Y = (X - input_X).clip(min=0)  # 0から1に変わった部分のみを1、他は0
                
                # 初期状態（入力）と差分（教師データ）のペアを保存
                dataset.append((input_X.copy(), target_Y.copy()))  # (入力データ, 教師データ)
    
    return dataset


In [3]:
import torch

import itertools
from collections import defaultdict

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np


In [4]:
def _bcsoftmax1d(x, budget):
    """Budget Constrained Softmax function for vector.

    Args:
        x (Tensor): input vector. shape: (n_outputs, )
        budget (Tensor): budget (constraint) vector. shape: (n_outputs, )

    Returns:
        y (Tensor): output probability vector. shape: (n_outputs, ). Satisfying the constraints y_i <= budget_i.
    
    """
    x = x - torch.max(x, dim=0)[0] # normalization to avoid numerical errors
    exp_x = torch.exp(x)
    # sorting
    _, indices = torch.sort(budget / exp_x, descending=False)
    exp_x = exp_x[indices]
    budget = budget[indices]
    # find K_B
    r = torch.flip(torch.cumsum(torch.flip(exp_x, dims=(0, )), dim=0), dims=(0, ))
    s = 1.0 - (torch.cumsum(budget, dim=0) - budget)
    z = r / s
    is_in_KB = torch.logical_and(
        (s - budget) > 0, exp_x / z > budget
    )
    # compute outputs
    s = 1 - torch.sum(budget * is_in_KB)
    r = torch.sum(exp_x * (~is_in_KB))
    y = torch.where(~is_in_KB, s * exp_x / r, budget)
    # undo sorting
    _, inv_indices = torch.sort(indices, descending=False)
    return y[inv_indices]


def _bcsoftmax1d_stable(x, budget):
    """Budget Constrained Softmax function for vector.
    This function is more numerically stable than `_bcsoftmax1d` by computing some values in log-scale.
    
    Args:
        x (Tensor): input vector. shape: (n_outputs, )
        budget (Tensor): budget (constraint) vector. shape: (n_outputs, )

    Returns:
        y (Tensor): output probability vector. shape: (n_outputs, ). Satisfying the constraints y_i <= budget_i.
    
    """
    # sorting
    _, indices = torch.sort(torch.log(budget) - x, descending=False)
    x = x[indices]
    budget = budget[indices]
    # find K_B
    log_r = torch.flip(torch.logcumsumexp(torch.flip(x, dims=(0, )), dim=0), dims=(0, ))
    s = 1.0 - (torch.cumsum(budget, dim=0) - budget)
    is_in_KB = torch.logical_or(
        budget == 0,
        torch.logical_and(
            s - budget > 0,
            x - log_r + torch.log(s) > torch.log(budget)
        )
    )
    # compute outputs
    exp_x = torch.exp(x - torch.max(torch.where(~is_in_KB, x, -torch.inf), dim=0)[0])
    s = 1 - torch.sum(budget * is_in_KB)
    r = torch.sum(exp_x * (~is_in_KB))
    y = torch.where(~is_in_KB, s * exp_x / r, budget)
    # undo sorting
    _, inv_indices = torch.sort(indices, descending=False)
    return y[inv_indices]


class BCSoftmax1d(torch.autograd.Function):
    """Autograd implementation of Budget Constrained Softmax function for vector.
    """
    generate_vmap_rule = True
    
    @staticmethod
    def forward(x, c):
        y = _bcsoftmax1d_stable(x, c)
        return y

    @staticmethod
    def setup_context(ctx, inputs, output):
        x, c = inputs
        is_in_KB = c == output
        ctx.save_for_backward(x, c, is_in_KB)
    
    @staticmethod
    def backward(ctx, grad_y):
        x, c, is_in_KB = ctx.saved_tensors
        exp_x = torch.exp(
            x - torch.max(torch.where(~is_in_KB, x, -torch.inf), dim=0)[0]
        )
        s = 1 - torch.sum(c * is_in_KB)
        r = torch.sum(exp_x * (~is_in_KB))
        
        # compute Jacobian
        Jx = torch.where(
            torch.outer(~is_in_KB, ~is_in_KB),
            torch.diag(~is_in_KB * exp_x) * r - torch.outer(exp_x, exp_x),
            0,
        )
        Jx *= torch.where(
            s > 0,
            s / (r * r),
            0
        )
        Jc = torch.where(
            torch.outer(~is_in_KB, is_in_KB),
            - exp_x[:, None] / r,
            1.0 * torch.diag(is_in_KB)
        )

        # print("s", s, "r", r)
        # print("勾配", torch.matmul(grad_y, Jx), torch.matmul(grad_y, Jc))
        assert not torch.isnan(torch.matmul(grad_y, Jx)).any(), "Jx contains NaN"
        assert not torch.isinf(torch.matmul(grad_y, Jx)).any(), "Jx contains Inf"
        assert not torch.isnan(torch.matmul(grad_y, Jc)).any(), "Jc contains NaN"
        assert not torch.isinf(torch.matmul(grad_y, Jc)).any(), "Jc contains Inf"

        # return vector-Jacobian product
        return torch.matmul(grad_y, Jx), torch.matmul(grad_y, Jc)

In [5]:
######### Use these functions! #########
bcsoftmax1d = BCSoftmax1d.apply

# データによってmodelを通す回数が違
# bcsoftmax2d = torch.vmap(BCSoftmax1d.apply) # input shape = (batch_size, n_classes)

In [6]:
class Model(nn.Module):
    def __init__(self, num_questions):
        super(Model, self).__init__()
        self.fc = nn.Linear(num_questions, num_questions, bias=False)  # 全結合層

    def forward(self, x, c):
        x = self.fc(x)  # 全結合層の適用
        x = bcsoftmax1d(x, c)
        return x

In [7]:
"評価指標"

def kl_divergence(p, q, epsilon=1e-10):
    """
    KLダイバージェンスを計算する関数（qが0の場合の無限大問題を回避）
    
    Parameters:
        p (numpy.ndarray): 真の確率分布
        q (numpy.ndarray): 予測確率分布
        epsilon (float): スムージングパラメータ（非常に小さい値）
    
    Returns:
        float: KLダイバージェンス
    """
    p = np.array(p, dtype=np.float64)
    q = np.array(q, dtype=np.float64)
    
    # pとqにスムージングを適用
    p = np.where(p == 0, epsilon, p)
    q = np.where(q == 0, epsilon, q)
    
    # KLダイバージェンスの計算
    return np.sum(p * np.log(p / q))

def hellinger_distance(p, q):
    """
    Hellinger距離を計算する関数

    Parameters:
        p (numpy.ndarray): 真の分布 [batch_size, num_classes]
        q (numpy.ndarray): 予測分布 [batch_size, num_classes]
    
    Returns:
        float: Hellinger距離の平均値
    """
    p = np.array(p, dtype=np.float64)
    q = np.array(q, dtype=np.float64)
    # Hellinger距離の計算式: H(p, q) = (1/√2) * ||√p - √q||_2
    sqrt_p = np.sqrt(p)
    sqrt_q = np.sqrt(q)
    distance = np.sqrt(np.sum((sqrt_p - sqrt_q) ** 2)) / np.sqrt(2)
    return np.mean(distance)

def jensen_shannon_divergence(p, q):
    """
    Jensen-Shannon divergence を計算する関数

    Parameters:
        p (numpy.ndarray): 真の分布 [batch_size, num_classes]
        q (numpy.ndarray): 予測分布 [batch_size, num_classes]
    
    Returns:
        float: Jensen-Shannon divergence の平均値
    """
    p = np.array(p, dtype=np.float64)
    q = np.array(q, dtype=np.float64)

    # 0 の値があると log(0) が発生するため、微小値を加える
    epsilon = 1e-12
    p = np.clip(p, epsilon, 1)
    q = np.clip(q, epsilon, 1)
    
    # 分布の平均 M
    m = 0.5 * (p + q)
    
    # Kullback-Leibler divergence の補助関数
    def kl(a, b):
        return np.sum(a * np.log(a / b))
    
    # JSD の計算: JSD(p || q) = 0.5 * (KL(p || M) + KL(q || M))
    jsd = 0.5 * (kl(p, m) + kl(q, m))
    
    # 各サンプルの平均を返す
    return np.mean(jsd)


def evaluate_model(r, p, q):
    """
    遷移確率の真値と予測値の類似性を評価する関数
    :param r: 各ノードの重み (np.array)
    :param p: 各ノードの真の遷移確率 (2D np.array: ノード数 x 遷移確率)
    :param q: 各ノードの予測遷移確率 (2D np.array: ノード数 x 遷移確率)
    :return: 評価指標 L
    """
    L = 0
    for k in range(len(r)):
        L += r[k] * kl_divergence(p[k], q[k])
    return L

In [8]:
"実験結果を保存する外部ファイル"
import csv

# 評価結果を保存するCSVファイル名
results_csv = "results_allpath_same_n_3000.csv"

# 初回にヘッダーを追加する
header = [
    "Iteration", 
    "KL_Model_allpath_same_n",
    "HD_Model_allpath_same_n",
    "JSD_Model_allpath_same_n",
]

try:
    with open(results_csv, "x", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(header)
except FileExistsError:
    pass  # ファイルが既に存在する場合は何もしない

In [9]:

# データセットの生成
num_questions = 5  # 問題数
num_data_per_step = 20     # 各ステップごとに生成するデータ数

num_epochs = 3000  # エポック数
alpha = 0.001 # 正則化パラメータ

for trial in range(20):
    print(f"Trial {trial + 1}")

    "データ生成"

    # 生徒の回答状況 X (1が正解、0が不正解)
    # 初期状態は全て不正解
    X_init = np.array([1, 0, 0, 0, 0, 0])

    training_data = generate_training_data(A, X_init, num_questions, num_data_per_step)
    train_X = [input_data for input_data, _ in training_data]
    train_Y = [target_data for _, target_data in training_data]

    # PyTorch テンソルに変換
    train_X = torch.tensor(train_X, dtype=torch.float32)
    train_Y = torch.tensor(train_Y, dtype=torch.float32)


    "状態定義"

    # Generate states where the first digit is always 1
    states = [(1,) + state for state in itertools.product([0, 1], repeat=num_questions)]

    states = sorted(states, key=lambda state: sum(state))

    # Initialize the state counts
    state_counts = defaultdict(int)

    # Assuming 'dataset' is your list of student results
    for result, result2 in training_data:
        # print(result2)
        state_tuple = tuple(map(int, result + result2))  # Convert np.int64 to int
        state_counts[state_tuple] += 1  # Count only if the first digit is 1

    # Display the counts for each state
    for state in states:
        count = state_counts[state]
        formatted_state = list(state)  # Convert tuple to list for the desired format

    
    "モデル定義"
    # モデル、損失関数、最適化関数の設定
    model = Model(num_questions+1)  # 5問+初期状態の問題を扱うモデル
    criterion = nn.CrossEntropyLoss()  # クロスエントロピー損失
    optimizer = optim.Adam(model.parameters(), lr=0.01)


    "学習"
    for epoch in range(num_epochs):
        model.train()  # モデルを訓練モードに
        optimizer.zero_grad()  # 勾配の初期化
        
        outputs = []  # 出力を保持するリスト
        # 各データに対して train_Y の値に基づいてループを実行
        for i, target in enumerate(train_Y):
            c = torch.ones(num_questions+1, dtype=torch.float32)
            output = train_X[i]  # 各 i 番目の入力データを使用
            output = output.view(-1)

            # もしcの和が1なら、rが0となるので対応
            if c.sum() <= 1:
                # print("aaaaaaaa")
                output = c
            else:
                output = model(output, c)  # 前回の出力を次のステップの入力として使用

            outputs.append(output)  # 最終的な出力を保存

        # outputs を適切な形に変換して損失計算（例えば torch.stack を使用）
        outputs = torch.stack(outputs)
        outputs = outputs.squeeze(1)

        # モデルの出力の確認
        assert not torch.isnan(outputs).any(), "Model output contains NaN"
        assert not torch.isinf(outputs).any(), "Model output contains Inf"

        # L1正則化項の計算
        l1_reg = torch.tensor(0.0, requires_grad=True)
        for param in model.parameters():
            l1_reg = l1_reg + torch.sum(torch.abs(param))

        # 損失の計算
        loss0 = criterion(outputs, train_Y)

        loss = loss0 + alpha * l1_reg  # L1正則化項を追加した損失
        
        # 損失の確認
        assert not torch.isnan(loss).any(), "Loss contains NaN"
        assert not torch.isinf(loss).any(), "Loss contains Inf"
        
        # バックプロパゲーションとパラメータの更新
        loss.backward()
        optimizer.step()
        
        # 100エポックごとに損失を表示
        if (epoch+1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    # 学習結果の確認
    print("Training complete!")

    "モデル評価"
    node_probabilities = defaultdict(float)
    start = (1,) + tuple(0 for _ in range(num_questions))
    node_probabilities[start] = 1

    # 機械学習モデルと簡易モデル
    KL = 0
    HD = 0
    JSD = 0

    for state in states:
        probabilities = calculate_transition_probabilities(A, np.array(state))

        # 最下層から頂点までのノードの分布を順に計算
        for i in range(num_questions + 1):
            if state[i] == 0:  # まだ解けていない問題
                # 遷移後の状態
                next_state = list(state)
                next_state[i] = 1
                next_state = tuple(next_state)
                node_probabilities[next_state] += probabilities[i] * node_probabilities[state]
        
        # budgetの計算
        c_g = torch.ones(num_questions+1, dtype=torch.float32)
        c_g = c_g - torch.tensor(state, dtype=torch.float32)

        # 状態とその予測分布
        state_tensor = torch.tensor(state, dtype=torch.float32)
        predicted_values = model(state_tensor, c_g)  # 予測値を計算

    # 評価指標の計算
        kl = node_probabilities[state] * kl_divergence(probabilities, predicted_values.detach())
        hd = node_probabilities[state] * hellinger_distance(probabilities, predicted_values.detach())
        jsd = node_probabilities[state] * jensen_shannon_divergence(probabilities, predicted_values.detach())
        # print(f"状態: {state}, ノード分布: {node_probabilities[state]:.3g}, KL: {kl:.3g}, HD: {hd:.3g}, JSD: {jsd:.3g}")
        # print(f"真の遷移確率: {probabilities}, 予測遷移確率: {predicted_values}")
        KL += kl
        HD += hd
        JSD += jsd


    # 試行ごとの指標をCSVに追記
    with open(results_csv, "a", newline="") as file:
        writer = csv.writer(file)
        writer.writerow([
            trial + 1,
            KL,
            HD,
            JSD
        ])

    print(f"Trial {trial + 1} completed. Results saved.")


Trial 1


  train_X = torch.tensor(train_X, dtype=torch.float32)


Epoch [100/3000], Loss: 1.5673
Epoch [200/3000], Loss: 1.4157
Epoch [300/3000], Loss: 1.3586
Epoch [400/3000], Loss: 1.3346
Epoch [500/3000], Loss: 1.3222
Epoch [600/3000], Loss: 1.3150
Epoch [700/3000], Loss: 1.3103
Epoch [800/3000], Loss: 1.3071
Epoch [900/3000], Loss: 1.3048
Epoch [1000/3000], Loss: 1.3030
Epoch [1100/3000], Loss: 1.3016
Epoch [1200/3000], Loss: 1.3004
Epoch [1300/3000], Loss: 1.2992
Epoch [1400/3000], Loss: 1.2982
Epoch [1500/3000], Loss: 1.2973
Epoch [1600/3000], Loss: 1.2966
Epoch [1700/3000], Loss: 1.2960
Epoch [1800/3000], Loss: 1.2955
Epoch [1900/3000], Loss: 1.2951
Epoch [2000/3000], Loss: 1.2948
Epoch [2100/3000], Loss: 1.2945
Epoch [2200/3000], Loss: 1.2942
Epoch [2300/3000], Loss: 1.2940
Epoch [2400/3000], Loss: 1.2939
Epoch [2500/3000], Loss: 1.2937
Epoch [2600/3000], Loss: 1.2936
Epoch [2700/3000], Loss: 1.2935
Epoch [2800/3000], Loss: 1.2934
Epoch [2900/3000], Loss: 1.2934
Epoch [3000/3000], Loss: 1.2933
Training complete!
Trial 1 completed. Results sav