In [1]:
from IPython.core.interactiveshell import InteractiveShell
from pyspark.mllib.linalg import Vectors

# Make the notebook echo the value of every expression in a cell,
# not only the value of the last one.
InteractiveShell.ast_node_interactivity = "all"

# Dense vector: every element is stored explicitly.
dv = Vectors.dense([2, 5, 8])

# Sparse vector of size 4: only the listed entries are stored, given here as
# {index: value}; index 0 is not listed and is therefore an implicit zero.
sv = Vectors.sparse(4, {1: 3, 2: 5, 3: 7})

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

# Labelled training documents: in this toy set the texts containing "spark"
# carry label 1.0 and the others carry label 0.0.
# `spark` is the SparkSession provided by the notebook environment.
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)
training.show()
# -----------------------------------------------------------------------------
# Step 1 - Tokenizer (a Transformer): split each `text` string into the
# `words` array column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words = tokenizer.transform(training)
words.show()

# -----------------------------------------------------------------------------
# Step 2 - HashingTF (a Transformer): map each `words` array to a
# term-frequency feature vector in the `features` column. Each term is hashed
# to a feature index, and the value at that index is how often the term occurs.
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol="features",
)
df2 = hashingTF.transform(words)
df2.show()
# -----------------------------------------------------------------------------

# Print the frequency of every term in the text of the id=0 row, one
# "feature_index:count" line per non-zero hashed feature.
# Select the row explicitly: first() on an unordered DataFrame does not
# guarantee which row comes back.
v = df2.filter(df2.id == 0).first()
features = v.features
# HashingTF emits a SparseVector, so iterate only its stored entries instead
# of densifying all numFeatures slots (262144 in the output above). The
# printed index is the real 0-based feature index, so it matches the indices
# shown in the `features` column; the previous counter printed index + 1.
for idx, val in zip(features.indices, features.values):
    if val != 0:
        print('%d:%f' % (idx, val))

# -----------------------------------------------------------------------------
# Step 3 - LogisticRegression is an Estimator: fit() learns a model from data.
#   maxIter   : maximum number of optimisation iterations
#   regParam  : regularisation parameter
#   threshold : decision threshold applied to the predicted probability
lr = LogisticRegression(
    maxIter=10,
    regParam=0.001,
    threshold=0.5,
)

# -----------------------------------------------------------------------------
# Step 4 - a Pipeline chains the Transformers and the Estimator:
#   text -> Tokenizer -> HashingTF -> LogisticRegression
# Fitting it yields the trained model, which is itself a Transformer.
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
# -----------------------------------------------------------------------------



# Test documents (no label column); the fitted pipeline predicts the label.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Apply the trained model to the test data; the result is a DataFrame that
# includes the `probability` and `prediction` columns selected below.
prediction = model.transform(test)

selected = prediction.select("id", "text", "probability", "prediction")
selected.show()

# Print one line per test document. Unpack into `pred` rather than
# `prediction`: reusing that name would overwrite the `prediction` DataFrame
# above with the last row's float value and break any later use of it.
for row in selected.collect():
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))


+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+

+---+----------------+-----+--------------------+
| id|            text|label|               words|
+---+----------------+-----+--------------------+
|  0| a b c d e spark|  1.0|[a, b, c, d, e, s...|
|  1|             b d|  0.0|              [b, d]|
|  2|     spark f g h|  1.0|    [spark, f, g, h]|
|  3|hadoop mapreduce|  0.0| [hadoop, mapreduce]|
+---+----------------+-----+--------------------+

+---+----------------+-----+--------------------+--------------------+
| id|            text|label|               words|            features|
+---+----------------+-----+--------------------+--------------------+
|  0| a b c d e spark|  1.0|[a, b, c, d, e, s...|(262144,[17222,27...|
|  1|             b d|  0.0|              [b, d]|(262144,[27526,30...|
|  2|     s