### **Objetivo:**

Scritp python que transforma um arquivo CSV (preferencialmente, grande) para um arquivo do tipo PARQUET

### **Ganhos:**

O arquivo parquet é mais leve e rápido, tanto leitura, operações nos dados e escrita.

### Versão - Programação Procedural

**01)** Importa bibliotecas necessárias:

In [None]:
# Import de bibliotecas
import pandas as pd
import os
import pyarrow.parquet as pq
import pyarrow as pa
import os
import shutil

**02)** Define nome do arquivo CSV a ser transformado em Parquet

In [None]:
# Defina o caminho do arquivo CSV
csv_file = 'Rate\Rate.csv'

**03)** Carrega o arquivo CSV e salva chunks intermediário em parquet em um diretório temporário:

In [1]:
# Defina o diretório onde os arquivos Parquet serão salvos
output_dir = 'arquivos_parquet_temp'

# Crie o diretório se ele não existir
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Defina o tamanho do chunk (número de linhas por chunk)
chunk_size = 100000  # Ajuste conforme necessário

# Leitura e processamento por chunks
for i, chunk in enumerate(pd.read_csv(csv_file, chunksize=chunk_size)):
    # Define o nome do arquivo Parquet para cada chunk
    chunk_parquet_file = os.path.join(output_dir, f'arquivo_parquet_parte_{i}.parquet')
    
    # Salva cada chunk como um arquivo Parquet separado no diretório especificado
    chunk.to_parquet(chunk_parquet_file, engine='pyarrow', index=False)
    
    print(f"Chunk {i} salvo como {chunk_parquet_file}")

print("Arquivo CSV foi dividido e convertido para Parquet com sucesso!")


Chunk 0 salvo como arquivos_parquet_temp\arquivo_parquet_parte_0.parquet
Chunk 1 salvo como arquivos_parquet_temp\arquivo_parquet_parte_1.parquet
Chunk 2 salvo como arquivos_parquet_temp\arquivo_parquet_parte_2.parquet
Chunk 3 salvo como arquivos_parquet_temp\arquivo_parquet_parte_3.parquet
Chunk 4 salvo como arquivos_parquet_temp\arquivo_parquet_parte_4.parquet
Chunk 5 salvo como arquivos_parquet_temp\arquivo_parquet_parte_5.parquet
Chunk 6 salvo como arquivos_parquet_temp\arquivo_parquet_parte_6.parquet
Chunk 7 salvo como arquivos_parquet_temp\arquivo_parquet_parte_7.parquet
Chunk 8 salvo como arquivos_parquet_temp\arquivo_parquet_parte_8.parquet
Chunk 9 salvo como arquivos_parquet_temp\arquivo_parquet_parte_9.parquet
Chunk 10 salvo como arquivos_parquet_temp\arquivo_parquet_parte_10.parquet
Chunk 11 salvo como arquivos_parquet_temp\arquivo_parquet_parte_11.parquet
Chunk 12 salvo como arquivos_parquet_temp\arquivo_parquet_parte_12.parquet
Chunk 13 salvo como arquivos_parquet_temp\arq

**04)** consolida arquivos intermediários parquet em um arquivo final:

In [2]:
# Diretório onde os arquivos Parquet temporários estão localizados
parquet_folder = output_dir  # O diretório onde os arquivos Parquet foram salvos

# Verifique se o diretório existe
if not os.path.exists(parquet_folder):
    raise Exception(f"O diretório {parquet_folder} não existe.")

# Lista de arquivos Parquet que você deseja combinar
parquet_files = [f for f in os.listdir(parquet_folder) if f.endswith('.parquet')]

# Verifique se há arquivos Parquet no diretório
if not parquet_files:
    raise Exception(f"Nenhum arquivo Parquet encontrado no diretório {parquet_folder}.")

# Lista para armazenar as tabelas PyArrow de cada arquivo Parquet
parquet_tables = []

# Ler cada arquivo Parquet e armazenar a tabela na lista
for file in parquet_files:
    file_path = os.path.join(parquet_folder, file)
    table = pq.read_table(file_path)
    parquet_tables.append(table)

# Combinar todas as tabelas em uma única tabela
combined_table = pa.concat_tables(parquet_tables)

# Definir o caminho do arquivo Parquet combinado
output_file = 'dataset.parquet'

# Salvar a tabela combinada em um único arquivo Parquet
pq.write_table(combined_table, output_file)

print(f"Todos os arquivos Parquet foram combinados e salvos como {output_file}")

# Apagar o diretório intermediário e seus arquivos
try:
    shutil.rmtree(parquet_folder)
    print(f"O diretório {parquet_folder} foi removido com sucesso.")
except Exception as e:
    print(f"Erro ao remover o diretório {parquet_folder}: {e}")

Todos os arquivos Parquet foram combinados e salvos como arquivo_combinado.parquet
O diretório arquivos_parquet_temp foi removido com sucesso.


### Versão - Programação Orientada a Objeto

In [6]:
# Instalação do tqdm para geração da barra de progresso simples (verbosidade de saída comprimida)
!pip install tqdm -q

In [2]:
# Instabal biblioteca necessárias
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import os
import shutil
from tqdm import tqdm  # Biblioteca para a barra de progresso

In [3]:
# Define classe e atributos
class ParquetProcessor:
    def __init__(self, csv_file, output_parquet, temp_dir='arquivos_parquet_temp', chunk_size=100000):
        """
        Inicializa o processador Parquet.
        
        :param csv_file: Caminho do arquivo CSV de entrada.
        :param output_parquet: Caminho do arquivo Parquet final.
        :param temp_dir: Diretório para armazenar os arquivos Parquet temporários.
        :param chunk_size: Número de linhas por chunk ao ler o CSV.
        """
        self.csv_file = csv_file
        self.output_parquet = output_parquet
        self.temp_dir = temp_dir
        self.chunk_size = chunk_size

    def create_parquet_chunks(self):
        """Lê o arquivo CSV em chunks e salva os chunks como arquivos Parquet temporários."""
        # Criar o diretório temporário, se não existir
        if not os.path.exists(self.temp_dir):
            os.makedirs(self.temp_dir)

        # Verificar a quantidade total de linhas no arquivo CSV para mostrar o progresso
        total_rows = sum(1 for _ in open(self.csv_file)) - 1  # Menos o cabeçalho

        # Processar o CSV em chunks e salvar como arquivos Parquet
        with tqdm(total=total_rows, desc="Criando arquivos Parquet", unit=" linhas") as pbar:
            for i, chunk in enumerate(pd.read_csv(self.csv_file, chunksize=self.chunk_size)):
                chunk_parquet_file = os.path.join(self.temp_dir, f'arquivo_parquet_parte_{i}.parquet')
                
                # Salvar chunk como arquivo Parquet
                chunk.to_parquet(chunk_parquet_file, engine='pyarrow', index=False)

                # Atualizar a barra de progresso com o número de linhas processadas
                pbar.update(len(chunk))  # Atualiza a barra com o tamanho do chunk

    def combine_parquet_files(self):
        """Combina os arquivos Parquet temporários em um único arquivo Parquet final."""
        # Verificar se o diretório de arquivos temporários existe
        if not os.path.exists(self.temp_dir):
            raise Exception(f"O diretório {self.temp_dir} não existe.")

        # Listar os arquivos Parquet no diretório temporário
        parquet_files = [f for f in os.listdir(self.temp_dir) if f.endswith('.parquet')]

        # Verificar se há arquivos Parquet no diretório
        if not parquet_files:
            raise Exception(f"Nenhum arquivo Parquet encontrado no diretório {self.temp_dir}.")

        # Combinar todos os arquivos Parquet em uma única tabela
        parquet_tables = []
        for file in parquet_files:
            file_path = os.path.join(self.temp_dir, file)
            table = pq.read_table(file_path)
            parquet_tables.append(table)

        combined_table = pa.concat_tables(parquet_tables)

        # Salvar a tabela combinada em um único arquivo Parquet
        pq.write_table(combined_table, self.output_parquet)
        print(f"Arquivo Parquet combinado salvo como {self.output_parquet}")

    def clean_up(self):
        """Remove o diretório temporário após a consolidação dos arquivos."""
        try:
            shutil.rmtree(self.temp_dir)
            print(f"O diretório {self.temp_dir} foi removido com sucesso.")
        except Exception as e:
            print(f"Erro ao remover o diretório {self.temp_dir}: {e}")

    def process(self, remove_temp=True):
        """
        Executa o processamento completo: leitura do CSV, criação dos chunks Parquet e consolidação final.
        
        :param remove_temp: Define se o diretório temporário deve ser removido após a consolidação.
        """
        print("Iniciando a criação dos chunks Parquet...")
        self.create_parquet_chunks()

        print("Iniciando a consolidação dos arquivos Parquet...")
        self.combine_parquet_files()

        if remove_temp:
            print("Removendo o diretório temporário...")
            self.clean_up()

        print("Processamento completo!")

In [4]:
# Exemplo de uso:
if __name__ == '__main__':
    # Caminho do arquivo CSV de entrada
    csv_file = 'Rate\Rate.csv'

    # Caminho do arquivo Parquet final
    output_parquet = 'dataset.parquet'

    # Instanciar o processador Parquet
    processor = ParquetProcessor(csv_file, output_parquet, chunk_size=1000000)

    # Executar o processamento
    processor.process(remove_temp=True)  # Define se o diretório intermediário deve ser removido ou não

Iniciando a criação dos chunks Parquet...


Criando arquivos Parquet: 100%|██████████| 12694445/12694445 [00:45<00:00, 278449.78 linhas/s]


Iniciando a consolidação dos arquivos Parquet...
Arquivo Parquet combinado salvo como dataset.parquet
Removendo o diretório temporário...
O diretório arquivos_parquet_temp foi removido com sucesso.
Processamento completo!
