In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Runner selection: 'DirectRunner' executes locally; use 'DataflowRunner'
# when the job should run on Dataflow. Streaming mode is enabled.
pipeline_options_dict = dict(runner='DirectRunner', streaming=True)
pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)

# Source transform: raw message payloads (bytes) from the Pub/Sub subscription.
pubsub_source = beam.io.ReadFromPubSub(
    subscription='projects/playground-s-11-a3b55282/subscriptions/dataflow'
).with_output_types(bytes)

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Debug pipeline: print every payload as it arrives.
    processing = pipeline | pubsub_source | beam.Map(print)
    # Disabled experiments kept for reference:
    #   | beam.WindowInto(beam.window.FixedWindows(window_size), trigger=beam.trigger.AfterWatermark(), accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
    #   | beam.io.WriteToText('./test/')


In [None]:
import argparse
import apache_beam as beam
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.coders.coders import TupleCoder, FloatCoder
from apache_beam.options.pipeline_options import PipelineOptions

# Constants
# BigQuery table schema as comma-separated "name:TYPE" pairs.
SCHEMA = ','.join([
    'code:STRING',
    'rate:FLOAT',
    'volume:INTEGER',
    'cap:INTEGER',
    'circulatingSupply:INTEGER',
    'totalSupply:INTEGER',
    'maxSupply:INTEGER',
    'max_price:FLOAT',
    'min_price:FLOAT',
    'timestamp:TIMESTAMP',
])
# Pub/Sub subscription the pipeline reads from.
SUBSCRIPTION = 'projects/playground-s-11-7b1242ce/subscriptions/dataflow'

# Transformation functions
def to_json(data):
    """Parse a JSON document into the corresponding Python object.

    Args:
        data: JSON text (anything ``json.loads`` accepts).

    Returns:
        The decoded Python object (dict, list, str, number, bool or None).

    Raises:
        json.JSONDecodeError: if ``data`` is not valid JSON.
    """
    # The docstring must be the first statement; the import is kept local to
    # the function, as in the original.
    import json

    return json.loads(data)

def streaming_columns(crypto_dict):
    """Keep only the columns the pipeline uses from a crypto record.

    Keys absent from ``crypto_dict`` are returned with the value ``None``.
    """
    kept_fields = (
        'code',
        'rate',
        'volume',
        'cap',
        'circulatingSupply',
        'totalSupply',
        'maxSupply',
    )
    return {field: crypto_dict.get(field) for field in kept_fields}

class SetCoinKey(beam.DoFn):
    """Turn each record into a ``(code, record)`` pair keyed by its 'code' field."""

    def process(self, element, *args, **kwargs):
        coin_code = element['code']
        keyed_record = (coin_code, element)
        yield keyed_record

class MinMaxBitcoinPriceFn(beam.DoFn):
    """Attach the running maximum and minimum 'rate' seen so far to each record.

    Input elements are ``(key, record)`` pairs where ``record['rate']`` is the
    current price. The state is declared with ``beam.DoFn.StateParam``, so the
    running ``(max, min)`` pair is kept separately for each key.

    Yields:
        ``(key, new_record)`` where ``new_record`` is a copy of ``record`` with
        'max_price' and 'min_price' added.
    """
    # Bag holding at most one (max_price, min_price) tuple.
    PRICE_STATE = BagStateSpec('price_state', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self, element, prev_state=beam.DoFn.StateParam(PRICE_STATE), *args, **kwargs):
        key, record = element
        current_price = record['rate']
        stored = list(prev_state.read())

        if stored:
            max_price, min_price = stored[0]
            if current_price > max_price:
                max_price = current_price
                prev_state.clear()
                prev_state.add((max_price, min_price))
            elif current_price < min_price:
                min_price = current_price
                prev_state.clear()
                prev_state.add((max_price, min_price))
        else:
            # First element for this key: it is both the max and the min.
            max_price = min_price = current_price
            prev_state.add((max_price, min_price))

        # Beam requires that a DoFn not modify its input element, so emit a
        # copy of the record instead of writing into it. Using the values
        # computed above also avoids a second state read.
        yield key, {**record, 'max_price': max_price, 'min_price': min_price}

def format_for_bigquery(element):
    """Build the BigQuery row for a keyed crypto record.

    Args:
        element: ``(key, record)`` pair; the key is discarded.

    Returns:
        A new dict with the record's columns plus a 'timestamp' string in
        ``YYYY-MM-DD HH:MM:SS`` form holding the current UTC time.

    Raises:
        KeyError: if the record lacks one of the expected columns.
    """
    from datetime import datetime, timezone

    _, data = element

    columns = (
        'code',
        'rate',
        'volume',
        'cap',
        'circulatingSupply',
        'totalSupply',
        'maxSupply',
        'max_price',
        'min_price',
    )
    data_to_bq = {column: data[column] for column in columns}

    # BigQuery reads a zoneless TIMESTAMP string as UTC, so take the current
    # time in UTC explicitly instead of the worker's local time.
    data_to_bq['timestamp'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

    return data_to_bq

if __name__ == '__main__':
    # No script-specific flags are declared, so every command-line argument
    # is handed to Beam as a pipeline option.
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args()
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    # Pub/Sub bytes -> JSON records -> selected columns -> keyed by coin code
    # -> running min/max per coin -> BigQuery rows.
    # format_for_bigquery already sets the 'timestamp' column, so no separate
    # timestamp step is needed (this cell defines no add_timestamp function).
    (p
     | beam.io.ReadFromPubSub(subscription=SUBSCRIPTION).with_output_types(bytes)
     | beam.Map(lambda x: x.decode('utf-8'))
     | beam.FlatMap(to_json)
     | beam.Map(streaming_columns)
     | beam.ParDo(SetCoinKey())
     | beam.ParDo(MinMaxBitcoinPriceFn())
     | beam.Map(format_for_bigquery)
     | beam.io.WriteToBigQuery('playground-s-11-7b1242ce:crypto.crypto_price',
                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                               schema=SCHEMA))
    result = p.run()
    result.wait_until_finish()


In [None]:
# Google provider; project and region are taken from input variables.
provider "google" {
  project = var.project_id
  region  = var.region
}

# Secret Manager secret, with user-managed replication to var.region only.
resource "google_secret_manager_secret" "api_secret_key" {
  secret_id = var.secret_id

  replication {
    user_managed {
      replicas {
        location = var.region
      }
    }
  }
}
# Version of the api_secret_key secret holding the value of var.secret_api_key.
resource "google_secret_manager_secret_version" "api_secret_key_version" {
  secret      = google_secret_manager_secret.api_secret_key.id
  secret_data = var.secret_api_key

  depends_on = [
    google_secret_manager_secret.api_secret_key,
  ]
}

# Bucket that receives the zipped function and Dataflow sources.
# force_destroy lets Terraform delete the bucket even when it still holds objects.
resource "google_storage_bucket" "bucket_crypto_api" {
  name          = var.name_bucket_crypto
  location      = var.region
  force_destroy = true
}

# Pub/Sub topic named by var.name_topic_crypto.
resource "google_pubsub_topic" "crypto_topic" {
  name = var.name_topic_crypto
}

# Subscription on the crypto topic.
resource "google_pubsub_subscription" "dataflow_sub" {
  name  = var.dataflow_sub
  topic = google_pubsub_topic.crypto_topic.name
}



# Zips the Cloud Function source directory into /tmp.
data "archive_file" "crypto_cloud_function_folder" {
  type        = "zip"
  source_dir  = "${path.module}/../source/crypto_api_function"
  output_path = "/tmp/${var.name_crypto_function_zip}.zip"
}

# Uploads the function zip. The archive hash is part of the object name, so a
# change in the source produces a differently named object.
resource "google_storage_bucket_object" "store_crypto_function" {
  name   = "${var.name_crypto_function_zip}.${data.archive_file.crypto_cloud_function_folder.output_sha}.zip"
  bucket = google_storage_bucket.bucket_crypto_api.id
  source = data.archive_file.crypto_cloud_function_folder.output_path
}

# HTTP-triggered Python 3.10 Cloud Function deployed from the uploaded zip.
resource "google_cloudfunctions_function" "crypto_function" {
  name         = "crypto-function"
  region       = var.region
  runtime      = "python310"
  entry_point  = "get_crypto_data"
  trigger_http = true
  timeout      = 60

  source_archive_bucket = google_storage_bucket.bucket_crypto_api.name
  source_archive_object = google_storage_bucket_object.store_crypto_function.name

  environment_variables = {
    TOPIC_ID   = var.name_topic_crypto
    PROJECT_ID = var.project_id
    # NOTE(review): this passes the secret's plaintext value, not its ID, in a
    # variable called SECRET_ID. If the function looks the secret up in Secret
    # Manager, this should presumably be
    # google_secret_manager_secret.api_secret_key.secret_id -- confirm against
    # the function source.
    SECRET_ID = google_secret_manager_secret_version.api_secret_key_version.secret_data
  }

  # The topic is referenced only through var.name_topic_crypto, so its
  # dependency has to be declared explicitly.
  depends_on = [
    google_secret_manager_secret_version.api_secret_key_version,
    google_pubsub_topic.crypto_topic,
  ]
}

# BigQuery dataset identified by var.dataset_id.
resource "google_bigquery_dataset" "default" {
  dataset_id  = var.dataset_id
  description = "Dataset that contains rates about crypto coins"
}

# Zips the Dataflow source directory into /tmp.
data "archive_file" "crypto_dataflow_job_folder" {
  type        = "zip"
  source_dir  = "${path.module}/../source/dataflow"
  output_path = "/tmp/${var.crypto_job}.zip"
}

# Uploads the Dataflow source zip; the archive hash is part of the object name.
resource "google_storage_bucket_object" "store_crypto_template_function" {
  name   = "${var.crypto_job}.${data.archive_file.crypto_dataflow_job_folder.output_sha}.zip"
  bucket = google_storage_bucket.bucket_crypto_api.id
  source = data.archive_file.crypto_dataflow_job_folder.output_path
}
# Dataflow job launched from the object uploaded to the project bucket.
# NOTE(review): template_gcs_path points at a zip of the source directory;
# Dataflow presumably expects a staged template file here -- confirm.
resource "google_dataflow_job" "crypto_job" {
  name              = var.crypto_job
  project           = var.project_id
  region            = var.region
  template_gcs_path = "gs://${google_storage_bucket.bucket_crypto_api.name}/${google_storage_bucket_object.store_crypto_template_function.name}"

  # Must be a full GCS URI, so the gs:// scheme is required.
  temp_gcs_location = "gs://${google_storage_bucket.bucket_crypto_api.name}/temp"

  parameters = {
    # `.id` is the full "projects/<project>/subscriptions/<name>" path, which
    # is the form Beam's ReadFromPubSub needs; `.name` is only the short name.
    "subscription" = google_pubsub_subscription.dataflow_sub.id
    #"output_table"   = "playground-s-11-cdfc0c33:crypto.crypto_price"
  }
}


In [None]:
# GCP project ID; no default, so it must be supplied by the caller.
variable "project_id" {
}

# Region used by the provider and by the regional resources.
variable "region" {
  default = "us-east1"
}
# ID of the Secret Manager secret.
variable "secret_id" {
  default = "api_crypto_key"
}
# Secret value stored in Secret Manager; no default, so it must be supplied.
# Marked sensitive so Terraform redacts it in plan/apply output.
variable "secret_api_key" {
  sensitive = true
}
# Name of the storage bucket.
variable "name_bucket_crypto" {
  default = "bucket_crypto_api_project_999"
}
# BigQuery dataset ID.
variable "dataset_id" {
  default = "crypto"
}
# Pub/Sub topic name.
variable "name_topic_crypto" {
  default = "crypto-topic"
}
# Pub/Sub subscription name.
variable "dataflow_sub" {
  default = "dataflow"
}
# Base name of the Cloud Function zip archive and bucket object.
variable "name_crypto_function_zip" {
  default = "crypto_function"
}
# Dataflow job name; also the base name of the Dataflow source zip.
variable "crypto_job" {
  default = "crypto_job"
}


In [None]:
import argparse
import apache_beam as beam
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.coders.coders import TupleCoder, FloatCoder
from apache_beam.options.pipeline_options import PipelineOptions

# Constants
# BigQuery table schema as comma-separated "name:TYPE" pairs.
SCHEMA = ','.join([
    'code:STRING',
    'rate:FLOAT',
    'volume:INTEGER',
    'cap:INTEGER',
    'circulatingSupply:INTEGER',
    'totalSupply:INTEGER',
    'maxSupply:INTEGER',
    'max_price:FLOAT',
    'min_price:FLOAT',
    'timestamp:TIMESTAMP',
])
# Pub/Sub subscription path.
SUBSCRIPTION = 'projects/playground-s-11-cdfc0c33/subscriptions/dataflow'

# Transformation functions
def to_json(data):
    """Parse a JSON document into the corresponding Python object.

    Args:
        data: JSON text (anything ``json.loads`` accepts).

    Returns:
        The decoded Python object (dict, list, str, number, bool or None).

    Raises:
        json.JSONDecodeError: if ``data`` is not valid JSON.
    """
    # The docstring must be the first statement; the import is kept local to
    # the function, as in the original.
    import json

    return json.loads(data)

def streaming_columns(crypto_dict):
    """Keep only the columns the pipeline uses from a crypto record.

    Keys absent from ``crypto_dict`` are returned with the value ``None``.
    """
    kept_fields = (
        'code',
        'rate',
        'volume',
        'cap',
        'circulatingSupply',
        'totalSupply',
        'maxSupply',
    )
    return {field: crypto_dict.get(field) for field in kept_fields}

class SetCoinKey(beam.DoFn):
    """Turn each record into a ``(code, record)`` pair keyed by its 'code' field."""

    def process(self, element, *args, **kwargs):
        coin_code = element['code']
        keyed_record = (coin_code, element)
        yield keyed_record

class MinMaxBitcoinPriceFn(beam.DoFn):
    """Attach the running maximum and minimum 'rate' seen so far to each record.

    Input elements are ``(key, record)`` pairs where ``record['rate']`` is the
    current price. The state is declared with ``beam.DoFn.StateParam``, so the
    running ``(max, min)`` pair is kept separately for each key.

    Yields:
        ``(key, new_record)`` where ``new_record`` is a copy of ``record`` with
        'max_price' and 'min_price' added.
    """
    # Bag holding at most one (max_price, min_price) tuple.
    PRICE_STATE = BagStateSpec('price_state', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self, element, prev_state=beam.DoFn.StateParam(PRICE_STATE), *args, **kwargs):
        key, record = element
        current_price = record['rate']
        stored = list(prev_state.read())

        if stored:
            max_price, min_price = stored[0]
            if current_price > max_price:
                max_price = current_price
                prev_state.clear()
                prev_state.add((max_price, min_price))
            elif current_price < min_price:
                min_price = current_price
                prev_state.clear()
                prev_state.add((max_price, min_price))
        else:
            # First element for this key: it is both the max and the min.
            max_price = min_price = current_price
            prev_state.add((max_price, min_price))

        # Beam requires that a DoFn not modify its input element, so emit a
        # copy of the record instead of writing into it. Using the values
        # computed above also avoids a second state read.
        yield key, {**record, 'max_price': max_price, 'min_price': min_price}

def format_for_bigquery(element):
    """Drop the key of a ``(key, record)`` pair and return the record's BigQuery columns."""
    record = element[1]
    columns = (
        'code',
        'rate',
        'volume',
        'cap',
        'circulatingSupply',
        'totalSupply',
        'maxSupply',
        'max_price',
        'min_price',
    )
    row = {}
    for column in columns:
        row[column] = record[column]
    return row

def add_timestamp(element):
    """Return a copy of ``element`` with a 'timestamp' column added.

    Args:
        element: dict representing a row.

    Returns:
        A new dict with the same items plus 'timestamp', a
        ``YYYY-MM-DD HH:MM:SS`` string holding the current UTC time.
    """
    from datetime import datetime, timezone

    # BigQuery reads a zoneless TIMESTAMP string as UTC, so take the current
    # time in UTC explicitly instead of the worker's local time.
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

    # Beam requires that transform functions not modify their input element,
    # so build a new dict rather than writing into ``element``.
    return {**element, 'timestamp': timestamp}

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Falls back to the module-level SUBSCRIPTION when the flag is omitted.
    parser.add_argument('--subscription', default=SUBSCRIPTION, help='Pub/Sub subscription')
    # parse_known_args() leaves Beam's own flags (--runner, --streaming, ...)
    # in pipeline_args; parse_args() would exit with "unrecognized arguments".
    known_args, pipeline_args = parser.parse_known_args()

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    # Pub/Sub bytes -> JSON records -> selected columns -> keyed by coin code
    # -> running min/max per coin -> BigQuery rows with a timestamp.
    (p
     | beam.io.ReadFromPubSub(subscription=known_args.subscription).with_output_types(bytes)
     | beam.Map(lambda x: x.decode('utf-8'))
     | beam.FlatMap(to_json)
     | beam.Map(streaming_columns)
     | beam.ParDo(SetCoinKey())
     | beam.ParDo(MinMaxBitcoinPriceFn())
     | beam.Map(format_for_bigquery)
     | beam.Map(add_timestamp)
     | beam.io.WriteToBigQuery('playground-s-11-cdfc0c33:crypto.crypto_price',
                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                               schema=SCHEMA))
    result = p.run()
    result.wait_until_finish()

#python pipeline.py --streaming --runner DataflowRunner --project playground-s-11-7b1242ce --temp_location gs://bucket_bitcoin_api_project_99/temp --staging_location gs://bucket_bitcoin_api_project_99/stage --region us-east1 --job_name cryptoz 