## Objetivo do notebook
#### O objetivo deste notebook é realizar a leitura de dados das origens do dataset alunos na camada silver e gravar de forma versionada na camada gold


### Import Libs

In [1]:
!pip install minio





In [2]:
from pyspark.sql import SparkSession
from io import BytesIO
from pyspark.sql.functions import col, max
from pyspark.sql.window import Window
from minio.error import S3Error
from minio import Minio

import glob
import pyarrow.parquet as pq
import os
import pandas as pd

### Definição de variáveis

In [3]:
# Sessão Spark
spark = SparkSession.builder.getOrCreate()

# Parametros de input e output das origens
camadaLeitura = 'silver'
camadaEscrita = 'gold'
pasta = 'notas'
temp_blobs = '/home/jovyan/notebooks/temporary_blobs/' # pasta temporária para armazenamento de objetos

# Conexão ao miniIO
minio_endpoint = 'minio:9000'
minio_access_key = 'minioaccesskey'
minio_secret_key = 'miniosecretkey'
minio_object_name = pasta
minio_client = Minio(minio_endpoint, access_key=minio_access_key, secret_key=minio_secret_key, secure=False)

# Definição de variáveis para versionamento dos dados na camada bronze
minio_path = f'{pasta}'

### Leitura da camada Silver

In [4]:
dataframes = []

try:
    # Lista objetos no bucket
    objects = minio_client.list_objects(camadaLeitura, prefix=pasta, recursive=True)

    # Itera sobre os objetos na pasta
    for obj in objects:
        # Lê o conteúdo do objeto
        content = minio_client.get_object(camadaLeitura, obj.object_name).read()
        
        # Lê o conteúdo do objeto Parquet usando pyarrow e converte para DataFrame Pandas
        df_pandas = pq.read_table(BytesIO(content)).to_pandas()
        
        # Adiciona o DataFrame Pandas à lista de DataFrames
        dataframes.append(df_pandas)
        
        print(f"Objeto encontrado: {obj.object_name}")
except error.MinioException as e:
    print(f"Erro ao listar objetos: {e}")

# Concatena todos os DataFrames Pandas em um único DataFrame Pandas
consolidated_df_pandas = pd.concat(dataframes, ignore_index=True)

# Converte para DataFrame PySpark
df_matri_notas = spark.createDataFrame(consolidated_df_pandas)

Objeto encontrado: notas/notas20240209/part-00000-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00001-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00002-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00003-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00004-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00005-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00006-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00007-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas20240209/part-00008-6cc2ff32-2dc7-48a3-9604-756f4ff38edb-c000.snappy.parquet
Objeto encontrado: notas/notas2024020

### Tratamento dos dados

In [5]:
# Busca etl_date mais recente
max_etl_date = df_matri_notas.agg(max("etl_date")).collect()[0][0]

# Filtra o Dataframe pelo etl_date mais recente
df_matri_notas = df_matri_notas.select('id_matricula', 'id_disciplina', 'data', 'nota' ).where(col("etl_date") == max_etl_date)

### Gravação do dataframe em um diretório temporário

In [6]:
df_matri_notas.write.parquet(temp_blobs, mode="overwrite")

### Gravação na camada Gold

In [7]:
# Recria bucket caso não exista
if not minio_client.bucket_exists(camadaEscrita):
    minio_client.make_bucket(camadaEscrita)

# Lista todos os arquivos Parquet no diretório temporário de blobs
arquivos_parquet = glob.glob(os.path.join(temp_blobs, '*.parquet'))

# Exclui todos os arquivos existentes na pasta no MinIO
objects = minio_client.list_objects(camadaEscrita, minio_path, recursive=True)
for obj in objects:
    try:
        minio_client.remove_object(camadaEscrita, obj.object_name)
        print(f"Arquivo existente '{obj.object_name}' removido com sucesso do MinIO.")
    except S3Error as e:
        print(f"Erro ao excluir o arquivo existente do MinIO: {e}")

# Itera sobre a lista de arquivos Parquet e envia cada um para o MinIO
for arquivo_parquet in arquivos_parquet:
    try:
        # Obtém o nome do arquivo
        nome_arquivo = os.path.basename(arquivo_parquet)

        # Envia o novo arquivo para o MinIO
        minio_client.fput_object(camadaEscrita, os.path.join(minio_path, nome_arquivo), arquivo_parquet)
        print(f"Arquivo '{nome_arquivo}' enviado com sucesso para o MinIO em '{os.path.join(minio_path, nome_arquivo)}'.")
    except S3Error as e:
        print(f"Erro ao interagir com o MinIO: {e}")

Arquivo 'part-00000-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet' enviado com sucesso para o MinIO em 'notas/part-00000-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet'.
Arquivo 'part-00001-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet' enviado com sucesso para o MinIO em 'notas/part-00001-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet'.
Arquivo 'part-00002-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet' enviado com sucesso para o MinIO em 'notas/part-00002-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet'.
Arquivo 'part-00003-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet' enviado com sucesso para o MinIO em 'notas/part-00003-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet'.
Arquivo 'part-00004-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet' enviado com sucesso para o MinIO em 'notas/part-00004-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet'.
Arquivo 'part-00005-10b75333-87fd-4754-9a75-0

### Remove dados do diretório temporário de blobs

In [8]:
# Liste todos os arquivos na pasta
arquivos_na_pasta = os.listdir(temp_blobs)

# Itere sobre os arquivos e os delete
for arquivo in arquivos_na_pasta:
    caminho_completo = os.path.join(temp_blobs, arquivo)
    try:
        if os.path.isfile(caminho_completo):
            os.remove(caminho_completo)
            print(f'{caminho_completo} deletado com sucesso.')
    except Exception as e:
        print(f'Erro ao deletar {caminho_completo}: {e}')

spark.stop()

/home/jovyan/notebooks/temporary_blobs/.part-00000-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00001-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00002-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00003-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00004-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.


/home/jovyan/notebooks/temporary_blobs/.part-00005-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00006-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00007-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00008-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00009-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00010-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/.part-00011-10b75333-87fd-4754-9a75-0b265bebbc2b-c000.snappy.parquet.crc deletado com sucesso.
/home/jovyan/notebooks/temporary_blobs/._SUCCESS.crc deletado 