# `Otimizações CPU`: Comparação do Cadastro de Clientes com Bases Públicas (OFAC, ONU e OFAC-NON)

## Instalando os Módulos

In [None]:
!pip install python-Levenshtein thefuzz pyspark

## Bibliotecas Necessárias

In [2]:
import os
import time
import pyspark
import pandas as pd
from thefuzz import fuzz
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.functions import trim
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import StorageLevel
from pyspark.sql.types import IntegerType

import warnings
warnings.filterwarnings("ignore")

## Inicialização da Sessão do Spark

In [3]:
conf = SparkConf()

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .appName("Python Spark SQL - Testes com Bases Públicas - Otimizações CPU") \
    .getOrCreate()

## Path das Bases

In [6]:
file_renov = "nomes_clientes_banco.csv"
file_lista_risco = "base_ofac.csv"

## Base Renovação

In [None]:
df_renov = spark.read \
        .format("csv") \
        .option("delimiter", ",") \
        .option("header", True) \
        .option("escape", "\"") \
        .option("ignoreLeadingWhiteSpace", True) \
        .option("ignoreTrailingWhiteSpace", True) \
        .load(file_renov)

df_renov = df_renov.withColumnRenamed('TpPessoa', 'TIPO_RELACAO')

df_renov = df_renov.withColumn("NOME", F.trim(F.col("NOME")))

df_renov.show()

## Base Lista de Risco

In [None]:
df_lista_risco = spark.read \
    .format("csv") \
    .option("delimiter", ",") \
    .option("header", True) \
    .option("encoding", "cp1252") \
    .option("ignoreLeadingWhiteSpace", True) \
    .option("ignoreTrailingWhiteSpace", True) \
    .load(file_lista_risco)

df_lista_risco = df_lista_risco.dropDuplicates()

df_lista_risco = df_lista_risco.withColumnRenamed("NOME", "NOME_RISCO")

df_lista_risco = df_lista_risco.withColumnRenamed("TIPO", "TIPO_RELACAO")

df_lista_risco.show()

## Ordenação das Colunas Nome*

In [9]:
df_renov = df_renov.orderBy("NOME")

df_lista_risco = df_lista_risco.orderBy("NOME_RISCO")

## Operação de Join entre as Bases

In [None]:
df_joined = df_renov.join(F.broadcast(df_lista_risco), ["TIPO_RELACAO"])

df_joined.show()

## Cálculo de Similaridade entre as Bases

In [None]:
# Levenstein Ratio
lev    = F.levenshtein
str1   = F.col("NOME")
str2   = F.col("NOME_RISCO")
len_s1 = F.length(str1)
len_s2 = F.length(str2)

df_lev = df_joined.withColumn(
    "SIMILARITY",
    (100 * (1 - (lev(str1, str2) / (len_s1 + len_s2)))).cast("int")
)

df_filter = df_lev.filter(F.col("SIMILARITY") > 90)

df_lev.show()

## Lista de Sanções

In [None]:
df_filter.show()