In [1]:
#!/usr/bin/env python
# coding: utf-8



# pyspark读写dataframe
# import findspark
# findspark.init()

from pyspark.sql import SparkSession
# 创建SparkSession
spark=SparkSession.builder.appName('Titanic').getOrCreate()

In [2]:
# 读取训练集，带有header，自动推断字段类型
df = spark.read.csv("./train.csv", header=True, inferSchema=True).cache()
# 创建临时表train
df.createOrReplaceTempView("train")
df.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [3]:
# ## EDA探索


from pyspark.sql import Row
from pyspark.sql.functions import *
# 输出schema，dataframe的数据结构信息
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [4]:
# 对Age字段进行描述统计
df.describe(['age']).show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               714|
|   mean| 29.69911764705882|
| stddev|14.526497332334035|
|    min|              0.42|
|    max|              80.0|
+-------+------------------+



In [5]:
# 缺失值统计, Spark SQL类型转换使用cast, col函数将字符串转换为column对象
df.select(*(
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns)).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [6]:
# 使用"""定义多行字符串
query = """
SELECT Embarked, count(PassengerId) as count
FROM train
WHERE Survived = 1
GROUP BY Embarked
"""
spark.sql(query).show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   30|
|    null|    2|
|       C|   93|
|       S|  217|
+--------+-----+



In [7]:
# ## 数据预处理



from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# cabin字段缺失值多，去掉该字段
df = df.drop('cabin')
before = df.select('age').where('age is null').count()
print("Age字段缺失个数（处理前）: {}".format(before))
# 使用df.na处理缺失值
test = df.na.drop(subset=["age"])
after = test.select('age').where('age is null').count()
print("Age字段缺失个数（处理后）: {}".format(after))

Age字段缺失个数（处理前）: 177
Age字段缺失个数（处理后）: 0


In [8]:
# 按照survived字段统计
df.groupBy('survived').count().show()

# 按照survived的一定比例进行采样
sample_df = df.sampleBy('survived', fractions={0: 0.1, 1: 0.5}, seed=0)
sample_df.groupBy('survived').count().show()

+--------+-----+
|survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

+--------+-----+
|survived|count|
+--------+-----+
|       1|  160|
|       0|   63|
+--------+-----+



In [9]:
# 添加字段len_name，代表 乘客name的长度
str_length = udf(lambda x: len(x), IntegerType())
# 使用withColumn添加字段
df = df.withColumn('len_name', str_length(df['name']))
df.select('name', 'len_name').show(5)

+--------------------+--------+
|                name|len_name|
+--------------------+--------+
|Braund, Mr. Owen ...|      23|
|Cumings, Mrs. Joh...|      51|
|Heikkinen, Miss. ...|      22|
|Futrelle, Mrs. Ja...|      44|
|Allen, Mr. Willia...|      24|
+--------------------+--------+
only showing top 5 rows



In [10]:
# 将类别变量 转化为数值
def embarked_to_int(embarked):
    if embarked == 'C': return 1
    if embarked == 'Q': return 2
    if embarked == 'S': return 3    
    return 0

# 使用udf，定义函数，将类别变量 转化为数值，使用Spark ML中StringIndexer，结果也是一样的
embarked_to_int = udf(embarked_to_int, IntegerType())
# 添加embarked_index字段
df = df.withColumn('embarked_index', embarked_to_int(df['embarked']))
df.select('embarked', 'embarked_index').show(5)

+--------+--------------+
|embarked|embarked_index|
+--------+--------------+
|       S|             3|
|       C|             1|
|       S|             3|
|       S|             3|
|       S|             3|
+--------+--------------+
only showing top 5 rows



In [11]:
# 计算各列的均值
mean = df.agg(*(mean(c).alias(c) for c in df.columns))
# 字典数据持久化
meaninfo = mean.first().asDict()
print(meaninfo)
# 填充
df = df.fillna(meaninfo["Age"])

{'PassengerId': 446.0, 'Survived': 0.3838383838383838, 'Pclass': 2.308641975308642, 'Name': None, 'Sex': None, 'Age': 29.69911764705882, 'SibSp': 0.5230078563411896, 'Parch': 0.38159371492704824, 'Ticket': 260318.54916792738, 'Fare': 32.2042079685746, 'Embarked': None, 'len_name': 27.20314253647587, 'embarked_index': 2.529741863075196}


In [12]:
# 将sex字段进行数值编码
df.select('sex', 
    when(df['sex'] == 'male', 0).otherwise(1).alias('sex_ix')).show(5)


# ## 数据抽取，转换与特征选择

+------+------+
|   sex|sex_ix|
+------+------+
|  male|     0|
|female|     1|
|female|     1|
|female|     1|
|  male|     0|
+------+------+
only showing top 5 rows



In [13]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
# StringIndexer将一组字符型标签编码成一组标签索引
df = StringIndexer(inputCol='Sex', outputCol='sex_index').fit(df).transform(df)
df.select('Sex', 'sex_index').show(5)

+------+---------+
|   Sex|sex_index|
+------+---------+
|  male|      0.0|
|female|      1.0|
|female|      1.0|
|female|      1.0|
|  male|      0.0|
+------+---------+
only showing top 5 rows



In [14]:
# 将指定的多个列进行合并，方便后续的ML计算
inputCols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'embarked_index', 'sex_index', 'len_name']
#创建features列，使用VectorAssembler将给定列列表组合成单个向量列
assembler = VectorAssembler(inputCols=inputCols, outputCol='features')
train = assembler.transform(df).select('PassengerId', col('Survived').alias('label'), 'features')

In [15]:
train.show(5)

+-----------+-----+--------------------+
|PassengerId|label|            features|
+-----------+-----+--------------------+
|          1|    0|[3.0,22.0,1.0,0.0...|
|          2|    1|[1.0,38.0,1.0,0.0...|
|          3|    1|[3.0,26.0,0.0,0.0...|
|          4|    1|[1.0,35.0,1.0,0.0...|
|          5|    0|[3.0,35.0,0.0,0.0...|
+-----------+-----+--------------------+
only showing top 5 rows



In [16]:
# ## 模型训练与预测


# 使用随机森林
from pyspark.ml.classification import RandomForestClassifier
# 将数据集切分为80%训练集，20%测试集
splits = train.randomSplit([0.8, 0.2])
train = splits[0].cache()
test = splits[1].cache()

# cacheNodeIds: 是否缓存节点ID
model = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    cacheNodeIds=True)
# 使用train进行训练，test进行预测
predict = model.fit(train).transform(test)
predict.show(5)

+-----------+-----+--------------------+--------------------+--------------------+----------+
|PassengerId|label|            features|       rawPrediction|         probability|prediction|
+-----------+-----+--------------------+--------------------+--------------------+----------+
|         12|    1|[1.0,58.0,0.0,0.0...|[3.94059419701568...|[0.19702970985078...|       1.0|
|         22|    1|[2.0,34.0,0.0,0.0...|[17.1907170516018...|[0.85953585258009...|       0.0|
|         50|    0|[3.0,18.0,1.0,0.0...|[11.9729748030348...|[0.59864874015174...|       0.0|
|         51|    0|[3.0,7.0,4.0,1.0,...|[15.6924051556717...|[0.78462025778358...|       0.0|
|         54|    1|[2.0,29.0,1.0,0.0...|[3.17866143258140...|[0.15893307162907...|       1.0|
+-----------+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [17]:
# ## 模型评估




from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
"""
    二分类评估：BinaryClassificationEvaluator
    多分类评估：MulticlassClassificationEvaluator
    回归评估：RegressionEvaluator
    聚类评估：ClusteringEvaluator
"""
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction", 
    labelCol="label", 
    metricName="accuracy")
print(evaluator.evaluate(predict))

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='label',
    metricName='areaUnderROC')
print(evaluator.evaluate(predict))

0.8491620111731844
0.8147235905856596


In [18]:
model.numTrees
model.maxDepth

Param(parent='RandomForestClassifier_4d88d0f5b91c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.')