In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import functions as f
from pymssql import _mssql
from pyspark.sql.functions import monotonically_increasing_id as mid

In [2]:
appName = "PySpark SQL Server via JDBC"
master = "local"
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set("spark.driver.extraClassPath","C:\\Users\\estudiante\\mssql-jdbc-9.2.1.jre8.jar")
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
spark = sql_context.sparkSession

In [3]:
server='localhost:1433'
database = "ProyectoAeropuertosSemana8"
user = "sa"
password  = "12345"

In [4]:
conn = _mssql.connect(server=server, user=user, password=password,database=database)
query = f"-- Create schemas\
-- Create tables\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimFecha'))\
BEGIN\
  CREATE TABLE DimFecha\
  (\
    IDFecha INT NOT NULL,\
    Año VARCHAR(4),\
    Mes VARCHAR(2),\
    PRIMARY KEY(IDFecha)\
  )\
END;\
\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimTipo_Equipo'))\
BEGIN\
  CREATE TABLE DimTipo_Equipo\
  (\
    IDEquipo INT NOT NULL,\
    NombreEquipo VARCHAR(4),\
    PRIMARY KEY(IDEquipo)\
  )\
END;\
\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimTipoVuelo'))\
BEGIN\
  CREATE TABLE DimTipoVuelo\
  (\
    IDTipoVuelo INT NOT NULL,\
    CodigoVuelo VARCHAR(1),\
    TipoVuelo VARCHAR(10),\
    PRIMARY KEY(IDTipoVuelo)\
  )\
END;\
\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimTipo_Trafico'))\
BEGIN\
  CREATE TABLE DimTipo_Trafico\
  (\
    IDTipoTrafico INT NOT NULL,\
    Codigo_Trafico VARCHAR(1),\
    Descripcion VARCHAR(10),\
    PRIMARY KEY(IDTipoTrafico)\
  )\
END;\
\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimEmpresaTransportadora'))\
BEGIN\
  CREATE TABLE DimEmpresaTransportadora\
  (\
    IDEmpresa INT NOT NULL,\
    NombreEmpresa VARCHAR(50),\
    PRIMARY KEY(IDEmpresa)\
  )\
END;\
\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'FactVuelos'))\
BEGIN\
  CREATE TABLE FactVuelos\
  (\
    ID INT NOT NULL,\
    IDFecha INT,\
    IDTipoEquipo INT,\
    IDAeropuertoOrigen INT,\
    IDAeropuertoDestino INT,\
    IDTipoVuelo INT,\
    IDTipoTrafico INT,\
    IDEmpresa INT,\
    Vuelos INT,\
    Pasajeros INT,\
    CargaBordo INT,\
    TotalSillas INT,\
    TotalCarga INT,\
    PRIMARY KEY(ID)\
  )\
END;\
\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimAeropuerto'))\
BEGIN\
  CREATE TABLE DimAeropuerto\
  (\
    IDAeropuerto INT NOT NULL,\
    Sigla CHARACTER(3),\
    IATA CHARACTER(3),\
    Ubicacion VARCHAR(50),\
    NombreAeropuerto VARCHAR(50),\
    Municipio VARCHAR(50),\
    Departamento VARCHAR(50),\
    Pais VARCHAR(50),\
    Categoria VARCHAR(50),\
    Latitud FLOAT,\
    Longitud FLOAT,\
    Propietario VARCHAR(50),\
    Explotador VARCHAR(50),\
    LongitudPista INT,\
    AnchoPista INT,\
    PBMO INT,\
    Elevacion INT,\
    Resolucion VARCHAR(50),\
    Clase VARCHAR(50),\
    Tipo VARCHAR(50),\
    GCD_Municipio VARCHAR(50),\
    GCD_Departamento VARCHAR(50),\
    FechaInicioVigencia DATE,\
    FechaFinVigencia DATE,\
    VersionDelRegistro VARCHAR(1),\
    Anio INT,\
    IDPIB INT,\
    PRIMARY KEY(IDAeropuerto)\
  )\
END;\
IF (NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'DimInformacionPIB'))\
BEGIN\
CREATE TABLE DimInformacionPIB\
(\
    IDPIB INT NOT NULL,\
    PIB INT,\
    FechaInicioVigencia DATE,\
    FechaFinVigencia DATE,\
    VersionRegistro VARCHAR(1),\
    PRIMARY KEY(IDPIB)\
)\
END;"
conn.execute_query(query)

In [5]:
dfvuelos = spark.read.format("csv").load("vuelosEtapa3.csv",format="csv",sep=",",
                                         inferSchema='true',header='true')
dfaeropuertos =spark.read.format("csv").load("aeropuertosEtapa3.csv",format="csv",sep=";",
                                         inferSchema='true',header='true')
dfaeropuertosdelmundo=spark.read.format("csv").load("Aeropuertosdelmundo.csv",format="csv",sep=";",
                                         inferSchema='true',header='true')
dfPIB=spark.read.format("csv").load("InformacionPIB.csv",format="csv",sep=",",
                                         inferSchema='true',header='true')

In [6]:
from pyspark.sql.functions import sequence, to_date, explode, col,when,lit,expr,substring,regexp_replace
from pyspark.sql import functions as sf

In [7]:
#Agregar nombres de los aeropuertos internacionales
dfaeropuertosdelmundo=dfaeropuertosdelmundo.dropDuplicates()
dfaeropuertosdelmundo_origen=dfaeropuertosdelmundo.selectExpr('Origen as sigla', 'Ciudad_Origen as municipio',
                                                              'APTO_ORIGEN as nombre', 'Pais_Origen as pais')
dfaeropuertosdelmundo_destino=dfaeropuertosdelmundo.selectExpr('Destino as sigla', 'Ciudad_Destino as municipio',
                                                              'APTO_DESTINO as nombre', 'Pais_Destino as pais')
dfaeropuertosdelmundo = dfaeropuertosdelmundo_origen.union(dfaeropuertosdelmundo_destino).where("pais is not null") 
dfaeropuertosdelmundo=dfaeropuertosdelmundo.dropDuplicates().filter(dfaeropuertosdelmundo.pais!="COLOMBIA")
dfaeropuertosdelmundo1=dfaeropuertosdelmundo.filter(dfaeropuertosdelmundo.pais!="COLOMBIA")
dfaeropuertosdelmundo2=dfaeropuertosdelmundo.filter(dfaeropuertosdelmundo.pais=="COLOMBIA")

In [8]:
dfaeropuertos=dfaeropuertos.withColumn("pais",lit("COLOMBIA")).withColumn("ubicacion",lit("Nacional"))
dfaeropuertosdelmundo1=dfaeropuertosdelmundo1.withColumn("Ano",lit("2013")).withColumn("ubicacion",lit("Internacional"))
dfaeropuertosdelmundo2=dfaeropuertosdelmundo2.withColumn("Ano",lit("2013")).withColumn("ubicacion",lit("Nacional"))
dfaeropuertos1=dfaeropuertosdelmundo1.unionByName(dfaeropuertosdelmundo2, allowMissingColumns=True)
dfaeropuertos=dfaeropuertos.unionByName(dfaeropuertosdelmundo1, allowMissingColumns=True)

In [9]:
#TransformacionInformacionPIB
dfPIBUnpivot=dfPIB.withColumnRenamed("DEPARTAMENTOS","Departamento").withColumnRenamed("2014","Ano2014").withColumnRenamed("2015","Ano2015").withColumnRenamed("2016","Ano2016")\
             .withColumnRenamed("2017","Ano2017").withColumnRenamed("2018","Ano2018").withColumnRenamed("2019","Ano2019")\
             .withColumnRenamed("2020","Ano2020")
unpivotExpr = "stack(7, 'Ano2014',Ano2014, 'Ano2015', Ano2015, 'Ano2016', Ano2016,'Ano2017', Ano2017,'Ano2018', Ano2018,'Ano2019', Ano2019,'Ano2020', Ano2020) as (Anio,PIB)"
dfPIBUnpivot = dfPIBUnpivot.selectExpr("Codigo","Departamento", unpivotExpr).where("PIB is not null")    
dfPIBUnpivot=dfPIBUnpivot.withColumn('Anio', substring('Anio', 4,4))
dfPIBUnpivot=dfPIBUnpivot.withColumn('Departamento', 
    when(dfPIBUnpivot.Departamento.startswith('San Andrés'),'San Andrés islas') \
   .when(dfPIBUnpivot.Departamento.startswith('BOG'),'Bogotá, D.C.') \
   .otherwise(dfPIBUnpivot.Departamento))

In [10]:
dfBIPparte1 = dfPIBUnpivot.filter(dfPIBUnpivot.Anio=="2014")
dfBIPparte2 = dfPIBUnpivot.filter(dfPIBUnpivot.Anio=="2015")
dfBIPparte3 = dfPIBUnpivot.filter(dfPIBUnpivot.Anio=="2016")
dfBIPparte4 = dfPIBUnpivot.filter(dfPIBUnpivot.Anio=="2017")
dfBIPparte5 = dfPIBUnpivot.filter(dfPIBUnpivot.Anio=="2018")

In [11]:
def load_table(spark, jdbc_hostname, database, data_table, username, password):
    jdbc_url = "jdbc:sqlserver://{0};database={1}".format(jdbc_hostname, database)

    connection_details = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df

def write_table(spark, jdbc_hostname, database, data_table, username, password,mode,df):
    jdbc_url = "jdbc:sqlserver://{0};database={1}".format(jdbc_hostname, database)

    connection_details = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        
        
    }

    df.write.jdbc(url=jdbc_url, table=data_table, properties=connection_details, mode=mode)
    return df

In [12]:
from pyspark.sql.types import DateType
def actualizar_historia_PIB(df_a_cargar):
    tablaDWH = load_table(spark, server, database, 'DimInformacionPIB', user, password)
    
    df=df_a_cargar.selectExpr('Departamento as Departamento', 'Anio as Anio', 'PIB as PIB')
    if tablaDWH.count()==0:
        
        df=df.withColumn("FechaInicioVigencia",lit("1900-01-01"))
        df=df.withColumn("FechaFinVigencia",lit("2300-01-01"))
        df=df.withColumn("VersionDelRegistro",lit("S"))
        df=df.dropDuplicates()
        df=df.sort(col("Departamento"))
        df = df.coalesce(1).withColumn("IDPIB", mid())
        df.createOrReplaceTempView("df")
        df = spark.sql("SELECT INT(IDPIB),STRING(Departamento),INT(PIB), DATE(FechaInicioVigencia),\
                       DATE(FechaFinVigencia), STRING(VersionDelRegistro),INT(Anio) from df")
        x=df.count()
        df=write_table(spark, server, database, 'DimInformacionPIB', user,password,'overwrite',df)
    else:
        tablaDWH.persist()
        df_fechas_antiguas=tablaDWH.selectExpr('Departamento','Departamento as DepartamentoAnt')
        df_fechas_antiguas=df_fechas_antiguas.dropDuplicates()
        df_nuevo=df.selectExpr('Departamento','Anio as AnioNuevo')
        df_nuevo=df_nuevo.dropDuplicates()
        df_temp=tablaDWH.join(df_nuevo, how = 'left', on = 'Departamento')
        #registros que no tienen actualizacion
        df_temp.persist()
        df_mantener=df_temp.filter(df_temp.AnioNuevo.isNull())
        df_mantener=df_mantener.drop('AnioNuevo')
        df_mantener=df_mantener.withColumn("Origen",lit("Mantener"))
        #registros que si tienen actualización
        df_actualizar=df_temp.filter(df_temp.AnioNuevo.isNotNull())
        #registros viejos que no van a cambiar
        df_actualizar.persist()
        df_actualizar_registrosviejos=df_actualizar.filter(df_actualizar.VersionDelRegistro=="N")
        df_actualizar_registrosviejos=df_actualizar_registrosviejos.drop('AnioNuevo')
        #actualización de registros que eran vigentes
        df_actualizar_registrosvigentes=df_actualizar.filter(df_actualizar.VersionDelRegistro=="S")
        df_actualizar_registrosvigentes=df_actualizar_registrosvigentes.withColumn("VersionDelRegistro",lit("N"))
        df_actualizar_registrosvigentes=df_actualizar_registrosvigentes.withColumn("FechaFinVigencia",
                                                                                    sf.concat(sf.col('Anio'),sf.lit('-12-31'))
                                                                                   )    
        df_actualizar_registrosvigentes=df_actualizar_registrosvigentes.drop('AnioNuevo')
        #registros nuevos para ingresar a la base
        #Encontar llave máxima
        max_key = tablaDWH.agg({"IDPIB": "max"}).collect()[0][0]
        df_nuevos_registros=df.alias('df_nuevos_registros')
        df_nuevos_registros=df_nuevos_registros.join(df_fechas_antiguas,how = 'left', on = 'Departamento')
        df_nuevos_registros=df_nuevos_registros.withColumn("FechaInicioVigencia",when(df_nuevos_registros.DepartamentoAnt.isNull(),\
                                                                                     '1900-01-01')\
                                                           .otherwise(sf.concat(sf.col('Anio'),sf.lit('-01-01'))))
        df_nuevos_registros=df_nuevos_registros.withColumn("FechaFinVigencia",lit("2300-01-01"))
        df_nuevos_registros=df_nuevos_registros.withColumn("VersionDelRegistro",lit("S"))
        df_nuevos_registros = df_nuevos_registros.withColumn('IDPIB',  mid() + max_key+1)
        #unir en un solo dataframe
        df_mantener.createOrReplaceTempView("df_mantener")
        df_mantener = spark.sql("SELECT INT(IDPIB) ,STRING(Departamento),INT(PIB), DATE(FechaInicioVigencia),\
                       DATE(FechaFinVigencia), STRING(VersionDelRegistro),INT(Anio) from df_mantener")
        
        df_actualizar_registrosviejos.createOrReplaceTempView("df_actualizar_registrosviejos")
        df_actualizar_registrosviejos = spark.sql("SELECT INT(IDPIB) ,STRING(Departamento),INT(PIB), DATE(FechaInicioVigencia),\
                       DATE(FechaFinVigencia), STRING(VersionDelRegistro),INT(Anio) from df_actualizar_registrosviejos")
        
        df_actualizar_registrosvigentes.createOrReplaceTempView("df_actualizar_registrosvigentes")
        df_actualizar_registrosvigentes = spark.sql("SELECT INT(IDPIB) ,STRING(Departamento),INT(PIB), DATE(FechaInicioVigencia),\
                       DATE(FechaFinVigencia), STRING(VersionDelRegistro),INT(Anio) from df_actualizar_registrosvigentes")
        
        df_nuevos_registros.createOrReplaceTempView("df_nuevos_registros")
        df_nuevos_registros = spark.sql("SELECT INT(IDPIB) ,STRING(Departamento),INT(PIB), DATE(FechaInicioVigencia),\
                       DATE(FechaFinVigencia), STRING(VersionDelRegistro),INT(Anio) from df_nuevos_registros")
        
        df2 = df_nuevos_registros.union(df_mantener)
        df2 = df2.union(df_actualizar_registrosviejos)
        df2 = df2.union(df_actualizar_registrosvigentes)
        x=df2.count()
        df2=write_table(spark, server, database, 'DimInformacionPIB', user,password,'overwrite',df2)
    return x

In [13]:
x=actualizar_historia_PIB(dfBIPparte1)
x=actualizar_historia_PIB(dfBIPparte2)
x=actualizar_historia_PIB(dfBIPparte3)
x=actualizar_historia_PIB(dfBIPparte4)
x=actualizar_historia_PIB(dfBIPparte5)

In [14]:
# Creacion tabla DimFechaMes
df_fechames=dfvuelos.selectExpr('ano as Anio', 'mes as Mes')
df_fechames=df_fechames.dropDuplicates()
df_fechames=df_fechames.sort(col("Anio"),col('Mes'))
df_fechames = df_fechames.coalesce(1).withColumn("IDFecha", mid())
df_fechames=write_table(spark, server, database, 'DimFecha', user,password,'overwrite',df_fechames)
df_fechames.show(5)

+----+---+-------+
|Anio|Mes|IDFecha|
+----+---+-------+
|2010|  1|      0|
|2010|  2|      1|
|2010|  3|      2|
|2010|  4|      3|
|2010|  5|      4|
+----+---+-------+
only showing top 5 rows



In [15]:
# Creacion tabla DimTipoVuelo
df_tipo_vuelo=dfvuelos.selectExpr('tipo_vuelo as CodigoVuelo')
df_tipo_vuelo=df_tipo_vuelo.dropDuplicates()
df_tipo_vuelo=df_tipo_vuelo.withColumn("TipoVuelo", \
    when((df_tipo_vuelo.CodigoVuelo =="A"), "Adicionales") \
    .when((df_tipo_vuelo.CodigoVuelo =="C"),"Charter") \
    .when((df_tipo_vuelo.CodigoVuelo =="R"),"Regular") \
    .when((df_tipo_vuelo.CodigoVuelo =="T"),"Taxi") \
    .otherwise("nan") \
   )
df_tipo_vuelo=df_tipo_vuelo.sort(col("CodigoVuelo"))
df_tipo_vuelo = df_tipo_vuelo.coalesce(1).withColumn("IDTipoVuelo", mid())
df_tipo_vuelo=write_table(spark, server, database, 'DimTipoVuelo', user,password,'overwrite',df_tipo_vuelo)
df_tipo_vuelo.show(5)

+-----------+-----------+-----------+
|CodigoVuelo|  TipoVuelo|IDTipoVuelo|
+-----------+-----------+-----------+
|          A|Adicionales|          0|
|          C|    Charter|          1|
|          R|    Regular|          2|
|          T|       Taxi|          3|
+-----------+-----------+-----------+



In [16]:
# Creacion tabla DimTipo_Trafico
df_trafico=dfvuelos.selectExpr('trafico as Codigo_Trafico')
df_trafico=df_trafico.dropDuplicates()
df_trafico=df_trafico.withColumn("Descripcion", \
   when((df_trafico.Codigo_Trafico =="I"), "Internacional") \
   .when((df_trafico.Codigo_Trafico =="N"),"Nacional") \
   .when((df_trafico.Codigo_Trafico =="E"),"Externo") \
   .otherwise("nan") \
  )
df_trafico=df_trafico.sort(col("Codigo_Trafico"))
df_trafico = df_trafico.coalesce(1).withColumn("IDTipoTrafico", mid())
df_trafico=write_table(spark, server, database, 'DimTipo_Trafico', user,password,'overwrite',df_trafico)
df_trafico.show(5)

+--------------+-------------+-------------+
|Codigo_Trafico|  Descripcion|IDTipoTrafico|
+--------------+-------------+-------------+
|             I|Internacional|            0|
|             N|     Nacional|            1|
+--------------+-------------+-------------+



In [17]:
# Creacion tabla DimEmpresaTransportadora
df_empresatrans1=dfvuelos.selectExpr('empresa as NombreEmpresa')
df_empresatrans1=df_empresatrans1.dropDuplicates()
df_empresatrans1=df_empresatrans1.sort(col("NombreEmpresa"))
df_empresatrans1 = df_empresatrans1.coalesce(1).withColumn("IDEmpresa", mid())
df_empresatrans1=write_table(spark, server, database, 'DimEmpresaTransportadora', user,password,'overwrite',df_empresatrans1)
df_empresatrans1.show(5)

+--------------------+---------+
|       NombreEmpresa|IDEmpresa|
+--------------------+---------+
|"SERVICIO AÉREO R...|        0|
|              21 AIR|        1|
|                ABSA|        2|
|ABX AIR INC SUCUR...|        3|
| AER CARIBE LIMITADA|        4|
+--------------------+---------+
only showing top 5 rows



In [18]:
# Creacion tabla DimTipo_Equipo
df_tipoequipo=dfvuelos.selectExpr('tipo_equipo as NombreEquipo')
df_tipoequipo=df_tipoequipo.dropDuplicates()
df_tipoequipo=df_tipoequipo.sort(col("NombreEquipo"))
df_tipoequipo = df_tipoequipo.coalesce(1).withColumn("IDEquipo", mid())
df_tipoequipo=write_table(spark, server, database, 'DimTipo_Equipo', user,password,'overwrite',df_tipoequipo)
df_tipoequipo.show(5)

+------------+--------+
|NombreEquipo|IDEquipo|
+------------+--------+
|         318|       0|
|         319|       1|
|         330|       2|
|         332|       3|
|         727|       4|
+------------+--------+
only showing top 5 rows



In [19]:
dfaeropuertosconhistoriaparte0 = dfaeropuertos.filter(dfaeropuertos.Ano=="2013")
dfaeropuertosconhistoriaparte1 = dfaeropuertos.filter(dfaeropuertos.Ano=="2014")
dfaeropuertosconhistoriaparte2 = dfaeropuertos.filter(dfaeropuertos.Ano=="2015")
dfaeropuertosconhistoriaparte3 = dfaeropuertos.filter(dfaeropuertos.Ano=="2016")
dfaeropuertosconhistoriaparte4 = dfaeropuertos.filter(dfaeropuertos.Ano=="2017")
dfaeropuertosconhistoriaparte5 = dfaeropuertos.filter(dfaeropuertos.Ano=="2018")

In [20]:
# Actualización tabla DimAeropuertoHistoria
from pyspark.sql.types import DateType
def actualizar_historia_aeropuertos(df_a_cargar):
    InfoPIB = load_table(spark, server, database, 'DimInformacionPIB', user, password)
    tablaDWH = load_table(spark, server, database, 'DimAeropuerto', user, password)
    df=df_a_cargar.selectExpr('sigla as Sigla', 'iata as IATA', 'nombre as NombreAeropuerto','ubicacion as Ubicacion',
                                          'municipio as Municipio','departamento as Departamento', 'categoria as Categoria',
                                           'latitud as Latitud','longitud as Longitud', 'propietario as Propietario',
                                           'explotador as Explotador','longitud_pista as LongitudPista',
                                           'ancho_pista as AnchoPista', 'pbmo as PBMO', 'elevacion as Elevacion',
                                           'resolucion as Resolucion','clase as Clase', 'tipo as Tipo', 'pais as Pais',
                                           'gcd_municipio as GCD_Municipio', 'gcd_departamento as GCD_Departamento',
                                           'Ano as Anio')
    if tablaDWH.count()==0:
        
        df=df.withColumn("FechaInicioVigencia",lit("1900-01-01"))
        df=df.withColumn("FechaFinVigencia",lit("2300-01-01"))
        df=df.withColumn("VersionDelRegistro",lit("S"))
        df=df.dropDuplicates()
        df=df.sort(col("Sigla"))
        df = df.coalesce(1).withColumn("IDAeropuerto", mid())
        df.createOrReplaceTempView("df")
        InfoPIB.createOrReplaceTempView("InfoPIB")
        df = spark.sql("SELECT INT(df.IDAeropuerto), STRING(df.Sigla), STRING(df.IATA), STRING(df.NombreAeropuerto),\
                        STRING(df.Ubicacion),string(df.Pais),\
                        STRING(df.Categoria),DOUBLE(df.Latitud), DOUBLE(df.Longitud),STRING(df.Municipio), STRING(df.Departamento), \
                         STRING(df.Propietario),STRING(df.Explotador),INT(df.LongitudPista), INT(df.AnchoPista),\
                        STRING(df.PBMO),INT(df.Elevacion), STRING(df.Resolucion), STRING(df.Clase),\
                        STRING(df.Tipo),STRING(df.GCD_Municipio), STRING(df.GCD_Departamento), DATE(df.FechaInicioVigencia),\
                       DATE(df.FechaFinVigencia), STRING(df.VersionDelRegistro),INT(df.Anio), INT(InfoPIB.IDPIB) from df\
                       left join InfoPIB on InfoPIB.Departamento= df.Departamento and\
                             InfoPIB.Anio= df.Anio")
        
        x=df.count()
        df=write_table(spark, server, database, 'DimAeropuerto', user,password,'overwrite',df)
    else:
        tablaDWH.persist()
        df_fechas_antiguas=tablaDWH.selectExpr('Sigla','Sigla as SiglaAnt')
        df_fechas_antiguas=df_fechas_antiguas.dropDuplicates()
        df_nuevo=df.selectExpr('Sigla','Anio as AnioNuevo')
        df_nuevo=df_nuevo.dropDuplicates()
        df_temp=tablaDWH.join(df_nuevo, how = 'left', on = 'Sigla')
        #registros que no tienen actualizacion
        df_temp.persist()
        df_mantener=df_temp.filter(df_temp.AnioNuevo.isNull())
        df_mantener=df_mantener.drop('AnioNuevo')
        df_mantener=df_mantener.withColumn("Origen",lit("Mantener"))
        #registros que si tienen actualización
        df_actualizar=df_temp.filter(df_temp.AnioNuevo.isNotNull())
        #registros viejos que no van a cambiar
        df_actualizar.persist()
        df_actualizar_registrosviejos=df_actualizar.filter(df_actualizar.VersionDelRegistro=="N")
        df_actualizar_registrosviejos=df_actualizar_registrosviejos.drop('AnioNuevo')
        #actualización de registros que eran vigentes
        df_actualizar_registrosvigentes=df_actualizar.filter(df_actualizar.VersionDelRegistro=="S")
        df_actualizar_registrosvigentes=df_actualizar_registrosvigentes.withColumn("VersionDelRegistro",lit("N"))
        df_actualizar_registrosvigentes=df_actualizar_registrosvigentes.withColumn("FechaFinVigencia",
                                                                                    sf.concat(sf.col('Anio'),sf.lit('-12-31'))
                                                                                   )    
        df_actualizar_registrosvigentes=df_actualizar_registrosvigentes.drop('AnioNuevo')
        #registros nuevos para ingresar a la base
        #Encontar llave máxima
        max_key = tablaDWH.agg({"IDAeropuerto": "max"}).collect()[0][0]
        df_nuevos_registros=df.alias('df_nuevos_registros')
        df_nuevos_registros=df_nuevos_registros.join(df_fechas_antiguas,how = 'left', on = 'Sigla')
        df_nuevos_registros=df_nuevos_registros.withColumn("FechaInicioVigencia",when(df_nuevos_registros.SiglaAnt.isNull(),\
                                                                                     '1900-01-01')\
                                                           .otherwise(sf.concat(sf.col('Anio'),sf.lit('-01-01'))))
        df_nuevos_registros=df_nuevos_registros.withColumn("FechaFinVigencia",lit("2300-01-01"))
        df_nuevos_registros=df_nuevos_registros.withColumn("VersionDelRegistro",lit("S"))
        df_nuevos_registros = df_nuevos_registros.withColumn('IDAeropuerto',  mid() + max_key+1)
        #unir en un solo dataframe
        InfoPIB.createOrReplaceTempView("InfoPIB")
        df_mantener.createOrReplaceTempView("df_mantener")
        df_mantener = spark.sql("SELECT INT(df_mantener.IDAeropuerto), STRING(df_mantener.Sigla), STRING(df_mantener.IATA), STRING(df_mantener.NombreAeropuerto),\
                        STRING(df_mantener.Categoria),DOUBLE(df_mantener.Latitud), DOUBLE(df_mantener.Longitud),STRING(df_mantener.Municipio), STRING(df_mantener.Departamento), \
                        STRING(df_mantener.Propietario),STRING(df_mantener.Explotador),INT(df_mantener.LongitudPista), INT(df_mantener.AnchoPista),\
                        STRING(df_mantener.PBMO),INT(df_mantener.Elevacion), STRING(df_mantener.Resolucion), STRING(df_mantener.Clase),\
                        STRING(df_mantener.Ubicacion),string(df_mantener.Pais),\
                        STRING(df_mantener.Tipo),STRING(df_mantener.GCD_Municipio), STRING(df_mantener.GCD_Departamento), DATE(df_mantener.FechaInicioVigencia),\
                        DATE(df_mantener.FechaFinVigencia), STRING(df_mantener.VersionDelRegistro),INT(df_mantener.Anio), INT(InfoPIB.IDPIB) from df_mantener\
                       left join InfoPIB on InfoPIB.Departamento= df_mantener.Departamento and\
                             InfoPIB.Anio= df_mantener.Anio")
        
        df_actualizar_registrosviejos.createOrReplaceTempView("df_actualizar_registrosviejos")
        df_actualizar_registrosviejos = spark.sql("SELECT INT(df_actualizar_registrosviejos.IDAeropuerto), STRING(df_actualizar_registrosviejos.Sigla), STRING(df_actualizar_registrosviejos.IATA), STRING(df_actualizar_registrosviejos.NombreAeropuerto),\
                        STRING(df_actualizar_registrosviejos.Categoria),DOUBLE(df_actualizar_registrosviejos.Latitud), DOUBLE(df_actualizar_registrosviejos.Longitud),STRING(df_actualizar_registrosviejos.Municipio), STRING(df_actualizar_registrosviejos.Departamento), \
                        STRING(df_actualizar_registrosviejos.Propietario),STRING(df_actualizar_registrosviejos.Explotador),INT(df_actualizar_registrosviejos.LongitudPista), INT(df_actualizar_registrosviejos.AnchoPista),\
                        STRING(df_actualizar_registrosviejos.PBMO),INT(df_actualizar_registrosviejos.Elevacion), STRING(df_actualizar_registrosviejos.Resolucion), STRING(df_actualizar_registrosviejos.Clase),\
                        STRING(df_actualizar_registrosviejos.Ubicacion),string(df_actualizar_registrosviejos.Pais),\
                        STRING(df_actualizar_registrosviejos.Tipo),STRING(df_actualizar_registrosviejos.GCD_Municipio), STRING(df_actualizar_registrosviejos.GCD_Departamento), DATE(df_actualizar_registrosviejos.FechaInicioVigencia),\
                        DATE(df_actualizar_registrosviejos.FechaFinVigencia),\
                        STRING(df_actualizar_registrosviejos.VersionDelRegistro),INT(df_actualizar_registrosviejos.Anio), INT(InfoPIB.IDPIB) from df_actualizar_registrosviejos\
                       left join InfoPIB on InfoPIB.Departamento= df_actualizar_registrosviejos.Departamento and\
                             InfoPIB.Anio= df_actualizar_registrosviejos.Anio")
        
        df_actualizar_registrosvigentes.createOrReplaceTempView("df_actualizar_registrosvigentes")
        df_actualizar_registrosvigentes = spark.sql("SELECT INT(df_actualizar_registrosvigentes.IDAeropuerto), STRING(Sigla), STRING(IATA), STRING(NombreAeropuerto),\
                        STRING(Categoria),DOUBLE(Latitud), DOUBLE(Longitud),STRING(Municipio), STRING(df_actualizar_registrosvigentes.Departamento), \
                         STRING(Propietario),STRING(Explotador),INT(LongitudPista), INT(AnchoPista),\
                        STRING(PBMO),INT(Elevacion), STRING(Resolucion), STRING(Clase),\
                        STRING(Ubicacion),string(Pais),\
                        STRING(Tipo),STRING(GCD_Municipio), STRING(GCD_Departamento), DATE(df_actualizar_registrosvigentes.FechaInicioVigencia),\
                       DATE(df_actualizar_registrosvigentes.FechaFinVigencia), STRING(df_actualizar_registrosvigentes.VersionDelRegistro),\
                       INT(df_actualizar_registrosvigentes.Anio), INT(InfoPIB.IDPIB) from df_actualizar_registrosvigentes\
                       left join InfoPIB on InfoPIB.Departamento= df_actualizar_registrosvigentes.Departamento and\
                             InfoPIB.Anio= df_actualizar_registrosvigentes.Anio")
        
        df_nuevos_registros.createOrReplaceTempView("df_nuevos_registros")
        df_nuevos_registros = spark.sql("SELECT INT(df_nuevos_registros.IDAeropuerto), STRING(df_nuevos_registros.Sigla), STRING(df_nuevos_registros.IATA), STRING(df_nuevos_registros.NombreAeropuerto),\
                        STRING(df_nuevos_registros.Categoria),DOUBLE(df_nuevos_registros.Latitud), DOUBLE(df_nuevos_registros.Longitud),STRING(df_nuevos_registros.Municipio), STRING(df_nuevos_registros.Departamento), \
                         STRING(df_nuevos_registros.Propietario),STRING(df_nuevos_registros.Explotador),INT(df_nuevos_registros.LongitudPista), INT(df_nuevos_registros.AnchoPista),\
                        STRING(df_nuevos_registros.PBMO),INT(df_nuevos_registros.Elevacion), STRING(df_nuevos_registros.Resolucion), STRING(df_nuevos_registros.Clase),\
                        STRING(df_nuevos_registros.Ubicacion),string(df_nuevos_registros.Pais),\
                        STRING(df_nuevos_registros.Tipo),STRING(df_nuevos_registros.GCD_Municipio), STRING(df_nuevos_registros.GCD_Departamento), DATE(df_nuevos_registros.FechaInicioVigencia),\
                       DATE(df_nuevos_registros.FechaFinVigencia), STRING(df_nuevos_registros.VersionDelRegistro),\
                       INT(df_nuevos_registros.Anio), INT(InfoPIB.IDPIB) from df_nuevos_registros\
                       left join InfoPIB on InfoPIB.Departamento= df_nuevos_registros.Departamento and\
                             InfoPIB.Anio= df_nuevos_registros.Anio")
        
        df2 = df_nuevos_registros.union(df_mantener)
        df2 = df2.union(df_actualizar_registrosviejos)
        df2 = df2.union(df_actualizar_registrosvigentes)
        
        x=df2.count()
        df2=write_table(spark, server, database, 'DimAeropuerto', user,password,'overwrite',df2)
    return x

In [21]:
x=actualizar_historia_aeropuertos(dfaeropuertosconhistoriaparte0)
x=actualizar_historia_aeropuertos(dfaeropuertosconhistoriaparte1)
x=actualizar_historia_aeropuertos(dfaeropuertosconhistoriaparte2)
x=actualizar_historia_aeropuertos(dfaeropuertosconhistoriaparte3)
x=actualizar_historia_aeropuertos(dfaeropuertosconhistoriaparte4)
x=actualizar_historia_aeropuertos(dfaeropuertosconhistoriaparte5)

In [23]:
#Creacion Tabla de Hechos Vuelos
import pyspark.sql.functions as F
DimAeropuerto = load_table(spark, server, database, 'DimAeropuerto', user, password)
df_aeropuertoorigen=DimAeropuerto.selectExpr('Sigla as origen','Anio','FechaInicioVigencia',
                                                     'FechaFinVigencia','IDAeropuerto as IDAeropuertoOrigen')
df_aeropuertodestino=DimAeropuerto.selectExpr('Sigla as destino','Anio','FechaInicioVigencia',
                                                      'FechaFinVigencia','IDAeropuerto as IDAeropuertoDestino')

df_hechos_vuelos=dfvuelos.alias('df_hechos_vuelos')
columns = ['vuelos', 'sillas','pasajeros','carga_bordo','carga_ofrecida']
for column in columns:
    df_hechos_vuelos = df_hechos_vuelos.withColumn(column,F.when(F.isnan(F.col(column)),0).otherwise(F.col(column)))

df_hechos_vuelos=df_hechos_vuelos.filter(df_hechos_vuelos.origen!=df_hechos_vuelos.destino)
df_hechos_vuelos= df_hechos_vuelos.withColumn("vuelos",df_hechos_vuelos.vuelos.cast('int'))
df_hechos_vuelos= df_hechos_vuelos.withColumn("vuelos",df_hechos_vuelos.vuelos.cast('int'))
df_hechos_vuelos= df_hechos_vuelos.withColumn("relativedate",sf.concat(sf.col('ano'),lit("-"),sf.col('mes'),sf.lit('-01')))
df_hechos_vuelos= df_hechos_vuelos.withColumn("vuelos",df_hechos_vuelos.vuelos.cast('int'))
df_hechos_vuelos= df_hechos_vuelos.withColumn("sillas",df_hechos_vuelos.sillas.cast('int'))
df_hechos_vuelos= df_hechos_vuelos.withColumn("carga_ofrecida",df_hechos_vuelos.carga_ofrecida.cast('int'))
df_hechos_vuelos= df_hechos_vuelos.withColumn("carga_bordo",df_hechos_vuelos.carga_bordo.cast('int'))
df_hechos_vuelos= df_hechos_vuelos.withColumn("pasajeros",df_hechos_vuelos.pasajeros.cast('int'))
df_hechos_vuelos=df_hechos_vuelos.filter(df_hechos_vuelos.vuelos!=0)
df_hechos_vuelos=df_hechos_vuelos.withColumn("sillas",when(df_hechos_vuelos.sillas == 0,df_hechos_vuelos.pasajeros).otherwise(df_hechos_vuelos.sillas))
df_hechos_vuelos=df_hechos_vuelos.withColumn("carga_ofrecida",when(df_hechos_vuelos.carga_ofrecida == 0,df_hechos_vuelos.carga_bordo).otherwise(df_hechos_vuelos.carga_ofrecida))
df_hechos_vuelos=df_hechos_vuelos.groupBy("relativedate","ano","mes","origen","destino","tipo_equipo","tipo_vuelo","trafico","empresa") \
    .sum("vuelos","pasajeros","carga_bordo","sillas","carga_ofrecida")
df_hechos_vuelos=df_hechos_vuelos.withColumnRenamed("sum(vuelos)", "Vuelos")
df_hechos_vuelos=df_hechos_vuelos.withColumnRenamed("sum(pasajeros)", "Pasajeros")
df_hechos_vuelos=df_hechos_vuelos.withColumnRenamed("sum(carga_bordo)", "CargaBordo")
df_hechos_vuelos=df_hechos_vuelos.withColumnRenamed("sum(sillas)", "TotalSillas")
df_hechos_vuelos=df_hechos_vuelos.withColumnRenamed("sum(carga_ofrecida)", "TotalCarga")
df_hechos_vuelos.createOrReplaceTempView("df_hechos_vuelos")
df_aeropuertoorigen.createOrReplaceTempView("df_aeropuertoorigen")
df_aeropuertodestino.createOrReplaceTempView("df_aeropuertodestino")
df_fechames.createOrReplaceTempView("df_fechames")
df_tipo_vuelo.createOrReplaceTempView("df_tipo_vuelo")
df_empresatrans1.createOrReplaceTempView("df_empresatrans1")
df_trafico.createOrReplaceTempView("df_trafico")
df_tipoequipo.createOrReplaceTempView("df_tipoequipo")
df_hechos_vuelos = spark.sql("select  IDFecha,IDTipoVuelo,IDTipoTrafico,IDEmpresa,IDEquipo,IDAeropuertoOrigen,\
                             IDAeropuertoDestino,Vuelos,Pasajeros,CargaBordo,TotalSillas,TotalCarga from df_hechos_vuelos\
                             left join df_fechames on df_fechames.Anio= df_hechos_vuelos.ano and\
                             df_fechames.Mes= df_hechos_vuelos.mes\
                             left join df_tipo_vuelo on df_tipo_vuelo.CodigoVuelo= df_hechos_vuelos.tipo_vuelo\
                             left join df_trafico on df_trafico.Codigo_Trafico= df_hechos_vuelos.trafico\
                             left join df_empresatrans1 on df_empresatrans1.NombreEmpresa= df_hechos_vuelos.empresa\
                             left join df_tipoequipo on df_tipoequipo.NombreEquipo= df_hechos_vuelos.tipo_equipo\
                             left join df_aeropuertoorigen on df_aeropuertoorigen.origen= df_hechos_vuelos.origen and\
                             (df_hechos_vuelos.relativedate BETWEEN df_aeropuertoorigen.FechaInicioVigencia\
                             AND df_aeropuertoorigen.FechaFinVigencia)\
                             left join df_aeropuertodestino on df_aeropuertodestino.destino= df_hechos_vuelos.destino and\
                             (df_hechos_vuelos.relativedate BETWEEN df_aeropuertodestino.FechaInicioVigencia\
                             AND df_aeropuertodestino.FechaFinVigencia)")
df_hechos_vuelos = df_hechos_vuelos.coalesce(1).withColumn("ID", mid())
df_hechos_vuelos=df_hechos_vuelos.filter(df_hechos_vuelos.IDAeropuertoOrigen.isNotNull())
df_hechos_vuelos=df_hechos_vuelos.filter(df_hechos_vuelos.IDAeropuertoDestino.isNotNull())
df_hechos_vuelos=write_table(spark, server, database, 'FactVuelos', user,password,'overwrite',df_hechos_vuelos)

In [24]:
spark.stop()