In [1]:
import pandas as pd
import os
# importando bibliotecas conexão PDGT
import json
import argparse
import subprocess
import boto3
import time
from pyathena import connect
import pandas.io.sql as sqlio
import sys
from ydata_profiling import ProfileReport
from botocore import UNSIGNED
from botocore.config import Config
import boto3.session
from botocore import exceptions
from scipy.stats import chi2_contingency
#importando bibliotecas padrão
import pandas as pd
from datetime import datetime
from validate_docbr import CPF
import matplotlib.pyplot as plt
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class CustomException(Exception):
    pass

json_manifest_dbt = "target/manifest.json"
athena_bucket = "todos-athena-us-east-1"
athena_tmp_folder = f"s3://{athena_bucket}/"

def execute_athena_sql(query):
    client = boto3.client('athena', region_name='us-east-1')
    queryStart = client.start_query_execution(
    QueryString = query,
    ResultConfiguration = { 'OutputLocation': athena_tmp_folder})
    queryExecution = client.get_query_execution(QueryExecutionId=queryStart['QueryExecutionId'])
    while queryExecution['QueryExecution']['Status']['State'] in ('RUNNING', 'QUEUED'):
        time.sleep(5)
        queryExecution = client.get_query_execution(QueryExecutionId=queryStart['QueryExecutionId'])

def execute_athena_query(query):
    cursor = connect(s3_staging_dir=athena_tmp_folder,
                    region_name="us-east-1").cursor()
    cursor.execute(query)
    colls=','.join(str(f"{e[0]}") for e in cursor.description)
    results=pd.DataFrame(list(cursor), columns=colls.split(","))
    return results

In [3]:
query = """
--SCRIPT LIMPEZA DADOS PACIENTE
--SELECT count(*) AS qtd_pacientes FROM (
WITH pacientes AS (
	SELECT
		CASE WHEN trim(pa.cpf) IS NOT NULL 
			 AND LENGTH(trim(pa.cpf)) = 11
			 AND trim(pa.cpf) NOT IN ('00000000000')
			 THEN trim(pa.cpf)
			 ELSE NULL END AS cpf,
			count(1) AS count
	FROM todos_data_lake_trusted_feegow.pacientes pa
--	JOIN pira ON pira.id_paciente = pa.id
	WHERE 1=1
		AND trim(pa.cpf) IS NOT NULL
		AND LENGTH(trim(pa.cpf)) = 11
		AND trim(pa.cpf) NOT IN ('00000000000')
--		AND pa.email IN (LIKE '%@%')
--		AND pa.sys_active = 1
	GROUP BY 
		trim(pa.cpf)
	HAVING 
		count(1) = 1
),
cdt AS (
	WITH clean AS (
	SELECT 
		trim(fl.cpf) AS cpf, 
		count(1) 
	FROM pdgt_cartaodetodos_filiado.fl_filiado fl 
	WHERE fl.status_atual = 1  
	GROUP BY 
		fl.cpf
	HAVING 
		count(1) = 1
		)
	SELECT DISTINCT
		trim(fl.cpf) AS cpf,
		CASE WHEN fl.flag_titular = 1 THEN 'Titular'
			 ELSE 'Dependente' END titular
	FROM pdgt_cartaodetodos_filiado.fl_filiado fl
	JOIN clean ON clean.cpf = trim(fl.cpf)
	WHERE 1=1
--		AND fl.flag_titular = 1
		AND fl.status_atual = 1  
),
mail AS (
	SELECT
		trim(pa.cpf) AS cpf,
		pa.email,
		count(1) AS agregador
	FROM todos_data_lake_trusted_feegow.pacientes pa
	JOIN pacientes ON 
		trim(pa.cpf) = pacientes.cpf
	WHERE 1=1
		AND pa.email LIKE '%@%'
	GROUP BY 
		trim(pa.cpf),
		pa.email
	HAVING 
		count(1) = 1
),
cdtml AS (
	WITH clean AS (
	SELECT 
		trim(fl.cpf) AS cpf, 
		count(1) 
	FROM pdgt_cartaodetodos_filiado.fl_filiado fl 
	WHERE fl.status_atual = 1  
	GROUP BY 
		fl.cpf
	HAVING 
		count(1) = 1
		)
	SELECT DISTINCT
		trim(fl.cpf) AS cpf,
		CASE WHEN fl.flag_titular = 1 THEN 'Titular'
			 ELSE 'Dependente' END titular,
		fl.email
	FROM pdgt_cartaodetodos_filiado.fl_filiado fl
	JOIN clean ON clean.cpf = trim(fl.cpf)
	WHERE 1=1
		AND fl.status_atual = 1  
--		AND fl.flag_titular = 1
		AND fl.email LIKE '%@%'
		AND fl.flag_email_valido = 1
),
query AS (
	SELECT DISTINCT
		--pa.id,
		--id do paciente na feegow é o mesmo id do prontuário
		pa.sys_active AS ativo,
		NULL AS obito,
		pa.id AS prontuario,
		--id do paciente na feegow é o mesmo id do prontuário
		pacientes.cpf AS cpf,
		SPLIT_PART(pa.nome_paciente , ' ', 1) AS primeiro_nome,
		TRIM(SUBSTRING(nome_paciente FROM POSITION(' ' IN pa.nome_paciente) + 1)) AS sobrenome,
		pa.nome_paciente AS nome_social,
		NULL AS rg,
		DATE_FORMAT(pa.nascimento, '%d/%m/%Y') AS data_nascimento,
		NULL AS nome_mae,
		NULL AS naturalidade,
		pa.profissao,
		NULL AS restricoes_trat_medico,
		NULL AS telefone,
		REGEXP_REPLACE(pa.celular, '[ "()-]', '') AS celular,
		'' AS celular_alternativo,
		CASE WHEN mail.email IS NULL THEN lower(cdtml.email)
			 ELSE lower(mail.email) END email,
		spe.cep,
		spe.logradouro AS endereco,
		spe.numero,
		spe.complemento,
		spe.bairro,
		spe.cidade,
		spe.estado,
		NULL AS fotografia,
		NULL AS observacoes,
		NULL AS cns,
		pa.sysdate AS created_at,
		pa.dhup,
		pa.sexo AS sexo_id,
		sx.nomesexo AS sexo,
		CASE
			--condição criada para verificar se há matricula e trazer Cartão de TODOS
	    WHEN stp.nome_tabela_particular = 'Cartão de TODOS*' THEN 'Cartão de TODOS'
			WHEN spc.matricula IS NOT NULL
			AND stp.nome_tabela_particular IS NULL THEN 'Cartão de TODOS'
			WHEN spc.matricula IS NOT NULL
			AND stp.nome_tabela_particular = 'PARTICULAR*' THEN 'Cartão de TODOS'
			ELSE stp.nome_tabela_particular
		END AS parceria,
		NULL AS etnia_id,
		NULL AS genero_id,
		pa.origem_id,
		NULL AS prioridades_id,
		NULL AS est_civil_id,
		NULL AS mig_prontuario_id,
		spc.matricula AS CDT_MATRICULA,
		NULL AS last_attendance_data,
		cdt.titular,
		'31/12/2024' AS validade--,
--		mail.email AS email_feegow, --REMOVER ESSA LINHA
--		lower(cdtml.email) AS email_cdt,
--		pacientes.cpf AS cpf_feegow
	FROM todos_data_lake_trusted_feegow.pacientes pa
	JOIN pacientes ON
		pacientes.cpf = trim(pa.cpf)
	LEFT JOIN cdt ON 
		cdt.cpf = trim(pa.cpf)
	LEFT JOIN mail ON 
		mail.cpf = trim(pa.cpf)
	LEFT JOIN cdtml ON 
		cdtml.cpf = trim(pa.cpf)
	LEFT JOIN todos_data_lake_trusted_feegow.paciente_endereco spe ON
		spe.paciente_id = pa.id
	LEFT JOIN todos_data_lake_trusted_feegow.paciente_convenio spc ON
		spc.paciente_id = pa.id
	LEFT JOIN todos_data_lake_trusted_feegow.tabelas_particulares stp ON
		stp.id = pa.tabela_id
	LEFT JOIN todos_data_lake_trusted_feegow.sexo sx ON 
		sx.id = pa.sexo
	WHERE 1=1
		AND pa.sys_active = 1
),
tratado AS (
	SELECT
	 ROW_NUMBER() OVER (ORDER BY prontuario) AS rn,
    ativo,
    obito,
    prontuario,
    cpf,
    primeiro_nome,
    sobrenome,
    nome_social,
    rg,
    data_nascimento,
    nome_mae,
    naturalidade,
    profissao,
    restricoes_trat_medico,
    telefone,
    celular,
    celular_alternativo,
    email,
    cep,
    endereco,
    numero,
    complemento,
    bairro,
    cidade,
    estado,
    fotografia,
    observacoes,
    cns,
    created_at,
    dhup,
    sexo_id,
    sexo,
    parceria,
    etnia_id,
    genero_id,
    origem_id,
    prioridades_id,
    est_civil_id,
    mig_prontuario_id,
    CDT_MATRICULA,
    last_attendance_data,
    titular,
    validade
FROM query
WHERE 1=1 
--	AND cpf IS NOT NULL 
	AND email IS NOT NULL
--	AND email_feegow IS NOT NULL	
--	AND cpf_titular IS NOT NULL
--	AND cpf_feegow IS NULL 
--	AND email_feegow IS NULL 
--	AND email_cdt IS NOT NULL
--	AND cpf = '62852450372'
--GROUP BY cpf
ORDER BY 
	prontuario
),
sujeira AS (
	SELECT
	 ROW_NUMBER() OVER (ORDER BY prontuario) AS rn,
    ativo,
    obito,
    prontuario,
    cpf,
    primeiro_nome,
    sobrenome,
    nome_social,
    rg,
    data_nascimento,
    nome_mae,
    naturalidade,
    profissao,
    restricoes_trat_medico,
    telefone,
    celular,
    celular_alternativo,
    email,
    cep,
    endereco,
    numero,
    complemento,
    bairro,
    cidade,
    estado,
    fotografia,
    observacoes,
    cns,
    created_at,
    dhup,
    sexo_id,
    sexo,
    parceria,
    etnia_id,
    genero_id,
    origem_id,
    prioridades_id,
    est_civil_id,
    mig_prontuario_id,
    CDT_MATRICULA,
    last_attendance_data,
    titular,
    validade
FROM query
WHERE 1=1 
--	AND cpf IS NOT NULL
	AND email IS NULL
--	AND email_feegow IS NOT NULL	
--	AND cpf_titular IS NOT NULL
--	AND cpf_feegow IS NULL 
--	AND email_feegow IS NULL 
--	AND email_cdt IS NOT NULL
--	AND cpf = '62852450372'
--GROUP BY cpf
ORDER BY 
	prontuario
)
SELECT 
	ativo,
    obito,
    prontuario,
    cpf,
    primeiro_nome,
    sobrenome,
    nome_social,
    rg,
    data_nascimento,
    nome_mae,
    naturalidade,
    profissao,
    restricoes_trat_medico,
    telefone,
    celular,
    celular_alternativo,
    email,
    cep,
    endereco,
    numero,
    complemento,
    bairro,
    cidade,
    estado,
    fotografia,
    observacoes,
    cns,
    created_at,
    dhup,
    sexo_id,
    sexo,
    parceria,
    etnia_id,
    genero_id,
    origem_id,
    prioridades_id,
    est_civil_id,
    mig_prontuario_id,
    CDT_MATRICULA,
    last_attendance_data,
    titular,
    validade
FROM tratado WHERE rn BETWEEN 0 AND 1000000 
"""

In [4]:
df = execute_athena_query(query)

In [6]:
df

KeyboardInterrupt: 

In [5]:
df.to_csv('csv_teste', index=False)

Error: need to escape, but no escapechar set

In [2]:
df_inv = pd.read_csv("/home/lucasmateus/dbt-projects/projetos_lucas/tech/data_quality_amei/data_quality_pacientes_feegow_email_invalido.csv")

: 

In [None]:

# Defina o diretório onde deseja salvar os arquivos CSV segmentados
output_dir = 'arquivos_csv_segmentados'
os.makedirs(output_dir, exist_ok=True)

# Calcule o número de segmentos necessários
num_segments = len(df_inv) // 1000000 + 1

# Segmente o DataFrame e salve cada segmento em um arquivo CSV
for i in range(num_segments):
    start_idx = i * 1000000
    end_idx = min((i + 1) * 1000000, len(df_inv))
    segment_df = df_inv.iloc[start_idx:end_idx]
    segment_filename = os.path.join(output_dir, f'segmento_{i + 1}.csv')
    segment_df.to_csv(segment_filename, index=False)

    print(f'Segmento {i + 1}/{num_segments} salvo em {segment_filename}')