In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf, concat_ws, when, lit, count, isnan, col, isnull, substring, row_number


In [2]:
import os 
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/share/java/mariadb-java-client-2.5.3.jar pyspark-shell'

In [3]:
spark_context = SparkContext()
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

## 1. Carga de datos

In [4]:
PATH = "./data/"

df_aeropuertos = spark.read.load(PATH+"aeropuertosEtapa3.csv",format="csv", sep=",", inferSchema="true", header="true")

df_vuelos = spark.read.load(PATH+"vuelosEtapa3.csv",format="csv", sep=",", inferSchema="true", header="true")

df_poblacion = spark.read.load(PATH+"poblacion2018.csv",format="csv", sep=",", inferSchema="true", header="true")

df_pib = spark.read.load(PATH+"pib_2019.csv",format="csv", sep=",", inferSchema="true", header="true")

df_aeropuertos_mundo= spark.read.load(PATH+"aeropuertos_mundo.csv",format="csv", sep=",", inferSchema="true", header="true")

## 2. Perfilamiento

Tamaño de los dataframes

In [5]:
print((df_vuelos.count(), len(df_vuelos.columns)))
print((df_aeropuertos.count(), len(df_aeropuertos.columns)))

(96346, 13)
(1461, 23)


In [6]:
df_aeropuertos.show(truncate=False, vertical = True)

-RECORD 0-------------------------------------------
 sigla                | 7FO                         
 iata                 | null                        
 nombre               | LA ISLA                     
 municipio            | Puerto Gaitán               
 departamento         | Meta                        
 categoria            | Aeródromo                   
 latitud              | 4.4211                      
 longitud             | -71.6271                    
 propietario          | LA ISLA Y EL ROSARIO S.A.   
 explotador           | LA CEIBA S.A.               
 longitud_pista       | 1079.0                      
 ancho_pista          | 19.0                        
 pbmo                 | 3000.0                      
 elevacion            | 538.0                       
 resolucion           |  1325,000                   
 fecha_construccion   | 2015-06-05                  
 fecha_vigencia       | 2018-06-11                  
 clase                | 1A                    

In [7]:
df_vuelos.show()

+----+---+------+-------+-----------+----------+-------+--------------------+------+------+--------------+---------+-----------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|             empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|
+----+---+------+-------+-----------+----------+-------+--------------------+------+------+--------------+---------+-----------+
|2016| 11|   BOG|    MDE|       R722|         R|      N|                 LAS|     3|     0|       72000.0|        0|    60146.0|
|2014|  2|   EOH|    BGA|       JS32|         R|      N|AEROLINEA DE ANTI...|    45|   766|      112220.0|      439|      999.0|
|2017|  4|   VDF|    EOH|       C210|         T|      N|        HELI JET SAS|    15|     0|           0.0|       60|        0.0|
|2018| 12|   PEI|    EOH|       C303|         T|      N|        HELI JET SAS|     4|     0|           0.0|        3|        0.0|
|2018| 12|   LMC|    BOG|       JS32|         T|      N|         SARPA S.A.S|     2|     0|      

#### Eliminación de duplicados y aeropuertos faltantes:

In [8]:
df_aeropuertos = df_aeropuertos.dropDuplicates()
df_vuelos = df_vuelos.dropDuplicates()
print((df_vuelos.count(), len(df_vuelos.columns)))
print((df_aeropuertos.count(), len(df_aeropuertos.columns)))

(96346, 13)
(1461, 23)


In [9]:
df_ano = df_aeropuertos.groupby('sigla').pivot('Ano').count()
df_ano.select('*').where(isnull('2014')).show(100)

+-----+----+----+----+----+----+
|sigla|2014|2015|2016|2017|2018|
+-----+----+----+----+----+----+
|  7IE|null|   1|   1|   1|   1|
|  9DM|null|   1|   1|   1|   1|
|  7IH|null|   1|   1|   1|   1|
|  9AI|null|   1|   1|   1|   1|
|  9NG|null|   1|   1|   1|   1|
|  9AA|null|   1|   1|   1|   1|
|  AMB|null|   1|   1|   1|   1|
|  AGI|null|   1|   1|   1|   1|
|  7II|null|   1|   1|   1|   1|
|  9NI|null|   1|   1|   1|   1|
|  9NF|null|   1|   1|   1|   1|
|  7GL|null|   1|   1|   1|   1|
|  7IG|null|   1|   1|   1|   1|
|  9DK|null|   1|   1|   1|   1|
|  9ND|null|   1|   1|   1|   1|
|  9NK|null|   1|   1|   1|   1|
|  9NL|null|   1|   1|   1|   1|
|  9LZ|null|   1|   1|   1|   1|
|  7IC|null|   1|   1|   1|   1|
|  9NJ|null|   1|   1|   1|   1|
|  7GZ|null|   1|   1|   1|   1|
|  BAQ|null|   1|   1|   1|   1|
|  9DA|null|   1|   1|   1|   1|
|  9MB|null|   1|   1|   1|   1|
|  9NC|null|   1|   1|   1|   1|
|  7IJ|null|   1|   1|   1|   1|
|  7ID|null|   1|   1|   1|   1|
|  9CY|nul

In [10]:
#Se encuentra que Barranquilla no está en la actualización de 2014, sin embargo si tiene vuelos desde 2010 hasta 2014.
df_vuelos.select('*').where(df_vuelos['origen']=='BAQ').show(1000)

+----+---+------+-------+-----------+----------+-------+--------------------+------+------+--------------+---------+-----------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|             empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|
+----+---+------+-------+-----------+----------+-------+--------------------+------+------+--------------+---------+-----------+
|2018|  3|   BAQ|    EOH|       C25C|         T|      N|             HELICOL|     2|     0|           0.0|        6|        0.0|
|2015|  4|   BAQ|    MDE|       A318|         R|      N|             AVIANCA|     4|   400|       23241.0|      372|      490.0|
|2010|  1|   BAQ|    CZU|       JS32|         R|      N|AEROLINEA DE ANTI...|    29|   568|        6838.0|      205|       66.0|
|2011|  2|   BAQ|    BOG|       B722|         R|      N|           AEROSUCRE|     5|     0|      120000.0|        0|    52404.0|
|2012|  4|   BAQ|    VUP|       PA34|         T|      N|           AVIOCESAR|     8|     0|      

In [11]:
#Se encuentra que el aeropuerta de CUC tiene dos registros por actualización. Se elimina una observación en cada año.

df_aeropuertos.select('*').where(df_aeropuertos['sigla']=='CUC').show(vertical=True)

-RECORD 0------------------------------------
 sigla                | CUC                  
 iata                 | CUC                  
 nombre               | CAMILO DAZA          
 municipio            | San José de Cúcuta   
 departamento         | Norte de Santander   
 categoria            | Internacional        
 latitud              | 7.9274               
 longitud             | -72.5116             
 propietario          | AEROCIVIL            
 explotador           | AEROCIVIL -ORIENT... 
 longitud_pista       | 4768.0               
 ancho_pista          | 30.0                 
 pbmo                 | null                 
 elevacion            | 1027.0               
 resolucion           |   984,000            
 fecha_construccion   | 1987-02-02           
 fecha_vigencia       | null                 
 clase                | 4D                   
 tipo                 | Fumigación           
 numero_vuelos_origen | 204319.0             
 gcd_departamento     | 54        

In [12]:
df_aeropuertos = df_aeropuertos.filter(df_aeropuertos['nombre'] != 'CAMILO DAZA No.2')
#df_aeropuertos = df_aeropuertos.dropDuplicates(['sigla','Ano'])
df_aeropuertos.select('*').where(df_aeropuertos['sigla']=='CUC').show(vertical=True)

-RECORD 0------------------------------------
 sigla                | CUC                  
 iata                 | CUC                  
 nombre               | CAMILO DAZA          
 municipio            | San José de Cúcuta   
 departamento         | Norte de Santander   
 categoria            | Internacional        
 latitud              | 7.9274               
 longitud             | -72.5116             
 propietario          | AEROCIVIL            
 explotador           | AEROCIVIL -ORIENT... 
 longitud_pista       | 4768.0               
 ancho_pista          | 30.0                 
 pbmo                 | null                 
 elevacion            | 1027.0               
 resolucion           |   984,000            
 fecha_construccion   | 1987-02-02           
 fecha_vigencia       | null                 
 clase                | 4D                   
 tipo                 | Fumigación           
 numero_vuelos_origen | 204319.0             
 gcd_departamento     | 54        

In [13]:
print((df_aeropuertos.count(), len(df_aeropuertos.columns)))

(1456, 23)


In [14]:
#Se encuentra que Bogota esta clasificado como Fumigación en 2014, 2015 y 2017, Privado en 2016 y Público en 2018.
df_aeropuertos.select('*').where(df_aeropuertos['sigla']=='BOG').show(truncate = False)

+-----+----+---------+------------+------------+-------------+-------+--------+-----------+------------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|sigla|iata|nombre   |municipio   |departamento|categoria    |latitud|longitud|propietario|explotador        |longitud_pista|ancho_pista|pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|tipo      |numero_vuelos_origen|gcd_departamento|gcd_municipio|Ano |
+-----+----+---------+------------+------------+-------------+-------+--------+-----------+------------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|BOG  |BOG |EL DORADO|Bogotá, D.C.|Bogotá, D.C.|Internacional|4.7017 |-74.1469|AEROCIVIL  |EN CONCESION CODAD|4083.0        |35.0       |null|8356.0   |  320,000 |1959-12-04        |nu

#### Valores nulos:

In [15]:
df_aeropuertos.select([count(when(isnull(c), c)).alias(c) for c in df_aeropuertos.columns]).show(truncate = False, vertical = True)

-RECORD 0--------------------
 sigla                | 0    
 iata                 | 1138 
 nombre               | 0    
 municipio            | 0    
 departamento         | 0    
 categoria            | 0    
 latitud              | 0    
 longitud             | 0    
 propietario          | 10   
 explotador           | 0    
 longitud_pista       | 0    
 ancho_pista          | 0    
 pbmo                 | 233  
 elevacion            | 0    
 resolucion           | 19   
 fecha_construccion   | 0    
 fecha_vigencia       | 1078 
 clase                | 0    
 tipo                 | 0    
 numero_vuelos_origen | 0    
 gcd_departamento     | 0    
 gcd_municipio        | 0    
 Ano                  | 0    



In [16]:
df_vuelos.select([count(when(isnull(c), c)).alias(c) for c in df_vuelos.columns]).show(truncate = False, vertical = True)

-RECORD 0-------------
 ano            | 0   
 mes            | 0   
 origen         | 0   
 destino        | 0   
 tipo_equipo    | 0   
 tipo_vuelo     | 0   
 trafico        | 0   
 empresa        | 0   
 vuelos         | 0   
 sillas         | 0   
 carga_ofrecida | 0   
 pasajeros      | 0   
 carga_bordo    | 2   



In [17]:
#Valores descriptivos para características de aeropuertos

df_aeropuertos.select('longitud_pista', 'ancho_pista').describe().show()

+-------+------------------+------------------+
|summary|    longitud_pista|       ancho_pista|
+-------+------------------+------------------+
|  count|              1456|              1456|
|   mean|1141.4120879120878|21.732142857142858|
| stddev|1377.1186116731806|24.370501233824164|
|    min|               0.0|               0.0|
|    max|           12279.0|             169.0|
+-------+------------------+------------------+



## 3. Transformaciones

#### Dimension Fecha

In [18]:
def creacion_fecha(df_vuelos):
    
    df_fecha = df_vuelos.select('ano','mes')
    
    df_fecha = df_fecha.withColumn("mes", f.format_string("%02d","mes"))
    df_fecha = df_fecha.select(concat_ws('-','ano','mes').alias('Key_Fecha'),'ano','mes')
    df_fecha = df_fecha.dropDuplicates(['Key_Fecha'])
    df_fecha = df_fecha.sort('Key_Fecha')
    df_fecha = df_fecha.withColumn('Mes_nombre', when(df_fecha.mes == 1, lit('Enero'))
                                                .when(df_fecha.mes == 2, lit('Febrero'))
                                                .when(df_fecha.mes == 3, lit('Marzo'))
                                                .when(df_fecha.mes == 4, lit('Abril'))
                                                .when(df_fecha.mes == 5, lit('Mayo'))
                                                .when(df_fecha.mes == 6, lit('Junio'))
                                                .when(df_fecha.mes == 7, lit('Julio'))
                                                .when(df_fecha.mes == 8, lit('Agosto'))
                                                .when(df_fecha.mes == 9, lit('Septiembre'))
                                                .when(df_fecha.mes == 10, lit('Octubre'))
                                                .when(df_fecha.mes == 11, lit('Noviembre'))
                                                .when(df_fecha.mes == 12, lit('Diciembre'))
                                    )
    
    
    
    df_fecha.show(108)

    return df_fecha
    
df_fecha = creacion_fecha(df_vuelos)




+---------+----+---+----------+
|Key_Fecha| ano|mes|Mes_nombre|
+---------+----+---+----------+
|  2010-01|2010| 01|     Enero|
|  2010-02|2010| 02|   Febrero|
|  2010-03|2010| 03|     Marzo|
|  2010-04|2010| 04|     Abril|
|  2010-05|2010| 05|      Mayo|
|  2010-06|2010| 06|     Junio|
|  2010-07|2010| 07|     Julio|
|  2010-08|2010| 08|    Agosto|
|  2010-09|2010| 09|Septiembre|
|  2010-10|2010| 10|   Octubre|
|  2010-11|2010| 11| Noviembre|
|  2010-12|2010| 12| Diciembre|
|  2011-01|2011| 01|     Enero|
|  2011-02|2011| 02|   Febrero|
|  2011-03|2011| 03|     Marzo|
|  2011-04|2011| 04|     Abril|
|  2011-05|2011| 05|      Mayo|
|  2011-06|2011| 06|     Junio|
|  2011-07|2011| 07|     Julio|
|  2011-08|2011| 08|    Agosto|
|  2011-09|2011| 09|Septiembre|
|  2011-10|2011| 10|   Octubre|
|  2011-11|2011| 11| Noviembre|
|  2011-12|2011| 12| Diciembre|
|  2012-01|2012| 01|     Enero|
|  2012-02|2012| 02|   Febrero|
|  2012-03|2012| 03|     Marzo|
|  2012-04|2012| 04|     Abril|
|  2012-

#### Dimensión empresa

In [19]:
def empresa(df_vuelos):
    
    df_empresa=df_vuelos.selectExpr('empresa')
    df_empresa = df_empresa.dropDuplicates(['empresa'])
    df_empresa = df_empresa.sort('empresa')
    df_empresa = df_empresa.withColumn("Key_Empresa",f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
    df_empresa = df_empresa.select('Key_Empresa','empresa')
    df_empresa.show(10)

    return df_empresa

df_empresa = empresa(df_vuelos)


+-----------+--------------------+
|Key_Empresa|             empresa|
+-----------+--------------------+
|          1|"SERVICIO AÉREO R...|
|          2|              21 AIR|
|          3|                ABSA|
|          4|ABX AIR INC SUCUR...|
|          5| AER CARIBE LIMITADA|
|          6|          AERO APOYO|
|          7|AERO SERVICIOS ES...|
|          8|AERO TAXI GUAYMAR...|
|          9|AEROCHARTER ANDIN...|
|         10|       AEROCOL S.A.S|
+-----------+--------------------+
only showing top 10 rows



#### Dimensión tipo de vuelo

In [20]:
def tipo_vuelo(df_vuelos):
    
    df_tipo = df_vuelos.selectExpr('tipo_vuelo')
    df_tipo = df_tipo.dropDuplicates(['tipo_vuelo'])
    df_tipo = df_tipo.withColumn("Key_Tipo",f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
    df_tipo = df_tipo.select('Key_Tipo','tipo_vuelo')
    
    df_tipo = df_tipo.withColumn('tipo', when(df_tipo.tipo_vuelo == 'T', lit('Taxi'))
                                                .when(df_tipo.tipo_vuelo == 'C', lit('Charter'))
                                                .when(df_tipo.tipo_vuelo == 'A', lit('Adicionales'))
                                                .when(df_tipo.tipo_vuelo == 'R', lit('Regular'))
                                  )
    
    df_tipo.show(10)

    return df_tipo

df_tipo = tipo_vuelo(df_vuelos)


+--------+----------+-----------+
|Key_Tipo|tipo_vuelo|       tipo|
+--------+----------+-----------+
|       1|         T|       Taxi|
|       2|         C|    Charter|
|       3|         A|Adicionales|
|       4|         R|    Regular|
+--------+----------+-----------+



#### Dimensión tráfico

In [21]:
def trafico(df_vuelos):
    
    df_trafico = df_vuelos.select('trafico')
    df_trafico = df_trafico.dropDuplicates(['trafico'])
    df_trafico = df_trafico.withColumn("Key_Trafico",f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
    df_trafico = df_trafico.select('Key_Trafico','trafico')
    
    df_trafico = df_trafico.withColumn('trafico_nombre', when(df_trafico.trafico == 'N', lit('Nacional'))
                                                .when(df_trafico.trafico == 'I', lit('Internacional'))
                                                .when(df_trafico.trafico == 'E', lit('Externo'))
                                      )
    
    df_trafico.show(10)

    return df_trafico

df_trafico = trafico(df_vuelos)


+-----------+-------+--------------+
|Key_Trafico|trafico|trafico_nombre|
+-----------+-------+--------------+
|          1|      N|      Nacional|
|          2|      I| Internacional|
+-----------+-------+--------------+



#### Dimensión aeropuertos

In [22]:
df_poblacion.show()
df_pib.show()

+---+---+--------------------+-------+
|_c0| DP|        departamento|  Total|
+---+---+--------------------+-------+
|  0| 23|             Córdoba|1784783|
|  1| 20|               Cesar|1200574|
|  2| 52|              Nariño|1630592|
|  3| 44|          La Guajira| 880560|
|  4| 50|                Meta|1039722|
|  5| 15|              Boyacá|1217376|
|  6| 70|               Sucre| 904863|
|  7|  5|           Antioquia|6407102|
|  8| 54|  Norte de Santander|1491689|
|  9| 88|Archipiélago de S...|  61280|
| 10| 73|              Tolima|1330187|
| 11| 94|             Guainía|  48114|
| 12| 47|           Magdalena|1341746|
| 13| 97|              Vaupés|  40797|
| 14| 68|           Santander|2184837|
| 15| 99|             Vichada| 107808|
| 16| 95|            Guaviare|  82767|
| 17| 11|        Bogotá, D.C.|7412566|
| 18| 76|     Valle del Cauca|4475886|
| 19| 86|            Putumayo| 348182|
+---+---+--------------------+-------+
only showing top 20 rows

+--------+-------------+-----------+
|

In [23]:
df_aeropuertos_mundo.show()
df_aeropuertos_mundo.select([count(when(isnull(c), c)).alias(c) for c in df_aeropuertos_mundo.columns]).show(truncate = False, vertical = True)

aeropuertos_mundo = df_aeropuertos_mundo.dropna()

aeropuertos_mundo.show()

+----+----+--------------------+--------------+----------------+-------+-------+--------+
|icao|iata|              nombre|        ciudad|            pais|altitud|latitud|longitud|
+----+----+--------------------+--------------+----------------+-------+-------+--------+
|AYGA| GKA|              GOROKA|        GOROKA|Papua New Guinea|   1610| -6.082| 145.392|
|AYLA| LAE|                null|           LAE|Papua New Guinea|   null|    0.0|     0.0|
|AYMD| MAG|              MADANG|        MADANG|Papua New Guinea|      7| -5.207| 145.789|
|AYMH| HGU|         MOUNT HAGEN|   MOUNT HAGEN|Papua New Guinea|   1643| -5.826| 144.296|
|AYNZ| LAE|              NADZAB|        NADZAB|Papua New Guinea|     73|  -6.57| 146.726|
|AYPY| POM|PORT MORESBY JACK...|  PORT MORESBY|Papua New Guinea|     45| -9.443|  147.22|
|AYRB| RAB|                null|        RABAUL|Papua New Guinea|   null|    0.0|     0.0|
|AYWK| WWK| WEWAK INTERNATIONAL|         WEWAK|Papua New Guinea|      6| -3.584| 143.669|
|BGAM|null

In [24]:
def aeropuertos_mundo(): 

    aeropuertos_mundo = df_aeropuertos_mundo.dropna()
    
    aeropuertos_mundo = aeropuertos_mundo.select('*').where(aeropuertos_mundo['pais']!= 'Colombia')
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'sigla' , aeropuertos_mundo['iata'])
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'categoria' , lit('Exterior') )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'ancho_pista' , lit(-1) )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'longitud_pista' , lit(-1) )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'Ano' , lit(2018) )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'Fecha_inicio' , lit('2018-01') )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'Fecha_expiracion' , lit('2018-12') )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'Estado' , lit('Vigente') )
    
    aeropuertos_mundo = aeropuertos_mundo.withColumn( 'Key_SocDem' , lit(999) )
    
    aeropuertos_mundo = aeropuertos_mundo.selectExpr('Key_SocDem' , 'sigla' , 'iata' , 'nombre', 'ciudad as municipio', 'pais','categoria','latitud','longitud', 'longitud_pista', 'ancho_pista', 'Ano', 'Fecha_inicio', 'Fecha_expiracion','Estado')
    
    aeropuertos_mundo2014 = aeropuertos_mundo.withColumn( 'Ano' , lit(2014) ).withColumn( 'Estado', lit('Vencido')).withColumn( 'Fecha_inicio' , lit('2014-01')).withColumn( 'Fecha_expiracion' , lit('2014-12') )
    aeropuertos_mundo2015 = aeropuertos_mundo.withColumn( 'Ano' , lit(2015) ).withColumn( 'Estado', lit('Vencido')).withColumn( 'Fecha_inicio' , lit('2015-01')).withColumn( 'Fecha_expiracion' , lit('2015-12') )
    aeropuertos_mundo2016 = aeropuertos_mundo.withColumn( 'Ano' , lit(2016) ).withColumn( 'Estado', lit('Vencido')).withColumn( 'Fecha_inicio' , lit('2016-01')).withColumn( 'Fecha_expiracion' , lit('2016-12') )
    aeropuertos_mundo2017 = aeropuertos_mundo.withColumn( 'Ano' , lit(2017) ).withColumn( 'Estado', lit('Vencido')).withColumn( 'Fecha_inicio' , lit('2017-01')).withColumn( 'Fecha_expiracion' , lit('2017-12') )
    
    aeropuertos_mundo = aeropuertos_mundo2014.union(aeropuertos_mundo)
    aeropuertos_mundo = aeropuertos_mundo2015.union(aeropuertos_mundo)
    aeropuertos_mundo = aeropuertos_mundo2016.union(aeropuertos_mundo)
    aeropuertos_mundo = aeropuertos_mundo2017.union(aeropuertos_mundo)
    
    aeropuertos_mundo.select('*').where(aeropuertos_mundo['sigla'] == 'MAD').show()
    
    
    print((aeropuertos_mundo.count(), len(aeropuertos_mundo.columns)))
    return aeropuertos_mundo
    


def socio_geografica():
    
    df_socio_geo = df_poblacion.join(df_pib, df_poblacion.DP == df_pib.DIVIPOLA, how='inner')
    df_socio_geo = df_socio_geo.selectExpr('departamento','Total as poblacion', 'Valor as PIB','DIVIPOLA')
    fila_internacionales = spark.createDataFrame([('Internacional',0,0,999)], df_socio_geo.columns)
    df_socio_geo = df_socio_geo.union(fila_internacionales) 
    df_socio_geo = df_socio_geo.withColumn("Key_SocDem",f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
    df_socio_geo = df_socio_geo.select("Key_SocDem",'departamento','poblacion', 'PIB','DIVIPOLA')
    
    df_socio_geo.toPandas().to_csv(PATH+'dim_socio_geo.csv')
    
    return None



def aeropuertos(df_aeropuertos,df_socio_geo,aeropuertos_mundo):
    
    aeropuertos_hist = df_aeropuertos
    
    aeropuertos_hist = aeropuertos_hist.withColumn('Fecha_inicio',when(aeropuertos_hist.Ano == 2014,lit('2010-01')).
                                                                  when(aeropuertos_hist.Ano == 2015,lit('2015-01')).
                                                                  when(aeropuertos_hist.Ano == 2016,lit('2016-01')).
                                                                  when(aeropuertos_hist.Ano == 2017,lit('2017-01')).
                                                                  when(aeropuertos_hist.Ano == 2018,lit('2018-01'))
                                                  )
    
    aeropuertos_hist = aeropuertos_hist.withColumn('Fecha_expiracion',when(aeropuertos_hist.Ano == 2014,lit('2014-12')).
                                                                      when(aeropuertos_hist.Ano == 2015,lit('2015-12')).
                                                                      when(aeropuertos_hist.Ano == 2016,lit('2016-01')).
                                                                      when(aeropuertos_hist.Ano == 2017,lit('2017-01')).
                                                                      when(aeropuertos_hist.Ano == 2018,lit('2018-01'))
                                                  )
    
    aeropuertos_hist = aeropuertos_hist.withColumn('Estado',when(aeropuertos_hist.Ano < 2018,lit('Vencido')).
                                                            when(aeropuertos_hist.Ano == 2018,lit('Vigente'))
                                                  )
    
    aeropuertos_hist = aeropuertos_hist.withColumn('pais', lit('Colombia'))                                              

    
    
    #Integración con Geografica
    
    
    
    aeropuertos_hist = aeropuertos_hist.join(df_socio_geo, aeropuertos_hist.gcd_departamento == df_socio_geo.DIVIPOLA, how='left')
    
    df_socio_geo = df_socio_geo.select('Key_SocDem', 'departamento' , 'poblacion' , 'PIB')
    
    aeropuertos_hist = aeropuertos_hist.selectExpr('Key_SocDem' , 'sigla' , 'iata', 'nombre','municipio' , 'pais' , 'categoria','latitud','longitud', 'longitud_pista', 'ancho_pista', 'Ano', 'Fecha_inicio', 'Fecha_expiracion','Estado')
    
    
    #Integración con Aeropuertos Internacionales
    
    aeropuertos_hist = aeropuertos_hist.union(aeropuertos_mundo)
    
    
    
    aeropuertos_hist.select('*').where(aeropuertos_hist['sigla']=='MAD').show()
    
    
    
       
    
    #Generación de llave 
    aeropuertos_hist = aeropuertos_hist.withColumn("Key_Aeropuertos",f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
    
    aeropuertos_hist = aeropuertos_hist.selectExpr('Key_Aeropuertos', 'Key_SocDem' , 'sigla' , 'iata', 'nombre','municipio' , 'pais' , 'categoria','latitud','longitud', 'longitud_pista', 'ancho_pista', 'Ano', 'Fecha_inicio', 'Fecha_expiracion','Estado')
   
    aeropuertos_hist.toPandas().to_csv(PATH+'dim_aeropuertos.csv')
    df_socio_geo.toPandas().to_csv(PATH+'dim_socio_geo.csv')
    df_socio_geo.show()
    
    return None

aeropuertos_mundo = aeropuertos_mundo()
socio_geografica()
df_socio_geo = spark.read.load(PATH+'dim_socio_geo.csv',format="csv", sep=",", inferSchema="true", header="true")
df_socio_geo.show(100)
aeropuertos(df_aeropuertos, df_socio_geo,aeropuertos_mundo)

aeropuertos_hist = spark.read.load(PATH+"dim_aeropuertos.csv",format="csv", sep=",", inferSchema="true", header="true")
aeropuertos_hist = aeropuertos_hist.drop('_c0')
aeropuertos_hist.select('*').show(10)
aeropuertos_hist.select('*').where(aeropuertos_hist['sigla']=='MAD').show(10)
aeropuertos_hist.select('Key_Aeropuertos','sigla','nombre','Key_SocDem').where(aeropuertos_hist['sigla']=='MAD').show(10)



+----------+-----+----+-------+---------+-----+---------+-------+--------+--------------+-----------+----+------------+----------------+-------+
|Key_SocDem|sigla|iata| nombre|municipio| pais|categoria|latitud|longitud|longitud_pista|ancho_pista| Ano|Fecha_inicio|Fecha_expiracion| Estado|
+----------+-----+----+-------+---------+-----+---------+-------+--------+--------------+-----------+----+------------+----------------+-------+
|       999|  MAD| MAD|BARAJAS|   MADRID|Spain| Exterior| 40.472|  -3.561|            -1|         -1|2017|     2017-01|         2017-12|Vencido|
|       999|  MAD| MAD|BARAJAS|   MADRID|Spain| Exterior| 40.472|  -3.561|            -1|         -1|2016|     2016-01|         2016-12|Vencido|
|       999|  MAD| MAD|BARAJAS|   MADRID|Spain| Exterior| 40.472|  -3.561|            -1|         -1|2015|     2015-01|         2015-12|Vencido|
|       999|  MAD| MAD|BARAJAS|   MADRID|Spain| Exterior| 40.472|  -3.561|            -1|         -1|2014|     2014-01|         20

#### Tabla de hechos vuelos

In [25]:

def hechos_vuelos(aeropuertos_hist):
    
    
    
    df_hechos_vuelos = df_vuelos.withColumn("mes", f.format_string("%02d","mes"))
    df_hechos_vuelos = df_hechos_vuelos.select(concat_ws('-','ano','mes').alias('Key_Fecha'),'ano' ,'origen','destino','tipo_vuelo', 'trafico','empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')
        
   
   #Se crean llaves con ANO+Sigla de aeropuertos tanto en df de Aeropuertos, como en df de vuelos (para origen y destino) 
   #para poder cruzar los DF.

    aeropuertos_hist = aeropuertos_hist.select(concat_ws('-','Ano','sigla').alias('Key_Ano_Sigla'),'Key_Aeropuertos', 'Key_SocDem' , 'sigla' , 'iata', 'nombre','municipio', 'categoria','latitud','longitud', 'longitud_pista', 'ancho_pista', 'Ano', 'Fecha_inicio', 'Fecha_expiracion','Estado')
   
    
    df_hechos_vuelos = df_hechos_vuelos.withColumn('Key_Ano_Origen' , when(df_hechos_vuelos.ano < 2015, concat_ws('-',lit('2014'),'origen')).
                                                                      when(df_hechos_vuelos.ano == 2015, concat_ws('-',lit('2015'),'origen')).
                                                                      when(df_hechos_vuelos.ano == 2016, concat_ws('-',lit('2016'),'origen')).
                                                                      when(df_hechos_vuelos.ano == 2017, concat_ws('-',lit('2017'),'origen')).
                                                                      when(df_hechos_vuelos.ano == 2018, concat_ws('-',lit('2018'),'origen'))
                                                   )
    
    
    
    df_hechos_vuelos = df_hechos_vuelos.withColumn('Key_Ano_Destino', when(df_hechos_vuelos.ano < 2015, concat_ws('-',lit('2014'),'destino')).
                                                                      when(df_hechos_vuelos.ano == 2015, concat_ws('-',lit('2015'),'destino')).
                                                                      when(df_hechos_vuelos.ano == 2016, concat_ws('-',lit('2016'),'destino')).
                                                                      when(df_hechos_vuelos.ano == 2017, concat_ws('-',lit('2017'),'destino')).
                                                                      when(df_hechos_vuelos.ano == 2018, concat_ws('-',lit('2018'),'destino'))
                                                   )
    
    df_hechos_vuelos.show()
    
    df_hechos_vuelos = df_hechos_vuelos.join(aeropuertos_hist, df_hechos_vuelos.Key_Ano_Origen==aeropuertos_hist.Key_Ano_Sigla, how='left')
    df_hechos_vuelos = df_hechos_vuelos.selectExpr('Key_Ano_Destino','Key_Fecha', 'Key_Aeropuertos as Key_Origen','destino','tipo_vuelo', 'trafico', 'empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')
    
    df_hechos_vuelos = df_hechos_vuelos.join(aeropuertos_hist, df_hechos_vuelos.Key_Ano_Destino==aeropuertos_hist.Key_Ano_Sigla, how='left')
    df_hechos_vuelos = df_hechos_vuelos.selectExpr('Key_Fecha', 'Key_Origen','Key_Aeropuertos as Key_Destino','tipo_vuelo', 'trafico','empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')
                                                                          
    df_hechos_vuelos = df_hechos_vuelos.join(df_tipo, on = 'tipo_vuelo', how='left')
    df_hechos_vuelos = df_hechos_vuelos.selectExpr('Key_Fecha', 'Key_Origen', 'Key_Destino','Key_Tipo','trafico', 'empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')

    df_hechos_vuelos = df_hechos_vuelos.join(df_trafico, on = 'trafico', how='left')
    df_hechos_vuelos = df_hechos_vuelos.selectExpr('Key_Fecha', 'Key_Origen', 'Key_Destino','Key_Tipo','Key_Trafico','empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')                                              
                                                   
    df_hechos_vuelos = df_hechos_vuelos.join(df_empresa, on = 'empresa', how='left')
    df_hechos_vuelos = df_hechos_vuelos.selectExpr('Key_Fecha', 'Key_Origen', 'Key_Destino','Key_Tipo','Key_Trafico','Key_Empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')
        
    #Se elimna la información de los vuelos que ocurren antes de 2014 pero el aeropuerto no existe en la actualización de 2014 (p.ej BAQ)        
    df_hechos_vuelos = df_hechos_vuelos.na.drop(subset=['Key_Origen','Key_Destino'])
    
    #Se incluye la llave principal.
    df_hechos_vuelos = df_hechos_vuelos.withColumn("id",f.row_number().over(Window.orderBy(f.monotonically_increasing_id())))
    
    df_hechos_vuelos = df_hechos_vuelos.select('id','Key_Fecha', 'Key_Origen', 'Key_Destino','Key_Tipo','Key_Trafico','Key_Empresa','vuelos','sillas','carga_ofrecida','pasajeros','carga_bordo')
    
    
    return df_hechos_vuelos



df_hechos_vuelos = hechos_vuelos(aeropuertos_hist)
df_hechos_vuelos.show(truncate=False)
print((df_hechos_vuelos.count(), len(df_hechos_vuelos.columns)))

+---------+----+------+-------+----------+-------+--------------------+------+------+--------------+---------+-----------+--------------+---------------+
|Key_Fecha| ano|origen|destino|tipo_vuelo|trafico|             empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|Key_Ano_Origen|Key_Ano_Destino|
+---------+----+------+-------+----------+-------+--------------------+------+------+--------------+---------+-----------+--------------+---------------+
|  2016-04|2016|   IBE|    EOH|         T|      N|AEROCHARTER ANDIN...|     1|     0|           0.0|        0|        0.0|      2016-IBE|       2016-EOH|
|  2010-04|2010|   MVP|    MIS|         T|      N|AEROLINEAS LLANER...|     2|     0|           0.0|        0|       35.0|      2014-MVP|       2014-MIS|
|  2014-02|2014|   EOH|    APO|         T|      N|PACIFICA DE AVIAC...|    10|     0|           0.0|       20|     1000.0|      2014-EOH|       2014-APO|
|  2010-05|2010|   BOG|    EYP|         R|      N|         EASYFLY S.A|   14

In [86]:
df_hechos_vuelos.select('*').where(df_hechos_vuelos['Key_Origen']==2821).show(100)
aeropuertos_hist.select('*').where(aeropuertos_hist['Key_Aeropuertos']==2821).show(100)
aeropuertos_hist.select('*').where(aeropuertos_hist['sigla']=='MAD').show(100)

+-----+---------+----------+-----------+--------+-----------+-----------+------+------+--------------+---------+-----------+
|   id|Key_Fecha|Key_Origen|Key_Destino|Key_Tipo|Key_Trafico|Key_Empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|
+-----+---------+----------+-----------+--------+-----------+-----------+------+------+--------------+---------+-----------+
| 5542|  2017-02|      2821|       1009|       4|          2|         32|    28|  8288|      221200.0|     4562|   129169.0|
|20502|  2017-03|      2821|       1221|       4|          2|         49|    31|  7555|      887381.0|     4082|    74506.0|
|41408|  2017-06|      2821|       1009|       4|          2|         32|    30|  8880|      255000.0|     6648|   106564.0|
+-----+---------+----------+-----------+--------+-----------+-----------+------+------+--------------+---------+-----------+

+----------+-----+----+-------+---------+-----+---------+-------+--------+--------------+-----------+----+------------+-----

In [57]:
#Se escriben los archivos.

df_hechos_vuelos.toPandas().to_csv(PATH+'hechos_vuelos.csv')
df_trafico.toPandas().to_csv(PATH+'dim_trafico.csv')
df_tipo.toPandas().to_csv(PATH+'dim_tipo.csv')
df_fecha.toPandas().to_csv(PATH+'dim_fecha.csv')
df_empresa.toPandas().to_csv(PATH+'dim_empresa.csv')
aeropuertos_hist.toPandas().to_csv(PATH+'dim_aeropuertos.csv')