In [1]:
import os
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession

# Set JAVA_HOME to the Java 11 OpenJDK install path before the session is built.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# Remove _JAVA_OPTIONS from the environment when it is present.
os.environ.pop("_JAVA_OPTIONS", None)

try:
    spark = (
        SparkSession.builder
        .appName("MBTI_SparkNLP_Loader")
        # Spark NLP is resolved as a Maven package when the session starts.
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.1.0")
        # "0" is documented by Spark as "no limit" on collected result size.
        .config("spark.driver.maxResultSize", "0")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.driver.memory", "4g")
        .config("spark.kryoserializer.buffer.max", "256m")
        .getOrCreate()
    )
except Exception as e:
    # Best effort: report the failure and leave `spark` undefined so later
    # cells can detect that no session is available.
    print(f"Spark Session 啟動失敗，請檢查系統 PATH 和 JDK 安裝: {e}")
else:
    print("Spark Session 啟動成功！")


25/12/11 17:17:30 WARN Utils: Your hostname, jolin-System-Product-Name resolves to a loopback address: 127.0.1.1; using 140.118.127.138 instead (on interface eno1)
25/12/11 17:17:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/jolin/.ivy2/cache
The jars for the packages stored in: /home/jolin/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-559c0de7-6c7a-44b9-879a-46d96f21b17d;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/jolin/miniconda3/envs/CAA-MBTI-Predictor/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.johnsnowlabs.nlp#spark-nlp_2.12;3.1.0 in central
	found com.typesafe#config;1.3.0 in central
	found org.rocksdb#rocksdbjni;6.5.3 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.603 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.code.findbugs#annotations;3.0.1 in central
	found net.jcip#jcip-annotations;1.0 in central
	found com.google.code.findbugs#jsr305;3.0.1 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found org.slf4j#slf4j-api;1.7.21 in central
	found com.navigamez#greex;1.0 in central
	found dk.brics.automaton#automaton;1.11-8 in central
	found org.json4s#json4s-ext_2.12;3.5.3 in central
	found joda-time#joda-time;2.9.5 in central
	found org.joda#joda-convert;1.8.1 in 

Spark Session 啟動成功！


In [2]:
import re
from pyspark.sql.functions import udf
import pandas as pd

def preprocess_english_posts(text):
    """Normalise an English post into ``|||``-separated sentence chunks.

    Steps, in order:
      1. Every sentence terminator (``.``, ``!`` or ``?``), together with the
         whitespace around it, becomes a ``|||`` separator.
      2. Commas, parentheses, quotes, hyphens, colons and semicolons are removed.
      3. Runs of whitespace are squeezed to a single space.
      4. Whitespace around separators is dropped and adjacent separators are
         merged into one.
      5. A separator at the very start or end of the text is removed.

    Args:
        text: The raw post. Anything that is not a non-empty ``str`` yields ``""``.

    Returns:
        The cleaned text with sentences joined by ``|||``.
    """
    if not (isinstance(text, str) and text):
        return ""

    separator = '|||'

    # (pattern, replacement) pairs applied one after another.
    rewrite_rules = (
        (r'\s*([\.!?])\s*', ' ||| '),   # sentence terminators -> padded separator
        (r'[,()\'"\-\:\;]', ''),         # drop the remaining common punctuation
        (r'\s+', ' '),                   # squeeze whitespace runs
    )
    cleaned = text
    for pattern, replacement in rewrite_rules:
        cleaned = re.sub(pattern, replacement, cleaned)
    cleaned = cleaned.strip()

    # First glue separators to their neighbours, then merge any run of three
    # or more pipes (i.e. adjacent separators) into a single separator.
    for pattern in (r'\s*\|\|\|\s*', r'\|\|\|+'):
        cleaned = re.sub(pattern, separator, cleaned)

    # The text must neither end nor start with a separator.
    if cleaned.endswith(separator):
        cleaned = cleaned[:-len(separator)]
    if cleaned.startswith(separator):
        cleaned = cleaned[len(separator):]

    return cleaned.strip()

In [3]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import concat, col
from pyspark.sql.functions import col

# Base name (without extension) of the input CSV that testData reads as
# f"{filename}.csv"; it is also embedded in the name of the results CSV.
filename = "MBTI 500"

def testData(mbti_predictor, pipeline):
    """Run a fitted pipeline over ``{filename}.csv`` and save its predictions.

    The CSV named by the module-level ``filename`` is read with pandas, its
    ``Response`` text is cleaned by ``preprocess_english_posts`` inside Spark,
    ``mbti_predictor.transform`` is applied, and the four per-dimension
    predictions are concatenated into a single ``prediction`` string. The
    result is written to a CSV with the columns ``MBTI_Type``, ``post`` and
    ``prediction`` (both label columns upper-cased).

    Relies on the module-level ``spark`` session and ``filename``.

    Args:
        mbti_predictor: Object whose ``transform`` output exposes the columns
            ``ie_labeled_pred``, ``ns_labeled_pred``, ``tf_labeled_pred`` and
            ``pj_labeled_pred``.
        pipeline: Identifier embedded in the output file name.

    Returns:
        The name of the CSV file that was written.
    """
    # Wrap the text cleaner as a PySpark UDF that returns strings.
    cleaner = udf(preprocess_english_posts, StringType())

    source_pd = pd.read_csv(f"{filename}.csv")

    # "post" holds the response text; "user_id" is a copy of MBTI_Type and is
    # renamed back to MBTI_Type after prediction. Only these three columns
    # are handed to Spark.
    spark_input_pd = source_pd.assign(
        post=source_pd['Response'],
        user_id=source_pd['MBTI_Type'],
    )[['MBTI_Type', 'post', 'user_id']]

    # All three columns are nullable strings.
    schema = StructType([
        StructField(field_name, StringType(), True)
        for field_name in ("MBTI_Type", "post", "user_id")
    ])

    rows = list(map(tuple, spark_input_pd.values))
    cleaned_sdf = spark.createDataFrame(rows, schema=schema).withColumn(
        "post", cleaner(col("post"))
    )

    print("--- 預處理後的 Post 欄位 ---")
    cleaned_sdf.show(truncate=False)

    # One aliased column per MBTI dimension.
    dimension_columns = [
        col(predicted).alias(letter_pair)
        for predicted, letter_pair in (
            ("ie_labeled_pred", "IE"),
            ("ns_labeled_pred", "NS"),
            ("tf_labeled_pred", "TF"),
            ("pj_labeled_pred", "PJ"),
        )
    ]
    labelled_sdf = mbti_predictor.transform(cleaned_sdf).select(
        "user_id", "post", *dimension_columns
    )

    # Join the four dimensions into the final type string and pull the
    # result back into pandas.
    results_pd = (
        labelled_sdf
        .withColumn("prediction", concat(col("IE"), col("NS"), col("TF"), col("PJ")))
        .select("user_id", "post", "prediction")
        .toPandas()
        .rename(columns={'user_id': 'MBTI_Type'})
    )
    for label_column in ('prediction', 'MBTI_Type'):
        results_pd[label_column] = results_pd[label_column].str.upper()

    output_filename = f"mbti_predictions_results_{filename}_{pipeline}.csv"
    results_pd.to_csv(output_filename, index=False)

    print("\n預測結果（包含輸入文本）：")
    print(results_pd)
    print(f"\n結果已儲存到檔案: {output_filename}")
    return output_filename

In [4]:
import pandas as pd

def analysisResults(outputfile):
    """Print exact-match and per-letter accuracy for a saved predictions CSV.

    Both the ``MBTI_Type`` and ``prediction`` columns are read as strings.
    Rows where either value is not exactly four characters long are ignored.
    The report is printed; nothing is returned.

    Args:
        outputfile: Path of a CSV containing ``MBTI_Type`` and ``prediction``
            columns.
    """
    frame = pd.read_csv(f"{outputfile}")
    actual = frame['MBTI_Type'].astype(str)
    predicted = frame['prediction'].astype(str)

    # Keep only rows whose label and prediction are both four characters long.
    usable = (actual.str.len() == 4) & (predicted.str.len() == 4)
    actual, predicted = actual[usable], predicted[usable]
    total_count = len(actual)

    if total_count == 0:
        print("錯誤：沒有有效的 4 字母 MBTI 類型可供計算。請檢查 'MBTI_Type' 欄位是否正確。")
        return

    def percent_equal(left, right):
        # Share of positions where the two series agree, as a percentage.
        return (left == right).sum() / total_count * 100

    # All four letters must match for the overall score.
    overall_accuracy = percent_equal(actual, predicted)

    # Letter positions 0..3 correspond to I/E, N/S, T/F and P/J.
    ie_accuracy, ns_accuracy, tf_accuracy, pj_accuracy = (
        percent_equal(actual.str[position], predicted.str[position])
        for position in range(4)
    )

    rule = "----------------------------------------"
    report_lines = [
        "\n=== MBTI 預測正確率統計結果 ===",
        f"總計有效樣本數: {total_count}",
        rule,
        "1. 總體正確率（4個字母完全匹配）：",
        f"   -> {overall_accuracy:.2f}%",
        rule,
        "2. 單一維度正確率：",
        f"   I/E (內向/外向) 正確率: {ie_accuracy:.2f}%",
        f"   N/S (直覺/感覺) 正確率: {ns_accuracy:.2f}%",
        f"   T/F (思考/情感) 正確率: {tf_accuracy:.2f}%",
        f"   P/J (感知/判斷) 正確率: {pj_accuracy:.2f}%",
        rule,
    ]
    print("\n".join(report_lines))

In [5]:
# Load the saved PySpark pipeline, run prediction and accuracy analysis, then
# release the Spark session.
# The guard skips this cell when no `spark` variable exists or when its Java
# context handle (`_jsc`) is None, which is expected once the context has
# been stopped.
if 'spark' in locals() and spark.sparkContext._jsc is not None:

    # Identifier of the saved pipeline; it selects the model directory and is
    # passed to testData, which embeds it in the results file name.
    pipelinenumber = 202107080921
    MODEL_PATH = f"pretrainedModels/final_pipeline_{pipelinenumber}"

    try:
        mbti_predictor = PipelineModel.load(MODEL_PATH)
        print("PySpark 模型載入成功！")

        outputfile = testData(mbti_predictor, pipelinenumber)
        analysisResults(outputfile)

    except Exception as e:
        # Best effort: report the failure instead of aborting the notebook.
        print(f"模型載入或預測階段失敗: {e}")

    finally:
        # Stop the session on failure as well as on success; previously a
        # failed load or prediction left the session (and its JVM) running.
        spark.stop()

PySpark 模型載入成功！
--- 預處理後的 Post 欄位 ---


25/12/11 17:17:45 WARN TaskSetManager: Stage 72 contains a task of very large size (43077 KiB). The maximum recommended task size is 1000 KiB.


+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

25/12/11 17:17:46 WARN StringIndexerModel: Input column ie does not exist during transformation. Skip StringIndexerModel for this column.
25/12/11 17:17:46 WARN StringIndexerModel: Input column ns does not exist during transformation. Skip StringIndexerModel for this column.
25/12/11 17:17:46 WARN StringIndexerModel: Input column tf does not exist during transformation. Skip StringIndexerModel for this column.
25/12/11 17:17:46 WARN StringIndexerModel: Input column pj does not exist during transformation. Skip StringIndexerModel for this column.
25/12/11 17:17:47 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
25/12/11 17:17:47 WARN TaskSetManager: Stage 73 contains a task of very large size (43077 KiB). The maximum recommended task size is 1000 KiB.
25/12/11 17:17:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/12/11 17:17:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
        


預測結果（包含輸入文本）：
       MBTI_Type                                               post prediction
0           INTJ  know intj tool use interaction people excuse a...       INTJ
1           INTJ  rap music ehh opp yeah know valid well know fa...       INTJ
2           INTJ  preferably p hd low except wew lad video p min...       ESTP
3           INTJ  drink like wish could drink red wine give head...       INTP
4           INTJ  space program ah bad deal meing freelance max ...       INTP
...          ...                                                ...        ...
106062      INFP  stay frustrate world life want take long nap w...       INFJ
106063      INFP  fizzle around time mention sure mistake thing ...       INTP
106064      INFP  schedule modify hey w intp strong wing underst...       INFP
106065      INFP  enfj since january busy schedule able spend li...       INFP
106066      INFP  feel like men good problem tell parent want te...       INFJ

[106067 rows x 3 columns]

結果已儲存到檔案: