In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR
#from imblearn.over_sampling import SMOTE
#from torch.optim.lr_scheduler import ReduceLROnPlateau

In [2]:
df = pd.read_csv('C:/Users/JenMing/Desktop/MBTI/LSTM/mbti_to_LSTM_DF.csv')
df.head()

Unnamed: 0,type,posts
0,INFJ,"['INFP', 'INFP', 'INFJ', 'ENFP', 'ISTP', 'INTP..."
1,ENTP,"['INTJ', 'INTP', 'ENFP', 'INTJ', 'INTP', 'INTP..."
2,INTP,"['INTJ', 'INFP', 'INFP', 'INTP', 'INTP', 'INTJ..."
3,INTJ,"['INTJ', 'ISFJ', 'INFP', 'INTP', 'INTP', 'INTP..."
4,ENTJ,"['ENTJ', 'INTP', 'ENFP', 'INTP', 'ENTJ', 'INTJ..."


In [3]:
# 編碼轉換
personality_mapping = {'INFJ': 0,
                        'ENTP': 1,
                        'INTP': 2,
                        'INTJ': 3,
                        'ENTJ': 4,
                        'ENFJ': 5,
                        'INFP': 6,
                        'ENFP': 7,
                        'ISFP': 8,
                        'ISTP': 9,
                        'ISFJ': 10,
                        'ISTJ': 11,
                        'ESTP': 12,
                        'ESFP': 13,
                        'ESTJ': 14,
                        'ESFJ': 15 }

In [4]:
# 資料載入和轉換
encoded_data = []

chars_to_remove = "][' "    

for index, row in df.iterrows():
    dialogues = row["posts"] #字串
    target_personality = row["type"]
    for char in chars_to_remove:
        dialogues = dialogues.replace(char, "")
    
    dialogues_list = dialogues.split(',')
    
    
    dialogue_ids = [personality_mapping[personality] for personality in dialogues_list]
    target_personality_id = personality_mapping[target_personality]
    
    encoded_data.append((dialogue_ids, target_personality_id))
    

In [5]:
# 動態計算 input_size
max_dialogue_length = max(len(dialogue) for dialogue, _ in encoded_data)
input_size = max_dialogue_length
input_size

57

In [6]:
# 填充序列並轉換為張量
padded_dialogues = [torch.tensor(dialogue, dtype=torch.float32) for dialogue, _ in encoded_data]
padded_dialogues = pad_sequence(padded_dialogues, batch_first=True)

target_personality = torch.tensor([target for _, target in encoded_data], dtype=torch.float32)

In [7]:
# 定義 RNN 模型
class PersonalityPredictionRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size,dropout_prob=0.5):
        super(PersonalityPredictionRNN, self).__init__()
        self.rnn = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 128)  # 添加全連接層
        self.fc2 = nn.Linear(128,64)
        #self.relu = nn.ReLU()  # 添加 ReLU 激活函數
        self.dropout = nn.Dropout(dropout_prob)
        self.fc3 = nn.Linear(64, output_size)
        #單向
        #self.fc = nn.Linear(hidden_size, output_size)
        
        #雙向
        #self.fc = nn.Linear(hidden_size, output_size)
        #self.rnn = nn.LSTM(input_size, hidden_size, batch_first=True,  bidirectional=True)
        #self.fc = nn.Linear(hidden_size * 2, output_size)
    
    def forward(self, x):
        #print("Input shape:", x.shape)  # 檢查輸入張量的形狀
        out, _ = self.rnn(x)
        out = out.unsqueeze(1)
        out = out[:, -1, :]  # 只取最後一個時間步的輸出
        out = self.fc1(out)
        out = self.fc2(out)
        #out = self.relu(out) #relu
        out = F.sigmoid(out) #sigmoid
        #out = F.tanh(out) #Tanh
        out = self.dropout(out)
        out = self.fc3(out)
        
        #out, _ = self.rnn(x)
        #out = out.unsqueeze(1)
        #out = self.fc(out[:, -1, :])  # 將 out 張量轉為 3 維再進行索引
        return out


In [8]:
# 資料集切分為訓練集和驗證集
train_dialogues, val_dialogues, train_target, val_target = train_test_split(padded_dialogues, target_personality, test_size=0.15, random_state=42)

# 初始化模型
hidden_size = 256
output_size = len(personality_mapping)
model = PersonalityPredictionRNN(input_size, hidden_size, output_size)

# 使用SMOTE生成合成样本
smote = SMOTE(sampling_strategy='auto', random_state=42)  # 可以调整sampling_strategy来控制合成样本的数量
train_dialogues_resampled, train_target_resampled = smote.fit_resample(train_dialogues, train_target)

# 转换为PyTorch张量
train_dialogues_resampled = torch.tensor(train_dialogues_resampled, dtype=torch.float32)
train_target_resampled = torch.tensor(train_target_resampled, dtype=torch.float32)

import matplotlib.pyplot as plt
from collections import Counter

# 计算SMOTE之前的类别分布
class_distribution_before = Counter(train_target)

# 提取类别标签和对应的样本数量
labels_before = list(class_distribution_before.keys())
counts_before = list(class_distribution_before.values())

# 绘制柱状图显示SMOTE之前的类别分布
plt.figure(figsize=(12, 6))
plt.bar(labels_before, counts_before, label='Before SMOTE', alpha=0.5, color='b', width=0.4)
plt.xlabel('Class Labels')
plt.ylabel('Sample Count')
plt.title('Class Distribution Before SMOTE')
plt.legend()
plt.show()

# 计算SMOTE之后的类别分布
class_distribution_after = Counter(train_target_resampled)

# 提取类别标签和对应的样本数量
labels_after = list(class_distribution_after.keys())
counts_after = list(class_distribution_after.values())

# 绘制柱状图显示SMOTE之后的类别分布
plt.figure(figsize=(12, 6))
plt.bar(labels_after, counts_after, label='After SMOTE', alpha=0.5, color='g', width=0.4)
plt.xlabel('Class Labels')
plt.ylabel('Sample Count')
plt.title('Class Distribution After SMOTE')
plt.legend()
plt.show()

In [9]:
# 定義損失函數和優化器 (CEL:分類問題 MSE:回归问题)
criterion = nn.CrossEntropyLoss()
#criterion = nn.MSELoss() 
weight_decay = 0.001
optimizer = optim.Adam(model.parameters(), lr=0.05, weight_decay=weight_decay)
#optimizer = optim.Adam(model.parameters(), lr=0.001)
#StepLR 调度器会每隔 step_size 个周期将学习率乘以 gamma，以逐步降低学习率
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
#scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)

In [10]:
# Early Stopping 相關設定
best_val_loss = float('inf')
best_model_state = None
patience = 10
counter = 0

In [11]:
file_path = "C:/Users/JenMing/Desktop/MBTI/LSTM/Model/note.txt"

In [12]:
# 訓練模型
with open(file_path, "w") as f:
    num_epochs = 60
    dimension_counts = {'E/I': 0,
                        'S/N': 0,
                        'T/F': 0,
                        'J/P': 0}
    item_count = 0
    for epoch in range(num_epochs):
        # 在每個訓練循環中檢查 train_dialogues 張量的形狀
        print("Shape of train_dialogues:", train_dialogues.shape)
        f.write("Shape of train_dialogues: {}\n".format(train_dialogues.shape))
        # 在每個驗證循環中檢查 val_dialogues 張量的形狀
        print("Shape of val_dialogues:", val_dialogues.shape)
        f.write("Shape of val_dialogues: {}\n".format(val_dialogues.shape))
        model.train()
        total_loss = 0.0
        for dialogue_batch, target_batch in zip(train_dialogues, train_target):
            target_batch = target_batch.to(torch.long)
            optimizer.zero_grad()

            outputs = model(dialogue_batch.unsqueeze(0))
            loss = criterion(outputs.squeeze(0), target_batch)
            
            # 添加 L2 正则化项到损失
            l2_loss = sum(p.norm(2) for p in model.parameters())
            loss += weight_decay * l2_loss
            
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        scheduler.step()  # 调整学习率
        
        average_loss = total_loss / len(train_dialogues)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}\n")
        f.write(f"Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}\n")
        # 驗證模型
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for dialogue_batch, target_batch in zip(val_dialogues, val_target):
                target_batch = target_batch.to(torch.long)
                outputs = model(dialogue_batch.unsqueeze(0))
                loss = criterion(outputs.squeeze(1), target_batch.unsqueeze(0))
                val_loss += loss.item()
                
                predicted_personality_ids = outputs.argmax(dim=1)
                predicted_personalities = [personality for personality, id in personality_mapping.items() if id in predicted_personality_ids]
                true_personality_id = target_batch.item()
                true_personality = [personality for personality, id in personality_mapping.items() if id == true_personality_id][0]
                
                #print(predicted_personalities[0])
                #print(true_personality)
                for n in range(4):
                    if n == 0:
                        if predicted_personalities[0][n] == true_personality[n]:
                            dimension_counts['E/I'] += 1
                    elif n == 1:
                        if predicted_personalities[0][n] == true_personality[n]:
                            dimension_counts['S/N'] += 1
                    elif n == 2:
                        if predicted_personalities[0][n] == true_personality[n]:
                            dimension_counts['T/F'] += 1
                    elif n == 3:
                        if predicted_personalities[0][n] == true_personality[n]:
                            dimension_counts['J/P'] += 1
                item_count += 1
                
        average_val_loss = val_loss / len(val_dialogues)
        print(f"Validation Loss: {average_val_loss:.4f}\n")
        f.write(f"Validation Loss: {average_val_loss:.4f}\n")
        
        #scheduler.step(average_val_loss)
        # 比較驗證損失，並根據需要保存最佳模型
        
        if average_val_loss < best_val_loss:
            best_val_loss = average_val_loss
            best_model_state = model.state_dict()
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print("Early Stopping: Validation loss has not improved for {} epochs. Stopping training.".format(patience))
                break
        
    EI_counts = dimension_counts['E/I']
    SN_counts = dimension_counts['S/N']
    TF_counts = dimension_counts['T/F']
    JP_counts = dimension_counts['J/P']
    print(f'E.I: {EI_counts}/{item_count} ')
    print('Accuracy: '+ str(EI_counts/item_count)+'\n')
    print(f'S.N: {SN_counts}/{item_count} ')
    print('Accuracy: '+ str(SN_counts/item_count)+'\n')
    print(f'T.F: {TF_counts}/{item_count} ')
    print('Accuracy: '+ str(TF_counts/item_count)+'\n')
    print(f'J.P: {JP_counts}/{item_count} ')
    print('Accuracy: '+ str(JP_counts/item_count)+'\n')
    
    f.write(f'E.I: {EI_counts}/{item_count} ')
    f.write('Accuracy: '+ str(EI_counts/item_count)+'\n')
    f.write(f'S.N: {SN_counts}/{item_count} ')
    f.write('Accuracy: '+ str(SN_counts/item_count)+'\n')
    f.write(f'T.F: {TF_counts}/{item_count} ')
    f.write('Accuracy: '+ str(TF_counts/item_count)+'\n')
    f.write(f'J.P: {JP_counts}/{item_count} ')
    f.write('Accuracy: '+ str(JP_counts/item_count)+'\n')
    
    '''
    # 計算整體準確率和各維度的準確率
    total_correct = sum([counts['correct'] for counts in dimension_counts.values()])
    total_total = sum([counts['total'] for counts in dimension_counts.values()])
    overall_accuracy = total_correct / total_total if total_total > 0 else 0.0
    
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    f.write(f"Overall Accuracy: {overall_accuracy:.4f}\n")
    
    for dimension, counts in dimension_counts.items():
        dimension_accuracy = counts['correct'] / counts['total'] if counts['total'] > 0 else 0.0
        print(f"{dimension} Accuracy: {dimension_accuracy:.4f}")
        f.write(f"{dimension} Accuracy: {dimension_accuracy:.4f}\n")
    '''    
f.close()

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])




Epoch [1/60], Loss: 6.6204

Validation Loss: 5.2876

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [2/60], Loss: 6.5135

Validation Loss: 3.6621

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [3/60], Loss: 6.6933

Validation Loss: 4.7401

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [4/60], Loss: 6.6276

Validation Loss: 8.0194

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [5/60], Loss: 6.6293

Validation Loss: 4.9349

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [6/60], Loss: 6.3833

Validation Loss: 3.9702

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [7/60], Loss: 6.6979

Validation Loss: 3.9356

Shape of train_dialogues: torch.Size([7372, 57])
Shap

Epoch [56/60], Loss: 2.1837

Validation Loss: 2.1639

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [57/60], Loss: 2.1790

Validation Loss: 2.1640

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [58/60], Loss: 2.1807

Validation Loss: 2.1640

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [59/60], Loss: 2.1796

Validation Loss: 2.1639

Shape of train_dialogues: torch.Size([7372, 57])
Shape of val_dialogues: torch.Size([1302, 57])
Epoch [60/60], Loss: 2.1807

Validation Loss: 2.1638

Early Stopping: Validation loss has not improved for 10 epochs. Stopping training.
E.I: 60695/78120 
Accuracy: 0.7769457245263697

S.N: 67363/78120 
Accuracy: 0.8623015873015873

T.F: 44326/78120 
Accuracy: 0.5674091141833078

J.P: 43099/78120 
Accuracy: 0.5517025089605735



In [13]:
if best_model_state:
    torch.save(best_model_state, "C:/Users/JenMing/Desktop/MBTI/LSTM/Model/best_model.pth")
else:
    torch.save(model.state_dict(), "C:/Users/JenMing/Desktop/MBTI/LSTM/Model/best_model.pth")

In [14]:
# 加載已經訓練好的模型
best_model = PersonalityPredictionRNN(input_size, hidden_size, output_size)
best_model.load_state_dict(torch.load("C:/Users/JenMing/Desktop/MBTI/LSTM/Model/best_model.pth"))
best_model.eval()

# 將驗證數據轉換為張量並進行預測
with torch.no_grad():
    val_outputs = best_model(val_dialogues)
    predicted_personality_ids = val_outputs.argmax(dim=1)

# 計算準確率
correct_predictions = (predicted_personality_ids == val_target).sum().item()
total_samples = val_dialogues.shape[0]
accuracy = correct_predictions / total_samples

print("Validation Accuracy: {:.2%}".format(accuracy))
with open(file_path, "a") as f:
    f.write("\n------------\n")
    f.write("Validation Accuracy: {:.2%}".format(accuracy) + "\n")

Validation Accuracy: 25.58%


In [21]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# 假設 predictions 和 labels 是你的預測和實際標籤
#cm = confusion_matrix(labels, predictions)
accuracy = accuracy_score(val_target, predicted_personality_ids)
precision = precision_score(val_target, predicted_personality_ids, average='weighted')  # 可以使用 'micro'、'macro' 或 'weighted'
recall = recall_score(val_target, predicted_personality_ids, average='weighted')
f1 = f1_score(val_target, predicted_personality_ids, average='weighted')

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')

Accuracy: 0.2557603686635945
Precision: 0.12539515217807018
Recall: 0.2557603686635945
F1 Score: 0.14557502255200347


  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
# 將測試數據進行預處理，並轉換為張量
test_personality_list = ['INFJ', 'INTP', 'ENFP', 'INFP', 'INTJ', 'ENFP', 'INFP', 'ENTP', 'ENFP', 'ENFP']  # 加載測試數據，類似於您的訓練數據
# 使用 personality_mapping 將人格類別轉換為 ID
test_personality_ids = [personality_mapping[personality] for personality in test_personality_list]

# 根據人格類別 ID 找到對應的對話編碼
test_dialogues_encoded = [encoded_data[id][0] for id in test_personality_ids]

# 填充對話編碼，使其長度與 input_size 相同
max_dialogue_length = input_size
padded_test_dialogues = [dialogue + [0] * (max_dialogue_length - len(dialogue)) for dialogue in test_dialogues_encoded]

# 將對話編碼進行轉換，並轉換為張量
test_dialogues_padded = pad_sequence([torch.tensor(dialogue, dtype=torch.float32) for dialogue in padded_test_dialogues], batch_first=True)

In [None]:
# 使用模型進行預測
with torch.no_grad():
    test_outputs = best_model(test_dialogues_padded)
    predicted_personality_probs = torch.softmax(test_outputs, dim=1)

# 找到最相近的人格及其概率
closest_personality_id = torch.argmax(predicted_personality_probs, dim=1)
closest_personality = [personality for personality, id in personality_mapping.items() if id == closest_personality_id[0].item()][0]
closest_personality_prob = predicted_personality_probs[0][closest_personality_id[0]].item()

# 輸出最相近的人格及其概率
print("Closest Personality:",closest_personality)
print("Probability:",closest_personality_prob)
print("------------")

# 找到前四高的人格及其概率
top_5_personality_probs, top_5_personality_ids = torch.topk(predicted_personality_probs, k=5)
top_5_personality_probs = top_5_personality_probs[0]
top_5_personality_ids = top_5_personality_ids[0]

# 輸出前四高的人格及其概率
with open(file_path, "a") as f:
    f.write("\n------------\n")
    f.write("Closest Personality: {}\n".format(closest_personality))
    f.write("Probability: {}\n".format(closest_personality_prob))
    f.write("\n------------\n")
    
    for i in range(5):
        personality_id = top_5_personality_ids[i].item()
        personality = [personality for personality, id in personality_mapping.items() if id == personality_id][0]
        prob = top_5_personality_probs[i].item()
        print("Top ", i + 1, " Personality:", personality)
        print("\nProbability:", prob , "\n")
        f.write("Top "+ str(i + 1) + " Personality:"+ personality)
        f.write("\nProbability:"+ str(prob) + "\n")
f.close()

測試SMOTE
目前:epoch=60,hidden_size=128,L2,stepLR=10,lr=0.05,earlystop,crs,drop=0.5
