In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [2]:
spark = SparkSession.builder.appName("Test1").getOrCreate()


In [3]:
qqp_df = spark.read.csv("glue/QQP/train.tsv", header=True, inferSchema=True, sep="\\t")
qqp_df = qqp_df.dropna()


In [4]:
from pyspark.sql.functions import concat_ws

# 合并 question1 和 question2，特征提取
qqp_df = qqp_df.withColumn("text", concat_ws(" [SEP] ", qqp_df["question1"], qqp_df["question2"]))
qqp_df.show()

+------+------+------+--------------------+--------------------+------------+--------------------+
|    id|  qid1|  qid2|           question1|           question2|is_duplicate|                text|
+------+------+------+--------------------+--------------------+------------+--------------------+
|133273|213221|213222|How is the life o...|Which level of pr...|           0|How is the life o...|
|402555|536040|536041|How do I control ...|How do you contro...|           1|How do I control ...|
|360472|364011|490273|What causes stool...|What can cause st...|           0|What causes stool...|
|150662|155721|  7256|What can one do a...|What do i do afte...|           1|What can one do a...|
|183004|279958|279959|Where can I find ...|Would a second ai...|           0|Where can I find ...|
|119056|193387|193388|How not to feel g...|I don't beleive I...|           0|How not to feel g...|
|356863|422862| 96457|How is air traffi...|How do you become...|           0|How is air traffi...|
|106969|14

In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 标签转换为数值
label_indexer = StringIndexer(inputCol="is_duplicate", outputCol="label")

# 文本预处理 + 特征工程
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
vectorizer = CountVectorizer(inputCol="filtered", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

# 模型定义（Logistic Regression）
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)

# 构建 Pipeline
pipeline = Pipeline(stages=[
    label_indexer,
    tokenizer,
    remover,
    vectorizer,
    idf,
    lr
])

# 划分数据集
qqp_train, qqp_test = qqp_df.randomSplit([0.8, 0.2], seed=42)


In [31]:
# 拟合模型
qqp_model = pipeline.fit(qqp_train)


In [32]:
# 预测
qqp_predictions = qqp_model.transform(qqp_test)

In [33]:
# 评估模型
qqp_evaluator = MulticlassClassificationEvaluator(labelCol="is_duplicate", predictionCol="prediction",
                                              metricName="accuracy")
qqp_accuracy = qqp_evaluator.evaluate(qqp_predictions)

In [34]:
# 输出结果
print(f"QQP Accuracy: {qqp_accuracy:.4f}")

QQP Accuracy: 0.7349


In [36]:
# MRPC
mrpc_df = spark.read.csv("glue/MRPC/msr_paraphrase_train.txt", header=True, inferSchema=True, sep="\\t")
mrpc_df = mrpc_df.dropna()
mrpc_df = (mrpc_df.withColumnRenamed("Quality", "label")
           .withColumnRenamed("#1 String", "text1")
           .withColumnRenamed("#2 String", "text2")
           .withColumnRenamed("#1 ID", "id1")
           .withColumnRenamed("#2 ID", "id2"))
mrpc_df.show()

+-----+-------+-------+--------------------+--------------------+
|label|    id1|    id2|               text1|               text2|
+-----+-------+-------+--------------------+--------------------+
|    1| 702876| 702977|"Amrozi accused h...|"Referring to him...|
|    0|2108705|2108831|Yucaipa owned Dom...|Yucaipa bought Do...|
|    1|1330381|1330521|They had publishe...|On June 10, the s...|
|    0|3344667|3344648|Around 0335 GMT, ...|Tab shares jumped...|
|    1|1236820|1236712|The stock rose $2...|PG&E Corp. shares...|
|    1| 738533| 737951|Revenue in the fi...|With the scandal ...|
|    0| 264589| 264502|The Nasdaq had a ...|The tech-laced Na...|
|    1| 579975| 579810|The DVD-CCA then ...|The DVD CCA appea...|
|    0|3114205|3114194|That compared wit...|Earnings were aff...|
|    1|1355540|1355592|He said the foods...|The foodservice p...|
|    0| 222621| 222514|Shares of Genente...|Shares of Xoma fe...|
|    0|3131772|3131625|Legislation makin...|Legislation makin...|
|    0|  5

In [37]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 分词器
tokenizer1 = Tokenizer(inputCol="text1", outputCol="words1")
tokenizer2 = Tokenizer(inputCol="text2", outputCol="words2")

# 停用词
remover1 = StopWordsRemover(inputCol="words1", outputCol="filtered1")
remover2 = StopWordsRemover(inputCol="words2", outputCol="filtered2")

# TF
tf1 = HashingTF(inputCol="filtered1", outputCol="rawFeatures1", numFeatures=2000)
tf2 = HashingTF(inputCol="filtered2", outputCol="rawFeatures2", numFeatures=2000)

# IDF
idf1 = IDF(inputCol="rawFeatures1", outputCol="features1")
idf2 = IDF(inputCol="rawFeatures2", outputCol="features2")

# 特征拼接
assembler = VectorAssembler(inputCols=["features1", "features2"], outputCol="features")

# 逻辑回归
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Pipeline
pipeline = Pipeline(stages=[
    tokenizer1, tokenizer2,
    remover1, remover2,
    tf1, tf2,
    idf1, idf2,
    assembler,
    lr
])


In [38]:
# 划分训练/测试集
train_df, test_df = mrpc_df.randomSplit([0.8, 0.2], seed=42)

# 训练模型
model = pipeline.fit(train_df)

# 预测
predictions = model.transform(test_df)

# 评估
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"MRPC Accuracy: {accuracy:.4f}")

MRPC Accuracy: 0.5808


In [41]:
# STS-B
sts_df = spark.read.csv("glue/STS-B/train.tsv", header=True, inferSchema=True, sep="\\t")
sts_df = sts_df.dropna()
sts_df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- year: string (nullable = true)
 |-- old_index: integer (nullable = true)
 |-- source1: string (nullable = true)
 |-- source2: string (nullable = true)
 |-- sentence1: string (nullable = true)
 |-- sentence2: string (nullable = true)
 |-- score: double (nullable = true)



In [42]:
sts_df = sts_df.select("sentence1", "sentence2", "score")
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 分词
tokenizer1 = Tokenizer(inputCol="sentence1", outputCol="words1")
tokenizer2 = Tokenizer(inputCol="sentence2", outputCol="words2")

# 去停用词
remover1 = StopWordsRemover(inputCol="words1", outputCol="filtered1")
remover2 = StopWordsRemover(inputCol="words2", outputCol="filtered2")

# HashingTF
tf1 = HashingTF(inputCol="filtered1", outputCol="tf1", numFeatures=2000)
tf2 = HashingTF(inputCol="filtered2", outputCol="tf2", numFeatures=2000)

# IDF
idf1 = IDF(inputCol="tf1", outputCol="vec1")
idf2 = IDF(inputCol="tf2", outputCol="vec2")

# 拼接特征
assembler = VectorAssembler(inputCols=["vec1", "vec2"], outputCol="features")

# 线性回归
lr = LinearRegression(featuresCol="features", labelCol="score")

pipeline = Pipeline(stages=[
    tokenizer1, tokenizer2,
    remover1, remover2,
    tf1, tf2,
    idf1, idf2,
    assembler,
    lr
])

train_df, test_df = sts_df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_df)
predictions = model.transform(test_df)

# 评估 RMSE
evaluator = RegressionEvaluator(labelCol="score", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")



RMSE: 5.7341


In [43]:
# 计算 Pearson 和 Spearman 相关系数
preds = predictions.select("score", "prediction").toPandas()
y_true = preds["score"].values
y_pred = preds["prediction"].values
import numpy as np
pearson_corr = np.corrcoef(y_true, y_pred)[0, 1]
print(f"Pearson correlation: {pearson_corr:.4f}")
from scipy.stats import spearmanr
spearman_corr, _ = spearmanr(y_true, y_pred)
print(f"Spearman correlation: {spearman_corr:.4f}")


Pearson correlation: -0.0005
Spearman correlation: 0.0126
