## SVM

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
# 读取数据
data = pd.read_excel('/content/tech.xlsx')

# 将连续情感分数映射为离散类别
# def map_score_to_category(score):
#     if score < 0:
#         return -1
#     elif score > 0:
#         return 1
#     else:
#         return 0
# 将连续情感分数映射为离散类别
def map_score_to_category(score):
    if score < 0:
        return -1
    else :
        return 1

data['emotion_category'] = data['score'].apply(map_score_to_category)

# 分割数据为训练集和测试集
X = data['hd']
y = data['emotion_category']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 使用TF-IDF特征提取
tfidf_vectorizer = TfidfVectorizer(max_features=500)
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

# 创建SVM模型
svm_model = SVC(kernel='linear', C=1.0)

# 训练模型
svm_model.fit(X_train_tfidf, y_train)

# 预测情感类别
y_pred = svm_model.predict(X_test_tfidf)

# 输出评估指标
print(classification_report(y_test, y_pred))
print('Accuracy:',accuracy_score(y_test, y_pred))


FileNotFoundError: [Errno 2] No such file or directory: '/content/tech.xlsx'

## 朴素贝叶斯

In [None]:
import pandas as pd
import numpy as np
import jieba
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report

# 读取数据
data = pd.read_excel('/content/tech2.xlsx')

# 将连续情感分数映射为二元类别
def map_score_to_binary(score):
    if score >= 0:
        return 1  # 正面
    else:
        return 0  # 非正面

data['emotion_binary'] = data['score'].apply(map_score_to_binary)
# 分割数据为训练集和测试集
X = data['hd']
y = data['emotion_binary']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 使用TF-IDF特征提取
tfidf_vectorizer = TfidfVectorizer(max_features=5000)
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

# 构建贝叶斯算法分类器
mb = MultinomialNB(alpha=1)  # alpha 为可选项，默认 1.0，添加拉普拉修/Lidstone 平滑参数
# 训练数据
mb.fit(X_train_tfidf, y_train)
# 预测数据
y_predict = mb.predict(X_test_tfidf)
#预测值与真实值展示
# print('预测值：',y_predict)
# print('真实值：',y_test)
report = classification_report(y_test, y_predict) # X_test_tfidf, y_test
print(report)

mb.score(X_test_tfidf, y_test)

## 分层采样

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras import regularizers
import random
from sklearn.utils import shuffle
import importlib
importlib.invalidate_caches()

df = pd.read_excel('tech.xlsx')
df = df.sort_values('score')

subset1, subset2, subset3 = np.split(df, [df[df['score'] < 0].index[-1]+1, df[df['score'] == 0].index[-1]+1])
train1 = subset1.sample(frac=0.7)
test1 = subset1.drop(train1.index)

train2 = subset2.sample(frac=0.7)
test2 = subset2.drop(train2.index)

train3 = subset3.sample(frac=0.7)
test3 = subset3.drop(train3.index)

train_set = pd.concat([train1, train2, train3])
test_set = pd.concat([test1, test2, test3])
train_set = shuffle(train_set)
test_set = shuffle(test_set)
X_train = train_set['hd']
y_train = train_set['score']
X_test = test_set['hd']
y_test = test_set['score']
y_train = train_set['score']
y_test = test_set['score']
data=pd.concat([train_set, test_set])

## LSTM

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import random
from sklearn.utils import shuffle
import importlib
importlib.invalidate_caches()
df = pd.read_excel('tech.xlsx') #/content/drive/MyDrive/数据集/数据集/大豆/gpt_soybean20180709-20180927 tech1 tech2 tech3

data = shuffle(df)
mean_score = data['score'].mean()
# def map_score_to_category(score):
#     if score < 0:
#         return -1
#     else :
#         return 1
# def map_score_to_category(score):
#     if score < 0:
#         return -1
#     elif score > 0:
#         return 1
#     else:
#         return 0
#data['emotion_category'] = data['score'].apply(map_score_to_category)
#data['score'].fillna(mean_score, inplace=True)
scores =data['score']
# 构建词汇表，将标题文本转换为数字序列
tokenizer = tf.keras.layers.TextVectorization(output_mode='int')
tokenizer.adapt(data['hd'])

# 将标题文本转换为数字序列
title_sequences = tokenizer(data['hd']).numpy()

# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(title_sequences, np.array(scores), test_size=0.3, random_state=42) # 0.2 0.4

# 构建模型
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=len(tokenizer.get_vocabulary()), output_dim=64, mask_zero=True),
    tf.keras.layers.LSTM(64,kernel_regularizer=regularizers.l2(0.01)),
    tf.keras.layers.Dense(1,activation='sigmoid')
])
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
# 编译模型
model.compile(optimizer, loss='mean_squared_error')

# 记录损失
history = model.fit(X_train, y_train, epochs=50, validation_data=(X_test, y_test), verbose=2)
train_loss = history.history['loss']
test_loss = history.history['val_loss']
#output_file = 'lstm.txt'

# with open(output_file, 'w') as f:
#   f.write(str(history.history))
# f.close()
# 绘图
# plt.plot(train_loss, label='Train Loss')
# plt.plot(test_loss, label='Test Loss')
# plt.title('LSTM Model Performance')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()
# plt.show()


## ATT_LSTM

In [None]:
import numpy as np
import pandas as pd
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import regularizers
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dense

class AttentionLayer(Layer):
    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create a trainable weight variable for this layer.
        self.W_q = self.add_weight(name="W_q", shape=(input_shape[-1], input_shape[-1]), initializer="uniform")
        self.W_k = self.add_weight(name="W_k", shape=(input_shape[-1], input_shape[-1]), initializer="uniform")
        super(AttentionLayer, self).build(input_shape)  # Be sure to call this at the end

    def call(self, x):
        q = tf.matmul(x, self.W_q)
        k = tf.matmul(x, self.W_k)
        v = x

        attention_weights = tf.nn.softmax(tf.matmul(q, k, transpose_b=True), axis=-1)
        output = tf.matmul(attention_weights, v)
        return output

    def compute_output_shape(self, input_shape):
        return input_shape
# 定义新闻标题数据和对应的分数数据
# df = pd.read_excel('tech.xlsx') # /content/drive/MyDrive/数据集/数据集/大豆/gpt_soybean20180709-20180927.xlsx
# data = shuffle(df)
#mean_score = data['score'].mean()
#data['score'].fillna(mean_score, inplace=True)
# def map_score_to_category(score):
#     if score <= 0:
#         return 0
#     else:
#         return 1
# data['emotion_category'] = data['score'].apply(map_score_to_category)
# scores = data['emotion_category']

# 构建词汇表，将标题文本转换为数字序列
tokenizer = tf.keras.layers.TextVectorization(output_mode='int')
tokenizer.adapt(data['hd'])

# 将标题文本转换为数字序列
title_sequences = tokenizer(data['hd']).numpy()

# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(title_sequences, np.array(scores), test_size=0.3, random_state=42)


# 构建模型
input_layer = tf.keras.layers.Input(shape=(title_sequences.shape[1],))
embedding_layer = Embedding(input_dim=len(tokenizer.get_vocabulary()), output_dim=64, mask_zero=True)(input_layer)
lstm_layer = LSTM(64, kernel_regularizer=regularizers.l2(0.01),return_sequences=True)(embedding_layer)
attention_layer = AttentionLayer()(lstm_layer)
output_layer = Dense(1)(attention_layer)

model = tf.keras.Model(inputs=input_layer, outputs=output_layer)
# 定义精度指标函数
def custom_accuracy(y_true, y_pred):
    threshold = 0.4  # 自定义阈值 0.5 0.1 0.3
    absolute_error = tf.abs(y_true - y_pred)
    correct_predictions = tf.reduce_mean(tf.cast(absolute_error < threshold, tf.float32))
    return correct_predictions

# 编译模型
model.compile(optimizer='adam', loss='mean_squared_error', metrics=[custom_accuracy])

# 训练模型
#model.fit(X_train, y_train, epochs=100, verbose=2)
history = model.fit(X_train, y_train, epochs=50, validation_data=(X_test, y_test), verbose=2)
train_loss = history.history['loss']
test_loss = history.history['val_loss']
accuracy = history.history['custom_accuracy']  # 获取精度值

# 绘图
# plt.plot(train_loss, label='Train Loss')
# plt.plot(test_loss, label='Test Loss')
# plt.title('Attention-based LSTM Model Performance')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()
# plt.show()
#plt.savefig('myplot111.png', dpi=300)

## seq2seq

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

# 读取数据
# data = pd.read_excel('tech.xlsx')

# titles = data['hd']
# scores = data['score']

# 文本标记化
tokenizer = Tokenizer()
tokenizer.fit_on_texts(titles)
sequences = tokenizer.texts_to_sequences(titles)


max_len = max([len(seq) for seq in sequences])
padded_sequences = pad_sequences(sequences, maxlen=max_len, padding='post')
labels = np.array(scores)

X_train, X_test, y_train, y_test = train_test_split(padded_sequences, labels, test_size=0.3, random_state=42) #0.2 0.4

vocab_size = len(tokenizer.word_index) + 1
embedding_dim = 16
latent_dim = 32

# Encoder
encoder_inputs = tf.keras.layers.Input(shape=(max_len,))
encoder_embedding = tf.keras.layers.Embedding(input_dim=voicab_size, output_dim=embedding_dim, input_length=max_len)(encoder_inputs)
encoder_lstm = tf.keras.layers.LSTM(latent_dim)(encoder_embedding)
encoder_outputs, state_h, state_c = tf.keras.layers.LSTM(latent_dim, return_state=True)(encoder_embedding)
encoder_states = [state_h, state_c]

# Decoder
decoder_inputs = tf.keras.layers.Input(shape=(max_len,))
decoder_embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_len)(decoder_inputs)
decoder_lstm = tf.keras.layers.LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
decoder_dense = tf.keras.layers.Dense(1, activation='linear')
decoder_outputs = decoder_dense(decoder_outputs)

model = tf.keras.models.Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_absolute_error'])

history = model.fit([X_train, X_train], y_train, epochs=50, batch_size=1, validation_split=0.2)

# loss, mae = model.evaluate([X_test, X_test], y_test)
# print("Test Mean Absolute Error:", mae)


## transformer

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

# 读取数据
# data = pd.read_excel('/content/tech.xlsx')

# titles = data['hd']
# scores = data['score']

# # 文本标记化
# tokenizer = Tokenizer()
# tokenizer.fit_on_texts(titles)
# sequences = tokenizer.texts_to_sequences(titles)

# # 序列填充
# max_len = max([len(seq) for seq in sequences])
# padded_sequences = pad_sequences(sequences, maxlen=max_len, padding='post')

# # 标签处理
# labels = np.array(scores)

X_train, X_test, y_train, y_test = train_test_split(padded_sequences, labels, test_size=0.3, random_state=42)

vocab_size = len(tokenizer.word_index) + 1
embedding_dim = 32
num_heads = 2
dff = 32
num_encoder_layers = 2

# 位置编码
def positional_encoding(position, d_model):
    angle_rads = np.arange(position)[:, np.newaxis] / np.power(10000, (2 * (np.arange(d_model)[np.newaxis, :] // 2)) / np.float32(d_model))
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)

# 建立Transformer模型
class TransformerEncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.multi_head_attention = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, inputs, training=True):
        attn_output = self.multi_head_attention(inputs, inputs, return_attention_scores=False)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        return out2

class TransformerEncoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(TransformerEncoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)
        self.enc_layers = [TransformerEncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, inputs, training=True):
        seq_len = tf.shape(inputs)[1]
        inputs = self.embedding(inputs)
        inputs *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        inputs += self.pos_encoding[:, :seq_len, :]
        inputs = self.dropout(inputs, training=training)
        for i in range(self.num_layers):
            inputs = self.enc_layers[i](inputs, training)
        return inputs

# 构建Transformer模型
class TransformerModel(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(TransformerModel, self).__init__()
        self.encoder = TransformerEncoder(num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate)
        self.flatten = tf.keras.layers.Flatten()
        self.dense = tf.keras.layers.Dense(1, activation='linear')

    def call(self, inputs, training=True):
        enc_output = self.encoder(inputs, training)
        flattened_output = self.flatten(enc_output)
        output = self.dense(flattened_output)
        return output

# 初始化并训练模型
model = TransformerModel(
    num_layers=num_encoder_layers,
    d_model=embedding_dim,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=vocab_size,
    maximum_position_encoding=max_len,
)    

model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_absolute_error'])

model.fit(X_train, y_train, epochs=50, batch_size=1, validation_data=(X_test, y_test))

loss, mae = model.evaluate(X_test, y_test)
# print("Test Mean Absolute Error:", mae)



NameError: name 'padded_sequences' is not defined

## eda

In [None]:
# @Author : zhany
# @Time : 2019/03/20 

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import jieba
import synonyms
import random
from random import shuffle

random.seed(2019)

#停用词列表，默认使用哈工大停用词表
f = open('stopwords/HIT_stop_words.txt')
stop_words = list()
for stop_word in f.readlines():
    stop_words.append(stop_word[:-1])


#考虑到与英文的不同，暂时搁置
#文本清理
'''
import re
def get_only_chars(line):
    #1.清除所有的数字
'''


########################################################################
# 同义词替换
# 替换一个语句中的n个单词为其同义词
########################################################################
def synonym_replacement(words, n):
    new_words = words.copy()
    random_word_list = list(set([word for word in words if word not in stop_words]))     
    random.shuffle(random_word_list)
    num_replaced = 0  
    for random_word in random_word_list:          
        synonyms = get_synonyms(random_word)
        if len(synonyms) >= 1:
            synonym = random.choice(synonyms)   
            new_words = [synonym if word == random_word else word for word in new_words]   
            num_replaced += 1
        if num_replaced >= n: 
            break

    sentence = ' '.join(new_words)
    new_words = sentence.split(' ')

    return new_words

def get_synonyms(word):
    return synonyms.nearby(word)[0]


########################################################################
# 随机插入
# 随机在语句中插入n个词
########################################################################
def random_insertion(words, n):
    new_words = words.copy()
    for _ in range(n):
        add_word(new_words)
    return new_words

def add_word(new_words):
    synonyms = []
    counter = 0    
    while len(synonyms) < 1:
        random_word = new_words[random.randint(0, len(new_words)-1)]
        synonyms = get_synonyms(random_word)
        counter += 1
        if counter >= 10:
            return
    random_synonym = random.choice(synonyms)
    random_idx = random.randint(0, len(new_words)-1)
    new_words.insert(random_idx, random_synonym)


########################################################################
# Random swap
# Randomly swap two words in the sentence n times
########################################################################

def random_swap(words, n):
    new_words = words.copy()
    for _ in range(n):
        new_words = swap_word(new_words)
    return new_words

def swap_word(new_words):
    random_idx_1 = random.randint(0, len(new_words)-1)
    random_idx_2 = random_idx_1
    counter = 0
    while random_idx_2 == random_idx_1:
        random_idx_2 = random.randint(0, len(new_words)-1)
        counter += 1
        if counter > 3:
            return new_words
    new_words[random_idx_1], new_words[random_idx_2] = new_words[random_idx_2], new_words[random_idx_1] 
    return new_words

########################################################################
# 随机删除
# 以概率p删除语句中的词
########################################################################
def random_deletion(words, p):

    if len(words) == 1:
        return words

    new_words = []
    for word in words:
        r = random.uniform(0, 1)
        if r > p:
            new_words.append(word)

    if len(new_words) == 0:
        rand_int = random.randint(0, len(words)-1)
        return [words[rand_int]]

    return new_words


########################################################################
#EDA函数
def eda(sentence, alpha_sr=0.1, alpha_ri=0.1, alpha_rs=0.1, p_rd=0.1, num_aug=9):
    seg_list = jieba.cut(sentence)
    seg_list = " ".join(seg_list)
    words = list(seg_list.split())
    num_words = len(words)

    augmented_sentences = []
    num_new_per_technique = int(num_aug/4)+1
    n_sr = max(1, int(alpha_sr * num_words))
    n_ri = max(1, int(alpha_ri * num_words))
    n_rs = max(1, int(alpha_rs * num_words))

    #print(words, "\n")

    
    #同义词替换sr
    for _ in range(num_new_per_technique):
        a_words = synonym_replacement(words, n_sr)
        augmented_sentences.append(' '.join(a_words))

    #随机插入ri
    for _ in range(num_new_per_technique):
        a_words = random_insertion(words, n_ri)
        augmented_sentences.append(' '.join(a_words))
    
    #随机交换rs
    for _ in range(num_new_per_technique):
        a_words = random_swap(words, n_rs)
        augmented_sentences.append(' '.join(a_words))

   
    #随机删除rd
    for _ in range(num_new_per_technique):
        a_words = random_deletion(words, p_rd)
        augmented_sentences.append(' '.join(a_words))
    
    #print(augmented_sentences)
    shuffle(augmented_sentences)

    if num_aug >= 1:
        augmented_sentences = augmented_sentences[:num_aug]
    else:
        keep_prob = num_aug / len(augmented_sentences)
        augmented_sentences = [s for s in augmented_sentences if random.uniform(0, 1) < keep_prob]

    augmented_sentences.append(seg_list)

    return augmented_sentences

##
#测试用例
#eda(sentence="我们就像蒲公英，我也祈祷着能和你飞去同一片土地")
