In [0]:
# Mantenha seu código identado, organizado e comentado sempre que possível, tornando-o mais claro e facilitando a leitura e execução, tanto para você quanto para outras pessoas.

# Evite trazer dados desnecessários para o seu projeto. Se você só precisa de 2 colunas de um total de 10, não é preciso trazer toda a tabela para ser processada. Traga apenas as 2 colunas desejadas.

# Organize seu código de forma que cada consulta, tabela, conexão ou assunto seja alocado em uma célula separada, sempre nomeando a célula adequadamente.

# Ao trabalhar com PySpark, utilize apenas as bibliotecas necessárias. Por exemplo, no caso de dados estruturados como deste projeto, a biblioteca pyspark.sql.functions é suficiente para executar esse serviço. Adicionar muitas bibliotecas pode aumentar o tempo de execução e causar conflitos entre os comandos de execução.

# Diferente da sintaxe do SQL, o PySpark tem um código mais flexível. Neste sentido, quando possível aplique os comandos nesta ordem FILTER, SELECT, JOIN, GROUP BY, AGG, DISTINCT e LIMIT.

# Evite usar comandos que exigem mais poder de processamento ou memória, como display, show, sort, persist, collect ou toPandas. Dependendo do tamanho do seu DataFrame, esses comandos podem sobrecarregar o sistema e tornar o projeto mais lento. Use-os apenas quando necessário.

# Evite repetir a mesma string várias vezes, utilize variáveis ou constantes para situações repetitivas.

# Evite criar objetos paralelos como exp's em DEV, INT ou qualquer outro formato que exija armazenamento físico. Isso pode aumentar os custos do projeto e tornar o processamento mais lento. Prefira trabalhar com visualizações temporárias(temp view), que são mais rápidas e eficientes, reduzindo o tempo de processamento e melhorando o desempenho do seu projeto.

# Após concluir e testar seu projeto, remova todos os comandos e comentários desnecessários. Em seguida, salve e versione o projeto no repositório Git da equipe.

# Em caso de dúvidas sobre a construção e ou padronização do seu código, consulte as regras internacionais do padrão Python PEP8 -> https://peps.python.org/pep-0008/#introduction

In [0]:
# Projeto: Gerar a lista de associados que tiveram cheques emitidos no mês atual

# Resposta: Cruzar a CCRASSO x CHQ_EMITIDOS
# Colunas necessárias:  Oid do cheque, Valor do cheque, Coop e conta do assicoado, E-mail do associado


**Dados EXP:**

**1-)** Nome da EXP: exp_fluxo_de_caixa_pj.nome_tabela

**2-)** Solicitante: Nome ou e-mail do solicitante

----

| LDAP EXECUTOR  |  DATA |  AÇÃO  |    OBSERVAÇÃO
|-|-|-|-
|seu_ldap|09/11/1851 |Construção da exp | Criação de exp com dados da Diana
|seu_ldap|09/11/1851|Correção de exp | Refatoração do código do frame mod_databricks_v3 para diana_framework 
|seu_ldap|09/11/1851 |Alteração de exp | Alteração na regra de negócio... xyz

/Workspace/Users/seu_e-mail@banco_cooperativo.com.br/repositorio_exps/utils/Diana_framework/diana_framework

In [0]:
%run ../../utils/Diana_framework/diana_framework

In [0]:
from pyspark.sql.functions import *

In [0]:
# Modelo de consulta exclusiva para o Denodo
## As conexões teradata e Denodo devem seguir via comando SQL
query_dnd = """
    SELECT 
        current_date() as data_dnd
    FROM 
        portabilidade_ativa_solicitacoes_cas
"""
TableManager.create_temp_view_from_denodo(query_dnd, "TEMP_DND")

In [0]:
# Modelo de consulta exclusiva para o Teradata
## As conexões teradata e Denodo devem seguir via comando SQL
query_teradata = """
    SELECT TOP 1
       current_date() as data_tera
    FROM 
        P_APP_CREDITO_T.PROD_SUP_COMERCIAL
"""
TableManager.create_temp_view_from_tera(query_teradata, "TEMP_TERA")

In [0]:
# Passo a passo para entendimento básico

# Passo 1 - Fazer a conexão com a tabela desejada
df_emitidos = spark.table("banco_cooperativo_CAS.EXP_FLUXO_DE_CAIXA_PJ.CHQ_EMITIDOS")

# Passo 2 - Se possível filtrar os dados inicialmente
df_emitidos = df_emitidos.filter(month(df_emitidos['dat_apresentacao']) == month(current_date()))

# Passo 3 - Selecionar as colunas dos dados já filtrados
df_emitidos = df_emitidos.select("cod_cooperativa","cod_agencia","num_conta","dat_apresentacao","oid_cheque","num_cpf_cnpj","vlr_cheque")

In [0]:
# Aqui estão alguns exemplos de consulta que geram o mesmo resultado, porém cada um é executado de uma maneira diferente. Isso mostra a diversidade que a linguagem PySpark pode oferecer.

# Ex.:1 Modelo mais esperado
df_asso = spark.table("banco_cooperativo_CAS.REF_ORACLE_COREDBPDB_AGUNICO.CCR0ASSO")
df_asso = df_asso.filter(df_asso["is_deleted"] == "N") \
    .select("NUM_AG", "FCONTA", "FCPF_CGC", "fcemail") \
    .withColumn("FCONTA", regexp_replace(df_asso["FCONTA"], "-", "")) \
    .distinct()


# Ex.:2 Modelo mais didático
df_asso = spark.table("banco_cooperativo_CAS.REF_ORACLE_COREDBPDB_AGUNICO.CCR0ASSO")
df_asso = df_asso.filter(df_asso["is_deleted"] == "N")
df_asso = df_asso.select("NUM_AG", "FCONTA", "FCPF_CGC", "fcemail")
df_asso = df_asso.withColumn("FCONTA", regexp_replace(df_asso["FCONTA"], "-", ""))
df_asso = df_asso.distinct()


# Ex.: 3 Modelo mais enxuto
df_asso = (
    spark.table("banco_cooperativo_CAS.REF_ORACLE_COREDBPDB_AGUNICO.CCR0ASSO")
    .filter("is_deleted = 'N'")
    .select("NUM_AG", "FCONTA", "FCPF_CGC", "fcemail")
    .withColumn("FCONTA", regexp_replace(col("FCONTA"), "-", ""))
    .distinct()
)


# Ex.: 4 Modelo sem quebra de linhas
df_asso = spark.table("banco_cooperativo_CAS.REF_ORACLE_COREDBPDB_AGUNICO.CCR0ASSO")
df_asso = (
    df_asso.filter(df_asso["is_deleted"] == "N")
    .select("NUM_AG", "FCONTA", "FCPF_CGC", "fcemail")
    .withColumn("FCONTA", regexp_replace(df_asso["FCONTA"], "-", ""))
    .distinct()
)


# Ex.: 5 Modelo com interação SQL
df_asso = spark.sql("""
    SELECT DISTINCT
        NUM_AG
        , REPLACE(FCONTA,"-", "") AS FCONTA
        , FCPF_CGC
        , fcemail
    FROM
        banco_cooperativo_CAS.REF_ORACLE_COREDBPDB_AGUNICO.CCR0ASSO
    WHERE is_deleted = 'N'
""")

In [0]:
# Neste exemplo foi utilizado left join para fazer a ligação entre os DataFrames. Assim os dados de df_emitidos são validados no df_asso. Outros exemplos de join podem ser o rigth, inner e outer.

df_final = df_emitidos.join(
    df_asso, 
    on=[
        (df_emitidos["cod_cooperativa"] == df_asso["NUM_AG"]), 
        (df_emitidos["num_conta"] == df_asso["FCONTA"])
    ], 
    how='left'
)

df_final = df_final.select("cod_cooperativa","num_conta","num_cpf_cnpj","dat_apresentacao","oid_cheque","FCPF_CGC","fcemail","vlr_cheque").limit(10)
df_final.createOrReplaceTempView("TABELA_TESTE_FRAMEWORK_PYTHON")

In [0]:
# Modelo do preenchimento do comando "table_columns"
TableManager.table_columns = [
    ['COD_COOPERATIVA', 'string', 'Código da Cooperativa'],
    ['NUM_CONTA', 'string', 'Número da conta'],
    ['NUM_CPF_CNPJ', 'string', 'Número do CPF ou CNPJ da pessoa (campo de texto, porém sem formato, ex: 00022233344)'],
    ['DAT_APRESENTACAO', 'timestamp', 'Data da operação'],
    ['OID_CHEQUE', 'string', 'OID do cheque'],
    ['FCPF_CGC', 'string', 'FCPF/CGC'],
    ['FCEMAIL', 'string', 'E-mail'],
    ['VLR_CHEQUE', 'decimal', 'Valor do cheque']
]

## Chave primária da tabela (Evite deixar este campo vazio)
TableManager.table_pk = ['cod_cooperativa', 'num_conta','dat_apresentacao','oid_cheque'] 

## Particionamento da tabela (Cuidado ao particionar a tabela)
#TableManager.table_partition = ['ANO_MES']

## Comentário da tabela
TableManager.table_comment = "Tabela modelo para criação de objeto via framework Diana em linguagem Python"

## Nome da tabela
TableManager.source_view = "TABELA_TESTE_FRAMEWORK_PYTHON"

## Nome do schema
TableManager.schema_name = "exp_fluxo_de_caixa_pj"

print(TableManager.schema_name + "." + TableManager.source_view)

In [0]:
## Validação de tabela
TableManager.check_table()

## Apagar a tabela e seus metadados
#TableManager.drop_table()
 
## Criar tabela com dados totais
TableManager.create_table_overwrite()

## Adicionar dados a uma tabela já existente
#TableManager.create_table_append()

## Adicionar e atualiza dados a uma tabela já existente
#TableManager.create_table_upsert()

## Carregar o visual de algumas linhas do objeto criado
#TableManager.display_table()

In [0]:
# Principais comandos PySpark para operação e tratamento de dados

## Comando que seleciona e distingue os dados considerando os parametros informados
df_emitidos_select = df_emitidos.select("cod_cooperativa","oid_cheque","vlr_cheque").distinct()

## Comando que retira duplicidades do df que sestá sendo tratado
df_emitidos = df_emitidos.dropDuplicates(["cod_cooperativa","oid_cheque"])
df_emitidos = df_emitidos.dropDuplicates()

## Comando para apagar uma coluna
df_emitidos_drop_coluna = df_emitidos.drop("oid_cheque_substring")

## Comando que ordena os dados do df (desc, asc)
df_emitidos_order_by = df_emitidos.orderBy("cod_cooperativa", df_emitidos["cod_cooperativa"].asc())

## Comando que faz group by no df
df_emitidos_group = df_emitidos.groupBy("cod_cooperativa")

## Comando que faz group by e faz agregação de valor no df (sum, max, min)
df_emitidos_group_valor = df_emitidos.groupBy("cod_cooperativa","vlr_cheque").agg({"vlr_cheque":"sum"})

## Comando para limitar os dados que serão carregados no df
df_emitidos_limite = df_emitidos.limit(10)

## Comando para renomear a coluna que já existe
df_emitidos_coluna_renomeada = df_emitidos.withColumnRenamed("oid_cheque_substring","oid_coluna_renomeada")

## Comando para adicionar um nova coluna
df_emitidos_nova_coluna = df_emitidos.withColumn("oid_nova_coluna", col("oid_cheque"))

# Comando para formatar os dados selecinados em formato de data
df_emitidos_to_date = df_emitidos.withColumn("dat_apresentacao", to_date(col("dat_apresentacao")))

## Comando para formatar o tipo da coluna (int, integer, strig, date, timestamp, decimal, float, boolean, bigint)
df_emitidos_withColumn = df_emitidos.withColumn("cod_cooperativa", df_emitidos["cod_cooperativa"].cast("string"))

## Comando para retirar espaços da string
df_emitidos_trim = df_emitidos.withColumn("oid_nova_coluna", trim(col("oid_cheque")))

## Comando que substitui carácter dentro de uma string
df_emitidos_replace = df_emitidos.withColumn("cod_cooperativa_replace", regexp_replace(df_emitidos["cod_cooperativa"], "0", "@"))

## Comando para desmembrar uma string dentro de uma coluna
df_emitidos_substring = df_emitidos.withColumn("oid_cheque_substring", substring(col("oid_cheque"), 1, 2))

## Comando para tornar todas as letras maiusculas
df_asso_upper = df_asso.withColumn("fcemail_upper", upper(col("fcemail")))

## Comando para tornar todas as letras minusculas
df_asso_lower = df_asso.withColumn("fcemail_minuscula", lower(col("fcemail")))

## Comando para tornar a primeira letra maiuscula de todas as outras minusculas 
df_asso_initcap = df_asso.withColumn("fcemail_initcap", initcap(col("fcemail")))

## Comando para arredondar uma coluna para um número inteiro
df_emitidos_media = df_emitidos.withColumn("vlr_cheque", round(col("vlr_cheque")))

# Comando para calcular a diferença entre duas datas
df_emitidos_data_diferenca = df_emitidos.withColumn("data_diferenca", datediff(col("dat_apresentacao"), col("dat_apresentacao")))

## Comando que faz o describe do df selecionado
#df_emitidos.describe()

## Comando que faz o explode de um array
#df_emitidos_explode = df_emitidos.withColumn("array_explode", explode(col("array_explode")))

## Comando para contar as linhas do df
#df_emitidos.count()

## Comando para mostrar em tela o objeto gerado
#display(df_emitidos)

## Comando para criar uma temp view acessível em SQL
#df_emitidos.createOrReplaceTempView("TABELA_TEMPORARIA_CHEQUE_EMITIDOS")