## Scraping Divida Ativa

Ações - inclusão de registros de pagamentos efetivaodos da dívida atíva.

### Data Sources
- Dados extraidos da intranet da pmmc
- file:  'summary_consulta-parcelamentos-data-pagamento.parquet'

### Changes
- 17-01-2024 : Started project

In [1]:
import datetime
import os
from pathlib import Path

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.types import (BigInteger, Boolean, Date, DateTime, Integer,
                              String)
from summarytools import dfSummary

import config

### File Locations

In [2]:
in_file = Path.cwd().parent / "data" / "processed" / "summary_consulta-parcelamentos-data-pagamento.parquet"

script_file = Path.cwd().parent / "scripts_sql" / f"DML_pagamentos_efetivados.sql"

DATA_DOWNLOAD = os.getenv('DATA_DOWNLOAD')
DATA_DOWNLOAD

'2024-01-16'

In [3]:
df = pd.read_parquet(in_file)
df.columns

Index(['TipoNum', 'Inscricao', 'ValorParcelado', 'Parcelamento', 'DataAcordo',
       'ValorPago', 'Honorarios', 'Parcela', 'DataPagamento', 'Numero', 'Ano',
       'Tipo', 'IdParcela', 'IdAcordo', 'IdCadastro'],
      dtype='object')

In [4]:
df.columns = [c.lower() for c in df.columns]
df.columns

Index(['tiponum', 'inscricao', 'valorparcelado', 'parcelamento', 'dataacordo',
       'valorpago', 'honorarios', 'parcela', 'datapagamento', 'numero', 'ano',
       'tipo', 'idparcela', 'idacordo', 'idcadastro'],
      dtype='object')

In [5]:
# for col in df.columns:
#     print(f'{col:.<30}:', max([len(str(v)) for v in df[col]]))

### Data Manipulation

In [6]:
print(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1253788 entries, 0 to 1253787
Data columns (total 15 columns):
 #   Column          Non-Null Count    Dtype         
---  ------          --------------    -----         
 0   tiponum         1253788 non-null  int64         
 1   inscricao       1253788 non-null  int64         
 2   valorparcelado  1253788 non-null  Float64       
 3   parcelamento    1253788 non-null  int64         
 4   dataacordo      1253788 non-null  datetime64[ns]
 5   valorpago       1253788 non-null  Float64       
 6   honorarios      1142340 non-null  Float64       
 7   parcela         1253788 non-null  int64         
 8   datapagamento   1253788 non-null  datetime64[ns]
 9   numero          1253788 non-null  int64         
 10  ano             1253788 non-null  int64         
 11  tipo            1253788 non-null  object        
 12  idparcela       1253788 non-null  object        
 13  idacordo        1253788 non-null  object        
 14  idcadastro      12

In [7]:
df['data_referencia'] = datetime.datetime.strptime(DATA_DOWNLOAD, '%Y-%m-%d')
df['data_referencia']

0         2024-01-16
1         2024-01-16
2         2024-01-16
3         2024-01-16
4         2024-01-16
             ...    
1253783   2024-01-16
1253784   2024-01-16
1253785   2024-01-16
1253786   2024-01-16
1253787   2024-01-16
Name: data_referencia, Length: 1253788, dtype: datetime64[ns]

### Get connection data

In [8]:
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_SCHEMA = os.getenv('DB_SCHEMA')

conn_string = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/postgres'

### Create conection

In [9]:
db = create_engine(conn_string)

### Create schema

In [10]:
with db.connect() as conn:
    conn.execute('CREATE SCHEMA IF NOT EXISTS divida_ativa;')
    conn.execute('SET search_path TO public, public;')
    conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')
    conn.execute('SET search_path TO divida_ativa, public;')

### Create table

In [11]:
sql_script_ddl = (
    f"CREATE TABLE IF NOT EXISTS {DB_SCHEMA}.pagamento_efetivado ("
        'uuid_pagamento_efetivado UUID NOT NULL DEFAULT uuid_generate_v4(),'
        f"data_referencia DATE, "
        f"created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, "
        'tiponum INTEGER, '
        'inscricao VARCHAR(20), '
        'valorparcelado FLOAT, '
        'parcelamento INTEGER, '
        'dataacordo DATE, '
        'valorpago FLOAT, '
        'honorarios FLOAT, '
        'parcela INTEGER, '
        'datapagamento DATE, '
        'numero INTEGER, '
        'ano INTEGER, '
        'tipo VARCHAR(50), '
        'idparcela VARCHAR(14), '
        'idacordo VARCHAR(11), '
        'idcadastro VARCHAR(16), '
        f" CONSTRAINT pagamento_efetivado_pkey PRIMARY KEY (uuid_pagamento_efetivado)"
    "); "
)
sql_script_ddl

'CREATE TABLE IF NOT EXISTS divida_ativa.pagamento_efetivado (uuid_pagamento_efetivado UUID NOT NULL DEFAULT uuid_generate_v4(),data_referencia DATE, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, tiponum INTEGER, inscricao VARCHAR(20), valorparcelado FLOAT, parcelamento INTEGER, dataacordo DATE, valorpago FLOAT, honorarios FLOAT, parcela INTEGER, datapagamento DATE, numero INTEGER, ano INTEGER, tipo VARCHAR(50), idparcela VARCHAR(14), idacordo VARCHAR(11), idcadastro VARCHAR(16),  CONSTRAINT pagamento_efetivado_pkey PRIMARY KEY (uuid_pagamento_efetivado)); '

In [12]:
with db.connect() as conn:
    conn.execute(sql_script_ddl)

In [13]:
with db.connect() as conn:
    conn.execute(F'CREATE INDEX IF NOT EXISTS idx_idparcela ON {DB_SCHEMA}.pagamento_efetivado (idparcela);')

### Insert into table - pagamentos efetivados

In [None]:
dtypes = {}

# Converte o DataFrame em uma tabela temporária no banco de dados
registros_adicionados = df.to_sql('tabela_temporaria', db, schema=DB_SCHEMA, if_exists='replace', dtype=dtypes, index=False, chunksize=1000)

colunas_existentes = [c.lower() for c in df.columns]

query = (
        f"INSERT INTO {DB_SCHEMA}.pagamento_efetivado ({', '.join(colunas_existentes)}) "
        f"SELECT {', '.join(colunas_existentes)} "
        f"FROM {DB_SCHEMA}.tabela_temporaria  AS tb_temp "
        f"WHERE NOT EXISTS ( "
            f"SELECT 1  "
            f"FROM {DB_SCHEMA}.pagamento_efetivado pe "
            f"WHERE pe.data_referencia = tb_temp.data_referencia "
            f"AND pe.idparcela = tb_temp.idparcela "
        ") "
    f"RETURNING *;"
)

print(query)

with db.connect() as conn:
    result = conn.execute(query)
    conn.execute(f"DROP TABLE IF EXISTS {DB_SCHEMA}.tabela_temporaria;")

    # Recupera as informações da transação (linhas inseridas)
    inserted_rows = result.fetchall()

print('***Total_registros:', len(inserted_rows))

### Save output script file

In [None]:
# Salva o script SQL em um arquivo
with open(script_file, 'w') as arquivo:
    arquivo.write(query)