In [1]:
import dask.dataframe as dd
from dask.distributed import Client
import time
import asyncio
import pandas as pd
#from pandera.typing.dask import DataFrame, Series
import pandera as pa
from sqlalchemy import table, column, select, types, Float
from sqlalchemy.dialects import oracle

In [2]:
client= Client("tcp://10.128.0.48:8786")

In [3]:
data="/tmp/daskdata/source/APDW_CARGA.txt"

In [4]:
url_db = 'oracle+cx_oracle://c##pocdask:pocdask@vm-oracle-xe2.c.poc-dask-porto.internal:1521/XE'

In [5]:
meta_schema=pd.DataFrame({ 
 #'cod': pd.Series([],name='cod', dtype= 'Int64'),
 'cod_ra': pd.Series([],name='cod_ra', dtype= 'Int64'),
 'cod_mo': pd.Series([],name='cod_mo', dtype= 'Int64'),
 '00AAAJ': pd.Series([],name='00AAAJ', dtype= 'str'),
 'dat_ult_at': pd.Series([],name='dat_ult_at', dtype= 'datetime64[ns]'),
 'qtd_doc_seg_n': pd.Series([],name='qtd_doc_seg_n', dtype= 'Int64'),
 'qtd_doc_seg_r': pd.Series([],name='qtd_doc_seg_r', dtype= 'Int64'),
 'vlr_doc_seg_emit': pd.Series([],name='vlr_doc_seg_emit', dtype= 'Float64'),
 'vlr_premio_cobra': pd.Series([],name='vlr_premio_cobra', dtype= 'Float64'),
 'vlr_sinistro_avi': pd.Series([],name='vlr_sinistro_avi', dtype= 'Float64'),
 'vlr_sinistro_pag': pd.Series([],name='vlr_sinistro_pag', dtype= 'Float64')
 })

LEITURA BRUTA DE DADOS DO DIRETORIO
====================================================================================

In [70]:
def pipeline_raw_oracle(data):
    #le os dados
    df=dd.read_csv(data, sep="|", assume_missing=True, dtype={'00AAAJ': str})
    df_result=(df.drop('Unnamed: 11', axis=1)
          .assign(DAT_ULT_AT=dd.to_datetime(df['DAT_ULT_AT'],format='%Y-%m-%d',errors='coerce'))
          .to_sql("tb_raw_data",url_db, if_exists='append', index=False,  compute=True, parallel=True, chunksize=500))
    print("Data frame lido")

In [71]:
start_time = time.time()
pipeline_raw_oracle(data)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Inserção concluida,{elapsed_time} ")

Data frame lido
Inserção concluida,25.381739139556885 


FUNÇÃO RELATORIO DE VALORES FEVEREIRO
====================================================================================

In [32]:
def soma_valores_fevereiro():
    #define o select da tabela
    tb=table('tb_raw_data',
        column('cod'),
        column('dat_ult_at'),
        column('vlr_doc_seg_emit'),
        column('vlr_premio_cobra'),
        column('vlr_sinistro_avi'),
        column('vlr_sinistro_pag')
    )
    #le  a tabela
    df = dd.read_sql(sql=select(tb), con=url_db,index_col= 'cod', dtype={
        'dat_ult_at': 'datetime64[ns]',
        'vlr_doc_seg_emit': float,   
        'vlr_premio_cobra': float,
        'vlr_sinistro_avi': float,
        'vlr_sinistro_pag': float     
    }).reset_index()
    #Aplica as regras de negocio
    df=df.drop_duplicates()
    df_result = (df.assign(dat_ult_at=dd.to_datetime(df['dat_ult_at'],format='%Y-%m-%d',errors='coerce'))
                .query("cod==1 & dat_ult_at.between('2023-02-01','2023-02-28')")
                .groupby(['cod','dat_ult_at'])['vlr_doc_seg_emit','vlr_premio_cobra','vlr_sinistro_avi','vlr_sinistro_pag'].sum()
                .to_sql("tb_soma_valores_fevereiro",url_db, if_exists='append', index=True,  compute=True, parallel=True, chunksize=500))

In [33]:
#insere no oracle
print("Iniciando inserção")
start_time = time.time()
soma_valores_fevereiro()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Inserção concluida,{elapsed_time} ")

Iniciando inserção
Inserção concluida,92.62596702575684 


FUNÇÃO CONSULTA TABELA DE REFERÊNCIA
====================================================================================

In [13]:
def consulta_tab_ref():
    tb=table('tb_raw_data',
        column('cod'),
        column('dat_ult_at'),
        column('vlr_doc_seg_emit'),
        column('vlr_premio_cobra'),
        column('vlr_sinistro_avi'),
        column('vlr_sinistro_pag')
    )
    df = dd.read_sql(sql=select(tb), con=url_db,index_col= 'cod', dtype={
            'vlr_doc_seg_emit': float,   
            'vlr_premio_cobra': float,
            'vlr_sinistro_avi': float,
            'vlr_sinistro_pag': float     
        
    })
    
    df_ref = dd.read_sql(sql='table_ref', con=url_db,index_col= 'cod')
    df_merge= dd.merge(df,df_ref, how='left')
    print("Iniciando inserção")
    df_merge=(df_merge.fillna({
        'nome_cliente': "Cliente não encontrado"
    }).to_sql("tb_consulta_ref",url_db, if_exists='append', index=True,  compute=True, parallel=True, chunksize=500))


In [14]:
start_time = time.time()
consulta_tab_ref()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Inserção concluida,{elapsed_time} ")

Iniciando inserção
Inserção concluida,225.934805393219 


FUNÇÃO SEPARA COD RA
====================================================================================


In [64]:
df = dd.read_sql_table(table_name='tb_raw_data', con=url_db,index_col= 'cod', meta= meta_schema)
df=client.persist(df)
def filtra_cod(cod):
    df_codra = client.persist(df[df['cod_ra']==cod])
    df_codra.to_sql(f"tb_codra_dask_{cod}",url_db, if_exists='append', index=False, compute = True, parallel = True, chunksize=500,dtype={
            '00AAAJ': types.VARCHAR(6),
             'vlr_doc_seg_emit': oracle.FLOAT(binary_precision=53),
             'vlr_premio_cobra': oracle.FLOAT(binary_precision=53),
             'vlr_sinistro_avi': oracle.FLOAT(binary_precision=53),
             'vlr_sinistro_pag':oracle.FLOAT(binary_precision=53)
        })

async def escreve_tabelas():
    tasks = [asyncio.to_thread(filtra_cod, cod) for cod in df.cod_ra.unique()]
    res = await asyncio.gather(*tasks)
    
start_time = time.time()    
await escreve_tabelas()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Inserção concluida,{elapsed_time} ")

Inserção concluida,183.59454321861267 


Pandera 
=

In [6]:
schema= pa.DataFrameSchema({
    '00AAAJ': pa.Column(str,checks=pa.Check.isin(['aaaa']))
})

In [7]:
df = dd.read_sql_table(table_name='tb_raw_data', con=url_db,index_col= 'cod', meta= meta_schema)
df=client.persist(df)

In [8]:
try:
    schema.validate(df.compute())
except pa.errors.SchemaError as err: 
    df_erros= err.failure_cases
    

In [12]:
df_erros.info

<bound method DataFrame.info of    index failure_case
0      1       00AAAJ
1      1       00AAAJ
2      1       00AAAJ
3      1       00AAAJ
4      1       00AAAJ
5      1       00AAAJ
6      1       00AAAJ
7      1       00AAAJ
8      1       00AAAJ
9      1       00AAAJ>