## GCP Pub/Sub

O Pub/Sub é um serviço de mensagens assíncrono e escalável.
Nesse notebook iremos explorar os conceitos básicos de tópico, assinaturas, esquemas e retenção.

Os dados utilizados estão disponíveis no arquivo `aula-pdm-dados.zip`.

Referências: [documentação oficial](https://cloud.google.com/pubsub/docs/overview)

### Configurações

#### git clone

Caso esteja utilizando o dataproc, clone o repositório com os dados:

```bash
cd /home/dataproc
git clone https://github.com/robertogyn19/aula-pdm-pubsub.git
cd aula-pdm-pubsub
```

#### Descompactação dos dados

```bash
unzip aula-pdm-dados.zip
```

#### Versão do pubsub
Antes de iniciar, caso esteja utilizando o dataproc, verifique qual versão da lib python.
Isso pode ser feito através do terminal rodando o comando abaixo:

```bash
pip list | grep google-cloud-pubsub
google-cloud-pubsub               2.18.4
```

Caso a versão seja anterior a `2.27.0`, precisamos atualizá-la, faça isso rodando o comando abaixo:

```bash
pip install google-cloud-pubsub==2.27.1
```

### 1) Tópicos

O primeiro exemplo que vamos ver é o mesmo do [tutorial inicial](https://cloud.google.com/python/docs/reference/pubsub/2.27.1) do cliente Python do Pub/Sub, apenas com alguns comentários para facilitar a compreensão.

In [None]:
import os
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists
from google.pubsub_v1 import BigQueryConfig, DeadLetterPolicy, Topic, Subscription

Antes de iniciar, vamos configurar o ID e o número do projeto da GCP, essas informações serão utilizadas nos exemplos abaixo. Para obter essa informação, acesse o console da GCP e copie os valores como mostrado na imagem:

![project-id](imagens/img1-project-info.png)

In [None]:
project_id="ferrous-griffin-442710-j9"
project_number="625529740055"

In [None]:
# Criação do cliente de publicação, com ele é possível criar tópicos e publicar mensagens
publisher = pubsub_v1.PublisherClient()

# Os nomes dos tópicos seguem o formato projects/<nome-do-projeto>/topics/<nome-do-tópico>
# o código abaixo configura esse nome
topic_name = "projects/{project_id}/topics/{topic}".format(
    project_id=project_id,
    topic='aula-pdm-primeiro-topico',
)
topic_name

In [None]:
# Criação do tópico. Lembre-se, caso execute esse código duas vezes,
# a segunda vez lançará uma exceção dizendo que o tópico já existe
publisher.create_topic(name=topic_name)

In [None]:
# Publicação da nossa primeira mensagem
future = publisher.publish(topic_name, b"My first message!", versao="python3.11")

# A função que publica mensagens retorna um Future, pois é uma operação assíncrona.
# Usamos a função result() para aguardar a resposta desse future
future.result()

### 2) Assinaturas ou subscriptions

Agora que publicamos nossa primeira mensagem, precisamos criar uma assinatura no tópico para consumir a mensagem.

In [None]:
# Configuramos o nome da assinatura para o formato esperado assim como fizemos com o tópico
subscription_name = "projects/{project_id}/subscriptions/{sub}".format(
    project_id=project_id,
    sub='aula-pdm-minha-primeira-assinatura',
)
subscription_name

In [None]:
# A função abaixo será responsável por receber a mensagem 
def callback(message):
    print(f"data: {message.data.decode('utf-8')} | attributes: {message.attributes}")
    # Essa chamada do ack é como o Pub/Sub controla quais mensagens foram processadas
    message.ack()

In [None]:
# Criação do subscriber, como ele é possível criar assinaturas e "se inscrever" em tópicos
subscriber = pubsub_v1.SubscriberClient()

In [None]:
# Criação da assinatura. Lembre-se, caso execute esse código duas vezes,
# a segunda vez lançará uma exceção dizendo que a assinatura já existe
subscriber.create_subscription(
    name=subscription_name, topic=topic_name
)

In [None]:
# A chamada abaixo registra a assinatura com a função de callback
# Dessa forma, toda mensagem que chegar no tópico da assinatura, executará o código cadastrado
future = subscriber.subscribe(subscription_name, callback)

In [None]:
# Assim como na publicação, a função result é para aguardar algo
# Por quanto tempo devemos esperar por novas mensagens?
try:
    future.result()
except KeyboardInterrupt:
    future.cancel()

### 3) Assinatura do BigQuery

Agora que já rodamos alguns códigos simples, vamos ver como podemos criar uma assinatura para ler os dados de um tópico de forma automática e inserir em uma tabela do BigQuery.

Essa seção foi baseada na [documentação oficial](https://cloud.google.com/pubsub/docs/create-bigquery-subscription?hl=pt-br).

Antes de criar a assinatura e inserir os dados, precisamos preparar o BigQuery.
Vamos realizar três procedimentos:
1. Criar um dataset no BigQuery, o dataset é onde as tabelas ficam agrupadas, como o esquema em bancos relacionais.
2. Criar a tabela no dataset do BigQuery.
3. Ajustar a permissão do Pub/Sub para conseguir enviar os dados.

Para criação do dataset e da tabela, vamos executar o comando a seguir no [BigQuery Studio](https://console.cloud.google.com/bigquery).

Clique no link `Consulta SQL` e cole o código abaixo. Veja na imagem onde está localizado o link.

![bigquery-studio](imagens/img1-bq-studio.png)

```sql
-- Criação do dataset
CREATE SCHEMA aula_pdm;

-- Criação da tabela
CREATE TABLE aula_pdm.clientes
(
    city       STRING,
    client_id  STRING,
    cnae_id    STRING,
    cod_city   INTEGER,
    cod_tract  STRING,
    cod_uf     INTEGER,
    state      STRING,
    client     STRING,
    company_id STRING
);
```

Ao inserir o código acima, clique no botão `Executar`, ao finalizar, irá aparecer o status de cada comando executado. Abaixo tem um exemplo de saída.

![bigquery-output](imagens/img1-bq-execute-query.png)

O último procedimento para habilitar que o Pub/Sub acesse o BigQuery é a gestão de acesso.
Vamos precisar acessar a página de [IAM](https://console.cloud.google.com/iam-admin/iam) e depois clicar em `Conceder acesso` ou `Grant Access` (caso esteja em inglês).

![iam](imagens/img2-iam.png)

No campo `Novos principais / New principals` devemos incluir o nome da Service Account, ela segue esse padrão `service-<project-number>@gcp-sa-pubsub.iam.gserviceaccount.com` onde project-number é o número do projeto da GCP que obtivemos no começo do notebook.



Portanto o valor que devo inserir no campo é `service-625529740055@gcp-sa-pubsub.iam.gserviceaccount.com` e no campo Assign roles, temos que preencher com `Editor de dados BigQuery / BigQuery Data Editor`, adicionar outra role clicando no botão `Adicionar outro papel / Add Another Role` e depois selecionando `Assinante do Pub/Sub / Pub/Sub Subscriber`, após preencher esses valores, clique em save.

![grant-access](imagens/img2-grant-access-pt.png)

Agora já temos tudo pronto para a criação do tópico e da assinatura do BigQuery.

In [None]:
# Primeiro vamos criar o código que irá receber os dados de clientes.
# O código é praticamente igual ao anterior, exceto que agora estamos configurando um período de retenção para as mensagens.

topico_clientes = "projects/{project_id}/topics/{topic}".format(
    project_id=project_id,
    topic='aula-pdm-clientes',
)
topico_clientes

In [None]:
publisher = pubsub_v1.PublisherClient()

In [None]:
topico_obj = Topic({
    "name": topico_clientes,
    "message_retention_duration": "259200s" # 3 dias
})
try:
    publisher.create_topic(request=topico_obj)
except AlreadyExists:
    print(f"O tópico '{topico_clientes}' já existe")

#### 3.1) Criação de assinaturas pela interface gráfica

Antes de criar a assinatura através da interface gráfica, vamos criar um tópico para DLQ para ser utilizado na assinatura.

In [None]:
# Configuração do nome do tópico para DLQ
topico_dlq = "projects/{project_id}/topics/{topic}".format(
    project_id=project_id,
    topic='aula-pdm-clientes-bq-dlq',
)
topico_dlq
try:
    # Criação do tópico de DLQ
    publisher.create_topic(name=topico_dlq)
except AlreadyExists:
    print(f"O tópico '{topico_dlq}' já existe")

![new-subscription](imagens/img3-new-sub.png)

![create-subscription](imagens/img3-sub-bq-pt.png)

![create-subscription-2](imagens/img4-sub-bq-schema-pt.png)

![create-subscription-3](imagens/img5-sub-bq-dlq-pt.png)

Após a criação, deverá aparecer a tela da assinatura como essa imagem a seguir.
Clique no botão `Conceder o papel de editor` ou `Grant Editor Role` para dar permissão para a conta de serviço do Pub/Sub.

![subscription-created](imagens/img5-sub-dlq-access-pt.png)

#### 3.2) Criação de assinaturas através de código python

A seguir temos o código para criar uma assinatura do BigQuery assim como fizemos pela interface gráfica.

In [None]:
# Configuração do nome da assinatura
assinatura_clientes_bq = "projects/{project_id}/subscriptions/{sub}".format(
    project_id=project_id,
    sub="aula-pdm-clientes-bq-python",
)
assinatura_clientes_bq

In [None]:
# Configuração do nome do tópico para DLQ
topico_dlq = "projects/{project_id}/topics/{topic}".format(
    project_id=project_id,
    topic='aula-pdm-clientes-bq-dlq',
)
topico_dlq

In [None]:
try:
    # Criação do tópico de DLQ
    publisher.create_topic(name=topico_dlq)
except AlreadyExists:
    print(f"O tópico '{topico_dlq}' já existe")

In [None]:
# Configuração da assinatura
clientes_assinatura_bq = Subscription({
    "name": assinatura_clientes_bq,
    "topic": topico_clientes,
    "bigquery_config": BigQueryConfig({
        "table": f"{project_id}.aula_pdm.clientes",   # nome da tabela no BigQuery
        "use_table_schema": True,                     # flag indicando para utilizar o esquema da tabela
        "drop_unknown_fields": True                   # flag indicando para descartar os campos desconhecidos
    }),
    "dead_letter_policy": DeadLetterPolicy({
        "dead_letter_topic": topico_dlq
    })
})

In [None]:
try:
    subscriber.create_subscription(
        request=clientes_assinatura_bq
    )
except AlreadyExists:
    printf(f"A assinatura {assinatura_clientes_bq} já existe")

### Exercício prático: Como seria uma assinatura para utilizar o esquema do tópico?

O Pub/Sub tem uma funcionalidade para criar esquemas e depois os atribuir a um ou mais tópicos.
Podemos definir os esquemas em dois formatos, [Avro Schema](https://avro.apache.org/docs/1.11.1/specification/) ou Protobuf.
Abaixo temos o esquema referente aos dados de clientes:

```json
{
 "type" : "record",
 "name" : "Avro",
 "fields" : [
    {
      "name": "city",
      "type": "string"
    },
    {
      "name": "client_id",
      "type": "string"
    },
    {
      "name": "cnae_id",
      "type": "string"
    },
    {
      "name": "cod_city",
      "type": "int"
    },
    {
      "name": "cod_tract",
      "type": "string"
    },
    {
      "name": "cod_uf",
      "type": "int"
    },
    {
      "name": "state",
      "type": "string"
    },
    {
      "name": "client",
      "type": "string"
    },
    {
      "name": "company_id",
      "type": "string"
    }
  ]
}
```

Tente criar o esquema acima e depois crie uma assinatura que envia os dados para o BigQuery utilizando o esquema do tópico.

Execute a consulta abaixo para criar uma tabela no BigQuery.
```sql
CREATE TABLE aula_pdm.clientes_raw(
    data JSON
);
```

### 4) Envio de mensagens para o tópico

In [None]:
import csv
import json
from concurrent import futures

from google.cloud.pubsub_v1.futures import Future

In [None]:
def csv_to_list_of_dict(csv_path: str) -> list[dict]:
    csvfile = open(csv_path, "r")
    reader = csv.DictReader(csvfile)
    data = [row for row in reader]
    csvfile.close()
    return data

In [None]:
dados_dir = "/home/dataproc/aula-pdm-pubsub"
clientes_csv = f"{dados_dir}/clients.csv"
vendas_csv = f"{dados_dir}/vendas.csv"

In [None]:
clientes_dados = csv_to_list_of_dict(clientes_csv)

In [None]:
clientes_dados[0:3]

In [None]:
vendas_dados = csv_to_list_of_dict(vendas_csv)

In [None]:
vendas_dados[0]

In [None]:
len(vendas_dados)

In [None]:
def publica_mensagens(data: list[dict], topico: str):
    total = len(data)
    finished = []

    def done_callback(future: Future):
        f = len(finished) + 1
        perc = f / total * 100
        if f % 100 == 0:
            print(f"Publicação finalizada [{f} / {total} ({perc:.2f}%)]")
        finished.append(True)
    
    publish_futures = []
    for idx, row in enumerate(data):
        idx = idx + 1
        row_as_json = json.dumps(row).encode("utf-8")
        future = publisher.publish(topico, row_as_json)
        future.add_done_callback(done_callback)
        publish_futures.append(future)
    
        perc = idx / total * 100
        if idx % 100 == 0:
            print(f"Publicando {idx}/{total} ({perc:.2f}%)")

    futures.wait(publish_futures, timeout=60, return_when=futures.ALL_COMPLETED)

In [None]:
publica_mensagens(clientes_dados[0:1000], topico_clientes)

In [None]:
clientes_dados[0]

### Exercício prático: Inserção dos dados de vendas

Como seria feita a inserção dos dados de vendas? Lembre-se de que é necessário criar a tabela no BigQuery.
Você pode explorar os dados como fizemos logo acima para criar o esquema da tabela.

### 5) Assinatura do Google Cloud Storage (GCS)

Existe um tipo de assinatura que envia os dados para o GCS em formato Avro. Essa funcionalidade pode ser interessante quando você tem um grande volume de dados ou quer economizar ou não precisa acessar os dados pelo BigQuery.

Essa seção foi baseada na [documentação oficial](https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription?hl=pt-br).

#### 5.1) Criação do bucket no GCS

Para criar uma assinatura do GCS é necessário criar um bucket e ajustar as permissões do mesmo.
Vamos criar um bucket pela [interface da GCP](https://console.cloud.google.com/storage/browser).

![create-bucket](imagens/img6-create-bucket.png)

#### 5.2) Permissões do Pub/Sub para o GCS

Para alterar a permissão, vamos conceder o papel `Storage Admin` a Service Account do Pub/Sub assim como fizemos na seção anterior do BigQuery.
Acesse a página de [IAM](https://console.cloud.google.com/iam-admin/iam) e clique no botão `Conceder acesso / Grant Access`.

![grant-access-storage](imagens/img7-grant-access.png)

In [None]:
topico_vendas = "projects/{project_id}/topics/{topic}".format(
    project_id=project_id,
    topic='aula-pdm-vendas',
)
topico_vendas

In [None]:
publisher = pubsub_v1.PublisherClient()
try:
    publisher.create_topic(name=topico_vendas)
except AlreadyExists:
    print(f"O tópico '{topico_vendas}' já existe")

#### 5.3) Criação da assinatura pela interface da GCP

Agora vamos criar a assinatura no Pub/Sub através da interface gráfica.

![create-subscription-gcs](imagens/img8-sub-gcs.png)

![create-subscription-gcs-2](imagens/img9-sub-gcs-file.png)

### 6) Envio dos dados de vendas

In [None]:
publica_mensagens(vendas_dados[0:10000], topico_vendas)

### 7) Dados de vendas no BigQuery

Nós podemos ler os dados de vendas no BigQuery de duas formas, com uma tabela nativa ou tabela externa.
Como o formato Avro já tem o esquema dos dados, a criação da tabela é bem simples.

Você pode abrir a tela de criação de tabela a partir das opções do dataset, selecione os 3 pontos e depois `Criar tabela / Create Table`.

![create-table](imagens/img10-bq-create-table.png)

![create-table2](imagens/img11-bq-create-table-info.png)

### 8) Leitura de dados no BigQuery

Os dados do Pub/Sub para o GCS não levam um esquema de dados, portanto os dados de vendas ficam em uma coluna do tipo `bytes`.
O conteúdo dessa coluna é na verdade uma string com encode base64, isso nos ajuda muito durante a conversão.

```sql
SELECT * FROM `aula_pdm.vendas` LIMIT 10;
-- A consulta acima vai retornar os dados mais ou menos assim
```

![query-result](imagens/img12-bq-query-result.png)

Você pode copiar o conteúdo da coluna data e usar algum site de decode de dados em base64 para confirmar de que o conteúdo é o json dos dados de vendas.
Abaixo tem um exemplo de conversão usando o terminal.
```bash
echo -n "eyJjbGllbnRfaWQiOiAiYzMwNzUiLCAiaXRlbXNfY291bnQiOiAiMSIsICJsaXN0X3ByaWNlIjogIjAuMCIsICJvcmRlcl9kYXRlIjogIjIwMjAtMTEtMjUiLCAib3JkZXJfaWQiOiAiIiwgInByb2R1Y3RfaWQiOiAicDIwNTQiLCAic2FsZV9wcmljZSI6ICI0OC4yNyIsICJzYWxlc21hbl9pZCI6ICJzODEiLCAic3VwcGxpZXJfaWQiOiAic3U3MCIsICJjb21wYW55X2lkIjogIjA2MDMiLCAicHJvZHVjdCI6ICJQcm9kdWN0IHAyMDU0IiwgInNhbGVzbWFuIjogIlNhbGVzbWFuIHM4MSIsICJzdXBwbGllciI6ICJTdXBwbGllciBzdTcwIiwgImNsaWVudCI6ICJDbGllbnQgYzMwNzUifQ==" | base64 --decode -
{"client_id": "c3075", "items_count": "1", "list_price": "0.0", "order_date": "2020-11-25", "order_id": "", "product_id": "p2054", "sale_price": "48.27", "salesman_id": "s81", "supplier_id": "su70", "company_id": "0603", "product": "Product p2054", "salesman": "Salesman s81", "supplier": "Supplier su70", "client": "Client c3075"}
```

O BigQuery consegue fazer a conversão desse dado em praticamente toda função de manipulação de string/bytes.
No exemplo abaixo, vamos fazer o parser da coluna para o formato JSON e vamos extrair algumas outras colunas.

```sql
WITH data_with_json AS (
  SELECT PARSE_JSON(CAST(data AS STRING)) data_json
  FROM `aula_pdm.vendas`
),
data_fmt AS (
  SELECT
    JSON_EXTRACT_SCALAR(data_json, '$.client')                       cliente
  , CAST(JSON_EXTRACT_SCALAR(data_json, '$.items_count') AS NUMERIC) qtd_itens
  , CAST(JSON_EXTRACT_SCALAR(data_json, '$.order_date') AS DATE)     data_pedido
  , CAST(JSON_EXTRACT_SCALAR(data_json, '$.list_price') AS FLOAT64)  preco_tabela
  , CAST(JSON_EXTRACT_SCALAR(data_json, '$.sale_price') AS FLOAT64)  preco_venda
  FROM data_with_json
)
SELECT *
FROM data_fmt
WHERE preco_venda > 300
```

O resultado da consulta acima vai ser parecido com esse abaixo.

![bq-query-result](imagens/img13-bq-query-result2.png)