In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, TFBertForSequenceClassification

In [None]:
model_name = "bert-base-uncased" #bert-large-uncased
model_path = 'model/ivdr_text_classification'
vocab_size = 10000  # 词汇表大小
embedding_dim = 100  # 词向量维度
hidden_size = 128 #128  # LSTM隐藏层大小
max_sequence_length = 128
batch_size = 20

num_classes = 4
epochs_classes = 10
classification_learning_rate = 0.00001
weight_decay = 0.01

In [None]:
df_1 = pd.read_csv('data/ivdr_classification_referance.csv')
df_2 = pd.read_csv('data/ivdr分类训练数据.csv', sep=',')
df_1 = df_1[['device_type', 'device_name']]
df_2 = df_2.dropna(subset=['product_name'])
df_2 = df_2[['risk_class', 'product_name']]
df_2.columns = ['device_type', 'device_name']

class_map = {
    'Class A': 0,
    'Class B': 1,
    'Class C': 2,
    'Class D': 3
}

merged_df = pd.concat([df_1, df_2])
merged_df.replace({'device_type': class_map}, inplace=True)

print(merged_df['device_type'].value_counts())
merged_df

In [None]:
# 加载数据和标签
data = merged_df.to_dict(orient='list')
sentences = data['device_name']
labels = data['device_type']

# 划分训练集和验证集
train_data, test_data, train_labels, test_labels = train_test_split(sentences, labels, test_size=0.2, random_state=42)
train_data, eval_data, train_labels, eval_labels = train_test_split(train_data, train_labels, test_size=0.25, random_state=42)

In [None]:
# 初始化BertTokenizer
tokenizer = BertTokenizer.from_pretrained(model_name)

# 编码训练集和验证集的输入文本
train_encodeds = tokenizer.batch_encode_plus(train_data, truncation=True, padding='max_length', max_length=max_sequence_length, return_tensors='tf')
test_encodeds = tokenizer.batch_encode_plus(test_data, truncation=True, padding='max_length', max_length=max_sequence_length, return_tensors='tf')
eval_encodeds = tokenizer.batch_encode_plus(eval_data, truncation=True, padding='max_length', max_length=max_sequence_length, return_tensors='tf')
# input_ids = tf.convert_to_tensor(encodeds['input_ids'])
# attention_mask = tf.convert_to_tensor(encodeds['attention_mask'])
# labels = tf.convert_to_tensor(labels)

# 转换为TensorFlow Dataset格式
train_dataset = tf.data.Dataset.from_tensor_slices( \
    ({'input_ids': train_encodeds['input_ids'], 'attention_mask': train_encodeds['attention_mask']}, train_labels) \
    ).shuffle(num_classes).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices( \
    ({'input_ids': test_encodeds['input_ids'], 'attention_mask': test_encodeds['attention_mask']}, test_labels) \
    ).shuffle(num_classes).batch(batch_size)
eval_dataset = tf.data.Dataset.from_tensor_slices( \
    ({'input_ids': eval_encodeds['input_ids'], 'attention_mask': eval_encodeds['attention_mask']}, eval_labels) \
    ).shuffle(num_classes).batch(batch_size)

train_dataset

In [None]:
# 定义TFBertForSequenceClassification模型
classes_model = TFBertForSequenceClassification.from_pretrained(model_name, num_labels=num_classes)

# 定义优化器和损失函数
optimizer = tf.keras.optimizers.Adam(learning_rate=classification_learning_rate)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# 定义评估指标
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('train_accuracy')
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('eval_accuracy')
train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
eval_loss = tf.keras.metrics.Mean('eval_loss', dtype=tf.float32)

@tf.function
def train_step(inputs, labels):
    predictions = None

    with tf.GradientTape() as tape:
        outputs = classes_model(inputs, training=True)[0]
        loss_value = loss(labels, outputs)
        predictions = tf.argmax(outputs, axis=1)

    gradients = tape.gradient(loss_value, classes_model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, classes_model.trainable_variables))

    train_accuracy(labels, outputs)
    train_loss(loss_value)

    return predictions

@tf.function
def eval_step(inputs, labels):
    outputs = classes_model(inputs, training=False)[0]
    loss_value = loss(labels, outputs)

    eval_accuracy(labels, outputs)
    eval_loss(loss_value)

    predictions = tf.argmax(outputs, axis=1)

    return predictions

@tf.function
def test_step(inputs, labels):
    outputs = classes_model(inputs, training=False)[0]
    loss_value = loss(labels, outputs)

    test_accuracy(labels, outputs)
    test_loss(loss_value)

    predictions = tf.argmax(outputs, axis=1)

    return predictions


In [None]:
# 训练模型
train_acc_scores = []
eval_acc_scores = []
for epoch in range(epochs_classes):
    train_accuracy.reset_states()
    eval_accuracy.reset_states()
    train_loss.reset_states()
    eval_loss.reset_states()
    train_predictions = []
    eval_predictions = []
    train_f1 = 0
    eval_f1 = 0

    for batch_inputs, batch_labels in train_dataset:
        predictions = train_step(batch_inputs, batch_labels)
        train_predictions.extend(predictions)

    for batch_inputs, batch_labels in eval_dataset:
        predictions = eval_step(batch_inputs, batch_labels)
        eval_predictions.extend(predictions)

    train_f1 = f1_score(train_labels, train_predictions)
    eval_f1 = f1_score(eval_labels, eval_predictions)
    train_acc_scores.append(train_accuracy.result().numpy())
    eval_acc_scores.append(eval_accuracy.result().numpy())
    print('Epoch {}: 训练: Loss: {:.4f}, Accuracy: {:.4f}, F1: {:.4f}, 验证: Loss: {:.4f}, Accuracy: {:.4f}, F1: {:.4f},'.format(
        epoch + 1, train_loss.result(), train_accuracy.result(), train_f1, eval_loss.result(), eval_accuracy.result(), eval_f1
    ))

plt.plot(range(epochs_classes), train_acc_scores, label='Train')
plt.plot(range(epochs_classes), eval_acc_scores, label='Validation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
classes_model.save_pretrained(model_path)

In [None]:
classes_model = TFBertForSequenceClassification.from_pretrained(model_path)
tokenizer = BertTokenizer.from_pretrained(model_name)

classes_model

In [None]:
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('test_accuracy')
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy.reset_states()
test_loss.reset_states()
test_predictions = []
test_f1 = 0
for batch_inputs, batch_labels in test_dataset:
    predictions = test_step(batch_inputs, batch_labels)
    test_predictions.extend(predictions)
test_f1 = f1_score(test_labels, test_predictions, average='micro')

print(test_loss.result().numpy(), test_accuracy.result().numpy(), test_f1)

In [None]:
# outputs = classes_model.predict(test_dataset)
# probabilities = tf.nn.softmax(outputs.logits, axis=1)

# for i in probabilities:
#     predicted_label = tf.argmax([i], axis=1).numpy()[0]
#     print("预测标签:", predicted_label, i.numpy())

In [None]:
df_1 = pd.read_csv('data/ivdr_classification_referance.csv')
df_2 = pd.read_csv('data/ivdr分类训练数据.csv', sep=',')
df_1 = df_1[['device_type', 'device_name']]
df_2 = df_2.dropna(subset=['product_name'])
df_2 = df_2[['risk_class', 'product_name']]
df_2.columns = ['device_type', 'device_name']

class_map = {
    'Class A': 0,
    'Class B': 1,
    'Class C': 2,
    'Class D': 3
}

merged_df = pd.concat([df_1, df_2])
merged_df.replace({'device_type': class_map}, inplace=True)

print(merged_df['device_type'].value_counts())
data = merged_df.to_dict(orient='list')
sentences = data['device_name']
labels = data['device_type']

In [None]:
label_list = ['Class A', 'Class B', 'Class C', 'Class D']
text = "Limusave MT-7501"

out_data = []
for i in range(len(sentences)):
    text = sentences[i]
    input_ids = tokenizer.encode(text, add_special_tokens=True)
    input_ids = tf.convert_to_tensor([input_ids])  # 转换为 TensorFlow 张量
    outputs = classes_model.predict(input_ids)
    probabilities = tf.nn.softmax(outputs.logits, axis=1)
    predicted_index = np.argmax(probabilities, axis=1)
    predicted_label = label_list[predicted_index[0]]
    out_data.append({
        'text': text,
        'predict': predicted_label,
        'label': labels[i],
        'classA': probabilities[0].numpy()[0],
        'classB': probabilities[0].numpy()[1],
        'classC': probabilities[0].numpy()[2],
        'classD': probabilities[0].numpy()[3]
        })
    
df = pd.json_normalize(out_data)
df

In [None]:
def compare_values(row):
    if row['predict'] == row['label']:
        return 1
    else:
        return 0

class_map = {
    'Class A': 0,
    'Class B': 1,
    'Class C': 2,
    'Class D': 3
}
df.replace({'predict': class_map}, inplace=True)
# df['label'] = df['label'].astype(str)
df['result'] = df.apply(compare_values, axis=1)

df.to_excel('data/ivdr_classification_out.xlsx')
df

In [None]:
df['result'].value_counts().to_dict()

In [107]:
Accuracy = []
Coverage = []
threshold = []
total_amount = len(df)

for num in range(40, 99, +1):
    num = num/100

    filtered_df = df[(df['classA'] > num) | (df['classB'] > num) | (df['classC'] > num) | (df['classD'] > num)]
    grouped = filtered_df['result'].value_counts().to_dict()
    print(num, acc, grouped)
    acc = grouped[1] / (grouped[0] + grouped[1])
    cov = (grouped[0] + grouped[1]) / total_amount
    Accuracy.append(acc)
    Coverage.append(cov)
    threshold.append(num)

0.4 0.9889543446244478 {1: 7351, 0: 470}
0.41 0.9399053829433577 {1: 7351, 0: 470}
0.42 0.9399053829433577 {1: 7350, 0: 463}
0.43 0.9407397926532702 {1: 7350, 0: 463}
0.44 0.9407397926532702 {1: 7350, 0: 461}
0.45 0.9409806682883114 {1: 7350, 0: 460}
0.46 0.941101152368758 {1: 7350, 0: 459}
0.47 0.9412216673069536 {1: 7345, 0: 456}
0.48 0.941545955646712 {1: 7345, 0: 452}
0.49 0.9420289855072463 {1: 7345, 0: 449}
0.5 0.9423915832691814 {1: 7338, 0: 443}
0.51 0.9430664439018122 {1: 7332, 0: 439}
0.52 0.9435079140393772 {1: 7326, 0: 428}
0.53 0.9448026824864586 {1: 7319, 0: 420}
0.54 0.9457294224059956 {1: 7311, 0: 417}
0.55 0.9460403726708074 {1: 7303, 0: 407}
0.56 0.9472114137483787 {1: 7297, 0: 403}
0.57 0.9476623376623377 {1: 7289, 0: 393}
0.58 0.9488414475397032 {1: 7279, 0: 384}
0.59 0.9498890773848362 {1: 7275, 0: 374}
0.6 0.9511047195711858 {1: 7271, 0: 373}
0.61 0.9512035583464155 {1: 7269, 0: 364}
0.62 0.9523123280492598 {1: 7261, 0: 359}
0.63 0.9528871391076116 {1: 7241, 0: 34

In [106]:
from pyecharts.charts import Line
from pyecharts import options as opts
from pyecharts.globals import ThemeType


x = [str(num) for num in threshold]
y1 = [round(num, 2)*100 for num in Accuracy]
y2 =  [round(num, 2)*100 for num in Coverage]
line = (
    Line(init_opts=opts.InitOpts(theme=ThemeType.LIGHT))
    .set_global_opts(
        title_opts=opts.TitleOpts(title="Two Lines"),
        xaxis_opts=opts.AxisOpts(name="阈值"),
        yaxis_opts=opts.AxisOpts(name="准确率"),
    )
    .add_xaxis(xaxis_data=x)
    .add_yaxis(series_name="Accuracy", y_axis=y1)
    .add_yaxis(series_name="Coverage", y_axis=y2)
    .render("落地效果预估.html")
)

# line.render_notebook()