## Desafio Indicium

### Proposta

Construir um pipeline de dados que extraia todos os dias de duas fontes de dados (csv e postgres), gravando os dados em um disco local e posteriormente submeter as informações para um banco de dados.

Dados: São fornecidos duas fontes de dados, um banco Postgres e um arquivo CSV.

As estapas devem ser isoladas umas das outras sendo capaz de executar cada uma sem executar a outra.

#### Primeira etapa:
* Gravar os dados do disco local sendo um arquivo csv para cada tabela do banco fornecido e um arquivo para o arquivo csv de entrada.
* Será executado todos os dias, portanto a extração deve ter caminhos diferentes apontando cada dia da extração.
 
#### Segunda etapa:
* Carregar os arquivos os dados do sistema de arquivos local para um banco de dados.
* Objetivo final é conseguir executar uma consulta mostrando os pedidos e seus detalhes. Os pedidos estão em uma tabela chamado orders no banco de dados postgres. Os detalhes são colocados no arquivo CSV fornecido no desafio, 
 
#### Requisitos
* Todas as tarefas devem ser idempotentes, você deve conseguir executar todo o pipeline por um dia e o resultado deve ser sempre o mesmo
* A etapa 2 depende de ambas as tarefas da etapa 1, portanto, você não poderá executar a etapa 2 por um dia se as tarefas da etapa 1 não forem bem-sucedidas
* Você deve extrair todas as tabelas do banco de dados de origem, não importa que você não vá usar a maioria delas para a etapa final.
* Você deve ser capaz de dizer claramente onde o pipeline falhou, para saber a partir de qual etapa deve executar novamente o pipeline
* Você deve fornecer instruções claras sobre como executar todo o pipeline. Quanto mais fácil, melhor.
* Você deve fornecer um arquivo csv ou json com o resultado da consulta final no banco de dados final.
* Você não precisa realmente agendar o pipeline, mas deve presumir que ele será executado em dias diferentes.
* Seu pipeline deve estar preparado para ser executado nos últimos dias, o que significa que você deve ser capaz de passar um argumento para o pipeline com um dia anterior e ele deve reprocessar os dados desse dia. Como os dados desse desafio são estáticos, a única diferença para cada dia de execução serão os caminhos de saída.

link com todos os detalhes: https://github.com/techindicium/code-challenge/tree/main

### Instalando o docker desktop

* Acesse o link: https://www.docker.com/ escolha sua plataforma e instale o docker desktop.
* Após concluir a instalação acesse a documentação para baixar a imagem postgres, link: https://hub.docker.com/_/postgres
* Acesse seu terminal e execute o comando abaixo para crianção do contanier postgres:

```bash
docker run --name indicium -p 5432:5432 -e POSTGRES_USER=northwind_user -e POSTGRES_PASSWORD=thewindisblowing -e POSTGRES_DB=northwind -d postgres
```

Acesse o arquivo [**northwind.sql**](https://github.com/will-rds/desafio_pipeline_indicium/blob/main/scripts/northwind.sql) e execute o script sql dentro do banco para criar a massa de dados e tabelas.


### Extraindo quais tabelas existem dentro do banco de dados

In [1]:
import psycopg2

# Configurações do banco de dados PostgreSQL
CONFIG_BD = {
    'host': 'localhost',
    'database': 'northwind',
    'user': 'northwind_user',
    'password': 'thewindisblowing'
}

# Puxando todas as tabelas existentes no banco de dados
def obter_nomes_tabelas(conexao):
    nomes_tabelas = []
    try:
        cursor = conexao.cursor()
        cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';")
        nomes_tabelas = [row[0] for row in cursor.fetchall()]
    except Exception as e:
        print(f"Erro ao recuperar os nomes das tabelas: {e}")

    return nomes_tabelas

# Mostrando as tabelas encontradas no banco de dados
def main():
    try:
        # Conectando ao banco de dados
        conexao = psycopg2.connect(**CONFIG_BD)

        # Recuperar os nomes das tabelas
        nomes_tabelas = obter_nomes_tabelas(conexao)
        print("Tabelas encontradas no banco de dados:")
        print(nomes_tabelas)

    except Exception as e:
        print(f"Ocorreu um erro: {e}")

    finally:
        # Fechar a conexão com o banco de dados
        if conexao:
            conexao.close()

if __name__ == "__main__":
    main()


Tabelas encontradas no banco de dados:
['us_states', 'customers', 'orders', 'employees', 'shippers', 'categories', 'products', 'suppliers', 'region', 'territories', 'employee_territories', 'customer_demographics', 'customer_customer_demo']


### Extraindo as tabelas e salvando cada uma com a data do dia da extração em uma pasta.

* No diretório deste [projeto](https://github.com/will-rds/desafio_pipeline_indicium/tree/main/scripts), você irá encontrar todos arquivos com os mesmos códigos abaixo. 
* Ao executar o arquivo [ordem_execucao.py](https://github.com/will-rds/desafio_pipeline_indicium/blob/main/scripts/ordem_execucao.py) o mesmo irá chamar os demais arquivos automaticamente. 
* Lembrando que todos os arquivos devem estar no mesmo diretório.



In [2]:
import os
import warnings
import psycopg2
import pandas as pd
from datetime import datetime

# Ignorando mensagens do pandas
warnings.filterwarnings("ignore", message="pandas only supports SQLAlchemy connectable.*")
# Configurações do banco de dados PostgreSQL
DB_CONFIG = {
    'host': 'localhost',
    'database': 'northwind',
    'user': 'northwind_user',
    'password': 'thewindisblowing'
}

# Lista das tabelas a serem extraídas
tabelas_extraidas = ['us_states', 'customers', 'orders', 'employees', 'shippers', 'categories', 'products', 'suppliers', 'region', 'territories', 'employee_territories', 'customer_demographics', 'customer_customer_demo']  # Adicione os nomes das tabelas aqui

# Diretório base onde os arquivos CSV serão salvos (se não existir, será criado)
DIRETORIO_BASE = os.path.join('.', 'data')

def extraindo_tabelas(nome_tabela, conexao, output_dir):
    query = f"SELECT * FROM {nome_tabela};"
    df = pd.read_sql_query(query, conexao)
    now = datetime.now()
    date_str = now.strftime('%Y%m%d')
    os.makedirs(output_dir, exist_ok=True)
    csv_filename = f"{nome_tabela}_{date_str}.csv"
    csv_path = os.path.join(output_dir, csv_filename)
    df.to_csv(csv_path, index=False)

def main():
    try:
        # Conectando ao banco de dados
        conexao = psycopg2.connect(**DB_CONFIG)

        # Criar diretório com a data do dia da extração
        date_dir = datetime.now().strftime('%Y%m%d')
        diretorio_salvamento = os.path.join(DIRETORIO_BASE, date_dir)
        os.makedirs(diretorio_salvamento, exist_ok=True)

        # Iterar sobre as tabelas e extrair cada uma para um arquivo CSV dentro da pasta data
        for nome_tabela in tabelas_extraidas:
            extraindo_tabelas(nome_tabela, conexao, diretorio_salvamento)

        print("Extração concluída com sucesso!")

    except Exception as e:
        print(f"Ocorreu um erro: {e}")

    finally:
        # Fechar a conexão com o banco de dados
        if conexao:
            conexao.close()

if __name__ == "__main__":
    main()

    
 

Extração concluída com sucesso!


### Copiando o arquivo CSV para a mesma pasta onde as tabelas se encontram

In [3]:
import os
import shutil
from datetime import datetime

# Diretório base onde os arquivos CSV serão salvos (se não existir, será criado)
DIRETORIO_BASE = os.path.join('.', 'data')

#copiando o arquivo para o diretório das demais planilhas extraidas do banco
def main():
    try:
        # Criar diretório com a data do dia da extração
        data_diretorio = datetime.now().strftime('%Y%m%d')
        diretorio_salvamento = os.path.join(DIRETORIO_BASE, data_diretorio)
        os.makedirs(diretorio_salvamento, exist_ok=True)

        # Caminho onde o arquivo orginial csv se encontra
        caminho_base = "C:/indicium/order_details.csv"

        # Obter o nome do arquivo original
        arquivo_original = os.path.basename(caminho_base)
        
        # Adicionar a data da extração ao nome do arquivo
        now = datetime.now()
        data_hoje = now.strftime('%Y%m%d')
        novo_nome_csv = f"{arquivo_original.split('.')[0]}_{data_hoje}.csv"
        destino_novo_csv = os.path.join(diretorio_salvamento, novo_nome_csv)

        # Copiar o arquivo CSV para o mesmo diretório das tabelas
        shutil.copy(caminho_base, destino_novo_csv)

        print("Cópia do arquivo CSV concluída com sucesso!")

    except Exception as e:
        print(f"Ocorreu um erro: {e}")

if __name__ == "__main__":
    main()


Cópia do arquivo CSV concluída com sucesso!


### Carregando arquivos para banco de dados.
* Para fins didáticos foi criado um schema chamado "datawarehouse" no mesmo banco de dandos onde ocorreu a extração.
* Antes de executar este bloco, crie as tabelas dentro do schema "datawarehouse" com o código abaixo:

```sql
-- Table: datawarehouse.orders

-- DROP TABLE IF EXISTS datawarehouse.orders;

CREATE TABLE IF NOT EXISTS datawarehouse.orders
(
    order_id smallint NOT NULL,
    customer_id bpchar COLLATE pg_catalog."default",
    employee_id smallint,
    order_date date,
    required_date date,
    shipped_date date,
    ship_via smallint,
    freight real,
    ship_name character varying(40) COLLATE pg_catalog."default",
    ship_address character varying(60) COLLATE pg_catalog."default",
    ship_city character varying(15) COLLATE pg_catalog."default",
    ship_region character varying(15) COLLATE pg_catalog."default",
    ship_postal_code character varying(10) COLLATE pg_catalog."default",
    ship_country character varying(15) COLLATE pg_catalog."default",
    PRIMARY KEY (order_id)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS datawarehouse.orders
    OWNER to northwind_user;

-- Table: datawarehouse.order_details

-- DROP TABLE IF EXISTS datawarehouse.order_details;

CREATE TABLE IF NOT EXISTS datawarehouse.order_details
(
    order_id smallint NOT NULL,
    product_id smallint,
    unit_price real,
    quantity smallint,
    discount real,
    CONSTRAINT order_details_order_id_fkey FOREIGN KEY (order_id)
        REFERENCES datawarehouse.orders (order_id) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS datawarehouse.order_details
    OWNER to northwind_user;
```


In [4]:
import os
import shutil
import psycopg2
import pandas as pd
from datetime import datetime

# Configurações do banco de dados PostgreSQL de destino
CONFIG_BD_DESTINO = {
    'host': 'localhost',
    'database': 'northwind',
    'user': 'northwind_user',
    'password': 'thewindisblowing'
}

# Diretório base onde os arquivos CSV foram salvos
DIRETORIO_BASE = os.path.join('.', 'data', datetime.now().strftime('%Y%m%d'))

# Obter o dia de hoje para puxar os arquivos com a data corrente.
data_corrente = datetime.now().strftime('%Y%m%d')

# Dicionário com o mapeamento entre nome do arquivo CSV e nome da tabela
MAPEAMENTO_ARQUIVO_TABELA = {
    f'orders_{data_corrente}.csv': 'orders',
    f'order_details_{data_corrente}.csv': 'order_details'
}
# Trucando a tabela antes de carregar novamenta para não ocorrer erro, isso pode mudar de acordo com as condições
# do projeto, sendo incremental ou full.
def truncar_tabelas(conexao):
    with conexao.cursor() as cursor:
        for nome_tabela in MAPEAMENTO_ARQUIVO_TABELA.values():
            consulta_truncate = f"TRUNCATE TABLE datawarehouse.{nome_tabela} CASCADE;"
            cursor.execute(consulta_truncate)
        conexao.commit()

# Carregando dados dos arquivos csv para o banco de dados 
def carregar_dados_no_banco():
    conexao_destino = None
    try:
        # Conectar ao banco de dados de destino
        conexao_destino = psycopg2.connect(**CONFIG_BD_DESTINO)

        # Criar o esquema (schema) caso não exista
        with conexao_destino.cursor() as cursor:
            cursor.execute(f"CREATE SCHEMA IF NOT EXISTS datawarehouse;")
        conexao_destino.commit()

        # Truncar as tabelas antes de carregar os dados
        truncar_tabelas(conexao_destino)

        for nome_arquivo, nome_tabela in MAPEAMENTO_ARQUIVO_TABELA.items():
            # Caminho completo do arquivo CSV a ser carregado
            caminho_arquivo_csv = os.path.join(DIRETORIO_BASE, nome_arquivo)

            # Carregar o arquivo CSV em um DataFrame
            df = pd.read_csv(caminho_arquivo_csv)

            # Substituir valores "NaN" por "None" no DataFrame
            df = df.where(pd.notna(df), None)

            # Converter o DataFrame em uma lista de tuplas
            dados = [tuple(row) for row in df.values]

            # Preparar o comando SQL para inserção dos dados na tabela especificada
            placeholders = ",".join(["%s"] * len(df.columns))
            consulta_insercao = f"INSERT INTO datawarehouse.{nome_tabela} VALUES ({placeholders});"

            # Executar a inserção dos dados
            with conexao_destino.cursor() as cursor:
                cursor.executemany(consulta_insercao, dados)

            conexao_destino.commit()

            print(f"Dados carregados na tabela {nome_tabela} com sucesso!")

    except Exception as e:
        print(f"Ocorreu um erro durante o carregamento dos dados: {e}")

    finally:
        # Fechar a conexão com o banco de dados de destino
        if conexao_destino:
            conexao_destino.close()

if __name__ == "__main__":
    carregar_dados_no_banco()



Dados carregados na tabela orders com sucesso!
Dados carregados na tabela order_details com sucesso!


### Orquestrando a ordem de execução
* Nesta etapa vamos organizar a ordem de execução de cada scrip.
* Caso seja encontrado algum problema no processo de extração, a carga no banco de dados será suspendida.

In [5]:
import carrega_banco_postgres
import extrai_csv
import extrai_banco_postgres

def main():
    try:
        # Extrair dados do banco de dados
        extrai_banco_postgres.main()

    except Exception as e:
        print(f"Ocorreu um erro na extração de dados do banco de dados: {e}")
        print("O pipeline foi interrompido devido a um erro.")
        return  # Interrompe o pipeline caso ocorra um erro

    try:
        # Extrair dados do arquivo CSV
        extrai_csv.main()

    except Exception as e:
        print(f"Ocorreu um erro na extração de dados do arquivo CSV: {e}")
        print("O pipeline foi interrompido devido a um erro.")
        return  # Interrompe o pipeline caso ocorra um erro

    try:
        # Carregar dados no banco de dados
        carrega_banco_postgres.carregar_dados_no_banco()

    except Exception as e:
        print(f"Ocorreu um erro no carregamento dos dados no banco de dados: {e}")
        print("O pipeline foi interrompido devido a um erro.")

if __name__ == "__main__":
    main()


Extração concluída com sucesso!
Cópia do arquivo CSV concluída com sucesso!
Dados carregados na tabela orders com sucesso!
Dados carregados na tabela order_details com sucesso!


## Consulta dos dados no banco
* Após todas as etapas serem concluídas com sucesso, vamos rodar uma consulta para validar as informações de conexão entre as tabelas com o comando abaixo:

```sql
SELECT
    o.order_id,
    o.customer_id,
    o.order_date,
    od.product_id,
    od.unit_price,
    od.quantity,
    od.discount
FROM
    datawarehouse.orders o
JOIN
    datawarehouse.order_details od ON o.order_id = od.order_id;
```

#### Autor
Willian Ribeiro dos Santos

Obrigado.