# 使用RFormula Estimator来创建一个特征向量

In [21]:
from pyspark.ml.feature import RFormula
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("spark-python").getOrCreate()

In [22]:

arrival_data = spark.createDataFrame(
[
("SFO", "B737", 18, 95.1, "late"),
("SEA", "A319", 5, 65.7, "ontime"),
("LAX", "B747", 15, 31.5, "late"),
("ATL", "A319", 14, 40.5, "late")
], ["origin", "model", "hour", "temperature", "arrival"])
arrival_data.show()

+------+-----+----+-----------+-------+
|origin|model|hour|temperature|arrival|
+------+-----+----+-----------+-------+
|   SFO| B737|  18|       95.1|   late|
|   SEA| A319|   5|       65.7| ontime|
|   LAX| B747|  15|       31.5|   late|
|   ATL| A319|  14|       40.5|   late|
+------+-----+----+-----------+-------+



In [23]:
#RFormula用于将数据中的字段通过R语言的Model Formulae转换成特征值，输出结果为一个特征向量和Double类型的label
#关于R语言Model Formulae的介绍可参考：https://stat.ethz.ch/R-manual/Rdevel/library/stats/html/formula.html
formula = RFormula(formula="arri~ . + hour:temperature", featuresCol = "features", labelCol= "label")
# 首先调用fit函数，它返回一个模型(model，类型为transformer),然后调用transform
# output = formula.fit(arrival_data).transform(arrival_data)
model = formula.fit(arrival_data)
output = model.transform(arrival_data)
output.select('*').show(truncate=False) # 在scala能显示label，这里没有？


+------+-----+----+-----------+-------+------------------------------------------+
|origin|model|hour|temperature|arrival|features                                  |
+------+-----+----+-----------+-------+------------------------------------------+
|SFO   |B737 |18  |95.1       |late   |[1.0,0.0,0.0,0.0,0.0,18.0,95.1,1.0,1711.8]|
|SEA   |A319 |5   |65.7       |ontime |[0.0,1.0,0.0,1.0,0.0,5.0,65.7,0.0,328.5]  |
|LAX   |B747 |15  |31.5       |late   |[0.0,0.0,0.0,0.0,1.0,15.0,31.5,1.0,472.5] |
|ATL   |A319 |14  |40.5       |late   |[0.0,0.0,1.0,1.0,0.0,14.0,40.5,1.0,567.0] |
+------+-----+----+-----------+-------+------------------------------------------+



# 使用IDF estimator来计算每个单词的权重

In [27]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF

text_data = spark.createDataFrame(
[
(1, "Spark is a unified data analytics engine"),
(2, "Spark is cool and it is fun to work with Spark"),
(3, "There is a lot of exciting sessions at upcoming Spark summit"),
(4, "mllib transformer estimator evaluator and pipelines")
],["id", "line"])
text_data.show()

tokenizer = Tokenizer(inputCol="line", outputCol="words")

# Tokenizer transformer的输出列是HashingTF的输入
tf = HashingTF(inputCol="words", outputCol="wordFreqVect")
tfResult = tf.transform(tokenizer.transform(text_data))
tfResult.show()

# HashingTF transformer的输出是IDF estimator的输入
idf = IDF(inputCol="wordFreqVect", outputCol="features")

# 因为IDF是一个estimator,所以调用fit函数
idfModel = idf.fit(tfResult)
print(idfModel)
# 返回对象是一个模型（Model）, 它是类型Transformer
weightedWords = idfModel.transform(tfResult)
weightedWords.show()

+---+--------------------+
| id|                line|
+---+--------------------+
|  1|Spark is a unifie...|
|  2|Spark is cool and...|
|  3|There is a lot of...|
|  4|mllib transformer...|
+---+--------------------+

+---+--------------------+--------------------+--------------------+
| id|                line|               words|        wordFreqVect|
+---+--------------------+--------------------+--------------------+
|  1|Spark is a unifie...|[spark, is, a, un...|(262144,[1461,747...|
|  2|Spark is cool and...|[spark, is, cool,...|(262144,[8443,158...|
|  3|There is a lot of...|[there, is, a, lo...|(262144,[3023,891...|
|  4|mllib transformer...|[mllib, transform...|(262144,[91106,91...|
+---+--------------------+--------------------+--------------------+

IDF_283fffce2c9a
+---+--------------------+--------------------+--------------------+--------------------+
| id|                line|               words|        wordFreqVect|            features|
+---+--------------------+-------

In [28]:
# feature列包含一个向量用于每个单词的权重
weightedWords.select("features").show(truncate=False)
weightedWords.printSchema()

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(262144,[1461,7473,15889,110213,160735,227410,234657],[0.9162907318741551,0.9162907318741551,0.22314355131420976,0

# 使用StringIndexer estimator来对电影类型进行编码

In [29]:
from pyspark.ml.feature import StringIndexer
movie_data = spark.createDataFrame([(1, "Comedy"),
(2, "Action"),
(3, "Comedy"),
(4, "Horror"),
(5, "Action"),
(6, "Comedy") ],["id", "genre"])
movieIndexer = StringIndexer(inputCol="genre", outputCol="genreIdx")
# 首先拟合数据
movieIndexModel = movieIndexer.fit(movie_data)
# 使用返回的transformer来转换该数据
indexedMovie = movieIndexModel.transform(movie_data)
indexedMovie.show()

# 显示结果
print(movieIndexModel.labels)
indexedMovie.orderBy("genreIdx").show()


+---+------+--------+
| id| genre|genreIdx|
+---+------+--------+
|  1|Comedy|     0.0|
|  2|Action|     1.0|
|  3|Comedy|     0.0|
|  4|Horror|     2.0|
|  5|Action|     1.0|
|  6|Comedy|     0.0|
+---+------+--------+

['Comedy', 'Action', 'Horror']
+---+------+--------+
| id| genre|genreIdx|
+---+------+--------+
|  3|Comedy|     0.0|
|  1|Comedy|     0.0|
|  6|Comedy|     0.0|
|  5|Action|     1.0|
|  2|Action|     1.0|
|  4|Horror|     2.0|
+---+------+--------+



In [13]:
movieIndexModel.labels

['Comedy', 'Action', 'Horror']

# 使用OneHotEncoderEstimator estimator
### 注：OneHotEncoderEstimator从Spark 2.3.0 API中才出现，并且从Spark 3.0.0开始改名为OneHotEncoder，原来的OneHotEncoder会被删除

In [14]:
# OneHotEncoderEstimator estimator消费StringIndexer estimator的输出
from pyspark.ml.feature import OneHotEncoderEstimator
# 输入列genreIdx是之前示例中StringIndex的输出列
oneHotEncoderEst = OneHotEncoderEstimator(inputCols = ["genreIdx"], outputCols =["genreIdxVector"],dropLast=True)
# 指使indexedMovie数据（在上一个示例中产生的）
oneHotEncoderModel = oneHotEncoderEst.fit(indexedMovie)
oneHotEncoderVect = oneHotEncoderModel.transform(indexedMovie)
print(oneHotEncoderVect.labels)
oneHotEncoderVect .orderBy("genre").show()


+---+------+--------+--------------+
| id| genre|genreIdx|genreIdxVector|
+---+------+--------+--------------+
|  5|Action|     1.0| (2,[1],[1.0])|
|  2|Action|     1.0| (2,[1],[1.0])|
|  3|Comedy|     0.0| (2,[0],[1.0])|
|  1|Comedy|     0.0| (2,[0],[1.0])|
|  6|Comedy|     0.0| (2,[0],[1.0])|
|  4|Horror|     2.0|     (2,[],[])|
+---+------+--------+--------------+



 # 使用Word2Vec estimator来计算单词的嵌入和发现类似的单词

In [None]:
#Word2Vec具有两种模型，其一是 CBOW ，其思想是通过每个词的上下文窗口词词向量来预测中心词的词向量。
#其二是 Skip-gram，其思想是通过每个中心词来预测其上下文窗口词，
#并根据预测结果来修正中心词的词向量。两种方法示意图如下图所示：

<img src="./CBOW.png" width="30%">,<img src="skip.png" width="30%">

<img src="skip-gram.jpg" width="30%">

In [15]:
from pyspark.ml.feature import Word2Vec
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
documentDF.show(truncate=False)

+------------------------------------------+
|text                                      |
+------------------------------------------+
|[Hi, I, heard, about, Spark]              |
|[I, wish, Java, could, use, case, classes]|
|[Logistic, regression, models, are, neat] |
+------------------------------------------+



In [16]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0,windowSize=5,inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
result.show(truncate=False)

+------------------------------------------+-----------------------------------------------------------------+
|text                                      |result                                                           |
+------------------------------------------+-----------------------------------------------------------------+
|[Hi, I, heard, about, Spark]              |[-0.0325182176893577,-0.06207826212048531,0.09105337858200074]   |
|[I, wish, Java, could, use, case, classes]|[-0.011344701583896364,-0.06132613827607461,0.029776256232123287]|
|[Logistic, regression, models, are, neat] |[-0.08718884875997901,-0.029027827456593516,0.02657611258327961] |
+------------------------------------------+-----------------------------------------------------------------+



In [11]:
model.getVectors().show(truncate=False)

+----------+----------------------------------------------------------------+
|word      |vector                                                          |
+----------+----------------------------------------------------------------+
|heard     |[0.07805141806602478,0.10116440057754517,-0.08185699582099915]  |
|are       |[-0.13309834897518158,-0.1130678728222847,-0.11082476377487183] |
|neat      |[-0.14862976968288422,0.05695963650941849,0.10542261600494385]  |
|classes   |[-0.0874999612569809,0.03785271197557449,0.14914149045944214]   |
|I         |[0.15604113042354584,-0.15416713058948517,-0.15440209209918976] |
|regression|[-0.1519547700881958,-0.09226707369089127,-0.10659107565879822] |
|Logistic  |[-0.11443932354450226,0.14195924997329712,0.02488933876156807]  |
|Spark     |[-0.0024795527569949627,0.07086249440908432,0.10906671732664108]|
|could     |[0.1541176289319992,0.09678726643323898,-0.12757770717144012]   |
|use       |[0.102830670773983,0.1281798928976059,0.027981573715

In [None]:
#在ml库中，Word2vec 的实现使用的是skip-gram模型。

# 使用MinMaxScaler estimator来重新调节特征

In [17]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
# 构造DataFrame
employee_data = spark.createDataFrame(
[
(1, Vectors.dense(125400, 5.3)),
(2, Vectors.dense(179100, 6.9)),
(3, Vectors.dense(154770, 5.2)),
(4, Vectors.dense(199650, 4.11))
],["empId", "features"])
employee_data.show()

+-----+---------------+
|empId|       features|
+-----+---------------+
|    1| [125400.0,5.3]|
|    2| [179100.0,6.9]|
|    3| [154770.0,5.2]|
|    4|[199650.0,4.11]|
+-----+---------------+



In [18]:
# MinMaxScaler estimator
# x-min/max-min
minMaxScaler = MinMaxScaler(min = 0.0, max=1.0, inputCol="features",
outputCol="scaledFeatures")
# 拟合数据，建立模型
scalerModel = minMaxScaler.fit(employee_data)
# 使用学习到的模型对数据集进行转换
scaledData = scalerModel.transform(employee_data)
# 输出特征缩放到的范围
# 显示结果
scaledData.show(truncate=False)

+-----+---------------+-----------------------------------------+
|empId|features       |scaledFeatures                           |
+-----+---------------+-----------------------------------------+
|1    |[125400.0,5.3] |[0.0,0.42652329749103923]                |
|2    |[179100.0,6.9] |[0.7232323232323232,1.0]                 |
|3    |[154770.0,5.2] |[0.39555555555555555,0.39068100358422936]|
|4    |[199650.0,4.11]|[1.0,0.0]                                |
+-----+---------------+-----------------------------------------+



In [19]:
scalerModel.originalMax

DenseVector([199650.0, 6.9])

# 使用StandardScaler estimator标准化围绕均值0的特征

In [20]:

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
# 构造DataFrame
employee_data = spark.createDataFrame([
(1, Vectors.dense(125400, 5.3)),
(2, Vectors.dense(179100, 6.9)),
(3, Vectors.dense(154770, 5.2)),
(4, Vectors.dense(199650, 4.11))
],["empId", "features"])
# 将单位标准偏差设置为true并围绕平均值(withStd 是否为标准差 ,withMean 是否均值为0)
standardScaler = StandardScaler(withStd = True, withMean = True, inputCol="features",
outputCol="scaledFeatures")
# 拟合数据，建立模型
standardMode = standardScaler.fit(employee_data)
# 使用学习到的模型对数据集进行转换
standardData = standardMode.transform(employee_data)

# 显示结果
standardData.show(truncate=False)


+-----+---------------+------------------------------------------+
|empId|features       |scaledFeatures                            |
+-----+---------------+------------------------------------------+
|1    |[125400.0,5.3] |[-1.2290717420781212,-0.06743742573177589]|
|2    |[179100.0,6.9] |[0.4490658767775897,1.3248191055048937]   |
|3    |[154770.0,5.2] |[-0.3112523404805006,-0.1544534589340674] |
|4    |[199650.0,4.11]|[1.091258205781032,-1.102928220839048]    |
+-----+---------------+------------------------------------------+

