In [None]:
# Widgets para obtenção de parametros para execução do algoritmo

dbutils.widgets.text("repartition", "100")
repartition_param = dbutils.widgets.get("repartition")

dbutils.widgets.text("aplicacao", "similaridade_anomalia")
aplicacao = dbutils.widgets.get("aplicacao")

dbutils.widgets.text("tabela_controle", "controle_similaridade")
tabela_controle = dbutils.widgets.get("tabela_controle")

dbutils.widgets.text("data_anomalia_inicio", "2000-01-01")
data_anomalia_inicio_param = dbutils.widgets.get("data_anomalia_inicio")

dbutils.widgets.text("storage_account", "storage_account")
storage_account_param = dbutils.widgets.get("storage_account")

dbutils.widgets.text("container", "pouso")
container_param = dbutils.widgets.get("container")

dbutils.widgets.text("input_path_anomalia", "input/anomalia")
input_path_anomalia = dbutils.widgets.get("input_path_anomalia")

dbutils.widgets.text("input_path_resultado", "input/relacao_anomalia")
input_path_resultado = dbutils.widgets.get("input_path_resultado")

dbutils.widgets.text("output_path", "output/relacao_anomalia")
output_path_param = dbutils.widgets.get("output_path")

print("repartition", repartition_param)
print("aplicacao", aplicacao)
print("tabela_controle", tabela_controle)
print("data_anomalia_inicio", data_anomalia_inicio_param)
print("storage_account_param", storage_account_param)
print("container_param", container_param)
print("input_path_anomalia", input_path_anomalia)
print("input_path_resultado", input_path_resultado)
print("output_path_param", output_path_param)

In [None]:
controle_df = spark.sql("select * from {aplicacao}.{tabela_controle}".format(aplicacao=aplicacao, tabela_controle=tabela_controle))
data = controle_df.select("last_date").collect()
data_ultima_avaliacao_param = data[0][0]
print("data_ultima_avaliacao_param", data_ultima_avaliacao_param)

In [None]:
!python -m spacy download pt_core_news_sm

import nltk
nltk.download('stopwords')

In [None]:
def create_mount(nome_container, path_mountpoint, storage_account, application_id, directory_id, service_credential):
        
  path = "/mnt/" + path_mountpoint
  
  SOURCE = "abfss://{container}@{storage_acct}.dfs.core.windows.net/".format(container=nome_container, storage_acct=storage_account)
  CONFIGS = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": application_id,
           "fs.azure.account.oauth2.client.secret": service_credential,
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{directory_id}/oauth2/token".format(directory_id=directory_id)}        
    
  try:

      dbutils.fs.mount(
          source=SOURCE,
          mount_point=path,
          extra_configs=CONFIGS)
        
      print(SOURCE)                
  except Exception as e:
    
    print (str(e))
    if "Directory already mounted" in str(e):
      pass # Ignore error if already mounted.
  
  print("Success.")
    
  return path    

def unmount(path):
  mnts = dbutils.fs.ls(path)
  for mnt in mnts: 
    try:
      dbutils.fs.unmount(mnt[0])
    except Exception as e:
      if "Directory not mounted" in str(e):
        print(mnt[0] , " - Não estava montado !")
        pass
      else:
        print(e) 

  dbutils.fs.refreshMounts()  

In [None]:
def get_data(path, format="parquet"):
  data_df = spark.read.option("recursiveFileLookup","true").format(format).load(path)
  return data_df

def save_df(df_output, file_dest_path, mode="append", format="parquet"):
  df_output.write.mode(mode).format(format).option("mergeSchema", "true").save(file_dest_path)

In [None]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, DateType
import gensim.parsing.preprocessing as gsp
from gensim import utils
from nltk.corpus import stopwords
import spacy
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Normalizer
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql import Window

In [None]:
def lematize(x):
    return ' '.join(w.lemma_ for w in nlp(x) if w.pos_ in ("VERB", "NOUN", "ADJ"))

def strip_accent(x):
    return utils.deaccent(x)
    
def strip_stopwords(x):
    return ' '.join(w for w in x.split() if not w in stops)
    
filters = [
           utils.to_unicode,
           gsp.strip_tags, 
           lematize,
           gsp.strip_punctuation,
           gsp.strip_multiple_whitespaces,
           gsp.strip_numeric,
           gsp.strip_short,
           strip_stopwords,
           strip_accent
          ]

def clean_text(x):
    #Padronizacao em minuscula
    s = x.lower()

    #Aplicacao dos filtros definidos
    for f in filters:
        s = f(s)
        
    return s


In [None]:
#Funcao udf da limpeza do texto
clean_text_udf = F.udf(clean_text, StringType())

#Funcao udf do coseno
dot_udf = F.udf(lambda x,y: float(x.dot(y)), FloatType()) 

In [None]:
def avalia_similaridade(registrosAnomalia, registrosResultado): 
  registrosAnomalia = registrosAnomalia.repartition(NR_REPARTITION)

  #Remocao de registros com dados obrigatorios nulos
  registrosAnomalia = registrosAnomalia.filter("id is not null and descricao is not null")

  #Limpeza de dados
  registrosAnomaliaCleaned = registrosAnomalia.withColumn("descricao", clean_text_udf("descricao"))
  
  #Remove registros cuja descricao ficou vazia pos pre processamento
  registrosAnomaliaCleaned = registrosAnomaliaCleaned.filter("descricao is not null")
  registrosAnomaliaCleaned = registrosAnomaliaCleaned.select(F.col("id"), F.col("descricao"), F.col("data_ocorrencia"), F.col("data_inclusao"))
  registrosAnomaliaCleaned.cache()

  if registrosAnomaliaCleaned.select('id').head() is None:
    print("Nao existem registros para montar grupo de anomalias de referencia.")
  else:
      #Criacao de pipeline de processamento do texto
      tokenizer = Tokenizer(inputCol="descricao", outputCol="tokens")
      hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures", numFeatures=150)
      idfModel = IDF(inputCol="rawFeatures", outputCol="features")
      normaliser = Normalizer(inputCol="features", outputCol="norm_vec")
      idfPipeline = Pipeline(stages=[tokenizer,hashingTF, idfModel, normaliser])
      idfModel = idfPipeline.fit(registrosAnomaliaCleaned)
      #Obtencao das features com base no pipeline criado e calculo da norma
      normalizedRegistrosAnomalia = idfModel.transform(registrosAnomaliaCleaned)

      #Obtem campos relevantes para analise do coseno
      df_word_norm_ref = normalizedRegistrosAnomalia.select(normalizedRegistrosAnomalia.id, normalizedRegistrosAnomalia.norm_vec, normalizedRegistrosAnomalia.data_ocorrencia, normalizedRegistrosAnomalia.data_inclusao).cache()

      #Selecao das anomalias que serao avaliadas quanto a similaridade
      df_word_norm_em_analise = df_word_norm_ref.filter(F.col("data_inclusao") > data_ultima_avaliacao_param)
      df_word_norm_em_analise = df_word_norm_em_analise.select(F.col("id").alias("id2"), F.col("norm_vec").alias("norm_vec2"), F.col("data_ocorrencia").alias("data_ocorrencia2")).cache()
      
      #Remove do escopo "em analise" anomalias que já foram analisadas
      registrosResultadoDistintos = registrosResultado.select("ANAN_CD_ANOMALIA").distinct()
      df_word_norm_em_analise = df_word_norm_em_analise.join(registrosResultadoDistintos, df_word_norm_em_analise.id2 == registrosResultadoDistintos.ANAN_CD_ANOMALIA, how="anti")

      #Geracao do produto cartesiano das anomalias para calculo do coseno
      cross = df_word_norm_em_analise.crossJoin(df_word_norm_ref)
      #Remocao dos registros duplicados
      cross = cross.dropDuplicates(['id', 'id2'])
      #Remocao das linhas que compara a anomalia consigo mesmo e de anomalias cuja a similar tenha ocorrido apos a em analise
      cross = cross.filter("id != id2 and data_ocorrencia2 >= data_ocorrencia")

      #Calculo do coseno
      cosine = cross.withColumn("similarity", dot_udf(cross.norm_vec, cross.norm_vec2))

      #Persistencia do data lake
      resultadoSchema = StructType().add("anan_cd_anomalia", StringType()).add("anre_cd_anomalia", StringType()).add("selo_cd_selo", StringType()).add("rean_nr_similaridade", FloatType()).add("rean_dt_criacao", DateType())
      resultado = cosine.select(F.col("id2").alias("anan_cd_anomalia"), F.col("id").alias("anre_cd_anomalia"), F.round(cosine["similarity"], 2).alias('rean_nr_similaridade'))
      
      data_analise = datetime.today()
      resultado = resultado.withColumn("rean_dt_criacao", F.lit(data_analise))
      
      resultado = resultado.withColumn("selo_cd_selo", F.lit(RECORRENTE))

      resultado = resultado.select(resultado.anan_cd_anomalia, resultado.anre_cd_anomalia, resultado.selo_cd_selo, resultado.rean_nr_similaridade, resultado.rean_dt_criacao)
      resultado = spark.createDataFrame(resultado.rdd, resultadoSchema)
      return resultado

In [None]:
def presiste_resultado(resultado, output_path, limiar_cosseno):
  resultado = resultado.filter(F.col("rean_nr_similaridade") >= limiar)
  w = Window.partitionBy(['anan_cd_anomalia', 'anre_cd_anomalia'])
  resultado = resultado.withColumn('max_rean_nr_similaridade', F.max('rean_nr_similaridade').over(w))\
    .where(F.col('rean_nr_similaridade') == F.col('max_rean_nr_similaridade'))\
    .drop('max_rean_nr_similaridade')
  save_df(resultado, output_path, format="delta")

In [None]:
#Set configs
INPUT_PATH_ANOMALIA = "/mnt/{container}/{input_path}".format(container=container_param, input_path=input_path_anomalia)
INPUT_PATH_RESULTADO = "/mnt/{container}/{input_path}".format(container=container_param, input_path=input_path_resultado)
OUTPUT_PATH = "/mnt/{container}/{output_path}".format(container=container_param, output_path=output_path_param)
STORAGE_ACCOUNT = storage_account_param
APPLICATION_ID = "APPLICATION_ID"
DIRECTORY_ID = "DIRECTORY_ID"
SERVICE_CREDENTIAL = "SERVICE_CREDENTIAL"

# Monta volumes de input/output
MOUNT = create_mount(container_param, container_param, STORAGE_ACCOUNT, APPLICATION_ID, DIRECTORY_ID, SERVICE_CREDENTIAL)

In [None]:
#########Constantes#########
#Selo de recorrencia
RECORRENTE = 'R'
LIMIAR_COSSENO = 0.65

#Numero particao
NR_REPARTITION = int(repartition_param)

#Obtem a lista de stopwords em portugues
stops = stopwords.words("portuguese")
#Inclusao de stopwords do dominio
stops.extend(["ambos", "existente", "obs", "imediatamente", "existente", "então", "cujo", "total", "consequencia", "consequência","fato", "profissional", "normalmente", "tornar", "matrícula", "matricula", "mat", "apenas", "pleno", "efetuar", "possivel", "possível", "consequente", "junior", "descricao", "descrição", "situacao", "situação", "caracerizar", "nome", "cada"])
stops = set(stops)

#Define o idioma em portugues para realizar a lematizacao
nlp = spacy.load("pt_core_news_sm")

#Leitura dos dados a partir do data lake
registrosAnomalia = get_data(INPUT_PATH_ANOMALIA, format="parquet")
registrosResultado = get_data(INPUT_PATH_RESULTADO, format="parquet")

#Faz conversão de tipos e remove datas fora do período de interesse
registrosAnomalia = registrosAnomalia.withColumn("ANCA_DT_ANOMALIA", F.to_timestamp(F.col("ANCA_DT_ANOMALIA"), 'yyyy-MM-dd'))
registrosAnomalia = registrosAnomalia.withColumn("ANCA_DT_INCLUSAO", F.to_timestamp(F.col("ANCA_DT_INCLUSAO"), 'yyyy-MM-dd HH:mm:ss'))
registrosAnomalia = registrosAnomalia.withColumn("ANCA_DT_CARGA", F.to_timestamp(F.col("ANCA_DT_CARGA"), 'yyyy-MM-dd HH:mm:ss'))
registrosAnomalia = registrosAnomalia.filter((F.col("ANCA_DT_ANOMALIA") >= data_anomalia_inicio_param) & (F.col("ANCA_DT_ANOMALIA") <= F.current_timestamp()))
registrosAnomalia = registrosAnomalia.select(\
                                             F.col("ANCA_CD_ANOMALIA").alias("id"), \
                                             F.col("ANCA_DS_ANOMALIA").alias("descricao"), \
                                             F.col("ANCA_TIPO_ACIDENTE").alias("tipo_acidente"), \
                                             F.col("ANCA_NM_TIPO").alias("tipo_anomalia"),\
                                             F.col("ANCA_DT_ANOMALIA").alias("data_ocorrencia"), \
                                             F.col("ANCA_DT_INCLUSAO").alias("data_inclusao")).distinct()
resultado = avalia_similaridade(registrosAnomalia, registrosResultado)
presiste_resultado(resultado, OUTPUT_PATH, LIMIAR_COSSENO)

In [None]:
spark.sql('OPTIMIZE delta.`{}`'.format(OUTPUT_PATH))
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
spark.sql('VACUUM delta.`{}` RETAIN 0 HOURS'.format(OUTPUT_PATH))

In [None]:
# Desmonta todos os volumes
unmount('/mnt/')