Importações

In [16]:
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from sqlalchemy.types import String, Integer, Date, DECIMAL
import re

Conexões

In [17]:
class Conexao:
    def conn_tecpar():
        url = URL.create(
            "mysql",
            username="",
            password="",
            host="",
            port=3306,
            database="",
        )
        return create_engine(url)

    def conn_cyber():
        url = URL.create(
            "postgresql",
            username="",
            password="",
            host="",
            port=5432,
            database=""      
        )
        return create_engine(url)
    
    def conn_radius():
        url = URL.create(
            "postgresql",
            username="",
            password="",
            host="",
            port=5433,
            database=""
        )
        return create_engine(url)
    
conn_cyber = Conexao.conn_cyber()
conn_tecpar = Conexao.conn_tecpar()
conn_radius = Conexao.conn_radius()

Funções

In [18]:
def remove_caracteres(numero):
    # Remover todos os caracteres não numéricos
    numero = re.sub(r'[^0-9]', '', numero)
    return numero

def adicionar_9(numero):
    if len(numero) == 10 and re.match(r'^[1-9]{2}[6-9][0-9]{7}$', numero):
        return numero[:2] + '9' + numero[2:]
    else:
        return numero

def verificar_tipo_telefone(numero):
    if len(numero) < 10:
        return 'invalido'
    elif len(numero) == 10:
        if re.match(r'^[1-9]{2}[6-9][0-9]{7}$', numero):
            return 'celular'
        elif re.match(r'^[1-9]{2}[2-5][0-9]{7}$', numero):
            return 'fixo'
        else:
            return 'invalido'
    else:
        if re.match(r'^[1-9]{2}9[1-9][0-9]{7}$', numero):
            return 'celular'
        else:
            return 'invalido'

def validar_email(email):
    padrao = r'^[a-zA-Z0-9][a-zA-Z0-9._-]*@[a-zA-Z0-9][a-zA-Z0-9._-]*(\.[a-zA-Z]{2,4})+$'
    return re.match(padrao, email) is not None

def split_concat_street(df):
    if df['street'].str.contains(',').any():
        df[['street_2', 'comp_temp']] = df['street'].str.split(',', n=1, expand=True)
        df['street_2'] = df['street_2'].str.strip()
        df['comp_temp'] = df['comp_temp'].str.strip()
        df.loc[df['street'].str.contains(','), 'complement'] = df['complement'] + ', ' + df['comp_temp']
        df = df.drop(['street', 'comp_temp'], axis=1)
        df = df.rename(columns={'street_2': 'street'})
    return df

def mapeamento_street_type(df):
    mapeamento = {
    'R.': 'Rodovia',
    'RUA': 'Rua',
    'AV.': 'Avenida',
    'AVENIDA': 'Avenida',
    'AV': 'Avenida',
    'ESTRADA': 'Estrada',
    'EST': 'Estrada' ,
    'ESTR': 'Estrada',
    'ROD.': 'Rodovia',
    'RODOVIAL': 'Rodovia',
    'RODOVIA': 'Rodovia',
    'ROD': 'Rodovia',
    'TRAVESSA': 'Travessa',
    'VIELA': 'Viela',
    'ÁREA': 'Área',
    'JARDIM': 'Jardim',
    'PRAÇA': 'Praça',
    'ALAMEDA': 'Alameda',
    }
    condicao_nula = df['street'].isnull()
    df.loc[condicao_nula, 'street'] = ''
    df['street_type'] = 'Rua'
    for chave in mapeamento:
        condicao = df['street'].str.startswith(chave)
        df.loc[condicao, 'street_type'] = mapeamento[chave]
        df.loc[condicao, 'street'] = df.loc[condicao, 'street'].str.replace(chave, '', regex=False)
    df['street'] = df['street'].str.strip()
    return df

def transformacao_velocidade(valor):
    if isinstance(valor, str):
        if valor.endswith('K'):
            velocidade_sem_k = valor[:-1]
            if velocidade_sem_k:
                valor_transformado = int(velocidade_sem_k) / 1024
                return f"{int(round(valor_transformado))}m"
            else:
                return '0m'
        elif valor.endswith('m'):
            return valor
    return valor

Querys pessoas

In [19]:
query_phone = '''
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    1 ORDER_PHONE,
    FONE01 PHONE
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND FONE01 <> ''
    AND FONE01 IS NOT NULL
UNION ALL
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    2 ORDER_PHONE,
    FONE02 PHONE
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND FONE02 <> ''
    AND FONE02 IS NOT NULL
UNION ALL
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    3 ORDER_PHONE,
    FAX PHONE
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND FAX <> ''
    AND FAX IS NOT NULL
UNION ALL
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    4 ORDER_PHONE,
    CONTATO PHONE
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND CONTATO <> ''
    AND CONTATO IS NOT NULL
UNION ALL
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    5 ORDER_PHONE,
    ENDERECOWEB PHONE
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND ENDERECOWEB <> ''
    AND ENDERECOWEB IS NOT NULL
UNION ALL
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    6 ORDER_PHONE,
    EMAIL PHONE
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND EMAIL <> ''
    AND EMAIL IS NOT NULL;
'''

query_email = '''
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    TRIM(LOWER(REGEXP_SPLIT_TO_TABLE(EMAIL, '[,;/]'))) AS EMAIL
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND EMAIL <> '' AND EMAIL IS NOT NULL
UNION ALL
SELECT DISTINCT
    CODPESSOA PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', '')
        ELSE REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/','')
    END DOCUMENT_NUMBER,
    TRIM(LOWER(REGEXP_SPLIT_TO_TABLE(ENDERECOWEB, '[,;/]'))) AS EMAIL
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    and ENDERECOWEB <> '' AND ENDERECOWEB IS NOT NULL
'''

query_pessoa_temp = '''
SELECT DISTINCT
    CODPESSOA AS PERSON_ERP_OLD_ID,
    TIPOPESSOA AS PERSON_TYPE, 
    CASE
        WHEN TIPOPESSOA = 1 THEN TRIM(REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', ''))
        ELSE TRIM(REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''),'/',''))
    END AS DOCUMENT_NUMBER,
    TRIM(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(RG, '.', ''), '-', ''),'/',''),'_',''),' ','')) AS DOCUMENT_IDENTITY,
    TRIM(UPPER(NOME_RAZAOSOCIAL)) AS PERSON_NAME,
    TRIM(UPPER(NOME_FANTASIA)) AS TRADING_NAME,
    CAST(NASCIMENTO AS DATE) AS BIRTH_DATE,
    CODPESSOA AS PERSON_ERP_OLD_ID,
    NULL AS MUNICIPAL_REGISTRATION,
    IE AS STATE_REGISTRATION,
    CODPESSOA + 8000000 AS PERSON_LOCALE_FISCAL_COLLECTION_ERP_OLD_ID
FROM MK_PESSOAS P
    JOIN MK_CONTRATOS C ON C.CLIENTE = P.CODPESSOA
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
'''

Tratativas de Telefone

In [20]:
df_phone = pd.read_sql(query_phone, conn_cyber)

df_phone['phone'] = df_phone['phone'].apply(remove_caracteres)
df_phone['phone'] = df_phone['phone'].apply(adicionar_9)
df_phone['type_phone'] = df_phone['phone'].apply(verificar_tipo_telefone)
df_phone = df_phone.sort_values(by=['person_erp_old_id'], ascending=True)
df_phone = df_phone.sort_values(by=['order_phone'], ascending=True)
df_phone = df_phone.drop_duplicates(subset=['document_number', 'phone'], keep='first')
df_phone['ctrl_phone'] = df_phone.groupby(['document_number', 'type_phone']).cumcount() + 1
df_phone = df_phone.drop('order_phone', axis=1)

Carga tabela de telefones

In [21]:
df_phone.to_sql(
    name='f_phone_temp',con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
        'person_erp_old_id': Integer(),
        'document_number': String(14),
        'phone': String(255),
        'type_phone': String(255),
        'ctrl_phone': Integer()
    }
)
with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_PHONE_TEMP_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_phone_temp (person_erp_old_id);
    CREATE INDEX F_PHONE_TEMP_DOCUMENT_NUMBER_IDX USING BTREE ON f_phone_temp (document_number);
    '''))
    con.close()

Tratativas de e-mail

In [22]:
df_email = pd.read_sql(query_email, conn_cyber)

df_email['email'] = df_email['email'].str.replace(" ", "").str.strip()
df_email['email_valido'] = df_email['email'].apply(validar_email)
filtro = df_email['email_valido'] == 1
df_email = df_email[filtro]
df_email = df_email.sort_values(by=['person_erp_old_id'], ascending=True)
df_email = df_email.drop_duplicates(subset=['document_number', 'email'], keep='first')
df_email['ctrl_email'] = df_email.groupby('document_number').cumcount() + 1

Carga tabela de e-mails

In [23]:
df_email.to_sql(
    name='f_email_temp',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
        'person_erp_old_id': Integer(),
        'document_number': String(14),
        'email': String(255),
        'ctrl_email': Integer(),
        'email_valido': Integer()
    }
)

with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_EMAIL_TEMP_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_email_temp (person_erp_old_id);
    CREATE INDEX F_EMAIL_TEMP_DOCUMENT_NUMBER_IDX USING BTREE ON f_email_temp (document_number);
    '''))
    con.close()

Carga tabela pessoas temporária

In [24]:
df_pessoa_temp = pd.read_sql(query_pessoa_temp, conn_cyber)

df_pessoa_temp.to_sql(
    name='f_person_temp',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
        'person_erp_old_id': Integer(),
        'person_type': Integer(),
        'document_number': String(14),
        'document_identity': String(45),
        'person_name': String(255),
        'trading_name': String(255),
        'birth_date': Date(),
        'municipal_registration': String(128),
        'state_registration': String(128),
        'person_locale_fiscal_collection_erp_old_id': Integer()
    }
)

with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_PERSON_TEMP_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_person_temp (person_erp_old_id);
    CREATE INDEX F_PERSON_TEMP_DOCUMENT_NUMBER_IDX USING BTREE ON f_person_temp (document_number);
    '''))
    con.close()

Carga tabela PERSON final!

In [25]:
query_person = '''
SELECT	P.PERSON_ERP_OLD_ID AS person_erp_old_id,
	PERSON_TYPE AS person_type,
	P.DOCUMENT_NUMBER AS document_number,
	DOCUMENT_IDENTITY AS document_identity,
	PERSON_NAME AS person_name,
	TRADING_NAME AS trading_name,
	BIRTH_DATE AS birth_date,
	MAIL_1.EMAIL AS email,
	(
	SELECT
		GROUP_CONCAT(MAIL_2.EMAIL SEPARATOR ',')
	FROM
		f_email_temp MAIL_2
	WHERE
		MAIL_2.DOCUMENT_NUMBER = P.DOCUMENT_NUMBER
		AND MAIL_2.CTRL_EMAIL > 1) AS email_nfe,
	NULL AS email_financial,
	NULL AS email_technical,
	PHONE_1.PHONE AS cel_phone_primary,
	PHONE_2.PHONE AS cel_phone_secondary,
	LAND_1.PHONE AS landline,
	MUNICIPAL_REGISTRATION AS municipal_registration,
	STATE_REGISTRATION AS state_registration,
    PERSON_LOCALE_FISCAL_COLLECTION_ERP_OLD_ID as person_locale_fiscal_collection_erp_old_id
FROM
	f_person_temp P
LEFT JOIN	
	f_email_temp MAIL_1
	ON
	MAIL_1.DOCUMENT_NUMBER = P.DOCUMENT_NUMBER
	AND CTRL_EMAIL = 1
LEFT JOIN	
	f_phone_temp PHONE_1
	ON
	PHONE_1.DOCUMENT_NUMBER = P.DOCUMENT_NUMBER
	AND PHONE_1.CTRL_PHONE = 1
	AND PHONE_1.TYPE_PHONE = 'CELULAR'
LEFT JOIN	
	f_phone_temp PHONE_2
	ON 
	PHONE_2.DOCUMENT_NUMBER = P.DOCUMENT_NUMBER
	AND PHONE_2.CTRL_PHONE = 2
	AND PHONE_2.TYPE_PHONE = 'CELULAR'
LEFT JOIN	
	f_phone_temp LAND_1
	ON
	LAND_1.DOCUMENT_NUMBER = P.DOCUMENT_NUMBER
	AND LAND_1.CTRL_PHONE = 1
	AND LAND_1.TYPE_PHONE = 'FIXO';
'''

df_person = pd.read_sql(query_person, conn_tecpar)

df_person.to_sql(
    name='f_person',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
	'person_erp_old_id': Integer(),
	'person_type': Integer(),
	'document_number': String(14),
	'document_identity': String(45),
	'person_name': String(255),
	'trading_name': String(255),
	'birth_date': Date(),
    'email': String(255),
    'email_nfe': String(255),
	'email_financial': String(500),
	'email_technical': String(500),
	'cel_phone_primary': String(11),
	'cel_phone_secondary': String(11),
	'landline': String(11),
	'municipal_registration': String(128),
	'state_registration': String(128),
    'person_locale_fiscal_collection_erp_old_id': Integer()
    }
)

with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_PERSON_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_person (person_erp_old_id);
    CREATE INDEX F_PERSON_DOCUMENT_NUMBER_IDX USING BTREE ON f_person (document_number);
    CREATE INDEX F_PERSON_PERSON_LOCALE_FISCAL_COLLECTION_ERP_OLD_ID_IDX USING BTREE ON f_person (person_locale_fiscal_collection_erp_old_id);
    '''))
    con.close()

Carga tabela PROPOSAL final!

In [26]:
query_proposal = '''
SELECT DISTINCT CODCONTRATO AS PROPOSAL_ERP_OLD_ID,
    NULL AS PROPOSAL_DESCRIPTION,
    coalesce (vlr00,vlr_mensalidade) AS PROPOSAL_SALE_VALUE_RELATORIO,
    NULL AS PERSON_ACCOUNT_MANAGER_DOCUMENT_NUMBER,
    CLIENTE AS PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN TRIM(REPLACE(REPLACE(REPLACE(CPF, '.', ''), '-', ''), '/', ''))
        ELSE TRIM(REPLACE(REPLACE(REPLACE(CNPJ, '.', ''), '-', ''), '/', ''))
    END AS PERSON_CUSTOMER_DOCUMENT_NUMBER,
    CASE
        WHEN METODO_FATURAMENTO = '1' THEN 1
        WHEN METODO_FATURAMENTO = '2' THEN 1
        WHEN METODO_FATURAMENTO = '3' THEN 4
    END AS BUSINESS_KIND_ID,
    NULL AS PERSON_CUSTOMER_CONTACT_DOCUMENT_NUMBER,
    COALESCE(CAST(DATA_HORA_ATIVACAO AS DATE), CAST(DT_ATIVACAO AS DATE)) AS CONTRACT_START_DATE,
    CAST(DT_CANCELAMENTO AS DATE) AS CONTRACT_END_DATE,
    CASE
        WHEN CONEXAO_BLOQUEADA = 'S' THEN 'BLOQUEADO'
        ELSE 'ATIVO'
    END AS CONTRACT_STATUS,
    CASE
        WHEN CONEXAO_BLOQUEADA = 'S' 
            THEN (SELECT DISTINCT ON (CD_CONEXAO) DATA 
		            FROM MK_CONEXOES_HISTORICO_EVENTOS H
		            WHERE H.CD_CONEXAO = CX.CODCONEXAO
                    AND TIPO_EVENTO = 1
		            ORDER BY CD_CONEXAO, CODCONHISTEVENTO DESC)
        ELSE NULL
    END AS CONTRACT_BLOCKED_DAY,
    CASE
        WHEN CONEXAO_BLOQUEADA = 'S' THEN TRIM(UPPER(B.DESCRICAO))
        ELSE NULL
    END AS CONTRACT_BLOCKED_REASON,
    1 AS RECURRENT_COLLECTION_INTERVAL,
    NULL AS PAYMENT_CONDITION_INVOICE_KIND,
    RF.DIA_VCTO AS RECURRENT_INVOICE_DAY,
    RF.DIA_VCTO AS RECURRENT_COLLECTION_EXPIRATION_DAY,
    CASE 
    	WHEN RF.TIPO_REGRA = 1 THEN 2
    	WHEN RF.TIPO_REGRA = 2 THEN 1
    	ELSE NULL
    END AS RECURRENT_COLLECTION_COMPETENCE_MONTH,
    C.PROFILE_PGTO AS PAYMENT_METHOD_ERP_OLD_ID,
    PFL.NOME_PROFILE AS PAYMENT_METHOD_DESCRIPTION,
    COALESCE(TRIM(UPPER(E.RAZAO_SOCIAL)), 'TODAS') AS COMPANY_PLACE_ERP_OLD_ID,
    NULL AS COMPANY_PLACE_ERP_OLD_NAME,
    NULL AS OTHER_INFO
FROM MK_CONTRATOS C
    JOIN MK_PESSOAS P ON P.CODPESSOA = C.CLIENTE
    JOIN MK_PLANOS_ACESSO PA ON C.PLANO_ACESSO = PA.CODPLANO
    LEFT JOIN MK_MOTIVO_SUSPENSAO S ON C.SUSPENSO_MOTIVO = S.CODMOTSUSP
    LEFT JOIN MK_CONEXOES CX ON CX.CONTRATO = C.CODCONTRATO
    LEFT JOIN MK_MOTIVOS_BLOQUEIO B ON B.CODMOTBLOQ = CX.MOTIVO_BLOQUEIO
    LEFT JOIN MK_FATURAMENTOS_REGRAS RF ON RF.CODFATURAMENTOREGRA = C.CD_REGRA_FATURAMENTO
    LEFT JOIN VI_MULTIEMPRESAS E ON P.CD_EMPRESA = E.CODPROVEDOR
    LEFT JOIN MK_PROFILE_PGTO PFL ON PFL.CODPROFILE = C.PROFILE_PGTO
    left join (select max(data_vencimento) as venc0, codvinculado as cod0 from mk_plano_contas where nomenclatura_integracao = 'CNT' group by 2) as tb0 on cod0 = codcontrato
    left join (select data_vencimento as venc00, codvinculado as cod00, valor_lancamento as vlr00 from mk_plano_contas where nomenclatura_integracao = 'CNT') as tb00 on cod00 = codcontrato and cod00 = cod0 and venc0 = venc00
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
'''

df_proposal = pd.read_sql(query_proposal, conn_cyber)

df_proposal.to_sql(
    name='f_proposal',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
    'proposal_erp_old_id': Integer(),
    'proposal_description': String(255),
    'proposal_sale_value_relatorio': DECIMAL(10,2),
    'person_account_manager_document_number': String(11),
    'person_erp_old_id': Integer(),
    'person_customer_document_number': String(14),
    'business_kind_id': Integer(),
    'person_customer_contact_document_number': String(11),
    'contract_start_date': Date(),
    'contract_end_date': Date(),
    'contract_blocked_day': Date(),
    'contract_status': String(255),
    'contract_blocked_reason': String(255),
    'recurrent_collection_interval': Integer(),
    'payment_condition_invoice_kind': Integer(),
    'recurrent_invoice_day': Integer(),
    'recurrent_collection_expiration_day': Integer(),
    'recurrent_collection_competence_month': Integer(),
    'payment_method_erp_old_id': Integer(),
    'payment_method_description': String(255),
    'company_place_erp_old_id': String(255),
    'company_place_erp_old_name': String(255),
    'other_info': String(255)
    }
)

with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_PROPOSAL_PERSON_CUSTOMER_DOCUMENT_NUMBER_IDX USING BTREE ON f_proposal (person_customer_document_number);
    CREATE INDEX F_PROPOSAL_PROPOSAL_ERP_OLD_ID_IDX USING BTREE ON f_proposal (proposal_erp_old_id);
    CREATE INDEX F_PROPOSAL_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_proposal (person_erp_old_id);
    CREATE INDEX F_PROPOSAL_PROPOSAL_SALE_VALUE_IDX USING BTREE ON f_proposal (proposal_sale_value_relatorio);
    '''))
    con.close()

Carga tabela OFFER final!

In [27]:
query_offer = '''
SELECT DISTINCT C.CODCONTRATO AS PROPOSAL_ERP_OLD_ID,
    C.CODCONTRATO + 9000000 AS PERSON_LOCALE_OFFER_ERP_OLD_ID,
    PA.CODPLANO AS OFFER_PLAN_OLD_ID,
    PA.DESCRICAO AS OFFER_PLAN_OLD_NAME,
    C.CODCONTRATO AS OFFER_ERP_OLD_ID,
    coalesce (vlr00,vlr_mensalidade) AS OFFER_SALE_VALUE_RELATORIO,
    VLR_RENOVACAO AS OFFER_SALE_VALUE_SUGERIDO,
    COALESCE((SELECT SPLIT_PART(SPLIT_PART(PLANO_CRM.velocidades_formatadas,' ',1),'/',1)
            FROM MK_CRM_PRODUTOS,
                MK_CRM_PRODUTOS_COMPOSICAO,
                MK_PLANOS_ACESSO AS PLANO_CRM
            WHERE CD_PLANO_PRINCIPAL = C.PLANO_ACESSO
                AND CODCRMPRODUTO = CD_PRODUTO
                AND PLANO_CRM.CODPLANO = CD_PLANO
                AND PLANO_CRM.TIPO = '1'
            LIMIT 1), PA.VEL_UP) AS NET_UPLOAD_PLANO,
    COALESCE((SELECT SPLIT_PART(SPLIT_PART(PLANO_CRM.velocidades_formatadas, ' ', 1), '/', 2)
            FROM MK_CRM_PRODUTOS,
                MK_CRM_PRODUTOS_COMPOSICAO,
                MK_PLANOS_ACESSO AS PLANO_CRM
            WHERE CD_PLANO_PRINCIPAL = C.PLANO_ACESSO
                AND CODCRMPRODUTO = CD_PRODUTO
                AND PLANO_CRM.CODPLANO = CD_PLANO
                AND PLANO_CRM.TIPO = '1'
            LIMIT 1), PA.VEL_DOWN) AS NET_DOWNLOAD_PLANO,
    C.CODCONTRATO SERVICE_TAG_OLD
FROM MK_CONTRATOS C
    INNER JOIN MK_PLANOS_ACESSO PA ON C.PLANO_ACESSO = PA.CODPLANO
    left join (select max(data_vencimento) as venc0, codvinculado as cod0 from mk_plano_contas where nomenclatura_integracao = 'CNT' group by 2) as tb0 on cod0 = codcontrato
    left join (select data_vencimento as venc00, codvinculado as cod00, valor_lancamento as vlr00 from mk_plano_contas where nomenclatura_integracao = 'CNT') as tb00 on cod00 = codcontrato and cod00 = cod0 and venc0 = venc00
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL);
'''

query_conexao = '''
SELECT DISTINCT CX.CONTRATO AS PROPOSAL_ERP_OLD_ID,
    STRING_AGG(CAST(CX.CODCONEXAO AS VARCHAR), ',') AS CONNECTION_ERP_OLD_ID,
    STRING_AGG(CX.USERNAME,',') AS NET_USER_PPPOE,
    CASE
        WHEN STRING_AGG(CX.TECNOLOGIA, ',') = 'F' THEN 'FTTH'
        WHEN STRING_AGG(CX.TECNOLOGIA, ',') = 'W' THEN 'RADIO'
        WHEN STRING_AGG(CX.TECNOLOGIA, ',') = 'U' THEN 'UTP'
        ELSE 'AVALIAR'
    END AS NET_TECHNOLOGY,
    NULL OTHER_SERVICE_IDENTIFIER,
    NULL NET_EQUIPMENT_CUSTOMER_CLIENT,
    CASE
        WHEN MAX(CX.TIPO_CONEXAO) = 1 THEN 1
        WHEN MAX(CX.TIPO_CONEXAO) = 2 THEN 4
        ELSE NULL
    END AS BUSINESS_KIND_ID,
    MD5(STRING_AGG(CX.USERNAME,',')) AS NET_USER_PPPOE_MD5
FROM MK_CONTRATOS C
    INNER JOIN MK_CONEXOES CX ON CX.CONTRATO = C.CODCONTRATO
    LEFT JOIN MK_PLANOS_ACESSO PAC ON CX.CODPLANO_ACESSO = PAC.CODPLANO
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
GROUP BY PROPOSAL_ERP_OLD_ID;
'''
query_radius = '''
SELECT
    MD5(usg.USERNAME) NET_USER_PPPOE_MD5,
    SPLIT_PART(SPLIT_PART(grp.VALUE, ' ', 1), '/', 1) AS NET_UPLOAD,
    SPLIT_PART(SPLIT_PART(grp.VALUE, ' ', 1), '/', 2) AS NET_DOWNLOAD
FROM
    radius.usergroup usg
JOIN
    radius.radgroupreply grp ON usg.GROUPNAME = grp.GROUPNAME AND ATTRIBUTE = 'Mikrotik-Rate-Limit';
'''

query_radius_ip = '''
SELECT
	MD5(USERNAME) NET_USER_PPPOE_MD5,
	VALUE ADDRESS_IPV4
FROM
	radius.radreply
WHERE ATTRIBUTE = 'Framed-IP-Address'
'''

df_offer = pd.read_sql(query_offer, conn_cyber)
df_conexao = pd.read_sql(query_conexao, conn_cyber)
df_radius = pd.read_sql(query_radius, conn_radius)
df_radius_ip = pd.read_sql(query_radius_ip, conn_radius)

df_offer_final = df_offer.merge(df_conexao, how='left', on='proposal_erp_old_id')
df_offer_final = df_offer_final.merge(df_radius, how='left', on='net_user_pppoe_md5')
df_offer_final = df_offer_final.merge(df_radius_ip, how='left', on='net_user_pppoe_md5')


df_offer_final['net_upload'] = df_offer_final['net_upload'].apply(transformacao_velocidade)
df_offer_final['net_download'] = df_offer_final['net_download'].apply(transformacao_velocidade)
df_offer_final['net_upload_plano'] = df_offer_final['net_upload_plano'].apply(transformacao_velocidade)
df_offer_final['net_download_plano'] = df_offer_final['net_download_plano'].apply(transformacao_velocidade)
df_offer_final['net_has_address_ipv4_fixed'] = df_offer_final['address_ipv4'].notnull().astype(int)


df_offer_final.to_sql(
    name='f_offer',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
        'proposal_erp_old_id': Integer(),
        'person_locale_offer_erp_old_id': Integer(),
        'offer_plan_old_id': Integer(),
        'offer_plan_old_name': String(255),
        'offer_sale_value_relatorio': DECIMAL(10,2),
        'offer_sale_value_sugerido': DECIMAL(10,2),
        'offer_erp_old_id': Integer(),
        'net_upload': String(45),
        'net_download': String(45),
        'net_upload_plano': String(45),
        'net_download_plano': String(45),
        'service_tag_old': String(255),
        'connection_erp_old_id': String(255),
        'net_user_pppoe': String(255),
        'net_technology': String(255),
        'other_service_identifier': String(255),
        'net_equipment_customer_client': String(255),
        'net_has_address_ipv4_fixed': Integer(),
        'address_ipv4': String(255),
        'business_kind_id': Integer(),
        'net_user_pppoe_md5': String(255)
    }
)

with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_OFFER_OFFER_ERP_OLD_ID_IDX USING BTREE ON f_offer (offer_erp_old_id);
    CREATE INDEX F_OFFER_OFFER_PLAN_OLD_ID_IDX USING BTREE ON f_offer (offer_plan_old_id);
    CREATE INDEX F_OFFER_PERSON_LOCALE_OFFER_ERP_OLD_ID_IDX USING BTREE ON f_offer (person_locale_offer_erp_old_id);
    CREATE INDEX F_OFFER_PROPOSAL_ERP_OLD_ID_IDX USING BTREE ON f_offer (proposal_erp_old_id);
    CREATE INDEX F_OFFER_SERVICE_TAG_OLD_IDX USING BTREE ON f_offer (service_tag_old);
    CREATE INDEX F_OFFER_OFFER_SALE_VALUE_IDX USING BTREE ON f_offer (offer_sale_value_relatorio);
    '''))
    con.close()

Carga tabela LOCALES final!

In [28]:
query_locale_offer = '''
SELECT DISTINCT C.CLIENTE AS PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN TRIM(REPLACE(REPLACE(REPLACE(P.CPF, '.', ''), '-', ''), '/', ''))
        ELSE TRIM(REPLACE(REPLACE(REPLACE(P.CNPJ, '.', ''), '-', ''), '/', ''))
    END AS PERSON_DOCUMENT_NUMBER,
    C.CODCONTRATO + 9000000 AS PERSON_LOCALE_OFFER_ERP_OLD_ID,
    NULL STREET_TYPE,
    TRIM(UPPER(L.LOGRADOURO)) AS STREET,
    CX.NUMERO AS NUM,
    TRIM(UPPER(CAST(CX.COMPLEMENTO AS VARCHAR))) AS COMPLEMENT,
    TRIM(UPPER(B.BAIRRO)) AS NEIGHBORHOOD,
    TRIM(UPPER(CONCAT(SUBSTRING(CX.CEP, 1, 5), '-', SUBSTRING(CX.CEP, 6, 3)))) AS POSTAL_CODE,
    TRIM(UPPER(E.SIGLAESTADO)) AS STATE,
    TRIM(UPPER(CI.CIDADE)) AS CITY,
    TRIM(UPPER(CI.IBGE)) AS IBGE_CODE,
    NULL AS ADDRESS_HASH,
    CX.CONEXAO_BLOQUEADA AS CONEXAO_BLOQUEADA
FROM MK_PESSOAS P
    INNER JOIN MK_CONTRATOS C ON P.CODPESSOA = C.CLIENTE
    LEFT JOIN MK_CONEXOES CX ON CX.CONTRATO = C.CODCONTRATO
    LEFT JOIN MK_LOGRADOUROS L ON L.CODLOGRADOURO = CX.LOGRADOURO
    LEFT JOIN MK_BAIRROS B ON B.CODBAIRRO = CX.BAIRRO
    LEFT JOIN MK_ESTADOS E ON E.CODESTADO = CX.UF
    LEFT JOIN MK_CIDADES CI ON CI.CODCIDADE = CX.CIDADE
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND CX.CODCONEXAO IS NOT NULL
ORDER BY CX.CONEXAO_BLOQUEADA ASC
'''
query_locale_offer_sem_conexao = '''
SELECT DISTINCT C.CLIENTE AS PERSON_ERP_OLD_ID,
    CASE
        WHEN P.TIPOPESSOA = 1 THEN TRIM(REPLACE(REPLACE(REPLACE(P.CPF, '.', ''), '-', ''), '/', ''))
        ELSE TRIM(REPLACE(REPLACE(REPLACE(P.CNPJ, '.', ''), '-', ''), '/', ''))
    END AS PERSON_DOCUMENT_NUMBER,
    C.CODCONTRATO + 9000000 AS PERSON_LOCALE_OFFER_ERP_OLD_ID,
    NULL STREET_TYPE,
    TRIM(UPPER(L.LOGRADOURO)) AS STREET,
    CASE WHEN IGUALRESIDENCIA = 'S' THEN P.NUMERO ELSE P.NUMEROCOBRANCA END AS NUM,
    TRIM(UPPER(CAST(CASE WHEN IGUALRESIDENCIA = 'S' THEN P.COMPLEMENTOENDERECO ELSE P.COMPLEMENTOENDERECOCOBR END AS VARCHAR))) AS COMPLEMENT,
    TRIM(UPPER(B.BAIRRO)) AS NEIGHBORHOOD,
    TRIM(UPPER(CONCAT(SUBSTRING(CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CEP ELSE P.CEPCOBRANCA END, 1, 5), '-', SUBSTRING(CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CEP ELSE P.CEPCOBRANCA END, 6, 3)))) AS POSTAL_CODE,
    TRIM(UPPER(E.SIGLAESTADO)) AS STATE,
    TRIM(UPPER(CI.CIDADE)) AS CITY,
    TRIM(UPPER(CI.IBGE)) AS IBGE_CODE,
    NULL AS ADDRESS_HASH
FROM MK_PESSOAS P
    INNER JOIN MK_CONTRATOS C ON P.CODPESSOA = C.CLIENTE
    LEFT JOIN MK_LOGRADOUROS L ON L.CODLOGRADOURO = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODLOGRADOURO ELSE P.CODLOGRADOUROCOBRANCA END
    LEFT JOIN MK_BAIRROS B ON B.CODBAIRRO = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODBAIRRO ELSE P.CODBAIRROCOBRANCA END
    LEFT JOIN MK_ESTADOS E ON E.CODESTADO = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODESTADO ELSE P.CODESTADOCOBRANCA END
    LEFT JOIN MK_CIDADES CI ON CI.CODCIDADE = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODCIDADE ELSE P.CODCIDADECOBRANCA END
    LEFT JOIN MK_CONEXOES CX ON CX.CONTRATO = C.CODCONTRATO
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
    AND CX.CODCONEXAO IS NULL
'''

df_locale_offer = pd.read_sql(query_locale_offer, conn_cyber)
df_locale_offer_sem_conexao = pd.read_sql(query_locale_offer_sem_conexao, conn_cyber)
df_locale_offer = pd.concat([df_locale_offer, df_locale_offer_sem_conexao], ignore_index=True)
df_locale_offer = mapeamento_street_type(df_locale_offer)
df_locale_offer['num'] = df_locale_offer['num'].fillna(0)
df_locale_offer = split_concat_street(df_locale_offer)
df_locale_offer['complement'] = df_locale_offer['complement'].replace('[^\w\s:,.()/\@]+', '', regex=True)
df_locale_offer['street'] = df_locale_offer['street'].replace('[^\w\s:,.()/\@]+', '', regex=True)
df_locale_offer = df_locale_offer.drop_duplicates('person_locale_offer_erp_old_id', keep='first')
df_locale_offer = df_locale_offer.drop('conexao_bloqueada', axis=1)

df_locale_offer.to_sql(
    name='f_person_locale_offer',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
    'person_erp_old_id': Integer(),
    'person_document_number': String(14),
    'person_locale_offer_erp_old_id': Integer(),
    'street_type': String(45),
    'street': String(255),
    'new_street': String(255),
    'num': String(255),
    'complement': String(255),
    'neighborhood': String(255),
    'postal_code': String(45),
    'state': String(2),
    'city': String(255),
    'ibge_code': String(255),
    'address_hash': String(128)
    }
)

with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_PERSON_LOCALE_OFFER_PERSON_DOCUMENT_NUMBER_IDX USING BTREE ON f_person_locale_offer (person_document_number);
    CREATE INDEX F_PERSON_LOCALE_OFFER_ADDRESS_HASH_IDX USING BTREE ON f_person_locale_offer (address_hash);
    CREATE INDEX F_PERSON_LOCALE_OFFER_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_person_locale_offer (person_erp_old_id);
    CREATE INDEX F_PERSON_LOCALE_OFFER_PERSON_LOCALE_OFFER_ERP_OLD_ID_IDX USING BTREE ON f_person_locale_offer (person_locale_offer_erp_old_id);
    '''))
    con.close()

In [29]:
query_locale_fiscal = '''
SELECT DISTINCT C.CLIENTE AS PERSON_ERP_OLD_ID,
    CASE
        WHEN TIPOPESSOA = 1 THEN TRIM(REPLACE(REPLACE(REPLACE(P.CPF, '.', ''), '-', ''), '/', ''))
        ELSE TRIM(REPLACE(REPLACE(REPLACE(P.CNPJ, '.', ''), '-', ''), '/', ''))
    END AS PERSON_DOCUMENT_NUMBER,
    C.CLIENTE + 8000000 AS PERSON_LOCALE_FISCAL_COLLECTION_ERP_OLD_ID,
    NULL STREET_TYPE,
    TRIM(UPPER(L.LOGRADOURO)) AS STREET,
    CASE WHEN IGUALRESIDENCIA = 'S' THEN P.NUMERO ELSE P.NUMEROCOBRANCA END AS NUM,
    TRIM(UPPER(CAST(CASE WHEN IGUALRESIDENCIA = 'S' THEN P.COMPLEMENTOENDERECO ELSE P.COMPLEMENTOENDERECOCOBR END AS VARCHAR))) AS COMPLEMENT,
    TRIM(UPPER(B.BAIRRO)) AS NEIGHBORHOOD,
    TRIM(UPPER(CONCAT(SUBSTRING(CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CEP ELSE P.CEPCOBRANCA END, 1, 5), '-', SUBSTRING(CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CEP ELSE P.CEPCOBRANCA END, 6, 3)))) AS POSTAL_CODE,
    TRIM(UPPER(E.SIGLAESTADO)) AS STATE,
    TRIM(UPPER(CI.CIDADE)) AS CITY,
    TRIM(UPPER(CI.IBGE)) AS IBGE_CODE,
    NULL AS ADDRESS_HASH
FROM MK_PESSOAS P
    INNER JOIN MK_CONTRATOS C ON P.CODPESSOA = C.CLIENTE
    LEFT JOIN MK_LOGRADOUROS L ON L.CODLOGRADOURO = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODLOGRADOURO ELSE P.CODLOGRADOUROCOBRANCA END
    LEFT JOIN MK_BAIRROS B ON B.CODBAIRRO = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODBAIRRO ELSE P.CODBAIRROCOBRANCA END
    LEFT JOIN MK_ESTADOS E ON E.CODESTADO = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODESTADO ELSE P.CODESTADOCOBRANCA END
    LEFT JOIN MK_CIDADES CI ON CI.CODCIDADE = CASE WHEN IGUALRESIDENCIA = 'S' THEN P.CODCIDADE ELSE P.CODCIDADECOBRANCA END
WHERE CANCELADO = 'N'
    AND (SUSPENSO = 'N' OR SUSPENSO IS NULL)
'''

df_locale_fiscal = pd.read_sql(query_locale_fiscal, conn_cyber)
df_locale_fiscal = mapeamento_street_type(df_locale_fiscal)
df_locale_fiscal['num'] = df_locale_fiscal['num'].fillna(0)
df_locale_fiscal = split_concat_street(df_locale_fiscal)
df_locale_fiscal['complement'] = df_locale_fiscal['complement'].replace('[^\w\s:,.()/\@]+', '', regex=True)
df_locale_fiscal['street'] = df_locale_fiscal['street'].replace('[^\w\s:,.()/\@]+', '', regex=True)

df_locale_fiscal.to_sql(
    name='f_person_locale_fiscal_collection',
    con=conn_tecpar,
    if_exists='replace',
    index=False,
    chunksize=10000,
    dtype={
    'person_erp_old_id': Integer(),
    'person_document_number': String(14),
    'person_locale_fiscal_collection_erp_old_id': Integer(),
    'street_type': String(45),
    'street': String(255),
    'new_street': String(255),
    'num': String(255),
    'complement': String(255),
    'neighborhood': String(255),
    'postal_code': String(45),
    'state': String(2),
    'city': String(255),
    'ibge_code': String(255),
    'address_hash': String(128)
    }
)
with conn_tecpar.connect() as con:
    con.execute(text('''
    CREATE INDEX F_PERSON_LOCALE_FISCAL_COLLECTION_PERSON_DOCUMENT_NUMBER_IDX USING BTREE ON f_person_locale_fiscal_collection (person_document_number);
    CREATE INDEX F_PERSON_LOCALE_FISCAL_COLLECTION_ADDRESS_HASH_IDX USING BTREE ON f_person_locale_fiscal_collection (address_hash);
    CREATE INDEX F_PERSON_LOCALE_FISCAL_COLLECTION_PERSON_ERP_OLD_ID_IDX USING BTREE ON f_person_locale_fiscal_collection (person_erp_old_id);
    CREATE INDEX F_PERSON_LOCALE_FISCAL_COLLECTION_ERP_OLD_ID_IDX USING BTREE ON f_person_locale_fiscal_collection (person_locale_fiscal_collection_erp_old_id);
    '''))
    con.close()

In [30]:
with conn_tecpar.connect() as con:
    con.execute(text('''
    DROP TABLE f_email_temp;
    DROP TABLE f_person_temp;
    DROP TABLE f_phone_temp;
    '''))
    con.close()

conn_cyber.dispose()
conn_tecpar.dispose()
conn_radius.dispose()