In [1]:
#读取数据
from pyspark.sql import SparkSession 
spark=SparkSession.builder.appName("dataframe").getOrCreate()
sc = SparkContext.getOrCreate()
data = spark.read.csv(r"data_mod.csv",header=True,inferSchema=True)

  "You are passing in an insecure Py4j gateway.  This "


In [2]:
data.printSchema()


root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)



# 问题探索
这是航班的历史数据，当然可以做些统计分析看些有何问题，当然我们更重要是做些数据挖掘，这其实可以定位出很多问题来。比如相关分析，看航班和延误的关联关系，研究航班延误的相关因素，还可以做回归，直接把延迟时间作为目标来做回归分析，也可以将是否延误作为一个目标来做二分类预测，也可以将延迟时间分段来作分析。这个就需要从各个角度来看这个延迟时间的问题。  
比如从公共交通事业的角度，我们可以看那机场的延误来比较航班的及时率等服务水平，如果一些公共事务的资源调度也可以参考。另外从旅客的角度，我们可以根据大数据选择一些优化的航线，减少碰到延误的概率，从而规划自己的行程。当然更多是航空公司和保险公司的商业角度,这可能还要关联其他数据。例如航班延误险是很多公司都有的服务，但是一般是根据多少时间，30分钟，60分钟或是2个小时等分段赔付多少，或是直接设置一个赔付阈值，比如国内的比较长，2个小时3个小时以上都有。  
这里我们做个判断，假设是30分钟为一个分界点，也就是起飞延迟大于30分钟设为1，其它为0，这就是个二分类问题。然后建立分类模型，再用数据预测。

## 生成目标变量

In [13]:
#udf
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


# 定义一个 udf 函数 
def IsDelay(DepDelay):
    if DepDelay>30:
        return 1
    else:
        return 0

# 返回类型为int类型
udfDelay = udf(IsDelay, IntegerType())
# 使用

data1 =  data.withColumn('IsDelay', udfDelay(data.DepDelay))

In [20]:
data1.groupBy('IsDelay').count().show()

+-------+-------+
|IsDelay|  count|
+-------+-------+
|      1|  90945|
|      0|1196388|
+-------+-------+



In [15]:
data1.show(3)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+--------+--------+------+----+--------+-------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|IsDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+--------+--------+------+----+--------+-------+
|1987|   10|        14|        3|    741|       730|    912|       849|           PS|     1451|               91|            79|      23|      11|   SAN| SFO|   447.0|      0|
|1987|   10|        15|        4|    729|       730|    903|       849|           PS|     1451|               94|            79|      14|      -1|   SAN| SFO|   447.0|      0|
|1987|   10|        17|        6|    741|       730|    918|       849|           PS|     1451|               97|       

## 特征选择
先根据业务数据个人理解，year一般没多少区分，Month可能反应淡季旺季，也可能和天气有关，可以选择；  
DayofMonth 是几月几日，不是明显特征；  
DayOfWeek 是星期几，可以反应哪天比较多，是选择的特征；  
CRSDepTime、CRSArrTime、CRSElapsedTime是规定时间，可以获取的特征数据；  
UniqueCarrier、FlightNum、Origin、Dest 是分类特征；  
Distance为数值变量，距离也是最值得参考的特征；  
另外DayOfWeek也可能需要one-hot转换。


In [22]:
#
data1.columns

['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'ActualElapsedTime', 'CRSElapsedTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'IsDelay']

In [23]:
features = ['Month',   'DayOfWeek',  'CRSDepTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'CRSElapsedTime', 'Origin', 'Dest', 'Distance']
target = 'IsDelay'

categoryFeatures = ['UniqueCarrier', 'FlightNum','Origin', 'Dest']

## 特征处理
首先将标签特征转为数值特征，这里使用StringIndexer,可以设置一个管道来通过转换器传递数据，以便提取特征和标签。

In [44]:
from pyspark.ml.feature import StringIndexer 
from pyspark.ml import Pipeline
#创建StringIndexer对象，设定输入输出参数

pipeline = Pipeline(stages=[
    StringIndexer(inputCol=c, outputCol='{}_index'.format(c))
    for c in categoryFeatures
])

stringindex = pipeline.fit(data1)


In [45]:
indexData = stringindex.transform(data1)
indexData.show(5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+--------+--------+------+----+--------+-------+-------------------+---------------+------------+----------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|IsDelay|UniqueCarrier_index|FlightNum_index|Origin_index|Dest_index|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+--------+--------+------+----+--------+-------+-------------------+---------------+------------+----------+
|1987|   10|        14|        3|    741|       730|    912|       849|           PS|     1451|               91|            79|      23|      11|   SAN| SFO|   447.0|      0|               11.0|         1081.0|        28.0|       5.0|
|1987|   10|        15|        4|    729|       730|    

In [48]:
indexData.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- IsDelay: integer (nullable = true)
 |-- UniqueCarrier_index: double (nullable = false)
 |-- FlightNum_index: double (nullable = false)
 |-- Origin_index: double (nullable = false)
 |-- Dest_index: double (nullable = false)



In [49]:
#from pyspark.ml.feature import OneHotEncoder 
#from pyspark.ml import Pipeline
##创建OneHotEncoder对象，设定输入输出参数
#categoryFeaturesIndex = ['Month','DayOfWeek','UniqueCarrier_index','FlightNum_index','Origin_index','Dest_index']
#pipeline = Pipeline(stages=[
#    OneHotEncoder(inputCol=c, outputCol='{}_vec'.format(c))
#    for c in [categoryFeaturesIndex]
#])

#onehot = pipeline.fit(indexData)
#encodeData = onehot.transform(indexData)


Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/tmp/kernel-PySpark-1b31336a-190d-42fd-82a4-15c313e78824/pyspark_runner.py", line 158, in <module>
    sleep(1)
  File "/spark/python/pyspark/context.py", line 270, in signal_handler
    raise KeyboardInterrupt()
KeyboardInterrupt

StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:162)
sun.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)


In [58]:
#去除无用特征列
print(indexData.columns)
use_data = indexData.select([ 'Month',  'DayOfWeek',  'CRSDepTime', 'CRSArrTime', 'CRSElapsedTime',  'Distance', 'IsDelay', 'UniqueCarrier_index', 'FlightNum_index', 'Origin_index', 'Dest_index'])
use_data.printSchema()

['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'ActualElapsedTime', 'CRSElapsedTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'IsDelay', 'UniqueCarrier_index', 'FlightNum_index', 'Origin_index', 'Dest_index']
root
 |-- Month: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- Distance: double (nullable = true)
 |-- IsDelay: integer (nullable = true)
 |-- UniqueCarrier_index: double (nullable = false)
 |-- FlightNum_index: double (nullable = false)
 |-- Origin_index: double (nullable = false)
 |-- Dest_index: double (nullable = false)



In [55]:
#分训练测试集
trainData, testData= use_data.randomSplit([0.7, 0.3], seed=7)

In [56]:
print(trainData.count(),testData.count())


901010 386323


## 向量化
spark机器学习的算法和sklearn不同，需要将特征的列组成一个向量输入算法模型里；  
这里使用VectorAssembler将给定的列列表组合到单个特征向量列中，就是一个feature

In [62]:
from pyspark.ml.feature import VectorAssembler  #一个 导入VerctorAssembler 将多个列合并成向量列的特征转换器,即将表中各列用一个类似list表示，输出预测列为单独一列。

assembler = VectorAssembler(inputCols=[ 'Month',  'DayOfWeek',  'CRSDepTime', 'CRSArrTime', 'CRSElapsedTime',  'Distance', 'UniqueCarrier_index', 'FlightNum_index', 'Origin_index', 'Dest_index'], outputCol="features")
trainset = assembler.transform(trainData)
trainset.printSchema()

trainset.select(['features','IsDelay']).show(10,False)

root
 |-- Month: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- Distance: double (nullable = true)
 |-- IsDelay: integer (nullable = true)
 |-- UniqueCarrier_index: double (nullable = false)
 |-- FlightNum_index: double (nullable = false)
 |-- Origin_index: double (nullable = false)
 |-- Dest_index: double (nullable = false)
 |-- features: vector (nullable = true)

+---------------------------------------------------+-------+
|features                                           |IsDelay|
+---------------------------------------------------+-------+
|[10.0,1.0,1.0,556.0,235.0,1846.0,1.0,679.0,5.0,0.0]|0      |
|[10.0,1.0,1.0,556.0,235.0,1846.0,1.0,679.0,5.0,0.0]|0      |
|[10.0,1.0,5.0,35.0,150.0,987.0,10.0,55.0,76.0,19.0]|0      |
|[10.0,1.0,5.0,35.0,150.0,987.0,10.0,55.0,76.0,19.0]|0      |
|[10.0,1.0,5.0,35.0,150.0,987.0,10.0,55

## 建模
使用随机森林建模,随机森林是基于决策树的，决策树是最常见的判别式的建模算法，比如我们看这个飞机延迟问题可以从各个特征开始判断，比如几月，星期几、然后是哪家航空的，哪里飞到哪。用算法可以训练出一个树形的判别方式，最后判别是否延迟。

In [66]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier(numTrees=50, labelCol="IsDelay", seed=7  ,maxDepth=8 , maxBins = 50)

rfcModel = rfc.fit(trainset.select(['features','IsDelay']))
     
#输出模型特征重要性、子树权重
print("模型特征重要性:{}".format(rfcModel.featureImportances))
print("模型特征数:{}".format(rfcModel.numFeatures))

Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o1596.fit.
: java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 50) to be at least as large as the number of values in each categorical feature, but categorical feature 7 has 2161 values. Considering remove this and other categorical features with a large number of values, or add more training examples.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:137)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:106)
	at org.apache.spark.ml.classification.RandomForest

In [67]:
#4.测试
testset =  assembler.transform(testData)
 
print("测试样本数:{}".format(testset.count()))
print(testset.show())
 
 
result = rfcModel.transform(testset)
result.show()
 
#5.分类效果评估
total_amount=result.count()
correct_amount = result.filter(result.IsDelay==result.prediction).count()
precision_rate = correct_amount/total_amount
print("预测准确率为:{}".format(precision_rate))
 
positive_precision_amount = result.filter(result.indexed == 0).filter(result.prediction == 0).count()
negative_precision_amount = result.filter(result.indexed == 1).filter(result.prediction == 1).count()
positive_false_amount = result.filter(result.indexed == 0).filter(result.prediction == 1).count()
negative_false_amount = result.filter(result.indexed == 1).filter(result.prediction == 0).count()
 
print("正样本预测准确数量:{},负样本预测准确数量:{}".format(positive_precision_amount,negative_precision_amount))
 
positive_amount = result.filter(result.indexed == 0).count()
negative_amount = result.filter(result.indexed == 1).count()
 
print("正样本数:{},负样本数:{}".format(positive_amount,negative_amount))
print("正样本预测错误数量:{},负样本错误准确数量:{}".format(positive_false_amount,negative_false_amount))
 
recall_rate1 = positive_precision_amount/positive_amount
recall_rate2 = negative_precision_amount/negative_amount
 
print("正样本召回率为:{},负样本召回率为:{}".format(recall_rate1,recall_rate2)) 


测试样本数:386323
+-----+---------+----------+----------+--------------+--------+-------+-------------------+---------------+------------+----------+--------------------+
|Month|DayOfWeek|CRSDepTime|CRSArrTime|CRSElapsedTime|Distance|IsDelay|UniqueCarrier_index|FlightNum_index|Origin_index|Dest_index|            features|
+-----+---------+----------+----------+--------------+--------+-------+-------------------+---------------+------------+----------+--------------------+
|   10|        1|         1|       556|           235|  1846.0|      0|                1.0|          679.0|         5.0|       0.0|[10.0,1.0,1.0,556...|
|   10|        1|         5|        40|            35|   142.0|      0|                0.0|           92.0|        21.0|      59.0|[10.0,1.0,5.0,40....|
|   10|        1|         5|        40|            35|   142.0|      0|                0.0|           92.0|        21.0|      59.0|[10.0,1.0,5.0,40....|
|   10|        1|         5|       114|            69|   337.0|      

Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/tmp/kernel-PySpark-1b31336a-190d-42fd-82a4-15c313e78824/pyspark_runner.py", line 194, in <module>
    eval(compiled_code)
  File "<string>", line 4, in <module>
NameError: name 'rfcModel' is not defined

StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:162)
sun.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway