# 神经符号 AI 示例：Transformer + gplearn (符号回归/分类)

本 notebook 演示如何将神经网络（一个非常小的 Transformer 风格编码器）与符号学习（使用 gplearn 的 SymbolicClassifier）结合，构建一个简单的可解释分类流程：\n
1) 使用 sklearn 的 digits 数据集；\n
2) 训练一个小型 Transformer（PyTorch）用于特征提取（得到每张图像的嵌入向量）；\n
3) 在嵌入向量上训练 gplearn 的 SymbolicClassifier，得到一个可读的符号表达式；\n
4) 比较神经模型与符号模型的性能，并展示符号表达式。\n

依赖库：torch, torchvision, scikit-learn, gplearn, numpy。为简洁起见，模型较小并只训练少量 epoch，适合在 CPU 上运行。

In [2]:
# 如果尚未安装依赖，请先运行（取消注释并执行）：
%pip install torch torchvision scikit-learn gplearn numpy

Note: you may need to restart the kernel to use updated packages.


In [8]:
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# gplearn 用于符号学习（导入但延后 fit）
from gplearn.genetic import SymbolicClassifier

# 1. 加载数据（digits）
digits = load_digits()
X = digits.images  # shape (n_samples, 8, 8)
y = digits.target

# 为了示例，把问题简化为二分类：是否为数字 3
y_bin = (y == 3).astype(int)

# 扁平化图像作为 transformer 的输入序列（将 8x8 拆为 64 tokens，每 token 为像素值）
n_samples = X.shape[0]
X_flat = X.reshape(n_samples, -1)  # (n_samples, 64)

# 标准化
scaler = StandardScaler()
X_flat = scaler.fit_transform(X_flat)

# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(X_flat, y_bin, test_size=0.2, random_state=42, stratify=y_bin)

# 将数据变为 PyTorch 需要的形状： (batch, seq_len, feature). 我们把每个像素看作 1-d token => feature_dim=1, seq_len=64
def to_torch_dataset(X, y):
    X_t = torch.tensor(X, dtype=torch.float32).unsqueeze(-1)
    y_t = torch.tensor(y, dtype=torch.long)
    return TensorDataset(X_t, y_t)

train_ds = to_torch_dataset(X_train, y_train)
test_ds = to_torch_dataset(X_test, y_test)

train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=128)

# 2. 定义一个非常小的 Transformer 编码器，用来把序列映射为固定长度嵌入
class TinyTransformer(nn.Module):
    def __init__(self, seq_len=64, d_model=32, nhead=4, num_layers=1, emb_dim=16):
        super().__init__()
        self.input_proj = nn.Linear(1, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=64, dropout=0.1, activation='relu')
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(d_model, emb_dim)
        # 最后用于直接分类的头（可选，用于比较）
        self.classifier = nn.Linear(emb_dim, 2)

    def forward(self, x):
        # x: (batch, seq_len, 1) -> transformer expects (seq_len, batch, d_model)
        b, s, _ = x.shape
        x = self.input_proj(x)  # (b, s, d_model)
        x = x.permute(1, 0, 2)  # (s, b, d_model)
        x = self.transformer(x)  # (s, b, d_model)
        x = x.permute(1, 2, 0)  # (b, d_model, s) for pooling
        x = self.pool(x).squeeze(-1)  # (b, d_model)
        emb = self.fc(x)  # (b, emb_dim)
        logits = self.classifier(emb)
        return logits, emb

# 训练模型（少量 epoch）
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TinyTransformer().to(device)
opt = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

n_epochs = 8
for epoch in range(n_epochs):
    model.train()
    total_loss = 0.0
    for xb, yb in train_loader:
        xb = xb.to(device)
        yb = yb.to(device)
        opt.zero_grad()
        logits, _ = model(xb)
        loss = loss_fn(logits, yb)
        loss.backward()
        opt.step()
        total_loss += loss.item() * xb.size(0)
    avg_loss = total_loss / len(train_loader.dataset)
    # 评估训练集上的准确率作为进度指示
    model.eval()
    preds = []
    trues = []
    with torch.no_grad():
        for xb, yb in train_loader:
            xb = xb.to(device)
            logits, _ = model(xb)
            preds.extend(torch.argmax(logits, dim=1).cpu().numpy().tolist())
            trues.extend(yb.numpy().tolist())
    acc = accuracy_score(trues, preds)
    print(f'Epoch {epoch+1}/{n_epochs} loss={avg_loss:.4f} acc={acc:.4f}')

# 在测试集上提取嵌入
model.eval()
embs_train = []
embs_test = []
y_train_list = []
y_test_list = []
with torch.no_grad():
    for xb, yb in DataLoader(to_torch_dataset(X_train, y_train), batch_size=128):
        xb = xb.to(device)
        _, emb = model(xb)
        embs_train.append(emb.cpu().numpy())
        y_train_list.append(yb.numpy())
    for xb, yb in DataLoader(to_torch_dataset(X_test, y_test), batch_size=128):
        xb = xb.to(device)
        _, emb = model(xb)
        embs_test.append(emb.cpu().numpy())
        y_test_list.append(yb.numpy())
embs_train = np.vstack(embs_train)
embs_test = np.vstack(embs_test)
y_train_arr = np.concatenate(y_train_list)
y_test_arr = np.concatenate(y_test_list)

print('Embeddings shapes:', embs_train.shape, embs_test.shape)
print('y_test distribution:', np.bincount(y_test_arr))

# 现在用 gplearn 的 SymbolicClassifier 在嵌入上学习一个符号分类器
# 注意：gplearn 在 CPU 上执行，输入维度不宜过大。这里 emb_dim=16，样本少，适合示例。
# 为避免 NotFittedError，我们在 fit 时加入异常捕获并输出诊断信息。

# 兼容性补丁（若需要）：有些环境中 gplearn 的 SymbolicClassifier 可能缺少 sklearn 的 _validate_data 方法
try:
    getattr(SymbolicClassifier, '_validate_data')
    need_patch = False
except Exception:
    need_patch = True

if need_patch:
    from sklearn.utils.validation import check_array, check_X_y
    def _validate_data(self, X, y=None, accept_sparse=False, reset=True, ensure_2d=True,
                       allow_nd=False, multi_output=False, ensure_min_samples=1,
                       ensure_min_features=1, y_numeric=False, estimator_name=None):
        if y is None:
            X_checked = check_array(X, accept_sparse=accept_sparse, ensure_2d=ensure_2d,
                                    allow_nd=allow_nd)
            return X_checked
        X_checked, y_checked = check_X_y(X, y, accept_sparse=accept_sparse, ensure_2d=ensure_2d,
                                         multi_output=multi_output, y_numeric=y_numeric)
        return X_checked, y_checked
    setattr(SymbolicClassifier, '_validate_data', _validate_data)

# 构建并拟合 SymbolicClassifier（在 try/except 中，以便捕获错误）
sym_clf = SymbolicClassifier(population_size=500, generations=20, stopping_criteria=0.95, p_crossover=0.7, p_subtree_mutation=0.1, p_hoist_mutation=0.05, p_point_mutation=0.1, max_samples=0.9, verbose=1, random_state=42, parsimony_coefficient=0.001)
fit_exception = None
try:
    sym_clf.fit(embs_train, y_train_arr)
    # 有些 sklearn/gplearn 组合不会自动设置 n_features_in_，手动补齐以兼容 predict
    if not hasattr(sym_clf, 'n_features_in_'):
        try:
            sym_clf.n_features_in_ = embs_train.shape[1]
        except Exception:
            pass
    print('gplearn fit completed successfully')
    try:
        print('Learned program:')
        print(sym_clf._program)
    except Exception:
        pass
except Exception as e:
    import traceback
    traceback.print_exc()
    fit_exception = e
    print('gplearn fit failed with exception:', e)

# 用符号模型预测并评估（只有在 fit 成功时才执行）
if fit_exception is None:
    try:
        # 若 predict 前仍缺少 n_features_in_，再补一次
        if not hasattr(sym_clf, 'n_features_in_'):
            try:
                sym_clf.n_features_in_ = embs_train.shape[1]
            except Exception:
                pass
        y_pred_sym = sym_clf.predict(embs_test)
        print('Symbolic classifier accuracy:', accuracy_score(y_test_arr, y_pred_sym))
        print('Symbolic expression:')
        print(sym_clf._program)

        # 诊断：查看符号预测分布与混淆矩阵，避免 UndefinedMetricWarning
        print('y_test distribution:', np.bincount(y_test_arr))
        try:
            unique, counts = np.unique(y_pred_sym, return_counts=True)
            print('y_pred_sym unique counts:', dict(zip(unique.tolist(), counts.tolist())))
        except Exception:
            print('Could not compute y_pred_sym distribution')
        print('Confusion matrix (symbolic):')
        try:
            print(confusion_matrix(y_test_arr, y_pred_sym))
        except Exception as e:
            print('Could not compute confusion matrix:', e)

    except Exception as e:
        import traceback
        traceback.print_exc()
        print('Prediction with symbolic classifier failed:', e)
else:
    print('Skipping symbolic prediction because fit failed')

# 作为对照，直接在嵌入上训练一个简单 sklearn 逻辑回归
from sklearn.linear_model import LogisticRegression
lr = LogisticRegression(max_iter=500)
lr.fit(embs_train, y_train_arr)
y_pred_lr = lr.predict(embs_test)
print('Logistic regression accuracy:', accuracy_score(y_test_arr, y_pred_lr))

# 最后展示分类报告（符号分类器使用 zero_division=0 以抑制 UndefinedMetricWarning）
if fit_exception is None and 'y_pred_sym' in locals():
    print('Classification report (symbolic):')
    print(classification_report(y_test_arr, y_pred_sym, zero_division=0))
elif fit_exception is None:
    print('Symbolic classifier did not produce predictions')
else:
    print('No symbolic classification report because fit failed')



Epoch 1/8 loss=0.3589 acc=0.8984
Epoch 2/8 loss=0.3314 acc=0.8984
Epoch 2/8 loss=0.3314 acc=0.8984
Epoch 3/8 loss=0.3293 acc=0.8984
Epoch 3/8 loss=0.3293 acc=0.8984
Epoch 4/8 loss=0.3297 acc=0.8984
Epoch 4/8 loss=0.3297 acc=0.8984
Epoch 5/8 loss=0.3303 acc=0.8984
Epoch 5/8 loss=0.3303 acc=0.8984
Epoch 6/8 loss=0.3275 acc=0.8984
Epoch 6/8 loss=0.3275 acc=0.8984
Epoch 7/8 loss=0.3254 acc=0.8984
Epoch 7/8 loss=0.3254 acc=0.8984
Epoch 8/8 loss=0.3228 acc=0.8984
Embeddings shapes: (1437, 16) (360, 16)
y_test distribution: [323  37]
    |   Population Average    |             Best Individual              |
---- ------------------------- ------------------------------------------ ----------
 Gen   Length          Fitness   Length          Fitness      OOB Fitness  Time Left
Epoch 8/8 loss=0.3228 acc=0.8984
Embeddings shapes: (1437, 16) (360, 16)
y_test distribution: [323  37]
    |   Population Average    |             Best Individual              |
---- ------------------------- ------------