In [None]:
import os
from pathlib import Path
import urllib.request
import tarfile
import pickle
import numpy as np
import matplotlib.pyplot as plt

def ensure_cifar(data_dir="all/cifar-10-batches-py"):
    data_dir = Path(data_dir)
    # 既に指定パスに存在すれば返す
    if (data_dir / "data_batch_1").exists():
        print("CIFAR-10 データは既に存在します:", data_dir)
        return str(data_dir)

    # ワークスペース内を探索して見つかれば返す
    ws_root = Path(r"c:\Users\tokot\code\image-processing")
    for p in ws_root.rglob("data_batch_1"):
        print("ワークスペース内で発見:", p.parent)
        return str(p.parent)

    # 見つからなければダウンロードして展開
    print("CIFAR-10 が見つかりません。ダウンロードを開始します...")
    dest = ws_root / "cifar-10-python.tar.gz"
    url = "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
    try:
        urllib.request.urlretrieve(url, dest)
        print("ダウンロード完了:", dest)
        with tarfile.open(dest, "r:gz") as tf:
            tf.extractall(path=ws_root)
        extracted = ws_root / "cifar-10-batches-py"
        if (extracted / "data_batch_1").exists():
            print("展開完了:", extracted)
            return str(extracted)
        raise FileNotFoundError("展開後に期待ファイルが見つかりません")
    except Exception as e:
        print("取得失敗:", e)
        raise

def load_cifar10(data_dir):
    """ (N,3072) の train/test とラベルを返す（float32 0-1 正規化） """
    train_data = None
    train_labels = []
    data_dir = str(data_dir)
    for i in range(1, 6):
        path = os.path.join(data_dir, f"data_batch_{i}")
        if not os.path.exists(path):
            raise FileNotFoundError(path)
        with open(path, "rb") as f:
            data_dict = pickle.load(f, encoding="bytes")
            if train_data is None:
                train_data = data_dict[b"data"]
            else:
                train_data = np.vstack((train_data, data_dict[b"data"]))
            train_labels.extend(data_dict[b"labels"])
    test_path = os.path.join(data_dir, "test_batch")
    with open(test_path, "rb") as f:
        data_dict = pickle.load(f, encoding="bytes")
        test_data = data_dict[b"data"]
        test_labels = data_dict[b"labels"]

    train_data = np.array(train_data, dtype=np.float32) / 255.0
    test_data = np.array(test_data, dtype=np.float32) / 255.0
    train_labels = np.array(train_labels, dtype=np.int64)
    test_labels = np.array(test_labels, dtype=np.int64)
    return train_data, train_labels, test_data, test_labels


def padding_data(train_images, test_images, pad=1):
    """CIFAR-10 の1次元ベクトル配列を画像形状に戻しパディングを付与して返す。
    引数:
      train_images: (N, 3072)
      test_images:  (M, 3072)
      pad: パディング幅（ピクセル）
    戻り値:
      padded_train_images: (N, 32+2*pad, 32+2*pad, 3)
      padded_test_images:  (M, 32+2*pad, 32+2*pad, 3)
    """
    # (N, 3072) -> (N, 3, 32, 32) -> (N, 32, 32, 3)
    train_imgs = train_images.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
    test_imgs = test_images.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)

    padded_train = np.pad(
        train_imgs,
        ((0, 0), (pad, pad), (pad, pad), (0, 0)),
        mode="constant",
        constant_values=0.0,
    )
    padded_test = np.pad(
        test_imgs,
        ((0, 0), (pad, pad), (pad, pad), (0, 0)),
        mode="constant",
        constant_values=0.0,
    )

    return padded_train, padded_test

data_dir = ensure_cifar("all/cifar-10-batches-py")
train_images, train_labels, test_images, test_labels = load_cifar10(data_dir)
padded_train_images, padded_test_images = padding_data(train_images, test_images, pad=1)

# レイヤーの次元数を修正（CIFAR-10用）
input_size = 3072  # 32x32x3（カラー画像）
hidden_layer_size = 100  # 中間層は同じ
output_layer_size = 10   # CIFAR-10も10クラス

def im2col(padding_data, filter_size, stride=1, pad=0):
    """
    Im2col変換を行う関数 (NumPyを用いた簡略版ロジック)
    
    input_data: (N, H, W, C) の4次元配列
    """
    N, H, W, C = padding_data.shape
    
    # 出力特徴マップのサイズ計算
    out_h = (H - filter_size) // stride + 1
    out_w = (W - filter_size) // stride + 1
    
    # パッチ抽出のためのループとインデックス計算
    col = np.zeros((N, out_h, out_w, filter_size, filter_size, C))

    # パッチのインデックスを計算して、colに格納していく (非常に複雑な処理)
    # i, j は出力特徴マップ上の座標
    for i in range(out_h):
        i_max = i * stride + filter_size
        for j in range(out_w):
            j_max = j * stride + filter_size
            
            # パッチを抽出 (例: input_data[:, i*stride:i_max, j*stride:j_max, :] )
            col[:, i, j, :, :, :] = padding_data[:, i * stride:i_max, j * stride:j_max, :]
            
    # 形状を (N * out_h * out_w, filter_h * filter_w * C) に変換（行列乗算の形式）
    # ただし、添付図の形式 (filter_h * filter_w * C, N * out_h * out_w) にするためには転置が必要
    col = col.transpose(0, 4, 5, 3, 1, 2).reshape(N * out_h * out_w, -1).T
    
    return col

def set_filter_weights():
    """
    フィルタの重み行列 W を He初期化で初期化する関数
    戻り値:
      W: (K, R*R*ch) の形状の重み行列
    """
    # フィルタのパラメータ設定
    K = 32  # フィルタ枚数
    R = 3   # フィルタサイズ
    ch = 3  # 入力チャネル数

    # 1. He初期化に必要な「入力ノード数」の計算
    # 入力ノード数 = 1つの出力特徴マップの要素を計算するのに使われる入力データ数
    #              = フィルタサイズ(R*R) * 入力チャネル数(ch)
    input_node_count = R * R * ch  # 3 * 3 * 3 = 27

    # 2. 標準偏差の計算 (He初期化)
    std_dev = np.sqrt(2.0 / input_node_count)

    # 3. 重み行列 W の初期化
    # 形状は (K, R*R*ch) = (32, 27)
    W = std_dev * np.random.randn(K, R * R * ch)
    
    return W, R

def set_biases():
    """
    バイアスベクトル b を初期化する関数
    戻り値:
      b: (K,) の形状のバイアスベクトル
    """
    K = 32  # フィルタ枚数
    b = np.zeros(K)  # バイアスはゼロで初期化

    # 1. バイアスベクトルを (K, 1) に整形
    b_vector = b.reshape(-1, 1)
    
    return b_vector

def get_shuffled_index(arr):
    index_arr = np.arange(len(arr)) # 0から始まるインデックスの配列を作成
    np.random.shuffle(index_arr) # インデックスをシャッフル
    return index_arr

def get_batch(random_index):
    """ミニバッチ画像を平坦化して返す（全結合層向け）
    パディングを除いた中心部分（32x32）のみを使用
    """
    batch_images = padded_train_images[random_index]            # (B, H, W, C)
    # パディング除去（中心の32x32を切り出し）
    batch_images = batch_images[:, 1:-1, 1:-1, :]             # パディング1なので両端1ピクセルずつ除去
    batch_images_flat = batch_images.reshape(batch_images.shape[0], -1)  # (B, 3072)
    batch_labels = train_labels[random_index]
    return batch_images_flat, batch_labels

def get_one_hot_label(batch_labels, output_layer_size):
    one_hot_labels = np.zeros((batch_labels.size, output_layer_size)) # ゼロで満たされた配列を作成
    one_hot_labels[np.arange(batch_labels.size), batch_labels] = 1 # 各行の、正解ラベルに対応するインデックスを1にする
    return one_hot_labels


np.random.seed(777) # シードを固定

# 重みとバイアスを正規分布で初期化

is_load = str(input('ロードしますか？ yes or no:  '))
if is_load == 'yes' :
    loaded_data = np.load('CIFAR-10_parameter.npz')
    weight1 = loaded_data['weight1']
    bias1 = loaded_data['bias1']
    weight2 = loaded_data['weight2']
    bias2 = loaded_data['bias2']
else:
    # 全結合層の初期化（CIFAR 用に input_size=3072 を使用）
    weight1 = np.random.normal(loc=0.0, scale=np.sqrt(1 / input_size), size=(hidden_layer_size, input_size))  # (100,3072)
    bias1 = np.zeros((hidden_layer_size,))  # (100,)
    # 第2層（中間層 -> 出力層）
    weight2 = np.random.normal(loc=0.0, scale=np.sqrt(1 / hidden_layer_size), size=(output_layer_size, hidden_layer_size)) # (10,100)
    bias2 = np.random.normal(loc=0.0, scale=np.sqrt(1 / hidden_layer_size), size=output_layer_size) # (10,)
def ReLU(arr):
    """課題4-1 ReLU活性化関数"""
    new_arr = np.where(arr > 0, arr, 0)
    return new_arr

def softmax(x):
    """
    ソフトマックス関数（オーバーフロー対策版）
    各要素を0から1の間の確率に変換
    """
    alpha = np.max(x, axis=-1, keepdims=True)
    exp_x = np.exp(x - alpha)
    return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

# --- 順伝播の実行(重みを更新) ---
def forward_propagation(input_vector, weight1, bias1, weight2, bias2):
    
    # 中間層の計算: 活性化関数の入力　 (バッチサイズ, 784) @ (784, 100) -> (バッチサイズ, 100)
    hidden_layer_input = np.dot(input_vector, weight1.T) + bias1
    
    # 中間層の出力: 活性化関数を適用
    # hidden_layer_output = sigmoid(hidden_layer_input)
    hidden_layer_output = ReLU(hidden_layer_input)
    
    # 出力層の計算: 活性化関数の入力　(バッチサイズ, 100) @ (100, 10) -> (バッチサイズ, 10)
    output_layer_input = np.dot(hidden_layer_output, weight2.T) + bias2
    
    # 出力層の出力: ソフトマックスを適用して確率を算出
    final_output = softmax(output_layer_input)
    
    return final_output, hidden_layer_input, hidden_layer_output # 4-1 hidden_layer_inputを出力に追加

def forward_propagation_train(input_vector, weight1, bias1, weight2, bias2, ignore_number):
    
    hidden_layer_input = np.dot(input_vector, weight1.T) + bias1
    hidden_layer_output = ReLU(hidden_layer_input)
    for index in ignore_number:
        hidden_layer_output[:, index] = 0
    output_layer_input = np.dot(hidden_layer_output, weight2.T) + bias2
    final_output = softmax(output_layer_input)
    return final_output, hidden_layer_input, hidden_layer_output  

def forward_propagation_test(input_vector, weight1, bias1, weight2, bias2, ignore_number):
    
    hidden_layer_input = np.dot(input_vector, weight1.T) + bias1
    hidden_layer_output = ReLU(hidden_layer_input)
    hidden_layer_output *= 1 - (len(ignore_number) / hidden_layer_size)
    output_layer_input = np.dot(hidden_layer_output, weight2.T) + bias2
    final_output = softmax(output_layer_input)
    return final_output, hidden_layer_input, hidden_layer_output     

def get_predicted_class(output_probabilities):
 # 出力された確率から最も高い確率を持つクラス（予測結果）を取得
    if output_probabilities.ndim == 1:
        return np.argmax(output_probabilities)
    else:
        return np.argmax(output_probabilities, axis=1)

def get_cross_entropy_error(y_pred, y_true):
    
    delta = 1e-7
    
    loss = -np.sum(y_true * np.log(y_pred + delta)) # logの中身が0にならないようにdeltaを導入
    
    # ミニバッチサイズBで割って平均を求める
    batch_size = y_pred.shape[0]
    
    cross_entropy_error = loss / batch_size
    
    return cross_entropy_error

def backward_propagation_and_update(batch_image_vector, hidden_layer_input, hidden_layer_output, output_probabilities, one_hot_labels, 
                                    weight1, bias1, weight2, bias2, learning_rate):
    """
    逆伝播法を用いて勾配を計算し、全パラメータを更新する関数。
    更新後のパラメータを返す。 4-1 ReLUにより、hidden_layer_inputも受け取る。
    """
    current_batch_size = batch_image_vector.shape[0]
    
    # --- 逆伝播 ---
    dEn_dak = (output_probabilities - one_hot_labels) / current_batch_size
    dEn_dX = np.dot(dEn_dak, weight2)
    dEn_dW_1 = np.dot(dEn_dak.T, hidden_layer_output)
    dEn_db_1 = np.sum(dEn_dak, axis = 0)
    # dEn_dX_sig = dEn_dX * (hidden_layer_output * (1 - hidden_layer_output))
    differentiated_input = np.where(hidden_layer_input > 0, 1, 0) # ReLUに入力するhidden_input_layerの微分
    dEn_dX_sig = dEn_dX * differentiated_input
    
    dEn_dW_2= np.dot(dEn_dX_sig.T, batch_image_vector)
    dEn_db_2 = np.sum(dEn_dX_sig, axis=0)

    # --- パラメータ更新 ---
    weight1 -= dEn_dW_2 * learning_rate 
    bias1   -= dEn_db_2 * learning_rate
    weight2 -= dEn_dW_1 * learning_rate 
    bias2   -= dEn_db_1 * learning_rate
    
    return weight1, bias1, weight2, bias2

def backward_propagation_and_update_train(batch_image_vector, hidden_layer_input, hidden_layer_output, output_probabilities, one_hot_labels, 
                                    weight1, bias1, weight2, bias2, learning_rate, ignore_number, momentum, prev_delta_W1, prev_delta_W2):
    current_batch_size = batch_image_vector.shape[0]
    dEn_dak = (output_probabilities - one_hot_labels) / current_batch_size
    dEn_dX = np.dot(dEn_dak, weight2)
    dEn_dW_1 = np.dot(dEn_dak.T, hidden_layer_output)
    dEn_db_1 = np.sum(dEn_dak, axis = 0)
    # dEn_dX_sig = dEn_dX * (hidden_layer_output * (1 - hidden_layer_output))
    differentiated_input = np.where(hidden_layer_input > 0, 1, 0) # ReLUに入力するhidden_input_layerの微分
    for index in ignore_number:
        dEn_dX[:, index] = 0 
        differentiated_input[:, index] = 0 
    dEn_dX_sig = dEn_dX * differentiated_input 
    dEn_dW_2= np.dot(dEn_dX_sig.T, batch_image_vector)
    dEn_db_2 = np.sum(dEn_dX_sig, axis=0)
    weight1 = weight1 + (momentum * prev_delta_W1 - dEn_dW_2 * learning_rate )
    bias1   -= dEn_db_2 * learning_rate
    weight2 = weight2 + (momentum * prev_delta_W2 - dEn_dW_1 * learning_rate )
    bias2   -= dEn_db_1 * learning_rate
    
    params_delta_W1 = dEn_dW_2 * learning_rate
    params_delta_W2 = dEn_dW_1 * learning_rate
    return weight1, bias1, weight2, bias2, params_delta_W1, params_delta_W2

def get_accuracy(y_prop, y_true): # 正答率計算

    y_pred = get_predicted_class(y_prop) # 予測結果

    accuracy = np.sum(y_pred == y_true) / len(y_prop)

    return accuracy

def calculate_accuracy_for_epoch(images, labels, weight1, bias1, weight2, bias2, mode, ignore_number):
    """
    指定されたデータセットに対するモデルの正答率を計算する関数。
    modeに応じてforward_propagation_train/testを呼び出す。
    """
    # images_vector = images.reshape(len(images), -1)

    if mode == 'train':

        probabilities, _, _ = forward_propagation_train(images, weight1, bias1, weight2, bias2, ignore_number)
    elif mode == 'test':
        probabilities, _, _ = forward_propagation_test(images, weight1, bias1, weight2, bias2, ignore_number)
    else:
         # デフォルト
        probabilities, _, _ = forward_propagation(images, weight1, bias1, weight2, bias2)

    accuracy = get_accuracy(probabilities, labels)

    return accuracy

# --- メイン処理 ---
if __name__ == "__main__":

    batch_size = 100
    epoch_number = 10
    learning_rate = 0.01
    train_loss_list, train_acc_list, test_acc_list = [], [], []
    momentum = 0.9
    prev_delta_W1 = 0
    prev_delta_W2 = 0

    while True:
        mode = str(input('実行モードを入力してください (train or test): '))
        if mode in ['train', 'test']:
            break
        print("無効なモードです。'train' または 'test' を入力してください。")

    while True:
        try:
            ignore_number = int(input(f'Dropoutの個数を 0 ~ {hidden_layer_size} で入力してください: '))
            if 0 <= ignore_number <= hidden_layer_size:
                break
            else:
                print(f"無効なドロップアウト数です。0から{hidden_layer_size}の範囲で入力してください。")
        except ValueError:
            print("無効な入力です。整数を入力してください。")

    # 訓練モードの場合にのみ学習を実行
    if mode == 'train':
        print("\n--- 訓練モード実行中 ---")

        for i in range(1, epoch_number + 1):
            error_sum = 0
            train_accuracy_sum = 0
            shuffled_train_image_index = get_shuffled_index(train_images)
            
            for j in range(0, len(shuffled_train_image_index), batch_size): # range(start, stop, step) を使い、batch_sizeずつインデックスをずらしながらループ

                # hidden_layer_size分のインデックス配列からignore_number個ランダムに選択
                random_selection = np.random.choice(np.arange(hidden_layer_size), size=ignore_number, replace=False)
                
                index = shuffled_train_image_index[j:j + batch_size]    #シャッフルしたインデックスから、先頭のbatch_size分取り出す

                # 統合した関数を使い、ミニバッチと対応ラベルを一度に取得
                # ミニバッチ取得（flattened を受け取る）
                batch_image, batch_labels = get_batch(index)
                
                # 全結合向け入力をそのまま使用（im2col は使わない）
                train_images_col = batch_image  # shape: (B, 3072)
                
                # 順伝播を実行
                output_probabilities, hidden_layer_input, hidden_layer_output = forward_propagation_train(
                    train_images_col, weight1, bias1, weight2, bias2, random_selection
                )
                # one-hot labelsを取得
                one_hot_labels = get_one_hot_label(batch_labels, output_layer_size)

                
                # クロスエントロピー誤差平均を計算
                calculated_error = get_cross_entropy_error(output_probabilities, one_hot_labels)
                error_sum += calculated_error
                # --- 逆伝播 ---
                weight1, bias1, weight2, bias2, prev_delta_W1, prev_delta_W2 = backward_propagation_and_update_train(
                    batch_image, hidden_layer_input, hidden_layer_output, output_probabilities, one_hot_labels,
                    weight1, bias1, weight2, bias2, learning_rate, random_selection, momentum, prev_delta_W1, prev_delta_W2
                )
                train_accuracy_sum += calculate_accuracy_for_epoch(train_images_col, batch_labels, weight1, bias1, weight2, bias2, 'train', random_selection)

            ignore_index_for_acc = np.arange(hidden_layer_size)[:ignore_number] 
            
             # テスト用も平坦化して渡す（im2col を使わない）
            test_images_col = padded_test_images.reshape(len(padded_test_images), -1)
            test_accuracy = calculate_accuracy_for_epoch(test_images_col, test_labels, weight1, bias1, weight2, bias2, 'test', ignore_index_for_acc)
# ...existing code...
            num_batches = len(train_images) // batch_size
            train_loss_list.append(error_sum / num_batches)
            train_acc_list.append(train_accuracy_sum/ num_batches)
            test_acc_list.append(test_accuracy)
            print(f"{i}エポック目")
            print(f"  平均クロスエントロピー誤差: {error_sum / num_batches}")
            print(f"  学習データに対する正答率: {train_accuracy_sum / num_batches}")
            print(f"  テストデータに対する正答率: {test_accuracy}")
        
    # --- グラフの描画 ---
        x = np.arange(1, epoch_number + 1)
        plt.figure(figsize=(12, 5))

        # 誤差のグラフ
        plt.subplot(1, 2, 1)
        plt.plot(x, train_loss_list, marker='o')
        plt.title('Training Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.grid(True)

        # 正答率のグラフ
        plt.subplot(1, 2, 2)
        plt.plot(x, train_acc_list, marker='o', label='Train Accuracy')
        plt.plot(x, test_acc_list, marker='s', label='Test Accuracy')
        plt.title('Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.ylim(0, 1.0)
        plt.grid(True)
        plt.legend()

        plt.tight_layout()
        plt.show()
        np.savez('assignment3_parameter.npz', weight1 = weight1, bias1 = bias1, weight2 = weight2, bias2 = bias2)
    # テストモードの場合にのみ予測を実行
    elif mode == 'test':
        print("\n--- テストモード実行中 ---")
        random_selection = np.random.choice(np.arange(hidden_layer_size), size=ignore_number, replace=False)
        
        # パディングを除去してから平坦化
        test_images_no_pad = padded_test_images[:, 1:-1, 1:-1, :]
        test_images_flat = test_images_no_pad.reshape(len(test_images_no_pad), -1)
        
        # テストデータに対する最終的な正答率を計算
        test_accuracy = calculate_accuracy_for_epoch(
            test_images_flat, test_labels, weight1, bias1, weight2, bias2, 'test', random_selection
        )
        
        print(f"\nテストデータに対する最終正答率: {test_accuracy}")
        print(f"（ドロップアウト数 {ignore_number} 個）")

ワークスペース内で発見: c:\Users\tokot\code\image-processing\all\CIFAR-10_data\cifar-10-batches-py

--- 訓練モード実行中 ---


ValueError: shapes (100,3468) and (3072,100) not aligned: 3468 (dim 1) != 3072 (dim 0)