In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark import SparkContext, SparkConf

In [2]:
conf = SparkConf().setAppName("Spark_mllearn_example").setMaster("local")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master("local").appName("Spark_mllearn_example").config("", "").getOrCreate()

In [3]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [4]:
dpath = 'data/Titanic/'
df = spark.read.csv(dpath + 'train.csv', header=True)
df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [5]:
df = df.select('Survived', 'Pclass', 'SibSp', 'Parch', 'Fare')
df

Survived,Pclass,SibSp,Parch,Fare
0,3,1,0,7.25
1,1,1,0,71.2833
1,3,0,0,7.925
1,1,1,0,53.1
0,3,0,0,8.05
0,3,0,0,8.4583
0,1,0,0,51.8625
0,3,3,1,21.075
1,3,0,2,11.1333
1,2,1,0,30.0708


## 查看每行记录的缺失值

In [6]:
# df.rdd.map(lambda row: (row['PassengerId'], sum([c == None for c in row]))).collect()

In [7]:
# import pyspark.sql.functions as fn
# df.agg(*[(1-(fn.count(c) / fn.count('*'))).alias(c+'_missing') for c in df.columns]).show()

In [8]:
df = df.withColumn('Survived', df['Survived'].cast('double')) \
.withColumn('Pclass', df['Pclass'].cast('double')) \
.withColumn('SibSp', df['SibSp'].cast('double')) \
.withColumn('Parch', df['Parch'].cast('double')) \
.withColumn('Fare', df['Fare'].cast('double'))
df

Survived,Pclass,SibSp,Parch,Fare
0.0,3.0,1.0,0.0,7.25
1.0,1.0,1.0,0.0,71.2833
1.0,3.0,0.0,0.0,7.925
1.0,1.0,1.0,0.0,53.1
0.0,3.0,0.0,0.0,8.05
0.0,3.0,0.0,0.0,8.4583
0.0,1.0,0.0,0.0,51.8625
0.0,3.0,3.0,1.0,21.075
1.0,3.0,0.0,2.0,11.1333
1.0,2.0,1.0,0.0,30.0708


In [9]:
input_cols = ['Pclass', 'SibSp', 'Parch', 'Fare']
vecAssembler = VectorAssembler(inputCols=input_cols, outputCol='features')
string2Indexer = StringIndexer(inputCol='Survived', outputCol='label')
pipeline = Pipeline(stages=[vecAssembler, string2Indexer])
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
dataset.show(10)

+--------+------+-----+-----+-------+--------------------+-----+
|Survived|Pclass|SibSp|Parch|   Fare|            features|label|
+--------+------+-----+-----+-------+--------------------+-----+
|     0.0|   3.0|  1.0|  0.0|   7.25|  [3.0,1.0,0.0,7.25]|  0.0|
|     1.0|   1.0|  1.0|  0.0|71.2833|[1.0,1.0,0.0,71.2...|  1.0|
|     1.0|   3.0|  0.0|  0.0|  7.925| [3.0,0.0,0.0,7.925]|  1.0|
|     1.0|   1.0|  1.0|  0.0|   53.1|  [1.0,1.0,0.0,53.1]|  1.0|
|     0.0|   3.0|  0.0|  0.0|   8.05|  [3.0,0.0,0.0,8.05]|  0.0|
|     0.0|   3.0|  0.0|  0.0| 8.4583|[3.0,0.0,0.0,8.4583]|  0.0|
|     0.0|   1.0|  0.0|  0.0|51.8625|[1.0,0.0,0.0,51.8...|  0.0|
|     0.0|   3.0|  3.0|  1.0| 21.075|[3.0,3.0,1.0,21.075]|  0.0|
|     1.0|   3.0|  0.0|  2.0|11.1333|[3.0,0.0,2.0,11.1...|  1.0|
|     1.0|   2.0|  1.0|  0.0|30.0708|[2.0,1.0,0.0,30.0...|  1.0|
+--------+------+-----+-----+-------+--------------------+-----+
only showing top 10 rows



## 划分训练集和测试集

In [10]:
trainData, testData = dataset.randomSplit([0.7, 0.4], 123)
print('Train Dataset Count: {}'.format(trainData.count()))
print('Test  Dataset Count: {}'.format(testData.count()))

Train Dataset Count: 556
Test  Dataset Count: 335


## 模型训练

In [11]:
# 模型训练
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainData)

In [12]:
# 模型预测
prediction = lrModel.transform(testData)

# ROC score
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(prediction)

0.6895991939095386

In [13]:
# 使用十折交叉验证
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
    .build()
)

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=10
)
cvModel = cv.fit(trainData)

predictions = cvModel.transform(testData)

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.6977907150320942

In [16]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

rf = RandomForestClassifier(
    numTrees=3,
    maxDepth=10,
    maxBins=30,
    labelCol='label',
    seed=123
)

grid = (ParamGridBuilder()
        .addGrid(rf.numTrees, [1, 3, 5])
        .addGrid(rf.maxDepth, [3, 5, 7, 10])
        .addGrid(rf.maxBins, [20, 30, 40])
        .build())

evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(
    estimator=rf,
    evaluator=evaluator,
    estimatorParamMaps=grid,
    numFolds=10
)
cvModel_rf = cv.fit(trainData)
predictions = cvModel_rf.transform(testData)
evaluator.evaluate(predictions)

0.7516606956262128