# ETL

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import numpy as np
from pyspark.sql.functions import isnan, when, count, col
from pyspark.rdd import RDD
import copy
import pandas as pd
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import udf, isnan, when, count, col, to_date, regexp_replace, concat_ws, abs
from pyspark.sql import functions as f
from pyspark.sql.functions import monotonically_increasing_id, upper, concat 

import datetime
from pyspark.sql.functions import year, month, dayofmonth

In [2]:
spark_context = SparkContext()
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

In [None]:
# Cargar las bases de datos 
dfv = sql_context.read.format("csv").option("header", "true").option("infersSchema", "true").load("vuelosEtapa3.csv")
dfa = sql_context.read.format("csv").option("header", "true").option("infersSchema", "true").load("aeropuertosEtapa3.csv")


In [4]:
dfe = sql_context.read.format("csv").option("header", "true").option("infersSchema", "true").load("doc8643AircraftTypes.csv")

In [5]:
#Dimension fecha
fecha = dfv.alias("fecha")

In [6]:
#Transformació, dimensión fecha
fecha= fecha.select("ano","mes")

In [7]:
#Eliminar duplicados
fecha=fecha.drop_duplicates()


In [8]:
fecha= fecha.withColumn("mes", fecha["mes"].cast("integer"))

In [9]:
#Ordenar

fecha=fecha.sort("ano","mes")

In [10]:
#Agregar columna key
fecha = fecha.withColumn(
    "fecha_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [11]:
fecha.summary().show()
fecha.head(5)


+-------+-----------------+------------------+-----------------+
|summary|              ano|               mes|        fecha_key|
+-------+-----------------+------------------+-----------------+
|  count|              108|               108|              108|
|   mean|           2014.0|               6.5|             54.5|
| stddev|2.594026207002917|3.4681461017635296|31.32091952673165|
|    min|             2010|                 1|                1|
|    25%|           2012.0|                 3|               27|
|    50%|           2014.0|                 6|               54|
|    75%|           2016.0|                 9|               81|
|    max|             2018|                12|              108|
+-------+-----------------+------------------+-----------------+



[Row(ano='2010', mes=1, fecha_key=1),
 Row(ano='2010', mes=2, fecha_key=2),
 Row(ano='2010', mes=3, fecha_key=3),
 Row(ano='2010', mes=4, fecha_key=4),
 Row(ano='2010', mes=5, fecha_key=5)]

In [12]:
fecha.printSchema()

root
 |-- ano: string (nullable = true)
 |-- mes: integer (nullable = true)
 |-- fecha_key: integer (nullable = true)



In [13]:
#Exportar
fecha.toPandas().to_csv('fecha_v4.csv',index=False)

In [14]:
#Dimension empresa
empresa=dfv.alias("empresa")

In [15]:
#Transformación, dimensión empresa
empresa= empresa.select("empresa")

In [16]:
#Eliminar duplicados
empresa=empresa.drop_duplicates()

In [17]:
#Ordenar

empresa=empresa.sort("empresa")

In [18]:
#Agregar columna key
empresa = empresa.withColumn(
    "empresa_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)



In [19]:
#Exportar
empresa.toPandas().to_csv('empresa_v4.csv',index=False)



In [20]:
empresa.summary().show()
empresa.head(5)

+-------+--------------------+-----------------+
|summary|             empresa|      empresa_key|
+-------+--------------------+-----------------+
|  count|                 162|              162|
|   mean|                null|             81.5|
| stddev|                null|46.90948731333567|
|    min|"SERVICIO AÉREO R...|                1|
|    25%|                null|               41|
|    50%|                null|               81|
|    75%|                null|              122|
|    max|      WESTERN GLOBAL|              162|
+-------+--------------------+-----------------+



[Row(empresa='"SERVICIO AÉREO REGIONAL ""REGAIR"" CIA. LTDA."', empresa_key=1),
 Row(empresa='21 AIR', empresa_key=2),
 Row(empresa='ABSA', empresa_key=3),
 Row(empresa='ABX AIR INC SUCURSAL COLOMBIANA', empresa_key=4),
 Row(empresa='AER CARIBE LIMITADA', empresa_key=5)]

In [21]:
empresa.printSchema()

root
 |-- empresa: string (nullable = true)
 |-- empresa_key: integer (nullable = true)



In [22]:
#Dimension tipo de vuelo
tipo_de_vuelo = dfv.alias("tipo_de_vuelo")

In [23]:
#Transformación tipo de vuelo
tipo_de_vuelo= tipo_de_vuelo.select("tipo_vuelo")

In [24]:
#Eliminar duplicados
tipo_de_vuelo=tipo_de_vuelo.drop_duplicates()

In [25]:
#Ordenar
tipo_de_vuelo=tipo_de_vuelo.sort("tipo_vuelo")

In [26]:
#Agregar columna key
tipo_de_vuelo = tipo_de_vuelo.withColumn(
    "tipo_de_vuelo_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)


In [27]:
#Exportar
tipo_de_vuelo.toPandas().to_csv('tipo_de_vuelo_v4.csv',index=False)

In [28]:
tipo_de_vuelo.summary().show()
tipo_de_vuelo.head(5)

+-------+----------+------------------+
|summary|tipo_vuelo| tipo_de_vuelo_key|
+-------+----------+------------------+
|  count|         4|                 4|
|   mean|      null|               2.5|
| stddev|      null|1.2909944487358056|
|    min|         A|                 1|
|    25%|      null|                 1|
|    50%|      null|                 2|
|    75%|      null|                 3|
|    max|         T|                 4|
+-------+----------+------------------+



[Row(tipo_vuelo='A', tipo_de_vuelo_key=1),
 Row(tipo_vuelo='C', tipo_de_vuelo_key=2),
 Row(tipo_vuelo='R', tipo_de_vuelo_key=3),
 Row(tipo_vuelo='T', tipo_de_vuelo_key=4)]

In [29]:
tipo_de_vuelo.printSchema()

root
 |-- tipo_vuelo: string (nullable = true)
 |-- tipo_de_vuelo_key: integer (nullable = true)



In [30]:
#Dimension tráfico
trafico = dfv.alias("trafico")

In [31]:
#Transformación tipo de vuelo
trafico= trafico.select("trafico")

In [32]:
#Eliminar duplicados
trafico=trafico.drop_duplicates()

In [33]:
#Sustituir "nan" por "desconocido"
trafico=trafico.withColumn('trafico', regexp_replace('trafico', 'nan', 'desconocido')) 

In [34]:
#Ordenar

trafico=trafico.sort("trafico")

In [35]:
#Agregar columna key
trafico = trafico.withColumn(
    "trafico_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)


In [36]:
#Exportar
trafico.toPandas().to_csv('trafico_v4.csv',index=False)

In [37]:
trafico.summary().show()
trafico.head(5)

+-------+-------+------------------+
|summary|trafico|       trafico_key|
+-------+-------+------------------+
|  count|      2|                 2|
|   mean|   null|               1.5|
| stddev|   null|0.7071067811865476|
|    min|      I|                 1|
|    25%|   null|                 1|
|    50%|   null|                 1|
|    75%|   null|                 2|
|    max|      N|                 2|
+-------+-------+------------------+



[Row(trafico='I', trafico_key=1), Row(trafico='N', trafico_key=2)]

In [38]:
trafico.printSchema()

root
 |-- trafico: string (nullable = true)
 |-- trafico_key: integer (nullable = true)



In [39]:
#Dimension aeropuertos
df_aeropuertos = dfa.alias("df_aeropuertos")

In [40]:
df_aeropuertos

DataFrame[sigla: string, iata: string, nombre: string, municipio: string, departamento: string, categoria: string, latitud: string, longitud: string, propietario: string, explotador: string, longitud_pista: string, ancho_pista: string, pbmo: string, elevacion: string, resolucion: string, fecha_construccion: string, fecha_vigencia: string, clase: string, tipo: string, numero_vuelos_origen: string, gcd_departamento: string, gcd_municipio: string, Ano: string]

In [41]:
#Cambiar el tipo de las columnas
df_aeropuertos = df_aeropuertos.withColumn("longitud_pista",df_aeropuertos["longitud_pista"].cast(FloatType()))
df_aeropuertos = df_aeropuertos.withColumn("ancho_pista",df_aeropuertos["ancho_pista"].cast(FloatType()))
df_aeropuertos = df_aeropuertos.withColumn("pbmo",df_aeropuertos["pbmo"].cast(FloatType()))
df_aeropuertos = df_aeropuertos.withColumn("numero_vuelos_origen",df_aeropuertos["numero_vuelos_origen"].cast('int'))
df_aeropuertos = df_aeropuertos.withColumn("gcd_departamento",df_aeropuertos["gcd_departamento"].cast(StringType()))
df_aeropuertos = df_aeropuertos.withColumn("gcd_municipio",df_aeropuertos["gcd_municipio"].cast(StringType()))
df_aeropuertos = df_aeropuertos.withColumn('fecha_construccion',to_date(df_aeropuertos.fecha_construccion, 'yyyy-mm-dd'))
df_aeropuertos = df_aeropuertos.withColumn('fecha_vigencia',to_date(df_aeropuertos.fecha_vigencia, 'yyyy-mm-dd'))
df_aeropuertos = df_aeropuertos.withColumn("resolucion",df_aeropuertos["resolucion"].cast(StringType()))
df_aeropuertos = df_aeropuertos.withColumn("latitud",df_aeropuertos["latitud"].cast(FloatType()))
df_aeropuertos = df_aeropuertos.withColumn("longitud",df_aeropuertos["longitud"].cast(FloatType()))
df_aeropuertos = df_aeropuertos.withColumn("Ano",df_aeropuertos["Ano"].cast(StringType()))

In [42]:
#Eliminar columnas y otros ajustes
df_aeropuertos = df_aeropuertos.fillna({'pbmo': 0, 'resolucion': 0, 'propietario': 0})

In [43]:
#Ordenar por nombres
df_aeropuertos=df_aeropuertos.sort("nombre")

In [44]:
#Agregar columna key
df_aeropuertos = df_aeropuertos.withColumn(
    "aeropuertos_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)


In [45]:
df_aeropuertos = df_aeropuertos.select('aeropuertos_key','iata','sigla','nombre','categoria','clase', 'tipo','municipio',
                                       'departamento','propietario', 'explotador','resolucion','fecha_construccion',
                                       'fecha_vigencia','Ano','latitud','longitud','elevacion','numero_vuelos_origen',
                                       'longitud_pista','ancho_pista','pbmo')

In [46]:
#Copia del dataframe
df2_aeropuertos = df_aeropuertos.alias('df2_aeropuertos')

In [47]:
#Se utiliza un Window para extraer, del dataframe pasado por parámetro #Es decir, por cada llave natural, extrae la última versión.
window = Window.partitionBy(df2_aeropuertos['sigla']).orderBy(df2_aeropuertos['Ano'].desc())
df2_aeropuertos = df2_aeropuertos.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') == 1) 
df2_aeropuertos = df2_aeropuertos.drop('rank')

In [48]:
#Ajustar nombres en nuevo DF para poder hacer el join con el df original
df2_aeropuertos = df2_aeropuertos.selectExpr('aeropuertos_key','Ano as Ano_2')

In [49]:
#Realiza left join entre df_aeropuertos y df2_aeropuertos
df = df_aeropuertos.join(df2_aeropuertos, how = 'left', on ='aeropuertos_key')


In [50]:
df.show()

+---------------+----+-----+--------------------+---------+-----+----------+--------------+------------+-------------------+--------------------+----------+------------------+--------------+----+-------+--------+---------+--------------------+--------------+-----------+------+-----+
|aeropuertos_key|iata|sigla|              nombre|categoria|clase|      tipo|     municipio|departamento|        propietario|          explotador|resolucion|fecha_construccion|fecha_vigencia| Ano|latitud|longitud|elevacion|numero_vuelos_origen|longitud_pista|ancho_pista|  pbmo|Ano_2|
+---------------+----+-----+--------------------+---------+-----+----------+--------------+------------+-------------------+--------------------+----------+------------------+--------------+----+-------+--------+---------+--------------------+--------------+-----------+------+-----+
|              1|null|  ACP|            ACAPULCO|Aeródromo|   1A| Aerocivil|       Nunchía|    Casanare| YOLANDA CAMACHO V.|MARIA A. PACHECO ...|  4

In [51]:
#estado_actual = No, registros anteriores de modificación del aerorpuerto
#estado_actual = Si, último registro de modificación del aeropuerto
df = df.withColumn('estado_actual', f.when(df['Ano_2'].isNull(), 'No').otherwise('Si'))
df = df.withColumn('fecha_vigencia_final', f.when(df['estado_actual'] == 'Si', f.to_date(f.lit('2199-12-31'), 'yyyy-MM-dd')).
                   otherwise(f.to_date(concat(col('Ano'),f.lit('-12-31')), 'yyyy-MM-dd')))

In [52]:
# IDENTIFICAR PRIMER REGISTRO PARA ASIGNAR FECHA VIGENCIA INICIAL
#Se utiliza un Window para extraer, del dataframe pasado por parámetro #Es decir, por cada llave natural, extrae la última versión.
df2_aeropuertos = df_aeropuertos.alias('df2_aeropuertos')
window = Window.partitionBy(df2_aeropuertos['sigla']).orderBy(df2_aeropuertos['Ano'].asc())
df2_aeropuertos = df2_aeropuertos.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') == 1) 
df2_aeropuertos = df2_aeropuertos.drop('rank')

In [53]:
#Ajustar nombres en nuevo DF para poder hacer el join con el df original
df2_aeropuertos = df2_aeropuertos.selectExpr('aeropuertos_key','fecha_construccion as fecha_vigencia_inicial','Ano as Ano_3')
#Realiza left join entre df_aeropuertos y df2_aeropuertos
df = df.join(df2_aeropuertos, how = 'left', on ='aeropuertos_key')

In [54]:
#estado_inicial = No, registros posteriores de modificación del aerorpuerto
#estado_inicial = Si, primer registro de construcción del aeropuerto
df = df.withColumn('estado_inicial', f.when(df['Ano_3'].isNull(), 'No').otherwise('Si'))
df = df.withColumn('fecha_vigencia_inicial', f.when(df['estado_inicial'] == 'Si', df['fecha_construccion']).
                   otherwise(f.to_date(concat(col('Ano'),f.lit('-01-01')), 'yyyy-MM-dd')))

In [55]:
# Seleccionar columnas para archivo csv
df_aeropuertos = df.select('aeropuertos_key','iata','sigla','nombre','categoria','clase','tipo','municipio','departamento',
 'propietario','explotador','resolucion','fecha_construccion','fecha_vigencia','Ano',
          'latitud','longitud','elevacion','numero_vuelos_origen','longitud_pista','ancho_pista','pbmo',
          'estado_actual','fecha_vigencia_inicial','fecha_vigencia_final',year("fecha_vigencia_inicial").alias("ano_inicial"),year("fecha_vigencia_final").alias("ano_final"))

In [56]:
df_aeropuertos.select('nombre','fecha_construccion','Ano','fecha_vigencia_inicial','fecha_vigencia_final',"ano_inicial", "ano_final").show(20)

+--------------------+------------------+----+----------------------+--------------------+-----------+---------+
|              nombre|fecha_construccion| Ano|fecha_vigencia_inicial|fecha_vigencia_final|ano_inicial|ano_final|
+--------------------+------------------+----+----------------------+--------------------+-----------+---------+
|            ACAPULCO|        2011-01-06|2014|            2011-01-06|          2014-12-31|       2011|     2014|
|            ACAPULCO|        2011-01-06|2015|            2015-01-01|          2015-12-31|       2015|     2015|
|            ACAPULCO|        2011-01-06|2016|            2016-01-01|          2016-12-31|       2016|     2016|
|            ACAPULCO|        2011-01-06|2017|            2017-01-01|          2017-12-31|       2017|     2017|
|            ACAPULCO|        2011-01-06|2018|            2018-01-01|          2199-12-31|       2018|     2199|
|  AEROFLANDES - C.A.|        2013-01-20|2014|            2013-01-20|          2014-12-31|      

In [57]:
df_aeropuertos_csv=df_aeropuertos.alias("df_aeropuertos_csv")

In [58]:
df_aeropuertos_csv=df_aeropuertos_csv.drop("ano_inicial","ano_final")

In [59]:
df_aeropuertos_csv.printSchema()
df_aeropuertos_csv.summary().show()
df_aeropuertos_csv.head(5)

root
 |-- aeropuertos_key: integer (nullable = true)
 |-- iata: string (nullable = true)
 |-- sigla: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- propietario: string (nullable = false)
 |-- explotador: string (nullable = true)
 |-- resolucion: string (nullable = false)
 |-- fecha_construccion: date (nullable = true)
 |-- fecha_vigencia: date (nullable = true)
 |-- Ano: string (nullable = true)
 |-- latitud: float (nullable = true)
 |-- longitud: float (nullable = true)
 |-- elevacion: string (nullable = true)
 |-- numero_vuelos_origen: integer (nullable = true)
 |-- longitud_pista: float (nullable = true)
 |-- ancho_pista: float (nullable = true)
 |-- pbmo: float (nullable = false)
 |-- estado_actual: string (nullable = false)
 |-- fecha_vigencia_inicial: date (nullab

[Row(aeropuertos_key=1, iata=None, sigla='ACP', nombre='ACAPULCO', categoria='Aeródromo', clase='1A', tipo='Aerocivil', municipio='Nunchía', departamento='Casanare', propietario='YOLANDA CAMACHO V.', explotador='MARIA A. PACHECO OCHOA', resolucion=' 4895,000', fecha_construccion=datetime.date(2011, 1, 6), fecha_vigencia=datetime.date(2014, 1, 9), Ano='2014', latitud=5.408599853515625, longitud=-71.86940002441406, elevacion='656.0', numero_vuelos_origen=0, longitud_pista=84.0, ancho_pista=15.0, pbmo=2000.0, estado_actual='No', fecha_vigencia_inicial=datetime.date(2011, 1, 6), fecha_vigencia_final=datetime.date(2014, 12, 31)),
 Row(aeropuertos_key=2, iata=None, sigla='ACP', nombre='ACAPULCO', categoria='Aeródromo', clase='1A', tipo='Privado', municipio='Nunchía', departamento='Casanare', propietario='YOLANDA CAMACHO V.', explotador='MARIA A. PACHECO OCHOA', resolucion=' 4895,000', fecha_construccion=datetime.date(2011, 1, 6), fecha_vigencia=datetime.date(2014, 1, 9), Ano='2015', latitud=

In [60]:
#Exportar
df_aeropuertos_csv.toPandas().to_csv('aeropuertos_v4.csv',index=False)



In [61]:
#Tipo de equipo

In [62]:
df_tipo_equipo_fuente = dfe.alias("df_tipo_equipo_fuente")

In [63]:
df_tipo_equipo_fuente.show(5)

+-------------------+-----------+----------+-----------+----------+--------------------+--------------+---+
|AircraftDescription|Description|Designator|EngineCount|EngineType|    ManufacturerCode| ModelFullName|WTC|
+-------------------+-----------+----------+-----------+----------+--------------------+--------------+---+
|          LandPlane|        L2J|      J328|          2|       Jet|328 SUPPORT SERVICES|Dornier 328JET|  M|
|          LandPlane|        L1P|      UL45|          1|    Piston|              3XTRIM|     450 Ultra|  L|
|          LandPlane|        L1P|      UL45|          1|    Piston|              3XTRIM|         Ultra|  L|
|          LandPlane|        L1P|      TR55|          1|    Piston|              3XTRIM|    550 Trener|  L|
|          LandPlane|        L1P|      TR55|          1|    Piston|              3XTRIM|      Trener  |  L|
+-------------------+-----------+----------+-----------+----------+--------------------+--------------+---+
only showing top 5 rows



In [64]:
# Verificación de datos nulos
df_tipo_equipo_fuente.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_tipo_equipo_fuente.columns]).show()

+-------------------+-----------+----------+-----------+----------+----------------+-------------+---+
|AircraftDescription|Description|Designator|EngineCount|EngineType|ManufacturerCode|ModelFullName|WTC|
+-------------------+-----------+----------+-----------+----------+----------------+-------------+---+
|                  0|          0|         0|          0|         0|               0|            0|  0|
+-------------------+-----------+----------+-----------+----------+----------------+-------------+---+



In [65]:
df_tipo_equipo_fuente = df_tipo_equipo_fuente.selectExpr('Designator as tipo_equipo','AircraftDescription as tipo_aeronave',
                                                         'Description as codigo_descripcion', 'EngineCount',
                                                         'EngineType as numero_motores', 'ManufacturerCode as fabricante',
                                                         'WTC')
df_tipo_equipo_fuente = df_tipo_equipo_fuente.distinct()

#Se utiliza un Window para extraer, del dataframe pasado por parámetro #Es decir, por cada llave natural, extrae la última versión.
df2_tipo_equipo_fuente = df_tipo_equipo_fuente.alias('df2_tipo_equipo_fuente')
window = Window.partitionBy(df2_tipo_equipo_fuente['tipo_equipo']).orderBy(df2_tipo_equipo_fuente['fabricante'].desc())
df2_tipo_equipo_fuente = df2_tipo_equipo_fuente.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') == 1) 
df2_tipo_equipo_fuente = df2_tipo_equipo_fuente.drop('rank')
print((df2_tipo_equipo_fuente.count(), len(df2_tipo_equipo_fuente.columns)))
df2_tipo_equipo_fuente.schema.names

(2640, 7)


['tipo_equipo',
 'tipo_aeronave',
 'codigo_descripcion',
 'EngineCount',
 'numero_motores',
 'fabricante',
 'WTC']

In [66]:
df_tipo_equipo_fuente =df_tipo_equipo_fuente.sort("tipo_equipo")

In [67]:
#Agregar columna key
df_tipo_equipo_fuente = df_tipo_equipo_fuente.withColumn(
    "tipo_equipo_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)


In [68]:
df_tipo_equipo_fuente.summary().show()

df_tipo_equipo_fuente.head(5)

+-------+-----------+-------------+------------------+------------------+--------------------+--------------------+----+------------------+
|summary|tipo_equipo|tipo_aeronave|codigo_descripcion|       EngineCount|      numero_motores|          fabricante| WTC|   tipo_equipo_key|
+-------+-----------+-------------+------------------+------------------+--------------------+--------------------+----+------------------+
|  count|       4113|         4113|              4113|              4113|                4113|                4113|4113|              4113|
|   mean|       null|         null|              null|1.3782534663099002|                null|                null|null|            2057.0|
| stddev|       null|         null|              null|0.6746749888689957|                null|                null|null|1187.4651573835756|
|    min|       A002|    Amphibian|               A1P|                 1|            Electric|328 SUPPORT SERVICES|   H|                 1|
|    25%|       null

[Row(tipo_equipo='A002', tipo_aeronave='Gyrocopter', codigo_descripcion='G1P', EngineCount='1', numero_motores='Piston', fabricante='IRKUT', WTC='L', tipo_equipo_key=1),
 Row(tipo_equipo='A1', tipo_aeronave='LandPlane', codigo_descripcion='L1P', EngineCount='1', numero_motores='Piston', fabricante='DOUGLAS', WTC='M', tipo_equipo_key=2),
 Row(tipo_equipo='A10', tipo_aeronave='LandPlane', codigo_descripcion='L2J', EngineCount='2', numero_motores='Jet', fabricante='FAIRCHILD (1)', WTC='M', tipo_equipo_key=3),
 Row(tipo_equipo='A109', tipo_aeronave='Helicopter', codigo_descripcion='H2T', EngineCount='2', numero_motores='Turboprop/Turboshaft', fabricante='DENEL', WTC='L', tipo_equipo_key=4),
 Row(tipo_equipo='A109', tipo_aeronave='Helicopter', codigo_descripcion='H2T', EngineCount='2', numero_motores='Turboprop/Turboshaft', fabricante='LEONARDO', WTC='L', tipo_equipo_key=5)]

In [69]:
df_tipo_equipo_fuente.printSchema()

root
 |-- tipo_equipo: string (nullable = true)
 |-- tipo_aeronave: string (nullable = true)
 |-- codigo_descripcion: string (nullable = true)
 |-- EngineCount: string (nullable = true)
 |-- numero_motores: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- WTC: string (nullable = true)
 |-- tipo_equipo_key: integer (nullable = true)



In [70]:
#Exportar
df_tipo_equipo_fuente.toPandas().to_csv('tipo_equipo_v4.csv',index=False)

In [71]:
#Tabla de hechos
fact_vuelos = dfv.alias("fact_vuelos")


In [72]:
#Cambio del tipo de columna
fact_vuelos = fact_vuelos.withColumn("vuelos",fact_vuelos["vuelos"].cast('int'))
fact_vuelos = fact_vuelos.withColumn("sillas",fact_vuelos["sillas"].cast('int'))
fact_vuelos = fact_vuelos.withColumn("pasajeros",fact_vuelos["pasajeros"].cast('int'))
fact_vuelos = fact_vuelos.withColumn("carga_ofrecida",fact_vuelos["carga_ofrecida"].cast(FloatType()))
fact_vuelos = fact_vuelos.withColumn("carga_bordo",fact_vuelos["carga_bordo"].cast(FloatType()))
fact_vuelos = fact_vuelos.withColumn("ano",fact_vuelos["ano"].cast(StringType()))
fact_vuelos = fact_vuelos.withColumn("mes",fact_vuelos["mes"].cast(StringType()))

In [73]:
#
fact_vuelos = fact_vuelos.fillna({'sillas': 0, 'pasajeros': 0})
fact_vuelos = fact_vuelos.withColumn('trafico', regexp_replace('trafico', 'nan', 'N'))

In [74]:
ajuste_mes = {'1':'01', '2':'02', '3':'03', '4':'04', '5':'05', '6':'06', '7':'07', '8':'08', '9':'09'}
fact_vuelos = fact_vuelos.na.replace(ajuste_mes,2,'mes')

In [75]:
#Incluir foreign keys
key_fecha=fecha.alias("fecha")



In [76]:
#Fecha
fact_vuelos = fact_vuelos.join(key_fecha, ["ano","mes"] )



In [77]:
#Empresa
key_empresa=empresa.alias("key_empresa")

In [78]:
fact_vuelos = fact_vuelos.join(key_empresa, ["empresa"] )


In [79]:
#Tipo de vuelo
key_tipo_de_vuelo=tipo_de_vuelo.alias("key_tipo_de_vuelo")

In [80]:
fact_vuelos = fact_vuelos.join(key_tipo_de_vuelo, ["tipo_vuelo"] )

In [81]:
#Trafico
key_trafico=trafico.alias("key_trafico")

In [82]:
fact_vuelos = fact_vuelos.join(key_trafico, ["trafico"] )

In [83]:
fact_vuelos.printSchema()

root
 |-- trafico: string (nullable = true)
 |-- tipo_vuelo: string (nullable = true)
 |-- empresa: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- origen: string (nullable = true)
 |-- destino: string (nullable = true)
 |-- tipo_equipo: string (nullable = true)
 |-- vuelos: integer (nullable = true)
 |-- sillas: integer (nullable = false)
 |-- carga_ofrecida: float (nullable = true)
 |-- pasajeros: integer (nullable = false)
 |-- carga_bordo: float (nullable = true)
 |-- fecha_key: integer (nullable = true)
 |-- empresa_key: integer (nullable = true)
 |-- tipo_de_vuelo_key: integer (nullable = true)
 |-- trafico_key: integer (nullable = true)



In [84]:
#Aeropuertos
key_aeropuertos=df_aeropuertos.alias("key_aeropuertos")

In [85]:
key_aeropuertos =key_aeropuertos.selectExpr("iata as origen_b","aeropuertos_key as origen_key", "ano_inicial","ano_final") 

In [86]:
fact_vuelos=fact_vuelos.join(key_aeropuertos, [(fact_vuelos.origen==key_aeropuertos.origen_b)&(fact_vuelos.ano >= key_aeropuertos.ano_inicial)&(fact_vuelos.ano <= key_aeropuertos.ano_final)])

In [87]:
fact_vuelos = fact_vuelos.drop("origen_b","ano_inicial","ano_final" )

In [88]:
key_aeropuertos2=df_aeropuertos.alias("key_aeropuertos2")

In [89]:
key_aeropuertos2 =key_aeropuertos2.selectExpr("iata as destino_b","aeropuertos_key as destino_key","ano_inicial","ano_final") 

In [90]:
fact_vuelos=fact_vuelos.join(key_aeropuertos2, [(fact_vuelos.destino==key_aeropuertos2.destino_b)&(fact_vuelos.ano >= key_aeropuertos2.ano_inicial)&(fact_vuelos.ano <= key_aeropuertos2.ano_final)])

In [91]:
fact_vuelos = fact_vuelos.drop("origen_b","ano_inicial","ano_final" )

In [92]:
#Tipo de equipo
key_tipo_equipo=df_tipo_equipo_fuente.alias("key_tipo_equipo")


In [93]:
fact_vuelos = fact_vuelos.join(key_tipo_equipo,["tipo_equipo"] )

In [94]:
#Transformación tabla de hechos
fact_vuelos= fact_vuelos.select("vuelos","sillas","pasajeros","carga_ofrecida","carga_bordo","fecha_key", "empresa_key", "tipo_de_vuelo_key", "trafico_key", "origen_key", "destino_key", "tipo_equipo_key")

In [95]:
#Agregar columna key
fact_vuelos = fact_vuelos.withColumn(
    "vuelos_key",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)


In [96]:
fact_vuelos.toPandas().to_csv('fact_vuelos_v4.csv',index=False)


In [97]:
fact_vuelos.summary().show()
fact_vuelos.printSchema()

+-------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------+------------------+-----------------+------------------+-----------------+
|summary|            vuelos|           sillas|         pasajeros|   carga_ofrecida|      carga_bordo|        fecha_key|      empresa_key| tipo_de_vuelo_key|trafico_key|        origen_key|      destino_key|   tipo_equipo_key|       vuelos_key|
+-------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------+------------------+-----------------+------------------+-----------------+
|  count|            166793|           166793|            166793|           166793|           166789|           166793|           166793|            166793|     166793|            166793|           166793|            166793|           166793|
|   mean| 8.822318682438711|

In [98]:
fact_vuelos.head(5)


[Row(vuelos=1, sillas=0, pasajeros=0, carga_ofrecida=0.0, carga_bordo=0.0, fecha_key=67, empresa_key=87, tipo_de_vuelo_key=4, trafico_key=2, origen_key=1188, destino_key=1203, tipo_equipo_key=847, vuelos_key=1),
 Row(vuelos=1, sillas=0, pasajeros=0, carga_ofrecida=0.0, carga_bordo=0.0, fecha_key=67, empresa_key=87, tipo_de_vuelo_key=4, trafico_key=2, origen_key=1188, destino_key=1203, tipo_equipo_key=846, vuelos_key=2),
 Row(vuelos=1, sillas=0, pasajeros=0, carga_ofrecida=0.0, carga_bordo=0.0, fecha_key=56, empresa_key=139, tipo_de_vuelo_key=4, trafico_key=2, origen_key=56, destino_key=1202, tipo_equipo_key=2271, vuelos_key=3),
 Row(vuelos=1, sillas=0, pasajeros=0, carga_ofrecida=0.0, carga_bordo=0.0, fecha_key=56, empresa_key=139, tipo_de_vuelo_key=4, trafico_key=2, origen_key=56, destino_key=1202, tipo_equipo_key=2270, vuelos_key=4),
 Row(vuelos=1, sillas=18, pasajeros=1, carga_ofrecida=209.0, carga_bordo=0.0, fecha_key=37, empresa_key=15, tipo_de_vuelo_key=3, trafico_key=2, origen_k