# Trabajo Final de Big Data

Porfirio Ángel Díaz Sánchez

---

## Requisitos del proyecto

**Descargar librerías:**

https://bit.ly/2NqWD2k

**Descomprir archivos en `file:/home/cloudera/`**

`commons-csv-1.4.jar` y `spark-csv_2.10-1.5.0.jar`

---

## Preparación de los datos

**Definición de `SparkContext` y `SQLContext`**

In [1]:
# En este caso se encuentran definidos por defecto, si no es así, descomentar el siguiente código.
# from pyspark import SparkContext
# sc = SparkContext()
# from pyspark.sql import SQLContext
# sqlContext=SQLContext(sc)

In [2]:
# Se comprueba la versión del SparkContext
sc.version

'1.6.0'

**Lectura del dataset**

In [3]:
file = 'file:/home/cloudera/Documents/trabajo-final-big-data/On_Time_On_Time_Performance_2017_8.csv'

bd = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load(file, inferSchema=True)

## Análisis 1

Análisis de las variables relacionadas con la cancelación de vuelos.

In [4]:
bd.dtypes

[('Year', 'int'),
 ('Quarter', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('FlightDate', 'string'),
 ('UniqueCarrier', 'string'),
 ('AirlineID', 'int'),
 ('Carrier', 'string'),
 ('TailNum', 'string'),
 ('FlightNum', 'int'),
 ('OriginAirportID', 'int'),
 ('OriginAirportSeqID', 'int'),
 ('OriginCityMarketID', 'int'),
 ('Origin', 'string'),
 ('OriginCityName', 'string'),
 ('OriginState', 'string'),
 ('OriginStateFips', 'int'),
 ('OriginStateName', 'string'),
 ('OriginWac', 'int'),
 ('DestAirportID', 'int'),
 ('DestAirportSeqID', 'int'),
 ('DestCityMarketID', 'int'),
 ('Dest', 'string'),
 ('DestCityName', 'string'),
 ('DestState', 'string'),
 ('DestStateFips', 'int'),
 ('DestStateName', 'string'),
 ('DestWac', 'int'),
 ('CRSDepTime', 'int'),
 ('DepTime', 'int'),
 ('DepDelay', 'double'),
 ('DepDelayMinutes', 'double'),
 ('DepDel15', 'double'),
 ('DepartureDelayGroups', 'int'),
 ('DepTimeBlk', 'string'),
 ('TaxiOut', 'double'),
 ('WheelsOff', 'int'),
 ('Wheels

**Obtiene número de registros**

In [5]:
p1 = bd.count()

**Elimina vuelos desviados**

In [6]:
bd = bd.filter(bd.Diverted == 0)
p2 = bd.count()

**Imputa con valor 0 los datos faltantes**

In [7]:
bd = bd.na.fill({
        'CarrierDelay':0,
        'WeatherDelay':0,
        'NASDelay':0,
        'SecurityDelay':0,
        'LateAircraftDelay':0
    })

**Crea la columna 'Tarde'**

In [8]:
bd = bd.withColumn('Tarde', (bd.CRSDepTime < 2100) & (bd.CRSDepTime >= 1600))

**Obtiene porcentaje de vuelos cancelados que salieron por la tarde**

In [9]:
vuelos_tarde = bd.filter(bd.Tarde == True)
vuelos_tarde_cancelados = vuelos_tarde.filter(vuelos_tarde.Cancelled > 0)
num_tarde = vuelos_tarde.count()
num_tarde_cancelados = vuelos_tarde_cancelados.count()
porcentaje_cancelados_tarde = round((num_tarde_cancelados / num_tarde) * 100, 2)
p3 = porcentaje_cancelados_tarde

**Obtiene distancia promedio de los vuelos cancelados**

In [10]:
distancia_promedio = bd.filter(bd.Cancelled > 0).select('Distance').groupBy().mean().first()[0]
p4 = distancia_promedio

**Obtiene el aeropuerto de origen con mayor tasa de vuelos cancelados**

In [11]:
from pyspark.sql.functions import col, sum, round, count, desc

mas_cancelados = bd \
    .groupBy('Origin') \
    .agg(round(sum('Cancelled') / count('*') * 100, 2).alias('Porcentaje')) \
    .sort(desc('Porcentaje')) \
    .select('Origin', 'Porcentaje') \
    .first()

print(mas_cancelados)
p5 = mas_cancelados

Row(Origin='CRP', Porcentaje=21.95)


**Crea vector de features y renombra la variable respuesta a label**

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

a1 = VectorAssembler(
    inputCols=['DayofMonth', 'DayOfWeek', 'Tarde', 'Distance'],
    outputCol='features'
)

bd2 = a1.transform(bd).select(col('Cancelled').cast('double').alias('label'), 'features')

**Partición en conjuntos de Train y Test**

In [13]:
(bd2_train, bd2_test) = bd2.randomSplit([0.7, 0.3],seed=123)

**Obtiene número de observaciones de train**

In [14]:
p6 = bd2_train.count()

**Modelo de regresión logística**

In [15]:
from pyspark.ml.classification import LogisticRegression

lgr = LogisticRegression(
    maxIter=10,
    labelCol="label",
    featuresCol="features",
    threshold=0.5
)

lgr_model = lgr.fit(bd2_train)

**Obtiene coeficiente de variable `Tarde`**

In [16]:
p7 = lgr_model.coefficients[2]

**Obtiene área bajo la curva evaluada en `test`**

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE

pred_test = lgr_model.transform(bd2_test)
auc_test = BCE(metricName = "areaUnderROC", rawPredictionCol = 'probability').evaluate(pred_test)
p8 = auc_test



**Modelo de bosque aleatorio**

In [18]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier

stringIndexer = StringIndexer(inputCol = 'label', outputCol = 'indexed')
sI = stringIndexer.fit(bd2_train)
bd2_train = sI.transform(bd2_train)

rf = RandomForestClassifier(
    labelCol="indexed",
    featuresCol="features",
    numTrees=500,
    maxDepth=3,
    seed = 123,
    featureSubsetStrategy="sqrt",
    impurity='gini'
)

rf_model = rf.fit(bd2_train)

**Obtiene área bajo la curva evaluada en `test`**

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE

pred_test = rf_model.transform(bd2_test)
auc_test = BCE(metricName = "areaUnderROC", rawPredictionCol = 'probability').evaluate(pred_test)
p9 = auc_test

**Red neuronal de clasificación**

In [20]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

mlp = MultilayerPerceptronClassifier(
    labelCol="label",
    featuresCol="features",
    maxIter=100,
    layers=[4, 5, 2],
    seed=123
)

mlp_model = rf.fit(bd2_train)

**Obtiene valor de índice precision**

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator as MCCE

pred_test = mlp_model.transform(bd2_test)
precision_test = MCCE(metricName="precision").evaluate(pred_test)
p10 = precision_test

### Preguntas

**1. ¿Cuántas observaciones tiene la base de datos?**

In [22]:
print(p1)

510451


**2. Tras eliminar los vuelos desviados (Diverted==1), ¿cuántas observaciones quedan?**

In [23]:
print(p2)

509165


**3. ¿Qué porcentaje de vuelos que han salido por la tarde han sido cancelados?**

In [24]:
print(p3)

2.53


**4. ¿Cuál es la distancia promedio de los vuelos cancelados?**

In [25]:
print(p4)

749.8705689874569


**5. Cuál es el aeropuerto de origen con mayor tasa de vuelos cancelados**

In [26]:
print(p5)

Row(Origin='CRP', Porcentaje=21.95)


**6. Tras dividir la base de datos en los conjuntos de train (70%) y test (30%), ¿Cuántas observaciones tiene el conjunto de train?**

In [27]:
print(p6)

356382


**7. Tras ejecutar el modelo de regresión logística para predecir la variable `Cancelled` empleando las variables `DayOfMonth`, `DayOfweek`, `Tarde`, `Distance`, ¿Cuál es el coeficiente de la variable `Tarde`?**

In [28]:
print(p7)

0.0432159308007


**8. Bajo el anterior modelo, ¿Cuál es el área bajo la curva evaluada en la base de datos `test`?**

In [29]:
print(p8)

0.6511192770431581


**9. Tras ejecutar un bosque aleatorio en el contexto anterior, considerando las mismas variables, ¿Cuánto vale el área bajo la curva evaluada en la base de datos `test`?**

In [30]:
print(p9)

0.7847305688776101


**10. Tras ejecutar una red neuronal en el contexto anterior (MultilayerPerceptronClassifier), ¿Cuánto vale el índice precision?**

In [31]:
print(p10)

0.9780276601454351


## Análisis 2

**Lee nuevamente el dataset**

In [45]:
file = 'file:/home/cloudera/Documents/trabajo-final-big-data/On_Time_On_Time_Performance_2017_8.csv'

bd = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load(file, inferSchema=True)

bd = bd.na.fill({
        'CarrierDelay':0,
        'WeatherDelay':0,
        'NASDelay':0,
        'SecurityDelay':0,
        'LateAircraftDelay':0
    })

**Elimina los datos correspondientes a vuelos cancelados y desviados**

In [46]:
bd = bd.filter((bd.Cancelled == 0) & (bd.Diverted == 0))

**Obtiene número de observaciones tras eliminar cancelados y desviados**

In [47]:
p11 = bd.count()

**Define variable `Retraso2`, que consiste en restarle al retraso total el retraso en la salida**

In [49]:
bd = bd.withColumn('Retraso2', (bd.ArrDelay - bd.LateAircraftDelay))

**Calcula promedio de variable `Retraso2`**

In [50]:
from pyspark.sql.functions import col, avg

promedio = bd.select(avg(col('Retraso2'))).first()[0]
p12 = promedio

**Obtiene aeropuerto de origen con mayor promedio en variable `Retraso2`**

In [53]:
from pyspark.sql.functions import col, avg

mas_retraso2 = bd \
    .groupBy('Origin') \
    .agg(avg(col('Retraso2')).alias('PromedioRetraso')) \
    .sort(desc('PromedioRetraso')) \
    .select('Origin', 'PromedioRetraso') \
    .first()

p13 = mas_retraso2

**Calcula correlación entre `DepDelay` y `Retraso2`**

In [55]:
correlacion = bd.stat.corr('DepDelay', 'Retraso2')
p14 = correlacion

**Vuelve a definir la variable `Tarde`**

In [57]:
bd = bd.withColumn('Tarde', (bd.CRSDepTime < 2100) & (bd.CRSDepTime >= 1600))

**Crea vector de features con las columnas `DepDelay`, `DayOfWeek`, `Tarde` y `Distance`**

In [58]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

a1 = VectorAssembler(
    inputCols=['DepDelay', 'DayOfWeek', 'Tarde', 'Distance'],
    outputCol='features'
)

bd3 = a1.transform(bd).select(col('Retraso2').cast('double').alias('label'), 'features')

**Particiona base de datos en conjuntos `Train` y `Test`**

In [59]:
(bd3_train, bd3_test) = bd3.randomSplit([0.7, 0.3],seed=456)

**Obtiene número de registros en `train`**

In [60]:
p15 = bd3_train.count()

**Modelo de regresión lineal**

In [65]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="label", featuresCol="features",)

lr_model = lr.fit(bd3_train)

**Obtiene los coeficientes para variable `Tarde`**

In [66]:
p16 = lr_model.coefficients[2]

**Obtiene índice R2 para base de `train`**

In [69]:
from pyspark.ml.evaluation import RegressionEvaluator

pred_train = lr_model.transform(bd3_train)
p17 = RegressionEvaluator(metricName="r2").evaluate(pred_train)

**Obtiene índice R2 para base de `test`**

In [70]:
from pyspark.ml.evaluation import RegressionEvaluator

pred_test = lr_model.transform(bd3_test)
p18 = RegressionEvaluator(metricName="r2").evaluate(pred_test)

**Árbol de regresión**

In [83]:
from pyspark.ml.regression import DecisionTreeRegressor as DTR

dtr = DTR(maxDepth=10)

dtr_model = dtr.fit(bd3_train)

**Obtiene índice R2 para base de `train`**

In [84]:
from pyspark.ml.evaluation import RegressionEvaluator

pred_train = dtr_model.transform(bd3_train)
p19 = RegressionEvaluator(metricName="r2").evaluate(pred_train)

**Obtiene índice R2 para base de `train`**

In [85]:
from pyspark.ml.evaluation import RegressionEvaluator

pred_test = dtr_model.transform(bd3_test)
p20 = RegressionEvaluator(metricName="r2").evaluate(pred_test)

### Preguntas

**11. Tras eliminar los vuelos cancelados y desviados, ¿Cuántas observaciones contiene la base de datos actual?**

In [51]:
print(p11)

498163


**12. Tras generar la variable `Retraso2 = ArrDelay - LateAircraftDelay`, ¿Cuál es su promedio en el conjunto de datos actuales?**

In [52]:
print(p12)

1.2535997253910869


**13. ¿Cuál es el aeropuerto de origen con mayor promedio para la variable `Retraso2`?**

In [54]:
print(p13)

Row(Origin='RDD', PromedioRetraso=35.8876404494382)


**14. ¿Cuánto vale la correlación entre `DepDelay` y `Retraso2`?**

In [56]:
print(p14)

0.7985696098228822


**15. Configurando unas nuevas bases de datos de `train` (70%) y `test` (30%), con la semilla 456, ¿Cuántos casos contiene la base de datos de `train`?**

In [61]:
print(p15)

348841


**16. Tras construir el modelo de regresión lineal con las variables `DepDelay`, `Distance`, `DayOfWeek`, `Tarde`, para explicar la variable `Retraso2`, ¿Cuánto vale el coeficiente del modelo para `Tarde`?**

In [67]:
print(p16)

-2.56672339152


**17. ¿Cuánto vale el índice R2 empleando los datos de la base de `train`?**

In [71]:
print(p17)

0.6389857542184951


**18. ¿Cuánto vale el índice R2 empleando los datos de la base de `test`?**

In [72]:
print(p18)

0.639342954097753


**19. Considera un árbol de regresión en el contexto anterior con todos los parámetros por defecto y una profundidad de 10. ¿Cuánto vale el índice de ajuste R2 empleando la base de datos de `train`?**

In [86]:
print(p19)

0.3924315682261269


**20. Considerando el árbol anterior, ¿Cuánto vale el índice de ajuste R2 para la base de datos `test`?**

In [87]:
print(p20)

0.3430251883501948
