In [0]:
## NOTEBOOK DESENVOLVIDO PARA EXECUTAR O TRATAMENTO DOS DADOS FAKE

In [0]:
## CONEXOES
!pip install pymongo

In [0]:
## CONEXOES BANCOS
import psycopg2
from pymongo import MongoClient

## TRATAMENTOS E AJUSTES
import pandas as pd
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace, udf, when, length


In [0]:
%run /Workspace/Users/orafaelrp@gmail.com/desafio_tecnico/secrets_databricks

In [0]:
## POSTGRES
post_link = 'https://us-east-1.console.aws.amazon.com/rds/home?region=us-east-1#database:id=database-1;is-cluster=false'
post_user = dbutils.secrets.get(scope="db_secrets", key="postgres_user")
post_senha = dbutils.secrets.get(scope="db_secrets", key="postgres_password")
post_db = 'postgres_ingestao'
post_host = 'database-1.ccv4kogyqnja.us-east-1.rds.amazonaws.com'

## MONGODB
mongo_link = 'https://cloud.mongodb.com/v2/68efdc7fc3401e1005c111c5#/clusters'
mongo_user = dbutils.secrets.get(scope="db_secrets", key="mongo_user")
mongo_password = dbutils.secrets.get(scope="db_secrets", key="mongo_password")

### Coleta PostGres

In [0]:
## CONEXAO COM O BANCO
connection = psycopg2.connect(
    host=post_host,
    database=post_db,
    user=post_user,
    password=post_senha
)
print("Conexao realizada")

In [0]:
## INICIA EXTRACAO DOS DADOS POSTGRES CONTA
cursor = connection.cursor()
cursor.execute("SELECT * FROM CONTA")
result = cursor.fetchall()

## AJUSTE DE NOMENCLATURA COLUNA
col_names = [desc[0] for desc in cursor.description]
df_result = pd.DataFrame(result, columns=col_names)
df_conta = spark.createDataFrame(df_result)

## MASCARAMENTO CPF/CNPJ
def mascarar_parcial(dado, visivel_inicio=3, visivel_fim=2, caractere_mascara='X'):
    if dado is None:
        return None
    tamanho = len(dado)
    if tamanho <= visivel_inicio + visivel_fim:
        return dado
    parte_mascarada = caractere_mascara * (tamanho - visivel_inicio - visivel_fim)
    return dado[:visivel_inicio] + parte_mascarada + dado[-visivel_fim:]

mascarar_parcial_udf = udf(mascarar_parcial, StringType())

df_conta = df_conta.withColumn("id_associado", regexp_replace("id_associado", r'[./-]', '')) \
    .withColumn("id_associado", mascarar_parcial_udf("id_associado")) \
    .withColumn("tp_pessoa", 
                when((df_conta["id_associado"].isNotNull()) & (length(df_conta["id_associado"]) > 11), "PJ")
                .otherwise("PF")
               )
## CRIANDO A TEMP VIEW
df_conta.createOrReplaceTempView("CONTA")

## AMOSTRA
display(df_conta.head(2))


In [0]:
## INICIA EXTRACAO DOS DADOS POSTGRES ASSOCIADO
cursor = connection.cursor()
cursor.execute("SELECT * FROM ASSOCIADO")
result = cursor.fetchall()

## AJUSTE DE NOMENCLATURA COLUNA
col_names = [desc[0] for desc in cursor.description]
df_result = pd.DataFrame(result, columns=col_names)
df_asso = spark.createDataFrame(df_result)


## MASCARAMENTO EMAIL
def mascarar_email(email, visivel_inicio=4, visivel_fim=4, caractere_mascara='*'):
    if email is None or '@' not in email:
        return email
    username, domain = email.split('@', 1)
    if len(username) <= visivel_inicio:
        masked_username = username
    else:
        parte_mascarada = caractere_mascara * (len(username) - visivel_inicio)
        masked_username = username[:visivel_inicio] + parte_mascarada
    if len(domain) <= visivel_fim:
        masked_domain = domain
    else:
        parte_mascarada_dom = caractere_mascara * (len(domain) - visivel_fim)
        masked_domain = parte_mascarada_dom + domain[-visivel_fim:]
    return masked_username + '@' + masked_domain

mascarar_email_udf = udf(mascarar_email, StringType())

df_asso = df_asso.withColumn("email", mascarar_email_udf("email"))

## CRIANDO A TEMP VIEW
df_asso.createOrReplaceTempView("ASSOCIADO")

## AMOSTRA
display(df_asso.head(2))

### Coleta MongoDB

In [0]:
## CONEXAO COM O BANCO
uri = f"mongodb://{mongo_user}:{mongo_password}@ac-d4sizdx-shard-00-00.4vaxogk.mongodb.net:27017,ac-d4sizdx-shard-00-01.4vaxogk.mongodb.net:27017,ac-d4sizdx-shard-00-02.4vaxogk.mongodb.net:27017/?ssl=true&replicaSet=atlas-9q2wob-shard-0&authSource=admin&retryWrites=true&w=majority&appName=mongo1"

## CRIA O CLIENT MONGO
client = MongoClient(uri)

In [0]:
## ACESSA COLEÇÃO
db = client['ingestao_desafio_engenharia']
collection = db['movimento']

## TRAZ TODOS OS DOCUMENTOS DA COLEÇÃO
docs = [{**doc, '_id': str(doc['_id'])} for doc in collection.find()]
df_result = pd.DataFrame(docs)
df_movimento = spark.createDataFrame(df_result)

## MASCARAMENTO CARTAO
def mascarar_parcial(dado, visivel_inicio=6, visivel_fim=4, caractere_mascara='X'):
    if dado is None:
        return None
    tamanho = len(dado)
    if tamanho <= visivel_inicio + visivel_fim:
        return dado
    parte_mascarada = caractere_mascara * (tamanho - visivel_inicio - visivel_fim)
    return dado[:visivel_inicio] + parte_mascarada + dado[-visivel_fim:]

mascarar_parcial_udf = udf(mascarar_parcial, StringType())

df_movimento = df_movimento.withColumn("cartao_ficticio", mascarar_parcial_udf("cartao_ficticio"))

## CRIANDO A TEMP VIEW
df_movimento.createOrReplaceTempView("MONVIMENTO")

## AMOSTRA
display(df_movimento.limit(2))

In [0]:
df_asso_conta_movi = spark.sql("""
SELECT 
    MOVI.id AS id_movimentacao,
    MOVI.id_conta,
    MOVI.tipo_transacao,
    MOVI.des_transacao,
    MOVI.vlr_transacao,
    MOVI.data_movimento AS dt_transacao,
    MOVI.cartao_ficticio,
    ASSO.nome,
    ASSO.sobrenome,
    ASSO.idade,
    ASSO.email,
    CONTA.id_associado AS num_cpf_cnpj,
    CONTA.tp_pessoa,
    CONTA.data_criacao AS dt_criacao_conta,
    MD5(CONCAT(MOVI.id, MOVI.id_conta, MOVI.tipo_transacao, MOVI.des_transacao, MOVI.vlr_transacao, MOVI.data_movimento, MOVI.cartao_ficticio)) AS UNIQUE_ID
FROM MONVIMENTO MOVI
JOIN ASSOCIADO ASSO ON MOVI.id = ASSO.id
JOIN CONTA CONTA ON MOVI.id = CONTA.id
""")

## ESCREVENDO OBJETO FINAL
df_asso_conta_movi.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("dados_fake.asso_conta_movi")

## COMENTANDO OBJETO FINAL
spark.sql("""
COMMENT ON TABLE dados_fake.asso_conta_movi IS 'Tabela consolidada de movimentações, associados e contas, com dados sensíveis mascarados e identificador único. Registra transações por cartão, enriquecida com informações dos associados.';
""")

# Comentários de colunas da tabela consolidada
column_comments = [
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.id_movimentacao IS 'Identificador único da tabela movimento'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.id_conta IS 'Registro da conta correta'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.tipo_transacao IS 'Tipo da transação que foi realizada'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.des_transacao IS 'Descrição da transação que foi realizada'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.vlr_transacao IS 'Valor da transação que foi realizada'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.dt_transacao IS 'Data da transação que foi realizada'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.cartao_ficticio IS 'Número do cartão fictício já mascarado'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.nome IS 'Nome do associado'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.sobrenome IS 'Sobrenome do associado'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.idade IS 'Idade do associado'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.email IS 'E‑mail do associado já mascarado'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.tp_pessoa IS 'Tipo de pessoa (PF/PJ)'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.dt_criacao_conta IS 'Data de criação da conta'""",
    """COMMENT ON COLUMN dados_fake.asso_conta_movi.UNIQUE_ID IS 'Identificador único gerado por hash MD5'"""
]

for stmt in column_comments:
    spark.sql(stmt)

In [0]:
%sql
SELECT * FROM dados_fake.asso_conta_movi LIMIT 100