# Tutorial - View Streaming

Este material auxilia o passo-a-passo na criação do ambiente necessário para simular a auto-atualização das Views Materializadas. Siga os passos a seguir para simular o exemplo da Video-Aula de Materialized Views.

1. **Crie um projeto** em seu GCP;
2. **Cria uma conta de serviço** para acesso ao BigQuery [https://console.cloud.google.com/iam-admin/serviceaccounts];
3. **Exporte o arquivo Json da chave** para o mesmo diretório que se encontra este arquivo;
4. Execute os **passo de 1 a 6** deste tutorial;
5. Abra o arquivo `scripts-carga-bq.sql`
6. Substistua no arquivo `scripts-carga-bq.sql` o texto `nome-projeto` pelo nome único do seu projeto criado no passo 1;
7. Execute o SQL do arquivo `scripts-carga-bq.sql` para gerar dados aleatórios nas tabelas `AIRLINE_FLIGHTS` e `COMPANY_SALES`


   
_Rubens Mussi Cury_     
🥇Google Cloud Certified   
rubensmussicury@gmail.com   

## 1. Bibliotecas
* Conta de Serviço - https://cloud.google.com/docs/authentication/production
* BigQuery - https://cloud.google.com/bigquery/docs/how-to e https://google-cloud-python.readthedocs.io/en/0.32.0/bigquery/usage.html
* Exceções - https://google-cloud-python.readthedocs.io/en/0.32.0/core/exceptions.html

In [None]:
import os
import json
import random
import datetime
from google.oauth2 import service_account
from google.cloud import bigquery
from google.api_core.exceptions import BadRequest, Conflict, AlreadyExists

## 2. Funções
- Gera data aleatória para auxiliar na ingestão de dados
- Cria conjunto de dados do tamanho e tipo solicitado 

In [None]:
"""
Cria tabelas de acordo com a relação.
tables       Array no formato [{"table_id": "", "schema": []}]
  
return       Mensagem informando o resultado da criação da tabela.
"""
def create_tables(tables):    
    
    # Percorre todas as tabelas definidas para criação.
    for table in tables:        
        
        # Captura o a identificação da tabela e o schema.
        table_id = table["table_id"]
        schema = table["schema"]
        
        # Instancia o objeto da tabela.
        table = bigquery.Table(table_id, schema=schema)

        try:
            # Executa o job de criação do Dataset.
            executed_job = bq_client.create_table(table)
            print("Tabela criada - {0}".format(table_id))

        # Conflito na criação do Dataset.
        except Conflict as e:
            print("Erro ao criar tabela - {0}".format(table_id))
            print("/n".join(e.args))

        # Exceção genérica.
        except Exception as e:
            print("Erro ao criar tabela - {0}".format(table_id))
            print("/n".join(e.args))


"""
Gera uma data aleatória dentro de um intervalo de datas.
start_date   Data mínima do intervalo.
end_date     Data máxima do intervalo.

return       Uma data aleatória entre [start_date] e [end_date]
"""
def random_date(start_date, end_date):
    time_between_dates = end_date - start_date
    days_between_dates = time_between_dates.days
    random_number_of_days = random.randrange(days_between_dates)
    return start_date + datetime.timedelta(days=random_number_of_days)

"""
Cria conjunto de dados do tamanho e tipo solicitado.
data_type    [AIRLINE_FLIGHTS] ou [COMPANY_SALES]
batch_size   Tamanho do conjunto de dados

return       Dicionário com os dados correspondentes ao data_type.
"""
def generate_random_data(data_type, batch_size):
    batch_data = []
    
    if "AIRLINE_FLIGHTS" in data_type:
        
        for row in range(batch_size):

            year = random.randint(2010, 2019)
            month = random.randint(1, 12)
            hour = random.randint(1, 24)
            airline = ["Southwest Airlines Co.: WN", "American Airlines Inc.: AA", "Delta Air Lines Inc.: DL", "SkyWest Airlines Inc.: OO", "United Air Lines Inc.: UA", "American Eagle Airlines Inc.: MQ", "US Airways Inc.: US", "Northwest Airlines Inc.: NW", "ExpressJet Airlines Inc.: XE", "ExpressJet Airlines Inc.: EV"]
            flights = int(random.random() * 100)

            batch_data.append({"YEAR": year,
                               "MONTH": month,
                               "HOUR": hour,
                               "AIRLINE": airline[random.randint(0, len(airline) - 1)],
                               "FLIGHTS": flights})
    if "COMPANY_SALES" in data_type:
        
        for row in range(batch_size):

            date = random_date(datetime.date(2010, 1, 1), datetime.date.today()).isoformat()
            company = ["Progressive Sports", "Country Parts Shop", "Bikes and Motorbikes", "Budget Toy Store", "Advanced Bike Components", "Bicycle Warehouse Inc."]
            sales = round(random.random() * 1000, 2)

            batch_data.append({"DATE": date,
                               "COMPANY": company[random.randint(0, len(company) - 1)],
                               "SALES": sales})            

    return batch_data

## 3. Autenticação
* https://cloud.google.com/iam/docs/creating-managing-service-account-keys?hl=pt-br
* Via `os.environ["GOOGLE_APPLICATION_CREDENTIALS"]`
* Via service_account.Credentials()

In [None]:
# Caminho completo da chave de serviço
gcp_credential_file = "sua-chave-aqui.json"

# Autenticação na conta de serviço.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_credential_file

# Utilize esta forma de credencial para autenticar diretamente pelo SDK.
# credentials = service_account.Credentials.from_service_account_file(gcp_credential_file)
# client = bigquery.Client(credentials=credentials)

## 4. Ambiente
- Identificação do Projeto

In [None]:
# Instancia o Client do BQ.
bq_client = bigquery.Client()

# Obtém nome único global do projeto vinculado a conta de serviço.
project_id = bq_client.project

## 5. Datasets
- Crição do Datasets - https://cloud.google.com/bigquery/docs/datasets#python

Criaremos Datasets `primary_dataset` e `secondary_dataset`

In [None]:
# Define os nomes dos datasets a serem criados no projeto.
dataset_ids = ["{0}.{1}".format(project_id, "primary_dataset"), "{0}.{1}".format(project_id, "secondary_dataset")]

# Cria cada um dos Datasets definidos.
for dataset_id in dataset_ids:
    
    # Instancia o objeto do Dataset.
    dataset = bigquery.Dataset(dataset_id)

    # Define a localização do Dataset.
    dataset.location = "US"

    try:
        # Executa o job de criação do Dataset.
        executed_job = bq_client.create_dataset(dataset)
        print("Dataset criado - {0}.{1}".format(project_id, dataset_id))
    
    # Conflito na criação do Dataset.
    except Conflict as e:
        print("Erro ao criar dataset - {0}.{1}".format(project_id, dataset_id))
        print("/n".join(e.args))
    
    # Exceção genérica.
    except Exception as e:
        print("Erro ao criar dataset - {0}.{1}".format(project_id, dataset_id))
        print("/n".join(e.args))

## 6. Tabelas
- Criação das Tabelas - https://cloud.google.com/bigquery/docs/tables#python

Criaremos as tabelas `primary_dataset.COMPANY_SALES` e `secondary_dataset.AIRLINE_FLIGHTS`

In [None]:
# Define as especificações das tabelas [AIRLINE_FLIGHTS] e [COMPANY_SALES]
tables = [{"table_id": "{0}.{1}".format(dataset_ids[0], "AIRLINE_FLIGHTS"),
           "schema": [
               bigquery.SchemaField("YEAR", "INTEGER", mode="REQUIRED"),
               bigquery.SchemaField("MONTH", "INTEGER", mode="REQUIRED"),
               bigquery.SchemaField("HOUR", "INTEGER", mode="REQUIRED"),
               bigquery.SchemaField("AIRLINE", "STRING", mode="REQUIRED"),
               bigquery.SchemaField("FLIGHTS", "INTEGER", mode="REQUIRED")
           ]},
          {"table_id": "{0}.{1}".format(dataset_ids[1], "COMPANY_SALES"),
           "schema": [
               bigquery.SchemaField("DATE", "DATE", mode="REQUIRED"),
               bigquery.SchemaField("COMPANY", "STRING", mode="REQUIRED"),
               bigquery.SchemaField("SALES", "FLOAT", mode="REQUIRED")
           ]},          
         ]

# Cria as tabelas de acordo com as especificações.
create_tables(tables)

## 7. Streaming
- Incrementa batchs de 800 registros nas tabelas `AIRLINE_FLIGHTS` e `COMPANY_SALES`
- Utilize este passo para simular a atualização das Views Materializadas

In [None]:
# Total de batchs a ser carregados nas tabelas.
total_batchs = 50000
batch_size = 800

# Processo de ingestão.
print("Iniciado processo de streaming...")
print("Tam. da Carga   - {}".format(batch_size))
print("Total de Linhas - {}".format(total_batchs * batch_size))

for batch in range(total_batchs):
    
    for table in tables:
        
        # Caputura o Id da tabela.
        table_id = table["table_id"] 
        
        # Gera 800 registros aleatórios.
        batch_data = generate_random_data(table_id, batch_size)
        
        # Obtém o objeto da tabela que receberá os dados.
        table_object = bq_client.get_table(table_id)
        
        # Insere os registros no formato JSON.
        executed_job = bq_client.insert_rows_json(table_object, batch_data)
        
        if executed_job == []:
            # Atualiza percentual de ingestão.
            percentage_loaded = int((batch * batch_size) / (total_batchs * batch_size) * 100)
            print("{0} % concluído - {1} já linhas inseridas na tabela {2}".format(percentage_loaded, 
                                                                                  batch * batch_size, 
                                                                                  table_id), end="\r")