- 線形項 + 全2次組み合わせ項を低次元ベクトルで内積表現するモデル
- WideDeepに近いが，より高次のパターンにも対応している．

$
損失関数: \hat{y} = w_0 + \sum_i w_i x_i + \sum_{i < j} \langle \mathbf{v}_i, \mathbf{v}_j \rangle x_i x_j
$

- 実装手順
- 前処理(Leave-One-Out分割)
- 特徴量エンコード(One-Hot-Encoding)
- モデル構築
- モデル学習
- 推薦と評価

In [17]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.model_selection import train_test_split

import warnings
warnings.filterwarnings('ignore')
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # 0:all, 1:filter INFO, 2:filter WARNING, 3:only ERROR

# データ読み込み
cols = ['user_id', 'item_id', 'rating', 'timestamp']
df = pd.read_csv('u.data', sep='\t', names=cols)

# タイムスタンプを日付型に変換（後でLeave-One-Outで使用）
df['timestamp'] = pd.to_datetime(df['timestamp'])

# Leave-One-Out分割：各ユーザーの最新1件をテスト、それ以前を学習に
df['rank'] = df.groupby('user_id')['timestamp'].rank(method='first', ascending=False)
train_df = df[df['rank'] > 1].copy()
test_df = df[df['rank'] == 1].copy()

# ユーザー・アイテムIDの数値化（LabelEncoder）
user_enc = LabelEncoder()
item_enc = LabelEncoder()

train_df['user'] = user_enc.fit_transform(train_df['user_id'])
train_df['item'] = item_enc.fit_transform(train_df['item_id'])

# テストデータにも変換適用（未知IDは除外）
test_df = test_df[test_df['user_id'].isin(user_enc.classes_)]
test_df = test_df[test_df['item_id'].isin(item_enc.classes_)]
test_df['user'] = user_enc.transform(test_df['user_id'])
test_df['item'] = item_enc.transform(test_df['item_id'])

# 二値分類に変換（例：rating >= 4 を正例とする）
train_df['label'] = (train_df['rating'] >= 4).astype(int)
test_df['label'] = (test_df['rating'] >= 4).astype(int)

# FM用特徴ベクトル（One-Hot）
enc = OneHotEncoder()
X_train = enc.fit_transform(train_df[['user', 'item']])  # sparse matrix
X_test = enc.transform(test_df[['user', 'item']])        # sparse matrix

# ラベル
y_train = train_df['label'].values
y_test = test_df['label'].values

print("ok")

ok


In [18]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Activation, Lambda
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K

input_dim = X_train.shape[1]
k = 16  # 潜在次元数

# スパース → Dense 変換（明示的に .toarray() しておく）
X_train_dense = X_train.toarray()
X_test_dense = X_test.toarray()

# 入力層（dense）
x_input = Input(shape=(input_dim,), name='input')

# 線形項
linear_output = Dense(1)(x_input)

# 埋め込みベクトル V: shape (input_dim, k)
V = tf.Variable(tf.random.normal([input_dim, k], stddev=0.01), name='fm_embedding')

# LambdaレイヤーでFMの2次項計算
def fm_interaction(x):
    # x: (batch_size, input_dim)
    linear_terms = tf.matmul(x, V)  # (batch_size, k)
    square_of_sum = tf.square(tf.reduce_sum(linear_terms, axis=1, keepdims=True))  # (batch_size, 1)
    sum_of_square = tf.reduce_sum(tf.square(linear_terms), axis=1, keepdims=True)  # (batch_size, 1)
    return 0.5 * (square_of_sum - sum_of_square)  # (batch_size, 1)

interaction_term = Lambda(fm_interaction)(x_input)

# 総和 + sigmoid
output = tf.keras.layers.Add()([linear_output, interaction_term])
output = Activation('sigmoid')(output)

# モデル定義・学習
model = Model(inputs=x_input, outputs=output)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.summary()


In [19]:
# -----------------------------
# 学習・予測・評価
# -----------------------------

model.fit(X_train, y_train, epochs=5, batch_size=128, validation_split=0.1)

loss, acc = model.evaluate(X_test, y_test)
print(f"\n✅ FMモデル Test Accuracy: {acc:.4f}")

Epoch 1/5
[1m697/697[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 701us/step - accuracy: 0.5628 - loss: 0.6812 - val_accuracy: 0.6059 - val_loss: 0.6584
Epoch 2/5
[1m697/697[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 571us/step - accuracy: 0.6407 - loss: 0.6472 - val_accuracy: 0.6611 - val_loss: 0.6366
Epoch 3/5
[1m697/697[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 566us/step - accuracy: 0.6853 - loss: 0.6245 - val_accuracy: 0.6841 - val_loss: 0.6205
Epoch 4/5
[1m697/697[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 570us/step - accuracy: 0.7005 - loss: 0.6079 - val_accuracy: 0.6907 - val_loss: 0.6085
Epoch 5/5
[1m697/697[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 563us/step - accuracy: 0.7069 - loss: 0.5968 - val_accuracy: 0.6937 - val_loss: 0.5996
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 913us/step - accuracy: 0.6898 - loss: 0.5920

✅ FMモデル Test Accuracy: 0.6844


In [20]:
from collections import defaultdict

n_negative = 99
all_items = set(train_df['item'].unique())
user_item_score = defaultdict(list)

for user in test_df['user'].unique():
    # 正解アイテム取得
    gt_item = test_df[test_df['user'] == user].iloc[0]['item']

    # 負例アイテムのランダムサンプリング
    negatives = list(all_items - {gt_item})
    sample_size = min(n_negative, len(negatives))
    sampled_negatives = np.random.choice(negatives, size=sample_size, replace=False)

    # 評価対象アイテム = 正解 + 負例
    items_to_score = np.append(sampled_negatives, gt_item)

    # One-Hot特徴量に変換
    one_hot_input = enc.transform([[user, item] for item in items_to_score]).toarray()

    # モデルスコアを計算
    scores = model.predict(one_hot_input, verbose=0).flatten()

    # 結果保存
    user_item_score[user] = list(zip(items_to_score, scores))


In [21]:
# 推薦スコア計算（各ユーザーに対して正解+負例でスコア予測）
# user_item_score = {user_id: [(item_id, score), ...]} がある前提

# 推薦リスト（Top-10）
recommendations = {
    user: [item for item, _ in sorted(items, key=lambda x: x[1], reverse=True)[:10]]
    for user, items in user_item_score.items()
}

# Ground truth（正解1件のみ）
ground_truth = {
    user: [test_df[test_df['user'] == user].iloc[0]['item']]
    for user in user_item_score
}

In [24]:
# 推薦結果と正解データを用意してある前提
# recs: {user_id: [item1, item2, ..., item10]}
# gt:   {user_id: item} or {user_id: [item]} ← 正解がリストの場合、1件に直す

# 正解がリスト形式の場合、最初の1件だけを使う
gt_single = {u: items[0] if isinstance(items, list) else items for u, items in ground_truth.items()}

from Evaluation_index import recall_at_k, precision_at_k, ndcg_at_k, mrr_at_k, hit_at_k

# 評価値の計算
recall = recall_at_k(recommendations, gt_single, 10)
precision = precision_at_k(recommendations, gt_single, 10)
ndcg = ndcg_at_k(recommendations, gt_single, 10)
mrr = mrr_at_k(recommendations, gt_single, 10)
hit = hit_at_k(recommendations, gt_single, 10)

# 結果表示
print("=== FM モデル評価結果（Top-10）===")
print(f"Recall@10    : {recall:.4f}")
print(f"Precision@10 : {precision:.4f}")
print(f"NDCG@10      : {ndcg:.4f}")
print(f"MRR@10       : {mrr:.4f}")
print(f"Hit@10       : {hit:.4f}")

print("=== RecBole モデル評価結果（Top-10）===")
print(f"Recall@10    : 0.0553")
print(f"Precision@10 : 0.0611")
print(f"NDCG@10      : 0.0773")
print(f"MRR@10       : 0.1557")
print(f"Hit@10       : 0.3611")

=== FM モデル評価結果（Top-10）===
Recall@10    : 0.2391
Precision@10 : 0.0239
NDCG@10      : 0.1260
MRR@10       : 0.0921
Hit@10       : 0.2391
=== RecBole モデル評価結果（Top-10）===
Recall@10    : 0.0553
Precision@10 : 0.0611
NDCG@10      : 0.0773
MRR@10       : 0.1557
Hit@10       : 0.3611
