# CDC - Incremental Update with Python

## Importação das Libs

In [4]:
import pyodbc
import os
import psycopg2
from sqlalchemy import text, create_engine
import pandas as pd
from config import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA

## Conect in Databases

In [7]:

# Obtenha a senha a partir de uma variável de ambiente, se necessário
# pwd = os.getenv('DB_PASSWORD')  # Exemplo de como obter a senha de uma variável de ambiente
pwd = DB_PASSWORD
postgres_user = DB_USER
postgres_host = DB_HOST
postgres_port = DB_PORT
postgres_database = DB_NAME
postgres_schema = DB_SCHEMA

# Criação da string de conexão
connection_string = f'postgresql://{postgres_user}:{pwd}@{postgres_host}:{postgres_port}/{postgres_database}'

# Conectar usando psycopg2 (opcional)
try:
    conn = psycopg2.connect(
        user=postgres_user,
        password=pwd,
        host=postgres_host,
        port=postgres_port,
        database=postgres_database
    )
    print("Conexão com o banco de dados PostgreSQL realizada com sucesso.")
except Exception as e:
    print(f"Erro ao conectar ao PostgreSQL: {e}")

# Conectar usando SQLAlchemy
engine = create_engine(connection_string)

Conexão com o banco de dados PostgreSQL realizada com sucesso.


## Full Load

In [13]:
# Ler os dados da tabela fonte, a que mandará os dados
query_1 = 'SELECT * FROM "DW_IM_AGB".tabela_fonte'
df_1 = pd.read_sql(query_1, conn)
print(df_1.head)

  df_1 = pd.read_sql(query_1, conn)


<bound method NDFrame.head of             id  coluna1  coluna2  coluna3  coluna4  coluna5  coluna6  coluna7  \
0       900001      777   123456    44444    88888    99999    11111    22222   
1       900002      777   123456    44444    88888    99999    11111    22222   
2       900003      777   123456    44444    88888    99999    11111    22222   
3       900004      777   123456    44444    88888    99999    11111    22222   
4       900005      777   123456    44444    88888    99999    11111    22222   
...        ...      ...      ...      ...      ...      ...      ...      ...   
99995   999996      777   123456    44444    88888    99999    11111    22222   
99996   999997      777   123456    44444    88888    99999    11111    22222   
99997   999998      777   123456    44444    88888    99999    11111    22222   
99998   999999      777   123456    44444    88888    99999    11111    22222   
99999  1000000      777   123456    44444    88888    99999    11111    22222  

Teste da tabela com mais dados

In [14]:
# Ler os dados da tabela alvo, a que receberá os dados
query_2 = 'SELECT * FROM "DW_IM_AGB".tabela_alvo'
df_2 = pd.read_sql(query_2, conn)
print(df_2.head)

  df_2 = pd.read_sql(query_2, conn)


<bound method NDFrame.head of             id  coluna1  coluna2  coluna3  coluna4  coluna5  coluna6  coluna7  \
0            1        1        2        3        4        5        6        7   
1            2        2        3        4        5        6        7        8   
2            3        3        4        5        6        7        8        9   
3            4        4        5        6        7        8        9       10   
4            5        5        6        7        8        9       10       11   
...        ...      ...      ...      ...      ...      ...      ...      ...   
949995  949996   949996   949997   949998   949999   950000   950001   950002   
949996  949997   949997   949998   949999   950000   950001   950002   950003   
949997  949998   949998   949999   950000   950001   950002   950003   950004   
949998  949999   949999   950000   950001   950002   950003   950004   950005   
949999  950000   950000   950001   950002   950003   950004   950005   950006  

## Start Incremental Update

In [15]:
# Cria a tupla da tabela que receberá os dados
df_2.apply(tuple,1)


0         (1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,...
1         (2, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...
2         (3, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 1...
3         (4, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...
4         (5, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,...
                                ...                        
949995    (949996, 949996, 949997, 949998, 949999, 95000...
949996    (949997, 949997, 949998, 949999, 950000, 95000...
949997    (949998, 949998, 949999, 950000, 950001, 95000...
949998    (949999, 949999, 950000, 950001, 950002, 95000...
949999    (950000, 950000, 950001, 950002, 950003, 95000...
Length: 950000, dtype: object

In [16]:
# Cria a tupla da tabela que enviará os dados
df_1.apply(tuple,1)


0        (900001, 777, 123456, 44444, 88888, 99999, 111...
1        (900002, 777, 123456, 44444, 88888, 99999, 111...
2        (900003, 777, 123456, 44444, 88888, 99999, 111...
3        (900004, 777, 123456, 44444, 88888, 99999, 111...
4        (900005, 777, 123456, 44444, 88888, 99999, 111...
                               ...                        
99995    (999996, 777, 123456, 44444, 88888, 99999, 111...
99996    (999997, 777, 123456, 44444, 88888, 99999, 111...
99997    (999998, 777, 123456, 44444, 88888, 99999, 111...
99998    (999999, 777, 123456, 44444, 88888, 99999, 111...
99999    (1000000, 777, 123456, 44444, 88888, 99999, 11...
Length: 100000, dtype: object

In [17]:
# Verifica o que há de diferença entre as duas tuplas
df_1.apply(tuple,1).isin(df_2.apply(tuple,1))

0        False
1        False
2        False
3        False
4        False
         ...  
99995    False
99996    False
99997    False
99998    False
99999    False
Length: 100000, dtype: bool

In [18]:
# Get changes
changes = df_1[~df_1.apply(tuple,1).isin(df_2.apply(tuple,1))]
changes

Unnamed: 0,id,coluna1,coluna2,coluna3,coluna4,coluna5,coluna6,coluna7,coluna8,coluna9,...,coluna11,coluna12,coluna13,coluna14,coluna15,coluna16,coluna17,coluna18,coluna19,coluna20
0,900001,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900011,900012,900013,900014,900015,900016,900017,900018,900019,900020
1,900002,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900012,900013,900014,900015,900016,900017,900018,900019,900020,900021
2,900003,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900013,900014,900015,900016,900017,900018,900019,900020,900021,900022
3,900004,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900014,900015,900016,900017,900018,900019,900020,900021,900022,900023
4,900005,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900015,900016,900017,900018,900019,900020,900021,900022,900023,900024
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,999996,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000006,1000007,1000008,1000009,1000010,1000011,1000012,1000013,1000014,1000015
99996,999997,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000007,1000008,1000009,1000010,1000011,1000012,1000013,1000014,1000015,1000016
99997,999998,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000008,1000009,1000010,1000011,1000012,1000013,1000014,1000015,1000016,1000017
99998,999999,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000009,1000010,1000011,1000012,1000013,1000014,1000015,1000016,1000017,1000018


In [19]:
# Get modified rows
modified = changes[changes.id.isin(df_2.id)]
modified

Unnamed: 0,id,coluna1,coluna2,coluna3,coluna4,coluna5,coluna6,coluna7,coluna8,coluna9,...,coluna11,coluna12,coluna13,coluna14,coluna15,coluna16,coluna17,coluna18,coluna19,coluna20
0,900001,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900011,900012,900013,900014,900015,900016,900017,900018,900019,900020
1,900002,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900012,900013,900014,900015,900016,900017,900018,900019,900020,900021
2,900003,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900013,900014,900015,900016,900017,900018,900019,900020,900021,900022
3,900004,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900014,900015,900016,900017,900018,900019,900020,900021,900022,900023
4,900005,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900015,900016,900017,900018,900019,900020,900021,900022,900023,900024
5,900006,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900016,900017,900018,900019,900020,900021,900022,900023,900024,900025
6,900007,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900017,900018,900019,900020,900021,900022,900023,900024,900025,900026
7,900008,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900018,900019,900020,900021,900022,900023,900024,900025,900026,900027
8,900009,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900019,900020,900021,900022,900023,900024,900025,900026,900027,900028
9,900010,777,123456,44444,88888,99999,11111,22222,33333,55555,...,900020,900021,900022,900023,900024,900025,900026,900027,900028,900029


In [20]:
# Get new records
inserts = changes[~changes.id.isin(df_2.id)]
inserts

Unnamed: 0,id,coluna1,coluna2,coluna3,coluna4,coluna5,coluna6,coluna7,coluna8,coluna9,...,coluna11,coluna12,coluna13,coluna14,coluna15,coluna16,coluna17,coluna18,coluna19,coluna20
50000,950001,950001,950002,950003,950004,950005,950006,950007,950008,950009,...,950011,950012,950013,950014,950015,950016,950017,950018,950019,950020
50001,950002,950002,950003,950004,950005,950006,950007,950008,950009,950010,...,950012,950013,950014,950015,950016,950017,950018,950019,950020,950021
50002,950003,950003,950004,950005,950006,950007,950008,950009,950010,950011,...,950013,950014,950015,950016,950017,950018,950019,950020,950021,950022
50003,950004,950004,950005,950006,950007,950008,950009,950010,950011,950012,...,950014,950015,950016,950017,950018,950019,950020,950021,950022,950023
50004,950005,950005,950006,950007,950008,950009,950010,950011,950012,950013,...,950015,950016,950017,950018,950019,950020,950021,950022,950023,950024
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,999996,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000006,1000007,1000008,1000009,1000010,1000011,1000012,1000013,1000014,1000015
99996,999997,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000007,1000008,1000009,1000010,1000011,1000012,1000013,1000014,1000015,1000016
99997,999998,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000008,1000009,1000010,1000011,1000012,1000013,1000014,1000015,1000016,1000017
99998,999999,777,123456,44444,88888,99999,11111,22222,33333,55555,...,1000009,1000010,1000011,1000012,1000013,1000014,1000015,1000016,1000017,1000018


In [30]:
# Parâmetros de conexão, foi necessário novamente para os trabalhos de upsert e insert
connection_params = {
    'dbname': postgres_database,
    'user': postgres_user,
    'password': pwd,
    'host': postgres_host,
    'port': postgres_port
}

Upsert

In [29]:
def update_to_sql(df, table_name, key_name, connection_params, schema=postgres_schema):
    # Lista de colunas para atualizar, ignorando a coluna de chave primária
    columns = [col for col in df.columns if col != key_name]
    
    # Conectar ao banco de dados usando psycopg2
    with psycopg2.connect(**connection_params) as conn:
        with conn.cursor() as cursor:
            # Definir o esquema de busca padrão em maiúsculas
            cursor.execute(f'SET search_path TO "{schema}";')
            
            # Verificar o esquema atual para confirmar que está correto
            cursor.execute('SHOW search_path;')
            print("Current search_path:", cursor.fetchone())
            
            for _, row in df.iterrows():
                # Converter tipos para tipos nativos do Python
                row = row.astype(object).where(pd.notnull(row), None)  # Substituir NaN por None
                values = tuple(row[col] for col in columns) + (row[key_name],)
                
                # Construir o conjunto de colunas e valores para o comando UPDATE
                set_clause = ", ".join([f'"{col}" = %s' for col in columns])
                
                # Construir o comando SQL, utilizando aspas duplas para o nome da tabela
                sql = f'''
                UPDATE "{table_name}"
                SET {set_clause}
                WHERE "{key_name}" = %s;
                '''
                
                # Executar o comando com os valores da linha
                cursor.execute(sql, values)

In [34]:
# Chame a função
update_to_sql(modified, "tabela_alvo", "id", connection_params, schema=postgres_schema)

Current search_path: ('"DW_IM_AGB"',)


Insert

In [35]:
import pandas as pd
import psycopg2
import numpy as np
from psycopg2.extras import execute_values

def insert_to_sql(df, table_name, key_name, connection_params, schema=postgres_schema):
    # Lista de colunas para inserir, ignorando a coluna de chave primária
    columns = [col for col in df.columns if col != key_name]

    # Conectar ao banco de dados usando psycopg2
    with psycopg2.connect(**connection_params) as conn:
        with conn.cursor() as cursor:
            # Definir o esquema de busca padrão em maiúsculas
            cursor.execute(f'SET search_path TO "{schema}";')
            
            # Verificar o esquema atual para confirmar que está correto
            cursor.execute('SHOW search_path;')
            print("Current search_path:", cursor.fetchone())
            
            # Criar a cláusula de inserção
            insert_query = f'''
            INSERT INTO "{table_name}" ({', '.join([f'"{col}"' for col in columns])})
            VALUES %s
            '''
            
            # Preparar os dados para inserção, convertendo tipos para nativos do Python
            values = [
                tuple(row[col].item() if isinstance(row[col], (np.integer, np.floating)) else (row[col] if pd.notnull(row[col]) else None) for col in columns)
                for _, row in df.iterrows()
            ]

            # Usar a função execute_values para inserção em massa
            execute_values(cursor, insert_query, values)




In [37]:
# Chame a função
insert_to_sql(inserts, "tabela_alvo", "id", connection_params, schema=postgres_schema)

Current search_path: ('"DW_IM_AGB"',)
