In [1]:
## Transformación de tipo pandas a pyspark

class Transformacion:
    """Clase para realizar la transformación de dataframe pandas a dataframe pyspark"""
    
    def __init__(self):
        pass
    
    def equivalent_type(f):
        if f == 'datetime64[ns]': return DateType()
        elif f == 'int64': return LongType()
        elif f == 'int32': return IntegerType()
        elif f == 'float64': return FloatType()
        else: return StringType()

    def define_structure(self, string, format_type):
        try: typo = self.equivalent_type(format_type)
        except: typo = StringType()
        return StructField(string, typo)


    def pandas_to_spark(self, pandas_df, spark):
        columns = list(pandas_df.columns)
        types = list(pandas_df.dtypes)
        struct_list = []
        for column, typo in zip(columns, types): 
            struct_list.append(self.define_structure(column, typo))
        p_schema = StructType(struct_list)
        return spark.createDataFrame(pandas_df, p_schema)

In [2]:
## Utilitarios

class Utilitarios:
    """Clase para usar métodos bastante genéricos y en muchos casos estáticos"""
    
    @staticmethod
    def GenerateDataFileName(catalogo, fecha):
        return 'file_'+catalogo+'_'+fecha
    
    @staticmethod
    def GenerateCatalogueFileName(catalogo):
        return "file_"+catalogo
    
    @staticmethod
    def ConvertPandasToSpark(spark, dataframe_pandas, schema):
        try:
            if(dataframe_pandas is None):
                return spark.createDataFrame([], schema)
            else:
                return spark.createDataFrame(dataframe_pandas, schema)
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    @staticmethod
    def ConvertSparkToPandas(dataframe_spark):
        try:
            if(dataframe_spark is None):
                return None
            return dataframe_spark.toPandas()
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    @staticmethod
    def ConvertStrListToSpark(spark, pd, str_list, list_column_names):
        try:
            if(str_list is None or len(str_list) == 0):
                return None
            return spark.createDataFrame((pd.DataFrame(str_list, columns = list_column_names)))
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    @staticmethod
    def ConvertObjListToPandas(pd, obj_list, list_column_names):
        try:
            if(obj_list is None or len(obj_list) == 0):
                return None
            return pd.DataFrame(obj_list, columns = list_column_names)
        except Exception as error:
            ExceptionManager.Treatment(error)
            
        

In [3]:
## Manejo de Excepciones

import sys, traceback

class ExceptionManager:
    """Clase para manejar las excepciones."""
    
    def __init__(self):
        pass
    
    @staticmethod
    def Treatment(exception):
        try:
            print(exception)
        except Py4JNetworkError as error:
            print(error)
            
    @staticmethod
    def TraceTreatment(exception):
        try:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            print(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
            print(exception)
        except Py4JNetworkError as error:
            print(error)

In [4]:
## Configuracion de Contextos

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker
    
class DBContextDw: 
        """Clase que permite establecer la configuración con la bodega de datos o DataWarehouse."""  
                    
        def __init__ (self):
            self.HostDb= "10.30.80.3"
            self.Port = "5432"
            self.UserName = "user_sirio"
            self.Password ="Cen.2019.sirio"
            self.DataBase = "dm_eventos"
            self.session_marker = None
            
        def Connection(self):
            """Método que permite obtener la sesión de conección a la base de datos"""
            db_string = 'postgresql://{0}:{1}@{2}:{3}/{4}'.format(self.UserName, self.Password, self.HostDb,
                                                                  self.Port, self.DataBase)
            db = create_engine(db_string)  
            self.session_maker = sessionmaker(db)


In [5]:
## Configuracion de Contextos

class HDFSContext: 
    """La clase ContextHdfs establece la configuración con el datalake para acceder a los diferentes archivos 
    almacenados de las bases de datos del HDFS."""
    
    def __init__ (self, Host='10.30.80.3', Port='9000', Path='DATABASE/RDBMS', DataBase='SIVO', Schema='dbo'):
        self.HostHdfs = Host
        self.Port = Port
        self.Path = Path
        self.DataBase = DataBase
        self.Schema = Schema
        
    def HdfsPath(self, tName, fName):        
        """Método que establece el path de búsqueda de un archivo específico."""  
        pathDir = "hdfs://{0}:{1}/{2}/{3}/{4}/{5}/{6}".format(self.HostHdfs, self.Port, self.Path, self.DataBase,
                                                              self.Schema, tName, fName)
        return pathDir
    

In [6]:
## Acceso a datos desde HDFS

from pyspark.sql import SparkSession

class GenericDataFrame():
    """Clase para generar DataFrames Spark y trabajar en la lógica del ETL"""
    def __init__ (self, hdfsContext):
        self.hdfsContext = hdfsContext
        self.url = 'jdbc:postgresql://10.30.80.3/dm_eventos'
        self.properties = {'user': 'user_sirio', 'password': 'Cen.2019.sirio'}
        self.modo = 'append'
        self.spark = SparkSession.builder.appName("Sirio")\
        .config('spark.driver.extraClassPath', '/home/jovyan/work/postgresql-42.2.12.jar').getOrCreate()#SparkSession.builder.appName("Sirio").getOrCreate()
    
    def GetDataHdfs(self, tableName, fileName):
        """Método para retornar el DataFrame desde hadoop"""
        path = self.hdfsContext.HdfsPath(tableName, fileName)
        dataFrame = self.spark.read.json(path, multiLine=True)            
        return dataFrame              
    

In [7]:
## Creación del modelo

from sqlalchemy.ext.declarative import declarative_base  
from sqlalchemy import Column, String, SmallInteger, Integer, Numeric, BigInteger, DateTime, Boolean

class Dim_Agt_Gen(declarative_base()):  
    def __init__(self, pk, codigo_empresa, empresa, codigo_unegocio, unegocio, codigo_central, central
                , codigo_unidad, unidad, fecha_operacion_comercial, version, pot_efectiva):
        self.agtevt_id_pk = pk
        self.agtevt_empresa_id_bk = codigo_empresa
        self.agtevt_empresa = empresa
        self.agtevt_unidad_negocio_id_bk = codigo_unegocio
        self.agtevt_unidad_negocio = unegocio
        self.agtevt_central_id_bk = codigo_central
        self.agtevt_central = central
        self.agtevt_unidad_id_bk = codigo_unidad
        self.agtevt_unidad = unidad
        self.agtevt_fecha_oper_comercial = fecha_operacion_comercial
        self.agtevt_version = version
        self.agtevt_pot_efectiva = pot_efectiva
    
    @classmethod
    def Schema(cls):
        return ['agtevt_id_pk', 'agtevt_empresa_id_bk', 'agtevt_empresa', 'agtevt_unidad_negocio_id_bk', 'agtevt_unidad_negocio'
                , 'agtevt_central_id_bk', 'agtevt_central', 'agtevt_unidad_id_bk', 'agtevt_unidad', 'agtevt_fecha_oper_comercial'
                , 'agtevt_version', 'agtevt_pot_efectiva']
        
    __tablename__ = 'dim_agt_gen'
    __table_args__ = {'schema' : 'cen_dws'} 

    agtevt_id_pk = Column(Integer, primary_key=True)
    agtevt_empresa_id_bk = Column(String)
    agtevt_empresa = Column(String)
    agtevt_unidad_negocio_id_bk = Column(String)
    agtevt_unidad_negocio = Column(String)
    agtevt_central_id_bk = Column(String)
    agtevt_central = Column(String)
    agtevt_unidad_id_bk = Column(String)
    agtevt_unidad = Column(String)
    agtevt_fecha_oper_comercial = Column(DateTime)
    agtevt_version = Column(SmallInteger)
    agtevt_pot_efectiva = Column(Numeric(10,2))
    
    
class Dim_Tmp_Operacion(declarative_base()):  
    def __init__(self, pk, fecha, anio, id_mes, mes, dia, hora):
        self.tmpop_id_pk = pk
        self.tmpop_fecha = fecha
        self.tmpop_anio = anio
        self.tmpop_mes_id = id_mes
        self.tmpop_mes = mes
        self.tmpop_dia = dia
        self.tmpop_hora = hora
      
    @classmethod
    def Schema(cls):
        return ['tmpop_id_pk', 'tmpop_fecha', 'tmpop_anio', 'tmpop_mes_id', 'tmpop_mes', 'tmpop_dia', 'tmpop_hora']
    
    __tablename__ = 'dim_tmp_operacion'
    __table_args__ = {'schema' : 'cen_dws'}

    tmpop_id_pk = Column(Integer, primary_key=True)
    tmpop_fecha = Column(DateTime)
    tmpop_anio = Column(SmallInteger)    
    tmpop_mes_id = Column(SmallInteger)
    tmpop_mes = Column(String)
    tmpop_dia = Column(SmallInteger)
    tmpop_hora = Column(SmallInteger)
      
    
class Fact_Tiempo_Operacion(declarative_base()): 
    """Módelo de la tabla de hechos"""
    def __init__(self, pk, agente_gen, tmpop, tiempo_disponible, tiempo_indisponible):
        self.evt_id_pk = pk
        self.agtevt_id_fk = agente_gen
        self.tmpop_id_fk = tmpop
        self.evt_tiempo_disponible = tiempo_disponible
        self.evt_tiempo_indisponible = tiempo_indisponible
        
    @classmethod
    def Schema(cls):
        return ['evt_id_pk', 'agtevt_id_fk', 'tmpop_id_fk', 'evt_tiempo_disponible', 'evt_tiempo_indisponible']
    
    __tablename__ = 'fact_tiempo_operacion'
    __table_args__ = {'schema' : 'cen_dws'}

    evt_id_pk = Column(BigInteger, primary_key=True) 
    agtevt_id_fk = Column(Integer)
    tmpop_id_fk = Column(Integer)
    evt_tiempo_disponible = Column(Numeric(10, 2))
    evt_tiempo_indisponible = Column(Numeric(10, 2))


In [8]:
## Estructuras de Datos Esquemas

class Estructuras:
    """Clase para obtener de manera estática las estructuras de los dataframes"""
    def __init__(self):
        pass
    
    @staticmethod
    def Schema_Agt_Gen():
        schema = StructType([StructField('agtevt_id_pk', IntegerType(), False),
                             StructField('agtevt_empresa_id_bk', StringType(), False),
                             StructField('agtevt_empresa', StringType(), False),
                             StructField('agtevt_unidad_negocio_id_bk', StringType(), False),
                             StructField('agtevt_unidad_negocio', StringType(), False),
                             StructField('agtevt_central_id_bk', StringType(), False),
                             StructField('agtevt_central', StringType(), False),
                             StructField('agtevt_unidad_id_bk', StringType(), False),
                             StructField('agtevt_unidad', StringType(), False),
                             StructField('agtevt_fecha_oper_comercial', TimestampType(), False),
                             StructField('agtevt_version', ShortType(), False),
                             StructField('agtevt_pot_efectiva', FloatType(), False)
                             ])
        return schema
    
    @staticmethod
    def Schema_Tmp_Operacion():
        schema = StructType([StructField('tmpop_id_pk', IntegerType(), False),
                             StructField('tmpop_fecha', TimestampType(), False),
                             StructField('tmpop_anio', ShortType(), False),
                             StructField('tmpop_mes_id', ShortType(), False),
                             StructField('tmpop_mes', StringType(), False),
                             StructField('tmpop_dia', ShortType(), False),
                             StructField('tmpop_hora', ShortType(), False)
                             ])
        return schema
    
    @staticmethod
    def Schema_Fact_Tiempo_Operacion():
        schema = StructType([StructField('evt_id_pk', LongType(), False),
                             StructField('agtevt_id_fk', IntegerType(), False),
                             StructField('tmpop_id_fk', IntegerType(), False),
                             StructField('evt_tiempo_disponible', FloatType(), True),
                             StructField('evt_tiempo_indisponible', FloatType(), True)
                             ])
        return schema

In [9]:
## Estructuras de Datos Esquemas

class EstructurasHDFS:
    """Clase para obtener de manera estática las estructuras de los dataframes"""
    def __init__(self):
        pass
    
    @staticmethod
    def Schema_EVENTO():
        schema = StructType([StructField('EVENTO_ID', LongType(), True),
                             StructField('EVENTO_FECHA', TimestampType(), True),
                             StructField('EVENTO_CLASE', ShortType(), True),
                             StructField('TPB_EVENTO_ID', ShortType(), True),
                             StructField('TPB_CALIF_ID', ShortType(), True),
                             StructField('TPB_CLASIF_CALIF_ID', ShortType(), True),
                             StructField('TPB_CATEG_CLASIF_ID', ShortType(), True),
                             StructField('EVENTO_NUM_REDSP', ShortType(), True),
                             StructField('NIVELVOLTAJE_ID', ShortType(), True),
                             StructField('EMPRESA_ID', ShortType(), True),
                             StructField('ESTACION_ID', ShortType(), True),
                             StructField('SAM_ID', IntegerType(), True),
                             StructField('SAF_ID', IntegerType(), True),
                             StructField('SISTEMA_CLASE', ShortType(), True),
                             StructField('USUARIO_ID_CREADOR', ShortType(), True),
                             StructField('USUARIO_ID_MODIFICADOR', ShortType(), True),
                             StructField('EVENTO_FECHACREACION', TimestampType(), True),
                             StructField('EVENTO_FECHAMODIFICACION', TimestampType(), True),
                             StructField('NOMBRE_USUARIO_INGRESO', StringType(), True),
                             StructField('EVENTO_ES_RELEVANTE', BooleanType(), True),
                             StructField('ANIO', ShortType(), True)])
        return schema
    
    @staticmethod
    def Schema_EVENTO_DTL():
        schema = StructType([StructField('EVENTO_ID', LongType(), True),
                             StructField('EVENTO_FECHA', TimestampType(), True),
                             StructField('EVENTO_DTL_ID', ShortType(), True),
                             StructField('EVENTO_DTL_PODER', FloatType(), True),
                             StructField('EVENTO_DTL_NOTA', StringType(), True),
                             StructField('EVENTO_DTL_ULTIMO', ShortType(), True),
                             StructField('ANIO', ShortType(), True)])
        return schema

In [10]:
## Acceso a Datos

from sqlalchemy.sql.expression import func as alchemy_func
from sqlalchemy import exc
import pandas as pd

class TiempoOperacionDA:
    """Clase para realizar el acceso a datos y persistencia de información de Tiempo Operacion"""

    def __init__(self):
        self._dBContextDw = DBContextDw()
        self._dBContextDw.Connection()
        self.session_maker = self._dBContextDw.session_maker

    def GetPkAgtGen(self):
        """Método para obtener el Id máximo de la tabla"""
        try:
            session = self.session_maker()
            max_pk = session.query(alchemy_func.max(Dim_Agt_Gen.agtevt_id_pk)).scalar()
            if(max_pk is None):
                max_pk = 0
            return max_pk
        except exc.SQLAlchemyError as error: 
            ExceptionManager.Treatment(error)
            raise
        finally:
            session.close()
            
            
    def GetPkFactTiempoOperacion(self):
        """Método para obtener el Id máximo de la tabla de hechos"""
        try:
            session = self.session_maker()
            max_pk = session.query(alchemy_func.max(Fact_Tiempo_Operacion.evt_id_pk)).scalar()
            if(max_pk is None):
                max_pk = 0
            return max_pk
        except exc.SQLAlchemyError as error: 
            ExceptionManager.Treatment(error)
            raise
        finally:
            session.close()
            
    def GetFechaHoraMaximaTiempoOperacion(self):
        """Método para obtener la fecha máxima de datos"""
        try:
            session = self.session_maker()
            
            max_date = session.query(alchemy_func.max(Fact_Tiempo_Operacion.tmpop_id_fk)).scalar()
            registro = session.query(Dim_Tmp_Operacion).filter(Dim_Tmp_Operacion.tmpop_id_pk == max_date).first()
            
            if(registro is None):
                fecha = '1997-12-31 23:59'
            else:
                fecha = str(registro.tmpop_fecha)
            return fecha
        except exc.SQLAlchemyError as error: 
            ExceptionManager.Treatment(error)
            raise 
        finally:
            session.close()
            
    def GetAllFechasTiempoOperacion(self, anio=None):
        """Método para obtener todos todos los datos de la demensión de tiempo"""
        try:
            session = self.session_maker()
            if(anio == None):
                query = session.query(Dim_Tmp_Operacion) 
            else:
                query = session.query(Dim_Tmp_Operacion).filter(Dim_Tmp_Operacion.tmpop_anio == anio)
            df = pd.read_sql(query.statement, query.session.bind)
            return df
        except exc.SQLAlchemyError as error: 
            ExceptionManager.Treatment(error)
            raise 
        finally:
            session.close()
            
    def GetAllAgtGen(self):
        """Método para obtener todos todos los datos"""
        try:
            session = self.session_maker()
            query = session.query(Dim_Agt_Gen) 
            df = pd.read_sql(query.statement, query.session.bind)
            return df
        except exc.SQLAlchemyError as error: 
            ExceptionManager.Treatment(error)
            raise 
        finally:
            session.close()
            
    def Save(self, dataframe, nombre_tabla, generic_dataframe):
        """Método para almacenar las dimensiones y la tabla de hechos en el DW"""
        try:
            dataframe.write.jdbc(url=generic_dataframe.url, table=nombre_tabla, mode = generic_dataframe.modo, properties=generic_dataframe.properties)
            return True
        except Exception as error:
            ExceptionManager.Treatment(error)
    

In [11]:
## Clases adicionales para refcatorizar codigo

class Refactorizar:
    """Contiene metodos auxiliares del negocio"""
    
    @staticmethod
    def DafaFrameUnidadNegocio(df_empresa,df_cat_empresa,df_cat_unidad_negocio):
        try:
            df_unidad_negocio = df_empresa.select(col('EMPRESA_ID').alias('UNegocioId'),
                                                  col('EMPRESA_CODIGO').alias('UNegocioCodigo'),
                                                  col('EMPRESA_NOMBRE').alias('UNegocio'),
                                                  col('EMPRESA_CODIGO').alias('EmpresaCodigo'),
                                                  col('EMPRESA_NOMBRE').alias('Empresa'))
            
            df_cat_unidad_negocio = df_cat_unidad_negocio\
            .join(df_cat_empresa, df_cat_unidad_negocio.IdEmpresa == df_cat_empresa.IdEmpresa)\
            .select(df_cat_unidad_negocio.IdUNegocio.alias('UNegocioId'),
                    df_cat_unidad_negocio.Codigo.alias('UNegocioCodigo'),
                    df_cat_unidad_negocio.Nombre.alias('UNegocio'),
                    df_cat_empresa.Codigo.alias('EmpresaCodigo'),
                    df_cat_empresa.Nombre.alias('Empresa'))
            
            return df_cat_unidad_negocio.union(df_unidad_negocio).distinct()
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    @staticmethod
    def DafaFrameEstacion(df_central,df_cat_central):
        try:
            df_central = df_central.select(col('ESTACION_ID').alias('EstacionId'),
                                           col('ESTACION_CODIGO').alias('EstacionCodigo'),
                                           col('ESTACION_NOMBRE').alias('Estacion'),
                                           col('ESTACION_ID').alias('EstacionPadreId'))
            
            df_cat_central = df_cat_central.select(col('IdCentral').alias('EstacionId'),
                                                   col('Codigo').alias('EstacionCodigo'),
                                                   col('Nombre').alias('Estacion'),
                                                   col('IdCentral').alias('EstacionPadreId'))
            
            df_cat_central = df_cat_central.union(df_central).distinct()

            return df_cat_central.distinct()
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    @staticmethod
    def DafaFrameElemento(df_unidad,df_cat_unidad):
        try:
            df_unidad = df_unidad.select(col('ELEMENTO_ID').alias('ElementoId'),
                                         col('ELEMENTO_CODIGO').alias('ElementoCodigo'),
                                         col('ELEMENTO_NOMBRE').alias('Elemento'),
                                         col('ELEMENTO_TIPO').alias('Tipo'),
                                         lit(0).alias('VoltajeId'),
                                         col('EMPRESA_ID').alias('UNegocioId'),
                                         col('ESTACION_ID').alias('EstacionId'),
                                         to_timestamp(when(col('UNIDAD_FECHAOPERACION').isNull(),col('ELEMENTO_FECHACREACION'))\
                                         .otherwise(col('UNIDAD_FECHAOPERACION')),'yyyy-MM-dd HH:mm')\
                                         .alias('FechaInicioOpComercial'),
                                         col('ELEMENTO_FECHACREACION').alias('FechaCreacion'),
                                         col('UNIDAD_POTENCIAEFECTIVA').alias('PotEfectiva'))
            
            df_cat_unidad = df_cat_unidad.select(col('IdUnidad').alias('ElementoId'),
                                                 col('Codigo').alias('ElementoCodigo'),
                                                 col('Nombre').alias('Elemento'),
                                                 lit(1).alias('Tipo'),
                                                 lit(0).alias('VoltajeId'),
                                                 col('IdUNegocio').alias('UNegocioId'),
                                                 col('IdCentral').alias('EstacionId'),
                                                 to_timestamp(col('FechaInicioOpComercial'),'yyyy-MM-dd HH:mm')\
                                                 .alias('FechaInicioOpComercial'),
                                                 col('FechaCreacion'),
                                                 col('Pot_Efectiva').alias('PotEfectiva'))
            
            df_cat_unidad = df_cat_unidad.union(df_unidad).distinct()
            
            return df_cat_unidad.distinct()
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    
    @staticmethod
    def AgregarDetalles(df_datos,unidad_negocio,centrales,unidades):
        try:
            df_datos = df_datos\
            .join(unidad_negocio, df_datos.UNegocioId == unidad_negocio.UNegocioId)\
            .join(centrales, df_datos.CentralId == centrales.EstacionId)\
            .join(unidades, df_datos.UnidadId == unidades.ElementoId)\
            .select(unidad_negocio.UNegocioCodigo,
                    unidad_negocio.UNegocio,
                    unidad_negocio.EmpresaCodigo,
                    unidad_negocio.Empresa,
                    centrales.EstacionCodigo,
                    centrales.Estacion,
                    unidades.ElementoCodigo,
                    unidades.Elemento,
                    unidades.FechaInicioOpComercial,
                    unidades.PotEfectiva,
                    lit(1).alias('Version'),
                    df_datos.Fecha,
                    df_datos.TiempoOperacion).distinct()  
            return df_datos
        except Exception as error:
            ExceptionManager.Treatment(error)
            
               
    @staticmethod
    def AsignarVersion(df_agentes,agentes_dw):
        try:
            ##### VALIDAR LA VERSIÓN PARA LOS AGENTES DE GENERACIÓN
            agt_gen = df_agentes
            agt_gen_dw = agentes_dw
            
            agt_new_version = agt_gen.join(agt_gen_dw,
                                           (agt_gen.agtevt_empresa_id_bk == agt_gen_dw.agtevt_empresa_id_bk) &\
                                           (agt_gen.agtevt_unidad_negocio_id_bk == agt_gen_dw.agtevt_unidad_negocio_id_bk) &\
                                           (agt_gen.agtevt_central_id_bk == agt_gen_dw.agtevt_central_id_bk) &\
                                           (agt_gen.agtevt_unidad_id_bk == agt_gen_dw.agtevt_unidad_id_bk))\
            .filter(agt_gen.agtevt_pot_efectiva != agt_gen_dw.agtevt_pot_efectiva)\
            .groupby(agt_gen.agtevt_empresa_id_bk,agt_gen.agtevt_unidad_negocio_id_bk,
                     agt_gen.agtevt_central_id_bk,agt_gen.agtevt_unidad_id_bk)\
            .agg(func.max(agt_gen.agtevt_version).alias('agtevt_version'))
            
            ##### SUMAMOS UNO A LA VERSIÓN DE LOS AGENTES QUE HAN CAMBIADO SU POTENCIA EFECTIVA
            df_agentes = df_agentes.join(agt_new_version,
                                        (agt_new_version.agtevt_empresa_id_bk == df_agentes.agtevt_empresa_id_bk) &\
                                        (agt_new_version.agtevt_unidad_negocio_id_bk == df_agentes.agtevt_unidad_negocio_id_bk) &\
                                        (agt_new_version.agtevt_central_id_bk == df_agentes.agtevt_central_id_bk) &\
                                        (agt_new_version.agtevt_unidad_id_bk == df_agentes.agtevt_unidad_id_bk), how='left')\
            .select(df_agentes.agtevt_empresa_id_bk,
                    df_agentes.agtevt_empresa,
                    df_agentes.agtevt_unidad_negocio_id_bk,
                    df_agentes.agtevt_unidad_negocio,
                    df_agentes.agtevt_central_id_bk,
                    df_agentes.agtevt_central,
                    df_agentes.agtevt_unidad_id_bk,
                    df_agentes.agtevt_unidad,
                    df_agentes.agtevt_fecha_oper_comercial.cast(TimestampType()),
                    when(agt_new_version.agtevt_version.isNull(),df_agentes.agtevt_version)\
                    .otherwise(agt_new_version.agtevt_version+1).alias('agtevt_version').cast(ShortType()),
                    round(df_agentes.agtevt_pot_efectiva.cast(FloatType()),2).alias('agtevt_pot_efectiva'))

            return df_agentes
        except Exception as error:
            ExceptionManager.Treatment(error)
            
            
    @staticmethod
    def AsignarVersionesNuevas(datos_totales,df_agente_gen):
        try:
            datos_totales = datos_totales\
            .join(df_agente_gen,
                  (datos_totales.EmpresaCodigo == df_agente_gen.agtevt_empresa_id_bk) &\
                  (datos_totales.UNegocioCodigo == df_agente_gen.agtevt_unidad_negocio_id_bk) &\
                  (datos_totales.EstacionCodigo == df_agente_gen.agtevt_central_id_bk) &\
                  (datos_totales.ElementoCodigo == df_agente_gen.agtevt_unidad_id_bk) &\
                  (datos_totales.PotEfectiva == df_agente_gen.agtevt_pot_efectiva), how='left')\
            .select(datos_totales.EmpresaCodigo,
                    datos_totales.Empresa,
                    datos_totales.UNegocioCodigo,
                    datos_totales.UNegocio,
                    datos_totales.EstacionCodigo,
                    datos_totales.Estacion,
                    datos_totales.ElementoCodigo,
                    datos_totales.Elemento,
                    datos_totales.PotEfectiva,
                    when(df_agente_gen.agtevt_pot_efectiva.isNull(),datos_totales.Version)\
                    .otherwise(df_agente_gen.agtevt_version).alias('Version'),
                    datos_totales.Fecha,
                    datos_totales.TiempoOperacion)
            
            return datos_totales
        except Exception as error:
            ExceptionManager.Treatment(error)
        
    @staticmethod
    def ConstruirHorasAnuales(spark,anio,unidades):
        try:
            if(anio==None):
                anioInicio=1998
                anioFin=datetime.datetime.now().year
            else:
                anioInicio=anio
                anioFin=anio
                
            #fechaInicio = str(anioInicio)+'-01-01 00:00:00'
            #fechaFin = str(anioFin)+'-12-31 23:59:59'
            
            fechaInicio = str(anioInicio)+'-10-04 00:00:00'
            fechaFin = str(anioFin)+'-10-04 23:59:59'
            
            df_time = spark\
            .sql("SELECT sequence(to_timestamp('{}'), to_timestamp('{}'), interval 1 hour) as timestamp"\
                 .format(fechaInicio,fechaFin))
            df_time = df_time.withColumn("Fecha", explode(col("timestamp"))).select('Fecha')
            df_time = df_time.withColumn('Id',1+func.row_number().over(Window.partitionBy().orderBy('Fecha')))

            df_time_par = df_time.filter(df_time.Id%2==0).select(col('Fecha').alias('FechaPar'),col('Id').alias('IdPar'))
            df_time_impar = df_time.filter(df_time.Id%2!=0).select(col('Fecha').alias('FechaImpar'),col('Id').alias('IdImpar'))


            df_time_total_ida = df_time_par.join(df_time_impar, (df_time_par.IdPar-1)==df_time_impar.IdImpar)\
            .select(df_time_impar.FechaImpar.alias('FechaInicio'),df_time_par.FechaPar.alias('FechaFin'))

            df_time_total_vuelta = df_time_impar.join(df_time_par, (df_time_par.IdPar+1)==df_time_impar.IdImpar)\
            .select(df_time_par.FechaPar.alias('FechaInicio'),df_time_impar.FechaImpar.alias('FechaFin'))

            df_time_total = df_time_total_ida.union(df_time_total_vuelta).orderBy('FechaInicio')
        
            rdd = df_time_total.rdd.cartesian(unidades.rdd).map(lambda x: (x[1][0],x[0][0],x[0][1]))
            df_total = rdd.toDF(['IdUnidad','FechaInicio','FechaFin'])
            return df_total
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    def RellenarHoras(fechaInicio,fechaAuxiliar,estado,FechaFinAnio):
        try:
            DatosTotales = []
            while (fechaInicio<fechaAuxiliar):
                minuto=0
                if(estado is False):
                    minuto=60
                DatosTotales.append([fechaInicio,'None',minuto,not estado])
                fechaInicio += timedelta(hours=1)
            
            if(estado):
                minutoAlFinalHora=60-fechaAuxiliar.minute
            else:
                minutoAlFinalHora=fechaAuxiliar.minute
            
            if(fechaAuxiliar < FechaFinAnio):
                DatosTotales[-1][1]=fechaAuxiliar.strftime("%Y-%m-%d %H:%M")
                DatosTotales[-1][2]=minutoAlFinalHora
                DatosTotales[-1][3]=estado
            return DatosTotales
        except Exception as error:
            print(error)

    def RecorrerDatos(x):
        try:
            id = x[0]
            datos = list(x[1])
            DatosTotales = []
            anio = datos[0][0].year
            estadoInicial = datos[0][1] #EL - true, FL - false
            estadoFin = not datos[-1][1]
            fechaFin = datos[0][0]

            fechaInicioAnio = datetime.datetime(anio,1,1,0,0,0)
            fechaFinAnio = datetime.datetime(anio+1,1,1,0,0,0)

            datos.append([fechaFinAnio,estadoFin])

            conjuntoDatos = Refactorizar.RellenarHoras(fechaInicioAnio,fechaFin,estadoInicial,fechaFinAnio)
            DatosTotales += conjuntoDatos

            misma_hora=None
            for i in range(0,len(datos)-1):
                fechaInicio = datos[i][0]
                fechaInicio -= timedelta(minutes=fechaInicio.minute)
                fechaFin = datos[i+1][0]   
                estado = datos[i+1][1]     
                #print(DatosTotales)
                if(fechaInicio.hour!=fechaFin.hour):
                    conjuntoDatos = Refactorizar.RellenarHoras(fechaInicio,fechaFin,estado,fechaFinAnio)
                    if(misma_hora==None):
                        DatosTotales += conjuntoDatos[1:]
                    else:
                        DatosTotales = DatosTotales[:-1]
                        dato = Refactorizar.CalcularMismaHora(misma_hora)
                        DatosTotales += dato
                        misma_hora=None
                        DatosTotales += conjuntoDatos[1:]
                else:
                    if(misma_hora==None):
                        misma_hora=[datos[i][1],[fechaInicio,datos[i][0],datos[i+1][0]]]
                    else:
                        misma_hora[1].append(datos[i+1][0])


            #estadoInicial = not datos[-1][1]
            #fechaInicio = datos[-1][0]
            #fechaInicio -= timedelta(minutes=fechaInicio.minute)
            #conjuntoDatos = RellenarHoras(fechaInicio,fechaFinAnio,estadoInicial,fechaFinAnio)
            #DatosTotales += conjuntoDatos[1:]

            return (id,DatosTotales)
        except Exception as error:
            print(error)
                
    def CalcularMismaHora(horas):
        total=0
        estadoInicial=not horas[0]
        #if(not estadoInicial):
        finHora= horas[1][0] +  timedelta(hours=1)
        horas[1].append(finHora)

        for i in range(0,len(horas[1])-1):
            inicio=horas[1][i]
            fin=horas[1][i+1]
            dif_minutos = divmod((fin-inicio).total_seconds(), 60)[0]

            if(not estadoInicial and (i+1)%2==0):
                total += dif_minutos
            if(estadoInicial and (i+1)%2==1):
                total += dif_minutos
        return [[horas[1][0],horas[1][1].strftime('%Y-%m-%d %H:%M'),total,not estadoInicial]]
     
        
    def CalcularHorasIntermedias(x):
        try:
            id=x[0]
            datos=list(x[1])

            datosProcesamiento = []
            datosSalida = []
            fechaRegistroInicial = datos[0][0]
            estadoRegistroInicial = datos[0][1]
            fechaCreacionInicial = datos[0][2]

            #Primero si los minutos del primer registro es diferente de cero entonces:
            #Agrego el limiter inferior con hora y minuto cero y con estado contrario al primer registro
            if(fechaRegistroInicial.minute!=0):
                fechaInicial = fechaRegistroInicial - timedelta(minutes=fechaRegistroInicial.minute)
                estadoInicial = not estadoRegistroInicial
                datosProcesamiento.append([fechaInicial,estadoInicial,fechaCreacionInicial])

            #Agrego la data ingresada
            datosProcesamiento += datos

            #Siempre agrego el limite superior con la hora y minuto en cero (la siguiente hora),
            #con estado cambiado del ultimo registro
            fechaRegistroFinal = datos[-1][0]
            estadoRegistroFinal = datos[-1][1]
            fechaCreacionFinal = datos[-1][2]

            fechaFinal = (fechaRegistroFinal - timedelta(minutes=fechaRegistroFinal.minute)) + timedelta(hours=1)
            estadoFinal = not estadoRegistroFinal
            datosProcesamiento.append([fechaFinal,estadoFinal,fechaCreacionFinal])

            #Iteramos, tomo el elemento i-1 y el elemento i, i el estado i es falso, el tiempo es la diferencia,
            #caso contrario el tiempo es cero y saco el registro i
            for i in range(0,len(datosProcesamiento)):
                if(i==0):
                    fecha = datosProcesamiento[i][0]
                    estado = datosProcesamiento[i][1]
                    fechaCreacion = datosProcesamiento[i][2]
                    tiempo = 0.0
                    datosSalida.append([fecha,tiempo,estado,fechaCreacion])
                    continue

                datoAnterior = datosProcesamiento[i-1]
                datoActual = datosProcesamiento[i]
                estadoActual = datoActual[1]
                fechaCreacionActual = datoActual[2]
                fechaAnterior = datoAnterior[0]
                fechaActual = datoActual[0]
                if(estadoActual is False):
                    tiempo = divmod((fechaActual-fechaAnterior).total_seconds(), 60)[0]
                    datosSalida.append([fechaActual,tiempo,estadoActual,fechaCreacionActual])
                else:
                    tiempo = 0.0
                    datosSalida.append([fechaActual,tiempo,estadoActual,fechaCreacionActual])

            #Sumo el tiempo del último registro al tiempo del penultimo registro
            datosSalida[-2][1] += datosSalida[-1][1]

            #si los datos de salida tienen dos elementos, devuelvo el primer registro, caso contrario devuelvo
            #los datos desde el segundo registro al penultimo registro
            if(len(datosSalida)==2):
                datosSalida = datosSalida[0:1]
            else:
                datosSalida = datosSalida[1:-1]   
            return (id,datosSalida)
        except Exception as error:
                ExceptionManager.Treatment(error)
    
    def RellenarFechas(x):
        try:
            id = x[0]
            datos = list(x[1])

            anio = datos[0][0].year
            fechaCreacion = datos[0][3]
            datosSalida = []
            fechaInicioAnio = datetime.datetime(anio,1,1,0,0,0)
            fechaInicioAnioSiguiente = datetime.datetime(anio+1,1,1,0,0,0)

            #Validamos: 1.- Si el es el primero recorremos e insertamos las horas desde el registro 1 hasta el inicio del año
            for i in range(0,len(datos)):
                tiempo = 0.0
                if(i == 0):
                    fechaLimiteInferior = datos[i][0] - (timedelta(minutes=datos[i][0].minute) + timedelta(hours=1))
                    estadoActual = datos[i][2]
                    if(not estadoActual):
                        tiempo = 60.0
                    if(fechaCreacion>fechaInicioAnio):
                        fechaInicioAnio = fechaCreacion
                    if(fechaLimiteInferior>fechaInicioAnio):
                        while(fechaLimiteInferior>=fechaInicioAnio):
                            #print([fechaLimiteInferior,tiempo,not estadoActual])
                            datosSalida.append([fechaLimiteInferior,tiempo,not estadoActual,fechaCreacion])
                            fechaLimiteInferior -= timedelta(hours=1)     
                else:
                    fechaAnterior = datos[i-1][0]
                    fechaActual = datos[i][0]
                    estadoActual = datos[i][2]
                    fechaAnteriorSinMinutos = fechaAnterior - timedelta(minutes=fechaAnterior.minute) 
                    fechaActualSinMinutos = fechaActual - timedelta(minutes=fechaActual.minute) 
                    diferencia_tiempo = divmod((fechaActual-fechaAnterior).total_seconds(), 60)[0]
                    if(not estadoActual):
                        tiempo = 60.0
                    if(not ((diferencia_tiempo<60.0) & (fechaAnteriorSinMinutos == fechaActualSinMinutos))):
                        fechaActualSinMinutos -= timedelta(hours=1)
                        while(fechaActualSinMinutos>fechaAnterior):
                            #print([fechaActualSinMinutos,tiempo,not estadoActual])
                            datosSalida.append([fechaActualSinMinutos,tiempo,not estadoActual,fechaCreacion])
                            fechaActualSinMinutos -= timedelta(hours=1)
                    #SI ES EL ÚLTIMO REGISTRO, RECORRO PARA ABAJO HASTA EL COMIENZO DEL SIUIENTE AÑO
                    if(i==len(datos)-1):
                        fechaActualSistema = datetime.datetime.now()
                        fechaLimiteSuperior = datos[i][0] + (timedelta(minutes=60)-timedelta(minutes=datos[i][0].minute))
                        estadoActual = datos[i][2]
                        if(estadoActual):
                            tiempo = 60.0
                        #RECORRO PARA ABAJO
                        if(fechaInicioAnioSiguiente>fechaActualSistema):
                            fechaInicioAnioSiguiente = fechaActualSistema
                        if(fechaLimiteSuperior<fechaInicioAnioSiguiente):
                            while(fechaLimiteSuperior<fechaInicioAnioSiguiente):
                                #print([fechaLimiteSuperior,tiempo,estadoActual])
                                datosSalida.append([fechaLimiteSuperior,tiempo,estadoActual,fechaCreacion])
                                fechaLimiteSuperior += timedelta(hours=1)

            datos += datosSalida

            return(id,datos)
        except Exception as error:
                ExceptionManager.Treatment(error)
    
    def SepararDatos(x):
        try:
            id = x[0].split('-') 
            datos = list(x[1])
            splitDatos = [(id[0],id[1],id[2],d[0],d[1],d[2],d[3]) for d in datos]
            return splitDatos
        except Exception as error:
            print(error)
            
    def SumarizarTiempos(df_datos_tiempos):
        try:
            #df_datos_tiempos.filter((func.year(col('Fecha'))==2018) &\
            #                        (func.month(col('Fecha'))== 7)&\
            #                        (func.dayofmonth(col('Fecha'))==19) &\
            #                        (func.hour(col('Fecha'))==11)).show(1000)
            df_datos_tiempos = df_datos_tiempos\
            .groupby('UNegocioId',
                     'CentralId',
                     'UnidadId',
                     func.year(col('Fecha')).alias('Anio'),
                     func.month(col('Fecha')).alias('Mes'),
                     func.dayofmonth(col('Fecha')).alias('Dia'),
                     func.hour(col('Fecha')).alias('Hora'),
                     func.from_unixtime(unix_timestamp('Fecha'), 'yyyy-MM-dd HH:00:00').alias('Fecha'))\
            .agg(func.sum('TiempoOperacion').alias('TiempoOperacion'))
            df_datos_tiempos.filter(col('TiempoOperacion')>60.0).show(1000)
            return df_datos_tiempos
        except Exception as error:
            print(error)
    

In [12]:
## CREACIÓN DE VISTAS

class CreacionVistas:
    """Otorga método rapidos para formar las vistas de datos que se necesitan para realizar cálculos"""
    
    @staticmethod
    def CrearvRepBOSNIExtendONOFF(df_evento,df_evento_dtl,df_calif,df_evt,df_mtx,df_clasif,df_categ,unidades):
        try:
            df_evento = df_evento.filter(df_evento.EVENTO_CLASE==4)\
            .withColumn('CALIF_ID',
                        when(df_evento.TPB_CALIF_ID.isNull(),59)\
                        .otherwise(when(df_evento.TPB_CALIF_ID==0,59).otherwise(df_evento.TPB_CALIF_ID)))
            
            eventos = df_evento\
            .join(df_evento_dtl, df_evento.EVENTO_ID == df_evento_dtl.EVENTO_ID)\
            .join(df_calif, df_evento.CALIF_ID == df_calif.TPB_CALIF_ID, how='left')\
            .join(df_evt, df_evento.TPB_EVENTO_ID == df_evt.TPB_EVENTO_ID, how='left')\
            .join(df_mtx,
                  (df_evento.TPB_EVENTO_ID == df_mtx.TPB_EVENTO_ID) &\
                  (df_evento.CALIF_ID == df_mtx.TPB_CALIF_ID), how='left')\
            .select(to_timestamp(df_evento.EVENTO_FECHA,'yyyy-MM-dd HH:mm').alias('EventoFecha'),
                    df_evento.TPB_EVENTO_ID.alias('EvtId'),
                    df_evt.TPB_EVENTO_CODIGO.alias('EvtCodigo'),
                    df_evento.TPB_CALIF_ID.alias('CausalId'),
                    df_calif.TPB_CALIF_CODIGO.alias('CausalCodigo'), 
                    df_evento.EMPRESA_ID.alias('UNegocioId'),
                    df_evento.ESTACION_ID.alias('CentralId'),
                    df_evento_dtl.EVENTO_DTL_ID.alias('UnidadId'),
                    when(df_evento_dtl.EVENTO_DTL_PODER.isNull(),0)\
                    .otherwise(df_evento_dtl.EVENTO_DTL_PODER).alias('Potencia'),
                    when(df_evt.TPB_EVENTO_CALCULO_ON.isNull(),False)\
                    .otherwise(df_evt.TPB_EVENTO_CALCULO_ON).alias('CalculoON'),
                    when(df_evt.TPB_EVENTO_CALCULO_OFF.isNull(),False)\
                    .otherwise(df_evt.TPB_EVENTO_CALCULO_OFF).alias('CalculoOFF'))#,
                    #when(df_mtx.TPB_MTX_ON_OFF.isNull(),False)\
                    #.otherwise(df_mtx.TPB_MTX_ON_OFF).alias('ONOFF'),
                    #when(df_mtx.TPB_MTX_OFF_ON.isNull(),False)\
                    #.otherwise(df_mtx.TPB_MTX_OFF_ON).alias('OFFON')) 
            
            ###### FILTRAMOS SOLO EVENTOS DONDE INTERVIENEN CALCULOS ON y OFF
            eventos = eventos.filter((eventos.CalculoON==True) | (eventos.CalculoOFF==True))\
            .orderBy('UnidadId','EventoFecha')

            eventos = eventos\
            .join(unidades, eventos.UnidadId == unidades.ElementoId)\
            .select(eventos.EventoFecha,
                    eventos.EvtId,
                    eventos.EvtCodigo,
                    eventos.CausalId,
                    eventos.CausalCodigo, 
                    eventos.UNegocioId,
                    eventos.CentralId,
                    eventos.UnidadId,
                    to_timestamp(unidades.FechaCreacion,'yyyy-MM-dd HH:mm').alias('FechaCreacion'),
                    eventos.Potencia,
                    eventos.CalculoON,
                    eventos.CalculoOFF)
            
            return eventos
        except Exception as error:
            ExceptionManager.Treatment(error)
            
            
    

In [13]:
## Lógica de Negocio de Interrupciones del SNI

import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, asc, concat, split, udf, regexp_replace,to_date,to_timestamp,round,\
                                  UserDefinedFunction, array, explode, struct, lit, trim, when, unix_timestamp
import pyspark.sql.functions as func
from pyspark.sql.types import *
import datetime
from datetime import timedelta
from pyspark.sql import Window
import re


class TiempoOperacionBI:
    """Clase para manejar la lógica del ETL para Energía No Suministrada del SNI"""
    
    def __init__(self):
        try:
            self._fallasSniDA = TiempoOperacionDA()
            self._genericDataFrame = None
            self._df_tiempo_operacion = None
            self._df_agt_gen = None
        except Exception as error:
            ExceptionManager.Treatment(error)
    
    def PoseeArchivos(self,rdd):
        try:
            num = rdd.count()
            return num>0
        except Exception as error:
            return False
    
    def GetDataFrameHdfs(self,table_name,file_name):
        try:
            df = self._genericDataFrame.GetDataHdfs(table_name,file_name)
            return df
        except Exception as error:
            ExceptionManager.Treatment(error)
            return None 
        
    
    def GetData(self,directoryNme,anio=None,schema=None):
        """Método para recolectar datos de un directorio específico"""
        try:
            sc = self._genericDataFrame.spark.sparkContext
            
            if anio==None:
                ruta = self._genericDataFrame.hdfsContext.HdfsPath(directoryNme,"file_"+directoryNme+'*')
                archivo = "file_"+directoryNme+'*'
            else:
                ruta = self._genericDataFrame.hdfsContext.HdfsPath(directoryNme,"file_"+directoryNme+'_'+str(anio))
                archivo = "file_"+directoryNme+'_'+str(anio)
                
            rdd = sc.wholeTextFiles(ruta)
            posee = self.PoseeArchivos(rdd)
            
            if posee==False:
                return self._genericDataFrame.spark.createDataFrame([],schema)
            
            return self._genericDataFrame.GetDataHdfs(directoryNme,archivo)
        except Exception as error:
            ExceptionManager.Treatment(error)
            return None 
    
    def ProcesarDatos(self,anio=None):
        """Método principal en el que se realiza la limpieza y tratamiento de lo datos"""
        
        try:
            str_fecha = self._fallasSniDA.GetFechaHoraMaximaTiempoOperacion() 
            if(anio == None):
                anio = datetime.datetime.strptime(str_fecha,'%Y-%m-%d %H:%M').year + 1

            self._genericDataFrame = GenericDataFrame(HDFSContext(DataBase='BDTREV2'))
            table_empresa = 'EMPRESA'
            table_central = 'CENTRAL'
            table_unidad = 'UNIDAD'
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_empresa)
            df_empresa = self.GetDataFrameHdfs(table_empresa,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_central)
            df_central = self.GetDataFrameHdfs(table_central,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_unidad)
            df_unidad = self.GetDataFrameHdfs(table_unidad,file_name)
            
            self._genericDataFrame = GenericDataFrame(HDFSContext(DataBase='SIVO'))
            table_cat_empresa = 'CFG_Empresa'
            table_cat_unidad_negocio = 'CFG_UnidadNegocio'
            table_cat_central = 'CFG_Central'
            table_cat_unidad = 'CFG_Unidad'
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_cat_empresa)
            df_cat_empresa = self.GetDataFrameHdfs(table_cat_empresa,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_cat_unidad_negocio)
            df_cat_unidad_negocio = self.GetDataFrameHdfs(table_cat_unidad_negocio,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_cat_central)
            df_cat_central = self.GetDataFrameHdfs(table_cat_central,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_cat_unidad)
            df_cat_unidad = self.GetDataFrameHdfs(table_cat_unidad,file_name)
            
            
            self._genericDataFrame = GenericDataFrame(HDFSContext(DataBase='BOSNI'))
            table_causal = 'TPB_CALIF'
            table_evento = 'TPB_EVENTO'
            table_clasificacion = 'TPB_CLASIFICACION_CALIF'
            table_categoria = 'TPB_CATEGORIA_CLASIF'
            table_mtx_calif_evento = 'MTX_CALIF_EVENTO'
            
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_causal)
            df_calif = self.GetDataFrameHdfs(table_causal,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_evento)
            df_evt = self.GetDataFrameHdfs(table_evento,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_clasificacion)
            df_clasif = self.GetDataFrameHdfs(table_clasificacion,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_categoria)
            df_categ = self.GetDataFrameHdfs(table_categoria,file_name)
            
            file_name = Utilitarios.GenerateCatalogueFileName(table_mtx_calif_evento)
            df_mtx = self.GetDataFrameHdfs(table_mtx_calif_evento,file_name)

            print('Inicio: ' + str(datetime.datetime.now()))

            directoryNme = 'EVENTO'
            df_evento = self.GetData(directoryNme,anio,EstructurasHDFS.Schema_EVENTO())
            
            directoryNme = 'EVENTO_DTL'
            df_evento_dtl = self.GetData(directoryNme,anio,EstructurasHDFS.Schema_EVENTO_DTL())
            
            ################################### CATÁLOGO DE UNIDADES DE NEGOCIO
            unidad_negocio = Refactorizar.DafaFrameUnidadNegocio(df_empresa,df_cat_empresa,df_cat_unidad_negocio)
            
            ################################### CENTRAL         
            centrales = Refactorizar.DafaFrameEstacion(df_central,df_cat_central)
            
            ################################### UNIDAD
            unidades = Refactorizar.DafaFrameElemento(df_unidad,df_cat_unidad)
                
            ################################### DATOS DE VISTAS
            vBosniExtend = CreacionVistas.CrearvRepBOSNIExtendONOFF(df_evento,df_evento_dtl,df_calif,df_evt,
                                                                    df_mtx,df_clasif,df_categ,unidades)

            #vBosniExtend.filter(vBosniExtend.UnidadId==2834).show(vBosniExtend.count())
            #vBosniExtend.filter((vBosniExtend.UnidadId==3064) &\
            #                    (func.year(vBosniExtend.EventoFecha)==2018) &\
            #                    (func.month(vBosniExtend.EventoFecha)==7) &\
            #                    (func.dayofmonth(vBosniExtend.EventoFecha)==19)).show(vBosniExtend.count())
            
            ################################### CÁLCULO DEL TIEMPO DE OPERACIÓN DE LAS UNIDADES
            print('Proceso de generación de datos: ' + str(datetime.datetime.now()))
            
            #vBosniExtend = vBosniExtend.filter(vBosniExtend.UnidadId.isin({2906}))
            
            ################################## CALCULAMOS LOS TIEMPOS DE OPERACIÓN DE LAS HORAS DE EVENTOS
            datosValueKey = vBosniExtend.rdd\
            .map(lambda x: (str(x.UNegocioId)+'-'+str(x.CentralId)+'-'+str(x.UnidadId)+'-'+
                            str(x.EventoFecha.year)+'-'+str(x.EventoFecha.month)+'-'+str(x.EventoFecha.day)+'-'+
                            str(x.EventoFecha.hour),
                            [x.EventoFecha,x.CalculoON,x.FechaCreacion]))
            #print('MAP')
            #for d in datosValueKey.collect():
            #    if((d[1][0].year==2018) & (d[1][0].month==7) & (d[1][0].day==19)):
            #        print(d[0],[t for t in d[1]])
                        
            datosReduceValueByKey = datosValueKey.groupByKey()
            #print('GROUPBY')
            #for d in datosReduceValueByKey.collect():
            #    for t in d[1]:
            #        if((t[0].year==2018) & (t[0].month==7) & (t[0].day==19)):
            #            print(d[0],t)
            
            datosConTiemposCalculados = datosReduceValueByKey.map(lambda x: Refactorizar.CalcularHorasIntermedias(x))

            #print('CALCULAR HORAS')
            #for d in datosConTiemposCalculados.collect():
            #    for t in d[1]:
            #        print(d[0],t)
                        
            #for d in datosConTiemposCalculados.collect():
            #    for t in d[1]:
            #        if((t[0].year==2018) & (t[0].month==7) & (t[0].day==19)):
            #            print(d[0],t)
            
            rddSeparadoConTiemposCalculados = datosConTiemposCalculados.flatMap(lambda x: Refactorizar.SepararDatos(x))
            
            df_datos_separados = rddSeparadoConTiemposCalculados\
            .toDF(['UNegocioId','CentralId','UnidadId','Fecha','TiempoOperacion','CalculoOn','FechaCreacion'])\
            .orderBy('UnidadId','Fecha')
            
            #df_datos_separados.filter((func.year(col('Fecha'))==2018) &\
            #                          (func.month(col('Fecha'))==7) &\
            #                          (func.dayofmonth(col('Fecha'))==19)).show(1000)
            ################################## RELLENAMOS LAS HORAS QUE FALTAN CON LA POTENCIA DE OPERACIÓN
            rddValueKeyUnidad = df_datos_separados.rdd\
            .map(lambda x: (x[0]+'-'+x[1]+'-'+x[2],[x[3],x[4],x[5],x[6]]))
            
            rddGroupByUnidad = rddValueKeyUnidad.groupByKey()
            
            rddDatosTotales = rddGroupByUnidad.map(lambda x: Refactorizar.RellenarFechas(x))
            
            rddSeparadoTotales = rddDatosTotales.flatMap(lambda x: Refactorizar.SepararDatos(x))
            
            df_datos_tiempos = rddSeparadoTotales\
            .toDF(['UNegocioId','CentralId','UnidadId','Fecha','TiempoOperacion','CalculoOn','FechaCreacion'])
            
            #df_datos_tiempos.show()
            ############################## SUMAR HORARIAMENTE POR CADA UNIDAD EL TIEMPO DE OPERACIÓN
            df_datos_tiempos = Refactorizar.SumarizarTiempos(df_datos_tiempos)
            
            #df_datos_tiempos.orderBy('Fecha').show(df_datos_tiempos.count())
            #df_datos_tiempos.printSchema()
            #print(df_datos_tiempos.count())
            ############################## PROCESAR AGENTES
            datos_totales = self.AgregarAgentes(df_datos_tiempos,unidad_negocio,centrales,unidades)
            
            ################################### LIMPIEZA DE DIMENSIONES
            ################################### LIMPIEZA DE AGENTES
            print('Limpiar Agentes')
            agentes = self.LimpiarAgentes(datos_totales)
            
            ##############################  ASIGNACIONES DE VERSIONES
            print('Asignar Versiones')
            datos_totales = Refactorizar.AsignarVersionesNuevas(datos_totales,self._df_agt_gen)
            
            ##############################  LIMPIEZ DE TIEMPO DE OPERACION
            print('Tiempo Operación')
            tiempo_operacion = self.LimpiarTiempoOperacion(datos_totales)
            
            
            ##############################  LIMPIEZ DE FACT TIEMPO OPERACION
            print('Limpiar fact')
            fact_tiempo_operacion = self.LimpiarFactTiempoOperacion(datos_totales,agentes,tiempo_operacion)
            
            ################################### ALMACENAMIENTO DE DATOS
            print('Almacenar Datos')
            guardado = self.SaveData(fact_tiempo_operacion)
            
            print(guardado)
            print('Fin fecha: ' + str_fecha + ' al tiempo: ' + str(datetime.datetime.now()))   
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    def AgregarAgentes(self,df_datos_tiempos,unidad_negocio,centrales,unidades):
        try:  
            ##### DETALLE
            df_datos = Refactorizar.AgregarDetalles(df_datos_tiempos,unidad_negocio,centrales,unidades)

            return df_datos
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    def AgregarDatos(self,unidad_negocio,df_origen,df_pasos,df_total):
        try:           
            df_datos_totales = Refactorizar.AgregarAgtDistribucion(df_total,unidad_negocio)
            
            df_datos_totales = Refactorizar.AgregarAgtOrigen(df_datos_totales,unidad_negocio,df_origen)
            
            df_datos_totales = Refactorizar.AgregarPasos(df_datos_totales,df_pasos)

            return df_datos_totales
        except Exception as error:
            ExceptionManager.Treatment(error)
            
            
    def LimpiarAgentes(self,datos):
        try:
            agentes_dw = Utilitarios\
            .ConvertPandasToSpark(self._genericDataFrame.spark,
                                  self._fallasSniDA.GetAllAgtGen(),
                                  Estructuras.Schema_Agt_Gen())
            
            udfNoVigenteUno = UserDefinedFunction(lambda x: re.sub('\(NO VIGENTE. NO USAR\) ','',str(x)), StringType())
            udfNoVigenteDos = UserDefinedFunction(lambda x: re.sub('\(NO VIGENTE. NO USAR \)\. ','',str(x)), StringType())
            udfNoVigenteTres = UserDefinedFunction(lambda x: re.sub('\(NO VIGENTE. NO USAR\)\. ','',str(x)), StringType())
            udfNoVigenteCuatro = UserDefinedFunction(lambda x: re.sub('\(NO VIGENTE. NO _USAR\) ','',str(x)), StringType())
            
            df_agentes = datos\
            .select(col('EmpresaCodigo').alias('agtevt_empresa_id_bk'),
                    func.upper(col('Empresa')).alias('agtevt_empresa'),
                    col('UNegocioCodigo').alias('agtevt_unidad_negocio_id_bk'),
                    func.upper(regexp_replace(regexp_replace(col('UNegocio'),'CNEL EP ',''),'CELEC EP - ','')).alias('agtevt_unidad_negocio'),
                    when(col('EstacionCodigo').isNull(),None).otherwise(col('EstacionCodigo')).alias('agtevt_central_id_bk'),
                    when(col('Estacion').isNull(),None).otherwise(func.upper(regexp_replace(col('Estacion'),'CENTRAL ',''))).alias('agtevt_central'),
                    col('ElementoCodigo').alias('agtevt_unidad_id_bk'),
                    func.upper(col('Elemento')).alias('agtevt_unidad'),
                    col('FechaInicioOpComercial').alias('agtevt_fecha_oper_comercial'),
                    col('Version').alias('agtevt_version'),
                    when(col('PotEfectiva').isNull(),0).otherwise(round(col('PotEfectiva').cast(FloatType()),2)).alias('agtevt_pot_efectiva')).distinct()
            
            df_agentes = df_agentes.select(*[udfNoVigenteUno(column).alias(column) for column in df_agentes.columns])
            df_agentes = df_agentes.select(*[udfNoVigenteDos(column).alias(column) for column in df_agentes.columns])
            df_agentes = df_agentes.select(*[udfNoVigenteTres(column).alias(column) for column in df_agentes.columns])  
            df_agentes = df_agentes.select(*[udfNoVigenteCuatro(column).alias(column) for column in df_agentes.columns])
            
            df_agentes = Refactorizar.AsignarVersion(df_agentes,agentes_dw)
            
            self._df_agt_gen = df_agentes.exceptAll(agentes_dw.select(Dim_Agt_Gen.Schema()[1:]))\
            .withColumn('agtevt_id_pk',self._fallasSniDA.GetPkAgtGen() + \
                        func.row_number().over(Window.partitionBy().orderBy('agtevt_empresa_id_bk',
                                                                            'agtevt_unidad_negocio_id_bk',
                                                                            'agtevt_central_id_bk',
                                                                            'agtevt_unidad_id_bk',
                                                                            'agtevt_version')))\
            .select(Dim_Agt_Gen.Schema())
            
            df_agentes_totales = self._df_agt_gen.union(agentes_dw)
            return df_agentes_totales            
        except Exception as error:
            ExceptionManager.Treatment(error)

            
    def LimpiarTiempoOperacion(self,datos):
        try:
            valueReplace = ['ENERO','FEBRERO','MARZO','ABRIL','MAYO','JUNIO','JULIO','AGOSTO','SEPTIEMBRE','OCTUBRE',
                            'NOVIEMBRE','DICIEMBRE']        
            paramReplace =  ['January','February','March','April','May','June','July','August','September','October',
                             'November','December']
            
            df_fechas_dw = Utilitarios\
            .ConvertPandasToSpark(self._genericDataFrame.spark,
                                  self._fallasSniDA.GetAllFechasTiempoOperacion(),
                                  Estructuras.Schema_Tmp_Operacion())
            
            df_fechas = datos.select(to_timestamp('Fecha','yyyy-MM-dd HH:mm:ss').alias('Fecha')).distinct()
            
            df_fechas = df_fechas.select(func.substring(regexp_replace(regexp_replace(regexp_replace('Fecha', '-', ''),':',''),' ',''),0,10).alias('tmpop_id_pk').cast(IntegerType()),
                                         col('Fecha').alias('tmpop_fecha'),
                                         func.year('Fecha').alias('tmpop_anio').cast(ShortType()),
                                         func.month('Fecha').alias('tmpop_mes_id').cast(ShortType()),
                                         func.date_format(col('Fecha'), 'MMMMM').alias('tmpop_mes'),
                                         func.date_format(col('Fecha'), 'dd').alias('tmpop_dia').cast(ShortType()),
                                         func.hour('Fecha').alias('tmpop_hora').cast(ShortType()))
            
            df_fechas=df_fechas.na.replace(paramReplace,valueReplace,"tmpop_mes")
            
            #Conocemos si los registros estan almacenados para borrarlos del dataframe.
            self._df_tiempo_operacion = df_fechas.exceptAll(df_fechas_dw)
            df_fechas_totales = self._df_tiempo_operacion.union(df_fechas_dw)
            return df_fechas_totales
        except Exception as error:
            ExceptionManager.Treatment(error)
            
            
    def LimpiarFactTiempoOperacion(self,datos,agentes,tiempo_operacion):
        try:            
            #PK de producción
            id_produccion = self._fallasSniDA.GetPkFactTiempoOperacion()
            #Proceso de limpieza
            df_fact = datos\
            .join(tiempo_operacion, 
                  (func.substring(regexp_replace(regexp_replace(regexp_replace(datos.Fecha, '-', ''),':',''),' ',''),0,10) == tiempo_operacion.tmpop_id_pk))\
            .join(agentes, 
                  (datos.EmpresaCodigo == agentes.agtevt_empresa_id_bk) & \
                  (datos.UNegocioCodigo == agentes.agtevt_unidad_negocio_id_bk) & \
                  (datos.EstacionCodigo == agentes.agtevt_central_id_bk) & \
                  (datos.ElementoCodigo == agentes.agtevt_unidad_id_bk) &\
                  (datos.Version == agentes.agtevt_version))\
            .select(agentes.agtevt_id_pk.alias('agtevt_id_fk'),
                    tiempo_operacion.tmpop_id_pk.alias('tmpop_id_fk'),
                    datos.TiempoOperacion.alias('evt_tiempo_disponible').cast(FloatType()),
                    (60-datos.TiempoOperacion).alias('evt_tiempo_indisponible').cast(FloatType()))\
            .withColumn('evt_id_pk', (id_produccion + func.row_number().over(Window.partitionBy()\
                                                                             .orderBy('agtevt_id_fk',
                                                                                      'tmpop_id_fk'))).cast(LongType()))\
            .select(Fact_Tiempo_Operacion.Schema())
                        
            return df_fact
        except Exception as error:
            ExceptionManager.Treatment(error)
            
    def SaveData(self,df_datos):
        """Método para Guardar los datos limpios una vez procesados"""
        try:
            save = True
            print('Lista de Agentes Generación: ' + str(datetime.datetime.now())) 
            save = save & self._fallasSniDA.Save(self._df_agt_gen,'cen_dws.dim_agt_gen',self._genericDataFrame)
            
            print('Lista de Fechas de Operación: ' + str(datetime.datetime.now()))
            save = save & self._fallasSniDA.Save(self._df_tiempo_operacion,'cen_dws.dim_tmp_operacion',self._genericDataFrame)

            print('Lista de datos: ' + str(datetime.datetime.now()))
            save = save & self._fallasSniDA.Save(df_datos,'cen_dws.fact_tiempo_operacion',self._genericDataFrame)

            return save
        except Exception as error:
            ExceptionManager.Treatment(error)
            return False


In [14]:
proceso = TiempoOperacionBI()
proceso.ProcesarDatos(1999)

Inicio: 2020-07-20 12:47:20.082795
Proceso de generación de datos: 2020-07-20 12:47:21.604281
+----------+---------+--------+----+---+---+----+-----+---------------+
|UNegocioId|CentralId|UnidadId|Anio|Mes|Dia|Hora|Fecha|TiempoOperacion|
+----------+---------+--------+----+---+---+----+-----+---------------+
+----------+---------+--------+----+---+---+----+-----+---------------+

Limpiar Agentes
Asignar Versiones
Tiempo Operación
Limpiar fact
Almacenar Datos
Lista de Agentes Generación: 2020-07-20 12:48:36.464801
Lista de Fechas de Operación: 2020-07-20 12:49:52.999079
Lista de datos: 2020-07-20 12:51:36.631081
True
Fin fecha: 2020-07-20 12:00:00 al tiempo: 2020-07-20 12:54:58.135936
