# Árboles de Regresión

In [1]:
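# Spark bootstrap, intentionally left disabled. The cells below use `sqlContext`
# without creating it, so presumably the kernel already provides one; uncomment
# these lines when running in an environment that does not.
#from pyspark import SparkContext
#sc = SparkContext()
#from pyspark.sql import SQLContext
#sqlContext=SQLContext(sc)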

In [2]:
# Load bd5.csv through the spark-csv data source, treating the first row as the
# header and letting Spark infer each column's type.
# NOTE(review): `sqlContext` is not created by any active cell in this notebook;
# presumably it is supplied by the PySpark kernel — confirm.
bd5 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load("bd5.csv", inferSchema=True)
)

# Register the DataFrame under the table name "bd5" so it can be referenced by name.
sqlContext.registerDataFrameAsTable(bd5, "bd5")

In [3]:
# Inspect the (column name, type) pairs Spark inferred for bd5.
bd5.dtypes

[('YEAR', 'int'),
 ('MONTH', 'int'),
 ('DAY_OF_MONTH', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('CRS_DEP_TIME', 'int'),
 ('OP_UNIQUE_CARRIER', 'string'),
 ('TAIL_NUM', 'string'),
 ('ARR_DELAY', 'double'),
 ('DEP_DELAY', 'double'),
 ('ORIGIN', 'string'),
 ('DEST', 'string'),
 ('DISTANCE', 'double'),
 ('CANCELLED', 'double'),
 ('DIVERTED', 'double'),
 ('CARRIER_DELAY', 'double'),
 ('WEATHER_DELAY', 'double'),
 ('NAS_DELAY', 'double'),
 ('SECURITY_DELAY', 'double'),
 ('LATE_AIRCRAFT_DELAY', 'double'),
 ('LogD', 'double'),
 ('Retraso', 'int'),
 ('RetrasoNeto', 'double'),
 ('Horario', 'int')]

In [4]:
from pyspark.ml.feature import StringIndexer

# Map each carrier code to a numeric index stored in 'IndexUniqueCarrier'.
# Note that the index starts at 0.
indexer = StringIndexer(
    inputCol='OP_UNIQUE_CARRIER',
    outputCol='IndexUniqueCarrier',
)
bd6 = indexer.fit(bd5).transform(bd5)

# Sanity check: one row per (carrier, index) pair with its row count,
# ordered by the assigned index.
(
    bd6.groupBy('OP_UNIQUE_CARRIER', 'IndexUniqueCarrier')
    .count()
    .sort('IndexUniqueCarrier')
    .show()
)


+-----------------+------------------+-----+
|OP_UNIQUE_CARRIER|IndexUniqueCarrier|count|
+-----------------+------------------+-----+
|               AA|               0.0| 8853|
|               UA|               1.0| 6112|
|               WN|               2.0| 5395|
|               DL|               3.0| 4239|
|               VX|               4.0| 1703|
|               NK|               5.0| 1581|
|               F9|               6.0| 1295|
|               OO|               7.0| 1166|
|               B6|               8.0|  121|
|               EV|               9.0|    1|
+-----------------+------------------+-----+



## Ajuste del modelo

In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Pack the five predictor columns into a single vector column named 'features'.
a1 = VectorAssembler(
    inputCols=[
        'DEP_DELAY',
        'DISTANCE',
        'DAY_OF_WEEK',
        'CRS_DEP_TIME',
        'IndexUniqueCarrier',
    ],
    outputCol='features',
)

# Keep only the target (ARR_DELAY, renamed to 'label') and the feature vector.
bd7 = a1.transform(bd6).select(
    col("ARR_DELAY").alias("label"),
    'features',
)

### Partición Test - Train

In [6]:
# Random 70/30 split of bd7 with a fixed seed.
bd_train, bd_test = bd7.randomSplit([0.7, 0.3], seed=123)

# Report the size of each partition, one count per line (train first, then test).
print(bd_train.count(), bd_test.count(), sep="\n")

21278
9188


In [7]:
from pyspark.ml.regression import DecisionTreeRegressor as DTR

# Baseline regression tree capped at depth 5; no other parameter is overridden.
rt = DTR(maxDepth=5)

# Fit on the training partition only.
model = rt.fit(bd_train)
# NOTE(review): predictions are generated for the full dataset bd7, which holds
# the rows of bd_train as well as bd_test, so anything computed from `pred`
# mixes in-sample and out-of-sample rows.
pred = model.transform(bd7)

In [8]:
# Preview the scored rows: label, feature vector and prediction.
pred.show()

+-----+--------------------+--------------------+
|label|            features|          prediction|
+-----+--------------------+--------------------+
|-19.0|[-8.0,236.0,4.0,1...| -11.634247284014485|
|  7.0|[6.0,236.0,4.0,12...|-0.49661508704061896|
|-12.0|[-5.0,236.0,4.0,1...| -11.634247284014485|
|-14.0|[-6.0,236.0,4.0,8...| -11.634247284014485|
|-15.0|[-5.0,651.0,4.0,2...| -11.634247284014485|
|  0.0|[-5.0,370.0,4.0,1...| -11.634247284014485|
|-23.0|[-8.0,868.0,4.0,1...| -11.634247284014485|
|-16.0|[-6.0,1464.0,4.0,...| -11.634247284014485|
| -3.0|[-3.0,1464.0,4.0,...|  -7.959116541353383|
|  3.0|[-6.0,1055.0,4.0,...| -11.634247284014485|
|-16.0|[-11.0,255.0,4.0,...| -11.634247284014485|
|-11.0|[0.0,1440.0,4.0,1...|  -4.694469357249626|
|  0.0|[1.0,641.0,4.0,22...|  -4.694469357249626|
| -4.0|[2.0,1440.0,4.0,1...|-0.49661508704061896|
|  3.0|[-9.0,1055.0,4.0,...| -11.634247284014485|
| -7.0|[5.0,1055.0,4.0,1...|-0.49661508704061896|
| 18.0|[7.0,370.0,4.0,20...|   10.05084745762712|


In [9]:
# Tabulate how many rows received each distinct predicted value (up to 50 groups shown).
pred.groupBy('prediction').count().show(50)


+--------------------+-----+
|          prediction|count|
+--------------------+-----+
|   81.25139664804469|  256|
|  49.261538461538464|   90|
|   296.5596330275229|  148|
|  106.18518518518519|   80|
|   34.88826815642458|  252|
|  13.069019607843137| 1840|
|    3.99789029535865| 2025|
|  -4.694469357249626| 4809|
|               116.0|    3|
|   222.1494252873563|  113|
|   20.07672301690507| 1091|
|   137.3181818181818|   32|
|  100.92307692307692|   14|
|  38.726384364820845|  867|
|   10.05084745762712|  509|
|  223.48591549295776|  209|
|   180.7741935483871|   92|
|  59.922680412371136|  287|
|               248.0|    2|
|  -7.959116541353383| 6140|
|              319.25|   30|
|  26.604417670682732|  685|
| -11.634247284014485| 5515|
|   73.29629629629629|  667|
|              52.194|  722|
|  112.34075723830735|  617|
|   888.6666666666666|    5|
|-0.49661508704061896| 2959|
|   95.57894736842105|   29|
|  141.56521739130434|   34|
|   93.73684210526316|   58|
|   190.804232

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
# Print the R² of the predictions in `pred`. No column names are passed, so the
# evaluator uses its own defaults (presumably 'label' and 'prediction', which is
# how the columns of `pred` are named — confirm against the PySpark docs).
# NOTE(review): `pred` was built from the full dataset bd7 in the fit cell, so
# this figure is not a held-out score.
print(RegressionEvaluator(metricName="r2").evaluate(pred))

0.7232582316927613


## Ajuste de hiperparámetros

In [11]:
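# Reference only (nothing is executed here): DecisionTreeRegressor parameters
# that can be tuned, with the values listed in its signature — presumably the
# defaults; confirm for the Spark version in use.
# DecisionTreeRegressor(featuresCol="features",
#    labelCol="label",
#    predictionCol="prediction",
#    maxDepth=5,
#    maxBins=32,
#    minInstancesPerNode=1,
#    minInfoGain=0.0,
#    maxMemoryInMB=256,
#    impurity="variance")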

In [12]:
# Refit with a much deeper tree (depth up to 20), a minimum of 10 instances per
# node, and 50 bins instead of the 32 listed in the reference signature.
rt = DTR(
    maxDepth=20,
    minInstancesPerNode=10,
    maxBins=50,
)
model = rt.fit(bd_train)

# Score the full dataset bd7 (training rows included) and print its R².
pred = model.transform(bd7)
print(RegressionEvaluator(metricName="r2").evaluate(pred))

0.7844910947404549


### Validación externa

In [13]:
# Held-out check: score only the test partition with the current `model`
# (the tuned tree when the cells are run in order) and print its R².
pred2 = model.transform(bd_test)
print(RegressionEvaluator(metricName="r2").evaluate(pred2))

0.7575124312762554
