In [None]:
#pip install --upgrade pip
#pip install apache_beam[interactive]
#pip install apache_beam[gcp]
#pip install gcsfs


#GCP

In [None]:
from apache_beam.runners.pipeline_context import pipeline
import apache_beam as beam
import pandas as pd
import os
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage

pipeline_options = {
    'project':'sc-bc26-ed7',
    'runner':'DataflowRunner',
    'region':'us-east4',
    'staging_location':'gs://projeto-final-equipe4/beam/staging/',
    'temp_location':'gs://projeto-final-equipe4/beam/temp/',
    'template_location':'projeto-final-equipe4/beam/models/modelo_batch'
}

serviceAccount = '/content/sc-bc26-ed7-adb0dc2607d9.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p1 = beam.Pipeline(options=pipeline_options)

#FINAL

In [89]:
from apache_beam.io.textio import WriteToText
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
import unidecode


p1 = beam.Pipeline()

colunas_bio = ['','regiao','uf','produto','volume_m3','data']
colunas_preco = ['data','regiao','estado','produto','postosPes','uniMedida','mediaRev','desvioRev','menorRev','maiorRev','margemRev','coefRev','mediaDistr','desvioDistr','menorDistr','maiorDistr','coefDistr']

def lista_dicionario(elemento, colunas):
  return dict(zip(colunas, elemento))

def trata_data(elemento):
  # Recebe um dicionario e cria um novo campo com ANO-MES -  Retorna o mesmo dicionario com novo campo 
  elemento['ano_mes']= '-'.join(elemento['data'].split('-')[:2])
  return elemento

def chave_uf(elemento):
#  Receber um dicionario -   Retorna uma tupla com estado e o elemento(UF, dicionario )
  chave = elemento['uf']
  return (chave, elemento)

def volume(elemento):
  #  Recebe um tupla ('SAO PAULO', [{},{}]) -   Retorna uma tupla ('SAO PAULO', 8.0)

  uf, registros = elemento
  for registros in registros:
    yield (f"{uf}-{registros['ano_mes']}", float(registros['volume_m3']))

def chave_estado(elemento):
  chave = elemento['estado']
  return (chave, elemento)

def mediaRev(elemento):
  estado, registros = elemento
  for registros in registros:
    yield (f"{estado}-{registros['ano_mes']}", float(registros['mediaRev']))

def arredonda(elemento):
  #Recebe uma tupla e retorna uma tupla com valor arredondado  
  chave, valor = elemento
  return (chave, round(valor,2))

def filtra_campos_vazios(elemento):
  #Remove elementos que tenham chaves vazias] -   Receber uam tupla e retorna a mesma dupla sem campos vazios   
  chave, dados = elemento
  if all([
      dados['volume_m3'],
      dados['Valor_MedRev']
      ]):
      return True
  return False

def descompactar_elementos(elemento):
  #Receber uma tupla ('DISTRITO FEDERAL-2015-10', {'volume_m3': [4.0], 'Valor_MedRev': [11.67]})   Retorna uma tupla ('DISTRITO FEDERAL', '2015', '10', '4.0', '11.67')  
  chave, dados = elemento
  volume_m3 = dados['volume_m3'][0] #acessando o primeiro elemento dessa lista [0]
  Valor_MedRev = dados['Valor_MedRev'][0]
  uf, ano, mes = chave.split('-')
  return uf, ano, mes, str(volume_m3), str(Valor_MedRev)  #transformar em str para poder usar o join posteriomente

def preparar_csv(elemento, deliminator=';'):
  #Recebe uma tupla e retorna uma string delimitada "DISTRITO FEDERAL;2015;10;4.0;11.67"
  return f"{deliminator}".join(elemento)


biocombustiveis = (
    p1
    |'Extrair do CSV'>> beam.io.ReadFromText('/content/arquivos_trat_dfbio_trat (2)', skip_header_lines=1)  
    |'Separador de dados'>> beam.Map(lambda record: record.split(','))
    |'Filtro por produto'>> beam.Filter(lambda record: str(record[3])== 'HIDRATADO')
    |'Tranformar lista para dicionario'>>beam.Map(lista_dicionario, colunas_bio)
    |'Criar Campo ano_mes'>>beam.Map(trata_data)
    |'Criar chave pelo uf'>> beam.Map(chave_uf)
    |'Agrupar pelo uf'>>beam.GroupByKey()
    |'Descompactar volume'>>beam.FlatMap(volume)
    |'Soma dos volumes pela chave'>> beam.CombinePerKey(sum)
    |'Arredondar resultados'>>beam.Map(arredonda)
    # |'Imprimir o resultado'>> beam.Map(print)
)

precos = (
    p1
    |'Extrair do CSV Dataset Preços'>> beam.io.ReadFromText('/content/arquivos_trat_precos (1).csv', skip_header_lines=1) 
    |'Separador de dados Dataset Preços'>> beam.Map(lambda record: record.split(',')) 
    |'Filtro por produto Dataset Preços'>> beam.Filter(lambda record: str(record[3])== 'ETANOL HIDRATADO')
    |'Tranformar lista para dicionario Dataset Preços'>>beam.Map(lista_dicionario, colunas_preco)
    |'Criar Campo ano_mes Dataset Preços'>>beam.Map(trata_data) 
    |'Criar chave pelo estado Dataset Preços'>> beam.Map(chave_estado)
    |'Agrupar pelo estado Dataset Preços'>>beam.GroupByKey()
    |'Descompactar volume Dataset Preços'>>beam.FlatMap(mediaRev)
    |'Soma dos volumes pela chave Dataset Preços'>> beam.CombinePerKey(sum)
    |'Arredondar resultados de preços'>>beam.Map(arredonda)
    # |'Imprimir o resultado Dataset Preços'>> beam.Map(print)
)

resultado = (
    ({'volume_m3':biocombustiveis,'Valor_MedRev':precos})
    |'Mesclar pcollections'>>beam.CoGroupByKey()
    |'Filtrar dados vazios'>>beam.Filter(filtra_campos_vazios)
    |'Descompactar elementos'>>beam.Map(descompactar_elementos)
    |'Preparar csv'>>beam.Map(preparar_csv)
    # |'Imprimir o resultado da união'>> beam.Map(print)
)

resultado |'Criar arquivo CSV'>> WriteToText('resultado', file_name_suffix='.csv')

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8e8a1812b0>

In [None]:
#BUCKETS PARA PEGAR LINKS PARA UTILIZAR 
# gs://projeto-final-equipe4/arquivos_trat/dfbio_trat
# gs://projeto-final-equipe4/arquivos_trat/precos.csv

In [78]:
##