<a href="https://colab.research.google.com/github/louisecastrof/Projects_and_Studies/blob/main/Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Definições Apache Beam


## Apache Beam
É um modelo unificado e portátil (exemplo: construir pipelines para batch e streaming com o mesmo código basicamente) </br>
Utiliza SDK para se comunicar com as linguagens (Python, Java, Go, etc) </br>
Conversa com function API se comunica entre o runner e a engine e transcreve os componentes de execução necessários (PubSub, BigQuery, Kafka, GCS) </br>


## Pipeline
Pcollection - (similar ao RDD do spark) Coleção de dados ingeridos ou criados após uma transformação (leu ou escreveu na fonte: cria PCollection). São imutáveis. </br>
Para gerá-las usamos o PTransform, libs específicas de uma readtransform. São comandos que ajudam a transformar, deduplicar, contar dentro da pipeline. </br>
ParDo = "Parallel do": dá pra criar funções independentes. Você cria variáveis ou funções personalizadas que podem ser reutilizadas posteriormente </br>

## Data Fusion?
Data fusion é baseado no Dataflow, então as funcionalidades se preservam por um custo menor no Dataflow e maior no Datafusion devido à sua interface. </br>
O Dataflow criaa tabela se não existir, appenda dados, tem max file size, parâmetros adicionais do BQ, frequência para trigger, Schema específico, etc. Tudo que o Data Fusion tiver, só que com menor custo.</br>

# Instalação Apache Beam

In [None]:
# Instalando as libs - [interactive] para notebooks como o colab, se for para local apenas apache-beam
!pip install apache-beam[interactive]
import apache_beam as beam

# Pipeline

## ReadFromText </br>

O método ReadFromText do Apache Beam é uma função que permite ler dados de arquivos de texto e gerar um PCollection (coleção de dados) no pipeline do Apache Beam. Esse método é comumente usado na fase de entrada de dados do pipeline, em que dados são lidos a partir de uma fonte externa e transformados em um formato que possa ser processado pelo pipeline. </br>

O ReadFromText suporta diferentes tipos de arquivos de texto, como CSV, JSON, arquivos de texto simples e muitos outros. Ele pode ser configurado para lidar com diferentes tipos de codificações de texto e delimitadores de campo, permitindo a leitura de diferentes formatos de arquivo de texto.</br>

In [4]:
# Definir pipeline
p1 = beam.Pipeline()

# Atribuindo à pipeline voos pois poderá ser reutilizada
voos = (
p1
  # ler arquivo, excluir o header
  # pipes = um comando é usado com input do outro
  | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
  | "Mostrar Resultados" >> beam.Map(print)
    
)

# Executar
p1.run()

['2019-04-27', '19805', '2', 'LAX', 'JFK', '944', '14', '1736', '-29', '269', '2475', '2']
['2019-04-27', '19805', '3', 'JFK', 'LAX', '1224', '-6', '1614', '39', '371', '2475', '3']
['2019-04-27', '19805', '4', 'LAX', 'JFK', '1240', '25', '2028', '-27', '264', '2475', '4']
['2019-04-27', '19805', '5', 'DFW', 'HNL', '1300', '-5', '1650', '15', '510', '3784', '5']
['2019-04-27', '19805', '6', 'OGG', 'DFW', '1901', '126', '640', '95', '385', '3711', '6']
['2019-04-27', '19805', '7', 'DFW', 'OGG', '1410', '125', '1743', '138', '497', '3711', '7']
['2019-04-27', '19805', '8', 'HNL', 'DFW', '1659', '4', '458', '-22', '398', '3784', '8']
['2019-04-27', '19805', '9', 'JFK', 'LAX', '648', '-7', '1029', '19', '365', '2475', '9']
['2019-04-27', '19805', '10', 'LAX', 'JFK', '2156', '21', '556', '1', '265', '2475', '10']
['2019-04-27', '19805', '12', 'LAX', 'JFK', '1113', '-2', '1910', '-40', '267', '2475', '11']
['2019-04-27', '19805', '14', 'OGG', 'LAX', '2235', '5', '618', '-17', '270', '2486', 

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b85219af0>

## Create
O método Create do Apache Beam é uma função que permite criar uma coleção de dados diretamente dentro do pipeline. Esse método é comumente usado na fase de entrada de dados do pipeline, em que dados são gerados internamente e transformados em um formato que possa ser processado pelo pipeline. </br>

O Create permite criar uma PCollection (coleção de dados) a partir de uma lista de elementos em Python. Esses elementos podem ser de qualquer tipo, como inteiros, strings, tuplas, dicionários e objetos personalizados. O Create é útil quando se deseja criar uma coleção de dados estática que possa ser usada como entrada para as transformações do pipeline. </br>

In [10]:
import apache_beam as beam

p1 = beam.Pipeline()

#Criando tuplas
p1 | "Tupla" >> beam.Create( [ ("Louise", 28),("Taiga", 8),("Luke", 8) ] ) | "print Tupla" >> beam.Map(print)

p1.run()

('Louise', 28)
('Taiga', 8)
('Luke', 8)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b84608f40>

In [13]:
# Redefinindo o pipeline para que os resultados não se misturem
p1 = beam.Pipeline()

# Criando listas
p1 | "Lista" >> beam.Create(["Green Day","Radiohead","Queens of the Stone Age","Rammstein","Carne Doce"]) | "print Lista" >> beam.Map(print)

p1.run()

Green Day
Radiohead
Queens of the Stone Age
Rammstein
Carne Doce


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b845ce760>

## ReadFromText e WriteToText
O ReadFromText e o WriteToText são dois métodos do Apache Beam que permitem ler e escrever dados em arquivos de texto, respectivamente. </br>

O método ReadFromText é usado para ler dados de arquivos de texto, como CSV, JSON, arquivos de texto simples e muitos outros. Ele pode ser configurado para lidar com diferentes tipos de codificações de texto e delimitadores de campo, permitindo a leitura de diferentes formatos de arquivo de texto. O ReadFromText retorna uma coleção de dados (PCollection) com as linhas do arquivo lido, que pode ser usado como entrada para as transformações do pipeline. </br>

Já o método WriteToText é usado para escrever uma PCollection em um arquivo de texto. Ele permite que você defina a localização do arquivo e outras opções, como a codificação de caracteres e o caractere de separação entre as linhas. Cada elemento da PCollection é escrito como uma linha separada no arquivo de texto. O WriteToText retorna um objeto vazio (PDone) que pode ser usado para controlar o fluxo do pipeline. </br>

In [14]:
p1 = beam.Pipeline()

voos = (
p1
#importando dados do CSV
  | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
  | "Mostrar Resultados" >> beam.Map(print)
  # adicionar linha de gravar dados - exportando para arquivo txt
  | "Gravar Resultado" >> beam.io.WriteToText("voos.txt")
)

p1.run()

['2019-04-27', '19805', '2', 'LAX', 'JFK', '944', '14', '1736', '-29', '269', '2475', '2']
['2019-04-27', '19805', '3', 'JFK', 'LAX', '1224', '-6', '1614', '39', '371', '2475', '3']
['2019-04-27', '19805', '4', 'LAX', 'JFK', '1240', '25', '2028', '-27', '264', '2475', '4']
['2019-04-27', '19805', '5', 'DFW', 'HNL', '1300', '-5', '1650', '15', '510', '3784', '5']
['2019-04-27', '19805', '6', 'OGG', 'DFW', '1901', '126', '640', '95', '385', '3711', '6']
['2019-04-27', '19805', '7', 'DFW', 'OGG', '1410', '125', '1743', '138', '497', '3711', '7']
['2019-04-27', '19805', '8', 'HNL', 'DFW', '1659', '4', '458', '-22', '398', '3784', '8']
['2019-04-27', '19805', '9', 'JFK', 'LAX', '648', '-7', '1029', '19', '365', '2475', '9']
['2019-04-27', '19805', '10', 'LAX', 'JFK', '2156', '21', '556', '1', '265', '2475', '10']
['2019-04-27', '19805', '12', 'LAX', 'JFK', '1113', '-2', '1910', '-40', '267', '2475', '11']
['2019-04-27', '19805', '14', 'OGG', 'LAX', '2235', '5', '618', '-17', '270', '2486', 

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b7f8a98e0>

## Map
O método Map do Apache Beam é uma função que permite aplicar uma transformação em cada elemento de uma PCollection (coleção de dados), gerando exatamente um elemento de saída para cada elemento de entrada. </br>

O Map é útil quando se deseja transformar cada elemento de uma PCollection, como modificar ou extrair informações específicas de um elemento. Alguns exemplos de aplicação do Map são: extrair um campo de um registro, aplicar uma função matemática a cada elemento ou converter o tipo de dados de uma PCollection. </br>

In [15]:
import apache_beam as beam

p1 = beam.Pipeline()

voos = (
p1
  | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
  | "Mostrar Resultados" >> beam.Map(print)
)

p1.run()


['2019-04-27', '19805', '2', 'LAX', 'JFK', '944', '14', '1736', '-29', '269', '2475', '2']
['2019-04-27', '19805', '3', 'JFK', 'LAX', '1224', '-6', '1614', '39', '371', '2475', '3']
['2019-04-27', '19805', '4', 'LAX', 'JFK', '1240', '25', '2028', '-27', '264', '2475', '4']
['2019-04-27', '19805', '5', 'DFW', 'HNL', '1300', '-5', '1650', '15', '510', '3784', '5']
['2019-04-27', '19805', '6', 'OGG', 'DFW', '1901', '126', '640', '95', '385', '3711', '6']
['2019-04-27', '19805', '7', 'DFW', 'OGG', '1410', '125', '1743', '138', '497', '3711', '7']
['2019-04-27', '19805', '8', 'HNL', 'DFW', '1659', '4', '458', '-22', '398', '3784', '8']
['2019-04-27', '19805', '9', 'JFK', 'LAX', '648', '-7', '1029', '19', '365', '2475', '9']
['2019-04-27', '19805', '10', 'LAX', 'JFK', '2156', '21', '556', '1', '265', '2475', '10']
['2019-04-27', '19805', '12', 'LAX', 'JFK', '1113', '-2', '1910', '-40', '267', '2475', '11']
['2019-04-27', '19805', '14', 'OGG', 'LAX', '2235', '5', '618', '-17', '270', '2486', 

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8bb4e6f9d0>


## Flat Map
O método FlatMap do Apache Beam é uma função que permite aplicar uma transformação em cada elemento de uma PCollection (coleção de dados), gerando zero ou mais elementos de saída para cada elemento de entrada. A principal diferença entre o Map e o FlatMap é que o FlatMap permite gerar uma saída de zero a muitos elementos, enquanto o Map gera uma saída de exatamente um elemento para cada entrada. </br>

O FlatMap é útil quando se deseja que uma única entrada gere várias saídas ou quando se deseja filtrar algumas entradas e gerar saídas a partir de outras. Alguns exemplos de aplicação do FlatMap são: quebrar uma string em palavras, filtrar elementos de uma lista ou gerar várias saídas a partir de uma entrada, como duplicar uma entrada. </br>

In [17]:
import apache_beam as beam

p1 = beam.Pipeline()

Collection = (
    p1
    |beam.io.ReadFromText('poema.txt')
    |beam.FlatMap(lambda record: record.split(' '))
    # exportando resultado para txt
    |beam.io.WriteToText('resultado.txt')
)
p1.run()

# Resultado - em todos os espaços há uma quebra de linha

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b845aa970>

## Filter lamba

A transformação Filter do Apache Beam permite filtrar elementos de uma PCollection (coleção de dados), gerando uma nova PCollection contendo apenas os elementos que satisfazem uma determinada condição. Quando combinada com uma função lambda, a transformação Filter pode ser usada para filtrar elementos com base em uma expressão booleana especificada pela função lambda.

A função lambda é uma função anônima que pode ser definida em linha e passada como argumento para a transformação Filter. A função lambda é executada em cada elemento da PCollection, e deve retornar um valor booleano True ou False que indica se o elemento deve ser mantido ou descartado, respectivamente.


In [None]:
import apache_beam as beam

p1 = beam.Pipeline()

voos = (
p1
  | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
  | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: record[3] == "LAX")
  | "Mostrar Resultados" >> beam.Map(print)
)

p1.run()

['2019-04-27', '19805', '2', 'LAX', 'JFK', '944', '14', '1736', '-29', '269', '2475', '2']
['2019-04-27', '19805', '4', 'LAX', 'JFK', '1240', '25', '2028', '-27', '264', '2475', '4']
['2019-04-27', '19805', '10', 'LAX', 'JFK', '2156', '21', '556', '1', '265', '2475', '10']
['2019-04-27', '19805', '12', 'LAX', 'JFK', '1113', '-2', '1910', '-40', '267', '2475', '11']
['2019-04-27', '19805', '22', 'LAX', 'JFK', '1458', '-2', '2336', '11', '272', '2475', '20']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd0da09d970>

## Filter lista

Quando combinado com uma lista, a transformação Filter pode ser usada para filtrar elementos que não pertencem à lista.

Para usar a transformação Filter com uma lista, você pode definir uma função que verifica se o elemento está contido na lista ou não, e passar essa função como argumento para a transformação Filter. Essa função pode ser definida de várias maneiras, incluindo uma função definida pelo usuário ou usando uma função built-in do Python, como lambda.

In [22]:
import apache_beam as beam

palavras=['quatro','um']

def encontrarPalavras( i ):
 if i in palavras:
    return True

p1 = beam.Pipeline()

Collection = (
    p1
    |beam.io.ReadFromText('poema.txt')
    |beam.FlatMap(lambda record: record.split(' '))
    |beam.Filter(encontrarPalavras)
    |beam.Map(print)
)
p1.run()

quatro
quatro
um
quatro
quatro
um


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8bade3ffa0>

##Flatten

O Flatten é uma transformação do Apache Beam que combina várias PCollections (coleções de dados) do mesmo tipo em uma única PCollection. Ele recebe como entrada uma lista de PCollections e produz uma única PCollection que contém todos os elementos de todas as PCollections de entrada.

In [5]:
import apache_beam as beam

p = beam.Pipeline()

negros = ('Adão','Jesus','Mike')
brancos = ('Tulio','Mary','Joca')
indios = ('Vic','Marta','Tom')

negros_pc = p | "Criando Pcollection negros" >> beam.Create(negros)
brancos_pc = p | "Criando Pcollection brancos" >> beam.Create(brancos)
indios_pc = p | "Criando Pcollection indios" >> beam.Create(indios)

pessoas = ((negros_pc,brancos_pc,indios_pc) | beam.Flatten()) | beam.Map(print)
p.run()

Adão
Jesus
Mike
Tulio
Mary
Joca
Vic
Marta
Tom


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b848fe7c0>

## Combine Per Key

A transformação CombinePerKey do Apache Beam combina os valores de uma PCollection que compartilham a mesma chave, usando uma função de agregação fornecida pelo usuário. Essa transformação é comumente usada para agregar dados em pipelines de processamento de dados distribuídos.

Para usar a transformação CombinePerKey, é necessário que a PCollection de entrada seja do tipo KV (key-value), onde cada elemento é uma tupla (chave, valor).

In [6]:
import apache_beam as beam

p1 = beam.Pipeline()

Tempo_Atrasos = (
p1
  | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv")
  | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
  | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: int(record[8]) > 0 )
  | "Criar par" >> beam.Map(lambda record: (record[4],int(record[8])))
  | "Somar por key" >> beam.CombinePerKey(sum)
  | "Mostrar Resultados" >> beam.Map(print)
)

p1.run()

('LAX', 94)
('HNL', 15)
('DFW', 95)
('OGG', 138)
('JFK', 220)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b84986490>

## Combiners.Count.PerKey()

A transformação combiners.Count.PerKey() do Apache Beam conta o número de valores associados a cada chave em uma PCollection do tipo KV (key-value). Essa transformação é um exemplo de um transformador de combinação que combina os valores de entrada para cada chave usando a função de agregação Count.

Por exemplo, suponha que você tenha uma PCollection com dados de vendas de produtos, onde cada venda é identificada por um código de produto, e você deseja contar o número de vendas para cada produto. Para isso, você pode usar a transformação combiners.Count.PerKey()

In [7]:
import apache_beam as beam

p1 = beam.Pipeline()

Qtd_Atrasos = (
    p1
    | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Pegar voos de Los Angeles" >> beam.Filter(lambda record: int(record[8]) > 0 )
    | "Criar par" >> beam.Map(lambda record: (record[4],int(record[8])))
    | "Contar por key" >> beam.combiners.Count.PerKey()
    | "Mostrar Resultados" >> beam.Map(print)
)

p1.run()

('LAX', 4)
('HNL', 1)
('DFW', 1)
('OGG', 1)
('JFK', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b84951a60>

## CoGroupByKey

A transformação CoGroupByKey do Apache Beam é usada para combinar múltiplas PCollections do tipo KV (key-value) com as mesmas chaves, agrupando todos os valores associados a cada chave em uma tupla.

Por exemplo, suponha que você tenha duas PCollections com dados de vendas de produtos, uma contendo as quantidades vendidas de cada produto e outra contendo os preços unitários de cada produto. Você pode usar a transformação CoGroupByKey para combinar essas duas coleções com base no código do produto, criando uma tupla para cada código de produto que contém as quantidades vendidas e os preços unitários correspondentes.


In [8]:
import apache_beam as beam

p1 = beam.Pipeline()

Tempo_Atrasos = (
  p1
  | "Importar Dados Atraso" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas Atraso" >> beam.Map(lambda record: record.split(','))
  | "Pegar voos com atraso" >> beam.Filter(lambda record: int(record[8]) > 0 )
  | "Criar par atraso" >> beam.Map(lambda record: (record[4],int(record[8])))
  | "Somar por key" >> beam.CombinePerKey(sum)
#  | "Mostrar Resultados" >> beam.Map(print)
)

Qtd_Atrasos = (
  p1
  | "Importar Dados" >> beam.io.ReadFromText("voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
  | "Pegar voos com atraso qtd" >> beam.Filter(lambda record: int(record[8]) > 0 )
  | "Criar par qtd" >> beam.Map(lambda record: (record[4],int(record[8])))
  | "Contar por key" >> beam.combiners.Count.PerKey()
#  | "Mostrar Resultados QTD" >> beam.Map(print)
)

tabela_atrasos = (
    {'Qtd_Atrasos':Qtd_Atrasos,'Tempo_Atrasos':Tempo_Atrasos} 
    | "Group By" >> beam.CoGroupByKey()
    | beam.Map(print)
)

p1.run()

('LAX', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [92]})
('HNL', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [15]})
('DFW', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [95]})
('OGG', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [138]})
('JFK', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [220]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b845b6760>

## ParDo – Funções Customizadas

A transformação ParDo do Apache Beam é uma das transformações mais versáteis e amplamente utilizadas na plataforma. Ela permite a execução de funções customizadas em elementos de uma PCollection e a geração de novos elementos como saída.

O nome "ParDo" é uma abreviação de "parallel do", o que indica que a transformação pode executar a função de forma paralela em várias instâncias do worker em um ambiente distribuído.

Em um pipeline Beam típico, a transformação ParDo é frequentemente usada para transformar dados de entrada em um formato diferente ou para aplicar lógica de negócios personalizada.

In [24]:
import apache_beam as beam

p1 = beam.Pipeline()

class filtro(beam.DoFn):
  def process(self,record):
    if int(record[8]) > 0:
      return [record]

Tempo_Atrasos = (
  p1
  | "Importar Dados Atraso" >> beam.io.ReadFromText(r"voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas Atraso" >> beam.Map(lambda record: record.split(','))
  | "Pegar voos com atraso" >> beam.ParDo(filtro())
  | "Criar par atraso" >> beam.Map(lambda record: (record[4],int(record[8])))
  | "Somar por key" >> beam.CombinePerKey(sum)
#  | "Mostrar Resultados" >> beam.Map(print)
)

Qtd_Atrasos = (
  p1
  | "Importar Dados" >> beam.io.ReadFromText(r"voos_sample.csv", skip_header_lines = 1)
  | "Separar por Vírgulas Qtd" >> beam.Map(lambda record: record.split(','))
  | "Pegar voos com Qtd" >> beam.ParDo(filtro())
  | "Criar par Qtd" >> beam.Map(lambda record: (record[4],int(record[8])))
  | "Contar por key" >> beam.combiners.Count.PerKey()
#  | "Mostrar Resultados QTD" >> beam.Map(print)
)

tabela_atrasos = (
    {'Qtd_Atrasos':Qtd_Atrasos,'Tempo_Atrasos':Tempo_Atrasos} 
    | "Group By" >> beam.CoGroupByKey()
    | beam.Map(print)
)

p1.run()

('LAX', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [92]})
('HNL', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [15]})
('DFW', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [95]})
('OGG', {'Qtd_Atrasos': [1], 'Tempo_Atrasos': [138]})
('JFK', {'Qtd_Atrasos': [4], 'Tempo_Atrasos': [220]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8b7f5fb760>