1.加载数据集

In [None]:
from datasets import load_dataset

# Local parquet files under .../yelp_review_full, keyed by split name.
yelp_files = {
    'train': 'C:/Users/10520/Desktop/huggingface_dataset/yelp_review_full/train-00000-of-00001.parquet',
    'test': 'C:/Users/10520/Desktop/huggingface_dataset/yelp_review_full/test-00000-of-00001.parquet',
}
dataset = load_dataset('parquet', data_files=yelp_files)

# Local JSON-lines files under .../sst5, keyed by split name.
sst5_files = {
    'train': 'C:/Users/10520/Desktop/huggingface_dataset/sst5/train.jsonl',
    'test': 'C:/Users/10520/Desktop/huggingface_dataset/sst5/test.jsonl',
}
dataset2 = load_dataset('json', data_files=sst5_files)


# Inspect the training record at index 100
# print(dataset['train'][100])

# Count the rows in the train and test splits of the first dataset.
train_split = dataset['train']
test_split = dataset['test']
train_size = len(train_split)
test_size = len(test_split)

print("训练集条目数量:", train_size)
print("测试集条目数量:", test_size)

2.数据预处理

In [None]:
from transformers import AutoTokenizer
from pprint import pprint

# Load the tokenizer from the local DistilBERT checkpoint directory.
tokenizer_dir = "C:/Users/10520/Desktop/huggingface_model/distilbert"
tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)


def tokenize_function(examples):
    """Tokenize the ``"text"`` entry of *examples* with the module-level tokenizer.

    The tokenizer is called with ``padding="max_length"`` and
    ``truncation=True``; whatever it returns is passed straight back.
    """
    texts = examples["text"]
    return tokenizer(texts, padding="max_length", truncation=True)

# Tokenize every split of both datasets, feeding the tokenizer whole batches.
map_options = {"batched": True}
tokenized_datasets = dataset.map(tokenize_function, **map_options)
tokenized_datasets2 = dataset2.map(tokenize_function, **map_options)

# Show the first tokenized training example.
first_train_example = tokenized_datasets["train"][0]
print(first_train_example)
# print(tokenized_datasets["test"][:5])  # first five tokenized test examples

In [None]:
# NumPy is used below but is not imported in any earlier cell shown here,
# so import it in this cell to avoid a NameError.
import numpy as np

# Take reproducibly shuffled, fixed-size subsets of the tokenized splits.
small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(30000))
small_test_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(20000))

# Convert the training columns to NumPy arrays.
input_ids = np.array(small_train_dataset["input_ids"])
attention_mask = np.array(small_train_dataset["attention_mask"])
labels = np.array(small_train_dataset["label"])

# Features are passed as a list in the order [input_ids, attention_mask];
# the labels are the targets.
x = [input_ids, attention_mask]
y = labels

3.自定义模型

In [None]:
from transformers import TFDistilBertModel, DistilBertConfig
import tensorflow as tf


class CustomTFEmbeddings(tf.keras.layers.Layer):
    """Embedding layer that concatenates token and position embeddings.

    The two embeddings are joined along the last axis, layer-normalised,
    passed through dropout and finally projected by a ReLU dense layer with
    ``config.hidden_size`` units.
    """

    def __init__(self, config, **kwargs):
        """Store the configuration and create the sub-layers.

        Args:
            config: Model configuration. This layer reads ``dim``,
                ``initializer_range``, ``max_position_embeddings``,
                ``dropout``, ``hidden_size`` and ``vocab_size`` from it.
            **kwargs: Forwarded to ``tf.keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.config = config
        self.dim = config.dim
        self.initializer_range = config.initializer_range
        self.max_position_embeddings = config.max_position_embeddings
        self.LayerNorm = tf.keras.layers.LayerNormalization(epsilon=1e-12, name="LayerNorm")
        self.dropout = tf.keras.layers.Dropout(rate=config.dropout)

        # Dense projection applied after the concatenated embeddings.
        self.linear_layer = tf.keras.layers.Dense(self.config.hidden_size, activation='relu')

    def build(self, input_shape=None):
        """Create the token and position embedding tables."""
        # NOTE(review): the stock DistilBERT TF embeddings are believed to
        # create these variables inside "word_embeddings" /
        # "position_embeddings" name scopes; without them, pretrained weights
        # may not be matched by name - confirm against the checkpoint.
        self.weight = self.add_weight(
            name="weight",
            shape=[self.config.vocab_size, self.dim],
            initializer=tf.keras.initializers.TruncatedNormal(stddev=self.initializer_range),
        )
        self.position_embeddings = self.add_weight(
            name="embeddings",
            shape=[self.max_position_embeddings, self.dim],
            initializer=tf.keras.initializers.TruncatedNormal(stddev=self.initializer_range),
        )

    def call(self, input_ids=None, position_ids=None, inputs_embeds=None, training=False):
        """Embed the inputs and return the projected embeddings.

        Args:
            input_ids: Token ids used to index the token embedding table.
                When given, they take precedence over ``inputs_embeds``.
            position_ids: Indices into the position embedding table. When
                ``None``, ``0 .. seq_len - 1`` is used for every example.
            inputs_embeds: Pre-computed token embeddings, used only when
                ``input_ids`` is ``None``.
            training: Forwarded to the dropout layer.

        Returns:
            The output of the dense projection applied to the normalised,
            dropped-out concatenation of token and position embeddings.

        Raises:
            AssertionError: If both ``input_ids`` and ``inputs_embeds`` are
                ``None``.
        """
        assert not (input_ids is None and inputs_embeds is None)

        if input_ids is not None:
            inputs_embeds = tf.gather(params=self.weight, indices=input_ids)

        # Every axis of the token embeddings except the feature axis.
        input_shape = tf.shape(inputs_embeds)[:-1]

        if position_ids is None:
            position_ids = tf.expand_dims(tf.range(start=0, limit=input_shape[-1]), axis=0)

        position_embeds = tf.gather(params=self.position_embeddings, indices=position_ids)

        # tf.concat does not broadcast. With the default position ids the
        # position embeddings have a leading dimension of 1, so expand them
        # to the token embeddings' leading dimensions before concatenating.
        target_shape = tf.concat([input_shape, [self.dim]], axis=0)
        position_embeds = tf.broadcast_to(position_embeds, target_shape)

        # Join token and position embeddings along the feature axis.
        final_embeddings = tf.concat([inputs_embeds, position_embeds], axis=-1)

        final_embeddings = self.LayerNorm(final_embeddings)
        final_embeddings = self.dropout(final_embeddings, training=training)

        # Dense projection of the concatenated embeddings.
        final_embeddings = self.linear_layer(final_embeddings)

        return final_embeddings


将前馈网络中的两个线性层中间增加卷积层

In [None]:
from transformers.modeling_tf_utils import get_initializer
from transformers.activations_tf import get_tf_activation

class CustomTFFFN(tf.keras.layers.Layer):
    """Feed-forward block: dense -> activation -> Conv1D -> dense -> dropout."""

    def __init__(self, config, **kwargs):
        """Create the sub-layers from *config*.

        Reads ``dropout``, ``hidden_dim``, ``dim``, ``initializer_range`` and
        ``activation`` from the configuration; ``**kwargs`` go to the Keras
        base layer.
        """
        super().__init__(**kwargs)
        self.config = config
        self.dropout = tf.keras.layers.Dropout(config.dropout)

        # First dense layer, with config.hidden_dim units.
        self.lin1 = tf.keras.layers.Dense(
            config.hidden_dim,
            kernel_initializer=get_initializer(config.initializer_range),
            name="lin1",
        )

        # Convolution inserted between the two dense layers. It uses
        # config.hidden_dim filters, a kernel of width 3, 'same' padding
        # and a ReLU activation.
        self.conv1d = tf.keras.layers.Conv1D(
            filters=config.hidden_dim,
            kernel_size=3,
            padding='same',
            activation='relu',
            name="conv1d",
        )

        # Second dense layer, with config.dim units.
        self.lin2 = tf.keras.layers.Dense(
            config.dim,
            kernel_initializer=get_initializer(config.initializer_range),
            name="lin2",
        )

        self.activation = get_tf_activation(config.activation)

    def call(self, input, training=False):
        """Run *input* through the block; *training* only affects dropout."""
        hidden = self.activation(self.lin1(input))
        hidden = self.conv1d(hidden)
        projected = self.lin2(hidden)
        return self.dropout(projected, training=training)

    def build(self, input_shape=None):
        """Build the sub-layers once, each under its own name scope."""
        if self.built:
            return
        self.built = True

        # (attribute name, size of the last input axis), in build order.
        build_plan = (
            ("lin1", self.config.dim),
            ("lin2", self.config.hidden_dim),
            ("conv1d", self.config.hidden_dim),
        )
        for attr_name, last_dim in build_plan:
            sublayer = getattr(self, attr_name, None)
            if sublayer is None:
                continue
            with tf.name_scope(sublayer.name):
                sublayer.build([None, None, last_dim])


4.修改模型中的对应类

In [None]:
from transformers.models.distilbert import modeling_tf_distilbert


# Rebind the module-level TFEmbeddings and TFFFN names of the DistilBERT
# TF modelling module to the custom layer classes.
for class_name, replacement in (
    ("TFEmbeddings", CustomTFEmbeddings),
    ("TFFFN", CustomTFFFN),
):
    setattr(modeling_tf_distilbert, class_name, replacement)


5.加载模型，训练模型

In [None]:
from transformers import TFAutoModelForSequenceClassification,TFDistilBertForSequenceClassification,AutoConfig

# Build the TensorFlow dataset for the test subset: two feature columns,
# the 'label' column as target, shuffled, in batches of 32.
test_pipeline_options = {
    "columns": ['input_ids', 'attention_mask'],
    "label_cols": 'label',
    "shuffle": True,
    "batch_size": 32,
}
test_tf_dataset = small_test_dataset.to_tf_dataset(**test_pipeline_options)


In [None]:
import tensorflow as tf
from transformers import TFDistilBertForSequenceClassification
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping

# Load the sequence-classification model with five labels from the local
# checkpoint directory, tolerating weight-shape mismatches.
checkpoint_dir = "C:/Users/10520/Desktop/huggingface_model/distilbert"
model = TFDistilBertForSequenceClassification.from_pretrained(
    checkpoint_dir,
    num_labels=5,
    ignore_mismatched_sizes=True,
)

# Compile with Adam and a sparse categorical cross-entropy on logits.
optimizer = Adam(learning_rate=2e-5)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=optimizer, metrics=['accuracy'], loss=loss_fn)

# Early stopping: watch the validation loss, allow 3 epochs without
# improvement, then restore the weights of the best epoch.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)
# Learning-rate scheduler: multiply the rate by 0.2 after 2 epochs without
# improvement in validation loss.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=2,
)

# Train, holding out 10% of the data for validation.
training_callbacks = [early_stopping, reduce_lr]
history = model.fit(
    x,
    y,
    validation_split=0.1,
    epochs=25,
    batch_size=8,
    callbacks=training_callbacks,
)

In [None]:
import matplotlib.pyplot as plt

def _plot_metric(records, metric, title, ylabel):
    """Plot the training and validation series of one metric per epoch.

    Args:
        records: Mapping from metric name to a per-epoch list of values;
            must contain both ``metric`` and ``"val_" + metric``.
        metric: Name of the training metric, e.g. ``"loss"``.
        title: Figure title.
        ylabel: Label of the y axis.
    """
    plt.figure(figsize=(10, 6))
    plt.plot(records[metric], label=f'training {metric}')
    plt.plot(records[f'val_{metric}'], label=f'validation {metric}')
    plt.title(title)
    plt.xlabel('Epochs')
    plt.ylabel(ylabel)
    plt.legend()
    plt.show()


# Loss curve.
_plot_metric(history.history, 'loss', 'loss curve', 'Loss')

# Accuracy curve. The validation series is 'val_accuracy'; the original cell
# plotted 'val_loss' here and reused the loss title and axis label.
_plot_metric(history.history, 'accuracy', 'accuracy curve', 'Accuracy')

评估模型

In [None]:
# Evaluate the trained model on the test dataset; the bare expression below
# makes the notebook display the returned value.
results = model.evaluate(x=test_tf_dataset)
results


In [None]:
# model.save_pretrained("C:/Users/10520/Desktop/huggingface_model/distilbert")