# MLlib

In [1]:
import findspark
findspark.init('/data/spark-2.2.3-bin-hadoop2.7/')

In [2]:
from pyspark import SparkContext
sc = SparkContext('local')

In [6]:
from pyspark.sql import SparkSession

ML Pipelines

In [7]:
spark = SparkSession.builder.master('local').appName("Word Conut").getOrCreate()

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF,Tokenizer

In [8]:
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

In [9]:
training.show()

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



In [14]:
#定义
tokenizer = Tokenizer(inputCol="text",outputCol="words")
hashingTF = HashingTF(inputCol =tokenizer.getOutputCol(),outputCol = "features")
lr = LogisticRegression(maxIter=10,regParam=0.001)

In [15]:
pipeline = Pipeline(stages=[tokenizer,hashingTF,lr])

In [16]:
model = pipeline.fit(training)

In [17]:
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

In [18]:
test.show()

+---+------------------+
| id|              text|
+---+------------------+
|  4|       spark i j k|
|  5|             l m n|
|  6|spark hadoop spark|
|  7|     apache hadoop|
+---+------------------+



In [19]:
prediction = model.transform(test)

In [20]:
prediction.show()

+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+
| id|              text|               words|            features|       rawPrediction|         probability|prediction|
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  4|       spark i j k|    [spark, i, j, k]|(262144,[20197,24...|[-1.6609033227472...|[0.15964077387874...|       1.0|
|  5|             l m n|           [l, m, n]|(262144,[18910,10...|[1.64218895265644...|[0.83783256854767...|       0.0|
|  6|spark hadoop spark|[spark, hadoop, s...|(262144,[155117,2...|[-2.5980142174393...|[0.06926633132976...|       1.0|
|  7|     apache hadoop|    [apache, hadoop]|(262144,[66695,15...|[4.00817033336812...|[0.98215753334442...|       0.0|
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+



In [21]:
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000


### 决策树

In [3]:
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

In [4]:
#读取文本
def load(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[3]))
    rel['label'] = str(x[4])
    return rel

In [8]:
data = spark.sparkContext.textFile('iris.txt').map(lambda x:x.split(',')).map(lambda p:Row(**load(p))).toDF()

In [9]:
data.createOrReplaceTempView('iris')

In [25]:
type(data)

pyspark.sql.dataframe.DataFrame

In [10]:
data.take(10)

[Row(features=DenseVector([5.1, 3.5, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([4.9, 3.0, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([4.7, 3.2, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([4.6, 3.1, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([5.0, 3.6, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([5.4, 3.9, 0.4]), label='Iris-setosa'),
 Row(features=DenseVector([4.6, 3.4, 0.3]), label='Iris-setosa'),
 Row(features=DenseVector([5.0, 3.4, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([4.4, 2.9, 0.2]), label='Iris-setosa'),
 Row(features=DenseVector([4.9, 3.1, 0.1]), label='Iris-setosa')]

In [12]:
df = spark.sql('select * from iris')

In [14]:
df.show()

+-------------+-----------+
|     features|      label|
+-------------+-----------+
|[5.1,3.5,0.2]|Iris-setosa|
|[4.9,3.0,0.2]|Iris-setosa|
|[4.7,3.2,0.2]|Iris-setosa|
|[4.6,3.1,0.2]|Iris-setosa|
|[5.0,3.6,0.2]|Iris-setosa|
|[5.4,3.9,0.4]|Iris-setosa|
|[4.6,3.4,0.3]|Iris-setosa|
|[5.0,3.4,0.2]|Iris-setosa|
|[4.4,2.9,0.2]|Iris-setosa|
|[4.9,3.1,0.1]|Iris-setosa|
|[5.4,3.7,0.2]|Iris-setosa|
|[4.8,3.4,0.2]|Iris-setosa|
|[4.8,3.0,0.1]|Iris-setosa|
|[4.3,3.0,0.1]|Iris-setosa|
|[5.8,4.0,0.2]|Iris-setosa|
|[5.7,4.4,0.4]|Iris-setosa|
|[5.4,3.9,0.4]|Iris-setosa|
|[5.1,3.5,0.3]|Iris-setosa|
|[5.7,3.8,0.3]|Iris-setosa|
|[5.1,3.8,0.3]|Iris-setosa|
+-------------+-----------+
only showing top 20 rows



In [15]:
type(df)

pyspark.sql.dataframe.DataFrame

In [16]:
rel = df.rdd.map(lambda t:str(t[1])+':'+str(t[0])).collect()

In [17]:
for i in rel:
    print(i)
    break

Iris-setosa:[5.1,3.5,0.2]


In [18]:
#特征处理,进行索引，并进行重新命名
labelindexer = StringIndexer(inputCol='label',outputCol='indexedlabel').fit(df)
featureindexer = VectorIndexer(inputCol='features',outputCol='indexedfeatures',maxCategories=4).fit(df)

In [21]:
labelconverter = IndexToString(inputCol='prediction',outputCol='predictedlabel',labels=labelindexer.labels)

In [24]:
train,test = data.randomSplit([0.7,0.3])

In [22]:
#建立模型
from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [23]:
dt = DecisionTreeClassifier(labelCol='indexedlabel',featuresCol='indexedfeatures')
pipelineclassifier = Pipeline(stages=[labelindexer,featureindexer,dt,labelconverter])

In [26]:
model = pipelineclassifier.fit(train)

In [27]:
pred = model.transform(test)

In [28]:
pred.show()

+-------------+---------------+------------+---------------+--------------+--------------------+----------+---------------+
|     features|          label|indexedlabel|indexedfeatures| rawPrediction|         probability|prediction| predictedlabel|
+-------------+---------------+------------+---------------+--------------+--------------------+----------+---------------+
|[4.4,3.2,0.2]|    Iris-setosa|         0.0|  [4.4,3.2,0.2]|[41.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|    Iris-setosa|
|[4.6,3.1,0.2]|    Iris-setosa|         0.0|  [4.6,3.1,0.2]|[41.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|    Iris-setosa|
|[4.8,3.4,0.2]|    Iris-setosa|         0.0|  [4.8,3.4,0.2]|[41.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|    Iris-setosa|
|[4.9,2.4,1.0]|Iris-versicolor|         1.0|  [4.9,2.4,1.0]| [0.0,0.0,1.0]|       [0.0,0.0,1.0]|       2.0| Iris-virginica|
|[5.0,3.0,0.2]|    Iris-setosa|         0.0|  [5.0,3.0,0.2]|[41.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|    Iris-setosa|
|[5.0,3.

In [31]:
#模型评估
evaluator = MulticlassClassificationEvaluator(labelCol='indexedlabel',predictionCol='prediction',metricName='accuracy')

In [32]:
accuracy = evaluator.evaluate(pred)

In [33]:
accuracy

0.9302325581395349

In [36]:
tree = model.stages[2]

In [39]:
print(tree.toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_483e85f3950da3e9d761) of depth 5 with 15 nodes
  If (feature 2 <= 0.6)
   Predict: 0.0
  Else (feature 2 > 0.6)
   If (feature 2 <= 1.7)
    If (feature 0 <= 4.9)
     Predict: 2.0
    Else (feature 0 > 4.9)
     If (feature 0 <= 6.9)
      If (feature 1 <= 2.2)
       Predict: 1.0
      Else (feature 1 > 2.2)
       Predict: 1.0
     Else (feature 0 > 6.9)
      Predict: 2.0
   Else (feature 2 > 1.7)
    If (feature 0 <= 5.9)
     If (feature 0 <= 5.8)
      Predict: 2.0
     Else (feature 0 > 5.8)
      Predict: 1.0
    Else (feature 0 > 5.9)
     Predict: 2.0



### 交叉验证

In [40]:
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml.feature import HashingTF,Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Row
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel
from pyspark.ml import Pipeline, PipelineModel

In [41]:
lr = LogisticRegression().setLabelCol("indexedlabel").setFeaturesCol("indexedfeatures").setMaxIter(50)

In [43]:
lrPipeline = Pipeline().setStages([labelindexer, featureindexer, lr, labelconverter])

In [44]:
parmgrid = ParamGridBuilder().addGrid(lr.elasticNetParam,[0.2,0.8]).addGrid(lr.regParam,[0.01,0.1,0.5]).build()

In [45]:
cv = CrossValidator(estimator=lrPipeline,
                    evaluator=MulticlassClassificationEvaluator(labelCol='indexedlabel',predictionCol='prediction'),
                    estimatorParamMaps=parmgrid).setNumFolds(3)

In [46]:
cvmodel = cv.fit(train)

In [48]:
pred_1 = cvmodel.transform(test)
pred_1.show()

+-------------+---------------+------------+---------------+--------------------+--------------------+----------+---------------+
|     features|          label|indexedlabel|indexedfeatures|       rawPrediction|         probability|prediction| predictedlabel|
+-------------+---------------+------------+---------------+--------------------+--------------------+----------+---------------+
|[4.4,3.2,0.2]|    Iris-setosa|         0.0|  [4.4,3.2,0.2]|[4.54394107622339...|[0.98017154548802...|       0.0|    Iris-setosa|
|[4.6,3.1,0.2]|    Iris-setosa|         0.0|  [4.6,3.1,0.2]|[4.00585057313672...|[0.95891326011930...|       0.0|    Iris-setosa|
|[4.8,3.4,0.2]|    Iris-setosa|         0.0|  [4.8,3.4,0.2]|[4.70687210796738...|[0.98116452730350...|       0.0|    Iris-setosa|
|[4.9,2.4,1.0]|Iris-versicolor|         1.0|  [4.9,2.4,1.0]|[-0.7688384702878...|[0.10593707233481...|       1.0|Iris-versicolor|
|[5.0,3.0,0.2]|    Iris-setosa|         0.0|  [5.0,3.0,0.2]|[3.23944757644271...|[0.884675

In [49]:
evaluator.evaluate(pred_1)

0.9302325581395349

In [50]:
bestmodel = cvmodel.bestModel

In [52]:
lrmodel = bestmodel.stages[2]

In [53]:
lrmodel.coefficientMatrix

DenseMatrix(3, 3, [-1.1416, 3.0978, -2.8297, 0.6692, -0.7847, -0.4548, 0.5291, -3.0196, 4.3806], 1)

In [55]:
lrmodel.numClasses

3

In [58]:
lrmodel.numFeatures

3

In [59]:
lr.explainParam(lr.regParam)

'regParam: regularization parameter (>= 0). (default: 0.0)'