In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,TrainValidationSplit
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

In [19]:
df = spark.read.option("header", "false").option("delimiter", "\t").csv("../data/SMSSpamCollection").withColumnRenamed("_c0", "results").withColumnRenamed("_c1", "content")
#读入已经解压的csv文件，以tab为分割，并将列名更改
df.printSchema()

root
 |-- results: string (nullable = true)
 |-- content: string (nullable = true)



In [4]:
df.select('results').distinct().show()#计算结果种类

+-------+
|results|
+-------+
|    ham|
|   spam|
+-------+



In [20]:
#清洗数据:因为空白的短信内容可能也是垃圾短信，所以应该清洗掉评价为空的数据
#df0 = df.select(df).where(df['content'].isNotNull())
df0 = df.filter(df['results']!='null')
df0.show(5)

+-------+--------------------+
|results|             content|
+-------+--------------------+
|    ham|Go until jurong p...|
|    ham|Ok lar... Joking ...|
|   spam|Free entry in 2 a...|
|    ham|U dun say so earl...|
|    ham|Nah I don't think...|
+-------+--------------------+
only showing top 5 rows



In [21]:
labeledData = df0.select(when(df0.results == 'ham', 0)#给数据打标签
                        .when(df0.results == 'spam', 1)
                        .otherwise(1)
                        .alias('label'), 
                        'content')
labeledData.show(5)

+-----+--------------------+
|label|             content|
+-----+--------------------+
|    0|Go until jurong p...|
|    0|Ok lar... Joking ...|
|    1|Free entry in 2 a...|
|    0|U dun say so earl...|
|    0|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



In [22]:
#开始训练

trainingData, testData = labeledData.randomSplit([0.8, 0.2])#将数据集随机分为训练集与测试集

tokenizer = Tokenizer(inputCol="content", outputCol="words")#切割单词
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")#将单词映射为特征
lr = LogisticRegression(maxIter=100, regParam=0.01)#maxIter=100: 最大迭代次数/并行度，regParam=0.0: 正则化惩罚程度参数
#在线性模型中，为了预防overfitting过度拟合，添加了惩罚项
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])#传入数据与参数,并行化处理

model = pipeline.fit(trainingData)#拟合

predictionsDf = model.transform(testData)#读取包含特征向量的列，预测每个特征向量的标签，然后输出一个新的，其中预测的标签追加为列。
predictionsDf.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|             content|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0| &lt;#&gt;  mins ...|[, &lt;#&gt;, , m...|(262144,[19036,27...|[5.57905023590004...|[0.99623805539995...|       0.0|
|    0| and  picking the...|[, and, , picking...|(262144,[34116,59...|[6.32966951150726...|[0.99822054928135...|       0.0|
|    0| says that he's q...|[, says, that, he...|(262144,[8538,187...|[6.96802242869682...|[0.99905937263780...|       0.0|
|    0|"Response" is one...|["response", is, ...|(262144,[12524,21...|[4.17726538284082...|[0.98489137202011...|       0.0|
|    0|"Speak only when ...|["speak, only, wh...|(262144,[24980,55...|[6.13911967894033...|[0.99784782038902...|       0.0|
+-----+-

In [23]:
Successes = predictionsDf.where('label == prediction')
unSuccesses = predictionsDf.where('label != prediction')

#Successes.select('label','content','prediction').take(10)

In [7]:
unSuccesses.select('label','content','prediction').take(10)

[Row(label=0, content="Forgot you were working today! Wanna chat, but things are ok so drop me a text when you're free / bored etc and i'll ring. Hope all is well, nose essay and all xx", prediction=1.0),
 Row(label=1, content='0A$NETWORKS allow companies to bill for SMS, so they are responsible for their "suppliers", just as a shop has to give a guarantee on what they sell. B. G.', prediction=0.0),
 Row(label=1, content="1000's of girls many local 2 u who r virgins 2 this & r ready 2 4fil ur every sexual need. Can u 4fil theirs? text CUTE to 69911(£1.50p. m)", prediction=0.0),
 Row(label=1, content='As a SIM subscriber, you are selected to receive a Bonus! Get it delivered to your door, Txt the word OK to No: 88600 to claim. 150p/msg, EXP. 30Apr', prediction=0.0),
 Row(label=1, content='Babe: U want me dont u baby! Im nasty and have a thing 4 filthyguys. Fancy a rude time with a sexy bitch. How about we go slo n hard! Txt XXX SLO(4msgs)', prediction=0.0),
 Row(label=1, content="Bored 

In [24]:
numSuccesses = Successes.count()#计算标签与预测成功的数量
numInspections = predictionsDf.count()

print ("There were %d inspections and there were %d successful predictions" % (numInspections, numSuccesses))
print("This is a %d%% success rate" % (float(numSuccesses) / float(numInspections) * 100))

There were 1105 inspections and there were 1060 successful predictions
This is a 95% success rate


#F1分数(F1 Score),是统计学中用来衡量二分类(或多任务二分类)模型精确度的一种指标。
#它同时兼顾了分类模型的准确率和召回率。
#F1分数可以看作是模型准确率和召回率的一种加权平均,它的最大值是1,最小值是0,值越大意味着模型越好。

In [25]:
prediction0=predictionsDf.where('label == 0')
TP=prediction0.where('prediction == 0').count()
precision=TP/prediction0.count()
real0=predictionsDf.where('prediction == 0').count()
recall=TP/real0
F1score=(2*precision*recall)/(precision+recall)
print('F1score',F1score)

F1score 0.9767921609076844


#优化调参：交叉验证
#因为样本较小因此采用交叉验证
#交叉验证(CrossValidator)将数据集切分成k折数据集，分别用于训练和测试。
#通过交叉验证选择模型，参数网格有 3 个值，有 2 个值，并使用 2 个折叠
#现在，我们将管道视为估计器，将其包装在CrossValidator实例中。
#这将使我们能够共同选择所有管道阶段的参数。
#交叉验证程序需要一个估计器、一组估计器参数映射和一个评估器。
#我们使用ParamGridBuilder构建参数网格以进行搜索。
#hashingTF有3个值。lr.regParam的numFeatures和2个值，
#此网格将具有3 x 2=6个参数设置，可供CrossValidator选择。

In [26]:
trainingData, testData = labeledData.randomSplit([0.8, 0.2])
tokenizer = Tokenizer(inputCol="content", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [27]:
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 100, 1000]) .addGrid(lr.regParam, [0.1, 0.01]) .build()

In [28]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),#用于二分类问题
                          numFolds=2)

In [29]:
cvModel = crossval.fit(trainingData)
prediction = cvModel.transform(testData)

In [30]:
Successes = prediction.where('label == prediction')
unSuccesses = prediction.where('label != prediction')

#Successes.select('label','content','prediction').take(10)

In [14]:
unSuccesses.select('label','content','prediction').take(10)

[Row(label=0, content="Forgot you were working today! Wanna chat, but things are ok so drop me a text when you're free / bored etc and i'll ring. Hope all is well, nose essay and all xx", prediction=1.0),
 Row(label=1, content='0A$NETWORKS allow companies to bill for SMS, so they are responsible for their "suppliers", just as a shop has to give a guarantee on what they sell. B. G.', prediction=0.0),
 Row(label=1, content="1000's of girls many local 2 u who r virgins 2 this & r ready 2 4fil ur every sexual need. Can u 4fil theirs? text CUTE to 69911(£1.50p. m)", prediction=0.0),
 Row(label=1, content='As a SIM subscriber, you are selected to receive a Bonus! Get it delivered to your door, Txt the word OK to No: 88600 to claim. 150p/msg, EXP. 30Apr', prediction=0.0),
 Row(label=1, content='Babe: U want me dont u baby! Im nasty and have a thing 4 filthyguys. Fancy a rude time with a sexy bitch. How about we go slo n hard! Txt XXX SLO(4msgs)', prediction=0.0),
 Row(label=1, content="Bored 

In [31]:
numSuccesses = prediction.where('label == prediction').count()#计算标签与预测成功的数量
numInspections = prediction.count()

print ("There were %d inspections and there were %d successful predictions" % (numInspections, numSuccesses))
print("This is a %d%% success rate" % (float(numSuccesses) / float(numInspections) * 100))

There were 1149 inspections and there were 1103 successful predictions
This is a 95% success rate


In [32]:
prediction0=prediction.where('label == 0')
TP=prediction0.where('prediction == 0').count()
precision=TP/prediction0.count()
real0=prediction.where('prediction == 0').count()
recall=TP/real0
F1score=(2*precision*recall)/(precision+recall)
print('F1score',F1score)

F1score 0.9773622047244095
