# Michel Souza Santana
## Projeto Desafio Aceleras
## Trilha 1
> Start: 15/05/2023

## Fase 1 - Transformação do ER proposto em um BI, realizando o ETL usando uma ferrmenta local (Talend, Apache Hop, Nifi, Airflow, SSIS, Pentaho,…)

* Entenda o modelo ER: Familiarize-se com o modelo ER existente, incluindo as tabelas, relacionamentos e atributos. Isso ajudará você a mapear corretamente os dados durante a transformação.

* Identifique os requisitos de BI: Compreenda as necessidades e requisitos do seu projeto de BI. Identifique as informações que você precisa extrair e apresentar no ambiente de BI.

* Escolha uma ferramenta ETL: Pesquise e selecione uma ferramenta ETL adequada para sua transformação de dados. Existem várias opções disponíveis, como Pentaho Data Integration, Talend, Microsoft SQL Server Integration Services (SSIS), entre outras.

* Instale a ferramenta ETL: Faça o download e instale a ferramenta ETL selecionada no seu ambiente local.

* Conecte-se ao banco de dados: Configure a conexão da ferramenta ETL com o banco de dados que contém os dados do modelo ER. Forneça as credenciais de acesso necessárias para estabelecer a conexão.

* Extração de dados: Utilizando a ferramenta ETL, extraia os dados do banco de dados conforme necessário para o seu projeto de BI. Isso pode envolver a seleção de tabelas específicas, filtragem de dados ou até mesmo a união de várias tabelas para obter as informações desejadas.

* Limpeza e transformação de dados: Aplique as transformações necessárias nos dados extraídos para adequá-los às necessidades do ambiente de BI. Isso pode incluir a remoção de dados duplicados, preenchimento de valores ausentes, conversão de formatos de data, entre outros processos de limpeza e transformação.

* Mapeamento para o modelo dimensional: Se você estiver construindo um data warehouse ou uma solução de BI baseada em modelo dimensional, mapeie os dados extraídos para as dimensões e fatos do seu modelo dimensional. Isso envolve a definição de hierarquias, chaves e relacionamentos.

* Desenvolva fluxos de trabalho ETL: Utilizando a ferramenta ETL, crie fluxos de trabalho que automatizem a transformação de dados. Isso pode envolver a criação de transformações, tarefas agendadas e outras operações para garantir a integridade e atualização dos dados.

* Carregamento dos dados: Carregue os dados transformados no ambiente de BI, que pode incluir um data warehouse, um banco de dados ou outra solução de armazenamento de dados.

* Desenvolva visualizações e relatórios: Com os dados carregados no ambiente de BI, desenvolva visualizações e relatórios interativos para fornecer insights acionáveis aos usuários finais. Isso pode ser feito usando ferramentas de visualização de dados como Tableau, Power BI, QlikView, entre outras.

* Teste e valide: Realize testes para garantir a precisão e a integridade dos dados transformados. Verifique se as visualiza

### Os processos de engenharia de dados (data engineering) são etapas sequenciais usadas para transformar e gerenciar dados em um pipeline de processamento. Esses processos são realizados em várias etapas, começando pelos dados brutos e terminando com os dados confiáveis e prontos para análise. Aqui está uma explicação de cada processo em ordem de execução:

* Processo de Raw Data (Dados Brutos):
Nesta fase inicial, os dados brutos são coletados de várias fontes, como bancos de dados, arquivos CSV, APIs, feeds de streaming, entre outros. Esses dados podem estar em formatos diversos e podem conter ruído, inconsistências e falta de estrutura. O objetivo do processo de Dados Brutos é extrair e armazenar esses dados em seu estado bruto original.

* Processo de Data Engineering (Engenharia de Dados):
Nesta etapa, os dados brutos são processados e transformados em um formato adequado para análise e uso posterior. Isso envolve atividades como limpeza de dados, padronização de formatos, filtragem de dados inválidos ou incompletos, remoção de duplicatas e criação de estruturas de dados otimizadas para consultas e processamento eficiente. O objetivo é obter dados estruturados e refinados que possam ser usados em análises e outros processos.

* Processo de Refined Data (Dados Refinados):
Nesta fase, os dados processados são refinados ainda mais para atender a requisitos específicos de negócios e análise. Isso pode incluir agregação de dados, cálculos adicionais, enriquecimento com informações adicionais, como dados geográficos ou dados de terceiros, e transformações personalizadas para atender às necessidades específicas dos usuários finais. O objetivo é fornecer dados refinados e mais valiosos para análise e tomada de decisão.

* Processo de Transient Data (Dados Transitórios):
Os dados transitórios são dados temporários ou de curta duração usados para processamento e transformação intermediários. Esses dados podem ser gerados durante o processamento em tempo real, pipelines de processamento de dados em lote ou durante a integração de diferentes fontes de dados. Os dados transitórios são normalmente armazenados em sistemas de processamento de fluxo de dados ou bancos de dados temporários, e seu objetivo é suportar operações intermediárias antes que os dados sejam refinados e confiáveis.

* Processo de Trusted Data (Dados Confiáveis):
Nesta fase final, os dados são considerados confiáveis, prontos para uso e adequados para análise e tomada de decisão. Isso envolve garantir a qualidade dos dados, validar a precisão, consistência e integridade dos dados, aplicar regras de negócios e conformidade, bem como implementar mecanismos de controle de qualidade. O objetivo é fornecer dados confiáveis que possam ser usados com confiança para análise, geração de relatórios e outras tarefas de negócios.

## Carregando as bibliotecas

In [1]:
import pandas as pd
import numpy as np
import pyarrow
import os
import csv

## Instalando o pyrrow para conversão dos arquivos csv em parquet

In [2]:
#!pip install pyarrow


# 1 - Data Raw

## Criando os diretórios estruturais do projeto

In [3]:
path_folder = !pwd
f = pd.read_csv(str(path_folder[0]) + '/folders.csv')
lista_folders = f['Folders'].to_list()

In [4]:
for i in lista_folders:
    diretorio = str(path_folder[0]) 

    if not os.path.exists(diretorio + '/' + i):
        os.makedirs(diretorio + '/' + i)
        print(f"Diretório {i} criado com sucesso!")
    else:
        print(f"O diretório {i} já existe.")
if not os.path.exists(diretorio + '/' + '/raw' + '/parquet'):
    os.makedirs(diretorio + '/' + '/raw' + '/parquet')
    print('Pasta parquet criada.')
else:
    print('Pasta parquet já existe;')

Diretório engineer criado com sucesso!
Diretório raw criado com sucesso!
Diretório refined criado com sucesso!
Diretório transient criado com sucesso!
Diretório trusted criado com sucesso!
Pasta parquet criada.


# Identificar as fontes de dados: 

https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce

A URL em questão se refere a um conjunto de dados disponibilizado no site Kaggle, que contém informações sobre o comércio eletrônico no Brasil. O conjunto de dados é intitulado "Brazilian E-Commerce Public Dataset by Olist" e foi criado pela empresa Olist, que é uma plataforma de vendas on-line que conecta pequenos e médios varejistas a marketplaces.

O conjunto de dados contém informações de mais de 100 mil pedidos de clientes, com dados que incluem informações do produto, preços, prazos de entrega, avaliações de clientes e informações sobre o vendedor. Além disso, o conjunto de dados contém informações sobre geolocalização dos clientes, categoria de produtos e informações sobre a própria loja virtual.

Este conjunto de dados pode ser extremamente útil para análises sobre comércio eletrônico no Brasil, permitindo a análise de tendências de consumo, comportamento dos clientes, performance de vendas e muito mais. A disponibilização de dados desse tipo é importante para o desenvolvimento de modelos de negócios mais eficientes e para a tomada de decisões mais informadas no setor de e-commerce brasileiro.

## Copia os dados do site do kaggle

### Instalando o Kaggle

In [5]:
#!pip install kaggle --upgrade

### Baixando os arquivos csv

Baixar & diszipar

In [6]:
if not os.path.exists(str(path_folder[0]) + '/' + 'raw/brazilian-ecommerce.zip'):
    
    r = !pwd
    PATH_FOLDER = r[0] + '/raw'
    os.environ['PATH_FOLDER'] = PATH_FOLDER

    !cd $PATH_FOLDER && kaggle datasets download -d olistbr/brazilian-ecommerce
    !cd $PATH_FOLDER && unzip brazilian-ecommerce.zip
    !cd $PATH_FOLDER && rm -r brazilian-ecommerce.zip
    print('Arquivo carregado e descompactado.')
else:
    print('Arquivo já existe.')

Downloading brazilian-ecommerce.zip to /home/michel/opt/kaggle_olist/raw
100%|███████████████████████████████████████| 42.6M/42.6M [00:13<00:00, 812kB/s]
100%|██████████████████████████████████████| 42.6M/42.6M [00:13<00:00, 3.23MB/s]
Archive:  brazilian-ecommerce.zip
  inflating: olist_customers_dataset.csv  
  inflating: olist_geolocation_dataset.csv  
  inflating: olist_order_items_dataset.csv  
  inflating: olist_order_payments_dataset.csv  
  inflating: olist_order_reviews_dataset.csv  
  inflating: olist_orders_dataset.csv  
  inflating: olist_products_dataset.csv  
  inflating: olist_sellers_dataset.csv  
  inflating: product_category_name_translation.csv  
Arquivo carregado e descompactado.


## Criando o arquivo 'controller.csv' na pasta enginner

In [13]:
pasta = str(path_folder[0]) + '/raw'  # Substitua pelo caminho da pasta desejada

# Capturar nomes dos arquivos e remover a extensão
arquivos = [os.path.splitext(arquivo)[0] for arquivo in os.listdir(pasta) if os.path.isfile(os.path.join(pasta, arquivo))]
# Eliminar as palavras indesejadas
palavras_indesejadas = ['_dataset.csv', 'olist_', 'order_', '_dataset', 'product_', '_name', '_translation']
nomes_limpos = [arquivo for arquivo in arquivos]
for palavra in palavras_indesejadas:
   nomes_limpos = [nome.replace(palavra, '') for nome in nomes_limpos]

# Caminho do arquivo CSV a ser salvo
caminho_arquivo = str(path_folder[0]) + '/' + 'controller.csv'

# Definir os dados a serem escritos no arquivo CSV
dados = [
    ["path_transient", "path_raw", "table_transient", "table_raw", "table_name"]
] + [
    [str(path_folder[0]) + '/' + "transient", str(path_folder[0]) + '/' + "raw", nome, nome, nome_limp]
    for nome, nome_limp in zip(arquivos, nomes_limpos)
]

# Salvar o arquivo CSV
pd.DataFrame(dados).to_csv(caminho_arquivo, index=False, header=False)



## Carregando tabela com os path necessários para manipulação dos arquivos

### Testando a formação dos path

In [11]:
print(path['path_transient'][0] + '/' + path['table_transient'][0] + '.csv')
print(path['path_raw'][0] + '/' + path['table_raw'][0] + '.parquet')

/home/michel/opt/kaggle_olist/transient/olist_order_payments_dataset.csv
/home/michel/opt/kaggle_olist/raw/olist_order_payments_dataset.parquet


In [14]:
path = pd.read_csv(str(path_folder[0]) + '/' + 'controller.csv')
path

Unnamed: 0,path_transient,path_raw,table_transient,table_raw,table_name
0,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_order_payments_dataset,olist_order_payments_dataset,payments
1,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_customers_dataset,olist_customers_dataset,customers
2,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_products_dataset,olist_products_dataset,products
3,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_order_reviews_dataset,olist_order_reviews_dataset,reviews
4,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_sellers_dataset,olist_sellers_dataset,sellers
5,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_geolocation_dataset,olist_geolocation_dataset,geolocation
6,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,product_category_name_translation,product_category_name_translation,category
7,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_order_items_dataset,olist_order_items_dataset,items
8,/home/michel/opt/kaggle_olist/transient,/home/michel/opt/kaggle_olist/raw,olist_orders_dataset,olist_orders_dataset,orders


## Convertendo or arquivos CSV em .parquet

In [10]:
for i in range(len(path['path_raw'])):
    df = pd.read_csv(path['path_raw'][i] + '/' + path['table_raw'][i] + '.csv')
    df = df.astype('str')
    df.to_parquet(str(path['path_raw'][i]) + '/' + 'parquet' + '/' + str(path['table_raw'][i]) + '.parquet')


## Eliminando as duplicadas nos ID das tabelas

In [None]:
"""for i in range(len(path['path_raw'])):
    df_clear_duplicates = pd.read_parquet(str(path['path_raw'][i]) + '/' + 'parquet' + '/' + str(path['table_raw'][i]) + '.parquet')
    coluna = df_clear_duplicates.columns[0]
    df_clear_duplicates.drop_duplicates(subset=coluna, inplace=True)
    df_clear_duplicates.to_parquet(str(path['path_raw'][i]) + '/' + 'parquet' + '/' + str(path['table_raw'][i]) + '.parquet')"""

: 

In [None]:
"""MERGE INTO tabela_original t
USING (
    SELECT DISTINCT coluna1, coluna2, coluna3, ...
    FROM tabela_original
) s ON (t.coluna1 = s.coluna1)
WHEN MATCHED THEN
    DELETE;
"""

: 

## Aqui, estarei interando sobre um banco de dado MySql para criar tabelas correspondentes aos arquivos e carrega-las como forma de Buckap para os ddados da RAW

### Criando as tabelas no banco de dados Mysql

In [None]:
#!pip install mysql-connector-python
senha_db = pd.read_csv('/home/michel/senha.csv')
import mysql.connector

: 

### Conectando ao banco de dados MySQL e criando as tabelas

In [None]:
"""cnx = mysql.connector.connect(
    host='localhost',
    user='root',
    password=senha_db['senha'][0],
    database='staging_olist_db'
)

# Cursor para executar as consultas SQL
cursor = cnx.cursor()

# Lista de queries SQL para criação das tabelas
create_table_queries = ["""
"""
-- Criação da tabela CUSTOMER
CREATE TABLE IF NOT EXISTS stg_customers (
  customer_id VARCHAR(200) NOT NULL,
  customer_unique_id VARCHAR(255),
  customer_zip_code_prefix VARCHAR(200),
  customer_city VARCHAR(255),
  customer_state VARCHAR(255),
  PRIMARY KEY (customer_id)
)
""",
"""
-- Criação da tabela GEOLOCATION
CREATE TABLE IF NOT EXISTS stg_geolocation (
  geolocation_zip_code_prefix VARCHAR(200) NOT NULL,
  geolocation_lat VARCHAR(255),
  geolocation_lng VARCHAR(255),
  geolocation_city VARCHAR(255),
  geolocation_state VARCHAR(255),
  PRIMARY KEY (geolocation_zip_code_prefix, geolocation_lat, geolocation_lng)
)
""",
"""
-- Criação da tabela ITEMS
CREATE TABLE IF NOT EXISTS stg_items (
  order_id VARCHAR(255) NOT NULL,
  order_item_id VARCHAR(200) NOT NULL,
  product_id VARCHAR(255) NOT NULL,
  seller_id VARCHAR(255) NOT NULL,
  shipping_limit_date VARCHAR(255),
  price VARCHAR(255),
  freight_value VARCHAR(255),
  PRIMARY KEY (order_id, order_item_id) -- Primary Key Composta
)
""",
"""
-- Criação da tabela PAYMENT
CREATE TABLE IF NOT EXISTS stg_payments (
  order_id VARCHAR(255) NOT NULL,
  payment_sequential VARCHAR(200) NOT NULL,
  payment_type VARCHAR(255) NOT NULL,
  payment_installments VARCHAR(200),
  payment_value VARCHAR(255),
  PRIMARY KEY (order_id, payment_sequential) -- Primary Key Composta
)
""",
"""
-- Criação da tabela REVIEWS
CREATE TABLE IF NOT EXISTS stg_reviews (
  review_id VARCHAR(255) NOT NULL,
  order_id VARCHAR(255) NOT NULL,
  review_score VARCHAR(200),
  review_comment_title VARCHAR(255),
  review_comment_message TEXT,
  review_creation_date VARCHAR(255),
  review_answer_timestamp VARCHAR(255),
  PRIMARY KEY (review_id)
)
""",
"""
-- Criação da tabela ORDERS
CREATE TABLE IF NOT EXISTS stg_orders (
  order_id VARCHAR(255) NOT NULL,
  customer_id VARCHAR(255),
  order_status VARCHAR(255),
  order_purchase_timestamp VARCHAR(255),
  order_approved_at VARCHAR(255),
  order_delivered_carrier_date VARCHAR(255),
  order_delivered_customer_date VARCHAR(255),
  order_estimated_delivery_date VARCHAR(255),
  PRIMARY KEY (order_id)
)
""",
"""
-- Criação da tabela PRODUCTS
CREATE TABLE IF NOT EXISTS stg_products (
  product_id VARCHAR(255) NOT NULL,
  product_category_name VARCHAR(255),
  product_name_lenght VARCHAR(200),
  product_description_lenght VARCHAR(200),
  product_photos_qty VARCHAR(200),
  product_weight_g VARCHAR(200),
  product_length_cm VARCHAR(200),
  product_height_cm VARCHAR(200),
  product_width_cm VARCHAR(200),
  PRIMARY KEY (product_id)
)
""",
"""
-- Criação da tabela SELLERS
CREATE TABLE IF NOT EXISTS stg_sellers (
  seller_id VARCHAR(255) NOT NULL,
  seller_zip_code_prefix VARCHAR(255),
  seller_city VARCHAR(255),
  seller_state VARCHAR(255),
  PRIMARY KEY (seller_id)
)
"""
"""]

# Executar as consultas SQL para criar as tabelas
for query in create_table_queries:
    cursor.execute(query)

# Commit das alterações
cnx.commit()

# Fechando o cursor e a conexão
cursor.close()
cnx.close()"""

: 