In [1]:
import pytz,os
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql.types import LongType 
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import countDistinct, row_number
from pyspark.sql import SQLContext, HiveContext
from dateutil.relativedelta import relativedelta
from matplotlib import pyplot as plt
from datetime import datetime, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from pyspark.ml.linalg import Vectors, VectorUDT


In [2]:
sqlContext = SQLContext.getOrCreate(sc)

In [3]:
dbutils.widgets.text("idunidade_operacional", "3")
dbutils.widgets.text("idcidade", "3")

In [4]:
u_operacional = dbutils.widgets.get("idunidade_operacional") 
cidade = dbutils.widgets.get("idcidade")

## Funções para geração de listas

#### Funções de Verificação

In [7]:
"""
verifica a partir de ordens de servico de religacao quem foi religado no final de semana
"""

def verifica_religados_final_semana (nsc_paracorte):
  
  df_cortes_religacoes = spark.read.table('transient.religacoes_modelo')
  df_cortes_religacoes = df_cortes_religacoes.where((col('idcidade')== cidade) & (col('idunidadeoperacional') == u_operacional))
  
  df_cortes_religacoes_fds = df_cortes_religacoes.where(datediff(current_date(), col('datahoraterminoexecucao'))<=2)

  nsc_paracorte_religados =  nsc_paracorte.alias('paracortar').join(df_cortes_religacoes_fds.alias('rel_fds'),\
                                                                    [nsc_paracorte.cdc_proc_aberto == df_cortes_religacoes_fds.idligacao],"LEFT")\
                                                              .select('paracortar.*', 'rel_fds.datahoraterminoexecucao')
  
  nsc_para_corte = nsc_paracorte_religados.where(col('datahoraterminoexecucao').isNull())
  nsc_religados = nsc_paracorte_religados.where(col('datahoraterminoexecucao').isNotNull())

  return nsc_para_corte.drop('datahoraterminoexecucao')

In [8]:
"""
verifica quais ligacoes estao com ordem de corte aberta 
"""

def verifica_ordens_corte_abertas(nsc_paracorte):
  
  df_ordens_corte_abertas  = spark.read.table('transient.cortes_modelo')
  
  df_ordens_corte_abertas = df_ordens_corte_abertas.where((col('idstatusordemservico')==1)\
                                                          &(col('datahoraterminoexecucao').isNull())\
                                                          &(datediff(current_date(), col('datahoraabertura'))<=3)\
                                                          &(datediff(current_date(), col('datahoraabertura'))>=0))
  
  paracortar_osaberta = nsc_paracorte.alias('paracortar').join(df_ordens_corte_abertas.alias('os'), \
                                                               [nsc_paracorte.cdc_proc_aberto == df_ordens_corte_abertas.idligacao], "LEFT")\
                                                          .select('paracortar.*', 'os.idligacao')
  
  return paracortar_osaberta.where(col('idligacao').isNull()).drop('idligacao') 

In [9]:
"""
verifica quais ligacoes da lista correspondem aos moradores atuais da ligacao
"""
def get_cdcs_id_corrente(lista_debitos):
    listas_debitos = lista_debitos.withColumn('cliente_devedor_diferente',\
                                            when(col("cadas_idclienteinquilino").isNull(),0)\
                                             .otherwise(\
                                                        when(col("preds_idclienteinquilino") == col("cadas_idclienteinquilino"), 0)\
                                                        .otherwise(1)))
 
    debitos_usuario_antigo = listas_debitos.where(col("cliente_devedor_diferente") == 1)
    debitos_usuario_corrente = listas_debitos.where(col("cliente_devedor_diferente") == 0)
    return (debitos_usuario_antigo, debitos_usuario_corrente)

#### Funções de Geração de lista

In [11]:
"""
A  função a seguir constroi a lista de ligações aptas para serem cortadas
A partir de Predições e processos pendentes, identificam-se os Non-Self-Cures
que tem debito pendente e um processo com data apta para corte 
"""

def get_nao_self_cures_paracortar(df_predictions):
  
  # COLETANDO TODOS OS PROCESSOS ABERTOS COM DEBITOS PENDENTES
  processos_abertos = spark.read.table('transient.processos_modelo')\
  .where((col('status_processo') == 'P')\
         & (col('data_pagamento').isNull())\
         & (col('data_parcelamento').isNull())\
         & (col('id_situacao_fatura') == 1)\
         & (datediff(current_date(), col('data_apto_corte'))>1)\
         & (col('id_objetivo_processo_corte') ==2))\
  .withColumnRenamed('id_processo_corte', 'id_proc')\
  .withColumnRenamed('cdc', 'cdc_proc_aberto')\
  .withColumnRenamed('fatura_id', 'fatura_proc_aberto')\
  .withColumnRenamed('status_processo', 'status_p')\
  .withColumnRenamed('data_pagamento', 'data_pag')\
  .withColumnRenamed('data_parcelamento', 'data_parc')\
  .withColumnRenamed('id_situacao_fatura', 'situacao_fat')\
  .withColumnRenamed('data_corte_prevista', 'data_c_prev')\
  .withColumnRenamed('data_apto_corte', 'd_apto_corte')\
  .withColumnRenamed('valor', 'valor_proc_aberto')
  
  ##########  RETIRANDO OS QUE JA ESTAO CORTADOS - VIA SITUACAO_COBRANCA ###########
  
  df_cadastros = spark.read.table('transient.cadastros_modelo')
  processos_abertos = processos_abertos.alias('p').join(df_cadastros.alias('c'),\
                                                        [processos_abertos.cdc_proc_aberto == df_cadastros.cdc], 'LEFT')\
                                                  .select('p.*', 'c.situacao_cobranca')
  processos_abertos = processos_abertos.where(col('situacao_cobranca')!= "CORTADO")
  
  ##################################################################################
  
  ################### INCORPORANDO AS PREDICOES #####################################
  df_preds_processo_aberto = processos_abertos.alias('abertos')\
  .join(df_predictions.alias('preds'), [processos_abertos.cdc_proc_aberto == df_predictions.cdc,\
                                        processos_abertos.id_proc == df_predictions.id_processo_corte,\
                                        processos_abertos.fatura_proc_aberto == df_predictions.fatura_id], 'LEFT')\
  .select('preds.*', 'abertos.id_proc', 'abertos.cdc_proc_aberto',\
          'abertos.fatura_proc_aberto', 'abertos.valor_proc_aberto',\
          'abertos.status_p', 'abertos.situacao_fat', 'abertos.data_pag',\
          'abertos.data_parc', 'abertos.data_c_prev', 'abertos.d_apto_corte')
  ####################################################################################
  

  ####################### IDENTIFICAR CDCS NON SELF CURES ############################
  ### CONTROLAR OS SELF CURES APENAS NO PERIODO DOS 20 DIAS APOIX A DATA PREVISTA DE CORTE
  df_preds = df_preds_processo_aberto.withColumn('prediction_integer', when(col('prediction') == 'SELF_CURE',\
                                                                            when(datediff(current_date(), col('d_apto_corte'))>20,1)\
                                                                            .otherwise(0))\
                                                 .otherwise(when(col('prediction') == 'NON_SELF_CURE', 1)\
                                                            .otherwise(when(datediff(current_date(), col('d_apto_corte'))>20,1).otherwise(0))))\
  .withColumn('prediction_integer_cdc', sum(col('prediction_integer')).over(Window.partitionBy('cdc_proc_aberto')))\
  .withColumn('prediction_final_cdc', when(col('prediction_integer_cdc')>0, 'NON_SELF_CURE'))
  #######################################################################################
  
  ############### SELECIONAMOS CDCs NON SELF CURES #########################################
  df_preds = df_preds.where(col('prediction_integer_cdc') >= 1)
  df_preds = df_preds.dropDuplicates(['cdc_proc_aberto', 'fatura_proc_aberto', 'data_pag', 'data_parc', 'situacao_fat'])
  ##########################################################################################
  
  ############### CALCULANDO O TOTAL DE DEBITO e TOTAL DE FATURAS DO CDC NO PROCESSO ########
  procs_preds1 = df_preds.groupBy('id_proc', 'cdc_proc_aberto').\
                                     agg(max('prediction_final_cdc').alias('prediction_final_cdc'),\
                                         max(col('d_apto_corte')).alias('d_apto_corte'),\
                                         round(sum(when(col('valor_proc_aberto').isNull(), 0)\
                                                   .otherwise(col('valor_proc_aberto'))),2).alias('total_debito_cdc_processo'),\
                                         countDistinct(col('fatura_proc_aberto')).alias('total_faturas_cdc_processo'))
  ############################################################################################
  
  ##########  CALCULANDO TOTAL DE DEBITO DO CDC AO LONGO DOS PROCESSOS ABERTOS ###############
  procs_preds_viaveis_corte1 = procs_preds1.groupBy('cdc_proc_aberto').\
                               agg(max('prediction_final_cdc').alias('target_final'),\
                                   max(col('d_apto_corte')).alias('data_apto_corte'),\
                                   round(sum(col('total_debito_cdc_processo')),2).alias('total_debito_cdc'),\
                                   sum(col('total_faturas_cdc_processo')).alias('total_faturas_debidas_cdc'))
  #############################################################################################

  
  ################### FILTRANDO LIGACOES QUE TEM DEBITO MAIOR A 30 ############################
  procs_preds_viaveis_corte1 = procs_preds_viaveis_corte1.where(col('total_debito_cdc') >= 30)
  
  ########################## ADICIONANDO COLUNA DO VALOR HIT ##################################
  procs_preds_viaveis_corte1 = procs_preds_viaveis_corte1\
                                            .withColumn('hit_cdc', round(col('total_debito_cdc')/col('total_faturas_debidas_cdc'),2))
  
  ## SEPARANDO LIGACOES COM DEBITO MENOR A 30 REAIS, CASO NECESSARIO EMITIR EM LISTA SEPARADA ##
  debitos_menos_de_10 = procs_preds_viaveis_corte1.where(col('total_debito_cdc') < 30)
  
  ################ VERIFICACOES LIGACOES RELIGADAS NO FINAL DE SEMANA #########################
  if (datetime.today().weekday() == 0):
    print ('E segunda')
    procs_preds_viaveis_corte1 = verifica_religados_final_semana(procs_preds_viaveis_corte1)
    
  procs_preds_viaveis_corte2_ = verifica_ordens_corte_abertas(procs_preds_viaveis_corte1)
  
  
  ### JOIN PARA INCORPORAR INFORMACOES DE GRUPO, SETOR, ROTA DE LEITURA, LOGRADOURO ###########
  para_cortar = procs_preds_viaveis_corte2_.alias('vc').join(df_cadastros.alias('c'),\
                                                             procs_preds_viaveis_corte2_.cdc_proc_aberto == df_cadastros.cdc, "LEFT")\
                                                       .select('c.grupo_leitura_id', 'c.grupo_leitura', 'c.setor_leitura',\
                                                               'c.logradouro_id', 'c.logradouro','vc.*')
  
  para_cortar_final = para_cortar.withColumnRenamed('logradouro', 'logradouro_completo')\
  .withColumn('grupo_setor', concat(col('grupo_leitura_id'), lit('-'), col('setor_leitura')))\
  .withColumnRenamed('cdc_proc_aberto', 'cdc')
  #############################################################################################
  
  return para_cortar_final

In [12]:
"""
A  função a seguir constroi a lista de ligações Self-Cures, cortadas ou não
"""

def get_self_cures(df_predictions):
  
  # COLETANDO TODOS OS PROCESSOS ABERTOS COM DEBITOS PENDENTES
  processos_abertos = spark.read.table('transient.processos_modelo')\
  .where((col('status_processo') == 'P')\
         & (col('data_pagamento').isNull())\
         & (col('data_parcelamento').isNull())\
         & (col('id_situacao_fatura') == 1)\
         & (datediff(current_date(), col('data_apto_corte'))>1)\
         & (col('id_objetivo_processo_corte') ==2))\
  .withColumnRenamed('id_processo_corte', 'id_proc')\
  .withColumnRenamed('cdc', 'cdc_proc_aberto')\
  .withColumnRenamed('fatura_id', 'fatura_proc_aberto')\
  .withColumnRenamed('status_processo', 'status_p')\
  .withColumnRenamed('data_pagamento', 'data_pag')\
  .withColumnRenamed('data_parcelamento', 'data_parc')\
  .withColumnRenamed('id_situacao_fatura', 'situacao_fat')\
  .withColumnRenamed('data_corte_prevista', 'data_c_prev')\
  .withColumnRenamed('data_apto_corte', 'd_apto_corte')\
  .withColumnRenamed('valor', 'valor_proc_aberto')
  
  ##########  RETIRANDO OS QUE JA ESTAO CORTADOS - VIA SITUACAO_COBRANCA ###########
  
  df_cadastros = spark.read.table('transient.cadastros_modelo')
  processos_abertos = processos_abertos.alias('p').join(df_cadastros.alias('c'),\
                                                        [processos_abertos.cdc_proc_aberto == df_cadastros.cdc], 'LEFT')\
                                                  .select('p.*', 'c.situacao_cobranca')
  processos_abertos = processos_abertos.where(col('situacao_cobranca')!= "CORTADO")
  
  ##################################################################################
  
  ################### INCORPORANDO AS PREDICOES #####################################
  df_preds_processo_aberto = processos_abertos.alias('abertos')\
  .join(df_predictions.alias('preds'), [processos_abertos.cdc_proc_aberto == df_predictions.cdc,\
                                        processos_abertos.id_proc == df_predictions.id_processo_corte,\
                                        processos_abertos.fatura_proc_aberto == df_predictions.fatura_id], 'LEFT')\
  .select('preds.*', 'abertos.id_proc', 'abertos.cdc_proc_aberto',\
          'abertos.fatura_proc_aberto', 'abertos.valor_proc_aberto',\
          'abertos.status_p', 'abertos.situacao_fat', 'abertos.data_pag',\
          'abertos.data_parc', 'abertos.data_c_prev', 'abertos.d_apto_corte')
  ####################################################################################
  

  ####################### IDENTIFICAR CDCS SELF CURES ############################
  ### CONTROLAR OS SELF CURES APENAS NO PERIODO DOS 20 DIAS APOIX A DATA PREVISTA DE CORTE
  df_preds = df_preds_processo_aberto.withColumn('prediction_integer', when(col('prediction') == 'SELF_CURE',\
                                                                            when(datediff(current_date(), col('d_apto_corte'))>20,1)\
                                                                            .otherwise(0))\
                                                 .otherwise(when(col('prediction') == 'NON_SELF_CURE', 1)\
                                                            .otherwise(when(datediff(current_date(), col('d_apto_corte'))>20,1).otherwise(0))))\
  .withColumn('prediction_integer_cdc', sum(col('prediction_integer')).over(Window.partitionBy('cdc_proc_aberto')))\
  .withColumn('prediction_final_cdc', when(col('prediction_integer_cdc')>0, 'NON_SELF_CURE'))
  #######################################################################################
  
  ############### SELECIONAMOS SELF CURES #########################################
  df_preds = df_preds.where(col('prediction_integer_cdc') <= 0)
  df_preds = df_preds.dropDuplicates(['cdc_proc_aberto', 'fatura_proc_aberto', 'data_pag', 'data_parc', 'situacao_fat'])
  ##########################################################################################
  
  ############### CALCULANDO O TOTAL DE DEBITO e TOTAL DE FATURAS DO CDC NO PROCESSO ########
  procs_preds1 = df_preds.groupBy('id_proc', 'cdc_proc_aberto').\
                                     agg(max('prediction_final_cdc').alias('prediction_final_cdc'),\
                                         max(col('d_apto_corte')).alias('d_apto_corte'),\
                                         round(sum(when(col('valor_proc_aberto').isNull(), 0)\
                                                   .otherwise(col('valor_proc_aberto'))),2).alias('total_debito_cdc_processo'),\
                                         countDistinct(col('fatura_proc_aberto')).alias('total_faturas_cdc_processo'))
  ############################################################################################
  
  ##########  CALCULANDO TOTAL DE DEBITO DO CDC AO LONGO DOS PROCESSOS ABERTOS ###############
  self_cures = procs_preds1.groupBy('cdc_proc_aberto').\
                               agg(max('prediction_final_cdc').alias('target_final'),\
                                   max(col('d_apto_corte')).alias('data_apto_corte'),\
                                   round(sum(col('total_debito_cdc_processo')),2).alias('total_debito_cdc'),\
                                   sum(col('total_faturas_cdc_processo')).alias('total_faturas_debidas_cdc'))
  #############################################################################################

  ########################## ADICIONANDO COLUNA DO VALOR HIT ##################################
  self_cures = self_cures.withColumn('hit_cdc', round(col('total_debito_cdc')/col('total_faturas_debidas_cdc'),2))
  #############################################################################################
  
  ### JOIN PARA INCORPORAR INFORMACOES DE GRUPO, SETOR, ROTA DE LEITURA, LOGRADOURO ###########
  self_cures_list = self_cures.alias('sc').join(df_cadastros.alias('c'),self_cures.cdc_proc_aberto == df_cadastros.cdc, "LEFT")\
                                          .select('c.grupo_leitura_id', 'c.grupo_leitura',\
                                                  'c.setor_leitura', 'c.logradouro_id', 'c.logradouro','sc.*')
  
  self_cures_list1 = self_cures_list.withColumnRenamed('logradouro', 'logradouro_completo')\
  .withColumn('grupo_setor', concat(col('grupo_leitura_id'), lit('-'), col('setor_leitura')))\
  .withColumnRenamed('cdc_proc_aberto', 'cdc')\
  .withColumnRenamed('target_final', 'predicao_modelo')\
  .withColumn('predicao_modelo', lit('SELF_CURE'))
  #############################################################################################
  
 
  return self_cures_list1.orderBy(desc(col('hit_cdc')))

In [13]:
"""
A  função a seguir constroi a lista de preditos(Self-cures ou Nao Self-Cures) cortados a mais de 90 dias
"""

def get_cortados_mais_90_dias(df_predictions):
  
  df_predictions = df_predictions.withColumn('integer_prediction', when(col('prediction')== "SELF_CURE",0).otherwise(1))
  window_spec_preds = Window.partitionBy('cdc')
  df_predictions = df_predictions.withColumn('prediction_cdc', when(sum(col('integer_prediction'))\
                                                                    .over(window_spec_preds) > 0, 'NON_SELF_CURE')\
                                                               .otherwise('SELF_CURE'))

  df_processos = spark.read.table('transient.processos_modelo')
  df_cadastros = spark.read.table('transient.cadastros_modelo')
  
  print(df_predictions.columns)
  
  ############### JOIN PREDICTIONS E CADASTROS PARA TRAZER A SITUACAO DE COBRANSA = CORTADO ##########################
  
  df_predictions = df_predictions.alias('p').join(df_cadastros.alias('c'), [df_predictions.cdc == df_cadastros.cdc], "LEFT")\
                                            .select('p.*', 'c.situacao_cobranca')
  
  ####################################################################################################################
  
  df_predictions = df_predictions.where((col('situacao_cobranca')=="CORTADO"))
  
  procs = df_processos.dropDuplicates(['id_processo_corte', 'cdc', 'fatura_id'])
  
  procs1 = procs.groupBy('cdc').agg(max('data_corte').alias('data_corte_mais_recente'),\
                                    round(sum(col('valor')),2).alias('debito_ate_corte'),countDistinct(col('fatura_id'))\
                                    .alias('nro_faturas_debidas_ate_corte'))
  
  procs2 = procs1.alias('processos').join(df_predictions.alias('preds'),\
                                          procs1.cdc == df_predictions.preds_cdc, "INNER")\
                                    .select('processos.*', 'preds.prediction_cdc')
  
  procs2 = procs2.select('cdc', 'prediction_cdc', 'data_corte_mais_recente',\
                         'debito_ate_corte', 'nro_faturas_debidas_ate_corte').dropDuplicates(['cdc'])
  
  procs2 = procs2.where(datediff(current_date(), col('data_corte_mais_recente'))>=90)
  
  return procs2

In [14]:
"""
A  função a seguir constroi a lista para fiscalizar a partir do modelo de religacao do Vinicius
"""
def get_lista_parafiscalizar(n_cidade = "", n_unidade = ""):
  
  df_cadastros = spark.read.table('transient.cadastros_modelo')
  
  timezone = pytz.timezone('America/Sao_Paulo')
  today_date = datetime.now(tz = timezone)
  t_ano = str(today_date.year)
  t_mes = str(today_date.month)
  t_dia = str(today_date.day)
  
  if (len(t_mes)==1):
    t_mes = "0"+t_mes
  if (len(t_dia) ==1):
    t_dia = "0"+t_dia
    
  nome_tabela_preditos_modelo = "modelos_preditivos.religacao_" + n_cidade + "_preditos_" + t_dia + "_" + t_mes + "_" + t_ano
  
  try:
    tabela_preditos_religacao = spark.read.table(nome_tabela_preditos_modelo)
  except:
    return None
  else:
    tabela_preditos_religacao = tabela_preditos_religacao.alias('r').join(df_cadastros.alias('c'),\
                                                                        tabela_preditos_religacao.CDC == df_cadastros.cdc, "LEFT")\
                                                                  .select('r.CDC', 'c.grupo_leitura', 'c.grupo_leitura_id',\
                                                                          'c.setor_leitura', 'c.logradouro_id', 'c.logradouro',\
                                                                          'c.situacao_cobranca', 'r.DEBITO_TOTAL_FATURAS',\
                                                                          'r.QTD_FATURAS_PENDENTES_FATURAS', 'r.ACAO_A_REALIZAR', 'r.TARGET')\
                                                                  .withColumn('hit_cdc', round(col('DEBITO_TOTAL_FATURAS')/col('QTD_FATURAS_PENDENTES_FATURAS'),2))\
                                                                  .withColumnRenamed('logradouro', 'logradouro_completo')\
                                                                  .withColumnRenamed('DEBITO_TOTAL_FATURAS', 'total_debito_cdc')\
                                                                  .withColumnRenamed('QTD_FATURAS_PENDENTES_FATURAS','total_faturas_debidas_cdc')\
                                                                  .withColumnRenamed('ACAO_A_REALIZAR', 'acao_sugerida')\
                                                                  .withColumnRenamed('TARGET', 'probabilidade_religacao')
    parafiscalizar = tabela_preditos_religacao.where(col('acao_sugerida') == 'FISCALIZAR')
    return parafiscalizar

In [15]:
"""
Aplica o modelo de religacao do Vinicius a partir de uma lista de preditos por outro modelo (Modelo Self-Cures)
"""

def aplica_modelo_preditivo_religa(df_lista, n_cidade = "", n_unidade = ""):
  
  timezone = pytz.timezone('America/Sao_Paulo')
  today_date = datetime.now(tz = timezone)
  t_ano = str(today_date.year)
  t_mes = str(today_date.month)
  t_dia = str(today_date.day)
  if (len(t_mes)==1):
    t_mes = "0"+t_mes
  if (len(t_dia) ==1):
    t_dia = "0"+t_dia
  
  nome_tabela_preditos_modelo = "modelos_preditivos.religacao_" + n_cidade + "_preditos_" + t_dia + "_" + t_mes + "_" + t_ano

  try:
    tabela_preditos_religacao = spark.read.table(nome_tabela_preditos_modelo)
    
  except:
    return df_lista.withColumn('probabilidade_religacao', lit(1.0)).withColumn('acao_sugerida', lit("CORTAR"))
  
  else:
    tabela_preditos_religacao = tabela_preditos_religacao.where(col('acao_a_realizar') == 'CORTAR')
    df_lista_filtrada = df_lista.alias("l1").join(tabela_preditos_religacao.alias("l2"), \
                                                  df_lista.cdc == tabela_preditos_religacao.CDC, "inner")\
                                            .select("l1.*", "l2.ACAO_A_REALIZAR",\
                                                    "l2.TARGET")
    
    df_lista_filtrada = df_lista_filtrada.where(col("target").isNotNull())
    
    df_lista_filtrada_final = df_lista_filtrada.withColumn("acao_sugerida", col("ACAO_A_REALIZAR"))\
                                               .withColumnRenamed('TARGET', 'probabilidade_religacao')\
                                               .drop("ACAO_A_REALIZAR")
 
    return df_lista_filtrada_final
    

In [16]:
"""
Gera priorização de ligacoes com base no potencial de retorno do débito em:
 - grupo de leitura
 - logradouro
 - cdc
"""
def ordenacao_lista(df_lista):
  
  lista = df_lista\
  .withColumn('logradouro_completo', max(col('logradouro_completo'))\
                                    .over(Window.partitionBy('logradouro_id')))\
  .withColumn('potencial_retorno_cdc', sqrt('total_debito_cdc')*col('probabilidade_religacao'))\
  .withColumn('potencial_retorno_grupo', sum(col('potencial_retorno_cdc'))\
                                         .over(Window.partitionBy('grupo_leitura_id')))\
  .withColumn('potencial_retorno_logradouro', sum(col('potencial_retorno_cdc'))\
                                              .over(Window.partitionBy('logradouro_id')))\
  .withColumn('debito_total_grupo',round(sum(col('total_debito_cdc'))\
                                         .over(Window.partitionBy('grupo_leitura_id')),2))\
  .withColumn('debito_total_logradouro', round(sum(col('total_debito_cdc'))\
                                               .over(Window.partitionBy('logradouro_id')),2))\
  .withColumn('hit_total_grupo', round(sum(col('hit_cdc'))\
                                       .over(Window.partitionBy('grupo_leitura_id')),2))\
  .withColumn('hit_total_logradouro', round(sum(col('hit_cdc'))\
                                            .over(Window.partitionBy('logradouro_id')),2))\
  .orderBy(desc('potencial_retorno_grupo'),\
           desc('potencial_retorno_logradouro'),\
           desc('potencial_retorno_cdc'))
 
  lista_final = lista.withColumnRenamed('cdc', 'ligacao')\
                     .drop('grupo_leitura_id', 'logradouro_id')

  return lista_final

In [17]:
"""
Gerencia a priorizacao com base no modelo selfcures e o modelo de religacao, para fiscalizacao e corte
"""

def get_priorizacao_proximidade(df_lista, obj = "cortar", n_cidade="", n_unidade = ""):
  
  if (obj == "fiscalizar"):
    lista = df_lista.select('cdc','grupo_leitura', 'grupo_leitura_id', 'setor_leitura',\
                            'logradouro_id', 'logradouro_completo','total_debito_cdc',\
                            'total_faturas_debidas_cdc','hit_cdc', 'acao_sugerida', 'probabilidade_religacao')
  elif (obj == "cortar"):
    lista = df_lista.select('cdc','grupo_leitura', 'grupo_leitura_id', 'setor_leitura',\
                            'logradouro_id', 'logradouro_completo','total_debito_cdc', 'total_faturas_debidas_cdc',\
                            'hit_cdc')
    
  
   #################### APLICANDO MODELO DE RELIGACAO ########################
  if (obj == "cortar"):
    lista_modelo = aplica_modelo_preditivo_religa(lista, n_cidade,  n_unidade)
  else:
    lista_modelo = lista
  
  ############################################################################

  ##################### CONSTRUINDO LISTA ALTERNATIVA ########################
  
  lista_final = ordenacao_lista(lista_modelo)
  
  ############################################################################
  
  return lista_final

In [18]:
def geracao_listas_handler(df_predictions, periodo_inicial, periodo_final,
                           n_cidade = "",
                           n_unidade = "",
                           dir_path = "mnt/datalake-ambiental/output/modelo_self_cure",
                           prefix_lista = "sugestoes_modeloselfcures_paracampo"):
  
  timezone = pytz.timezone('America/Sao_Paulo')
  data_hora_listas = datetime.now(tz = timezone).strftime('%Y-%m-%d %H:%M:%S')
  df_cadastros = spark.read.table('transient.cadastros_modelo').cache()
  
  # SE RENOMEIA O CLIENTE INQUILINO PARA NAO CRIAR UM ERRO DE CODIGO NA HORA DE USAR A FUNCAO DE VERIFICACAO DE CLIENTES ATUAIS
  df_predictions = df_predictions.withColumnRenamed('idclienteinquilino', 'preds_idclienteinquilino')
  
  # TRAZEMOS INFORMACOES DE CADASTROS QUE IRAO NOS AJUDAR NA EXECUCAO DE FILTROS, REGRAS E AGREGACOES EM FUNCOES POSTERIORES
  predicoes_cadastros = df_predictions.alias('preds').join(df_cadastros.alias('cadastro'),\
                                                           [df_predictions.cdc == df_cadastros.cdc], 'LEFT')\
                                                    .select('preds.*', 'cadastro.tipo_faturamento', 'cadastro.tipo_faturamento_id',\
                                                            'cadastro.idclienteinquilino','cadastro.ativo_juridico',\
                                                            'cadastro.situacao_ligacao_id')\
                                                    .withColumnRenamed('idclienteinquilino', 'cadas_idclienteinquilino')
  
  
  ############ GERANDO OS TIPOS DE FATURAMENTO E CATEGORIAS PARA A UNIDADE DO PARAMETRO##################
  
  #######################################################################################################
  
  #################### TRANSFORMANDO IDS EM LISTA PARA APLICACAO DE FILTRO ##############################
  sqlContext.sql("refresh table transient.codigos_tipo_faturamento")
  sqlContext.sql("refresh table transient.codigos_categorias")
  
  df_t_faturamento = spark.read.table('transient.codigos_tipo_faturamento')
  df_categorias = spark.read.table('transient.codigos_categorias')
  
  lista_id_faturamento = list(set([row.idtipofaturamento for row in df_t_faturamento.collect()]))
  lista_id_categorias =  list(set([row.idcategoria for row in df_categorias.collect()]))
  ##########################################################################################################
  
  ##########APLICA FILTRO PARA RETIRAR CLIENTES PUBLICOS E MANTER APENAS AS LIGACOES EM SITUACAO ATIVA######
  predicoes_cadastros  = predicoes_cadastros.where((col('tipo_faturamento_id').isin(*lista_id_faturamento))\
                                                   & (col('idcategoria').isin(*lista_id_categorias))\
                                                   & (col('situacao_ligacao_id') == 1))
  
  #############################################################################################################
  
  #SEPARAMOS OS CLIENTES QUE MORAM ATUALMENTE NAS LIGACOES QUE ENTRARAO PARA CORTE/FISCALIZACAO
  lista_devedor_diferente_morador_atual, devedores_atuais = get_cdcs_id_corrente(predicoes_cadastros)
  
  #MANTEMOS APENAS QUEM NAO ESTA EM PROCESSO JURIDICO
  devedores_atuais_ = devedores_atuais.where((~(col("ativo_juridico") == "S")) | (col("ativo_juridico").isNull()))
  
  #APLICAMOS AS FUNCOES PARA SEGMENTAR SELFCURES, NON-SELFCURES PARA CAMPO E CORTADOS A MAIS DE 90 DIAS (ESSA ULTIMA E UMA LISTA APENAS PARA ANALISE)
  self_cure_list = get_self_cures(devedores_atuais_)
  #cortados_mais_90_dias_list = get_cortados_mais_90_dias(devedores_atuais_)
  non_selfcures_paracorte_list = get_nao_self_cures_paracortar(devedores_atuais_)
  non_selfcures_parafiscalizar_list = get_lista_parafiscalizar(n_cidade = n_cidade, n_unidade = n_unidade)
  nsc_paracortar_list = get_priorizacao_proximidade(non_selfcures_paracorte_list, n_cidade = n_cidade, n_unidade = n_unidade)
  
  flag = 1
  if (not non_selfcures_parafiscalizar_list is None):
    nsc_parafiscalizar_list = get_priorizacao_proximidade(non_selfcures_parafiscalizar_list, obj = "fiscalizar", n_cidade = n_cidade, n_unidade = n_unidade)
  else:
    flag = 0
  
  #CONSTRUINDO O NOME EXCEL DAS LISTAS
  
  arquivo_campo = \
  os.path.join('/dbfs',dir_path,n_cidade,"listas",prefix_lista+"_"+n_unidade+"_"+n_cidade+"_"+periodo_inicial+"_"+periodo_final+"_exec_"+data_hora_listas.replace(":","_") + ".xls")
 
  
  
  #SALVANDO EM EXCEL
  
  with pd.ExcelWriter(arquivo_campo) as writer:
    nsc_paracortar_list.toPandas().to_excel(writer, sheet_name='para_corte')
    if (flag == 1):
      nsc_parafiscalizar_list.toPandas().to_excel(writer, sheet_name='para_fiscalizacao')
    self_cure_list.toPandas().to_excel(writer, sheet_name='self_cures')
    #cortados_mais_90_dias_list.toPandas().to_excel(writer, sheet_name='cortados_mais_90_dias')
 
  return arquivo_campo

## Executor Principal

In [20]:
import logging

logger = spark._jvm.org.apache.log4j.LogManager.getLogger('gerador_base')


#########################################GERACAO DE TABELAS BASE################################################
dbutils.notebook.run("utilidades", 500, {"v_output": "codigos_cidade_unidadeoperacional",\
                                         "id_unidade_operacional": dbutils.widgets.get("idunidade_operacional"),\
                                         "id_cidade": dbutils.widgets.get("idcidade")})

dbutils.notebook.run("gerador_views_genericas", 500, {"a_output": "cadastros",\
                                                                    "idunidade_operacional": dbutils.widgets.get("idunidade_operacional"),\
                                                                    "idcidade": dbutils.widgets.get("idcidade")})

dbutils.notebook.run("gerador_views_genericas", 500, {"a_output": "processos",\
                                                                    "idunidade_operacional": dbutils.widgets.get("idunidade_operacional"),\
                                                                    "idcidade": dbutils.widgets.get("idcidade")})

dbutils.notebook.run("utilidades", 500, {"v_output": "codigos_tipo_faturamento",\
                                           "id_unidade_operacional": dbutils.widgets.get("idunidade_operacional"),\
                                           "id_cidade": dbutils.widgets.get("idcidade")})
  
dbutils.notebook.run("utilidades", 500, {"v_output": "codigos_categorias",\
                                            "id_unidade_operacional": dbutils.widgets.get("idunidade_operacional"),\
                                             "id_cidade": dbutils.widgets.get("idcidade")})

dbutils.notebook.run("gerador_views_genericas", 500, {"a_output": "cortes",\
                                                      "idunidade_operacional": dbutils.widgets.get("idunidade_operacional"),\
                                                      "idcidade": dbutils.widgets.get("idcidade")})


In [21]:

################################# REFRESHING DAS TABELAS GERADAS ##############################
sqlContext.sql("refresh table transient.codigos_cidade_unidadeoperacional")
sqlContext.sql("refresh table transient.codigos_tipo_faturamento")
sqlContext.sql("refresh table transient.codigos_categorias")
sqlContext.sql("refresh table transient.cadastros_modelo")
sqlContext.sql("refresh table transient.processos_modelo")
###############################################################################################

################################# IDENTIFICANDO NOMES DE CIDADE E UNIDADE ######################
n_unidade_cidade = spark.read.table('transient.codigos_cidade_unidadeoperacional')

nome_uoperacional1 ="".join(n_unidade_cidade.collect()[0].unidadeoperacional.lower().split(" "))
nome_cidade1 ="".join(n_unidade_cidade.collect()[0].cidade.lower().split(" "))

nome_uoperacional2 ="_".join(n_unidade_cidade.collect()[0].unidadeoperacional.lower().split(" "))
nome_cidade2 ="_".join(n_unidade_cidade.collect()[0].cidade.lower().split(" "))
###############################################################################################

################################# CARREGANDO PREDICOES DO DIA #################################
nome_tabela_predictions  = "modelos_preditivos."+ nome_uoperacional1 +"_" + nome_cidade1+ "_vf_predictions_execucao_noturna"

df_predictions = spark.read.table(nome_tabela_predictions)
###############################################################################################

################################# GERANDO LISTAS #################################################
data_execucao = datetime.now()
periodo_inicial = data_execucao.replace(day=1) + relativedelta(months=-8)
periodo_final = data_execucao + relativedelta(days = -35)
periodo_inicial = periodo_inicial.strftime('%Y-%m-%d')
periodo_final = periodo_final.strftime('%Y-%m-%d')
arquivo_campo= geracao_listas_handler(df_predictions, periodo_inicial, periodo_final,\
                                      n_cidade=nome_cidade2, n_unidade = nome_uoperacional2)
###############################################################################################

######################### SALVANDO O NOME DO ARQUIVO PARA ANEXAR NO E-MAIL ###################
data_para_tabela = {'nome_lista': [arquivo_campo],
                      'unidade': ["maua"]}
df1 = pd.DataFrame(data_para_tabela)
df2 = spark.createDataFrame(df1)
df2.write.mode("append").insertInto("transient.listas_para_email")
###############################################################################################                                                          