# PyArrow: Uma Solução Completa de Parquet

In [None]:
# instalar Parquet, Pandas e Pyarrow
%!pip install parquet
%!pip install pandas
%!pip install pyarrow

In [106]:
# importar os pacotes necessários
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import os

In [107]:
# Criando um DataFrame de exemplo com diversos tipos de dados
df = pd.DataFrame({
    'Nome': ['Maria', 'John', 'Tonico', 'Mariane'],
    'Idade': [25, 30, 35, np.nan],  # Incluindo um valor nulo
    'Salario': [50000.50, 60000.75, 70000.00, 80000.25],
    'Data_Admissao': pd.to_datetime(['2020-01-15', '2019-05-20', '2018-11-01', '2021-03-10']),
    'Descrição': ['Desenvolvedora Python', 'Analista de Dados', 'Cientista de Dados', 'Gerente de Projetos com acentuação çãõ']
})

In [108]:
# Converta o DataFrame em uma Arrow Table
df = pa.Table.from_pandas(df)

# Escrevendo o DataFrame para um arquivo Parquet usando pyarrow
try:
    pq.write_table(df, 'dataset/df_pyarrow.parquet')
    print("\nArquivo Parquet 'dataset/df_pyarrow.parquet' escrito com sucesso usando pyarrow!")
except Exception as e:
    print(f"Erro ao escrever o arquivo Parquet: {e}")


Arquivo Parquet 'dataset/df_pyarrow.parquet' escrito com sucesso usando pyarrow!


## Ler arquivo `Parquet` utilizando pyarrow
Pode utilizar o método `read_table`.

In [109]:
# Lendo o arquivo completo com pyarrow
try:
    # Leia o arquivo Parquet em uma Arrow Table
    df = pq.read_table('dataset/df_pyarrow.parquet')
    print("\nDataFrame lido completo com pyarrow:")
    print(df)

except FileNotFoundError:
    print("\nErro: Arquivo Parquet não encontrado. Certifique-se de que o arquivo 'dataset/df_pyarrow.parquet' ou 'dataset/sample_fastparquet.parquet' existe no diretório atual.")
except pd.errors.ParserError as pe:
    print(f"\nErro de parsing do Parquet: {pe}")
except Exception as e:
    print(f"\nOutro erro ao ler o arquivo Parquet: {e}")


DataFrame lido completo com pyarrow:
pyarrow.Table
Nome: string
Idade: double
Salario: double
Data_Admissao: timestamp[ns]
Descrição: string
----
Nome: [["Maria","John","Tonico","Mariane"]]
Idade: [[25,30,35,null]]
Salario: [[50000.5,60000.75,70000,80000.25]]
Data_Admissao: [[2020-01-15 00:00:00.000000000,2019-05-20 00:00:00.000000000,2018-11-01 00:00:00.000000000,2021-03-10 00:00:00.000000000]]
Descrição: [["Desenvolvedora Python","Analista de Dados","Cientista de Dados","Gerente de Projetos com acentuação çãõ"]]


## Ler apenas colunas especificas em uma tabela utilizando pyarrow
Pode utilizar o método `read_table`.

In [110]:
# Lendo apenas as colunas 'Nome' e 'Salário' com pyarrow
try:
    # Abre o arquivo Parquet
    df_colunas_selecionadas = pq.read_table('dataset/df_pyarrow.parquet', columns=['Nome', 'Salario'])
    print("\nTabela com colunas selecionadas (Nome e Salario) com pyarrow:")
    print(df_colunas_selecionadas)

except FileNotFoundError:
    print("\nErro: Arquivo Parquet não encontrado. Certifique-se de que o arquivo 'dataset/df_pyarrow.parquet' existe no diretório atual.")
except Exception as e:
    print(f"\nOutro erro ao ler o arquivo Parquet: {e}")


Tabela com colunas selecionadas (Nome e Salario) com pyarrow:
pyarrow.Table
Nome: string
Salario: double
----
Nome: [["Maria","John","Tonico","Mariane"]]
Salario: [[50000.5,60000.75,70000,80000.25]]


## Adicionar uma nova coluna e seus valores a uma tabela pyarrow

Pode utilizar o método `add_column`.

In [111]:
# Criar uma nova coluna com pyarrow
try:
    nova_coluna_nome = 'Status'
    nova_coluna = pa.array(['Ativo', 'Inativo', 'Ativo', 'Inativo'])

    # Verifica se a coluna já existe
    if nova_coluna_nome in df.column_names:
         print(f"\nA coluna '{nova_coluna_nome}' já existe na tabela. Não será adicionada.")
    else:
        # Adiciona a nova coluna à tabela
        df = df.add_column(len(df.column_names), pa.field(nova_coluna_nome, nova_coluna.type), nova_coluna)

        # Converte a tabela pyarrow para um DataFrame pandas para visualização
        df_final = df.to_pandas()

        print("\nTabela com a nova coluna:")
        print(df_final)

        # Escreve a tabela em um arquivo Parquet
        pq.write_table(df, 'dataset/output_com_nova_coluna.parquet')

        print("\nArquivo Parquet 'dataset/output_com_nova_coluna.parquet' escrito com sucesso!")

except Exception as e:
    print(f"\nErro ao adicionar a coluna e escrever o arquivo Parquet: {e}")


Tabela com a nova coluna:
      Nome  Idade   Salario Data_Admissao  \
0    Maria   25.0  50000.50    2020-01-15   
1     John   30.0  60000.75    2019-05-20   
2   Tonico   35.0  70000.00    2018-11-01   
3  Mariane    NaN  80000.25    2021-03-10   

                                Descrição   Status  
0                   Desenvolvedora Python    Ativo  
1                       Analista de Dados  Inativo  
2                      Cientista de Dados    Ativo  
3  Gerente de Projetos com acentuação çãõ  Inativo  

Arquivo Parquet 'dataset/output_com_nova_coluna.parquet' escrito com sucesso!


### Alterar um valor específico em uma coluna de uma tabela com pyarrow.

Embora o processo seja um pouco diferente de como faríamos em um pandas.DataFrame diretamente. A tabela pyarrow é imutável, então você precisará criar um novo array com a modificação e substituir a coluna inteira.

In [112]:
# Alterar valor de um indice da coluna com pyarrow
try:

    # Especificações da alteração
    coluna_para_alterar = 'Salario'
    indice_do_valor = 1  # Índice do valor a ser alterado (Bob)
    novo_valor = 65000

    # Verifica se a coluna existe
    if coluna_para_alterar not in df.column_names:
        print(f"\nErro: A coluna '{coluna_para_alterar}' não existe.")
    else:
        # Verifica se o índice está dentro do alcance
        coluna = df.column(coluna_para_alterar)
        if indice_do_valor < 0 or indice_do_valor >= len(coluna):
             print(f"\nErro: Índice '{indice_do_valor}' fora do alcance da coluna '{coluna_para_alterar}'.")
        else:
            # Cria um novo array (cópia) com o valor alterado
            novo_array = coluna.to_numpy().copy() # Usamos copy() aqui
            novo_array[indice_do_valor] = novo_valor
            novo_array = pa.array(novo_array)

            # Substitui a coluna original pela nova coluna
            df = df.set_column(df.column_names.index(coluna_para_alterar), pa.field(coluna_para_alterar, novo_array.type), novo_array)

            print(f"\nValor na coluna '{coluna_para_alterar}' no índice '{indice_do_valor}' alterado para '{novo_valor}' com sucesso.")

            # Converte a tabela para um DataFrame pandas para visualização
            df_final = df.to_pandas()
            print("\nTabela com o valor alterado:")
            print(df_final)


            # Escreve a tabela em um novo arquivo Parquet
            pq.write_table(df, 'dataset/output_valor_alterado.parquet')
            print("\nArquivo Parquet 'dataset/output_valor_alterado.parquet' escrito com sucesso!")


except Exception as e:
    print(f"\nErro ao alterar o valor e escrever o arquivo Parquet: {e}")


Valor na coluna 'Salario' no índice '1' alterado para '65000' com sucesso.

Tabela com o valor alterado:
      Nome  Idade   Salario Data_Admissao  \
0    Maria   25.0  50000.50    2020-01-15   
1     John   30.0  65000.00    2019-05-20   
2   Tonico   35.0  70000.00    2018-11-01   
3  Mariane    NaN  80000.25    2021-03-10   

                                Descrição   Status  
0                   Desenvolvedora Python    Ativo  
1                       Analista de Dados  Inativo  
2                      Cientista de Dados    Ativo  
3  Gerente de Projetos com acentuação çãõ  Inativo  

Arquivo Parquet 'dataset/output_valor_alterado.parquet' escrito com sucesso!


## Excluir uma coluna existente de uma tabela com pyarrow.

Pode usar o método` remove_column`

In [113]:
# Excluir uma coluna com pyarrow

try:
    # Nome da coluna a ser removida
    coluna_a_remover = 'Salario'

    # Verifica se a coluna existe antes de tentar remover
    if coluna_a_remover in df.column_names:
      # Remove a coluna da tabela
        df = df.remove_column(df.column_names.index(coluna_a_remover))
        print(f"\nA coluna '{coluna_a_remover}' foi removida com sucesso.")

      # Converte a tabela pyarrow para um DataFrame pandas para visualização
        df_final = df.to_pandas()

        print("\nTabela sem a coluna:")
        print(df_final)

        # Escreve a tabela em um novo arquivo Parquet
        pq.df(df, 'dataset/output_sem_coluna.parquet')

        print("\nArquivo Parquet 'dataset/output_sem_coluna.parquet' escrito com sucesso!")
    else:
        print(f"\nA coluna '{coluna_a_remover}' não existe na tabela. A remoção não será realizada.")
except Exception as e:
    print(f"\nErro ao remover a coluna e escrever o arquivo Parquet: {e}")


A coluna 'Salario' foi removida com sucesso.

Tabela sem a coluna:
      Nome  Idade Data_Admissao                               Descrição  \
0    Maria   25.0    2020-01-15                   Desenvolvedora Python   
1     John   30.0    2019-05-20                       Analista de Dados   
2   Tonico   35.0    2018-11-01                      Cientista de Dados   
3  Mariane    NaN    2021-03-10  Gerente de Projetos com acentuação çãõ   

    Status  
0    Ativo  
1  Inativo  
2    Ativo  
3  Inativo  

Erro ao remover a coluna e escrever o arquivo Parquet: module 'pyarrow.parquet' has no attribute 'df'


# Particionar arquivos Parquet

É uma prática comum em data lakes para otimizar consultas, permitindo que os mecanismos de processamento leiam apenas os arquivos relevantes para uma consulta específica.

In [120]:
# 1. Criando um DataFrame de exemplo com diversos tipos de dados
df = pd.DataFrame({
    'Nome': ['Maria', 'John', 'Tonico', 'Mariane'],
    'Idade': [25, 30, 35, np.nan],  # Incluindo um valor nulo
    'Salario': [50000.50, 60000.75, 70000.00, 80000.25],
    'Data_Admissao': pd.to_datetime(['2020-01-15', '2019-05-20', '2018-11-01', '2021-03-10']),
    'Descrição': ['Desenvolvedora Python', 'Analista de Dados', 'Cientista de Dados', 'Gerente de Projetos com acentuação çãõ'],
    'Status': ['Ativo', 'Inativo', 'Ativo', 'Ativo']
})

# 2. Escrevendo o DataFrame para um arquivo Parquet usando pyarrow
try:
    table_inicial = pa.Table.from_pandas(df)
    pq.write_table(table_inicial, 'dataset/df_pyarrow.parquet')
    print("\nArquivo Parquet 'dataset/df_pyarrow.parquet' escrito com sucesso usando pyarrow!")
except Exception as e:
    print(f"\nErro ao escrever o arquivo Parquet: {e}")



Arquivo Parquet 'dataset/df_pyarrow.parquet' escrito com sucesso usando pyarrow!


In [121]:
# 3. Lendo o arquivo Parquet para uma Arrow Table
try:
    table_lido = pq.read_table('dataset/df_pyarrow.parquet')
    df_lido = table_lido.to_pandas()
    print(f"\nArquivo Parquet 'dataset/df_pyarrow.parquet' lido com sucesso.")
except Exception as e:
    print(f"\nErro ao ler o arquivo Parquet: {e}")

# 4. Definindo o caminho base e coluna de particionamento
base_path = 'dataset/partitioned_data'
partition_cols = ['Status']

# 4.1. Cria o diretório de particionamento, se não existir
if not os.path.exists(base_path):
    os.makedirs(base_path)

# 5. Particionamento e escrita
try:
    # Convertendo o DataFrame para Arrow Table
    table_para_particionar = pa.Table.from_pandas(df_lido)

    pq.write_to_dataset(
        table_para_particionar,
        root_path=base_path,
        partitioning=pa.dataset.partitioning(pa.schema([("Status", pa.string())]), flavor="hive"),
        existing_data_behavior='overwrite_or_ignore'
    )
    print(f"\nDados particionados por {partition_cols} e salvos em '{base_path}' com sucesso.")
except Exception as e:
    print(f"\nErro ao particionar e escrever os arquivos Parquet: {e}")


Arquivo Parquet 'dataset/df_pyarrow.parquet' lido com sucesso.

Dados particionados por ['Status'] e salvos em 'dataset/partitioned_data' com sucesso.


In [122]:
# 6. Leitura com filtro utilizando pyarrow.dataset
try:
    # Lendo dados particionados diretamente como um dataset
    dataset = ds.dataset(base_path, format="parquet", partitioning="hive")

    # Aplicando filtro para 'Status = Ativo'
    filtered_table = dataset.to_table(filter=ds.field('Status') == 'Ativo')
    filtered_df = filtered_table.to_pandas()

    print("\nDados filtrados (Status = Ativo):")
    print(filtered_df)
except Exception as e:
    print(f"\nErro ao ler arquivos Parquet particionados: {e}")


Dados filtrados (Status = Ativo):
      Nome  Idade   Salario Data_Admissao  \
0    Maria   25.0  50000.50    2020-01-15   
1   Tonico   35.0  70000.00    2018-11-01   
2  Mariane    NaN  80000.25    2021-03-10   

                                Descrição Status  
0                   Desenvolvedora Python  Ativo  
1                      Cientista de Dados  Ativo  
2  Gerente de Projetos com acentuação çãõ  Ativo  


In [123]:
# 7. Limpeza (opcional)
if os.path.exists(base_path):
    for root, dirs, files in os.walk(base_path, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
    os.rmdir(base_path)
    print(f"\nDiretório {base_path} removido")


Diretório dataset/partitioned_data removido


Particionar um arquivo Parquet é útil para lidar com grandes volumes de dados, melhorando a eficiência de consultas e a organização. O particionamento divide os dados em subdiretórios baseados nos valores de uma ou mais colunas (e.g., Status), permitindo:

- Consultas mais rápidas: Apenas as partições relevantes são lidas, reduzindo o tempo de leitura e uso de memória.
- Escalabilidade: Permite o processamento paralelo, especialmente em sistemas distribuídos como Spark.
- Organização: Facilita a navegação e o gerenciamento de dados, agrupando informações logicamente.

Esses benefícios tornam o particionamento ideal para análises em larga escala e sistemas que processam dados frequentemente.