In [1]:
#朴素贝叶斯分类
#设置Spark上下文和SparkSession
# SparkSession的功能包括：
# 创建DataFrame
# 以关系型数据库中表的形式生成DataFrame，之后便可以执行SQL语句，适合小数据量的操作
# 读取.parquet格式的文件，得到DataFrame

# appName(name):
# 给应用设置一个名称,可在Spark的web界面上显示.如果名称已占用,则会随机生成一个.
# 界面地址默认为localhost:4040
# (2.0版本新增)

# config(key=None, value=None, conf=None)
# 配置项,在创建SparkSession和SparkConf时都会被隐式地调用
# (2.0版本新增)
# 参数:
# key ——– 配置信息的键
# value ——– 配置信息的值
# conf ——– SparkConf实例

# getOrCreate()
# 获取已存在的SparkSession实例,若不存在则根据构造器的配置内容创建一个
# (2.0版本新增)



from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark  Naive Bayes classification") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
#负载数据集
# 负载数据集
# 使用先前建立的DBFS掛載點來讀取數據。
# 創建一個數據框來讀取數據。
# option說明： 　　
# 1、path：解析的CSV文件的目錄，路徑支持通配符； 　　
# 2、header：默認值是false。我們知道，CSV文件第一行一般是解釋各個列的含義的名稱，如果我們不需要加載這一行，
#    我們可以將這個選項設置為true； 　　
# 3、delimiter：默認情況下，CSV是使用英文逗號分隔的，如果不是這個分隔，我們就可以設置這個選項。 　　
# 4、quote：默認情況下的引號是'"'，我們可以通過設置這個選項來支持別的引號。 　　5、mode：解析的模式。默認值是PERMISSIVE
df = spark.read.format('com.databricks.spark.csv') \
            .options(header='true', inferschema='true') \
            .load("file:///home/hadoop/WineData2.csv",header=True);
df.show(5)

+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8|      5|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8|      6|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows



In [5]:
df.printSchema()#第二部份類似jason格式

root
 |-- fixed: double (nullable = true)
 |-- volatile: double (nullable = true)
 |-- citric: double (nullable = true)
 |-- sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free: double (nullable = true)
 |-- total: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [6]:
# Convert to float format
# withColumn(colName, col)

# 通过为原数据框添加一个新列或替换已存在的同名列而返回一个新数据框。colName 是一个字符串, 为新列的名字。
# col 为这个新列的 Column 表达式。withColumn 的第一个参数必须是已存在的列的名字,　withColumn 的第二个参数必须是含有列的表达式。
# withColumnRenamed(existing, new)
# 重命名已存在的列并返回一个新数据框。existing 为已存在的要重命名的列, col 为新列的名字。
def string_to_float(x):
    return float(x)

#
def condition(r):
    if (0<= r <= 6):
        label = "low"
    else:
        label = "high"
    return label

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(lambda x: condition(x), StringType())

df = df.withColumn("quality", quality_udf("quality"))

df.show(5,True)

+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|    low|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8|    low|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8|    low|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8|    low|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|    low|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows



In [7]:
#处理分类数据并将数据转换为密集向量
#监督学习版本：
def get_dummy(df,categoricalCols,continuousCols,labelCol):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]

    # default setting: dropLast=True
#     One-Hot Encoding的处理方法
#     在实际的机器学习的应用任务中，特征有时候并不总是连续值，有可能是一些分类值，如性别可分为“male”和“female”。在机器学习任务中，对于这样的特征，通常我们需要对其进行特征数字化，如下面的例子：
#     性别：["male"，"female"]
#     地区：["Europe"，"US"，"Asia"]
#     浏览器：["Firefox"，"Chrome"，"Safari"，"Internet Explorer"]
#     对于某一个样本，如["male"，"US"，"Internet Explorer"]，我们需要将这个分类值的特征数字化，最直接的方法，我们可以采用序列化的方式：[0,1,3]。但是这样的特征处理并不能直接放入机器学习算法中。
#     对于上述的问题，性别的属性是二维的，同理，地区是三维的，浏览器则是思维的，这样，我们可以采用One-Hot编码的方式对上述的样本“["male"，"US"，"Internet Explorer"]”编码，“male”则对应着[1，0]，同理“US”对应着[0，1，0]，“Internet Explorer”对应着[0,0,0,1]。则完整的特征数字化的结果为：[1,0,0,1,0,0,0,0,1]。

# VectorAssembler是一个transformer，将多列数据转化为单列的向量列。

# 转化前的数据：


# id | hour | mobile | userFeatures     | clicked
# ----|------|--------|------------------|---------
#  0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

# 转化后的数据：

# id | hour | mobile | userFeatures     | clicked | features
# ----|------|--------|------------------|---------|-----------------------------
#  0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

# --------------------- 

# 当我们对训练集应用各种预处理操作时（特征标准化、主成分分析等等）， 我们都需要对测试集重复利用这些参数，以免出现数据泄露（data leakage）。

# pipeline 实现了对全部步骤的流式化封装和管理（streaming workflows with pipelines），可以很方便地使参数集在新数据集（比如测试集）上被重复使用。

# Pipeline可以将许多算法模型串联起来，比如将特征提取、归一化、分类组织在一起形成一个典型的机器学习问题工作流。主要带来两点好处：

# 1. 直接调用fit和predict方法来对pipeline中的所有算法模型进行训练和预测。

# 2. 可以结合grid search对参数进行选择。

    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()))
                 for indexer in indexers ]

    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")

    pipeline = Pipeline(stages=indexers + encoders + [assembler])

    model=pipeline.fit(df)
    data = model.transform(df)

    data = data.withColumn('label',col(labelCol))

    return data.select('features','label')

In [9]:
#将数据集转换为数据帧
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def transData(data):
     return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [10]:
transformed = transData(df)
transformed.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|  low|
|[7.8,0.88,0.0,2.6...|  low|
|[7.8,0.76,0.04,2....|  low|
|[11.2,0.28,0.56,1...|  low|
|[7.4,0.7,0.0,1.9,...|  low|
+--------------------+-----+
only showing top 5 rows



In [11]:
#处理分类标签和变量
# ＃索引標籤，將元數據添加到標籤列
#​ StringIndexer是指把一组字符型标签编码成一组标签索引，索引的范围为0到标签数量，索引构建的顺序为标签的频率，优先编码频率较大的标签
#，所以出现频率最高的标签为0号。如果输入的是数值型的，我们会把它转化成字符型，然后再对其进行编码。
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(transformed)
labelIndexer.transform(transformed).show(5, True)

+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|[7.4,0.7,0.0,1.9,...|  low|         0.0|
|[7.8,0.88,0.0,2.6...|  low|         0.0|
|[7.8,0.76,0.04,2....|  low|         0.0|
|[11.2,0.28,0.56,1...|  low|         0.0|
|[7.4,0.7,0.0,1.9,...|  low|         0.0|
+--------------------+-----+------------+
only showing top 5 rows



In [12]:
# ＃自動識別分類功能，並對其進行索引。
# ＃設置maxCategories，使> 4個不同值的特徵被視為連續。
featureIndexer =VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures", \
                              maxCategories=4).fit(transformed)
featureIndexer.transform(transformed).show(5, True)

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[7.4,0.7,0.0,1.9,...|  low|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|  low|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|  low|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|  low|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|  low|[7.4,0.7,0.0,1.9,...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [14]:
#将数据拆分为培训和测试数据集
#＃將數據拆分為培訓和測試集（40％用於測試）
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])

trainingData.show(5,False)
testData.show(5,False)

+---------------------------------------------------------+-----+
|features                                                 |label|
+---------------------------------------------------------+-----+
|[4.7,0.6,0.17,2.3,0.058,17.0,106.0,0.9932,3.85,0.6,12.9] |low  |
|[4.9,0.42,0.0,2.1,0.048,16.0,42.0,0.99154,3.71,0.74,14.0]|high |
|[5.0,0.38,0.01,1.6,0.048,26.0,60.0,0.99084,3.7,0.75,14.0]|low  |
|[5.0,0.4,0.5,4.3,0.046,29.0,80.0,0.9902,3.49,0.66,13.6]  |low  |
|[5.0,0.42,0.24,2.0,0.06,19.0,50.0,0.9917,3.72,0.74,14.0] |high |
+---------------------------------------------------------+-----+
only showing top 5 rows

+----------------------------------------------------------+-----+
|features                                                  |label|
+----------------------------------------------------------+-----+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,13.1]   |low  |
|[5.1,0.42,0.0,1.8,0.044,18.0,88.0,0.99157,3.68,0.73,13.6] |high |
|[5.1,0.51,0.18,2.1,0.042,16.0,101.0,0.9924,3.

In [15]:
#拟合朴素贝叶斯分类模型
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(featuresCol='indexedFeatures', labelCol='indexedLabel')

In [16]:
#管道体系结构
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [17]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, nb,labelConverter])

In [18]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [19]:
#做出预测
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)

+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[4.6,0.52,0.15,2....|  low|           low|
|[5.1,0.42,0.0,1.8...| high|           low|
|[5.1,0.51,0.18,2....| high|           low|
|[5.2,0.34,0.0,1.8...|  low|           low|
|[5.3,0.715,0.19,1...|  low|           low|
+--------------------+-----+--------------+
only showing top 5 rows



In [20]:
#评价
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.29477
