<a href="https://colab.research.google.com/github/MarianaDuartee/ProjetoFinal/blob/main/4_pubSub_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### INSTALANDO DEPENDÊNCIAS

In [9]:
!pip install google-cloud-pubsub
!pip install fsspec
!pip install gcsfs
!pip install apache-beam[interactive]
!pip install apache_beam[gcp]
!pip install google-cloud-bigquery

### IMPORTANDO BIBLIOTECAS

In [1]:
import csv
import time
import os
import json

import fsspec
import gcsfs
import pandas as pd

import apache_beam as beam
from apache_beam import window
from apache_beam import coders
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

from google.cloud import pubsub_v1
from google.cloud import storage
from google.cloud import bigquery
from google.colab import drive

# Google Drive holds input files read later (e.g. the Drive -> GCS pipeline).
drive.mount('/content/drive')

# Service-account credentials: Google Cloud client libraries locate the key
# file through the GOOGLE_APPLICATION_CREDENTIALS environment variable.
service_account_key = r"/content/soulcode-projeto-final-4b88bea6e07a.json"
os.environ.update({"GOOGLE_APPLICATION_CREDENTIALS": service_account_key})

Mounted at /content/drive


In [None]:
# NOTE(review): repeats the Drive mount done in the setup cell at the same
# mount point — consider removing this cell.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

### PUB/SUB (PUBLICANDO E CONSUMINDO DADOS)

Produtor

In [None]:
# Input (ingestion) topic
topico = 'projects/soulcode-projeto-final/topics/ingestor_dados'
publisher = pubsub_v1.PublisherClient()

entrada = r"/content/2_temp_temp_pandas_total_pop_ano_uf.csv"

# Publish the file line by line as raw bytes (every line, including any header
# line, with its trailing newline), pausing between messages to mimic a stream.
with open(entrada, 'rb') as file:
    for row in file:
        print('Publicando no topico: ', topico)
        # publish() only enqueues the message and returns a future; result()
        # waits for the server-assigned message ID and raises if the publish
        # failed, so failures are not silently lost.
        message_id = publisher.publish(topico, row).result()
        print('ID da mensagem: ', message_id)
        time.sleep(2)

Consumidor

In [None]:
from concurrent.futures import TimeoutError as FuturesTimeoutError


# ACK (acknowledge) callback
def monstrar_msg(mensagem):
    """Print a received Pub/Sub message and acknowledge it.

    Args:
        mensagem: the message object handed to the callback by
            ``SubscriberClient.subscribe``.
    """
    print('Mensagem: {}'.format(mensagem))
    # Acknowledge so Pub/Sub does not redeliver the message.
    mensagem.ack()


# Output subscription to consume from
subscription = 'projects/soulcode-projeto-final/subscriptions/consumidor_dados_violencia'
subscriber = pubsub_v1.SubscriberClient()

# How long to listen, in seconds; None listens until the cell is interrupted.
TEMPO_ESCUTA_S = None

# subscribe() runs the streaming pull on background threads and returns a
# future. Waiting on that future (instead of an endless sleep loop) surfaces
# pull errors, and on timeout/interrupt lets us stop the background pull
# instead of leaving it consuming and acking messages after the cell ends.
streaming_pull_future = subscriber.subscribe(subscription, callback=monstrar_msg)
with subscriber:  # closes the client when done
    try:
        streaming_pull_future.result(timeout=TEMPO_ESCUTA_S)
    except (FuturesTimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()  # stop pulling new messages
        streaming_pull_future.result()  # block until shutdown has completed

# PIPELINE BATCH

### PIPELINE LOCAL PARA GCS

In [14]:
# Build a pipeline that copies the JSON file from Google Drive to the GCS data lake.
p1 = beam.Pipeline()

path_file = r'/content/drive/MyDrive/Projeto Final/OcorrenciasUF.json'
output_prefix = 'gs://data_lake_ingest_data/1_input/converter/OcorrenciasUF'

# NOTE(review): Beam does not guarantee line order. The copy is only safe if
# every line is self-contained or the runner keeps the order (true for this
# small local run) — confirm before running on a distributed runner.
rows = (
    p1
    | 'Extrair_Dados' >> beam.io.ReadFromText(path_file, skip_header_lines=0, coder=coders.StrUtf8Coder())
    # num_shards=1 pins the output to one file, so the shard name read back
    # below ("-00000-of-00001") is guaranteed rather than left to the runner.
    | 'Gravar_resultado' >> beam.io.WriteToText(output_prefix, file_name_suffix='.json', num_shards=1)
)

# Block until the pipeline has finished writing before reading its output.
p1.run().wait_until_finish()

df_test = pd.read_json(output_prefix + '-00000-of-00001.json')
print(df_test)
print(df_test.dtypes)



              UF                           Tipo Crime  ...       Mês Ocorrências
0           Acre                              Estupro  ...   janeiro          39
1           Acre                     Furto de veículo  ...   janeiro          55
2           Acre                     Homicídio doloso  ...   janeiro          14
3           Acre      Lesão corporal seguida de morte  ...   janeiro           0
4           Acre       Roubo a instituição financeira  ...   janeiro           0
...          ...                                  ...  ...       ...         ...
18769  Tocantins       Roubo a instituição financeira  ...  dezembro           6
18770  Tocantins                       Roubo de carga  ...  dezembro           1
18771  Tocantins                     Roubo de veículo  ...  dezembro          55
18772  Tocantins  Roubo seguido de morte (latrocínio)  ...  dezembro           2
18773  Tocantins               Tentativa de homicídio  ...  dezembro          42

[18774 rows x 5 columns]
UF

### GCS PARA BIGQUERY (BATCH)

In [None]:
# Continuing the pipeline
def print_row(element):
    """Print a pipeline element and pass it through unchanged.

    Args:
        element: any pipeline element.

    Returns:
        The same ``element``. Returning it (rather than ``print``'s ``None``)
        lets this be used in ``beam.Map`` as a debugging tap without replacing
        every element with ``None``.
    """
    print(element)
    return element
    
# Dataflow options for the GCS -> BigQuery batch job.
# --template_location makes the runner stage a reusable template at that GCS
# path instead of launching the job immediately.
pipeline_args = [
    '--runner=DataflowRunner',
    '--job_name=bq-load',
    '--project=soulcode-projeto-final',
    '--region=southamerica-east1',
    '--temp_location=gs://data_lake_ingest_data/temp_process',
    '--staging_location=gs://data_lake_ingest_data/temp_process',
    '--template_location=gs://data_lake_ingest_data/4_templates/template_model_batch',
    # Must be a plain ASCII "--": a look-alike dash (e.g. an en dash) makes the
    # flag unrecognised, so the notebook's main session would not be saved
    # for the workers.
    '--save_main_session',
]

options = PipelineOptions(pipeline_args)
p1 = beam.Pipeline(options=options)

path_file = 'gs://data_lake_ingest_data/2_temp/temp_pandas_total_pop_ano_uf.csv'

table_schema = {
    "fields": [
                {"name":"UF", "type":"STRING", "mode":"NULLABLE"},
                {"name":"populacao_estimada", "type":"FLOAT", "mode":"NULLABLE"},
                {"name":"Ano", "type":"INTEGER", "mode":"NULLABLE"},
            ]
        }

# https://medium.com/datamindedbe/how-to-build-a-cleaning-pipeline-with-bigquery-and-dataflow-on-gcp-3d2f288d4e1b
# https://stackoverflow.com/questions/48741327/writing-nested-schema-to-bigquery-from-dataflow-python
# https://stackoverflow.com/questions/53784829/how-to-get-table-schema-from-json-file-parse-table-schema-from-json
# https://stackoverflow.com/questions/59217700/dataflow-apache-beam-cant-write-on-bigquery


def parse_csv_line(line, fields):
    """Turn one CSV text line into a BigQuery row dict.

    Args:
        line: one line of CSV text, as emitted by ReadFromText.
        fields: the schema's "fields" list. Each entry supplies the column name
            and the BigQuery type used to cast the matching CSV cell.

    Returns:
        A dict mapping every field name to its cast value. Empty cells map to
        None (all columns are declared NULLABLE).

    Raises:
        ValueError: if the cell count differs from len(fields), or a cell cannot
            be cast to its column type.

    NOTE(review): assumes the CSV columns are in the same order as `fields`
    (UF, populacao_estimada, Ano) with no pandas index column — confirm.
    """
    import csv  # local import keeps the function self-contained when pickled to workers

    casts = {
        "STRING": str,
        "FLOAT": float,
        # int(float(...)) also accepts integers written as e.g. "2020.0"
        "INTEGER": lambda value: int(float(value)),
    }
    cells = next(csv.reader([line]))
    if len(cells) != len(fields):
        raise ValueError(
            'Expected {} columns, got {}: {!r}'.format(len(fields), len(cells), line))
    return {
        field["name"]: casts[field["type"]](cell) if cell != "" else None
        for field, cell in zip(fields, cells)
    }


rows = (
        p1
        # NOTE(review): assumes the file starts with a single header row
        # (pandas to_csv default) — confirm.
        |'Extraindo_Dados' >> beam.io.ReadFromText(path_file, skip_header_lines=1)
        # WriteToBigQuery needs one dict per row, keyed by the schema's column names.
        |'Converter_Linhas' >> beam.Map(parse_csv_line, table_schema["fields"])
        |'Gravar_Resultado' >> beam.io.gcp.bigquery.WriteToBigQuery(
                                    table='TesteBeamApache',
                                    dataset='Teste',
                                    project='soulcode-projeto-final',
                                    custom_gcs_temp_location='gs://data_lake_ingest_data/temp_process',
                                    schema=table_schema,
                                    # Use Beam's own constants: the imports cell rebinds the
                                    # name `bigquery` to google.cloud.bigquery.
                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

# When the options include --template_location, Dataflow only stages the
# template and launches no job, so the returned state is 'UNKNOWN'.
p1.run().wait_until_finish()



'UNKNOWN'