# Pipeline de Processamento e Análise de Dados de Vendas 
## Postos de Gasolina da Alemanha
## Michel Souza Santana
> 09/07/2023

## Ingestão dos dados

Fonte dos dados: https://dev.azure.com/tankerkoenig/_git/tankerkoenig-data?path=/prices

In [None]:
import os
import pandas as pd
from google.cloud import storage

In [None]:
def contar_pastas(pasta):
    if not os.path.exists(pasta) or not os.path.isdir(pasta):
        print(f"A pasta '{pasta}' não existe ou não é uma pasta válida.")
        return -1

    lista_itens = os.listdir(pasta)
    pastas = [item for item in lista_itens if os.path.isdir(os.path.join(pasta, item))]
    
    return len(pastas)

    print(pastas)

In [None]:
def capturar_caminhos_arquivos(pasta):
    caminhos_arquivos = []
    for pasta_atual, _, arquivos in os.walk(pasta):
        for arquivo in arquivos:
            caminhos_arquivos.append(os.path.join(pasta_atual, arquivo))
    return caminhos_arquivos

caminho_da_pasta = '/home/michel/Documentos/Analise_de_Dados_de_Vendas/data/german_petrol/price/2022'
numero_de_pastas = contar_pastas(caminho_da_pasta)

if numero_de_pastas != -1:
    print(f"O número de pastas em '{caminho_da_pasta}' é: {numero_de_pastas}")
    caminhos_arquivos = capturar_caminhos_arquivos(caminho_da_pasta)
    print(f"Caminhos dos arquivos dentro das pastas:")

In [6]:
import os
import pandas as pd

def contar_pastas_e_capturar_arquivos(pasta):
    if not os.path.exists(pasta) or not os.path.isdir(pasta):
        print(f"A pasta '{pasta}' não existe ou não é uma pasta válida.")
        return -1

    pastas = []
    caminhos_arquivos = []

    for pasta_atual, _, arquivos in os.walk(pasta):
        for item in os.listdir(pasta_atual):
            caminho_item = os.path.join(pasta_atual, item)
            if os.path.isdir(caminho_item):
                pastas.append(item)
            else:
                caminhos_arquivos.append(caminho_item)

    print(f"O número de pastas em '{pasta}' é: {len(pastas)}")
    print("Caminhos dos arquivos dentro das pastas:")
    for caminho_arquivo in caminhos_arquivos:
        print(caminho_arquivo)

    return pastas, caminhos_arquivos

caminho_da_pasta = '/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022'
pastas, caminhos_arquivos = contar_pastas_e_capturar_arquivos(caminho_da_pasta)


O número de pastas em '/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022' é: 12
Caminhos dos arquivos dentro das pastas:
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-22-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-09-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-06-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-20-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-16-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-07-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-03-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-05-pr

In [None]:
def carrega_dados():
    diretorio_destino = "/home/michel/Documentos/Analise_de_Dados_de_Vendas/data/german_petrol/price_parquet"

    if not os.path.exists(diretorio_destino):
        os.makedirs(diretorio_destino)
    
    for caminho_arquivo in caminhos_arquivos:
        df_aux = pd.read_csv(caminho_arquivo)
        nome_arquivo = os.path.basename(caminho_arquivo)
    
        nome_arquivo_parquet = os.path.splitext(nome_arquivo)[0] + ".parquet"

        caminho_arquivo_parquet = os.path.join(diretorio_destino, nome_arquivo_parquet)

        df_aux.to_parquet(caminho_arquivo_parquet)
    
        print(f"Arquivo {nome_arquivo_parquet} convertido e salvo em {caminho_arquivo_parquet}")
    

In [None]:
def converter_e_salvar_parquet():
    caminhos_arquivos = capturar_caminhos_arquivos(caminho_da_pasta)
    bucket_name = "NOME_DO_SEU_BUCKET"  # Substitua pelo nome do seu bucket no GCS
    gcs_client = storage.Client()

    for caminho_arquivo in caminhos_arquivos:
        df_aux = pd.read_csv(caminho_arquivo)
        nome_arquivo = os.path.basename(caminho_arquivo)
        nome_arquivo_parquet = os.path.splitext(nome_arquivo)[0] + ".parquet"
        blob = gcs_client.bucket(bucket_name).blob(nome_arquivo_parquet)
        df_aux.to_parquet(f"gs://{bucket_name}/{nome_arquivo_parquet}", storage_options={"storage_options": "mode=wb"})
        print(f"Arquivo {nome_arquivo_parquet} convertido e salvo em gs://{bucket_name}/{nome_arquivo_parquet}")

In [None]:
"""import os
import pandas as pd
import pyarrow.parquet as pq

def capturar_caminhos_parquet(pasta):
    caminhos_parquet = []
    for pasta_atual, _, arquivos in os.walk(pasta):
        for arquivo in arquivos:
            if arquivo.endswith('.parquet'):  # Garantir que só arquivos Parquet sejam adicionados
                caminhos_parquet.append(os.path.join(pasta_atual, arquivo))
    return caminhos_parquet

pasta = '/home/michel/Documentos/Analise_de_Dados_de_Vendas/data/german_petrol/price_parquet'
caminhos_parquet = capturar_caminhos_parquet(pasta)

dataframes = []
for caminho_arquivo in caminhos_parquet:
    df_aux = pd.read_parquet(caminho_arquivo)
    dataframes.append(df_aux)

# Concatenar todos os DataFrames da lista em um único DataFrame final
df = pd.concat(dataframes, axis=1, ignore_index=True)"""


In [8]:
import os
import pandas as pd
from google.cloud import storage

def contar_pastas_capturar_e_salvar_parquet(pasta, bucket_name, chave_json):
    if not os.path.exists(pasta) or not os.path.isdir(pasta):
        print(f"A pasta '{pasta}' não existe ou não é uma pasta válida.")
        return -1

    pastas = []
    caminhos_arquivos = []

    for pasta_atual, _, arquivos in os.walk(pasta):
        for item in os.listdir(pasta_atual):
            caminho_item = os.path.join(pasta_atual, item)
            if os.path.isdir(caminho_item):
                pastas.append(item)
            else:
                caminhos_arquivos.append(caminho_item)

    print(f"O número de pastas em '{pasta}' é: {len(pastas)}")
    print("Caminhos dos arquivos dentro das pastas:")
    for caminho_arquivo in caminhos_arquivos:
        print(caminho_arquivo)

    gcs_client = storage.Client.from_service_account_json(chave_json)

    for caminho_arquivo in caminhos_arquivos:
        df_aux = pd.read_csv(caminho_arquivo)
        nome_arquivo = os.path.basename(caminho_arquivo)
        nome_arquivo_parquet = os.path.splitext(nome_arquivo)[0] + ".parquet"

        bucket = gcs_client.get_bucket(bucket_name)
        blob = bucket.blob(nome_arquivo_parquet)

        # Salvar o arquivo diretamente no bucket do GCS
        with blob.open('wb') as file:
            df_aux.to_parquet(file)
        
        print(f"Arquivo {nome_arquivo_parquet} convertido e salvo em gs://{bucket_name}/{nome_arquivo_parquet}")

    return pastas, caminhos_arquivos

caminho_da_pasta = '/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022'
bucket_name = "germany-petrol-bk"  # Substitua pelo nome do seu bucket no GCS
chave_json = '/home/michel/Documentos/Chave/germany-petrol-37c6d617bd06.json'  # Substitua pelo caminho real da sua chave de serviço JSON

pastas, caminhos_arquivos = contar_pastas_capturar_e_salvar_parquet(caminho_da_pasta, bucket_name, chave_json)


O número de pastas em '/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022' é: 12
Caminhos dos arquivos dentro das pastas:
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-22-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-09-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-06-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-20-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-16-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-07-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-03-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-05-pr

In [9]:
import os
import pandas as pd
from google.cloud import storage

def contar_pastas_capturar_e_salvar_parquet(pasta, bucket_name, chave_json):
    if not os.path.exists(pasta) or not os.path.isdir(pasta):
        print(f"A pasta '{pasta}' não existe ou não é uma pasta válida.")
        return -1

    pastas = []
    caminhos_arquivos = []

    for pasta_atual, _, arquivos in os.walk(pasta):
        for item in os.listdir(pasta_atual):
            caminho_item = os.path.join(pasta_atual, item)
            if os.path.isdir(caminho_item):
                pastas.append(item)
            else:
                caminhos_arquivos.append(caminho_item)

    print(f"O número de pastas em '{pasta}' é: {len(pastas)}")
    print("Caminhos dos arquivos dentro das pastas:")
    for caminho_arquivo in caminhos_arquivos:
        print(caminho_arquivo)

    gcs_client = storage.Client.from_service_account_json(chave_json)
    bucket = gcs_client.get_bucket(bucket_name)
    pasta_no_bucket = "data_raw/"  # Pasta no bucket onde deseja salvar os arquivos

    for caminho_arquivo in caminhos_arquivos:
        df_aux = pd.read_csv(caminho_arquivo)
        nome_arquivo = os.path.basename(caminho_arquivo)
        nome_arquivo_parquet = os.path.splitext(nome_arquivo)[0] + ".parquet"

        caminho_arquivo_no_bucket = os.path.join(pasta_no_bucket, nome_arquivo_parquet)
        blob = bucket.blob(caminho_arquivo_no_bucket)

        # Salvar o arquivo diretamente no bucket do GCS
        with blob.open('wb') as file:
            df_aux.to_parquet(file)
        
        print(f"Arquivo {nome_arquivo_parquet} convertido e salvo em gs://{bucket_name}/{caminho_arquivo_no_bucket}")

    return pastas, caminhos_arquivos

caminho_da_pasta = '/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022'
bucket_name = "germany-petrol-bk"  # Substitua pelo nome do seu bucket no GCS
chave_json = '/home/michel/Documentos/Chave/germany-petrol-37c6d617bd06.json'  # Substitua pelo caminho real da sua chave de serviço JSON

pastas, caminhos_arquivos = contar_pastas_capturar_e_salvar_parquet(caminho_da_pasta, bucket_name, chave_json)


O número de pastas em '/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022' é: 12
Caminhos dos arquivos dentro das pastas:
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-22-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-09-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-06-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-20-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-16-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-07-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-03-prices.csv
/home/michel/Documentos/Analise_de_Dados_de_Vendas/airflow-docker/data/price/2022/07/2022-07-05-pr