In [None]:
# !pip3 install matplotlib

In [None]:
# !pip3 install seaborn

In [None]:
# !pip3 install scikit-learn

In [1]:
from pyspark.sql import SparkSession, Row, DataFrame, Column

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import gc

In [3]:
np.__version__, pd.__version__

('1.18.1', '1.0.1')

In [4]:
spark = SparkSession\
    .builder\
    .master("yarn")\
    .config('spark.executor.cores','2') \
    .config('spark.executor.instances','6') \
    .config("spark.executor.memory", '16g')\
    .appName("ss_004")\
    .getOrCreate()
# .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1")\

In [5]:
spark

In [None]:
sc = spark.sparkContext

In [None]:
spark.stop()

---

### Making Parquet files

Do only once per datasets

In [None]:
# df = spark.read.csv('/user/ss/datasets/ss_fteng_G3V3_ohe_recip_20200218a.csv', \
#                     sep = ',', header = True, inferSchema = True)

In [None]:
# df.printSchema()

In [None]:
# df.columns

In [None]:
# df

In [None]:
# # DataFrames can be saved as Parquet files, maintaining the schema information.
# df.write.parquet('/user/ss/datasets/ss_fteng_G3V3_ohe_recip_20200218a.parquet')

---

### Read parquet files

In [None]:
# !hdfs dfs -ls /user/ss/datasets/

In [6]:
# Reading files from parquet
df = spark.read.format("parquet") \
.load("/user/ss/datasets/ss_fteng_G3V3_ohe_recip_20200218a.parquet.gzip")

In [None]:
# final_df = final_df.repartition(4)

In [None]:
df.printSchema()

In [None]:
df.count()

In [None]:
df = df.sample(fraction=0.001, seed=924)

In [None]:
df.count()

In [None]:
# df.sample?

In [None]:
df.printSchema()

---

### Read csv files

In [None]:
df = spark.read.csv('/user/ss/datasets/ss_fteng_G3V3_ohe_recip_20200218a.csv', \
                    sep = ',', header = True, inferSchema = True)

In [None]:
df.count()

In [None]:
df.printSchema()

In [None]:
# df = df[df.columns].cast(double)
# df.select(df.age.cast(StringType()).alias('ages')).collect()

In [None]:
df.write.format("parquet")\
.mode("overwrite")\
.option("compression","gzip")\
.save("/user/ss/datasets/ss_fteng_G3V3_ohe_recip_20200218a.parquet.gzip")

---

## Splitting train and test

In [12]:
train_df = df[df['TARGET'].isNotNull()]
test_df = df[df['TARGET'].isNull()]

In [13]:
train_df.count(), test_df.count()

(307511, 48744)

In [None]:
# df.describe().show()

In [None]:
# assert df.count() == (train_df.count() + test_df.count())

In [None]:
train_df.columns 

---

---

---

In [None]:
# # spark機器學習要求輸入的DataFrame類型為數值類型， 將本來的string欄位轉換成double，並替代空值
# for col, t in app_train.dtypes:
#     if t == "string":
#         app_train = app_train.withColumn(col, app_train[col].cast("double"))

# app_train = app_train.withColumn("TARGET", app_train["TARGET"].cast("int"))
# app_train = app_train.fillna(999999)

In [None]:
# train_df.columns

In [14]:
# 跟在普通單機上做訓練時不同，spark做訓練時所有特徵列需要通過VectorAssembler轉換成特徵矩陣，才能用來訓練
import pyspark.ml.feature as ft
featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in df.columns[:] if col not in ["TARGET"]],
    outputCol='features'
    )

##### info funtion

In [15]:
# # function
# def info(slef):
#     for i in spark.sparkContext._conf.getAll():
#         if i[0] in ['spark.executor.instances','spark.executor.cores','spark.executor.memory']:
#             print(f'{i[0]} : {i[1]}')
#     print(f'Partitions : {app_train.rdd.getNumPartitions()}')

---

In [20]:
# 實例化一個LightGBM Regressor， 其參數和單機版本類似但不盡相同， 文檔可以在以下鏈接找到：
# https://mmlspark.azureedge.net/docs/pyspark/LightGBMRegressor.html
from mmlspark.lightgbm import LightGBMRegressor
lgbm = LightGBMRegressor(
    boostingType="goss",
    numIterations=100,
    objective='binary',
    learningRate=0.03,
#     baggingSeed=50,
    lambdaL1=0.8,
    lambdaL2=0.8,
#     baggingFraction=0.87,
    minSumHessianInLeaf=0.03,
    maxDepth=31,
#     featureFraction=0.66,
    numLeaves=63,
    labelCol="TARGET"
                          )

In [21]:
# 建立一個pipeline，簡化訓練步驟
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
                # 特徵整理
                featuresCreator,
                # 模型名稱
                    lgbm])

---

#### One fold

In [22]:
# 這裡是將訓練數據分成訓練集和驗證集，測試模型預測效果 (OK)
import pyspark.ml.evaluation as ev
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

In [23]:
# res_list = []
# summ = 0
# countt = 0
# for x in range(1, 6):

tr, tv = train_df.randomSplit([0.7, 0.3])

#     res_list = res_list.append(auroc)
#     summ = summ + auroc 
#     countt = countt +1
#     print(f'iteration {x}, Area Under ROC {auroc}')

# print (f'avg AUROC {(summ/countt)}')

# print (f'best AUROC {max(res_list)}')
# print ("")
# print(evaluator.evaluate(t_prediction, {evaluator.metricName: 'areaUnderROC'}))

In [24]:
# 跟在普通單機上做訓練時不同，spark做訓練時所有特徵列需要通過VectorAssembler轉換成特徵矩陣，才能用來訓練
import pyspark.ml.feature as ft
featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in tr.columns[:] if col not in ["TARGET"]],
    outputCol='features'
    )

In [38]:
# %%time
vmodel = pipeline.fit(tr)

In [39]:
# %%time
t_prediction = vmodel.transform(tv)

In [40]:
evaluator = ev.BinaryClassificationEvaluator(
     rawPredictionCol='prediction',
     labelCol='TARGET')

In [41]:
auroc = evaluator.evaluate(t_prediction)

In [42]:
print (f'avg AUROC {(auroc)}')

avg AUROC 0.7804768061351615


In [43]:
print(vmodel)

PipelineModel_febd361aa07a


In [44]:
print(t_prediction)

DataFrame[SK_ID_CURR: int, TARGET: double, AMT_REQ_CREDIT_BUREAU_DAY_1_0: int, AMT_REQ_CREDIT_BUREAU_DAY_2_0: int, AMT_REQ_CREDIT_BUREAU_DAY_3_0: int, AMT_REQ_CREDIT_BUREAU_DAY_4_0: int, AMT_REQ_CREDIT_BUREAU_DAY_5_0: int, AMT_REQ_CREDIT_BUREAU_DAY_6_0: int, AMT_REQ_CREDIT_BUREAU_DAY_8_0: int, AMT_REQ_CREDIT_BUREAU_DAY_9_0: int, AMT_REQ_CREDIT_BUREAU_DAY_nan: int, AMT_REQ_CREDIT_BUREAU_HOUR_1_0: int, AMT_REQ_CREDIT_BUREAU_HOUR_2_0: int, AMT_REQ_CREDIT_BUREAU_HOUR_3_0: int, AMT_REQ_CREDIT_BUREAU_HOUR_4_0: int, AMT_REQ_CREDIT_BUREAU_HOUR_nan: int, AMT_REQ_CREDIT_BUREAU_MON_1_0: int, AMT_REQ_CREDIT_BUREAU_MON_10_0: int, AMT_REQ_CREDIT_BUREAU_MON_11_0: int, AMT_REQ_CREDIT_BUREAU_MON_12_0: int, AMT_REQ_CREDIT_BUREAU_MON_13_0: int, AMT_REQ_CREDIT_BUREAU_MON_14_0: int, AMT_REQ_CREDIT_BUREAU_MON_15_0: int, AMT_REQ_CREDIT_BUREAU_MON_16_0: int, AMT_REQ_CREDIT_BUREAU_MON_17_0: int, AMT_REQ_CREDIT_BUREAU_MON_18_0: int, AMT_REQ_CREDIT_BUREAU_MON_19_0: int, AMT_REQ_CREDIT_BUREAU_MON_2_0: int, AMT_RE

In [45]:
print(evaluator)

BinaryClassificationEvaluator_2b69e40cb988


In [46]:
print(auroc)

0.7804768061351615


---

#### Predict the real test_df

In [47]:
test_prediction = vmodel.transform(test_df)

In [48]:
# 测试集结果输出，从hadoop里将预测数据下载到本机
res = test_prediction.select("SK_ID_CURR", "prediction")
res = res.withColumn("TARGET", res["prediction"])
res = res.select("SK_ID_CURR", "TARGET")

In [52]:
res.show()

+----------+--------------------+
|SK_ID_CURR|              TARGET|
+----------+--------------------+
|    358233|0.024132373856832288|
|    358247|  0.2158121726532002|
|    358258| 0.08563018619947321|
|    358264| 0.10590549863527618|
|    358271| 0.18080085080814795|
|    358273| 0.04829308040105044|
|    358275| 0.07008210655556893|
|    358278| 0.04702128916496139|
|    358287| 0.06805488050674269|
|    358309| 0.04698452259814253|
|    358327|0.057514462264875596|
|    358342|0.041252224178592166|
|    358348| 0.05405219403903489|
|    358352|0.028246558996403666|
|    358373| 0.12345075266078087|
|    358384| 0.04832805580055222|
|    358394|0.034014284528371144|
|    358405|  0.2028062416020224|
|    358409|0.029273721222245787|
|    358412| 0.36126626523418864|
+----------+--------------------+
only showing top 20 rows



In [55]:
res.coalesce(1).write.format("csv")\
    .mode("overwrite")\
    .option("header", "true")\
    .option("mode", "FAILFAST")\
    .option("inferSchema","true")\
    .save("/user/ss/ss_outputs/20200224d_lgbm_on_spark_bdse170_004.csv")

In [50]:
# res.coalesce(1).write.csv("/user/ss/ss_outputs/20200224d_lgbm_on_spark_bdse170_004.csv", header='true')

AnalysisException: 'path hdfs://nncluster/user/ss/ss_outputs/20200224c_lgbm_on_spark_bdse170_004.csv already exists.;'

In [None]:
spark.stop()

---

#### k-fold

In [None]:
# 交叉驗證
import pyspark.ml.evaluation as ev
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

paramGrid = ParamGridBuilder()\
    .addGrid(lgbm.learningRate, [0.01, 0.05, 0.1])\
    .addGrid(lgbm.lambdaL1, [0, 1])\
    .addGrid(lgbm.lambdaL2, [0, 1])\
    .addGrid(lgbm.maxDepth, [1, 7, 14, 21, 31])\
    .addGrid(lgbm.numLeaves, [1, 7, 14, 21, 31, 49, 99])\
    .build()

evaluatorAUROC = ev.BinaryClassificationEvaluator(
    labelCol='TARGET',
    rawPredictionCol='prediction',
    metricName= 'areaUnderROC')

cv = CrossValidator(
    estimator = pipeline,
    estimatorParamMaps = paramGrid,
    evaluator = evaluatorAUROC,
    numFolds = 3)

cv_model = cv.fit(train_df)

In [None]:
cv_prediction = cv_model.transform(test_df)
cv_prediction.show()

evaluatorAUROC = ev.BinaryClassificationEvaluator(
    labelCol='TARGET',
    rawPredictionCol='prediction',
    metricName= 'areaUnderROC')

auroc = evaluatorAUROC.evaluate(cv_prediction)
print(f'AUROC {auroc}')

In [None]:
# cv_model.avgMetrics

In [None]:
# cv_model.bestModel

In [None]:
cv_prediction = cv_model.transform(test_df)
cv_prediction.show()

print(f'Full AUC {evaluator.evaluate(cv_prediction)}')
print(evaluator.evaluate(cv_model, {evaluator.metricName: 'areaUnderROC'}))

selected = cv_prediction.select("SK_ID_CURR", "TARGET")
for row in selected.collect():
    print(row)


In [None]:
# # 找出最佳模型
# from pyspark.ml.tuning import ParamGridBuilder
# from pyspark.ml.tuning import TrainValidationSplit

# paramGrid = ParamGridBuilder()\
#     .addGrid(lgbm.numLeaves, [10,20,30])\
#     .addGrid(lgbm.numIterations, [100,160,200])\
#     .addGrid(lgbm.baggingSeed, [25,50,75])\
#     .build()
# tvs = TrainValidationSplit( estimator=lgbm,
#     estimatorParamMaps=paramGrid,
#     evaluator=evaluator,
#     trainRatio = 0.8)
# # 最佳模型
# tvs_pipeline = Pipeline(stages=[featuresCreator,tvs])

# tvs_pipelineModel = tvs_pipeline.fit(train_df)

# prediction = tvs_pipelineModel.transform(test_df)
# print(f'Full AUC {evaluator.evaluate(prediction)}')


In [None]:
model = pipeline.fit(train_df)

In [None]:
prediction = model.transform(test_df)

In [None]:
# 测试集结果输出，从hadoop里将预测数据下载到本机
res = prediction.select("SK_ID_CURR", "prediction")
res = res.withColumn("TARGET", res["prediction"])
res = res.select("SK_ID_CURR", "TARGET")

In [None]:
res.coalesce(1).write.csv(""/user/ss/ss_outputs", header='true')

---

### Reference

https://zhuanlan.zhihu.com/p/67828512

In [None]:
from mmlspark import LightGBMRegressor
import pyspark.ml.feature as ft
from pyspark.ml import Pipeline
import pyspark.ml.evaluation as ev
import pyspark.sql.types as typ

# 如果你是通过spark-submit来运行，则需要先实例化一个spark session对象， 在pyspark中spark session对象已经默认生成
# from pyspark import SparkConf, SparkContext
# from pyspark.sql import SparkSession
# conf = SparkConf().setMaster("spark://master:7077").setAppName("MMLSPARK")
# sc = SparkContext(conf = conf)
# spark = SparkSession \
#         .builder \
#         .appName("MMLSPARK") \
#         .enableHiveSupport() \
#         .getOrCreate()


# 读取csv数据，这里读取的是事先使用hadoop fs -put命令上传到hadoop里的数据
app_train = spark.read.csv("/homecredit/train_all3.csv", header='true', inferSchema='true')

# 数据预处理， 将本来应该是数字的字符串数据转化数据类型，并替代空值
for col, t in app_train.dtypes:
    if t == "string":
        app_train = app_train.withColumn(col, app_train[col].cast("double"))

app_train = app_train.withColumn("TARGET", app_train["TARGET"].cast("int"))
app_train = app_train.fillna(999999)

# 跟在普通单机上做训练时不同，spark做训练时所有特征列需要通过VectorAssembler转换成特征矩阵，才能用来训练
featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in app_train.columns[1:] if col != "TARGET"],
    outputCol='features'
    )


# 实例化一个LightGBM Regressor， 其参数和单机版本类似但不尽相同， 文档可以在以下链接找到：
# https://mmlspark.azureedge.net/docs/pyspark/LightGBMRegressor.html
lgbm = LightGBMRegressor(numIterations=120, objective='binary',
        learningRate=0.007, baggingSeed=50,
        boostingType="goss", lambdaL1=0.4, lambdaL2=0.4,
        baggingFraction=0.87, minSumHessianInLeaf=0.003,
        maxDepth=9, featureFraction=0.66, numLeaves=47,
        labelCol="TARGET"
                          )

# 建立一个pipeline，简化训练步骤
pipeline = Pipeline(stages=[
                # 特征整理
                featuresCreator,
                # 模型名称
                    lgbm])

# 这里是将数据分成训练集和验证集，测试模型预测效果
tr, te = app_train.randomSplit([0.7, 0.3], seed=666)

vmodel = pipeline.fit(tr)
t_model = vmodel.transform(te)
evaluator = ev.BinaryClassificationEvaluator(
     rawPredictionCol='prediction',
     labelCol='TARGET')
print(evaluator.evaluate(t_model,
 {evaluator.metricName: 'areaUnderROC'}))

# 实际训练过程
model = pipeline.fit(app_train)

# 测试集的数据预处理和训练
app_test = spark.read.csv("/homecredit/test_all3.csv", header='true', inferSchema='true')
for col, t in app_test.dtypes:
    if t == "string":
        app_test = app_test.withColumn(col, app_test[col].cast("double"))
app_test = app_test.fillna(999999)
prediction = model.transform(app_test)

# 测试集结果输出，从hadoop里将预测数据下载到本机
res = prediction.select("SK_ID_CURR", "prediction")
res = res.withColumn("TARGET", res["prediction"])
res = res.select("SK_ID_CURR", "TARGET")
res.coalesce(1).write.csv("/homecredit/cluster_lgbm.csv", header='true')

---

In [None]:
%%time

# 交叉驗證

paramGrid = ParamGridBuilder()\
    .addGrid(lgbm.learningRate, [0.095,0.1,0.105])\
    .build()

cv = CrossValidator(
    estimator=lgbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds = 5)

cv_pipeline = Pipeline(stages=[featuresCreator,cv])

cv_model = cv_pipeline.fit(train_df)
cv_prediction = cv_model.transform(test_df)

print(f'Full AUC {evaluator.evaluate(cv_prediction)}')

In [None]:
%%time

# 找出最佳模型

from pyspark.ml.tuning import TrainValidationSplit
paramGrid = ParamGridBuilder()\
    .addGrid(lgbm.numLeaves, [10,20,30])\
    .addGrid(lgbm.numIterations, [100,160,200])\
    .addGrid(lgbm.baggingSeed, [25,50,75])\
    .build()
tvs = TrainValidationSplit( estimator=lgbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio = 0.8)
# 最佳模型
tvs_pipeline = Pipeline(stages=[featuresCreator,tvs])

tvs_pipelineModel = tvs_pipeline.fit(train_df)

prediction = tvs_pipelineModel.transform(test_df)
print(f'Full AUC {evaluator.evaluate(prediction)}')


In [None]:
%%time

# 找出最佳模型+交叉驗證

from pyspark.ml.tuning import TrainValidationSplit
paramGrid = ParamGridBuilder()\
    .addGrid(lgbm.numLeaves, [10,20,30])\
    .addGrid(lgbm.numIterations, [100,200,300])\
    .addGrid(lgbm.baggingSeed, [25,50,75])\
    .build()

cv = CrossValidator(
    estimator=lgbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds = 10)

tvs_pipeline = Pipeline(stages=[featuresCreator,cv])

tvs_pipelineModel = tvs_pipeline.fit(train_df)

prediction = tvs_pipelineModel.transform(test_df)
print(f'Full AUC {evaluator.evaluate(prediction)}')


---

In [None]:
# https://spark.apache.org/docs/latest/ml-tuning.html
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)