<a href="https://colab.research.google.com/github/louzeiro/desafios/blob/main/smtr_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prefect

Tutorial seguido:<br>
https://towardsdatascience.com/prefect-how-to-write-and-schedule-your-first-etl-pipeline-with-python-54005a34f10b

In [None]:
import os
os.system('pip3 install prefect==1.2.4')

In [11]:
import json
import requests
import pandas as pd
from datetime import datetime, timedelta
import pytz
import prefect
from prefect import task, Flow, Parameter
from prefect.schedules import IntervalSchedule



tz = pytz.timezone('America/Sao_Paulo')

scheduler = IntervalSchedule(
    interval=timedelta(seconds=10)
)
var_downtime=60


In [3]:
@task(max_retries=10, retry_delay=timedelta(seconds=10)) # realizar tentativas em 10 segundos
def extract(url: str) -> dict:
    res = requests.get(url)
    if not res:
        raise Exception('Sem dados!')
    return json.loads(res.content)

In [4]:
@task
def transform(data: dict) -> pd.DataFrame:
    instante = datetime.now(tz=tz)
    transformed = []
    for veiculo in data:
        transformed.append({
            'longitude': veiculo['longitude'],
            'latitude': veiculo['latitude'],
            'vei_nro_gestor': veiculo['vei_nro_gestor'],
            'direcao': veiculo['direcao'],
            'velocidade': veiculo['velocidade'],
            'inicio_viagem': veiculo['inicio_viagem'],
            'linha': veiculo['linha'],
            'nomeLinha': veiculo['nomeLinha'],
            'nomeItinerario': veiculo['nomeItinerario'],
            'comunicacao': veiculo['comunicacao'],
             'info_coleta': instante
        })
    
    return pd.DataFrame(transformed)

In [5]:
@task
def clear_data(data_aux: pd.DataFrame,
               data_updated: pd.DataFrame,
               var_downtime: int) -> pd.DataFrame:
  
  if (data_updated.empty): # verificando se o dataframe foi alimentado
    return data_aux        # retorna o dataframe auxiliar
  
  else:
    #var_downtime = 60
    for idx in data_aux.index:
      # verificando se o veículo está no dataframe antigo
      if not data_aux.vei_nro_gestor[idx] in data_updated.vei_nro_gestor.values:
        new_vehicle = data_aux.loc[data_aux.vei_nro_gestor==data_aux.vei_nro_gestor[idx]]
        data_updated = data_updated.append(new_vehicle, ignore_index=True)
        #print(f'O veículo {str(new_vehicle.vei_nro_gestor.values)} foi adicionado ao dataframe')
      
      # O veiculo já está no dataframe atualizado
      else:
        # salvando a sua posição no dataframe anterior
        idx_vehicle = data_updated[data_updated['vei_nro_gestor']==data_aux['vei_nro_gestor'][idx]].index.values

        # verificando se houve atualização nas coordenadas do veículo
        if(data_updated.loc[idx_vehicle].longitude.values !=data_aux.loc[idx].longitude or
           data_updated.loc[idx_vehicle].latitude.values !=data_aux.loc[idx].latitude):
          #atualizando no dataframe as infos do veiculo
          data_updated.loc[idx_vehicle] = data_aux.loc[idx].values
          #print(f'O veículo {str(data_updated.loc[idx_vehicle].vei_nro_gestor.values)} está em circulação e suas informações foram atualizadas')

        # não houve atualização nas coordenadas, verificar o tempo parado           
        else:
          downtime = data_aux.loc[idx].info_coleta - data_updated.loc[idx_vehicle].info_coleta
          if (float(downtime.dt.total_seconds()) > var_downtime):
            print(f'O veículo {str(data_updated.loc[idx_vehicle].vei_nro_gestor.values)} está parado a {float(downtime.dt.total_seconds())} segundos, então foi removido do dataframe')
            data_updated.drop(idx_vehicle, axis=0, inplace=True)
    
    return data_updated


In [6]:
@task
def load(data: pd.DataFrame, path: str) -> None:
    data.to_csv(path_or_buf=path, index=False)

In [14]:
def prefect_flow(data_updated: pd.DataFrame, var_downtime: int):
    with Flow(name='smtr_veiculos_etl', schedule=scheduler) as flow:

        veiculos = extract(url='http://citgisbrj.tacom.srv.br:9977/gtfs-realtime-exporter/findAll/json')
        data_aux = transform(veiculos)

        data_updated = clear_data(data_aux=data_aux,
                                  data_updated = data_updated,
                                  var_downtime = var_downtime)
        
        load(data=data_updated, path=f'veiculos_{datetime.now(tz=tz).strftime("%Y-%m-%d_%H:%M:%S.%f")[:-3]}.csv')
    
    return flow

In [None]:
if __name__ == '__main__':
  data_updated = pd.DataFrame()  # iniciando um df vazio
  flow = prefect_flow(data_updated=data_updated, var_downtime=var_downtime)
  flow.run()

In [None]:
flow.visualize()

In [210]:
%rm *.csv

rm: cannot remove '*.csv': No such file or directory


# aprendendo

In [None]:
import time
veiculos = extract(url='http://citgisbrj.tacom.srv.br:9977/gtfs-realtime-exporter/findAll/json')
data_updated = transform(veiculos)
time.sleep(65)
veiculos = extract(url='http://citgisbrj.tacom.srv.br:9977/gtfs-realtime-exporter/findAll/json')
data_aux = transform(veiculos)

var_downtime = 60
df = clear_data(data_updated = data_updated, 
                data_aux=data_aux, 
                var_downtime=var_downtime)
df.head(3)

In [None]:
import time
veiculos = extract(url='http://citgisbrj.tacom.srv.br:9977/gtfs-realtime-exporter/findAll/json')
data_updated = transform(veiculos)
time.sleep(65)
veiculos = extract(url='http://citgisbrj.tacom.srv.br:9977/gtfs-realtime-exporter/findAll/json')
data_aux = transform(veiculos)

var_downtime = 60
for idx in data_aux.index:
  # verificando se o veículo está no dataframe antigo
  if not data_aux.vei_nro_gestor[idx] in data_updated.vei_nro_gestor.values:
    new_vehicle = data_aux.loc[data_aux.vei_nro_gestor==data_aux.vei_nro_gestor[idx]]
    data_updated = data_updated.append(new_vehicle, ignore_index=True)
    print(f'O veículo {str(new_vehicle.vei_nro_gestor.values)} foi adicionado ao dataframe')
  
  # O veiculo já está no dataframe atualizado
  else:
    idx_vehicle = data_updated[data_updated['vei_nro_gestor']==data_aux['vei_nro_gestor'][idx]].index.values

    # verificando se houve atualização nas coordenadas do veículo
    if(data_updated.loc[idx_vehicle].longitude.values !=data_aux.loc[idx].longitude or
           data_updated.loc[idx_vehicle].latitude.values !=data_aux.loc[idx].latitude):
        
      #atualizando no dataframe as infos do veiculo
      data_updated.loc[idx_vehicle] = data_aux.loc[idx].values
      print(f'O veículo {str(data_updated.loc[idx_vehicle].vei_nro_gestor.values)} está em circulação e suas informações foram atualizadas')

    # não houve atualização nas coordenadas, verificar o tempo parado           
    else:
      downtime = data_aux.loc[idx].info_coleta - data_updated.loc[idx_vehicle].info_coleta
      if (float(downtime.dt.total_seconds()) > var_downtime):
        print(f'O veículo {str(data_updated.loc[idx_vehicle].vei_nro_gestor.values)} está parado a {float(downtime.dt.total_seconds())} segundos, então foi removido do dataframe')
        data_updated.drop(idx_vehicle, axis=0, inplace=True)
        

In [36]:
from google.cloud import bigquery
from google.oauth2 import service_account

In [41]:
# fazer upload da chave e renomear
credentials = service_account.Credentials.from_service_account_file("smtr-gbq.json", 
                                                                    scopes=["https://www.googleapis.com/auth/cloud-plataform"])
credentials

<google.oauth2.service_account.Credentials at 0x7f21192c3bd0>

In [None]:
data_updated.to_gbq(credentials=credentials,
                    destination_table='smtr.prova',
                    if_exists='append') #substitue caso já exista a tabela, não é o meu caso 

In [44]:
from google.colab import auth
auth.authenticate_user()

In [45]:
project_id = 'smtr-359700'

In [46]:
from google.cloud import bigquery

In [47]:
client = bigquery.Client(project=project_id)

In [None]:
dataset_ref = 