In [None]:
import psycopg2
from psycopg2 import sql
import pandas as pd
from sqlalchemy import create_engine
import requests
import json
from dotenv import load_dotenv
import os

# Load environment variables from the .env file into the process environment.
load_dotenv()

# Base URL of the employees API (the employee id is appended to it).
url_api_funcionarios = os.environ.get('url_api_funcionarios')

# Connection settings for the source PostgreSQL database (sales data).
bix_user = os.environ.get('bix_user')
bix_dbname = os.environ.get('bix_dbname')
bix_password = os.environ.get('bix_password')
bix_host = os.environ.get('bix_host')
bix_port = os.environ.get('bix_port')

# Location of the parquet file holding the categories.
gc_storage = os.environ.get('gc_storage')

# Connection settings for the target PostgreSQL database.
user = os.environ.get('user')
dbname = os.environ.get('dbname')
password = os.environ.get('password')
host = os.environ.get('host')
port = os.environ.get('port')

def lambda_handler(event, context):
    """Run the full extract / create-table / upsert pipeline.

    Extracts employees (API), sales (source PostgreSQL) and categories
    (parquet), makes sure the three target tables exist, then upserts each
    DataFrame into its table.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda runtime context (not used).

    Returns:
        dict: ``{'statusCode': 200, 'body': <JSON string>}``.
    """
    # Sources
    funcionarios = consolidar_nomes()
    vendas = extract_postgres_bix()
    categorias = extract_cloud()

    # Create tables: create_table() executes the SQL it receives, so it must
    # be given the DDL statements, not the extracted DataFrames.
    create_table(create_table_funcionarios)
    create_table(create_table_vendas)
    create_table(create_table_categorias)

    # Upsert into the target PostgreSQL database
    insert_or_update_postgres(funcionarios, 'funcionarios')
    insert_or_update_postgres(vendas, 'vendas')
    insert_or_update_postgres(categorias, 'categorias')

    # TODO implement
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

################### EXTRACT ####################################

#API - funcionarios
def consultar_api(id, timeout=10):
    """Fetch the employee name for ``id`` from the employees API.

    The request URL is ``url_api_funcionarios`` with ``id`` appended.

    Args:
        id: Employee identifier; converted with ``str()`` before use.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        str: The response body with surrounding whitespace stripped.

    Raises:
        requests.exceptions.HTTPError: The API answered with a 4xx/5xx status.
        requests.exceptions.RequestException: Connection failure or timeout.
    """
    url = url_api_funcionarios + str(id)
    # requests has no default timeout; without one a stalled server would
    # block the caller indefinitely.
    response = requests.get(url, timeout=timeout)
    # Fail loudly instead of storing an HTTP error page as the employee name.
    response.raise_for_status()
    return response.text.strip()

def consolidar_nomes(**kwargs):
    """Build a DataFrame of employees by querying the API for ids 1 through 9.

    Args:
        **kwargs: Accepted but not used.

    Returns:
        pandas.DataFrame: One row per id with columns ``id`` and ``nome``.
    """
    registros = []
    for funcionario_id in range(1, 10):
        nome = consultar_api(funcionario_id)
        registros.append({'id': funcionario_id, 'nome': nome})

    df_funcionarios = pd.DataFrame(registros)
    print(df_funcionarios)
    return df_funcionarios

#Postgres - vendas
def extract_postgres_bix():
    """Read every row of ``public.venda`` from the source PostgreSQL database.

    Connects with the ``bix_*`` settings, loads the query result into a
    DataFrame, prints it and returns it.

    Returns:
        pandas.DataFrame: Result of ``SELECT * FROM public.venda``.
    """
    # Connect to the source PostgreSQL database
    conn = psycopg2.connect(
        dbname=bix_dbname,
        user=bix_user,
        password=bix_password,
        host=bix_host,
        port=bix_port)
    try:
        sql_query = "SELECT * FROM public.venda"
        # Load the query result into a DataFrame
        vendas = pd.read_sql_query(sql_query, conn)
    finally:
        # Always release the connection, even when the query fails.
        conn.close()
    print(vendas)
    return vendas

#Cloud - categorias
def extract_cloud():
    """Load the categories parquet file located at ``gc_storage``.

    Returns:
        pandas.DataFrame: Contents of the parquet file (also printed).
    """
    df_categorias = pd.read_parquet(gc_storage)
    print(df_categorias)
    return df_categorias
    

################### TABLE CREATION ########################
# DDL for the target "categorias" table: integer primary key `id`, the
# category name, and created_at/updated_at timestamps defaulting to NOW().
# IF NOT EXISTS makes the statement safe to run on every execution.
create_table_categorias = """
    CREATE TABLE IF NOT EXISTS categorias (
        id int,
        nome_categoria TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        updated_at TIMESTAMP DEFAULT NOW(),
        CONSTRAINT table_pk PRIMARY KEY (id)
    )
"""

# DDL for the target "funcionarios" table: integer primary key `id`, the
# employee name, and created_at/updated_at timestamps defaulting to NOW().
create_table_funcionarios = """
    CREATE TABLE IF NOT EXISTS funcionarios (
        id int,
        nome TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        updated_at TIMESTAMP DEFAULT NOW(),
        CONSTRAINT table_pk_2 PRIMARY KEY (id)
    )
"""

# DDL for the target "vendas" table: integer primary key `id_venda`, the
# employee and category ids, the sale date and value, and
# created_at/updated_at timestamps defaulting to NOW().
create_table_vendas = """
    CREATE TABLE IF NOT EXISTS vendas (
        id_venda int,
        id_funcionario int,
        id_categoria int,
        data_venda date,
        venda int,
        created_at TIMESTAMP DEFAULT NOW(),
        updated_at TIMESTAMP DEFAULT NOW(),
        CONSTRAINT table_pk_3 PRIMARY KEY (id_venda)
    )
"""
def create_table(create_table):
    """Execute a SQL statement against the target PostgreSQL database.

    Opens a connection with the module-level target settings, executes the
    statement, commits, and closes the connection.

    Args:
        create_table: SQL text to execute (e.g. a ``CREATE TABLE`` statement).

    Raises:
        psycopg2.Error: Connecting or executing the statement failed.
    """
    conn = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port)
    try:
        # The cursor context manager closes the cursor on exit.
        with conn.cursor() as cur:
            cur.execute(create_table)
        conn.commit()
    finally:
        # Close the connection even if execute/commit raised.
        conn.close()

############# LOAD ##########################

def insert_or_update_postgres(df, tabela):
    """Upsert every row of ``df`` into ``public.<tabela>`` on the target database.

    The first DataFrame column is used as the conflict key: rows whose key
    already exists are updated with the incoming values, other rows are
    inserted. All rows are written in a single transaction.

    Args:
        df: pandas DataFrame whose column names match the table's columns;
            the first column must have a unique/primary-key constraint.
        tabela: Name of the target table inside the ``public`` schema.

    Returns:
        None. Prints a confirmation message on success.

    Raises:
        IndexError: ``df`` has no columns.
        psycopg2.Error: Connecting or executing the statement failed.
    """
    columns = list(df.columns)
    # The first DataFrame column is the conflict (key) column.
    id_column = columns[0]

    # The statement is identical for every row, so build it once. Table and
    # column names are composed with sql.Identifier so they are quoted rather
    # than pasted into the SQL text. Quoted identifiers are matched
    # case-sensitively, so DataFrame column names must match the table's
    # column names exactly.
    query = sql.SQL(
        "INSERT INTO public.{tabela} ({col_names}) "
        "VALUES ({values}) "
        "ON CONFLICT ({id_column}) DO UPDATE SET {assignments}"
    ).format(
        tabela=sql.Identifier(tabela),
        col_names=sql.SQL(', ').join(sql.Identifier(col) for col in columns),
        values=sql.SQL(', ').join(sql.Placeholder() * len(columns)),
        id_column=sql.Identifier(id_column),
        assignments=sql.SQL(', ').join(
            sql.SQL("{col} = EXCLUDED.{col}").format(col=sql.Identifier(col))
            for col in columns
        ),
    )

    conn = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port)
    try:
        with conn.cursor() as cur:
            for _, row in df.iterrows():
                # Row values are passed as parameters, in column order.
                cur.execute(query, tuple(row[col] for col in df.columns))
        # Commit once after all rows were written.
        conn.commit()
    finally:
        # Closing without a commit discards the pending transaction, so a
        # failure midway leaves the table unchanged.
        conn.close()
    print('Inserção ou Atualização concluídas')



In [None]:
# Run the three extraction steps on their own (no table creation or upsert):
# employees from the API, sales from the source PostgreSQL database and
# categories from the parquet file. Each function also prints its DataFrame.
funcionarios = consolidar_nomes()
vendas = extract_postgres_bix()
categorias = extract_cloud()