In [0]:
%run /eReport/PkgGeneral/PrcGeneral

In [0]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from decimal import Decimal

class PrcCargaInterfaces():
  
  def __init__(self, id_solicitud):
    self.id_solicitud = id_solicitud
    
    #Obteniendo parametros
    parametros = PrcGeneral(id_solicitud = self.id_solicitud).obtenerParametros()
    self.archivo_entrada = parametros['archivo_entrada']
    self.tabla_destino = parametros['tabla_destino']
    self.ruta_origen = '/mnt/ereportinterfaz/inseir/'
    self.filename = self.ruta_origen + self.archivo_entrada
        
    #Fecha de actualizacion
    fecha_actualizacion = datetime.now()
    self.fecha_actualizacion = fecha_actualizacion
    self.fecha_actualizacion_str = fecha_actualizacion.strftime("%Y-%m-%d %H:%M:%S")
    
    #Conexion    
    self.url = "jdbc:sqlserver://auqui.database.windows.net;database=dbnormativo"
    self.usuario = "administrador"
    self.password = "Auqui$2020"    
    
  #Interface Dias de Atraso
  def cargarInterfaceDiasAtraso(self): 
          
    spark = SparkSession.builder.appName("Cargar Interface Dias Atraso").getOrCreate()
    
    schema = StructType([StructField('ANIO_PROCESO', IntegerType(), True),
                         StructField('MES_PROCESO', IntegerType(), True),
                         StructField('DIA_PROCESO', IntegerType(), True),
                         StructField('EMPRESA', StringType(), True),
                         StructField('PRODUCTO_FINANCIERO', IntegerType(), True),
                         StructField('NUMERO_CUENTA', StringType(), True),
                         StructField('DIAS_ATRASO', IntegerType(), True),
                         StructField('TIPO_CREDITO', StringType(), True),
                         StructField('CODIGO_BLOQUEO', StringType(), True),
                         StructField('FECHA_VENCIMIENTO', StringType(), True),
                         StructField('NRO_CUOTAS_VENCIDAS', IntegerType(), True)])
    
    df = spark.createDataFrame(spark.sparkContext.textFile(self.filename)\
                               .map(lambda x:(int(x[0:4].strip()),
                                              int(x[4:6].strip()),
                                              int(x[6:8].strip()),
                                              x[8:11].strip(),
                                              int(x[11:13].strip()),
                                              x[13:33].strip(),
                                              int(x[33:37].strip()),
                                              x[37:38].strip(),
                                              x[38:40].strip(),
                                              x[40:48].strip(),
                                              int(x[48:51].strip())
                                              )), schema)    
    
    df = df.withColumn('FECHA_ACTUALIZACION', lit(self.fecha_actualizacion_str))
    df = df.withColumn('FECHA_ACTUALIZACION', when(col('FECHA_ACTUALIZACION').isNotNull(), col('FECHA_ACTUALIZACION')).otherwise(lit(None)))
    
    try:
      df.write.format("com.microsoft.sqlserver.jdbc.spark")\
              .mode("overwrite")\
              .option("truncate", "true")\
              .option("url", self.url)\
              .option("user", self.usuario)\
              .option("password", self.password)\
              .option("dbtable", self.tabla_destino)\
              .option("tableLock", "true")\
              .save()  
    except:
        raise;   
        
  #Interface Dias de Atraso Detalle
  def cargarInterfaceDiasAtrasoDetalle(self): 
          
    spark = SparkSession.builder.appName("Cargar Interface Dias Atraso Detalle").getOrCreate()
    
    schema = StructType([StructField('ANIO_PROCESO', IntegerType(), True),
                         StructField('MES_PROCESO', IntegerType(), True),
                         StructField('DIA_PROCESO', IntegerType(), True),
                         StructField('EMPRESA', StringType(), True),
                         StructField('PRODUCTO_FINANCIERO', IntegerType(), True),
                         StructField('NUMERO_CUENTA', StringType(), True),
                         StructField('CONCEPTO_SALDO', IntegerType(), True),
                         StructField('CLASIFICACION_CONCEPTO', IntegerType(), True),
                         StructField('MONEDA', StringType(), True),
                         StructField('IMPORTE_CONCEPTO', DecimalType(15,6), True)])
    
    df = spark.createDataFrame(spark.sparkContext.textFile(self.filename)\
                               .map(lambda x:(int(x[0:4].strip()),#ANIO_PROCESO
                                              int(x[4:6].strip()),#MES_PROCESO
                                              int(x[6:8].strip()),#DIA_PROCESO
                                              x[8:11].strip(),#EMPRESA
                                              int(x[11:13].strip()),#PRODUCTO_FINANCIERO
                                              x[13:33].strip(),#NUMERO_CUENTA
                                              int(x[33:37].strip()),#CONCEPTO_SALDO
                                              40,#Confirmar enviar valor en duro
                                              x[37:40].strip(),#MONEDA
                                              Decimal(x[40:48].strip())#IMPORTE_CONCEPTO
                                              )), schema)    

    df = df.withColumn('FECHA_ACTUALIZACION', lit(self.fecha_actualizacion_str))
    df = df.withColumn('FECHA_ACTUALIZACION', when(col('FECHA_ACTUALIZACION').isNotNull(), col('FECHA_ACTUALIZACION')).otherwise(lit(None)))
    
    try:
      df.write.format("com.microsoft.sqlserver.jdbc.spark")\
              .mode("overwrite")\
              .option("truncate", "true")\
              .option("url", self.url)\
              .option("user", self.usuario)\
              .option("password", self.password)\
              .option("dbtable", self.tabla_destino)\
              .option("tableLock", "true")\
              .save()  
    except:
        raise;  
        
  #Interface Código único Cliente
  def cargarInterfaceCodigoUnicoCliente(self): 
          
    spark = SparkSession.builder.appName("Cargar Interface Codigo Unico Cliente").getOrCreate()
    
    schema = StructType([StructField('TIPO_MOVIMIENTO', StringType(), True),
                         StructField('CODIGO_UNICO_CLIENTE', StringType(), True),
                         StructField('TIPO_DOC_IDENTIDAD', StringType(), True),
                         StructField('DOC_IDENTIDAD', StringType(), True),
                         StructField('CIIU', StringType(), True),
                         StructField('COD_OFICINA', StringType(), True),
                         StructField('COD_SUBSEDE', StringType(), True),
                         StructField('TIPO_OFICINA', StringType(), True),
                         StructField('NUMERO_OFICINA', StringType(), True),
                         StructField('TIPO_DOC_TRIBUTARIO', StringType(), True),
                         StructField('DOC_TRIBUTARIO', StringType(), True),
                         StructField('TIPO_PERSONA', StringType(), True),
                         StructField('RESIDENCIA', StringType(), True),
                         StructField('MAGNITUD', StringType(), True),
                         StructField('ACCIONISTA_EMPRESA', StringType(), True),
                         StructField('RELACION_LABORAL', StringType(), True),
                         StructField('PAIS_RESIDENCIA', StringType(), True),
                         StructField('GENERO', StringType(), True),
                         StructField('ESTADO_CIVIL', StringType(), True),
                         StructField('SIGLA', StringType(), True),
                         StructField('APELLIDO_PATERNO_RAZON_SOCIAL', StringType(), True),
                         StructField('APELLIDO_MATERNO', StringType(), True),
                         StructField('APELLIDO_CASADA', StringType(), True),
                         StructField('PRIMER_NOMBRE', StringType(), True),
                         StructField('SEGUNDO_NOMBRE', StringType(), True),
                         StructField('NOMBRES_RESTANTES', StringType(), True),
                         StructField('DIRECCION', StringType(), True),
                         StructField('TELEFONO_1', StringType(), True),
                         StructField('TELEFONO_2', StringType(), True),
                         StructField('FECHA_NACIMIENTO', StringType(), True),
                         StructField('OCUPACION', StringType(), True),
                         StructField('DESCRIPCION_OCUPACION', StringType(), True),
                         StructField('CODIGO_SECTORISTA', StringType(), True),
                         StructField('APELLIDO_PATERNO_SECTORISTA', StringType(), True),
                         StructField('APELLIDO_MATERNO_SECTORISTA', StringType(), True),
                         StructField('PRIMER_NOMBRE_SECTORISTA', StringType(), True),
                         StructField('SEGUNDO_NOMBRE_SECTORISTA', StringType(), True),
                         StructField('CODIGO_AGENCIA_SECTORISTA', StringType(), True),
                         StructField('NIVEL_RIESGO', StringType(), True),
                         StructField('FECHA_REVISION', StringType(), True),
                         StructField('RENTA', DecimalType(12,2), True),
                         #StructField('RENTA', StringType(), True),
                         StructField('CODIGO_CLIENTE', StringType(), True),
                         StructField('TIPO_DOC_COMPLEMENTARIO', StringType(), True),
                         StructField('DOC_COMPLEMENTARIO', StringType(), True),
                         StructField('UBIGEO', StringType(), True)])
    
    joan = self.filename
    joan_1 = joan.first
    joan_filas = joan_1.filter(line => line != joan_1)
    df = spark.createDataFrame(spark.sparkContext.textFile(joan_filas)\
                               .map(lambda x:(x[0:1].strip(),#TIPO_MOVIMIENTO
                                              x[1:21].strip(),#CODIGO_UNICO_CLIENTE
                                              x[21:23].strip(),#TIPO_DOC_IDENTIDAD
                                              x[23:35].strip(),#DOC_IDENTIDAD
                                              x[35:39].strip(),#CIIU
                                              x[39:41].strip(),#COD_OFICINA
                                              x[41:43].strip(),#COD_SUBSEDE
                                              x[43:44].strip(),#TIPO_OFICINA
                                              x[44:54].strip(),#NUMERO_OFICINA
                                              x[54:55].strip(),#TIPO_DOC_TRIBUTARIO
                                              x[55:75].strip(),#DOC_TRIBUTARIO
                                              x[75:77].strip(),#TIPO_PERSONA
                                              x[77:79].strip(),#RESIDENCIA
                                              x[79:80].strip(),#MAGNITUD
                                              x[80:82].strip(),#ACCIONISTA_EMPRESA
                                              x[82:84].strip(),#RELACION_LABORAL
                                              x[84:88].strip(),#PAIS_RESIDENCIA
                                              x[88:90].strip(),#GENERO
                                              x[90:92].strip(),#ESTADO_CIVIL
                                              x[92:112].strip(),#SIGLA
                                              x[112:232].strip(),#APELLIDO_PATERNO_RAZON_SOCIAL
                                              x[232:292].strip(),#APELLIDO_MATERNO
                                              x[292:352].strip(),#APELLIDO_CASADA
                                              x[352:412].strip(),#PRIMER_NOMBRE
                                              x[412:472].strip(),#SEGUNDO_NOMBRE
                                              x[472:532].strip(),#NOMBRES_RESTANTES
                                              x[532:782].strip(),#DIRECCION
                                              x[782:792].strip(),#TELEFONO_1
                                              x[792:802].strip(),#TELEFONO_2
                                              x[802:810].strip(),#FECHA_NACIMIENTO
                                              x[810:812].strip(),#OCUPACION
                                              x[812:872].strip(),#DESCRIPCION_OCUPACION
                                              x[872:882].strip(),#CODIGO_SECTORISTA
                                              x[882:942].strip(),#APELLIDO_PATERNO_SECTORISTA
                                              x[942:1002].strip(),#APELLIDO_MATERNO_SECTORISTA
                                              x[1002:1062].strip(),#PRIMER_NOMBRE_SECTORISTA
                                              x[1062:1122].strip(),#SEGUNDO_NOMBRE_SECTORISTA
                                              x[1122:1125].strip(),#CODIGO_AGENCIA_SECTORISTA
                                              x[1125:1126].strip(),#NIVEL_RIESGO
                                              x[1126:1134].strip(),#FECHA_REVISION
                                              Decimal(0),#RENTA
                                              #x[1134:1146].strip(),#RENTA
                                              x[1146:1149].strip(),#CODIGO_CLIENTE
                                              x[1149:1151].strip(),#TIPO_DOC_COMPLEMENTARIO
                                              x[1151:1163].strip(),#DOC_COMPLEMENTARIO
                                              x[1163:1169].strip()#UBIGEO
                                              )), schema)
    
    df = df.withColumn('FECHA_ACTUALIZACION', lit(self.fecha_actualizacion_str))
    df = df.withColumn('FECHA_ACTUALIZACION', when(col('FECHA_ACTUALIZACION').isNotNull(), col('FECHA_ACTUALIZACION')).otherwise(lit(None)))
    #df.select(col('FECHA_REVISION'), col('RENTA'), col('TIPO_DOC_COMPLEMENTARIO')).show(20, False)
    df.show()
    df.printSchema()
    
    #try:
    #  df.write.format("com.microsoft.sqlserver.jdbc.spark")\
    #          .mode("overwrite")\
    #          .option("truncate", "true")\
    #          .option("url", self.url)\
    #          .option("user", self.usuario)\
    #          .option("password", self.password)\
    #          .option("dbtable", self.tabla_destino)\
    #          .option("tableLock", "true")\
    #          .save()  
    #except:
    #    raise;
        
#Ejecución de prueba
#PrcCargaInterfaces(30).cargarInterfaceDiasAtraso()
#PrcCargaInterfaces(31).cargarInterfaceDiasAtrasoDetalle()
PrcCargaInterfaces(32).cargarInterfaceCodigoUnicoCliente()
