# PySpark Notes, Part 2: Training LightGBM in a PySpark Environment

### Overview
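As of the current PySpark version, 2.4.5, the pyspark.ml module does support machine learning, but it mostly offers algorithms that are not widely used in industry, and popular ones such as XGBoost and LightGBM are not yet integrated. Fortunately, Microsoft released the mmlspark package a few years ago. It includes deep learning, LightGBM, and other algorithms, and it integrates seamlessly with PySpark. Below we look at how to run LightGBM with PySpark and mmlspark.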

## 1. Installing mmlspark
First, we assume PySpark is already installed. If it is not, install it with the following command:

In [1]:
pip install pyspark

Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Note: you may need to restart the kernel to use updated packages.


Next, install mmlspark. From the command line, run:

In [3]:
pyspark --packages com.microsoft.ml.spark:mmlspark_2.11:0.18.1

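Note that this is a shell command. Entered in a notebook cell, as above, it fails with `SyntaxError: invalid syntax (<ipython-input-3-ace3c3df8fdb>, line 1)`, so run it in a terminal instead.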

Alternatively, when using PySpark directly in a Jupyter notebook, start the session like this:

In [4]:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
            .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
            .getOrCreate()
import mmlspark

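The package is fairly large, so the first installation takes a while. The Maven repository on our server carries mmlspark version 0.18.1.
For detailed installation instructions, see the official documentation (Azure / mmlspark). The latest mmlspark version is 1.0.0-rc1.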

## 2. Importing dependencies

In [6]:
import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("local") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()

import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

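Here, VectorAssembler combines the feature columns into a single vector that serves as the classifier's input. BinaryClassificationEvaluator evaluates prediction quality. Pipeline is Spark's workflow abstraction: it chains the individual steps together so that Spark runs them as one unit.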

## 3. Loading the data

In [8]:
df_train = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("训练集特征.csv")
df_val = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("验证集特征.csv")

In [17]:
df_train, df_val

(DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: string, SibSp: double, Parch: int, Ticket: string, Fare: string, Cabin: string, Embarked: string],
 DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: string, SibSp: double, Parch: int, Ticket: string, Fare: string, Cabin: string, Embarked: string])

Prepare the training-set features:

In [10]:
feature_cols = list(df_train.columns)
feature_cols.remove("Survived")  # removing the label from the column names leaves the actual feature list
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features") 

After VectorAssembler runs, the DataFrame gains one extra column named features. Each value in it is essentially a row vector holding all of that sample's features.
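To make the idea of the features column concrete, here is a minimal pure-Python sketch of what the assembly step does conceptually for one row. It is not the Spark implementation: `assemble_features` is a hypothetical helper, and the sample row is made up using column names from the schema above.

```python
def assemble_features(row, input_cols):
    """Concatenate the selected columns of one row into a single feature vector."""
    return [float(row[col]) for col in input_cols]

# Hypothetical sample row; only numeric columns are shown.
row = {"PassengerId": 1, "Survived": 0, "Pclass": 3, "SibSp": 1.0, "Parch": 0}

# As in the cell above: every column except the label is a feature.
feature_cols = [col for col in row if col != "Survived"]

features = assemble_features(row, feature_cols)
print(features)  # [1.0, 3.0, 1.0, 0.0]
```

The order of the vector follows the order of the input columns, which is why the same column list has to be used for training and prediction.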

## 4. Building the model

In [14]:
lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='Survived',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=30,
    learningRate=0.1,
    lambdaL1=1.0,
    lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    # minSumHessianInLeaf=1,
    numIterations=800,
    verbosity=30
)

Load the stages into a Pipeline and run training:

In [15]:
stages = [assembler, lgb]
pipeline_model = Pipeline(stages=stages)
model = pipeline_model.fit(df_train)

IllegalArgumentException: 'Data type string of column Name is not supported.\nData type string of column Sex is not supported.\nData type string of column Age is not supported.\nData type string of column Ticket is not supported.\nData type string of column Fare is not supported.\nData type string of column Cabin is not supported.\nData type string of column Embarked is not supported.'
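The exception above is raised because `VectorAssembler` accepts only numeric, boolean, and vector input columns, while `feature_cols` still contains string columns such as Name and Sex. One possible workaround, sketched below, is to keep only the numeric columns. This assumes that dropping the string columns is acceptable; encoding them (for example with `StringIndexer`) would be the alternative. The helper `numeric_feature_columns` is hypothetical, and it takes `(name, dtype)` pairs in the format returned by `df_train.dtypes`.

```python
# Spark SQL type names that VectorAssembler can consume as numbers.
NUMERIC_DTYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}

def numeric_feature_columns(dtypes, label_col):
    """Return the names of the numeric columns, excluding the label column."""
    return [name for name, dtype in dtypes
            if dtype in NUMERIC_DTYPES and name != label_col]

# (name, dtype) pairs copied from the schema printed in section 3;
# in the notebook this list would come from df_train.dtypes.
dtypes = [
    ("PassengerId", "int"), ("Survived", "int"), ("Pclass", "int"),
    ("Name", "string"), ("Sex", "string"), ("Age", "string"),
    ("SibSp", "double"), ("Parch", "int"), ("Ticket", "string"),
    ("Fare", "string"), ("Cabin", "string"), ("Embarked", "string"),
]

feature_cols = numeric_feature_columns(dtypes, label_col="Survived")
print(feature_cols)  # ['PassengerId', 'Pclass', 'SibSp', 'Parch']
```

Passing this list as `inputCols` to `VectorAssembler` should avoid the exception. Two caveats: PassengerId is an identifier rather than a meaningful feature, so it may be worth dropping too, and Age and Fare were inferred as strings, which suggests they need cleaning and casting before they can be used.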

Predict on the three datasets:

In [None]:
train_preds = model.transform(df_train)
val_preds = model.transform(df_val)
test_preds = model.transform(df_test)  # assumes df_test is a test-set DataFrame loaded like df_train; it is not loaded above

Let's see how well the model performs:

In [None]:
# The label column is "Survived"; the evaluator's default labelCol is "label"
binaryEvaluator = BinaryClassificationEvaluator(labelCol="Survived")
print("Train AUC: " + str(binaryEvaluator.evaluate(train_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print("Val AUC: " + str(binaryEvaluator.evaluate(val_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
print("Test AUC: " + str(binaryEvaluator.evaluate(test_preds, {binaryEvaluator.metricName: "areaUnderROC"})))
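For intuition about the areaUnderROC metric used above: AUC equals the probability that a randomly chosen positive sample receives a higher score than a randomly chosen negative one. The following is a minimal pure-Python sketch of that definition, not Spark's implementation, so values may differ slightly from the evaluator's. `auc_pairwise` is a hypothetical helper and the scores and labels are toy data.

```python
def auc_pairwise(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair. Quadratic in the number of samples,
    so this is only suitable for small illustrative inputs.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative sample")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Toy data: predicted scores for the positive class and true 0/1 labels.
scores = [0.9, 0.8, 0.35, 0.6, 0.2]
labels = [1, 1, 0, 0, 1]
auc = auc_pairwise(scores, labels)
print(auc)  # 4 of the 6 positive/negative pairs are ranked correctly
```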

We can also extract the predicted probabilities and the true labels, which makes it easy to compute other custom metrics such as KS.

In [None]:
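# probability[1] is the predicted probability of the positive class (Survived = 1).
# The label column is named "Survived". Collecting both columns together keeps
# each probability aligned with its label.
train_rows = train_preds.select('probability', 'Survived').collect()
train_prob_list = [float(row.probability[1]) for row in train_rows]
train_label_list = [row.Survived for row in train_rows]

val_rows = val_preds.select('probability', 'Survived').collect()
val_prob_list = [float(row.probability[1]) for row in val_rows]
val_label_list = [row.Survived for row in val_rows]

test_rows = test_preds.select('probability', 'Survived').collect()
test_prob_list = [float(row.probability[1]) for row in test_rows]
test_label_list = [row.Survived for row in test_rows]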
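As an example of such a custom metric, here is a minimal pure-Python sketch of the KS statistic, taken here to be the maximum gap between the cumulative true-positive rate and the cumulative false-positive rate as the score threshold is lowered. `ks_statistic` is a hypothetical helper, labels are assumed to be 0/1, and the inputs are toy data.

```python
def ks_statistic(probs, labels):
    """Maximum |TPR - FPR| over all score thresholds, for 0/1 labels."""
    n_pos = sum(1 for y in labels if y == 1)
    n_neg = len(labels) - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("KS needs at least one positive and one negative sample")

    # Walk through the samples from the highest score to the lowest.
    pairs = sorted(zip(probs, labels), key=lambda t: t[0], reverse=True)
    tp = fp = 0
    best = 0.0
    i = 0
    while i < len(pairs):
        score = pairs[i][0]
        # Consume every sample tied at this score before measuring the gap.
        while i < len(pairs) and pairs[i][0] == score:
            if pairs[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        best = max(best, abs(tp / n_pos - fp / n_neg))
    return best

# Toy data standing in for the probability and label lists built above.
probs = [0.9, 0.8, 0.6, 0.35, 0.2]
labels = [1, 1, 0, 0, 1]
ks = ks_statistic(probs, labels)
print(ks)
```

With the lists from the previous cell, the call would be `ks_statistic(train_prob_list, train_label_list)`, and likewise for the validation and test sets.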


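After this series of transforms, the DataFrame gains four columns: features, rawPrediction, probability, and prediction. rawPrediction holds the tree model's final pair of scores, which sum to 0. Applying a sigmoid to them yields the pair of values in probability, which sum to 1 and represent the model's estimated likelihood that the sample belongs to each of the two classes. prediction is the class the model predicts for the sample.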
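The relationship between these columns can be sketched in plain Python. This follows the description above rather than the mmlspark source: the raw score pair is hypothetical, and picking the class with the larger probability assumes the default 0.5 threshold.

```python
import math

def sigmoid(x):
    """Logistic function mapping a raw score to the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-x))

def raw_to_probability(raw_prediction):
    """Apply the sigmoid to each raw score of the pair."""
    return [sigmoid(score) for score in raw_prediction]

# Hypothetical raw score pair for one sample; the two scores sum to 0.
raw_prediction = [-1.2, 1.2]

probability = raw_to_probability(raw_prediction)
# The predicted class is the index of the larger probability.
prediction = probability.index(max(probability))

print(probability)
print(prediction)  # 1
```

Because sigmoid(-s) = 1 - sigmoid(s), a raw pair of the form [-s, s] always maps to two probabilities that sum to 1.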