# Mi primer proceso ETL

Para el siguiente proceso de ETL se van a realizar todas las dimensiones excepto Fecha_Aeropuerto pues es una dimensión autogenerada.  

Por otro lado, para cada una de las tablas vamos a seleccionar las variables correspondientes, en este caso ninguna tabla tiene información de más de una tabla, por lo tanto no hay que hacer ningún joint. Posteriormente se borrarán los datos repetidos y se generará un id único como clave primaria. Para las tablas que tienen una llave secundaria se va a hacer un join inicial sobre las primeras tablas y posteriormente se filtrarán los datos. 
El único atributo de una tabla que no existía en las tablas iniciales es trimestre (en la dimensión fecha), se hace la construcción dependiedo del mes.
Al mismo tiempo se realiza la creación de la dimensión aeropuerto la cual contiene la historia de los cambios en infraestructura de estos.

Primero importamos todas las librerías a utilizar:

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf, monotonically_increasing_id
from pyspark.sql.types import IntegerType,DateType,DoubleType, StringType

import os 

In [45]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/share/java/mariadb-java-client-2.5.3.jar pyspark-shell'

Configuramos la sesión de pyspark

In [48]:
spark_context = SparkContext()
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

### Cargamos las nuevas tablas de vuelos desde el csv


In [10]:
df_vuelos = spark.read.load("vuelosEtapa2.csv",format="csv", sep=",", inferSchema="true", header="true")
df_vuelos = df_vuelos.replace("nan", None)
df_vuelos = df_vuelos.withColumn("sillas",df_vuelos.sillas.cast(IntegerType()))
df_vuelos = df_vuelos.withColumn("pasajeros",df_vuelos.pasajeros.cast(IntegerType()))
df_vuelos = df_vuelos.withColumn("vuelos",df_vuelos.vuelos.cast(IntegerType()))
df_vuelos.show()

+----+---+------+-------+-----------+----------+-------+---------------+------+------+--------------+---------+-----------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|        empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|
+----+---+------+-------+-----------+----------+-------+---------------+------+------+--------------+---------+-----------+
|2010|  3|   ADZ|    BOG|       C560|         T|      N|CENTRAL CHARTER|     1|     0|           0.0|        7|        0.0|
|2010|  5|   ADZ|    BOG|       C560|         T|      N|CENTRAL CHARTER|     1|     0|           0.0|        8|        0.0|
|2010|  6|   ADZ|    CLO|       C560|         T|      N|CENTRAL CHARTER|     1|     0|           0.0|        8|        0.0|
|2010|  8|   ADZ|    CLO|       C560|         T|      N|CENTRAL CHARTER|     1|     0|           0.0|        6|        0.0|
|2010| 10|   ADZ|    CLO|       C560|         T|      N|CENTRAL CHARTER|     1|     0|           0.0|        8|        0.0|
|2010|  

In [22]:
df_aeropuertos = spark.read.load("aeropuertos_cambios_infraestructura.csv",format="csv", sep=",", inferSchema="true", header="true")
df_aeropuertos = df_aeropuertos.replace("nan", None)
df_aeropuertos = df_aeropuertos.drop("_c0")
df_aeropuertos = df_aeropuertos.withColumn("pbmo",df_aeropuertos.pbmo.cast(DoubleType()))
df_aeropuertos = df_aeropuertos.withColumn("fecha_construccion",df_aeropuertos.fecha_construccion.cast(DateType()))
df_aeropuertos = df_aeropuertos.withColumn("fecha_vigencia",df_aeropuertos.fecha_vigencia.cast(DateType()))
df_aeropuertos = df_aeropuertos.withColumn("numero_vuelos_origen",df_aeropuertos.numero_vuelos_origen.cast(IntegerType()))
df_aeropuertos = df_aeropuertos.withColumn("gcd_departamento",df_aeropuertos.gcd_departamento.cast(StringType()))
df_aeropuertos = df_aeropuertos.withColumn("gcd_municipio",df_aeropuertos.gcd_municipio.cast(StringType()))
df_aeropuertos.show()

+-----+----+--------------------+-------------------+------------+---------+-------+--------+--------------------+--------------------+--------------+-----------+------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|sigla|iata|              nombre|          municipio|departamento|categoria|latitud|longitud|         propietario|          explotador|longitud_pista|ancho_pista|  pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|      tipo|numero_vuelos_origen|gcd_departamento|gcd_municipio| Ano|
+-----+----+--------------------+-------------------+------------+---------+-------+--------+--------------------+--------------------+--------------+-----------+------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|  7FO|null|             LA ISLA|      Puerto Gaitán|        Meta|Aeródromo| 4.4211|-71.6271|LA ISLA Y 

### Se crea la dimensión aeropuerto con historia

En este caso se carga la tabla y al mismo tiempo se crea la dimensión aeropuerto. En la cual se realizan cambios con respecto a la creación de la dimensión aeropuertos en el ETL con historia. Como se mencionó en la Wiki, se crearon tres columnas adicionales "InicioVigencia", "FinalVigencia" y "Actual". Todo esto se realiza en el código que se presenta a continuación y finalmente se deja la dimensión aeropuerto en "aeropuertos_df".

In [23]:
Inicio_udf = udf(lambda x: str(x) +'-01-01', StringType())
Final_udf = udf(lambda x: str(x) +'-12-31', StringType())
aeropuertos = df_aeropuertos.withColumn("InicioVigencia", Inicio_udf(df_aeropuertos["Ano"]))
aeropuertos = aeropuertos.withColumn("InicioVigencia",aeropuertos.InicioVigencia.cast(DateType()))
aeropuertos = df_aeropuertos.withColumn("FinalVigencia", Final_udf(df_aeropuertos["Ano"]))
aeropuertos = aeropuertos.withColumn("FinalVigencia",aeropuertos.FinalVigencia.cast(DateType()))

aeropuertos = aeropuertos.withColumn('Actual', functions.when(aeropuertos['Ano'] == aeropuertos.select("Ano").rdd.max()[0], "S").otherwise("N"))
aeropuertos = aeropuertos.withColumn("id_aeropuerto", monotonically_increasing_id())
aeropuertos = aeropuertos.withColumnRenamed('Ano','ano')
aeropuertos.show()

+-----+----+--------------------+-------------------+------------+---------+-------+--------+--------------------+--------------------+--------------+-----------+------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+-------------+------+-------------+
|sigla|iata|              nombre|          municipio|departamento|categoria|latitud|longitud|         propietario|          explotador|longitud_pista|ancho_pista|  pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|      tipo|numero_vuelos_origen|gcd_departamento|gcd_municipio| ano|FinalVigencia|Actual|id_aeropuerto|
+-----+----+--------------------+-------------------+------------+---------+-------+--------+--------------------+--------------------+--------------+-----------+------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+-------------+------+-------------

### Creamos las dimensiones restantes y la tabla de hechos

In [24]:
def dimension_fecha(aeropuertos, vuelos):
    fecha = vuelos.select("ano", "mes").distinct().dropna()
    fecha = fecha.withColumn("id_fecha", monotonically_increasing_id()+1)
    def trimestre (mes):
        if mes < 4:
            return(1)
        elif mes < 7:
            return(2)
        elif mes < 10:
            return(3)
        else:
            return(4)
    trimestre_udf = udf(lambda x: trimestre(x), IntegerType())
    fecha = fecha.withColumn("trimestre", trimestre_udf(fecha["mes"]))
    
    return(fecha)

def dimension_aerolinea(aeropuertos, vuelos):
    aerolinea = vuelos.select("empresa").distinct().sort("empresa").dropna()
    aerolinea = aerolinea.withColumn("id_aerolinea", monotonically_increasing_id()+1)
    return(aerolinea)

def dimension_operacion(aeropuertos, vuelos):
    operacion = vuelos.select("tipo_vuelo").distinct().dropna()
    operacion = operacion.withColumn("id_operacion", monotonically_increasing_id()+1)
    return(operacion)

def dimension_avion(aeropuertos, vuelos):
    avion = vuelos.select("tipo_equipo").distinct().dropna()
    avion = avion.withColumn("id_avion", monotonically_increasing_id()+1)
    return(avion)

def dimension_trafico(aeropuertos, vuelos):
    trafico = vuelos.select("trafico").distinct().dropna()
    trafico = trafico.withColumn("id_trafico", monotonically_increasing_id()+1)
    return(trafico)

def tabla_hechos_vuelos(aeropuertos, vuelos, aerolinea,operacion, avion, trafico, fecha, aeropuerto):
    vuelos = vuelos.join(aerolinea, how='inner', on='empresa')
    vuelos = vuelos.join(operacion, how='inner', on='tipo_vuelo')
    vuelos = vuelos.join(avion, how='inner', on='tipo_equipo')
    vuelos = vuelos.join(trafico, how='inner', on='trafico')
    vuelos = vuelos.join(fecha, how='inner', on=["ano", "mes"])
    vuelos = vuelos.join(aeropuerto.withColumnRenamed('sigla', 'origen'), how='inner', on = ["origen", "ano"])
    vuelos = vuelos.withColumnRenamed("id_aeropuerto", "id_origen")
    vuelos = vuelos.join(aeropuerto.withColumnRenamed('sigla', 'destino'), how='inner', on = ["destino", "ano"])
    vuelos = vuelos.withColumnRenamed("id_aeropuerto", "id_destino")

    
    tabla_hechos = vuelos.select("vuelos", "sillas", "pasajeros", "carga_ofrecida", "carga_bordo",
                                "id_aerolinea", "id_operacion", "id_avion", "id_trafico", "id_fecha", "id_origen", "id_destino")
    
    tabla_hechos = tabla_hechos.withColumn('id_aerolinea', functions.when(tabla_hechos['id_aerolinea'] == 'null', '-1').otherwise(tabla_hechos['id_aerolinea']))
    tabla_hechos = tabla_hechos.withColumn('id_operacion', functions.when(tabla_hechos['id_operacion'] == 'null', '-1').otherwise(tabla_hechos['id_operacion']))
    tabla_hechos = tabla_hechos.withColumn('id_avion', functions.when(tabla_hechos['id_avion'] == 'null', '-1').otherwise(tabla_hechos['id_avion']))
    tabla_hechos = tabla_hechos.withColumn('id_trafico', functions.when(tabla_hechos['id_trafico'] == 'null', '-1').otherwise(tabla_hechos['id_trafico']))
    tabla_hechos = tabla_hechos.withColumn('id_fecha', functions.when(tabla_hechos['id_fecha'] == 'null', '-1').otherwise(tabla_hechos['id_fecha']))
    tabla_hechos = tabla_hechos.withColumn('id_origen', functions.when(tabla_hechos['id_origen'] == 'null', '-1').otherwise(tabla_hechos['id_origen']))
    tabla_hechos = tabla_hechos.withColumn('id_destino', functions.when(tabla_hechos['id_destino'] == 'null', '-1').otherwise(tabla_hechos['id_destino']))
    
    return(tabla_hechos)

### Creamos el proceso de ETL

en este paso llamamos las funciones creadas anteriormente y guardamos cada tabla en un csv diferente. 

In [26]:
def ETL(df_aeropuertos, df_vuelos):
    PATH='modelo/'
    
    fecha = dimension_fecha(df_aeropuertos, df_vuelos)
    fecha.toPandas().to_csv(PATH+'dimension_fecha.csv')
    print("Dimensión Fecha: \n")
    print("Las dimensiones de la base de datos son: ", (fecha.count(), len(fecha.columns)), "\n")
    print("Las columnas son: ", fecha.columns, "\n")
    print(fecha.show(5))
    print("\n")
    
    
    aerolinea = dimension_aerolinea(df_aeropuertos, df_vuelos)
    aerolinea.toPandas().to_csv(PATH+'dimension_aerolinea.csv')
    print("Dimensión aerolinea: \n")
    print("Las dimensiones de la base de datos son: ", (aerolinea.count(), len(aerolinea.columns)), "\n")
    print("Las columnas son: ", aerolinea.columns, "\n")
    print(aerolinea.show(5))
    print("\n")
    
    operacion = dimension_operacion(df_aeropuertos, df_vuelos)
    operacion.toPandas().to_csv(PATH+'dimension_operacion.csv')
    print("Dimensión operacion: \n")
    print("Las dimensiones de la base de datos son: ", (operacion.count(), len(operacion.columns)), "\n")
    print("Las columnas son: ", operacion.columns, "\n")
    print(operacion.show(5))
    print("\n")
    
    avion = dimension_avion(df_aeropuertos, df_vuelos)
    avion.toPandas().to_csv(PATH+'dimension_avion.csv')
    print("Dimensión avion: \n")
    print("Las dimensiones de la base de datos son: ", (avion.count(), len(avion.columns)), "\n")
    print("Las columnas son: ", avion.columns, "\n")
    print(avion.show(5))
    print("\n")
    
    trafico = dimension_trafico(df_aeropuertos, df_vuelos)
    trafico.toPandas().to_csv(PATH+'dimension_trafico.csv')
    print("Dimensión trafico: \n")
    print("Las dimensiones de la base de datos son: ", (trafico.count(), len(trafico.columns)), "\n")
    print("Las columnas son: ", trafico.columns, "\n")
    print(trafico.show(5))
    print("\n")
      
    
    aeropuertos.toPandas().to_csv(PATH+'dimension_aeropuerto_historia.csv')
    print("Dimensión aeropuerto con historia: \n")
    print("Las dimensiones de la base de datos son: ", (aeropuertos.count(), len(aeropuertos.columns)), "\n")
    print("Las columnas son: ", aeropuertos.columns, "\n")
    print(aeropuertos.show(5))
    print("\n")
    
    tabla_hechos = tabla_hechos_vuelos(df_aeropuertos, df_vuelos, aerolinea,operacion, avion, trafico, fecha, aeropuerto_historia)
    tabla_hechos.toPandas().to_csv(PATH+'tabla_hechos.csv')
    print("Tabla de Hechos de Vuelos: \n")
    print("Las dimensiones de la base de datos son: ", (tabla_hechos.count(), len(tabla_hechos.columns)), "\n")
    print("Las columnas son: ", tabla_hechos.columns, "\n")
    print(tabla_hechos.show(5))
    print("\n")

## Ejecutamos lo construido

In [28]:
ETL(df_aeropuertos, df_vuelos)

Dimensión Fecha: 

Las dimensiones de la base de datos son:  (84, 4) 

Las columnas son:  ['ano', 'mes', 'id_fecha', 'trimestre'] 

+----+---+------------+---------+
| ano|mes|    id_fecha|trimestre|
+----+---+------------+---------+
|2012| 10| 17179869185|        4|
|2010|  7| 42949672961|        3|
|2010| 12| 42949672962|        4|
|2015|  2| 60129542145|        1|
|2014|  4|128849018881|        2|
+----+---+------------+---------+
only showing top 5 rows

None


Dimensión aerolinea: 

Las dimensiones de la base de datos son:  (86, 2) 

Las columnas son:  ['empresa', 'id_aerolinea'] 

+--------------------+------------+
|             empresa|id_aerolinea|
+--------------------+------------+
| AER CARIBE LIMITADA|           1|
|          AERO APOYO|  8589934593|
|AERO SERVICIOS ES...| 17179869185|
|AERO TAXI GUAYMAR...| 25769803777|
|AEROCHARTER ANDIN...| 34359738369|
+--------------------+------------+
only showing top 5 rows

None


Dimensión operacion: 

Las dimensiones de la base 

NameError: name 'aeropuerto_historia' is not defined

## Función de actualización 

Como se explicó en la Wiki, se creó una función de actualización que podría ser usada para realizar la actualización de los datos, cuando estos son actualizados de forma periodica con periodos mas cortos de un año y en los cuales debido a la naturaleza de los datos sería de mayor relevancia utilizarla. 

Inicialmente se cargan los datos de la base de datos antigua,  eliminamos los registros con fechas de construccióin superior a 2013 y eliminamos los registros repetidos.

In [36]:
df_aeropuertos = spark.read.load("aeropuertos.csv",format="csv", sep=",", inferSchema="true", header="true")
df_aeropuertos = df_aeropuertos.replace("nan", None)
df_aeropuertos = df_aeropuertos.drop("_c0")
df_aeropuertos = df_aeropuertos.withColumn("pbmo",df_aeropuertos.pbmo.cast(DoubleType()))
df_aeropuertos = df_aeropuertos.withColumn("fecha_construccion",df_aeropuertos.fecha_construccion.cast(DateType()))
df_aeropuertos = df_aeropuertos.withColumn("fecha_vigencia",df_aeropuertos.fecha_vigencia.cast(DateType()))
df_aeropuertos = df_aeropuertos.withColumn("numero_vuelos_origen",df_aeropuertos.numero_vuelos_origen.cast(IntegerType()))
df_aeropuertos = df_aeropuertos.withColumn("gcd_departamento",df_aeropuertos.gcd_departamento.cast(StringType()))
df_aeropuertos = df_aeropuertos.withColumn("gcd_municipio",df_aeropuertos.gcd_municipio.cast(StringType()))
df_aeropuertos = df_aeropuertos.filter("fecha_construccion < date'2014-01-01'")
df_aeropuertos = df_aeropuertos.drop_duplicates(subset=['sigla'])
df_aeropuertos.show()

+-----+----+--------------------+--------------------+------------------+-------------+-------+--------+--------------------+--------------------+--------------+-----------+-------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+
|sigla|iata|              nombre|           municipio|      departamento|    categoria|latitud|longitud|         propietario|          explotador|longitud_pista|ancho_pista|   pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|      tipo|numero_vuelos_origen|gcd_departamento|gcd_municipio|
+-----+----+--------------------+--------------------+------------------+-------------+-------+--------+--------------------+--------------------+--------------+-----------+-------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+
|  9CL|null|        LA VENTUROSA|San Luis de Palenque|          Casanare|    Aeród

In [37]:
def dimension_aeropuerto(aeropuertos, vuelos): 
    from pyspark.sql.functions import lit
    aeropuerto = aeropuertos.select("iata", "sigla", "nombre", "latitud", "longitud", "longitud_pista", 
                                    "ancho_pista", "pbmo", "elevacion", "resolucion", "numero_vuelos_origen",
                                   "explotador", "propietario", "categoria",
                                   "tipo", "clase", "departamento", "municipio", "gcd_departamento", "gcd_municipio", 
                                   "fecha_construccion", "fecha_vigencia")
    
    aeropuerto = aeropuerto.withColumn("InicioVigencia", aeropuerto["fecha_construccion"])
    aeropuerto = aeropuerto.withColumn("FinalVigencia", lit("2050-1-1"))
    aeropuerto = aeropuerto.withColumn("FinalVigencia",aeropuerto.FinalVigencia.cast(DateType()))
    aeropuerto = aeropuerto.withColumn("VersionActual", lit("S"))
    
    aeropuerto = aeropuerto.withColumn("id_aeropuerto", monotonically_increasing_id())

    return(aeropuerto)

In [38]:
aeropuerto_dw_df = dimension_aeropuerto(df_aeropuertos, df_vuelos)
from pyspark.sql.window import Window

window = Window.partitionBy(aeropuerto_dw_df['id_aeropuerto']).orderBy(aeropuerto_dw_df['VersionActual'].desc())
aeropuerto_dw_df = aeropuerto_dw_df.select('*', functions.rank().over(window).alias('rank')).filter(functions.col('rank') == 1) 
aeropuerto_dw_df = aeropuerto_dw_df.drop('rank')
aeropuerto_dw_df.show()

+----+-----+--------------------+-------+--------+--------------+-----------+-------+---------+----------+--------------------+--------------------+--------------------+-------------+----------+-----+----------------+--------------------+----------------+-------------+------------------+--------------+--------------+-------------+-------------+-------------+
|iata|sigla|              nombre|latitud|longitud|longitud_pista|ancho_pista|   pbmo|elevacion|resolucion|numero_vuelos_origen|          explotador|         propietario|    categoria|      tipo|clase|    departamento|           municipio|gcd_departamento|gcd_municipio|fecha_construccion|fecha_vigencia|InicioVigencia|FinalVigencia|VersionActual|id_aeropuerto|
+----+-----+--------------------+-------+--------+--------------+-----------+-------+---------+----------+--------------------+--------------------+--------------------+-------------+----------+-----+----------------+--------------------+----------------+-------------+---------

In [33]:
from pyspark.sql.window import Window

def aeropuerto_actualizacion(aeropuerto_df, aeropuerto_dw_df):
    from pyspark.sql.window import Window
    """Retorna los registros de  que se deben insertar y los que se deben actualizar según un manejo de historia tipo 2.

 
    """

    #Se utiliza un Window para extraer, del dataframe pasado por parámetro, por cada wwi_stock_item_id, la versión mayor.
    #Es decir, por cada llave natural, extrae la última versión.
    window = Window.partitionBy(aeropuerto_dw_df['id_aeropuerto']).orderBy(aeropuerto_dw_df['VersionActual'].desc())
    aeropuerto_dw_df = aeropuerto_dw_df.select('*', functions.rank().over(window).alias('rank')).filter(functions.col('rank') == 1) 
    aeropuerto_dw_df = aeropuerto_dw_df.drop('rank')
    
    #Renombra columnas del DataWarehouse, poniendo prefijo dw_ para evitar confusiones
    aeropuerto_dw_df = aeropuerto_dw_df.selectExpr("iata as dw_iata", "sigla as dw_sigla", "nombre as dw_nombre", "latitud as dw_latitud", "longitud as dw_longitud", "longitud_pista as dw_longitud_pista", 
                                    "ancho_pista as dw_ancho_pista", "pbmo as dw_pbmo", "elevacion as dw_elevacion", "resolucion as dw_resolucion", "numero_vuelos_origen as dw_numero_vuelos_origen",
                                   "explotador as dw_explotador", "propietario as dw_propietario", "categoria as dw_categoria",
                                   "tipo as dw_tipo", "clase as dw_clase", "departamento as dw_departamento", "municipio as dw_municipio", "gcd_departamento as dw_gcd_departamento", "gcd_municipio as dw_gcd_municipio", 
                                   "fecha_construccion as dw_fecha_construccion", "fecha_vigencia as dw_fecha_vigencia", "id_aeropuerto as dw_id_aeropuerto", "VersionActual as VersionActual",
                                                  "InicioVigencia as InicioVigencia", "FinalVigencia as FinalVigencia")

    #Realiza left join entre stockitems que se están procesando y stockitems en el DW.
    df = aeropuerto_df.join(aeropuerto_dw_df.withColumnRenamed('dw_sigla', 'sigla'), how = 'left', on ='sigla')

    #Encontar llave máxima
    max_key = df.agg({"dw_id_aeropuerto": "max"}).collect()[0][0]
    
    if max_key is None:
        max_key = 1
    
    #cambio = 0, ya existía esa llave natural y no cambió.
    #cambio = 1, ya existía llave natural y cambió registro.
    #cambio = -1, no existía llave natural
    df = df.withColumn('cambio', functions.when(df['dw_id_aeropuerto'].isNull(), -1)\
        .when(((df['nombre'] == df['dw_nombre']) | (df['nombre'].isNull() & df['dw_nombre'].isNull()))&
               ((df['longitud_pista'] == df['dw_longitud_pista']) | (df['longitud_pista'].isNull() & df['dw_longitud_pista'].isNull()))&
               ((df['latitud'] == df['dw_latitud']) | (df['latitud'].isNull() & df['dw_latitud'].isNull()))&
               ((df['longitud'] == df['dw_longitud']) | (df['longitud'].isNull() & df['dw_longitud'].isNull()))&
               ((df['explotador'] == df['dw_explotador']) | (df['explotador'].isNull() & df['dw_explotador'].isNull()))&
               ((df['propietario'] == df['dw_propietario']) | (df['propietario'].isNull() & df['dw_propietario'].isNull()))&
               ((df['categoria'] == df['dw_categoria']) | (df['categoria'].isNull() & df['dw_categoria'].isNull()))&
               ((df['tipo'] == df['dw_tipo']) | (df['tipo'].isNull() & df['dw_tipo'].isNull()))&
               ((df['clase'] == df['dw_clase']) | (df['clase'].isNull() & df['dw_clase'].isNull()))&
               ((df['departamento'] == df['dw_departamento']) | (df['departamento'].isNull() & df['dw_departamento'].isNull()))&
               ((df['municipio'] == df['dw_municipio']) | (df['municipio'].isNull() & df['dw_municipio'].isNull()))&
               ((df['gcd_departamento'] == df['dw_gcd_departamento']) | (df['gcd_departamento'].isNull() & df['dw_gcd_departamento'].isNull()))&
               ((df['gcd_municipio'] == df['dw_gcd_municipio']) | (df['gcd_municipio'].isNull() & df['dw_gcd_municipio'].isNull()))&
               ((df['fecha_vigencia'] == df['dw_fecha_vigencia']) | (df['fecha_vigencia'].isNull() & df['dw_fecha_vigencia'].isNull())), 0)\
            .otherwise(1))
    #caso 0: Se quitan del df.
    df = df.where(df['cambio'] != 0)
    
    #Nuevos ids
    df = df.withColumn('new_id', functions.monotonically_increasing_id() + max_key)
    
    #caso -1: Donde no se hizo el join (i.e, los registros transaccionales nuevos)
    df = df.withColumn('dw_VersionActual', functions.when(df['cambio'] == -1, "S").otherwise(df['VersionActual']))
    df = df.withColumn('dw_InicioVigencia', functions.when(df['cambio'] == -1, df["fecha_construccion"]).otherwise(df['InicioVigencia']))
    df = df.withColumn('dw_FinalVigencia', functions.when(df['cambio'] == -1, functions.to_date(functions.lit('2050-01-01'), 'yyyy-MM-dd')).otherwise(functions.to_date(functions.lit(str(df["Ano"]-1)+'-12-31'), 'yyyy-MM-dd')))
    df = df.withColumn('insert', functions.when(df['cambio'] == -1, 1).otherwise(0))
    
    
    #caso 1:
    # 1.1 Es necesario editar existentes: poner dw_date_to como fecha actual. Después, actualizar en la base de datos (NO INSERTAR)
    # 1.2 Es necesario crear nueva fila, identica a anterior excepto que : _version = _version + 1, date_from = hoy, date_to = 2199-12-31
    df_dup = df.where(df['cambio'] == 1)
    #1.2
    df_dup = df_dup.withColumn('dw_VersionActual', functions.lit("S"))
    df_dup = df_dup.withColumn('dw_InicioVigencia', functions.to_date(functions.lit(str(df_dup["Ano"]-1)+'-12-31'), 'yyyy-MM-dd'))
    df_dup = df_dup.withColumn('dw_FinalVigencia', functions.to_date(functions.lit('2050-12-31'), 'yyyy-MM-dd'))
    df_dup = df_dup.withColumn('insert',functions.lit(1))
    
    
    #Juntar tablas
    df = df.union(df_dup)
    
    #Eliminar columna 'cambio'
    df = df.drop('cambio')

    return df


In [39]:
aeropuertos_df = spark.read.load("aeropuertos_cambios_infraestructura.csv",format="csv", sep=",", inferSchema="true", header="true")
aeropuertos_df = aeropuertos_df.replace("nan", None)
aeropuertos_df = aeropuertos_df.drop("_c0")
aeropuertos_df = aeropuertos_df.withColumn("pbmo",aeropuertos_df.pbmo.cast(DoubleType()))
aeropuertos_df = aeropuertos_df.withColumn("fecha_construccion",aeropuertos_df.fecha_construccion.cast(DateType()))
aeropuertos_df = aeropuertos_df.withColumn("fecha_vigencia",aeropuertos_df.fecha_vigencia.cast(DateType()))
aeropuertos_df = aeropuertos_df.withColumn("numero_vuelos_origen",aeropuertos_df.numero_vuelos_origen.cast(IntegerType()))
aeropuertos_df = aeropuertos_df.withColumn("Ano",aeropuertos_df.Ano.cast(IntegerType()))
aeropuertos_df = aeropuertos_df.withColumn("gcd_departamento",aeropuertos_df.gcd_departamento.cast(StringType()))
aeropuertos_df = aeropuertos_df.withColumn("gcd_municipio",aeropuertos_df.gcd_municipio.cast(StringType()))

In [41]:
aeropuerto_dw_df = dimension_aeropuerto(df_aeropuertos, df_vuelos)

In [42]:
aeropuerto_dw_df = dimension_aeropuerto(df_aeropuertos, df_vuelos)


resultado = aeropuerto_actualizacion(aeropuertos_df.where(aeropuertos_df['Ano'] == 2014), aeropuerto_dw_df)

In [43]:
resultado.show()

+-----+----+--------------------+--------------------+------------+---------+-------+--------+--------------------+--------------------+--------------+-----------+-------+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+-------+--------------------+----------+-----------+-----------------+--------------+-------+------------+-------------+-----------------------+--------------------+--------------------+------------+----------+--------+---------------+-------------------+-------------------+----------------+---------------------+-----------------+----------------+-------------+--------------+-------------+-------------+----------------+-----------------+----------------+------+
|sigla|iata|              nombre|           municipio|departamento|categoria|latitud|longitud|         propietario|          explotador|longitud_pista|ancho_pista|   pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|  